/** * Offline Sync Service * * Service for managing offline data synchronization queue and conflict resolution * * @module Mobile */ import { Repository, DataSource } from 'typeorm'; import { OfflineSyncQueue } from '../entities/offline-sync-queue.entity'; import { SyncConflict, ConflictType } from '../entities/sync-conflict.entity'; import { CreateSyncQueueItemDto, ProcessSyncBatchDto, ResolveSyncConflictDto, SyncQueueResponseDto, SyncConflictResponseDto, SyncFilterDto, SyncResultDto, SyncStatusDto, } from '../dto/offline-sync.dto'; import { ServiceContext } from './mobile-session.service'; export class OfflineSyncService { private queueRepository: Repository; private conflictRepository: Repository; constructor(dataSource: DataSource) { this.queueRepository = dataSource.getRepository(OfflineSyncQueue); this.conflictRepository = dataSource.getRepository(SyncConflict); } /** * Add item to sync queue */ async queueItem(ctx: ServiceContext, dto: CreateSyncQueueItemDto): Promise { const item = this.queueRepository.create({ userId: dto.userId, deviceId: dto.deviceId, tenantId: ctx.tenantId, sessionId: dto.sessionId, entityType: dto.entityType, entityId: dto.entityId, operation: dto.operation, payload: dto.payload, metadata: dto.metadata || {}, sequenceNumber: dto.sequenceNumber, dependsOn: dto.dependsOn, status: 'pending', retryCount: 0, maxRetries: 3, }); return this.queueRepository.save(item); } /** * Queue batch of items */ async queueBatch(ctx: ServiceContext, dto: ProcessSyncBatchDto): Promise { const items = dto.items.map(itemDto => this.queueRepository.create({ userId: itemDto.userId, deviceId: itemDto.deviceId, tenantId: ctx.tenantId, sessionId: itemDto.sessionId, entityType: itemDto.entityType, entityId: itemDto.entityId, operation: itemDto.operation, payload: itemDto.payload, metadata: itemDto.metadata || {}, sequenceNumber: itemDto.sequenceNumber, dependsOn: itemDto.dependsOn, status: 'pending', retryCount: 0, maxRetries: 3, })); return this.queueRepository.save(items); } /** * Find queue item by ID */ async findById(ctx: ServiceContext, id: string): Promise { return this.queueRepository.findOne({ where: { id, tenantId: ctx.tenantId }, }); } /** * Find queue items with filters */ async findAll( ctx: ServiceContext, filter: SyncFilterDto ): Promise<{ data: OfflineSyncQueue[]; total: number }> { const query = this.queueRepository .createQueryBuilder('item') .where('item.tenantId = :tenantId', { tenantId: ctx.tenantId }); if (filter.userId) { query.andWhere('item.userId = :userId', { userId: filter.userId }); } if (filter.deviceId) { query.andWhere('item.deviceId = :deviceId', { deviceId: filter.deviceId }); } if (filter.sessionId) { query.andWhere('item.sessionId = :sessionId', { sessionId: filter.sessionId }); } if (filter.entityType) { query.andWhere('item.entityType = :entityType', { entityType: filter.entityType }); } if (filter.status) { query.andWhere('item.status = :status', { status: filter.status }); } const total = await query.getCount(); query.orderBy('item.sequenceNumber', 'ASC'); if (filter.limit) { query.take(filter.limit); } if (filter.offset) { query.skip(filter.offset); } const data = await query.getMany(); return { data, total }; } /** * Get pending items for processing */ async getPendingItems(ctx: ServiceContext, deviceId: string, limit: number = 100): Promise { return this.queueRepository.find({ where: { tenantId: ctx.tenantId, deviceId, status: 'pending', }, order: { sequenceNumber: 'ASC' }, take: limit, }); } /** * Process sync queue for device */ async processQueue(ctx: ServiceContext, deviceId: string): Promise { const items = await this.getPendingItems(ctx, deviceId); let processed = 0; let failed = 0; let conflicts = 0; const errors: { id: string; error: string }[] = []; for (const item of items) { // Check if item depends on another that hasn't been processed if (item.dependsOn) { const dependency = await this.findById(ctx, item.dependsOn); if (dependency && dependency.status !== 'completed') { continue; // Skip, dependency not processed yet } } try { const result = await this.processItem(ctx, item); if (result.success) { processed++; } else if (result.conflict) { conflicts++; } else { failed++; errors.push({ id: item.id, error: result.error || 'Unknown error' }); } } catch (error: any) { failed++; errors.push({ id: item.id, error: error.message }); // Update item with error item.status = 'failed'; item.lastError = error.message; item.retryCount++; await this.queueRepository.save(item); } } return { success: failed === 0 && conflicts === 0, processed, failed, conflicts, errors: errors.length > 0 ? errors : undefined, }; } /** * Process single item */ async processItem( ctx: ServiceContext, item: OfflineSyncQueue ): Promise<{ success: boolean; conflict?: boolean; error?: string }> { item.status = 'processing'; await this.queueRepository.save(item); try { // Simulate processing based on operation // In production, this would call the actual entity service // Check for conflicts (simulated) const hasConflict = await this.checkForConflict(ctx, item); if (hasConflict) { item.status = 'conflict'; await this.queueRepository.save(item); // Create conflict record await this.createConflict(ctx, item, hasConflict); return { success: false, conflict: true }; } // Apply change (simulated) await this.applyChange(item); item.status = 'completed'; item.processedAt = new Date(); await this.queueRepository.save(item); return { success: true }; } catch (error: any) { item.status = 'failed'; item.lastError = error.message; item.retryCount++; await this.queueRepository.save(item); return { success: false, error: error.message }; } } /** * Retry failed items */ async retryFailedItems(ctx: ServiceContext, deviceId: string): Promise { const result = await this.queueRepository.update( { tenantId: ctx.tenantId, deviceId, status: 'failed', }, { status: 'pending' } ); return result.affected || 0; } /** * Get sync status for device */ async getSyncStatus(ctx: ServiceContext, deviceId: string): Promise { const counts = await this.queueRepository .createQueryBuilder('item') .select('item.status', 'status') .addSelect('COUNT(*)', 'count') .where('item.tenantId = :tenantId', { tenantId: ctx.tenantId }) .andWhere('item.deviceId = :deviceId', { deviceId }) .groupBy('item.status') .getRawMany(); const statusCounts: Record = {}; for (const row of counts) { statusCounts[row.status] = parseInt(row.count, 10); } const lastProcessed = await this.queueRepository.findOne({ where: { tenantId: ctx.tenantId, deviceId, status: 'completed' }, order: { processedAt: 'DESC' }, }); return { pendingCount: statusCounts['pending'] || 0, processingCount: statusCounts['processing'] || 0, failedCount: statusCounts['failed'] || 0, conflictCount: statusCounts['conflict'] || 0, lastSyncAt: lastProcessed?.processedAt, }; } /** * Get conflicts for device */ async getConflicts(ctx: ServiceContext, deviceId?: string): Promise { const query = this.conflictRepository .createQueryBuilder('conflict') .leftJoinAndSelect('conflict.syncQueue', 'syncQueue') .where('conflict.tenantId = :tenantId', { tenantId: ctx.tenantId }) .andWhere('conflict.resolution IS NULL'); if (deviceId) { query.andWhere('syncQueue.deviceId = :deviceId', { deviceId }); } return query.getMany(); } /** * Get conflict by ID */ async getConflictById(ctx: ServiceContext, id: string): Promise { return this.conflictRepository.findOne({ where: { id, tenantId: ctx.tenantId }, relations: ['syncQueue'], }); } /** * Resolve conflict */ async resolveConflict( ctx: ServiceContext, id: string, dto: ResolveSyncConflictDto ): Promise { const conflict = await this.getConflictById(ctx, id); if (!conflict) { throw new Error('Conflict not found'); } if (conflict.resolution) { throw new Error('Conflict already resolved'); } conflict.resolution = dto.resolution; conflict.mergedData = dto.mergedData || {}; conflict.resolvedBy = ctx.userId || ''; conflict.resolvedAt = new Date(); await this.conflictRepository.save(conflict); // Update queue item based on resolution if (conflict.syncQueue) { const queueItem = conflict.syncQueue; switch (dto.resolution) { case 'local_wins': // Re-process with local data queueItem.status = 'pending'; queueItem.conflictResolution = 'local_wins'; break; case 'server_wins': // Mark as completed (server data already in place) queueItem.status = 'completed'; queueItem.conflictResolution = 'server_wins'; queueItem.processedAt = new Date(); break; case 'merged': // Update payload with merged data and re-process queueItem.payload = dto.mergedData || conflict.localData; queueItem.status = 'pending'; queueItem.conflictResolution = 'merged'; break; case 'manual': // Mark as completed (handled manually) queueItem.status = 'completed'; queueItem.conflictResolution = 'manual'; queueItem.processedAt = new Date(); break; } queueItem.conflictData = { conflictId: conflict.id, resolution: dto.resolution, resolvedAt: conflict.resolvedAt, }; queueItem.conflictResolvedAt = conflict.resolvedAt; await this.queueRepository.save(queueItem); } return conflict; } /** * Delete processed items older than specified days */ async cleanupOldItems(ctx: ServiceContext, olderThanDays: number = 30): Promise { const cutoffDate = new Date(); cutoffDate.setDate(cutoffDate.getDate() - olderThanDays); const result = await this.queueRepository .createQueryBuilder() .delete() .where('tenantId = :tenantId', { tenantId: ctx.tenantId }) .andWhere('status = :status', { status: 'completed' }) .andWhere('processedAt < :cutoffDate', { cutoffDate }) .execute(); return result.affected || 0; } /** * Check for conflict (simulated) */ private async checkForConflict( _ctx: ServiceContext, item: OfflineSyncQueue ): Promise<{ type: ConflictType; serverData: Record } | null> { // In production, this would check if the server version has changed // since the client made the offline change // Simulate 5% conflict rate if (Math.random() > 0.95) { return { type: 'data_conflict', serverData: { ...item.payload, serverModified: true }, }; } return null; } /** * Create conflict record */ private async createConflict( ctx: ServiceContext, item: OfflineSyncQueue, conflictInfo: { type: ConflictType; serverData: Record } ): Promise { const conflict = this.conflictRepository.create({ syncQueueId: item.id, userId: item.userId, tenantId: ctx.tenantId, conflictType: conflictInfo.type, localData: item.payload, serverData: conflictInfo.serverData, }); return this.conflictRepository.save(conflict); } /** * Apply change (simulated) */ private async applyChange(_item: OfflineSyncQueue): Promise { // In production, this would call the actual entity service // to create/update/delete the entity // Simulate processing time await new Promise(resolve => setTimeout(resolve, 50)); } /** * Convert queue item to response DTO */ toQueueResponseDto(item: OfflineSyncQueue): SyncQueueResponseDto { return { id: item.id, userId: item.userId, deviceId: item.deviceId, sessionId: item.sessionId, entityType: item.entityType, entityId: item.entityId, operation: item.operation, payload: item.payload, sequenceNumber: Number(item.sequenceNumber), status: item.status, retryCount: item.retryCount, lastError: item.lastError, processedAt: item.processedAt, conflictData: item.conflictData, conflictResolution: item.conflictResolution, createdAt: item.createdAt, }; } /** * Convert conflict to response DTO */ toConflictResponseDto(conflict: SyncConflict): SyncConflictResponseDto { return { id: conflict.id, syncQueueId: conflict.syncQueueId, userId: conflict.userId, conflictType: conflict.conflictType, localData: conflict.localData, serverData: conflict.serverData, resolution: conflict.resolution, mergedData: conflict.mergedData, resolvedBy: conflict.resolvedBy, resolvedAt: conflict.resolvedAt, createdAt: conflict.createdAt, }; } }