erp-construccion/docs/02-definicion-modulos/MAI-006-reportes-analytics/especificaciones/ET-BI-004-implementacion-exportacion-distribucion.md

46 KiB

ET-BI-004: Implementación de Exportación y Distribución

Épica: MAI-006 - Reportes y Business Intelligence Módulo: Exportación, Distribución y Integraciones Responsable Técnico: Backend + DevOps + Integrations Fecha: 2025-11-17 Versión: 1.0


1. Objetivo Técnico

Implementar el sistema de exportación y distribución de reportes con:

  • Exportación a múltiples formatos (PDF, Excel, CSV, JSON, Parquet)
  • Programación de reportes recurrentes (diario, semanal, mensual)
  • Distribución automática vía email con adjuntos
  • Integración con Power BI REST API
  • Integración con Tableau Hyper API
  • Procesamiento asíncrono con Bull/BullMQ
  • Rate limiting y autenticación OAuth2
  • Gestión de suscripciones a reportes
  • APIs RESTful para consumo externo

2. Stack Tecnológico

Backend

- NestJS 10+ con TypeScript
- PostgreSQL 15+ (schema: export_distribution)
- Bull/BullMQ para procesamiento asíncrono
- Redis para queue management
- node-cron para reportes programados

Exportación

- PDFKit para generación de PDFs
- ExcelJS para Excel (.xlsx)
- csv-writer para archivos CSV
- Apache Parquet (parquetjs) para data lakes
- Puppeteer para PDF desde HTML

Email

- Nodemailer con transporte SMTP
- Handlebars para templates de email
- Mjml para emails responsivos

Integraciones

- Power BI REST API (axios)
- Tableau Hyper API
- OAuth2 (passport-oauth2)
- API Gateway con rate limiting

3. Modelo de Datos SQL

3.1 Schema Principal

-- =====================================================
-- SCHEMA: export_distribution
-- Descripción: Exportación y distribución de reportes
-- =====================================================

CREATE SCHEMA IF NOT EXISTS export_distribution;

-- =====================================================
-- TABLE: export_distribution.scheduled_reports
-- Descripción: Reportes programados
-- =====================================================

CREATE TABLE export_distribution.scheduled_reports (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

  constructora_id UUID NOT NULL REFERENCES public.constructoras(id) ON DELETE CASCADE,

  -- Identificación
  report_code VARCHAR(50) NOT NULL,
  report_name VARCHAR(255) NOT NULL,
  description TEXT,

  -- Tipo de reporte
  report_type VARCHAR(30) NOT NULL,
  -- executive_summary, project_status, financial_report, custom

  -- Configuración del reporte
  report_config JSONB NOT NULL,
  /*
  {
    "dashboardId": "uuid",
    "filters": {
      "dateRange": "last_30_days",
      "projectIds": ["uuid-1", "uuid-2"]
    },
    "includeCharts": true,
    "includeRawData": false
  }
  */

  -- Programación
  schedule_type VARCHAR(20) NOT NULL,
  -- once, daily, weekly, monthly, quarterly
  schedule_config JSONB NOT NULL,
  /*
  {
    "frequency": "weekly",
    "dayOfWeek": 1, // Monday
    "time": "08:00",
    "timezone": "America/Mexico_City"
  }
  */
  next_execution TIMESTAMP,

  -- Formato de exportación
  export_formats VARCHAR(20)[] NOT NULL, -- ['pdf', 'excel', 'csv']

  -- Distribución
  distribution_method VARCHAR(20) NOT NULL DEFAULT 'email',
  -- email, sftp, webhook, api

  recipients JSONB NOT NULL,
  /*
  [
    {"email": "director@company.com", "name": "Director"},
    {"email": "cfo@company.com", "name": "CFO"}
  ]
  */

  email_subject VARCHAR(255),
  email_template VARCHAR(100), -- nombre del template

  -- Estado
  is_active BOOLEAN DEFAULT true,
  last_execution TIMESTAMP,
  last_execution_status VARCHAR(20),
  -- success, failed, partial

  execution_count INTEGER DEFAULT 0,
  failure_count INTEGER DEFAULT 0,

  -- Metadata
  created_by UUID NOT NULL REFERENCES auth.users(id),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

  CONSTRAINT valid_schedule_type CHECK (schedule_type IN (
    'once', 'daily', 'weekly', 'monthly', 'quarterly'
  )),
  CONSTRAINT valid_distribution_method CHECK (distribution_method IN (
    'email', 'sftp', 'webhook', 'api'
  )),
  UNIQUE(constructora_id, report_code)
);

CREATE INDEX idx_scheduled_reports_constructora ON export_distribution.scheduled_reports(constructora_id);
CREATE INDEX idx_scheduled_reports_active ON export_distribution.scheduled_reports(is_active) WHERE is_active = true;
CREATE INDEX idx_scheduled_reports_next_exec ON export_distribution.scheduled_reports(next_execution) WHERE is_active = true;


-- =====================================================
-- TABLE: export_distribution.report_subscriptions
-- Descripción: Suscripciones de usuarios a reportes
-- =====================================================

CREATE TABLE export_distribution.report_subscriptions (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

  scheduled_report_id UUID NOT NULL REFERENCES export_distribution.scheduled_reports(id) ON DELETE CASCADE,
  user_id UUID NOT NULL REFERENCES auth.users(id) ON DELETE CASCADE,

  -- Preferencias
  receive_email BOOLEAN DEFAULT true,
  preferred_formats VARCHAR(20)[], -- ['pdf', 'excel']

  -- Filtros personalizados
  custom_filters JSONB,
  /*
  {
    "projectIds": ["uuid-1"], // solo sus proyectos
    "includeDetails": false
  }
  */

  -- Estado
  is_active BOOLEAN DEFAULT true,
  last_received_at TIMESTAMP,

  -- Metadata
  subscribed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

  UNIQUE(scheduled_report_id, user_id)
);

CREATE INDEX idx_subscriptions_report ON export_distribution.report_subscriptions(scheduled_report_id);
CREATE INDEX idx_subscriptions_user ON export_distribution.report_subscriptions(user_id);


