template-saas/apps/backend/dist/modules/notifications/services/notification-queue.service.js
rckrdmrd 50a821a415
Some checks failed
CI / Backend CI (push) Has been cancelled
CI / Frontend CI (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / CI Summary (push) Has been cancelled
[SIMCO-V38] feat: Actualizar a SIMCO v3.8.0
- HERENCIA-SIMCO.md actualizado con directivas v3.7 y v3.8
- Actualizaciones de configuracion

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 08:53:08 -06:00

233 lines
9.3 KiB
JavaScript

"use strict";
/**
 * TypeScript-emitted decorator helper. Reuses `this.__decorate` when one is
 * already present on `this`, otherwise defines it here.
 *
 * Call shapes are distinguished by the number of arguments actually passed:
 *  - fewer than 3 arguments: the working value starts as `target` and each
 *    decorator is called as `d(value)`;
 *  - more than 3 arguments:  the working value starts as `desc` and each
 *    decorator is called as `d(target, key, value)`;
 *  - exactly 3 arguments:    each decorator is called as `d(target, key)`.
 *
 * @param {Function[]} decorators - Decorators to apply; falsy entries are skipped.
 * @param {*} target - Object being decorated.
 * @param {string|symbol} [key] - Member name.
 * @param {?object} [desc] - Property descriptor; an explicit `null` means
 *   "look it up with Object.getOwnPropertyDescriptor(target, key)".
 * @returns {*} The final working value after all decorators have run.
 */
var __decorate = (this && this.__decorate) || function (decorators, target, key, desc) {
// c: argument count; r: working value (see call shapes above); d: current decorator.
var c = arguments.length, r = c < 3 ? target : desc === null ? desc = Object.getOwnPropertyDescriptor(target, key) : desc, d;
// Delegate to Reflect.decorate when the runtime provides it.
if (typeof Reflect === "object" && typeof Reflect.decorate === "function") r = Reflect.decorate(decorators, target, key, desc);
// Otherwise apply decorators from last to first; a truthy return value replaces r.
else for (var i = decorators.length - 1; i >= 0; i--) if (d = decorators[i]) r = (c < 3 ? d(r) : c > 3 ? d(target, key, r) : d(target, key)) || r;
// With more than 3 arguments and a truthy result, (re)define the property on target; always return r.
return c > 3 && r && Object.defineProperty(target, key, r), r;
};
/**
 * TypeScript-emitted metadata helper. Reuses `this.__metadata` when present.
 *
 * @param {*} metadataKey - Metadata key forwarded to `Reflect.metadata`.
 * @param {*} metadataValue - Metadata value forwarded to `Reflect.metadata`.
 * @returns {*} Whatever `Reflect.metadata` returns, or `undefined` when the
 *   runtime has no `Reflect.metadata` function.
 */
var __metadata = (this && this.__metadata) || function (metadataKey, metadataValue) {
    const canDelegate = typeof Reflect === "object" && typeof Reflect.metadata === "function";
    return canDelegate ? Reflect.metadata(metadataKey, metadataValue) : undefined;
};
/**
 * TypeScript-emitted parameter-decorator helper. Reuses `this.__param` when present.
 *
 * @param {number} position - Index of the decorated parameter.
 * @param {Function} parameterDecorator - Called as `(target, key, position)`.
 * @returns {Function} A `(target, key)` function that invokes the decorator
 *   with the bound parameter index and returns nothing.
 */
var __param = (this && this.__param) || function (position, parameterDecorator) {
    const applyToParameter = (owner, memberKey) => {
        parameterDecorator(owner, memberKey, position);
    };
    return applyToParameter;
};
// Self-reference alias: assigned together with the class binding so the class
// body can refer to the class itself (it reads `.name` for the logger context).
var NotificationQueueService_1;
// Mark this CommonJS module as transpiled from an ES module.
Object.defineProperty(exports, "__esModule", { value: true });
// Placeholder so the export key exists before the class is defined further down.
exports.NotificationQueueService = void 0;
const common_1 = require("@nestjs/common");
const typeorm_1 = require("@nestjs/typeorm");
const typeorm_2 = require("typeorm");
const entities_1 = require("../entities");
/**
 * Service that manages the notification delivery queue: enqueueing items,
 * selecting due work, recording delivery outcomes (with retry back-off),
 * reporting statistics and purging completed rows.
 */
let NotificationQueueService = NotificationQueueService_1 = class NotificationQueueService {
    /**
     * @param queueRepository - Repository for queue rows.
     * @param logRepository - Repository for delivery-log rows.
     * @param notificationRepository - Repository for notifications.
     */
    constructor(queueRepository, logRepository, notificationRepository) {
        this.queueRepository = queueRepository;
        this.logRepository = logRepository;
        this.notificationRepository = notificationRepository;
        this.logger = new common_1.Logger(NotificationQueueService_1.name);
    }

    /**
     * Queue a notification for delivery on a single channel.
     *
     * @param notificationId - Id of the notification to deliver.
     * @param channel - Delivery channel.
     * @param {string} [priority='normal'] - 'urgent' | 'high' | 'normal' | 'low'.
     * @param {Date} [scheduledFor] - Earliest delivery time; defaults to now.
     * @returns The saved queue item.
     */
    async enqueue(notificationId, channel, priority = 'normal', scheduledFor) {
        const queueItem = this.buildQueueItem(notificationId, channel, this.getPriorityValue(priority), scheduledFor || new Date());
        const saved = await this.queueRepository.save(queueItem);
        this.logger.debug(`Enqueued notification ${notificationId} for channel ${channel}`);
        return saved;
    }

    /**
     * Queue a notification for delivery on several channels in one save.
     *
     * @param notificationId - Id of the notification to deliver.
     * @param {Array} channels - Delivery channels; one queue item is created per entry.
     * @param {string} [priority='normal'] - 'urgent' | 'high' | 'normal' | 'low'.
     * @param {Date} [scheduledFor] - Earliest delivery time shared by every
     *   item; defaults to now (mirrors `enqueue`).
     * @returns The saved queue items.
     */
    async enqueueBatch(notificationId, channels, priority = 'normal', scheduledFor) {
        const priorityValue = this.getPriorityValue(priority);
        // One timestamp for the whole batch so every item becomes due together.
        const runAt = scheduledFor || new Date();
        const items = channels.map((channel) => this.buildQueueItem(notificationId, channel, priorityValue, runAt));
        const saved = await this.queueRepository.save(items);
        this.logger.debug(`Enqueued notification ${notificationId} for ${channels.length} channels`);
        return saved;
    }

    /**
     * Fetch queue items that are due for processing, with their notification joined.
     *
     * An item is due when its status is 'queued' or 'retrying', its
     * `scheduled_for` is null or not in the future, and its `next_retry_at`
     * is null or not in the future. Results are ordered by `priority_value`
     * descending, then `created_at` ascending.
     *
     * @param {number} [limit=100] - Maximum number of items to return.
     * @param [channel] - When truthy, restrict results to this channel.
     * @returns Matching queue items.
     */
    async getPendingItems(limit = 100, channel) {
        const now = new Date();
        const queryBuilder = this.queueRepository
            .createQueryBuilder('q')
            .leftJoinAndSelect('q.notification', 'n')
            .where('q.status IN (:...statuses)', {
                statuses: ['queued', 'retrying'],
            })
            .andWhere('(q.scheduled_for IS NULL OR q.scheduled_for <= :now)', { now })
            .andWhere('(q.next_retry_at IS NULL OR q.next_retry_at <= :now)', { now })
            .orderBy('q.priority_value', 'DESC')
            .addOrderBy('q.created_at', 'ASC')
            .take(limit);
        if (channel) {
            queryBuilder.andWhere('q.channel = :channel', { channel });
        }
        return queryBuilder.getMany();
    }

    /**
     * Flag a queue item as being processed and stamp the attempt time.
     *
     * @param queueId - Id of the queue item.
     */
    async markAsProcessing(queueId) {
        await this.queueRepository.update(queueId, {
            status: 'processing',
            last_attempt_at: new Date(),
        });
    }

    /**
     * Record a successful delivery: completes the queue item, marks the
     * notification as sent and writes a delivery-log row. Logs a warning and
     * does nothing else when the queue item does not exist.
     *
     * @param queueId - Id of the queue item.
     * @param provider - Provider that performed the delivery.
     * @param [providerMessageId] - Provider-side message id.
     * @param [providerResponse] - Raw provider response to store in the log.
     */
    async markAsSent(queueId, provider, providerMessageId, providerResponse) {
        const queueItem = await this.queueRepository.findOne({
            where: { id: queueId },
        });
        if (!queueItem) {
            this.logger.warn(`Queue item ${queueId} not found; cannot mark as sent`);
            return;
        }
        // A single timestamp keeps the queue, notification and log rows consistent.
        const sentAt = new Date();
        await this.queueRepository.update(queueId, {
            status: 'sent',
            completed_at: sentAt,
            attempts: (queueItem.attempts || 0) + 1,
        });
        await this.notificationRepository.update(queueItem.notification_id, {
            delivery_status: 'sent',
            sent_at: sentAt,
        });
        await this.logRepository.save({
            notification_id: queueItem.notification_id,
            queue_id: queueId,
            channel: queueItem.channel,
            status: 'sent',
            provider,
            provider_message_id: providerMessageId,
            provider_response: providerResponse,
            delivered_at: sentAt,
        });
        this.logger.debug(`Queue item ${queueId} marked as sent`);
    }

    /**
     * Record a failed delivery attempt.
     *
     * While the new attempt count is below `max_attempts`, the item moves to
     * 'retrying' with an exponential back-off of 2^(previous attempts)
     * minutes. Otherwise the item and its notification are marked 'failed'.
     * A 'failed' delivery-log row is written in both cases. Logs a warning
     * and does nothing else when the queue item does not exist.
     *
     * @param queueId - Id of the queue item.
     * @param errorMessage - Failure description stored on the item and the log.
     * @param [provider] - Provider that attempted the delivery.
     */
    async markAsFailed(queueId, errorMessage, provider) {
        const queueItem = await this.queueRepository.findOne({
            where: { id: queueId },
        });
        if (!queueItem) {
            this.logger.warn(`Queue item ${queueId} not found; cannot mark as failed`);
            return;
        }
        // Treat missing counters as 0 so the arithmetic never yields NaN.
        const previousAttempts = queueItem.attempts || 0;
        const newErrorCount = (queueItem.error_count || 0) + 1;
        const newAttempts = previousAttempts + 1;
        const shouldRetry = newAttempts < queueItem.max_attempts;
        if (shouldRetry) {
            // Back-off in milliseconds: 1 min, 2 min, 4 min, ...
            const retryDelay = Math.pow(2, previousAttempts) * 60 * 1000;
            const nextRetryAt = new Date(Date.now() + retryDelay);
            await this.queueRepository.update(queueId, {
                status: 'retrying',
                attempts: newAttempts,
                error_message: errorMessage,
                error_count: newErrorCount,
                next_retry_at: nextRetryAt,
            });
            this.logger.debug(`Queue item ${queueId} scheduled for retry at ${nextRetryAt.toISOString()}`);
        }
        else {
            await this.queueRepository.update(queueId, {
                status: 'failed',
                attempts: newAttempts,
                error_message: errorMessage,
                error_count: newErrorCount,
                completed_at: new Date(),
            });
            await this.notificationRepository.update(queueItem.notification_id, {
                delivery_status: 'failed',
            });
            this.logger.warn(`Queue item ${queueId} failed permanently after ${newAttempts} attempts`);
        }
        await this.logRepository.save({
            notification_id: queueItem.notification_id,
            queue_id: queueId,
            channel: queueItem.channel,
            status: 'failed',
            provider,
            error_message: errorMessage,
        });
    }

    /**
     * Count queue items per status.
     *
     * @returns {Promise<Object>} Counts keyed by status; 'queued',
     *   'processing', 'sent', 'failed' and 'retrying' are always present
     *   (0 when no rows have that status).
     */
    async getStats() {
        const stats = await this.queueRepository
            .createQueryBuilder('q')
            .select('q.status', 'status')
            .addSelect('COUNT(*)', 'count')
            .groupBy('q.status')
            .getRawMany();
        const result = {
            queued: 0,
            processing: 0,
            sent: 0,
            failed: 0,
            retrying: 0,
        };
        for (const row of stats) {
            // The raw count is parsed as a base-10 integer.
            result[row.status] = Number.parseInt(row.count, 10);
        }
        return result;
    }

    /**
     * Count queue items grouped by channel and status.
     *
     * @returns Raw rows with `channel`, `status` and `count` columns.
     */
    async getStatsByChannel() {
        return this.queueRepository
            .createQueryBuilder('q')
            .select('q.channel', 'channel')
            .addSelect('q.status', 'status')
            .addSelect('COUNT(*)', 'count')
            .groupBy('q.channel')
            .addGroupBy('q.status')
            .getRawMany();
    }

    /**
     * Delete 'sent' and 'failed' queue items completed before the cutoff.
     *
     * @param {number} [daysToKeep=30] - Age in days beyond which completed items are removed.
     * @returns {Promise<number>} Number of deleted rows (0 when not reported).
     */
    async cleanupOldItems(daysToKeep = 30) {
        const cutoffDate = new Date();
        cutoffDate.setDate(cutoffDate.getDate() - daysToKeep);
        const result = await this.queueRepository
            .createQueryBuilder()
            .delete()
            .where('status IN (:...statuses)', { statuses: ['sent', 'failed'] })
            .andWhere('completed_at < :cutoff', { cutoff: cutoffDate })
            .execute();
        if (result.affected && result.affected > 0) {
            this.logger.log(`Cleaned up ${result.affected} old queue items`);
        }
        return result.affected || 0;
    }

    /**
     * Cancel every not-yet-delivered ('queued' or 'retrying') queue item of a
     * notification by marking it 'failed' with the message 'Cancelled'.
     *
     * @param notificationId - Id of the notification whose items are cancelled.
     * @returns {Promise<number>} Number of affected rows (0 when not reported).
     */
    async cancelPending(notificationId) {
        const result = await this.queueRepository.update({
            notification_id: notificationId,
            status: (0, typeorm_2.In)(['queued', 'retrying']),
        }, {
            status: 'failed',
            error_message: 'Cancelled',
            completed_at: new Date(),
        });
        return result.affected || 0;
    }

    /**
     * Build (without saving) a fresh queue item in its initial state:
     * status 'queued', 0 attempts, at most 3 attempts.
     *
     * @param notificationId - Id of the notification to deliver.
     * @param channel - Delivery channel.
     * @param {number} priorityValue - Numeric priority from `getPriorityValue`.
     * @param {Date} scheduledFor - Earliest delivery time.
     * @returns The entity returned by the repository's `create`.
     */
    buildQueueItem(notificationId, channel, priorityValue, scheduledFor) {
        return this.queueRepository.create({
            notification_id: notificationId,
            channel,
            priority_value: priorityValue,
            scheduled_for: scheduledFor,
            status: 'queued',
            attempts: 0,
            max_attempts: 3,
        });
    }

    /**
     * Map a priority label to its numeric sort value (higher is processed first).
     *
     * @param {string} priority - 'urgent' | 'high' | 'normal' | 'low'.
     * @returns {number} 10, 5, 0 or -5 respectively; 0 for any other value.
     */
    getPriorityValue(priority) {
        switch (priority) {
            case 'urgent':
                return 10;
            case 'high':
                return 5;
            case 'low':
                return -5;
            case 'normal':
            default:
                return 0;
        }
    }
};
// Export the undecorated class first, then replace the export, the local
// binding and the self-reference alias with the result of applying decorators.
exports.NotificationQueueService = NotificationQueueService;
exports.NotificationQueueService = NotificationQueueService = NotificationQueueService_1 = __decorate([
// Class decorator: Injectable() from @nestjs/common.
(0, common_1.Injectable)(),
// Constructor parameters 0, 1, 2 each receive InjectRepository(<entity>) from @nestjs/typeorm.
__param(0, (0, typeorm_1.InjectRepository)(entities_1.NotificationQueue)),
__param(1, (0, typeorm_1.InjectRepository)(entities_1.NotificationLog)),
__param(2, (0, typeorm_1.InjectRepository)(entities_1.Notification)),
// Emitted constructor parameter types: three typeorm Repository instances.
__metadata("design:paramtypes", [typeorm_2.Repository,
typeorm_2.Repository,
typeorm_2.Repository])
], NotificationQueueService);
//# sourceMappingURL=notification-queue.service.js.map