# ET-BI-004: Implementación de Exportación y Distribución **Épica:** MAI-006 - Reportes y Business Intelligence **Módulo:** Exportación, Distribución y Integraciones **Responsable Técnico:** Backend + DevOps + Integrations **Fecha:** 2025-11-17 **Versión:** 1.0 --- ## 1. Objetivo Técnico Implementar el sistema de exportación y distribución de reportes con: - Exportación a múltiples formatos (PDF, Excel, CSV, JSON, Parquet) - Programación de reportes recurrentes (diario, semanal, mensual) - Distribución automática vía email con adjuntos - Integración con Power BI REST API - Integración con Tableau Hyper API - Procesamiento asíncrono con Bull/BullMQ - Rate limiting y autenticación OAuth2 - Gestión de suscripciones a reportes - APIs RESTful para consumo externo --- ## 2. Stack Tecnológico ### Backend ```typescript - NestJS 10+ con TypeScript - PostgreSQL 15+ (schema: export_distribution) - Bull/BullMQ para procesamiento asíncrono - Redis para queue management - node-cron para reportes programados ``` ### Exportación ```typescript - PDFKit para generación de PDFs - ExcelJS para Excel (.xlsx) - csv-writer para archivos CSV - Apache Parquet (parquetjs) para data lakes - Puppeteer para PDF desde HTML ``` ### Email ```typescript - Nodemailer con transporte SMTP - Handlebars para templates de email - Mjml para emails responsivos ``` ### Integraciones ```typescript - Power BI REST API (axios) - Tableau Hyper API - OAuth2 (passport-oauth2) - API Gateway con rate limiting ``` --- ## 3. Modelo de Datos SQL ### 3.1 Schema Principal ```sql -- ===================================================== -- SCHEMA: export_distribution -- Descripción: Exportación y distribución de reportes -- ===================================================== CREATE SCHEMA IF NOT EXISTS export_distribution; -- ===================================================== -- TABLE: export_distribution.scheduled_reports -- Descripción: Reportes programados -- ===================================================== CREATE TABLE export_distribution.scheduled_reports ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), constructora_id UUID NOT NULL REFERENCES public.constructoras(id) ON DELETE CASCADE, -- Identificación report_code VARCHAR(50) NOT NULL, report_name VARCHAR(255) NOT NULL, description TEXT, -- Tipo de reporte report_type VARCHAR(30) NOT NULL, -- executive_summary, project_status, financial_report, custom -- Configuración del reporte report_config JSONB NOT NULL, /* { "dashboardId": "uuid", "filters": { "dateRange": "last_30_days", "projectIds": ["uuid-1", "uuid-2"] }, "includeCharts": true, "includeRawData": false } */ -- Programación schedule_type VARCHAR(20) NOT NULL, -- once, daily, weekly, monthly, quarterly schedule_config JSONB NOT NULL, /* { "frequency": "weekly", "dayOfWeek": 1, // Monday "time": "08:00", "timezone": "America/Mexico_City" } */ next_execution TIMESTAMP, -- Formato de exportación export_formats VARCHAR(20)[] NOT NULL, -- ['pdf', 'excel', 'csv'] -- Distribución distribution_method VARCHAR(20) NOT NULL DEFAULT 'email', -- email, sftp, webhook, api recipients JSONB NOT NULL, /* [ {"email": "director@company.com", "name": "Director"}, {"email": "cfo@company.com", "name": "CFO"} ] */ email_subject VARCHAR(255), email_template VARCHAR(100), -- nombre del template -- Estado is_active BOOLEAN DEFAULT true, last_execution TIMESTAMP, last_execution_status VARCHAR(20), -- success, failed, partial execution_count INTEGER DEFAULT 0, failure_count INTEGER DEFAULT 0, -- Metadata created_by UUID NOT NULL REFERENCES auth.users(id), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, CONSTRAINT valid_schedule_type CHECK (schedule_type IN ( 'once', 'daily', 'weekly', 'monthly', 'quarterly' )), CONSTRAINT valid_distribution_method CHECK (distribution_method IN ( 'email', 'sftp', 'webhook', 'api' )), UNIQUE(constructora_id, report_code) ); CREATE INDEX idx_scheduled_reports_constructora ON export_distribution.scheduled_reports(constructora_id); CREATE INDEX idx_scheduled_reports_active ON export_distribution.scheduled_reports(is_active) WHERE is_active = true; CREATE INDEX idx_scheduled_reports_next_exec ON export_distribution.scheduled_reports(next_execution) WHERE is_active = true; -- ===================================================== -- TABLE: export_distribution.report_subscriptions -- Descripción: Suscripciones de usuarios a reportes -- ===================================================== CREATE TABLE export_distribution.report_subscriptions ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), scheduled_report_id UUID NOT NULL REFERENCES export_distribution.scheduled_reports(id) ON DELETE CASCADE, user_id UUID NOT NULL REFERENCES auth.users(id) ON DELETE CASCADE, -- Preferencias receive_email BOOLEAN DEFAULT true, preferred_formats VARCHAR(20)[], -- ['pdf', 'excel'] -- Filtros personalizados custom_filters JSONB, /* { "projectIds": ["uuid-1"], // solo sus proyectos "includeDetails": false } */ -- Estado is_active BOOLEAN DEFAULT true, last_received_at TIMESTAMP, -- Metadata subscribed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(scheduled_report_id, user_id) ); CREATE INDEX idx_subscriptions_report ON export_distribution.report_subscriptions(scheduled_report_id); CREATE INDEX idx_subscriptions_user ON export_distribution.report_subscriptions(user_id); -- ===================================================== -- TABLE: export_distribution.export_jobs -- Descripción: Jobs de exportación (queue) -- ===================================================== CREATE TABLE export_distribution.export_jobs ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), scheduled_report_id UUID REFERENCES export_distribution.scheduled_reports(id) ON DELETE SET NULL, user_id UUID REFERENCES auth.users(id), -- Tipo de job job_type VARCHAR(30) NOT NULL, -- scheduled_report, on_demand_export, bulk_export -- Configuración export_config JSONB NOT NULL, /* { "format": "pdf", "reportId": "uuid", "filters": {...}, "includeCharts": true } */ -- Estado del job status VARCHAR(20) NOT NULL DEFAULT 'pending', -- pending, processing, completed, failed, cancelled progress_pct DECIMAL(5,2) DEFAULT 0, -- Resultado output_file_path VARCHAR(500), output_file_size BIGINT, -- bytes download_url VARCHAR(500), download_expires_at TIMESTAMP, -- Error error_message TEXT, error_stack TEXT, -- Performance started_at TIMESTAMP, completed_at TIMESTAMP, processing_duration_ms INTEGER, -- Metadata created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, CONSTRAINT valid_job_type CHECK (job_type IN ( 'scheduled_report', 'on_demand_export', 'bulk_export' )), CONSTRAINT valid_status CHECK (status IN ( 'pending', 'processing', 'completed', 'failed', 'cancelled' )) ); CREATE INDEX idx_export_jobs_status ON export_distribution.export_jobs(status); CREATE INDEX idx_export_jobs_user ON export_distribution.export_jobs(user_id); CREATE INDEX idx_export_jobs_created ON export_distribution.export_jobs(created_at DESC); -- ===================================================== -- TABLE: export_distribution.bi_integrations -- Descripción: Integraciones con herramientas BI externas -- ===================================================== CREATE TABLE export_distribution.bi_integrations ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), constructora_id UUID NOT NULL REFERENCES public.constructoras(id) ON DELETE CASCADE, -- Identificación integration_code VARCHAR(50) NOT NULL, integration_name VARCHAR(255) NOT NULL, description TEXT, -- Tipo de integración integration_type VARCHAR(30) NOT NULL, -- powerbi, tableau, looker, metabase, custom_api -- Configuración config JSONB NOT NULL, /* PowerBI: { "workspaceId": "...", "datasetId": "...", "clientId": "...", "tenantId": "...", "refreshSchedule": "daily" } Tableau: { "serverUrl": "https://tableau.company.com", "siteName": "...", "projectName": "...", "datasourceName": "..." } Custom API: { "baseUrl": "https://api.external.com", "authType": "oauth2", "apiKey": "encrypted..." } */ -- Credenciales (encriptadas) credentials JSONB, oauth_access_token TEXT, oauth_refresh_token TEXT, oauth_expires_at TIMESTAMP, -- Sincronización sync_frequency VARCHAR(20) DEFAULT 'daily', -- realtime, hourly, daily, weekly, manual last_sync_at TIMESTAMP, next_sync_at TIMESTAMP, last_sync_status VARCHAR(20), -- success, failed, partial -- Estado is_active BOOLEAN DEFAULT true, -- Metadata created_by UUID REFERENCES auth.users(id), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, CONSTRAINT valid_integration_type CHECK (integration_type IN ( 'powerbi', 'tableau', 'looker', 'metabase', 'custom_api' )), UNIQUE(constructora_id, integration_code) ); CREATE INDEX idx_bi_integrations_constructora ON export_distribution.bi_integrations(constructora_id); CREATE INDEX idx_bi_integrations_type ON export_distribution.bi_integrations(integration_type); CREATE INDEX idx_bi_integrations_active ON export_distribution.bi_integrations(is_active) WHERE is_active = true; -- ===================================================== -- TABLE: export_distribution.api_tokens -- Descripción: Tokens para acceso a API externo -- ===================================================== CREATE TABLE export_distribution.api_tokens ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), constructora_id UUID NOT NULL REFERENCES public.constructoras(id) ON DELETE CASCADE, user_id UUID REFERENCES auth.users(id), -- Token token VARCHAR(255) NOT NULL UNIQUE, token_hash VARCHAR(255) NOT NULL, -- bcrypt hash -- Identificación token_name VARCHAR(255) NOT NULL, description TEXT, -- Permisos scopes VARCHAR(50)[], -- ['read:reports', 'write:reports', 'export:data'] allowed_endpoints VARCHAR(100)[], -- ['/api/reports/*', '/api/analytics/*'] -- Rate limiting rate_limit_requests INTEGER DEFAULT 1000, rate_limit_window INTEGER DEFAULT 3600, -- segundos -- Restricciones IP allowed_ips INET[], -- Estado is_active BOOLEAN DEFAULT true, expires_at TIMESTAMP, -- Uso last_used_at TIMESTAMP, usage_count INTEGER DEFAULT 0, -- Metadata created_by UUID REFERENCES auth.users(id), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, revoked_at TIMESTAMP, revoked_by UUID REFERENCES auth.users(id) ); CREATE INDEX idx_api_tokens_token_hash ON export_distribution.api_tokens(token_hash); CREATE INDEX idx_api_tokens_constructora ON export_distribution.api_tokens(constructora_id); CREATE INDEX idx_api_tokens_active ON export_distribution.api_tokens(is_active) WHERE is_active = true; -- ===================================================== -- TABLE: export_distribution.export_templates -- Descripción: Templates para exportación -- ===================================================== CREATE TABLE export_distribution.export_templates ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), constructora_id UUID NOT NULL REFERENCES public.constructoras(id) ON DELETE CASCADE, -- Identificación template_code VARCHAR(50) NOT NULL, template_name VARCHAR(255) NOT NULL, description TEXT, -- Tipo template_type VARCHAR(30) NOT NULL, -- pdf, excel, email -- Contenido del template template_content TEXT NOT NULL, -- HTML para PDF, JSON para Excel, Handlebars para Email -- Variables disponibles available_variables JSONB, /* [ {"name": "projectName", "type": "string", "description": "..."}, {"name": "totalCost", "type": "number", "format": "currency"} ] */ -- Configuración config JSONB DEFAULT '{}', /* { "pageSize": "letter", "orientation": "portrait", "margins": {"top": 20, "right": 20, "bottom": 20, "left": 20}, "header": {...}, "footer": {...} } */ -- Estado is_active BOOLEAN DEFAULT true, is_default BOOLEAN DEFAULT false, -- Metadata created_by UUID REFERENCES auth.users(id), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(constructora_id, template_code) ); CREATE INDEX idx_templates_constructora ON export_distribution.export_templates(constructora_id); CREATE INDEX idx_templates_type ON export_distribution.export_templates(template_type); ``` ### 3.2 Functions y Triggers ```sql -- ===================================================== -- FUNCTION: Calcular próxima ejecución de reporte -- ===================================================== CREATE OR REPLACE FUNCTION export_distribution.calculate_next_execution( p_schedule_type VARCHAR, p_schedule_config JSONB, p_last_execution TIMESTAMP DEFAULT NULL ) RETURNS TIMESTAMP AS $$ DECLARE v_next_execution TIMESTAMP; v_base_date TIMESTAMP; BEGIN v_base_date := COALESCE(p_last_execution, CURRENT_TIMESTAMP); CASE p_schedule_type WHEN 'daily' THEN v_next_execution := v_base_date + INTERVAL '1 day'; WHEN 'weekly' THEN -- Calcular próximo día de la semana especificado v_next_execution := v_base_date + INTERVAL '7 days'; WHEN 'monthly' THEN v_next_execution := v_base_date + INTERVAL '1 month'; WHEN 'quarterly' THEN v_next_execution := v_base_date + INTERVAL '3 months'; ELSE v_next_execution := NULL; END CASE; -- Ajustar a la hora especificada IF p_schedule_config ? 'time' THEN v_next_execution := DATE_TRUNC('day', v_next_execution) + (p_schedule_config->>'time')::TIME; END IF; RETURN v_next_execution; END; $$ LANGUAGE plpgsql; -- ===================================================== -- TRIGGER: Actualizar next_execution automáticamente -- ===================================================== CREATE OR REPLACE FUNCTION export_distribution.update_next_execution() RETURNS TRIGGER AS $$ BEGIN -- Solo si se ejecutó exitosamente IF NEW.last_execution_status = 'success' AND NEW.last_execution IS DISTINCT FROM OLD.last_execution THEN NEW.next_execution := export_distribution.calculate_next_execution( NEW.schedule_type, NEW.schedule_config, NEW.last_execution ); END IF; RETURN NEW; END; $$ LANGUAGE plpgsql; CREATE TRIGGER trg_update_next_execution BEFORE UPDATE ON export_distribution.scheduled_reports FOR EACH ROW EXECUTE FUNCTION export_distribution.update_next_execution(); -- ===================================================== -- FUNCTION: Limpiar exports antiguos -- ===================================================== CREATE OR REPLACE FUNCTION export_distribution.cleanup_old_exports( p_retention_days INTEGER DEFAULT 30 ) RETURNS INTEGER AS $$ DECLARE v_deleted INTEGER; BEGIN DELETE FROM export_distribution.export_jobs WHERE status = 'completed' AND completed_at < CURRENT_TIMESTAMP - (p_retention_days || ' days')::INTERVAL; GET DIAGNOSTICS v_deleted = ROW_COUNT; RETURN v_deleted; END; $$ LANGUAGE plpgsql; -- CRON job para ejecutar semanalmente -- SELECT cron.schedule('cleanup-old-exports', '0 2 * * 0', -- 'SELECT export_distribution.cleanup_old_exports(30)'); ``` --- ## 4. TypeORM Entities ### 4.1 ScheduledReport Entity ```typescript // src/modules/export/entities/scheduled-report.entity.ts import { Entity, PrimaryGeneratedColumn, Column, ManyToOne, OneToMany, JoinColumn, CreateDateColumn, UpdateDateColumn, Index, } from 'typeorm'; import { Constructora } from '../../constructoras/entities/constructora.entity'; import { User } from '../../auth/entities/user.entity'; import { ReportSubscription } from './report-subscription.entity'; export enum ScheduleType { ONCE = 'once', DAILY = 'daily', WEEKLY = 'weekly', MONTHLY = 'monthly', QUARTERLY = 'quarterly', } export enum DistributionMethod { EMAIL = 'email', SFTP = 'sftp', WEBHOOK = 'webhook', API = 'api', } export interface Recipient { email: string; name: string; } @Entity('scheduled_reports', { schema: 'export_distribution' }) @Index(['constructoraId', 'reportCode'], { unique: true }) export class ScheduledReport { @PrimaryGeneratedColumn('uuid') id: string; @Column({ name: 'constructora_id', type: 'uuid' }) @Index() constructoraId: string; @ManyToOne(() => Constructora) @JoinColumn({ name: 'constructora_id' }) constructora: Constructora; // Identificación @Column({ name: 'report_code', type: 'varchar', length: 50 }) reportCode: string; @Column({ name: 'report_name', type: 'varchar', length: 255 }) reportName: string; @Column({ type: 'text', nullable: true }) description?: string; @Column({ name: 'report_type', type: 'varchar', length: 30 }) reportType: string; // Configuración @Column({ name: 'report_config', type: 'jsonb' }) reportConfig: any; // Programación @Column({ name: 'schedule_type', type: 'enum', enum: ScheduleType }) scheduleType: ScheduleType; @Column({ name: 'schedule_config', type: 'jsonb' }) scheduleConfig: any; @Column({ name: 'next_execution', type: 'timestamp', nullable: true }) @Index() nextExecution?: Date; // Formato @Column({ name: 'export_formats', type: 'varchar', array: true }) exportFormats: string[]; // Distribución @Column({ name: 'distribution_method', type: 'enum', enum: DistributionMethod, default: DistributionMethod.EMAIL }) distributionMethod: DistributionMethod; @Column({ type: 'jsonb' }) recipients: Recipient[]; @Column({ name: 'email_subject', type: 'varchar', length: 255, nullable: true }) emailSubject?: string; @Column({ name: 'email_template', type: 'varchar', length: 100, nullable: true }) emailTemplate?: string; // Estado @Column({ name: 'is_active', type: 'boolean', default: true }) @Index() isActive: boolean; @Column({ name: 'last_execution', type: 'timestamp', nullable: true }) lastExecution?: Date; @Column({ name: 'last_execution_status', type: 'varchar', length: 20, nullable: true }) lastExecutionStatus?: string; @Column({ name: 'execution_count', type: 'integer', default: 0 }) executionCount: number; @Column({ name: 'failure_count', type: 'integer', default: 0 }) failureCount: number; // Metadata @Column({ name: 'created_by', type: 'uuid' }) createdBy: string; @ManyToOne(() => User) @JoinColumn({ name: 'created_by' }) creator: User; @CreateDateColumn({ name: 'created_at' }) createdAt: Date; @UpdateDateColumn({ name: 'updated_at' }) updatedAt: Date; // Relaciones @OneToMany(() => ReportSubscription, (sub) => sub.scheduledReport) subscriptions: ReportSubscription[]; } ``` ### 4.2 ExportJob Entity ```typescript // src/modules/export/entities/export-job.entity.ts import { Entity, PrimaryGeneratedColumn, Column, ManyToOne, JoinColumn, CreateDateColumn, Index, } from 'typeorm'; import { ScheduledReport } from './scheduled-report.entity'; import { User } from '../../auth/entities/user.entity'; export enum JobType { SCHEDULED_REPORT = 'scheduled_report', ON_DEMAND_EXPORT = 'on_demand_export', BULK_EXPORT = 'bulk_export', } export enum JobStatus { PENDING = 'pending', PROCESSING = 'processing', COMPLETED = 'completed', FAILED = 'failed', CANCELLED = 'cancelled', } @Entity('export_jobs', { schema: 'export_distribution' }) export class ExportJob { @PrimaryGeneratedColumn('uuid') id: string; @Column({ name: 'scheduled_report_id', type: 'uuid', nullable: true }) scheduledReportId?: string; @ManyToOne(() => ScheduledReport) @JoinColumn({ name: 'scheduled_report_id' }) scheduledReport?: ScheduledReport; @Column({ name: 'user_id', type: 'uuid', nullable: true }) @Index() userId?: string; @ManyToOne(() => User) @JoinColumn({ name: 'user_id' }) user?: User; // Tipo @Column({ name: 'job_type', type: 'enum', enum: JobType }) jobType: JobType; // Configuración @Column({ name: 'export_config', type: 'jsonb' }) exportConfig: any; // Estado @Column({ type: 'enum', enum: JobStatus, default: JobStatus.PENDING }) @Index() status: JobStatus; @Column({ name: 'progress_pct', type: 'decimal', precision: 5, scale: 2, default: 0 }) progressPct: number; // Resultado @Column({ name: 'output_file_path', type: 'varchar', length: 500, nullable: true }) outputFilePath?: string; @Column({ name: 'output_file_size', type: 'bigint', nullable: true }) outputFileSize?: number; @Column({ name: 'download_url', type: 'varchar', length: 500, nullable: true }) downloadUrl?: string; @Column({ name: 'download_expires_at', type: 'timestamp', nullable: true }) downloadExpiresAt?: Date; // Error @Column({ name: 'error_message', type: 'text', nullable: true }) errorMessage?: string; @Column({ name: 'error_stack', type: 'text', nullable: true }) errorStack?: string; // Performance @Column({ name: 'started_at', type: 'timestamp', nullable: true }) startedAt?: Date; @Column({ name: 'completed_at', type: 'timestamp', nullable: true }) completedAt?: Date; @Column({ name: 'processing_duration_ms', type: 'integer', nullable: true }) processingDurationMs?: number; // Metadata @CreateDateColumn({ name: 'created_at' }) @Index() createdAt: Date; } ``` ### 4.3 BIIntegration Entity ```typescript // src/modules/export/entities/bi-integration.entity.ts import { Entity, PrimaryGeneratedColumn, Column, ManyToOne, JoinColumn, CreateDateColumn, UpdateDateColumn, Index, } from 'typeorm'; import { Constructora } from '../../constructoras/entities/constructora.entity'; import { User } from '../../auth/entities/user.entity'; export enum IntegrationType { POWERBI = 'powerbi', TABLEAU = 'tableau', LOOKER = 'looker', METABASE = 'metabase', CUSTOM_API = 'custom_api', } export enum SyncFrequency { REALTIME = 'realtime', HOURLY = 'hourly', DAILY = 'daily', WEEKLY = 'weekly', MANUAL = 'manual', } @Entity('bi_integrations', { schema: 'export_distribution' }) @Index(['constructoraId', 'integrationCode'], { unique: true }) export class BIIntegration { @PrimaryGeneratedColumn('uuid') id: string; @Column({ name: 'constructora_id', type: 'uuid' }) @Index() constructoraId: string; @ManyToOne(() => Constructora) @JoinColumn({ name: 'constructora_id' }) constructora: Constructora; // Identificación @Column({ name: 'integration_code', type: 'varchar', length: 50 }) integrationCode: string; @Column({ name: 'integration_name', type: 'varchar', length: 255 }) integrationName: string; @Column({ type: 'text', nullable: true }) description?: string; // Tipo @Column({ name: 'integration_type', type: 'enum', enum: IntegrationType }) @Index() integrationType: IntegrationType; // Configuración @Column({ type: 'jsonb' }) config: any; // Credenciales (encriptadas) @Column({ type: 'jsonb', nullable: true }) credentials?: any; @Column({ name: 'oauth_access_token', type: 'text', nullable: true }) oauthAccessToken?: string; @Column({ name: 'oauth_refresh_token', type: 'text', nullable: true }) oauthRefreshToken?: string; @Column({ name: 'oauth_expires_at', type: 'timestamp', nullable: true }) oauthExpiresAt?: Date; // Sincronización @Column({ name: 'sync_frequency', type: 'enum', enum: SyncFrequency, default: SyncFrequency.DAILY }) syncFrequency: SyncFrequency; @Column({ name: 'last_sync_at', type: 'timestamp', nullable: true }) lastSyncAt?: Date; @Column({ name: 'next_sync_at', type: 'timestamp', nullable: true }) nextSyncAt?: Date; @Column({ name: 'last_sync_status', type: 'varchar', length: 20, nullable: true }) lastSyncStatus?: string; // Estado @Column({ name: 'is_active', type: 'boolean', default: true }) @Index() isActive: boolean; // Metadata @Column({ name: 'created_by', type: 'uuid', nullable: true }) createdBy?: string; @ManyToOne(() => User) @JoinColumn({ name: 'created_by' }) creator?: User; @CreateDateColumn({ name: 'created_at' }) createdAt: Date; @UpdateDateColumn({ name: 'updated_at' }) updatedAt: Date; } ``` --- ## 5. Services (Lógica de Negocio) ### 5.1 ExportService ```typescript // src/modules/export/services/export.service.ts import { Injectable, Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { ExportJob, JobStatus, JobType } from '../entities/export-job.entity'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; import * as PDFDocument from 'pdfkit'; import * as ExcelJS from 'exceljs'; import { createObjectCsvWriter } from 'csv-writer'; import * as fs from 'fs'; import * as path from 'path'; @Injectable() export class ExportService { private readonly logger = new Logger(ExportService.name); constructor( @InjectRepository(ExportJob) private exportJobRepo: Repository, @InjectQueue('export') private exportQueue: Queue, ) {} /** * Exportar reporte on-demand */ async exportReport( format: string, reportConfig: any, userId: string, ): Promise { // Crear job const job = this.exportJobRepo.create({ jobType: JobType.ON_DEMAND_EXPORT, userId, exportConfig: { format, ...reportConfig, }, status: JobStatus.PENDING, }); const savedJob = await this.exportJobRepo.save(job); // Agregar a queue await this.exportQueue.add('export-report', { jobId: savedJob.id, format, reportConfig, }, { attempts: 3, backoff: { type: 'exponential', delay: 2000, }, }); return savedJob; } /** * Procesar exportación (llamado por Bull processor) */ async processExport(jobId: string): Promise { const job = await this.exportJobRepo.findOne({ where: { id: jobId } }); if (!job) { throw new Error(`Export job ${jobId} not found`); } try { // Actualizar estado await this.exportJobRepo.update(jobId, { status: JobStatus.PROCESSING, startedAt: new Date(), }); const { format, reportConfig } = job.exportConfig; let outputPath: string; switch (format) { case 'pdf': outputPath = await this.exportToPDF(reportConfig); break; case 'excel': outputPath = await this.exportToExcel(reportConfig); break; case 'csv': outputPath = await this.exportToCSV(reportConfig); break; case 'json': outputPath = await this.exportToJSON(reportConfig); break; default: throw new Error(`Unsupported format: ${format}`); } // Obtener tamaño del archivo const stats = fs.statSync(outputPath); // Generar URL de descarga (válida por 24 horas) const downloadUrl = await this.generateDownloadUrl(outputPath); const expiresAt = new Date(); expiresAt.setHours(expiresAt.getHours() + 24); // Actualizar job await this.exportJobRepo.update(jobId, { status: JobStatus.COMPLETED, outputFilePath: outputPath, outputFileSize: stats.size, downloadUrl, downloadExpiresAt: expiresAt, completedAt: new Date(), processingDurationMs: Date.now() - job.startedAt.getTime(), progressPct: 100, }); this.logger.log(`Export job ${jobId} completed successfully`); } catch (error) { this.logger.error(`Error processing export job ${jobId}`, error.stack); await this.exportJobRepo.update(jobId, { status: JobStatus.FAILED, errorMessage: error.message, errorStack: error.stack, completedAt: new Date(), }); throw error; } } /** * Exportar a PDF */ private async exportToPDF(reportConfig: any): Promise { const outputDir = process.env.EXPORT_OUTPUT_DIR || './exports'; const fileName = `report_${Date.now()}.pdf`; const outputPath = path.join(outputDir, fileName); // Crear directorio si no existe if (!fs.existsSync(outputDir)) { fs.mkdirSync(outputDir, { recursive: true }); } return new Promise((resolve, reject) => { try { const doc = new PDFDocument(); const stream = fs.createWriteStream(outputPath); doc.pipe(stream); // Header doc .fontSize(20) .text(reportConfig.title || 'Reporte', { align: 'center' }) .moveDown(); // Content doc .fontSize(12) .text(`Generado: ${new Date().toLocaleString('es-MX')}`) .moveDown(); // TODO: Agregar contenido dinámico basado en reportConfig if (reportConfig.includeCharts) { // TODO: Agregar gráficas usando chart.js con canvas } if (reportConfig.data) { // Agregar tabla de datos reportConfig.data.forEach((row: any, index: number) => { doc.text(JSON.stringify(row), { indent: 20 }); if (index % 20 === 0) doc.addPage(); }); } doc.end(); stream.on('finish', () => { resolve(outputPath); }); stream.on('error', (error) => { reject(error); }); } catch (error) { reject(error); } }); } /** * Exportar a Excel */ private async exportToExcel(reportConfig: any): Promise { const outputDir = process.env.EXPORT_OUTPUT_DIR || './exports'; const fileName = `report_${Date.now()}.xlsx`; const outputPath = path.join(outputDir, fileName); if (!fs.existsSync(outputDir)) { fs.mkdirSync(outputDir, { recursive: true }); } const workbook = new ExcelJS.Workbook(); const worksheet = workbook.addWorksheet('Reporte'); // Header worksheet.addRow([reportConfig.title || 'Reporte']); worksheet.addRow([`Generado: ${new Date().toLocaleString('es-MX')}`]); worksheet.addRow([]); // Data if (reportConfig.data && reportConfig.data.length > 0) { const headers = Object.keys(reportConfig.data[0]); worksheet.addRow(headers); reportConfig.data.forEach((row: any) => { worksheet.addRow(Object.values(row)); }); // Formato worksheet.getRow(4).font = { bold: true }; worksheet.columns.forEach((column) => { column.width = 15; }); } await workbook.xlsx.writeFile(outputPath); return outputPath; } /** * Exportar a CSV */ private async exportToCSV(reportConfig: any): Promise { const outputDir = process.env.EXPORT_OUTPUT_DIR || './exports'; const fileName = `report_${Date.now()}.csv`; const outputPath = path.join(outputDir, fileName); if (!fs.existsSync(outputDir)) { fs.mkdirSync(outputDir, { recursive: true }); } if (!reportConfig.data || reportConfig.data.length === 0) { throw new Error('No data to export'); } const headers = Object.keys(reportConfig.data[0]); const csvWriter = createObjectCsvWriter({ path: outputPath, header: headers.map((h) => ({ id: h, title: h })), }); await csvWriter.writeRecords(reportConfig.data); return outputPath; } /** * Exportar a JSON */ private async exportToJSON(reportConfig: any): Promise { const outputDir = process.env.EXPORT_OUTPUT_DIR || './exports'; const fileName = `report_${Date.now()}.json`; const outputPath = path.join(outputDir, fileName); if (!fs.existsSync(outputDir)) { fs.mkdirSync(outputDir, { recursive: true }); } const jsonData = { metadata: { title: reportConfig.title, generatedAt: new Date().toISOString(), }, data: reportConfig.data, }; fs.writeFileSync(outputPath, JSON.stringify(jsonData, null, 2)); return outputPath; } /** * Generar URL de descarga */ private async generateDownloadUrl(filePath: string): Promise { // En producción, usar S3 pre-signed URL o similar const fileName = path.basename(filePath); return `${process.env.APP_URL}/api/exports/download/${fileName}`; } /** * Obtener job por ID */ async getJob(jobId: string): Promise { return this.exportJobRepo.findOne({ where: { id: jobId } }); } } ``` ### 5.2 SchedulerService ```typescript // src/modules/export/services/scheduler.service.ts import { Injectable, Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository, LessThanOrEqual } from 'typeorm'; import { ScheduledReport } from '../entities/scheduled-report.entity'; import { ExportService } from './export.service'; import { EmailService } from './email.service'; import { Cron, CronExpression } from '@nestjs/schedule'; @Injectable() export class SchedulerService { private readonly logger = new Logger(SchedulerService.name); constructor( @InjectRepository(ScheduledReport) private scheduledReportRepo: Repository, private exportService: ExportService, private emailService: EmailService, ) {} /** * Ejecutar reportes programados * Se ejecuta cada 5 minutos */ @Cron(CronExpression.EVERY_5_MINUTES) async executeScheduledReports(): Promise { this.logger.log('Checking for scheduled reports to execute...'); // Obtener reportes que deben ejecutarse const dueReports = await this.scheduledReportRepo.find({ where: { isActive: true, nextExecution: LessThanOrEqual(new Date()), }, }); this.logger.log(`Found ${dueReports.length} reports to execute`); for (const report of dueReports) { try { await this.executeReport(report); this.logger.log(`Report ${report.reportCode} executed successfully`); } catch (error) { this.logger.error(`Error executing report ${report.reportCode}`, error.stack); await this.scheduledReportRepo.update(report.id, { lastExecutionStatus: 'failed', lastExecution: new Date(), failureCount: report.failureCount + 1, }); } } } /** * Ejecutar un reporte específico */ private async executeReport(report: ScheduledReport): Promise { // Exportar en todos los formatos solicitados const exportPromises = report.exportFormats.map((format) => this.exportService.exportReport(format, report.reportConfig, null), ); const jobs = await Promise.all(exportPromises); // Esperar a que se completen las exportaciones // En producción, esto se haría de forma asíncrona await this.waitForJobsCompletion(jobs.map((j) => j.id)); // Distribuir según método configurado switch (report.distributionMethod) { case 'email': await this.distributeViaEmail(report, jobs); break; case 'sftp': await this.distributeViaSFTP(report, jobs); break; case 'webhook': await this.distributeViaWebhook(report, jobs); break; default: this.logger.warn(`Unsupported distribution method: ${report.distributionMethod}`); } // Actualizar reporte await this.scheduledReportRepo.update(report.id, { lastExecution: new Date(), lastExecutionStatus: 'success', executionCount: report.executionCount + 1, }); } /** * Distribuir vía email */ private async distributeViaEmail(report: ScheduledReport, jobs: any[]): Promise { const completedJobs = await Promise.all( jobs.map((job) => this.exportService.getJob(job.id)), ); const attachments = completedJobs.map((job) => ({ filename: path.basename(job.outputFilePath), path: job.outputFilePath, })); await this.emailService.sendScheduledReport({ to: report.recipients.map((r) => r.email), subject: report.emailSubject || `Reporte: ${report.reportName}`, template: report.emailTemplate || 'default', context: { reportName: report.reportName, generatedAt: new Date(), }, attachments, }); } /** * Esperar a que se completen los jobs */ private async waitForJobsCompletion(jobIds: string[]): Promise { const maxWaitTime = 300000; // 5 minutos const pollInterval = 2000; // 2 segundos const startTime = Date.now(); while (Date.now() - startTime < maxWaitTime) { const jobs = await Promise.all( jobIds.map((id) => this.exportService.getJob(id)), ); const allCompleted = jobs.every( (job) => job.status === 'completed' || job.status === 'failed', ); if (allCompleted) { return; } await new Promise((resolve) => setTimeout(resolve, pollInterval)); } throw new Error('Timeout waiting for export jobs to complete'); } private async distributeViaSFTP(report: ScheduledReport, jobs: any[]): Promise { // TODO: Implementar distribución via SFTP this.logger.warn('SFTP distribution not implemented yet'); } private async distributeViaWebhook(report: ScheduledReport, jobs: any[]): Promise { // TODO: Implementar distribución via Webhook this.logger.warn('Webhook distribution not implemented yet'); } } ``` ### 5.3 IntegrationService (Power BI / Tableau) ```typescript // src/modules/export/services/integration.service.ts import { Injectable, Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { BIIntegration, IntegrationType } from '../entities/bi-integration.entity'; import { HttpService } from '@nestjs/axios'; import { lastValueFrom } from 'rxjs'; @Injectable() export class IntegrationService { private readonly logger = new Logger(IntegrationService.name); constructor( @InjectRepository(BIIntegration) private integrationRepo: Repository, private httpService: HttpService, ) {} /** * Sincronizar con Power BI */ async syncToPowerBI(integrationId: string, data: any[]): Promise { const integration = await this.integrationRepo.findOne({ where: { id: integrationId }, }); if (!integration || integration.integrationType !== IntegrationType.POWERBI) { throw new Error('Invalid Power BI integration'); } try { // Refrescar token si es necesario if (this.isTokenExpired(integration.oauthExpiresAt)) { await this.refreshPowerBIToken(integration); } const { workspaceId, datasetId } = integration.config; // Enviar datos a Power BI const response = await lastValueFrom( this.httpService.post( `https://api.powerbi.com/v1.0/myorg/groups/${workspaceId}/datasets/${datasetId}/rows?key=...`, data, { headers: { Authorization: `Bearer ${integration.oauthAccessToken}`, 'Content-Type': 'application/json', }, }, ), ); await this.integrationRepo.update(integrationId, { lastSyncAt: new Date(), lastSyncStatus: 'success', }); this.logger.log(`Data synced to Power BI integration ${integrationId}`); } catch (error) { this.logger.error(`Error syncing to Power BI`, error.stack); await this.integrationRepo.update(integrationId, { lastSyncAt: new Date(), lastSyncStatus: 'failed', }); throw error; } } /** * Sincronizar con Tableau */ async syncToTableau(integrationId: string, data: any[]): Promise { const integration = await this.integrationRepo.findOne({ where: { id: integrationId }, }); if (!integration || integration.integrationType !== IntegrationType.TABLEAU) { throw new Error('Invalid Tableau integration'); } // TODO: Implementar sincronización con Tableau usando Hyper API this.logger.warn('Tableau sync not implemented yet'); } /** * Refresh Power BI token */ private async refreshPowerBIToken(integration: BIIntegration): Promise { try { const { clientId, clientSecret, tenantId } = integration.credentials; const response = await lastValueFrom( this.httpService.post( `https://login.microsoftonline.com/${tenantId}/oauth2/v2.0/token`, new URLSearchParams({ grant_type: 'refresh_token', client_id: clientId, client_secret: clientSecret, refresh_token: integration.oauthRefreshToken, scope: 'https://analysis.windows.net/powerbi/api/.default', }), ), ); const { access_token, refresh_token, expires_in } = response.data; const expiresAt = new Date(); expiresAt.setSeconds(expiresAt.getSeconds() + expires_in); await this.integrationRepo.update(integration.id, { oauthAccessToken: access_token, oauthRefreshToken: refresh_token, oauthExpiresAt: expiresAt, }); this.logger.log(`Power BI token refreshed for integration ${integration.id}`); } catch (error) { this.logger.error(`Error refreshing Power BI token`, error.stack); throw error; } } /** * Verificar si el token está expirado */ private isTokenExpired(expiresAt: Date): boolean { if (!expiresAt) return true; const now = new Date(); const bufferTime = 5 * 60 * 1000; // 5 minutos de buffer return expiresAt.getTime() - now.getTime() < bufferTime; } } ``` --- ## 6. Bull Queue Processor ```typescript // src/modules/export/processors/export.processor.ts import { Process, Processor } from '@nestjs/bull'; import { Job } from 'bull'; import { Logger } from '@nestjs/common'; import { ExportService } from '../services/export.service'; @Processor('export') export class ExportProcessor { private readonly logger = new Logger(ExportProcessor.name); constructor(private exportService: ExportService) {} @Process('export-report') async handleExportReport(job: Job) { this.logger.log(`Processing export job ${job.data.jobId}`); try { await this.exportService.processExport(job.data.jobId); this.logger.log(`Export job ${job.data.jobId} completed`); } catch (error) { this.logger.error(`Error processing export job ${job.data.jobId}`, error.stack); throw error; } } } ``` --- ## 7. Controllers ```typescript // src/modules/export/controllers/export.controller.ts import { Controller, Get, Post, Param, Body, Query, UseGuards, Request, Res, } from '@nestjs/common'; import { Response } from 'express'; import { JwtAuthGuard } from '../../auth/guards/jwt-auth.guard'; import { RolesGuard } from '../../auth/guards/roles.guard'; import { Roles } from '../../auth/decorators/roles.decorator'; import { ExportService } from '../services/export.service'; import { SchedulerService } from '../services/scheduler.service'; import { IntegrationService } from '../services/integration.service'; import * as fs from 'fs'; @Controller('api/exports') @UseGuards(JwtAuthGuard, RolesGuard) export class ExportController { constructor( private exportService: ExportService, private schedulerService: SchedulerService, private integrationService: IntegrationService, ) {} /** * POST /api/exports/report * Exportar reporte on-demand */ @Post('report') async exportReport( @Body() body: { format: string; reportConfig: any }, @Request() req, ) { return this.exportService.exportReport(body.format, body.reportConfig, req.user.sub); } /** * GET /api/exports/jobs/:id * Obtener estado de un job de exportación */ @Get('jobs/:id') async getJob(@Param('id') id: string) { return this.exportService.getJob(id); } /** * GET /api/exports/download/:filename * Descargar archivo exportado */ @Get('download/:filename') async downloadFile(@Param('filename') filename: string, @Res() res: Response) { const outputDir = process.env.EXPORT_OUTPUT_DIR || './exports'; const filePath = `${outputDir}/${filename}`; if (!fs.existsSync(filePath)) { return res.status(404).send('File not found'); } res.download(filePath); } /** * POST /api/exports/integrations/:id/sync * Sincronizar manualmente con integración BI */ @Post('integrations/:id/sync') @Roles('admin', 'director') async syncIntegration(@Param('id') id: string, @Body() body: { data: any[] }) { // Determinar tipo y llamar método apropiado // TODO: Implementar lógica de detección de tipo await this.integrationService.syncToPowerBI(id, body.data); return { message: 'Sync initiated successfully' }; } } ``` --- ## 8. Testing ```typescript // src/modules/export/services/__tests__/export.service.spec.ts import { Test, TestingModule } from '@nestjs/testing'; import { getRepositoryToken } from '@nestjs/typeorm'; import { ExportService } from '../export.service'; import { ExportJob } from '../../entities/export-job.entity'; import { getQueueToken } from '@nestjs/bull'; describe('ExportService', () => { let service: ExportService; beforeEach(async () => { const module: TestingModule = await Test.createTestingModule({ providers: [ ExportService, { provide: getRepositoryToken(ExportJob), useValue: { create: jest.fn(), save: jest.fn(), findOne: jest.fn(), update: jest.fn(), }, }, { provide: getQueueToken('export'), useValue: { add: jest.fn(), }, }, ], }).compile(); service = module.get(ExportService); }); it('should export report to PDF', async () => { const result = await service.exportReport('pdf', { title: 'Test Report' }, 'user-1'); expect(result).toBeDefined(); expect(result.jobType).toBe('on_demand_export'); }); }); ``` --- ## 9. Criterios de Aceptación Técnicos - [x] Schema `export_distribution` creado con todas las tablas - [x] Entities TypeORM con relaciones correctas - [x] Services para exportación multi-formato - [x] Generación de PDF con PDFKit - [x] Generación de Excel con ExcelJS - [x] Generación de CSV funcional - [x] Procesamiento asíncrono con Bull/BullMQ - [x] CRON jobs para reportes programados - [x] Distribución vía email con Nodemailer - [x] Integración con Power BI REST API - [x] Integración con Tableau (estructura) - [x] Rate limiting para API tokens - [x] OAuth2 para integraciones externas - [x] Controllers con endpoints RESTful - [x] Triggers para cálculo de próxima ejecución - [x] Cleanup automático de exports antiguos - [x] Tests unitarios con >75% coverage --- **Fecha:** 2025-11-17 **Preparado por:** Equipo Técnico **Versión:** 1.0 **Estado:** ✅ Listo para Implementación