-- =====================================================
-- TABLE: export_distribution.export_jobs
-- Descripción: Jobs de exportación (queue)
-- =====================================================

CREATE TABLE export_distribution.export_jobs (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

  scheduled_report_id UUID REFERENCES export_distribution.scheduled_reports(id) ON DELETE SET NULL,
  user_id UUID REFERENCES auth.users(id),

  -- Tipo de job
  job_type VARCHAR(30) NOT NULL,
  -- scheduled_report, on_demand_export, bulk_export

  -- Configuración
  export_config JSONB NOT NULL,
  /*
  {
    "format": "pdf",
    "reportId": "uuid",
    "filters": {...},
    "includeCharts": true
  }
  */

  -- Estado del job
  status VARCHAR(20) NOT NULL DEFAULT 'pending',
  -- pending, processing, completed, failed, cancelled

  progress_pct DECIMAL(5,2) DEFAULT 0,

  -- Resultado
  output_file_path VARCHAR(500),
  output_file_size BIGINT, -- bytes
  download_url VARCHAR(500),
  download_expires_at TIMESTAMP,

  -- Error
  error_message TEXT,
  error_stack TEXT,

  -- Performance
  started_at TIMESTAMP,
  completed_at TIMESTAMP,
  processing_duration_ms INTEGER,

  -- Metadata
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

  CONSTRAINT valid_job_type CHECK (job_type IN (
    'scheduled_report', 'on_demand_export', 'bulk_export'
  )),
  CONSTRAINT valid_status CHECK (status IN (
    'pending', 'processing', 'completed', 'failed', 'cancelled'
  ))
);

CREATE INDEX idx_export_jobs_status ON export_distribution.export_jobs(status);
CREATE INDEX idx_export_jobs_user ON export_distribution.export_jobs(user_id);
CREATE INDEX idx_export_jobs_created ON export_distribution.export_jobs(created_at DESC);


-- =====================================================
-- TABLE: export_distribution.bi_integrations
-- Descripción: Integraciones con herramientas BI externas
-- =====================================================

CREATE TABLE export_distribution.bi_integrations (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

  constructora_id UUID NOT NULL REFERENCES public.constructoras(id) ON DELETE CASCADE,

  -- Identificación
  integration_code VARCHAR(50) NOT NULL,
  integration_name VARCHAR(255) NOT NULL,
  description TEXT,

  -- Tipo de integración
  integration_type VARCHAR(30) NOT NULL,
  -- powerbi, tableau, looker, metabase, custom_api

  -- Configuración
  config JSONB NOT NULL,
  /*
  PowerBI:
  {
    "workspaceId": "...",
    "datasetId": "...",
    "clientId": "...",
    "tenantId": "...",
    "refreshSchedule": "daily"
  }

  Tableau:
  {
    "serverUrl": "https://tableau.company.com",
    "siteName": "...",
    "projectName": "...",
    "datasourceName": "..."
  }

  Custom API:
  {
    "baseUrl": "https://api.external.com",
    "authType": "oauth2",
    "apiKey": "encrypted..."
  }
  */

  -- Credenciales (encriptadas)
  credentials JSONB,
  oauth_access_token TEXT,
  oauth_refresh_token TEXT,
  oauth_expires_at TIMESTAMP,

  -- Sincronización
  sync_frequency VARCHAR(20) DEFAULT 'daily',
  -- realtime, hourly, daily, weekly, manual
  last_sync_at TIMESTAMP,
  next_sync_at TIMESTAMP,
  last_sync_status VARCHAR(20),
  -- success, failed, partial

  -- Estado
  is_active BOOLEAN DEFAULT true,

  -- Metadata
  created_by UUID REFERENCES auth.users(id),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

  CONSTRAINT valid_integration_type CHECK (integration_type IN (
    'powerbi', 'tableau', 'looker', 'metabase', 'custom_api'
  )),
  UNIQUE(constructora_id, integration_code)
);

CREATE INDEX idx_bi_integrations_constructora ON export_distribution.bi_integrations(constructora_id);
CREATE INDEX idx_bi_integrations_type ON export_distribution.bi_integrations(integration_type);
CREATE INDEX idx_bi_integrations_active ON export_distribution.bi_integrations(is_active) WHERE is_active = true;


-- =====================================================
-- TABLE: export_distribution.api_tokens
-- Descripción: Tokens para acceso a API externo
-- =====================================================

CREATE TABLE export_distribution.api_tokens (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

  constructora_id UUID NOT NULL REFERENCES public.constructoras(id) ON DELETE CASCADE,
  user_id UUID REFERENCES auth.users(id),

  -- Token
  token VARCHAR(255) NOT NULL UNIQUE,
  token_hash VARCHAR(255) NOT NULL, -- bcrypt hash

  -- Identificación
  token_name VARCHAR(255) NOT NULL,
  description TEXT,

  -- Permisos
  scopes VARCHAR(50)[], -- ['read:reports', 'write:reports', 'export:data']
  allowed_endpoints VARCHAR(100)[], -- ['/api/reports/*', '/api/analytics/*']

  -- Rate limiting
  rate_limit_requests INTEGER DEFAULT 1000,
  rate_limit_window INTEGER DEFAULT 3600, -- segundos

  -- Restricciones IP
  allowed_ips INET[],

  -- Estado
  is_active BOOLEAN DEFAULT true,
  expires_at TIMESTAMP,

  -- Uso
  last_used_at TIMESTAMP,
  usage_count INTEGER DEFAULT 0,

  -- Metadata
  created_by UUID REFERENCES auth.users(id),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  revoked_at TIMESTAMP,
  revoked_by UUID REFERENCES auth.users(id)
);

CREATE INDEX idx_api_tokens_token_hash ON export_distribution.api_tokens(token_hash);
CREATE INDEX idx_api_tokens_constructora ON export_distribution.api_tokens(constructora_id);
CREATE INDEX idx_api_tokens_active ON export_distribution.api_tokens(is_active) WHERE is_active = true;


-- =====================================================
-- TABLE: export_distribution.export_templates
-- Descripción: Templates para exportación
-- =====================================================

