"use strict"; var __decorate = (this && this.__decorate) || function (decorators, target, key, desc) { var c = arguments.length, r = c < 3 ? target : desc === null ? desc = Object.getOwnPropertyDescriptor(target, key) : desc, d; if (typeof Reflect === "object" && typeof Reflect.decorate === "function") r = Reflect.decorate(decorators, target, key, desc); else for (var i = decorators.length - 1; i >= 0; i--) if (d = decorators[i]) r = (c < 3 ? d(r) : c > 3 ? d(target, key, r) : d(target, key)) || r; return c > 3 && r && Object.defineProperty(target, key, r), r; }; var __metadata = (this && this.__metadata) || function (k, v) { if (typeof Reflect === "object" && typeof Reflect.metadata === "function") return Reflect.metadata(k, v); }; var __param = (this && this.__param) || function (paramIndex, decorator) { return function (target, key) { decorator(target, key, paramIndex); } }; var NotificationQueueService_1; Object.defineProperty(exports, "__esModule", { value: true }); exports.NotificationQueueService = void 0; const common_1 = require("@nestjs/common"); const typeorm_1 = require("@nestjs/typeorm"); const typeorm_2 = require("typeorm"); const entities_1 = require("../entities"); let NotificationQueueService = NotificationQueueService_1 = class NotificationQueueService { constructor(queueRepository, logRepository, notificationRepository) { this.queueRepository = queueRepository; this.logRepository = logRepository; this.notificationRepository = notificationRepository; this.logger = new common_1.Logger(NotificationQueueService_1.name); } async enqueue(notificationId, channel, priority = 'normal', scheduledFor) { const priorityValue = this.getPriorityValue(priority); const queueItem = this.queueRepository.create({ notification_id: notificationId, channel, priority_value: priorityValue, scheduled_for: scheduledFor || new Date(), status: 'queued', attempts: 0, max_attempts: 3, }); const saved = await this.queueRepository.save(queueItem); this.logger.debug(`Enqueued notification ${notificationId} for channel ${channel}`); return saved; } async enqueueBatch(notificationId, channels, priority = 'normal') { const priorityValue = this.getPriorityValue(priority); const now = new Date(); const items = channels.map((channel) => this.queueRepository.create({ notification_id: notificationId, channel, priority_value: priorityValue, scheduled_for: now, status: 'queued', attempts: 0, max_attempts: 3, })); const saved = await this.queueRepository.save(items); this.logger.debug(`Enqueued notification ${notificationId} for ${channels.length} channels`); return saved; } async getPendingItems(limit = 100, channel) { const now = new Date(); const queryBuilder = this.queueRepository .createQueryBuilder('q') .leftJoinAndSelect('q.notification', 'n') .where('q.status IN (:...statuses)', { statuses: ['queued', 'retrying'], }) .andWhere('(q.scheduled_for IS NULL OR q.scheduled_for <= :now)', { now }) .andWhere('(q.next_retry_at IS NULL OR q.next_retry_at <= :now)', { now }) .orderBy('q.priority_value', 'DESC') .addOrderBy('q.created_at', 'ASC') .take(limit); if (channel) { queryBuilder.andWhere('q.channel = :channel', { channel }); } return queryBuilder.getMany(); } async markAsProcessing(queueId) { await this.queueRepository.update(queueId, { status: 'processing', last_attempt_at: new Date(), }); } async markAsSent(queueId, provider, providerMessageId, providerResponse) { const queueItem = await this.queueRepository.findOne({ where: { id: queueId }, }); if (!queueItem) { return; } await this.queueRepository.update(queueId, { status: 'sent', completed_at: new Date(), attempts: queueItem.attempts + 1, }); await this.notificationRepository.update(queueItem.notification_id, { delivery_status: 'sent', sent_at: new Date(), }); await this.logRepository.save({ notification_id: queueItem.notification_id, queue_id: queueId, channel: queueItem.channel, status: 'sent', provider, provider_message_id: providerMessageId, provider_response: providerResponse, delivered_at: new Date(), }); this.logger.debug(`Queue item ${queueId} marked as sent`); } async markAsFailed(queueId, errorMessage, provider) { const queueItem = await this.queueRepository.findOne({ where: { id: queueId }, }); if (!queueItem) { return; } const newAttempts = queueItem.attempts + 1; const shouldRetry = newAttempts < queueItem.max_attempts; if (shouldRetry) { const retryDelay = Math.pow(2, queueItem.attempts) * 60 * 1000; const nextRetryAt = new Date(Date.now() + retryDelay); await this.queueRepository.update(queueId, { status: 'retrying', attempts: newAttempts, error_message: errorMessage, error_count: queueItem.error_count + 1, next_retry_at: nextRetryAt, }); this.logger.debug(`Queue item ${queueId} scheduled for retry at ${nextRetryAt.toISOString()}`); } else { await this.queueRepository.update(queueId, { status: 'failed', attempts: newAttempts, error_message: errorMessage, error_count: queueItem.error_count + 1, completed_at: new Date(), }); await this.notificationRepository.update(queueItem.notification_id, { delivery_status: 'failed', }); this.logger.warn(`Queue item ${queueId} failed permanently after ${newAttempts} attempts`); } await this.logRepository.save({ notification_id: queueItem.notification_id, queue_id: queueId, channel: queueItem.channel, status: 'failed', provider, error_message: errorMessage, }); } async getStats() { const stats = await this.queueRepository .createQueryBuilder('q') .select('q.status', 'status') .addSelect('COUNT(*)', 'count') .groupBy('q.status') .getRawMany(); const result = { queued: 0, processing: 0, sent: 0, failed: 0, retrying: 0, }; for (const row of stats) { result[row.status] = parseInt(row.count, 10); } return result; } async getStatsByChannel() { return this.queueRepository .createQueryBuilder('q') .select('q.channel', 'channel') .addSelect('q.status', 'status') .addSelect('COUNT(*)', 'count') .groupBy('q.channel') .addGroupBy('q.status') .getRawMany(); } async cleanupOldItems(daysToKeep = 30) { const cutoffDate = new Date(); cutoffDate.setDate(cutoffDate.getDate() - daysToKeep); const result = await this.queueRepository .createQueryBuilder() .delete() .where('status IN (:...statuses)', { statuses: ['sent', 'failed'] }) .andWhere('completed_at < :cutoff', { cutoff: cutoffDate }) .execute(); if (result.affected && result.affected > 0) { this.logger.log(`Cleaned up ${result.affected} old queue items`); } return result.affected || 0; } async cancelPending(notificationId) { const result = await this.queueRepository.update({ notification_id: notificationId, status: (0, typeorm_2.In)(['queued', 'retrying']), }, { status: 'failed', error_message: 'Cancelled', completed_at: new Date(), }); return result.affected || 0; } getPriorityValue(priority) { switch (priority) { case 'urgent': return 10; case 'high': return 5; case 'normal': return 0; case 'low': return -5; default: return 0; } } }; exports.NotificationQueueService = NotificationQueueService; exports.NotificationQueueService = NotificationQueueService = NotificationQueueService_1 = __decorate([ (0, common_1.Injectable)(), __param(0, (0, typeorm_1.InjectRepository)(entities_1.NotificationQueue)), __param(1, (0, typeorm_1.InjectRepository)(entities_1.NotificationLog)), __param(2, (0, typeorm_1.InjectRepository)(entities_1.Notification)), __metadata("design:paramtypes", [typeorm_2.Repository, typeorm_2.Repository, typeorm_2.Repository]) ], NotificationQueueService); //# sourceMappingURL=notification-queue.service.js.map