CREATE TABLE export_distribution.export_templates (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

  constructora_id UUID NOT NULL REFERENCES public.constructoras(id) ON DELETE CASCADE,

  -- Identificación
  template_code VARCHAR(50) NOT NULL,
  template_name VARCHAR(255) NOT NULL,
  description TEXT,

  -- Tipo
  template_type VARCHAR(30) NOT NULL,
  -- pdf, excel, email

  -- Contenido del template
  template_content TEXT NOT NULL,
  -- HTML para PDF, JSON para Excel, Handlebars para Email

  -- Variables disponibles
  available_variables JSONB,
  /*
  [
    {"name": "projectName", "type": "string", "description": "..."},
    {"name": "totalCost", "type": "number", "format": "currency"}
  ]
  */

  -- Configuración
  config JSONB DEFAULT '{}',
  /*
  {
    "pageSize": "letter",
    "orientation": "portrait",
    "margins": {"top": 20, "right": 20, "bottom": 20, "left": 20},
    "header": {...},
    "footer": {...}
  }
  */

  -- Estado
  is_active BOOLEAN DEFAULT true,
  is_default BOOLEAN DEFAULT false,

  -- Metadata
  created_by UUID REFERENCES auth.users(id),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

  UNIQUE(constructora_id, template_code)
);

CREATE INDEX idx_templates_constructora ON export_distribution.export_templates(constructora_id);
CREATE INDEX idx_templates_type ON export_distribution.export_templates(template_type);

3.2 Functions y Triggers

-- =====================================================
-- FUNCTION: Calcular próxima ejecución de reporte
-- =====================================================

CREATE OR REPLACE FUNCTION export_distribution.calculate_next_execution(
  p_schedule_type VARCHAR,
  p_schedule_config JSONB,
  p_last_execution TIMESTAMP DEFAULT NULL
) RETURNS TIMESTAMP AS $$
DECLARE
  v_next_execution TIMESTAMP;
  v_base_date TIMESTAMP;
BEGIN
  v_base_date := COALESCE(p_last_execution, CURRENT_TIMESTAMP);

  CASE p_schedule_type
    WHEN 'daily' THEN
      v_next_execution := v_base_date + INTERVAL '1 day';

    WHEN 'weekly' THEN
      -- Calcular próximo día de la semana especificado
      v_next_execution := v_base_date + INTERVAL '7 days';

    WHEN 'monthly' THEN
      v_next_execution := v_base_date + INTERVAL '1 month';

    WHEN 'quarterly' THEN
      v_next_execution := v_base_date + INTERVAL '3 months';

    ELSE
      v_next_execution := NULL;
  END CASE;

  -- Ajustar a la hora especificada
  IF p_schedule_config ? 'time' THEN
    v_next_execution := DATE_TRUNC('day', v_next_execution) +
                        (p_schedule_config->>'time')::TIME;
  END IF;

  RETURN v_next_execution;
END;
$$ LANGUAGE plpgsql;


-- =====================================================
-- TRIGGER: Actualizar next_execution automáticamente
-- =====================================================

CREATE OR REPLACE FUNCTION export_distribution.update_next_execution()
RETURNS TRIGGER AS $$
BEGIN
  -- Solo si se ejecutó exitosamente
  IF NEW.last_execution_status = 'success' AND
     NEW.last_execution IS DISTINCT FROM OLD.last_execution THEN

    NEW.next_execution := export_distribution.calculate_next_execution(
      NEW.schedule_type,
      NEW.schedule_config,
      NEW.last_execution
    );
  END IF;

  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_update_next_execution
  BEFORE UPDATE ON export_distribution.scheduled_reports
  FOR EACH ROW
  EXECUTE FUNCTION export_distribution.update_next_execution();


-- =====================================================
-- FUNCTION: Limpiar exports antiguos
-- =====================================================

CREATE OR REPLACE FUNCTION export_distribution.cleanup_old_exports(
  p_retention_days INTEGER DEFAULT 30
) RETURNS INTEGER AS $$
DECLARE
  v_deleted INTEGER;
BEGIN
  DELETE FROM export_distribution.export_jobs
  WHERE status = 'completed'
    AND completed_at < CURRENT_TIMESTAMP - (p_retention_days || ' days')::INTERVAL;

  GET DIAGNOSTICS v_deleted = ROW_COUNT;

  RETURN v_deleted;
END;
$$ LANGUAGE plpgsql;

-- CRON job para ejecutar semanalmente
-- SELECT cron.schedule('cleanup-old-exports', '0 2 * * 0',
--   'SELECT export_distribution.cleanup_old_exports(30)');

4. TypeORM Entities

4.1 ScheduledReport Entity

// src/modules/export/entities/scheduled-report.entity.ts

import {
  Entity,
  PrimaryGeneratedColumn,
  Column,
  ManyToOne,
  OneToMany,
  JoinColumn,
  CreateDateColumn,
  UpdateDateColumn,
  Index,
} from 'typeorm';
import { Constructora } from '../../constructoras/entities/constructora.entity';
import { User } from '../../auth/entities/user.entity';
import { ReportSubscription } from './report-subscription.entity';

export enum ScheduleType {
  ONCE = 'once',
  DAILY = 'daily',
  WEEKLY = 'weekly',
  MONTHLY = 'monthly',
  QUARTERLY = 'quarterly',
}

export enum DistributionMethod {
  EMAIL = 'email',
  SFTP = 'sftp',
  WEBHOOK = 'webhook',
  API = 'api',
}

export interface Recipient {
  email: string;
  name: string;
}

@Entity('scheduled_reports', { schema: 'export_distribution' })
@Index(['constructoraId', 'reportCode'], { unique: true })
export class ScheduledReport {
  @PrimaryGeneratedColumn('uuid')
  id: string;

  @Column({ name: 'constructora_id', type: 'uuid' })
  @Index()
  constructoraId: string;

  @ManyToOne(() => Constructora)
  @JoinColumn({ name: 'constructora_id' })
  constructora: Constructora;

  // Identificación
  @Column({ name: 'report_code', type: 'varchar', length: 50 })
  reportCode: string;

  @Column({ name: 'report_name', type: 'varchar', length: 255 })
  reportName: string;

  @Column({ type: 'text', nullable: true })
  description?: string;

  @Column({ name: 'report_type', type: 'varchar', length: 30 })
  reportType: string;

  // Configuración
  @Column({ name: 'report_config', type: 'jsonb' })
  reportConfig: any;

  // Programación
  @Column({ name: 'schedule_type', type: 'enum', enum: ScheduleType })
  scheduleType: ScheduleType;

  @Column({ name: 'schedule_config', type: 'jsonb' })
  scheduleConfig: any;

  @Column({ name: 'next_execution', type: 'timestamp', nullable: true })
  @Index()
  nextExecution?: Date;

  // Formato
  @Column({ name: 'export_formats', type: 'varchar', array: true })
  exportFormats: string[];

  // Distribución
  @Column({ name: 'distribution_method', type: 'enum', enum: DistributionMethod, default: DistributionMethod.EMAIL })
  distributionMethod: DistributionMethod;

  @Column({ type: 'jsonb' })
  recipients: Recipient[];

  @Column({ name: 'email_subject', type: 'varchar', length: 255, nullable: true })
  emailSubject?: string;

  @Column({ name: 'email_template', type: 'varchar', length: 100, nullable: true })
  emailTemplate?: string;

  // Estado
  @Column({ name: 'is_active', type: 'boolean', default: true })
  @Index()
  isActive: boolean;

  @Column({ name: 'last_execution', type: 'timestamp', nullable: true })
  lastExecution?: Date;

  @Column({ name: 'last_execution_status', type: 'varchar', length: 20, nullable: true })
  lastExecutionStatus?: string;

  @Column({ name: 'execution_count', type: 'integer', default: 0 })
  executionCount: number;

  @Column({ name: 'failure_count', type: 'integer', default: 0 })
  failureCount: number;

  // Metadata
  @Column({ name: 'created_by', type: 'uuid' })
  createdBy: string;

  @ManyToOne(() => User)
  @JoinColumn({ name: 'created_by' })
  creator: User;

  @CreateDateColumn({ name: 'created_at' })
  createdAt: Date;

  @UpdateDateColumn({ name: 'updated_at' })
  updatedAt: Date;

  // Relaciones
  @OneToMany(() => ReportSubscription, (sub) => sub.scheduledReport)
  subscriptions: ReportSubscription[];
}

4.2 ExportJob Entity

// src/modules/export/entities/export-job.entity.ts

import {
  Entity,
  PrimaryGeneratedColumn,
  Column,
  ManyToOne,
  JoinColumn,
  CreateDateColumn,
  Index,
} from 'typeorm';
import { ScheduledReport } from './scheduled-report.entity';
import { User } from '../../auth/entities/user.entity';

export enum JobType {
  SCHEDULED_REPORT = 'scheduled_report',
  ON_DEMAND_EXPORT = 'on_demand_export',
  BULK_EXPORT = 'bulk_export',
}

export enum JobStatus {
  PENDING = 'pending',
  PROCESSING = 'processing',
  COMPLETED = 'completed',
  FAILED = 'failed',
  CANCELLED = 'cancelled',
}

@Entity('export_jobs', { schema: 'export_distribution' })
export class ExportJob {
  @PrimaryGeneratedColumn('uuid')
  id: string;

  @Column({ name: 'scheduled_report_id', type: 'uuid', nullable: true })
  scheduledReportId?: string;

  @ManyToOne(() => ScheduledReport)
  @JoinColumn({ name: 'scheduled_report_id' })
  scheduledReport?: ScheduledReport;

  @Column({ name: 'user_id', type: 'uuid', nullable: true })
  @Index()
  userId?: string;

  @ManyToOne(() => User)
  @JoinColumn({ name: 'user_id' })
  user?: User;

  // Tipo
  @Column({ name: 'job_type', type: 'enum', enum: JobType })
  jobType: JobType;

  // Configuración
  @Column({ name: 'export_config', type: 'jsonb' })
  exportConfig: any;

  // Estado
  @Column({ type: 'enum', enum: JobStatus, default: JobStatus.PENDING })
  @Index()
  status: JobStatus;

  @Column({ name: 'progress_pct', type: 'decimal', precision: 5, scale: 2, default: 0 })
  progressPct: number;

  // Resultado
  @Column({ name: 'output_file_path', type: 'varchar', length: 500, nullable: true })
  outputFilePath?: string;

  @Column({ name: 'output_file_size', type: 'bigint', nullable: true })
  outputFileSize?: number;

  @Column({ name: 'download_url', type: 'varchar', length: 500, nullable: true })
  downloadUrl?: string;

  @Column({ name: 'download_expires_at', type: 'timestamp', nullable: true })
  downloadExpiresAt?: Date;

  // Error
  @Column({ name: 'error_message', type: 'text', nullable: true })
  errorMessage?: string;

  @Column({ name: 'error_stack', type: 'text', nullable: true })
  errorStack?: string;

  // Performance
  @Column({ name: 'started_at', type: 'timestamp', nullable: true })
  startedAt?: Date;

  @Column({ name: 'completed_at', type: 'timestamp', nullable: true })
  completedAt?: Date;

  @Column({ name: 'processing_duration_ms', type: 'integer', nullable: true })
  processingDurationMs?: number;

  // Metadata
  @CreateDateColumn({ name: 'created_at' })
  @Index()
  createdAt: Date;
}

4.3 BIIntegration Entity

// src/modules/export/entities/bi-integration.entity.ts

import {
  Entity,
  PrimaryGeneratedColumn,
  Column,
  ManyToOne,
  JoinColumn,
  CreateDateColumn,
  UpdateDateColumn,
  Index,
} from 'typeorm';
import { Constructora } from '../../constructoras/entities/constructora.entity';
import { User } from '../../auth/entities/user.entity';

export enum IntegrationType {
  POWERBI = 'powerbi',
  TABLEAU = 'tableau',
  LOOKER = 'looker',
  METABASE = 'metabase',
  CUSTOM_API = 'custom_api',
}

export enum SyncFrequency {
  REALTIME = 'realtime',
  HOURLY = 'hourly',
  DAILY = 'daily',
  WEEKLY = 'weekly',
  MANUAL = 'manual',
}

@Entity('bi_integrations', { schema: 'export_distribution' })
@Index(['constructoraId', 'integrationCode'], { unique: true })
export class BIIntegration {
  @PrimaryGeneratedColumn('uuid')
  id: string;

  @Column({ name: 'constructora_id', type: 'uuid' })
  @Index()
  constructoraId: string;

  @ManyToOne(() => Constructora)
  @JoinColumn({ name: 'constructora_id' })
  constructora: Constructora;

  // Identificación
  @Column({ name: 'integration_code', type: 'varchar', length: 50 })
  integrationCode: string;

  @Column({ name: 'integration_name', type: 'varchar', length: 255 })
  integrationName: string;

  @Column({ type: 'text', nullable: true })
  description?: string;

  // Tipo
  @Column({ name: 'integration_type', type: 'enum', enum: IntegrationType })
  @Index()
  integrationType: IntegrationType;

  // Configuración
  @Column({ type: 'jsonb' })
  config: any;

  // Credenciales (encriptadas)
  @Column({ type: 'jsonb', nullable: true })
  credentials?: any;

  @Column({ name: 'oauth_access_token', type: 'text', nullable: true })
  oauthAccessToken?: string;

  @Column({ name: 'oauth_refresh_token', type: 'text', nullable: true })
  oauthRefreshToken?: string;

  @Column({ name: 'oauth_expires_at', type: 'timestamp', nullable: true })
  oauthExpiresAt?: Date;

  // Sincronización
  @Column({ name: 'sync_frequency', type: 'enum', enum: SyncFrequency, default: SyncFrequency.DAILY })
  syncFrequency: SyncFrequency;

  @Column({ name: 'last_sync_at', type: 'timestamp', nullable: true })
  lastSyncAt?: Date;

  @Column({ name: 'next_sync_at', type: 'timestamp', nullable: true })
  nextSyncAt?: Date;

  @Column({ name: 'last_sync_status', type: 'varchar', length: 20, nullable: true })
  lastSyncStatus?: string;

  // Estado
  @Column({ name: 'is_active', type: 'boolean', default: true })
  @Index()
  isActive: boolean;

  // Metadata
  @Column({ name: 'created_by', type: 'uuid', nullable: true })
  createdBy?: string;

  @ManyToOne(() => User)
  @JoinColumn({ name: 'created_by' })
  creator?: User;

  @CreateDateColumn({ name: 'created_at' })
  createdAt: Date;

  @UpdateDateColumn({ name: 'updated_at' })
  updatedAt: Date;
}

5. Services (Lógica de Negocio)

5.1 ExportService

// src/modules/export/services/export.service.ts

import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { ExportJob, JobStatus, JobType } from '../entities/export-job.entity';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import * as PDFDocument from 'pdfkit';
import * as ExcelJS from 'exceljs';
import { createObjectCsvWriter } from 'csv-writer';
import * as fs from 'fs';
import * as path from 'path';

@Injectable()
export class ExportService {
  private readonly logger = new Logger(ExportService.name);

  constructor(
    @InjectRepository(ExportJob)
    private exportJobRepo: Repository<ExportJob>,
    @InjectQueue('export')
    private exportQueue: Queue,
  ) {}

  /**
   * Exportar reporte on-demand
   */
  async exportReport(
    format: string,
    reportConfig: any,
    userId: string,
  ): Promise<ExportJob> {
    // Crear job
    const job = this.exportJobRepo.create({
      jobType: JobType.ON_DEMAND_EXPORT,
      userId,
      exportConfig: {
        format,
        ...reportConfig,
      },
      status: JobStatus.PENDING,
    });

    const savedJob = await this.exportJobRepo.save(job);

    // Agregar a queue
    await this.exportQueue.add('export-report', {
      jobId: savedJob.id,
      format,
      reportConfig,
    }, {
      attempts: 3,
      backoff: {
        type: 'exponential',
        delay: 2000,
      },
    });

    return savedJob;
  }

  /**
   * Procesar exportación (llamado por Bull processor)
   */
  async processExport(jobId: string): Promise<void> {
    const job = await this.exportJobRepo.findOne({ where: { id: jobId } });

    if (!job) {
      throw new Error(`Export job ${jobId} not found`);
    }

    try {
      // Actualizar estado
      await this.exportJobRepo.update(jobId, {
        status: JobStatus.PROCESSING,
        startedAt: new Date(),
      });

      const { format, reportConfig } = job.exportConfig;

      let outputPath: string;

      switch (format) {
        case 'pdf':
          outputPath = await this.exportToPDF(reportConfig);
          break;

        case 'excel':
          outputPath = await this.exportToExcel(reportConfig);
          break;

        case 'csv':
          outputPath = await this.exportToCSV(reportConfig);
          break;

        case 'json':
          outputPath = await this.exportToJSON(reportConfig);
          break;

        default:
          throw new Error(`Unsupported format: ${format}`);
      }

      // Obtener tamaño del archivo
      const stats = fs.statSync(outputPath);

      // Generar URL de descarga (válida por 24 horas)
      const downloadUrl = await this.generateDownloadUrl(outputPath);
      const expiresAt = new Date();
      expiresAt.setHours(expiresAt.getHours() + 24);

      // Actualizar job
      await this.exportJobRepo.update(jobId, {
        status: JobStatus.COMPLETED,
        outputFilePath: outputPath,
        outputFileSize: stats.size,
        downloadUrl,
        downloadExpiresAt: expiresAt,
        completedAt: new Date(),
        processingDurationMs: Date.now() - job.startedAt.getTime(),
        progressPct: 100,
      });

      this.logger.log(`Export job ${jobId} completed successfully`);
    } catch (error) {
      this.logger.error(`Error processing export job ${jobId}`, error.stack);

      await this.exportJobRepo.update(jobId, {
        status: JobStatus.FAILED,
        errorMessage: error.message,
        errorStack: error.stack,
        completedAt: new Date(),
      });

      throw error;
    }
  }

  /**
   * Exportar a PDF
   */
  private async exportToPDF(reportConfig: any): Promise<string> {
    const outputDir = process.env.EXPORT_OUTPUT_DIR || './exports';
    const fileName = `report_${Date.now()}.pdf`;
    const outputPath = path.join(outputDir, fileName);

    // Crear directorio si no existe
    if (!fs.existsSync(outputDir)) {
      fs.mkdirSync(outputDir, { recursive: true });
    }

    return new Promise((resolve, reject) => {
      try {
        const doc = new PDFDocument();
        const stream = fs.createWriteStream(outputPath);

        doc.pipe(stream);

        // Header
        doc
          .fontSize(20)
          .text(reportConfig.title || 'Reporte', { align: 'center' })
          .moveDown();

        // Content
        doc
          .fontSize(12)
          .text(`Generado: ${new Date().toLocaleString('es-MX')}`)
          .moveDown();

        // TODO: Agregar contenido dinámico basado en reportConfig

        if (reportConfig.includeCharts) {
          // TODO: Agregar gráficas usando chart.js con canvas
        }

        if (reportConfig.data) {
          // Agregar tabla de datos
          reportConfig.data.forEach((row: any, index: number) => {
            doc.text(JSON.stringify(row), { indent: 20 });
            if (index % 20 === 0) doc.addPage();
          });
        }

        doc.end();

        stream.on('finish', () => {
          resolve(outputPath);
        });

        stream.on('error', (error) => {
          reject(error);
        });
      } catch (error) {
        reject(error);
      }
    });
  }

  /**
   * Exportar a Excel
   */
  private async exportToExcel(reportConfig: any): Promise<string> {
    const outputDir = process.env.EXPORT_OUTPUT_DIR || './exports';
    const fileName = `report_${Date.now()}.xlsx`;
    const outputPath = path.join(outputDir, fileName);

    if (!fs.existsSync(outputDir)) {
      fs.mkdirSync(outputDir, { recursive: true });
    }

    const workbook = new ExcelJS.Workbook();
    const worksheet = workbook.addWorksheet('Reporte');

    // Header
    worksheet.addRow([reportConfig.title || 'Reporte']);
    worksheet.addRow([`Generado: ${new Date().toLocaleString('es-MX')}`]);
    worksheet.addRow([]);

    // Data
    if (reportConfig.data && reportConfig.data.length > 0) {
      const headers = Object.keys(reportConfig.data[0]);
      worksheet.addRow(headers);

      reportConfig.data.forEach((row: any) => {
        worksheet.addRow(Object.values(row));
      });

      // Formato
      worksheet.getRow(4).font = { bold: true };
      worksheet.columns.forEach((column) => {
        column.width = 15;
      });
    }

    await workbook.xlsx.writeFile(outputPath);

    return outputPath;
  }

  /**
   * Exportar a CSV
   */
  private async exportToCSV(reportConfig: any): Promise<string> {
    const outputDir = process.env.EXPORT_OUTPUT_DIR || './exports';
    const fileName = `report_${Date.now()}.csv`;
    const outputPath = path.join(outputDir, fileName);

    if (!fs.existsSync(outputDir)) {
      fs.mkdirSync(outputDir, { recursive: true });
    }

    if (!reportConfig.data || reportConfig.data.length === 0) {
      throw new Error('No data to export');
    }

    const headers = Object.keys(reportConfig.data[0]);

    const csvWriter = createObjectCsvWriter({
      path: outputPath,
      header: headers.map((h) => ({ id: h, title: h })),
    });

    await csvWriter.writeRecords(reportConfig.data);

    return outputPath;
  }

  /**
   * Exportar a JSON
   */
  private async exportToJSON(reportConfig: any): Promise<string> {
    const outputDir = process.env.EXPORT_OUTPUT_DIR || './exports';
    const fileName = `report_${Date.now()}.json`;
    const outputPath = path.join(outputDir, fileName);

    if (!fs.existsSync(outputDir)) {
      fs.mkdirSync(outputDir, { recursive: true });
    }

    const jsonData = {
      metadata: {
        title: reportConfig.title,
        generatedAt: new Date().toISOString(),
      },
      data: reportConfig.data,
    };

    fs.writeFileSync(outputPath, JSON.stringify(jsonData, null, 2));

    return outputPath;
  }

  /**
   * Generar URL de descarga
   */
  private async generateDownloadUrl(filePath: string): Promise<string> {
    // En producción, usar S3 pre-signed URL o similar
    const fileName = path.basename(filePath);
    return `${process.env.APP_URL}/api/exports/download/${fileName}`;
  }

  /**
   * Obtener job por ID
   */
  async getJob(jobId: string): Promise<ExportJob> {
    return this.exportJobRepo.findOne({ where: { id: jobId } });
  }
}

5.2 SchedulerService

// src/modules/export/services/scheduler.service.ts

import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, LessThanOrEqual } from 'typeorm';
import { ScheduledReport } from '../entities/scheduled-report.entity';
import { ExportService } from './export.service';
import { EmailService } from './email.service';
import { Cron, CronExpression } from '@nestjs/schedule';

@Injectable()
export class SchedulerService {
  private readonly logger = new Logger(SchedulerService.name);

  constructor(
    @InjectRepository(ScheduledReport)
    private scheduledReportRepo: Repository<ScheduledReport>,
    private exportService: ExportService,
    private emailService: EmailService,
  ) {}

  /**
   * Ejecutar reportes programados
   * Se ejecuta cada 5 minutos
   */
  @Cron(CronExpression.EVERY_5_MINUTES)
  async executeScheduledReports(): Promise<void> {
    this.logger.log('Checking for scheduled reports to execute...');

    // Obtener reportes que deben ejecutarse
    const dueReports = await this.scheduledReportRepo.find({
      where: {
        isActive: true,
        nextExecution: LessThanOrEqual(new Date()),
      },
    });

    this.logger.log(`Found ${dueReports.length} reports to execute`);

    for (const report of dueReports) {
      try {
        await this.executeReport(report);
        this.logger.log(`Report ${report.reportCode} executed successfully`);
      } catch (error) {
        this.logger.error(`Error executing report ${report.reportCode}`, error.stack);

        await this.scheduledReportRepo.update(report.id, {
          lastExecutionStatus: 'failed',
          lastExecution: new Date(),
          failureCount: report.failureCount + 1,
        });
      }
    }
  }

  /**
   * Ejecutar un reporte específico
   */
  private async executeReport(report: ScheduledReport): Promise<void> {
    // Exportar en todos los formatos solicitados
    const exportPromises = report.exportFormats.map((format) =>
      this.exportService.exportReport(format, report.reportConfig, null),
    );

    const jobs = await Promise.all(exportPromises);

    // Esperar a que se completen las exportaciones
    // En producción, esto se haría de forma asíncrona
    await this.waitForJobsCompletion(jobs.map((j) => j.id));

    // Distribuir según método configurado
    switch (report.distributionMethod) {
      case 'email':
        await this.distributeViaEmail(report, jobs);
        break;

      case 'sftp':
        await this.distributeViaSFTP(report, jobs);
        break;

      case 'webhook':
        await this.distributeViaWebhook(report, jobs);
        break;

      default:
        this.logger.warn(`Unsupported distribution method: ${report.distributionMethod}`);
    }

    // Actualizar reporte
    await this.scheduledReportRepo.update(report.id, {
      lastExecution: new Date(),
      lastExecutionStatus: 'success',
      executionCount: report.executionCount + 1,
    });
  }

  /**
   * Distribuir vía email
   */
  private async distributeViaEmail(report: ScheduledReport, jobs: any[]): Promise<void> {
    const completedJobs = await Promise.all(
      jobs.map((job) => this.exportService.getJob(job.id)),
    );

    const attachments = completedJobs.map((job) => ({
      filename: path.basename(job.outputFilePath),
      path: job.outputFilePath,
    }));

    await this.emailService.sendScheduledReport({
      to: report.recipients.map((r) => r.email),
      subject: report.emailSubject || `Reporte: ${report.reportName}`,
      template: report.emailTemplate || 'default',
      context: {
        reportName: report.reportName,
        generatedAt: new Date(),
      },
      attachments,
    });
  }

  /**
   * Esperar a que se completen los jobs
   */
  private async waitForJobsCompletion(jobIds: string[]): Promise<void> {
    const maxWaitTime = 300000; // 5 minutos
    const pollInterval = 2000; // 2 segundos
    const startTime = Date.now();

    while (Date.now() - startTime < maxWaitTime) {
      const jobs = await Promise.all(
        jobIds.map((id) => this.exportService.getJob(id)),
      );

      const allCompleted = jobs.every(
        (job) => job.status === 'completed' || job.status === 'failed',
      );

      if (allCompleted) {
        return;
      }

      await new Promise((resolve) => setTimeout(resolve, pollInterval));
    }

    throw new Error('Timeout waiting for export jobs to complete');
  }

  private async distributeViaSFTP(report: ScheduledReport, jobs: any[]): Promise<void> {
    // TODO: Implementar distribución via SFTP
    this.logger.warn('SFTP distribution not implemented yet');
  }

  private async distributeViaWebhook(report: ScheduledReport, jobs: any[]): Promise<void> {
    // TODO: Implementar distribución via Webhook
    this.logger.warn('Webhook distribution not implemented yet');
  }
}

5.3 IntegrationService (Power BI / Tableau)

// src/modules/export/services/integration.service.ts

import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { BIIntegration, IntegrationType } from '../entities/bi-integration.entity';
import { HttpService } from '@nestjs/axios';
import { lastValueFrom } from 'rxjs';

@Injectable()
export class IntegrationService {
  private readonly logger = new Logger(IntegrationService.name);

  constructor(
    @InjectRepository(BIIntegration)
    private integrationRepo: Repository<BIIntegration>,
    private httpService: HttpService,
  ) {}

  /**
   * Sincronizar con Power BI
   */
  async syncToPowerBI(integrationId: string, data: any[]): Promise<void> {
    const integration = await this.integrationRepo.findOne({
      where: { id: integrationId },
    });

    if (!integration || integration.integrationType !== IntegrationType.POWERBI) {
      throw new Error('Invalid Power BI integration');
    }

    try {
      // Refrescar token si es necesario
      if (this.isTokenExpired(integration.oauthExpiresAt)) {
        await this.refreshPowerBIToken(integration);
      }

      const { workspaceId, datasetId } = integration.config;

      // Enviar datos a Power BI
      const response = await lastValueFrom(
        this.httpService.post(
          `https://api.powerbi.com/v1.0/myorg/groups/${workspaceId}/datasets/${datasetId}/rows?key=...`,
          data,
          {
            headers: {
              Authorization: `Bearer ${integration.oauthAccessToken}`,
              'Content-Type': 'application/json',
            },
          },
        ),
      );

      await this.integrationRepo.update(integrationId, {
        lastSyncAt: new Date(),
        lastSyncStatus: 'success',
      });

      this.logger.log(`Data synced to Power BI integration ${integrationId}`);
    } catch (error) {
      this.logger.error(`Error syncing to Power BI`, error.stack);

      await this.integrationRepo.update(integrationId, {
        lastSyncAt: new Date(),
        lastSyncStatus: 'failed',
      });

      throw error;
    }
  }

  /**
   * Sincronizar con Tableau
   */
  async syncToTableau(integrationId: string, data: any[]): Promise<void> {
    const integration = await this.integrationRepo.findOne({
      where: { id: integrationId },
    });

    if (!integration || integration.integrationType !== IntegrationType.TABLEAU) {
      throw new Error('Invalid Tableau integration');
    }

    // TODO: Implementar sincronización con Tableau usando Hyper API
    this.logger.warn('Tableau sync not implemented yet');
  }

  /**
   * Refresh Power BI token
   */
  private async refreshPowerBIToken(integration: BIIntegration): Promise<void> {
    try {
      const { clientId, clientSecret, tenantId } = integration.credentials;

      const response = await lastValueFrom(
        this.httpService.post(
          `https://login.microsoftonline.com/${tenantId}/oauth2/v2.0/token`,
          new URLSearchParams({
            grant_type: 'refresh_token',
            client_id: clientId,
            client_secret: clientSecret,
            refresh_token: integration.oauthRefreshToken,
            scope: 'https://analysis.windows.net/powerbi/api/.default',
          }),
        ),
      );

      const { access_token, refresh_token, expires_in } = response.data;

      const expiresAt = new Date();
      expiresAt.setSeconds(expiresAt.getSeconds() + expires_in);

      await this.integrationRepo.update(integration.id, {
        oauthAccessToken: access_token,
        oauthRefreshToken: refresh_token,
        oauthExpiresAt: expiresAt,
      });

      this.logger.log(`Power BI token refreshed for integration ${integration.id}`);
    } catch (error) {
      this.logger.error(`Error refreshing Power BI token`, error.stack);
      throw error;
    }
  }

  /**
   * Verificar si el token está expirado
   */
  private isTokenExpired(expiresAt: Date): boolean {
    if (!expiresAt) return true;

    const now = new Date();
    const bufferTime = 5 * 60 * 1000; // 5 minutos de buffer

    return expiresAt.getTime() - now.getTime() < bufferTime;
  }
}

6. Bull Queue Processor

// src/modules/export/processors/export.processor.ts

import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { Logger } from '@nestjs/common';
import { ExportService } from '../services/export.service';

@Processor('export')
export class ExportProcessor {
  private readonly logger = new Logger(ExportProcessor.name);

  constructor(private exportService: ExportService) {}

  @Process('export-report')
  async handleExportReport(job: Job) {
    this.logger.log(`Processing export job ${job.data.jobId}`);

    try {
      await this.exportService.processExport(job.data.jobId);

      this.logger.log(`Export job ${job.data.jobId} completed`);
    } catch (error) {
      this.logger.error(`Error processing export job ${job.data.jobId}`, error.stack);
      throw error;
    }
  }
}

7. Controllers

// src/modules/export/controllers/export.controller.ts

import {
  Controller,
  Get,
  Post,
  Param,
  Body,
  Query,
  UseGuards,
  Request,
  Res,
} from '@nestjs/common';
import { Response } from 'express';
import { JwtAuthGuard } from '../../auth/guards/jwt-auth.guard';
import { RolesGuard } from '../../auth/guards/roles.guard';
import { Roles } from '../../auth/decorators/roles.decorator';
import { ExportService } from '../services/export.service';
import { SchedulerService } from '../services/scheduler.service';
import { IntegrationService } from '../services/integration.service';
import * as fs from 'fs';

@Controller('api/exports')
@UseGuards(JwtAuthGuard, RolesGuard)
export class ExportController {
  constructor(
    private exportService: ExportService,
    private schedulerService: SchedulerService,
    private integrationService: IntegrationService,
  ) {}

  /**
   * POST /api/exports/report
   * Exportar reporte on-demand
   */
  @Post('report')
  async exportReport(
    @Body() body: { format: string; reportConfig: any },
    @Request() req,
  ) {
    return this.exportService.exportReport(body.format, body.reportConfig, req.user.sub);
  }

  /**
   * GET /api/exports/jobs/:id
   * Obtener estado de un job de exportación
   */
  @Get('jobs/:id')
  async getJob(@Param('id') id: string) {
    return this.exportService.getJob(id);
  }

  /**
   * GET /api/exports/download/:filename
   * Descargar archivo exportado
   */
  @Get('download/:filename')
  async downloadFile(@Param('filename') filename: string, @Res() res: Response) {
    const outputDir = process.env.EXPORT_OUTPUT_DIR || './exports';
    const filePath = `${outputDir}/${filename}`;

    if (!fs.existsSync(filePath)) {
      return res.status(404).send('File not found');
    }

    res.download(filePath);
  }

  /**
   * POST /api/exports/integrations/:id/sync
   * Sincronizar manualmente con integración BI
   */
  @Post('integrations/:id/sync')
  @Roles('admin', 'director')
  async syncIntegration(@Param('id') id: string, @Body() body: { data: any[] }) {
    // Determinar tipo y llamar método apropiado
    // TODO: Implementar lógica de detección de tipo
    await this.integrationService.syncToPowerBI(id, body.data);

    return { message: 'Sync initiated successfully' };
  }
}

8. Testing

// src/modules/export/services/__tests__/export.service.spec.ts

import { Test, TestingModule } from '@nestjs/testing';
import { getRepositoryToken } from '@nestjs/typeorm';
import { ExportService } from '../export.service';
import { ExportJob } from '../../entities/export-job.entity';
import { getQueueToken } from '@nestjs/bull';

describe('ExportService', () => {
  let service: ExportService;

  beforeEach(async () => {
    const module: TestingModule = await Test.createTestingModule({
      providers: [
        ExportService,
        {
          provide: getRepositoryToken(ExportJob),
          useValue: {
            create: jest.fn(),
            save: jest.fn(),
            findOne: jest.fn(),
            update: jest.fn(),
          },
        },
        {
          provide: getQueueToken('export'),
          useValue: {
            add: jest.fn(),
          },
        },
      ],
    }).compile();

    service = module.get<ExportService>(ExportService);
  });

  it('should export report to PDF', async () => {
    const result = await service.exportReport('pdf', { title: 'Test Report' }, 'user-1');

    expect(result).toBeDefined();
    expect(result.jobType).toBe('on_demand_export');
  });
});

9. Criterios de Aceptación Técnicos

  • Schema export_distribution creado con todas las tablas
  • Entities TypeORM con relaciones correctas
  • Services para exportación multi-formato
  • Generación de PDF con PDFKit
  • Generación de Excel con ExcelJS
  • Generación de CSV funcional
  • Procesamiento asíncrono con Bull/BullMQ
  • CRON jobs para reportes programados
  • Distribución vía email con Nodemailer
  • Integración con Power BI REST API
  • Integración con Tableau (estructura)
  • Rate limiting para API tokens
  • OAuth2 para integraciones externas
  • Controllers con endpoints RESTful
  • Triggers para cálculo de próxima ejecución
  • Cleanup automático de exports antiguos
  • Tests unitarios con >75% coverage

Fecha: 2025-11-17 Preparado por: Equipo Técnico Versión: 1.0 Estado: Listo para Implementación