# SPEC-FIRMA-ELECTRONICA-NOM151: Firma Electrónica y Conservación de Mensajes de Datos ## Metadata - **Código**: SPEC-MGN-013 - **Módulo**: Legal / Cumplimiento - **Gap Relacionado**: GAP-MGN-013-001 - **Prioridad**: P1 - **Esfuerzo Estimado**: 13 SP - **Versión**: 1.0 - **Última Actualización**: 2025-01-09 - **Referencias Normativas**: NOM-151-SCFI-2016, RFC 3161, CAdES, XAdES --- ## 1. Resumen Ejecutivo ### 1.1 Objetivo Implementar un sistema de firma electrónica avanzada y conservación de mensajes de datos que cumpla con la NOM-151-SCFI-2016, permitiendo: - Firma electrónica de documentos con validez legal en México - Emisión de Constancias de Conservación mediante PSCs autorizados - Integración con e.firma/FIEL del SAT - Sellado de tiempo mediante protocolo RFC 3161 - Conservación documental por el período legal requerido ### 1.2 Alcance - Gestión de certificados X.509 (e.firma, CSD) - Integración con PSCs autorizados por la SE - Generación de sellos de tiempo RFC 3161 - Firma CAdES/XAdES para documentos - Constancia de Conservación NOM-151 - Validación de firmas y certificados - Archivo legal por 5+ años ### 1.3 Marco Regulatorio | Norma/Ley | Descripción | Ámbito | |-----------|-------------|--------| | NOM-151-SCFI-2016 | Conservación de mensajes de datos | Nacional | | Código de Comercio Federal | Reconocimiento de firma electrónica | Nacional | | Código Fiscal de la Federación | Art. 30 - Conservación 5 años | Fiscal | | RFC 3161 | Time-Stamp Protocol | Internacional | | ETSI TS 103 173 | CAdES Baseline Profiles | Internacional | | ETSI TS 103 171 | XAdES Baseline Profiles | Internacional | --- ## 2. Modelo de Datos ### 2.1 Diagrama Entidad-Relación ``` ┌─────────────────────────────────┐ ┌─────────────────────────────────┐ │ certificates │ │ certificate_keys │ │─────────────────────────────────│ │─────────────────────────────────│ │ id (PK) │ │ id (PK) │ │ name │ │ name │ │ certificate_type │◄───┐│ key_type │ │ content (encrypted) │ ││ content (encrypted) │ │ private_key_id (FK)───────────┼────┘│ password_hash │ │ serial_number │ │ is_private │ │ subject_cn │ │ algorithm │ │ issuer_cn │ │ key_size │ │ date_start │ │ company_id (FK) │ │ date_end │ └─────────────────────────────────┘ │ fingerprint_sha256 │ │ rfc │ ┌─────────────────────────────────┐ │ company_id (FK) │ │ psc_providers │ └────────────────┬────────────────┘ │─────────────────────────────────│ │ │ id (PK) │ │ │ name │ │ │ code │ │ │ tsa_url │ │ │ api_url │ │ │ auth_type │ │ │ api_key (encrypted) │ │ │ client_certificate_id (FK) │ │ │ is_active │ │ └─────────────────────────────────┘ │ ▼ ┌─────────────────────────────────┐ ┌─────────────────────────────────┐ │ document_signatures │ │ preservation_records │ │─────────────────────────────────│ │─────────────────────────────────│ │ id (PK) │ │ id (PK) │ │ document_model │ │ document_signature_id (FK) │ │ document_id │ │ psc_provider_id (FK) │ │ document_hash │ │ constancia_number │ │ hash_algorithm │ │ timestamp_token (binary) │ │ certificate_id (FK) │ │ timestamp_value (UTC) │ │ signature_value (binary) │ │ tst_info (binary) │ │ signature_format │ │ asn1_evidence (binary) │ │ signature_level │ │ policy_oid │ │ timestamp_token_id (FK) │ │ serial_number │ │ signed_at │ │ psc_signature │ │ is_valid │ │ psc_certificate_chain │ │ validation_errors │ │ created_at │ │ company_id (FK) │ │ expires_at │ └─────────────────────────────────┘ │ validation_status │ └─────────────────────────────────┘ ▼ ┌─────────────────────────────────┐ │ timestamp_tokens │ │─────────────────────────────────│ │ id (PK) │ │ document_signature_id (FK) │ │ psc_provider_id (FK) │ │ request_hash │ │ hash_algorithm │ │ tst_info (binary) │ │ timestamp_value (UTC) │ │ serial_number │ │ policy_oid │ │ tsa_certificate │ │ accuracy_seconds │ │ nonce │ │ is_valid │ │ created_at │ └─────────────────────────────────┘ ``` ### 2.2 Definición de Tablas #### 2.2.1 legal.certificates ```sql CREATE TABLE legal.certificates ( -- Identificación id UUID PRIMARY KEY DEFAULT uuid_generate_v7(), name VARCHAR(256) NOT NULL, -- Tipo de certificado certificate_type VARCHAR(32) NOT NULL DEFAULT 'generic' CHECK (certificate_type IN ( 'efirma', -- e.firma/FIEL del SAT 'csd', -- Certificado de Sello Digital 'psc', -- Certificado de PSC 'tsa', -- Certificado de TSA 'generic' -- Certificado genérico X.509 )), -- Contenido del certificado (encriptado en reposo) content BYTEA NOT NULL, content_format VARCHAR(16) NOT NULL DEFAULT 'pem' CHECK (content_format IN ('der', 'pem', 'pkcs12')), -- Llave privada asociada private_key_id UUID REFERENCES legal.certificate_keys(id), -- Metadatos extraídos del certificado serial_number VARCHAR(64) NOT NULL, subject_cn VARCHAR(256), subject_o VARCHAR(256), subject_email VARCHAR(256), issuer_cn VARCHAR(256), issuer_o VARCHAR(256), -- Validez date_start TIMESTAMPTZ NOT NULL, date_end TIMESTAMPTZ NOT NULL, is_valid BOOLEAN GENERATED ALWAYS AS ( NOW() BETWEEN date_start AND date_end ) STORED, -- Identificadores criptográficos fingerprint_sha1 VARCHAR(64), fingerprint_sha256 VARCHAR(128) NOT NULL, public_key_hash VARCHAR(128), -- Datos fiscales (para e.firma/CSD) rfc VARCHAR(13), curp VARCHAR(18), -- Cadena de certificación issuer_certificate_id UUID REFERENCES legal.certificates(id), is_root_ca BOOLEAN NOT NULL DEFAULT false, -- Relaciones company_id UUID NOT NULL REFERENCES core.companies(id), -- Estado is_active BOOLEAN NOT NULL DEFAULT true, loading_error TEXT, -- Auditoría created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), created_by UUID REFERENCES core.users(id), -- Constraints CONSTRAINT uk_certificate_serial_issuer UNIQUE (serial_number, issuer_cn), CONSTRAINT chk_date_range CHECK (date_end > date_start) ); -- Índices CREATE INDEX idx_certificates_company ON legal.certificates(company_id); CREATE INDEX idx_certificates_type ON legal.certificates(certificate_type) WHERE is_active; CREATE INDEX idx_certificates_rfc ON legal.certificates(rfc) WHERE rfc IS NOT NULL; CREATE INDEX idx_certificates_validity ON legal.certificates(date_start, date_end); CREATE INDEX idx_certificates_fingerprint ON legal.certificates(fingerprint_sha256); ``` #### 2.2.2 legal.certificate_keys ```sql CREATE TABLE legal.certificate_keys ( -- Identificación id UUID PRIMARY KEY DEFAULT uuid_generate_v7(), name VARCHAR(256) NOT NULL, -- Tipo de llave key_type VARCHAR(16) NOT NULL DEFAULT 'private' CHECK (key_type IN ('private', 'public')), -- Algoritmo y tamaño algorithm VARCHAR(16) NOT NULL DEFAULT 'rsa' CHECK (algorithm IN ('rsa', 'ec', 'ed25519')), key_size INTEGER, -- 2048, 4096 para RSA; 256, 384 para EC ec_curve VARCHAR(32), -- P-256, P-384, P-521 -- Contenido (encriptado en reposo con clave maestra) content BYTEA NOT NULL, content_format VARCHAR(16) NOT NULL DEFAULT 'pem' CHECK (content_format IN ('der', 'pem', 'pkcs8')), -- Password de la llave privada (hash PBKDF2) password_hash VARCHAR(256), password_salt VARCHAR(64), -- Validación is_valid BOOLEAN NOT NULL DEFAULT true, loading_error TEXT, -- Relaciones company_id UUID NOT NULL REFERENCES core.companies(id), -- Auditoría created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), created_by UUID REFERENCES core.users(id), -- Constraints CONSTRAINT chk_rsa_key_size CHECK ( algorithm != 'rsa' OR key_size >= 2048 ) ); CREATE INDEX idx_certificate_keys_company ON legal.certificate_keys(company_id); ``` #### 2.2.3 legal.psc_providers ```sql CREATE TABLE legal.psc_providers ( -- Identificación id UUID PRIMARY KEY DEFAULT uuid_generate_v7(), name VARCHAR(128) NOT NULL, code VARCHAR(32) NOT NULL UNIQUE, -- URLs de servicio tsa_url VARCHAR(512), -- Endpoint RFC 3161 api_url VARCHAR(512), -- API REST del PSC validation_url VARCHAR(512), -- Endpoint de validación -- Autenticación auth_type VARCHAR(32) NOT NULL DEFAULT 'api_key' CHECK (auth_type IN ( 'none', 'api_key', 'basic', 'oauth2', 'mtls' -- Mutual TLS con certificado cliente )), -- Credenciales (encriptadas) api_key BYTEA, api_secret BYTEA, client_certificate_id UUID REFERENCES legal.certificates(id), -- Certificado raíz del PSC root_certificate_id UUID REFERENCES legal.certificates(id), -- Configuración timeout_seconds INTEGER NOT NULL DEFAULT 30, retry_count INTEGER NOT NULL DEFAULT 3, hash_algorithm VARCHAR(16) NOT NULL DEFAULT 'sha256', -- Políticas OID soportadas supported_policies TEXT[], -- Estado is_active BOOLEAN NOT NULL DEFAULT true, last_health_check TIMESTAMPTZ, health_status VARCHAR(16) DEFAULT 'unknown', -- Relaciones company_id UUID REFERENCES core.companies(id), -- NULL = global -- Auditoría created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); -- PSCs autorizados predefinidos (datos de ejemplo) INSERT INTO legal.psc_providers (name, code, tsa_url, auth_type, hash_algorithm) VALUES ('Edicom México', 'edicom', 'https://tsa.edicom.com/timestamp', 'mtls', 'sha256'), ('PSC World', 'pscworld', 'https://tsa.pscworld.com/api/v1/timestamp', 'api_key', 'sha256'), ('Interfactura', 'interfactura', 'https://tsa.interfactura.com/rfc3161', 'api_key', 'sha256'), ('SeguriData', 'seguridata', 'https://tsa.seguridata.com/timestamp', 'mtls', 'sha256'), ('Cincel', 'cincel', 'https://api.cincel.mx/v1/timestamp', 'oauth2', 'sha256'); ``` #### 2.2.4 legal.document_signatures ```sql CREATE TABLE legal.document_signatures ( -- Identificación id UUID PRIMARY KEY DEFAULT uuid_generate_v7(), -- Documento firmado (referencia polimórfica) document_model VARCHAR(128) NOT NULL, -- 'invoicing.cfdi', 'documents.file', etc. document_id UUID NOT NULL, document_name VARCHAR(256), -- Hash del documento document_hash VARCHAR(128) NOT NULL, hash_algorithm VARCHAR(16) NOT NULL DEFAULT 'sha256' CHECK (hash_algorithm IN ('sha1', 'sha256', 'sha384', 'sha512')), -- Certificado utilizado certificate_id UUID NOT NULL REFERENCES legal.certificates(id), -- Firma digital signature_value BYTEA NOT NULL, signature_format VARCHAR(16) NOT NULL DEFAULT 'cades' CHECK (signature_format IN ( 'pkcs1', -- RSA PKCS#1 v1.5 (básico) 'pkcs7', -- CMS básico 'cades', -- CAdES (CMS Advanced) 'xades', -- XAdES (XML Advanced) 'pades' -- PAdES (PDF Advanced) )), -- Nivel de firma (según ETSI) signature_level VARCHAR(16) NOT NULL DEFAULT 'baseline_t' CHECK (signature_level IN ( 'baseline_b', -- Básico (sin timestamp) 'baseline_t', -- Con timestamp 'baseline_lt', -- Long-term (con certificados) 'baseline_lta' -- Long-term archival )), -- Timestamp asociado timestamp_token_id UUID REFERENCES legal.timestamp_tokens(id), -- Metadatos de firma signed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), signer_name VARCHAR(256), signer_role VARCHAR(128), signature_reason TEXT, signature_location VARCHAR(256), -- Validación is_valid BOOLEAN NOT NULL DEFAULT true, last_validation TIMESTAMPTZ, validation_errors JSONB DEFAULT '[]', -- Relaciones company_id UUID NOT NULL REFERENCES core.companies(id), -- Auditoría created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), -- Índice compuesto para documento CONSTRAINT uk_document_signature UNIQUE (document_model, document_id, certificate_id) ); CREATE INDEX idx_document_signatures_document ON legal.document_signatures(document_model, document_id); CREATE INDEX idx_document_signatures_certificate ON legal.document_signatures(certificate_id); CREATE INDEX idx_document_signatures_hash ON legal.document_signatures(document_hash); CREATE INDEX idx_document_signatures_date ON legal.document_signatures(signed_at DESC); ``` #### 2.2.5 legal.timestamp_tokens ```sql CREATE TABLE legal.timestamp_tokens ( -- Identificación id UUID PRIMARY KEY DEFAULT uuid_generate_v7(), -- Origen psc_provider_id UUID NOT NULL REFERENCES legal.psc_providers(id), -- Request request_hash VARCHAR(128) NOT NULL, hash_algorithm VARCHAR(16) NOT NULL DEFAULT 'sha256', request_nonce VARCHAR(64), -- Response (RFC 3161 TSTInfo) tst_info BYTEA NOT NULL, -- ASN.1 DER encoded timestamp_value TIMESTAMPTZ NOT NULL, -- GenTime en UTC serial_number VARCHAR(64) NOT NULL, policy_oid VARCHAR(64) NOT NULL, -- Certificado TSA tsa_certificate BYTEA, tsa_certificate_chain BYTEA, -- Precisión accuracy_seconds DECIMAL(10,6), ordering BOOLEAN NOT NULL DEFAULT false, -- Validación is_valid BOOLEAN NOT NULL DEFAULT true, validation_errors JSONB DEFAULT '[]', -- Auditoría created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), -- Constraints CONSTRAINT uk_timestamp_serial_provider UNIQUE (serial_number, psc_provider_id) ); CREATE INDEX idx_timestamp_tokens_hash ON legal.timestamp_tokens(request_hash); CREATE INDEX idx_timestamp_tokens_date ON legal.timestamp_tokens(timestamp_value DESC); CREATE INDEX idx_timestamp_tokens_psc ON legal.timestamp_tokens(psc_provider_id); ``` #### 2.2.6 legal.preservation_records (Constancia de Conservación) ```sql CREATE TABLE legal.preservation_records ( -- Identificación id UUID PRIMARY KEY DEFAULT uuid_generate_v7(), constancia_number VARCHAR(64) NOT NULL UNIQUE, -- Documento preservado document_signature_id UUID NOT NULL REFERENCES legal.document_signatures(id), -- PSC que emite la constancia psc_provider_id UUID NOT NULL REFERENCES legal.psc_providers(id), -- Timestamp RFC 3161 timestamp_token_id UUID NOT NULL REFERENCES legal.timestamp_tokens(id), -- Evidencia ASN.1 (archivo separado del documento original) asn1_evidence BYTEA NOT NULL, evidence_format VARCHAR(16) NOT NULL DEFAULT 'der', -- Datos de la constancia preservation_date TIMESTAMPTZ NOT NULL, -- Fecha cierta expiration_date TIMESTAMPTZ, -- Normalmente fecha + 5 años -- Firma del PSC psc_signature BYTEA NOT NULL, psc_certificate_chain BYTEA, signature_algorithm VARCHAR(32) NOT NULL, -- Política aplicada preservation_policy_oid VARCHAR(64), preservation_policy_name VARCHAR(128), -- Validación validation_status VARCHAR(16) NOT NULL DEFAULT 'valid' CHECK (validation_status IN ('valid', 'invalid', 'expired', 'revoked', 'pending')), last_validation TIMESTAMPTZ, validation_result JSONB, -- Renovación (para archivos de largo plazo) previous_record_id UUID REFERENCES legal.preservation_records(id), renewal_count INTEGER NOT NULL DEFAULT 0, -- Auditoría created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), created_by UUID REFERENCES core.users(id) ); CREATE INDEX idx_preservation_records_document ON legal.preservation_records(document_signature_id); CREATE INDEX idx_preservation_records_date ON legal.preservation_records(preservation_date DESC); CREATE INDEX idx_preservation_records_expiration ON legal.preservation_records(expiration_date) WHERE validation_status = 'valid'; CREATE INDEX idx_preservation_records_psc ON legal.preservation_records(psc_provider_id); ``` --- ## 3. Arquitectura del Sistema ### 3.1 Flujo de Firma y Conservación ``` ┌─────────────────────────────────────────────────────────────────────┐ │ PROCESO DE FIRMA NOM-151 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────────┐ │ │ │ Documento │ │ │ │ (PDF/XML/Binary) │ │ │ └────────┬─────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────┐ │ │ │ [1] Calcular │ │ │ │ Hash SHA-256 │ │ │ └────────┬─────────┘ │ │ │ │ │ ├─────────────────────────────────────────┐ │ │ ▼ ▼ │ │ ┌──────────────────┐ ┌──────────────────┐ │ │ │ [2] Firmar con │ │ [3] Solicitar │ │ │ │ Llave Privada │ │ Timestamp TSA │ │ │ │ (e.firma/CSD) │ │ (RFC 3161) │ │ │ └────────┬─────────┘ └────────┬─────────┘ │ │ │ │ │ │ │ ┌─────────────┐ │ │ │ └──────────►│ [4] Crear │◄────────────┘ │ │ │ CAdES/XAdES │ │ │ │ Signature │ │ │ └──────┬──────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────┐ │ │ │ [5] Enviar a │ │ │ │ PSC Autorizado │ │ │ └────────┬─────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────┐ │ │ │ [6] Recibir │ │ │ │ Constancia de │ │ │ │ Conservación │ │ │ └────────┬─────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────────────────────────┐ │ │ │ [7] Almacenar │ │ │ │ ├─ Documento original │ │ │ │ ├─ Firma digital │ │ │ │ ├─ Timestamp token │ │ │ │ └─ Constancia ASN.1 (evidencia) │ │ │ └──────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘ ``` ### 3.2 Componentes del Sistema ``` ┌─────────────────────────────────────────────────────────────────────┐ │ ERP-SUITE │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ Módulo legal.signature │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │ │ │ │ Certificate │ │ Signature │ │ Preservation │ │ │ │ │ │ Manager │ │ Engine │ │ Service │ │ │ │ │ └──────┬──────┘ └──────┬──────┘ └──────────┬──────────┘ │ │ │ │ │ │ │ │ │ │ │ └────────────────┼─────────────────────┘ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ ┌─────────────────────┐ │ │ │ │ │ PSC Connector │ │ │ │ │ │ (RFC 3161 Client) │ │ │ │ │ └──────────┬──────────┘ │ │ │ └──────────────────────────┼───────────────────────────────────┘ │ │ │ │ └─────────────────────────────┼────────────────────────────────────────┘ │ HTTPS/mTLS ▼ ┌─────────────────────────────────────────┐ │ PSC Autorizado (SE) │ │ ┌───────────┐ ┌───────────────────┐ │ │ │ TSA │ │ Constancia │ │ │ │ RFC 3161 │ │ Generator │ │ │ └─────┬─────┘ └─────────┬─────────┘ │ │ │ │ │ │ └────────┬─────────┘ │ │ ▼ │ │ ┌───────────────┐ │ │ │ CENAM Atomic │ │ │ │ Clock (UTC) │ │ │ └───────────────┘ │ └─────────────────────────────────────────┘ ``` --- ## 4. Implementación del Motor de Firma ### 4.1 Clase Principal: SignatureEngine ```python from dataclasses import dataclass from typing import Optional, List, Union from datetime import datetime, timezone from enum import Enum import hashlib from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding, rsa, ec from cryptography.x509 import load_pem_x509_certificate, load_der_x509_certificate from cryptography.hazmat.backends import default_backend class SignatureFormat(Enum): PKCS1 = "pkcs1" PKCS7 = "pkcs7" CADES = "cades" XADES = "xades" PADES = "pades" class SignatureLevel(Enum): BASELINE_B = "baseline_b" # Básico BASELINE_T = "baseline_t" # Con timestamp BASELINE_LT = "baseline_lt" # Long-term BASELINE_LTA = "baseline_lta" # Long-term archival @dataclass class SignatureResult: """Resultado de una operación de firma.""" success: bool signature_id: Optional[str] = None signature_value: Optional[bytes] = None document_hash: Optional[str] = None timestamp_token: Optional[bytes] = None signed_at: Optional[datetime] = None certificate_serial: Optional[str] = None errors: List[str] = None def __post_init__(self): if self.errors is None: self.errors = [] class SignatureEngine: """ Motor de firma electrónica compatible con NOM-151. Soporta múltiples formatos (CAdES, XAdES, PAdES) y niveles de firma. """ HASH_ALGORITHMS = { 'sha1': hashlib.sha1, 'sha256': hashlib.sha256, 'sha384': hashlib.sha384, 'sha512': hashlib.sha512, } def __init__( self, certificate_repository, timestamp_service, preservation_service ): self.cert_repo = certificate_repository self.timestamp_svc = timestamp_service self.preservation_svc = preservation_service def sign_document( self, document: bytes, certificate_id: str, signature_format: SignatureFormat = SignatureFormat.CADES, signature_level: SignatureLevel = SignatureLevel.BASELINE_T, hash_algorithm: str = 'sha256', signer_info: Optional[dict] = None, request_preservation: bool = True ) -> SignatureResult: """ Firma un documento con el certificado especificado. Args: document: Contenido binario del documento certificate_id: ID del certificado a usar signature_format: Formato de firma (CAdES, XAdES, etc.) signature_level: Nivel de firma ETSI hash_algorithm: Algoritmo de hash (sha256 recomendado) signer_info: Información adicional del firmante request_preservation: Si se debe solicitar Constancia de Conservación Returns: SignatureResult con el resultado de la operación """ try: # 1. Obtener certificado y validar certificate = self._get_and_validate_certificate(certificate_id) if not certificate: return SignatureResult( success=False, errors=["Certificado no válido o no encontrado"] ) # 2. Calcular hash del documento document_hash = self._calculate_hash(document, hash_algorithm) # 3. Crear firma según formato if signature_format == SignatureFormat.CADES: signature_value = self._create_cades_signature( document_hash, certificate, hash_algorithm ) elif signature_format == SignatureFormat.XADES: signature_value = self._create_xades_signature( document, certificate, hash_algorithm ) elif signature_format == SignatureFormat.PADES: signature_value = self._create_pades_signature( document, certificate, hash_algorithm ) else: signature_value = self._create_basic_signature( document_hash, certificate ) # 4. Obtener timestamp si es requerido timestamp_token = None if signature_level in [ SignatureLevel.BASELINE_T, SignatureLevel.BASELINE_LT, SignatureLevel.BASELINE_LTA ]: timestamp_result = self.timestamp_svc.get_timestamp( message_imprint=document_hash, hash_algorithm=hash_algorithm ) if timestamp_result.success: timestamp_token = timestamp_result.token else: return SignatureResult( success=False, errors=[f"Error obteniendo timestamp: {timestamp_result.error}"] ) # 5. Guardar firma en base de datos signature_record = self._save_signature( document_hash=document_hash, hash_algorithm=hash_algorithm, certificate_id=certificate_id, signature_value=signature_value, signature_format=signature_format.value, signature_level=signature_level.value, timestamp_token=timestamp_token, signer_info=signer_info ) # 6. Solicitar Constancia de Conservación si es requerido if request_preservation: self.preservation_svc.request_preservation( signature_id=signature_record.id ) return SignatureResult( success=True, signature_id=str(signature_record.id), signature_value=signature_value, document_hash=document_hash, timestamp_token=timestamp_token, signed_at=datetime.now(timezone.utc), certificate_serial=certificate.serial_number ) except Exception as e: return SignatureResult( success=False, errors=[str(e)] ) def verify_signature( self, document: bytes, signature_id: str, verify_timestamp: bool = True, verify_certificate_chain: bool = True ) -> dict: """ Verifica una firma existente. Returns: Dict con resultado de validación detallado """ result = { 'is_valid': False, 'hash_valid': False, 'signature_valid': False, 'timestamp_valid': None, 'certificate_valid': False, 'certificate_chain_valid': None, 'errors': [], 'warnings': [] } try: # Obtener registro de firma signature_record = self._get_signature_record(signature_id) if not signature_record: result['errors'].append("Firma no encontrada") return result # 1. Verificar hash del documento current_hash = self._calculate_hash( document, signature_record.hash_algorithm ) result['hash_valid'] = (current_hash == signature_record.document_hash) if not result['hash_valid']: result['errors'].append("El documento ha sido modificado") return result # 2. Verificar firma criptográfica certificate = self.cert_repo.get(signature_record.certificate_id) result['signature_valid'] = self._verify_signature_value( signature_record.signature_value, signature_record.document_hash, certificate ) if not result['signature_valid']: result['errors'].append("Firma criptográfica inválida") return result # 3. Verificar certificado result['certificate_valid'] = self._validate_certificate(certificate) if not result['certificate_valid']: result['warnings'].append( "El certificado ha expirado o no es válido actualmente" ) # 4. Verificar cadena de certificación if verify_certificate_chain: chain_result = self._verify_certificate_chain(certificate) result['certificate_chain_valid'] = chain_result['valid'] if not chain_result['valid']: result['warnings'].extend(chain_result['errors']) # 5. Verificar timestamp if verify_timestamp and signature_record.timestamp_token_id: timestamp_result = self.timestamp_svc.verify_timestamp( signature_record.timestamp_token_id ) result['timestamp_valid'] = timestamp_result['valid'] if not timestamp_result['valid']: result['warnings'].append( f"Timestamp inválido: {timestamp_result['error']}" ) # Determinar validez general result['is_valid'] = ( result['hash_valid'] and result['signature_valid'] ) return result except Exception as e: result['errors'].append(str(e)) return result def _calculate_hash(self, data: bytes, algorithm: str) -> str: """Calcula el hash del documento.""" hasher = self.HASH_ALGORITHMS.get(algorithm) if not hasher: raise ValueError(f"Algoritmo de hash no soportado: {algorithm}") return hasher(data).hexdigest() def _get_and_validate_certificate(self, certificate_id: str): """Obtiene y valida el certificado.""" cert = self.cert_repo.get(certificate_id) if not cert: return None # Verificar que no haya expirado now = datetime.now(timezone.utc) if not (cert.date_start <= now <= cert.date_end): return None # Verificar que tenga llave privada if not cert.private_key_id: return None return cert def _create_cades_signature( self, document_hash: str, certificate, hash_algorithm: str ) -> bytes: """ Crea una firma CAdES-BES (Basic Electronic Signature). """ from asn1crypto import cms, core, algos, x509 as asn1_x509 # Obtener llave privada private_key = self._load_private_key(certificate.private_key_id) # Construir SignedData hash_bytes = bytes.fromhex(document_hash) # MessageDigest attribute message_digest = cms.AttCertAttributeType({ 'type': 'message_digest', 'values': [core.OctetBitString(hash_bytes)] }) # Signing time signing_time = cms.AttCertAttributeType({ 'type': 'signing_time', 'values': [core.UTCTime(datetime.now(timezone.utc))] }) # Content type content_type = cms.AttCertAttributeType({ 'type': 'content_type', 'values': [cms.ContentType('data')] }) # Signed attributes signed_attrs = cms.CMSAttributes([ content_type, signing_time, message_digest ]) # Firmar los atributos attrs_der = signed_attrs.dump() if isinstance(private_key, rsa.RSAPrivateKey): signature = private_key.sign( attrs_der, padding.PKCS1v15(), hashes.SHA256() if hash_algorithm == 'sha256' else hashes.SHA1() ) elif isinstance(private_key, ec.EllipticCurvePrivateKey): signature = private_key.sign( attrs_der, ec.ECDSA(hashes.SHA256()) ) else: raise ValueError("Tipo de llave no soportado") # Construir SignerInfo cert_der = self._get_certificate_der(certificate) cert_asn1 = asn1_x509.Certificate.load(cert_der) signer_info = cms.SignerInfo({ 'version': 'v1', 'sid': cms.SignerIdentifier({ 'issuer_and_serial_number': cms.IssuerAndSerialNumber({ 'issuer': cert_asn1['tbs_certificate']['issuer'], 'serial_number': cert_asn1['tbs_certificate']['serial_number'] }) }), 'digest_algorithm': algos.DigestAlgorithm({ 'algorithm': 'sha256' if hash_algorithm == 'sha256' else 'sha1' }), 'signed_attrs': signed_attrs, 'signature_algorithm': algos.SignedDigestAlgorithm({ 'algorithm': 'sha256_rsa' if hash_algorithm == 'sha256' else 'sha1_rsa' }), 'signature': signature }) # Construir SignedData signed_data = cms.SignedData({ 'version': 'v1', 'digest_algorithms': [ algos.DigestAlgorithm({'algorithm': hash_algorithm}) ], 'encap_content_info': cms.ContentInfo({ 'content_type': 'data' }), 'certificates': [cert_asn1], 'signer_infos': [signer_info] }) # Envolver en ContentInfo content_info = cms.ContentInfo({ 'content_type': 'signed_data', 'content': signed_data }) return content_info.dump() def _create_xades_signature( self, document: bytes, certificate, hash_algorithm: str ) -> bytes: """ Crea una firma XAdES-BES (enveloped signature para XML). """ from lxml import etree from signxml import XMLSigner, methods # Cargar documento XML try: root = etree.fromstring(document) except etree.XMLSyntaxError: raise ValueError("El documento no es XML válido para XAdES") # Obtener llave privada y certificado private_key_pem = self._get_private_key_pem(certificate.private_key_id) cert_pem = self._get_certificate_pem(certificate) # Crear firmador XAdES signer = XMLSigner( method=methods.enveloped, digest_algorithm=hash_algorithm, signature_algorithm='rsa-sha256', c14n_algorithm='http://www.w3.org/2001/10/xml-exc-c14n#' ) # Firmar signed_root = signer.sign( root, key=private_key_pem, cert=cert_pem ) return etree.tostring(signed_root, xml_declaration=True, encoding='UTF-8') def _create_pades_signature( self, document: bytes, certificate, hash_algorithm: str ) -> bytes: """ Crea una firma PAdES (PDF Advanced Electronic Signatures). """ from pypdf import PdfReader, PdfWriter from endesive import pdf as endesive_pdf # Obtener llave privada y certificado private_key_pem = self._get_private_key_pem(certificate.private_key_id) cert_pem = self._get_certificate_pem(certificate) # Configuración de firma dct = { 'sigflags': 3, 'contact': certificate.subject_email or '', 'location': 'México', 'signingdate': datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S+00\'00\''), 'reason': 'Firma electrónica NOM-151', } # Firmar PDF signed_pdf = endesive_pdf.cms.sign( document, dct, private_key_pem, cert_pem, [], # Certificados intermedios 'sha256' ) return signed_pdf def _create_basic_signature( self, document_hash: str, certificate ) -> bytes: """Crea una firma PKCS#1 básica.""" private_key = self._load_private_key(certificate.private_key_id) hash_bytes = bytes.fromhex(document_hash) if isinstance(private_key, rsa.RSAPrivateKey): signature = private_key.sign( hash_bytes, padding.PKCS1v15(), hashes.SHA256() ) elif isinstance(private_key, ec.EllipticCurvePrivateKey): signature = private_key.sign( hash_bytes, ec.ECDSA(hashes.SHA256()) ) else: raise ValueError("Tipo de llave no soportado") return signature ``` ### 4.2 Cliente RFC 3161 (Timestamp) ```python import requests from asn1crypto import tsp, core, algos from dataclasses import dataclass from typing import Optional import hashlib import secrets @dataclass class TimestampResult: """Resultado de solicitud de timestamp.""" success: bool token: Optional[bytes] = None timestamp_value: Optional[datetime] = None serial_number: Optional[str] = None policy_oid: Optional[str] = None tst_info: Optional[bytes] = None error: Optional[str] = None class RFC3161Client: """ Cliente para obtener timestamps RFC 3161 de PSCs autorizados. """ def __init__(self, psc_provider): self.psc = psc_provider self.session = self._create_session() def _create_session(self) -> requests.Session: """Crea sesión HTTP con autenticación apropiada.""" session = requests.Session() session.timeout = self.psc.timeout_seconds if self.psc.auth_type == 'api_key': session.headers['Authorization'] = f'Bearer {self._decrypt_api_key()}' elif self.psc.auth_type == 'basic': session.auth = ( self._decrypt_api_key(), self._decrypt_api_secret() ) elif self.psc.auth_type == 'mtls': # Mutual TLS con certificado cliente session.cert = self._get_client_cert_path() return session def get_timestamp( self, message_imprint: str, hash_algorithm: str = 'sha256', policy_oid: Optional[str] = None, include_cert: bool = True ) -> TimestampResult: """ Solicita un timestamp RFC 3161 al PSC. Args: message_imprint: Hash del documento (hexadecimal) hash_algorithm: Algoritmo usado para el hash policy_oid: OID de política (opcional) include_cert: Incluir certificado TSA en respuesta Returns: TimestampResult con el token o error """ try: # Construir TimeStampReq hash_bytes = bytes.fromhex(message_imprint) nonce = secrets.randbits(64) # MessageImprint hash_oid = { 'sha1': '1.3.14.3.2.26', 'sha256': '2.16.840.1.101.3.4.2.1', 'sha384': '2.16.840.1.101.3.4.2.2', 'sha512': '2.16.840.1.101.3.4.2.3', }.get(hash_algorithm) message_imprint_obj = tsp.MessageImprint({ 'hash_algorithm': algos.DigestAlgorithm({ 'algorithm': hash_algorithm }), 'hashed_message': hash_bytes }) # TimeStampReq ts_req = tsp.TimeStampReq({ 'version': 1, 'message_imprint': message_imprint_obj, 'nonce': core.Integer(nonce), 'cert_req': include_cert }) if policy_oid: ts_req['req_policy'] = core.ObjectIdentifier(policy_oid) # Enviar solicitud request_bytes = ts_req.dump() response = self.session.post( self.psc.tsa_url, data=request_bytes, headers={ 'Content-Type': 'application/timestamp-query', 'Accept': 'application/timestamp-reply' } ) if response.status_code != 200: return TimestampResult( success=False, error=f"Error HTTP {response.status_code}: {response.text}" ) # Parsear respuesta ts_resp = tsp.TimeStampResp.load(response.content) # Verificar status status = ts_resp['status']['status'].native if status != 'granted' and status != 'granted_with_mods': status_string = ts_resp['status'].get('status_string') fail_info = ts_resp['status'].get('fail_info') return TimestampResult( success=False, error=f"TSA rechazó solicitud: {status_string or fail_info}" ) # Extraer TSTInfo time_stamp_token = ts_resp['time_stamp_token'] tst_info = self._extract_tst_info(time_stamp_token) # Verificar nonce if tst_info['nonce'].native != nonce: return TimestampResult( success=False, error="Nonce en respuesta no coincide con solicitud" ) # Verificar hash resp_hash = tst_info['message_imprint']['hashed_message'].native if resp_hash != hash_bytes: return TimestampResult( success=False, error="Hash en respuesta no coincide con solicitud" ) return TimestampResult( success=True, token=response.content, timestamp_value=tst_info['gen_time'].native, serial_number=str(tst_info['serial_number'].native), policy_oid=tst_info['policy'].native, tst_info=tst_info.dump() ) except Exception as e: return TimestampResult( success=False, error=str(e) ) def verify_timestamp( self, timestamp_token: bytes, original_hash: str, hash_algorithm: str = 'sha256' ) -> dict: """ Verifica un timestamp token existente. Returns: Dict con resultado de validación """ result = { 'valid': False, 'hash_matches': False, 'signature_valid': False, 'certificate_valid': False, 'timestamp_value': None, 'error': None } try: ts_resp = tsp.TimeStampResp.load(timestamp_token) time_stamp_token = ts_resp['time_stamp_token'] tst_info = self._extract_tst_info(time_stamp_token) # Verificar hash hash_bytes = bytes.fromhex(original_hash) resp_hash = tst_info['message_imprint']['hashed_message'].native result['hash_matches'] = (resp_hash == hash_bytes) if not result['hash_matches']: result['error'] = "Hash no coincide" return result # Extraer timestamp result['timestamp_value'] = tst_info['gen_time'].native # Verificar firma del TSA signed_data = time_stamp_token['content'] result['signature_valid'] = self._verify_cms_signature(signed_data) # Verificar certificado TSA certs = signed_data['certificates'] if certs: result['certificate_valid'] = self._verify_tsa_certificate(certs[0]) result['valid'] = ( result['hash_matches'] and result['signature_valid'] ) return result except Exception as e: result['error'] = str(e) return result def _extract_tst_info(self, time_stamp_token) -> tsp.TSTInfo: """Extrae TSTInfo del token.""" signed_data = time_stamp_token['content'] encap_content = signed_data['encap_content_info']['content'] return tsp.TSTInfo.load(encap_content.native) def _verify_cms_signature(self, signed_data) -> bool: """Verifica la firma CMS del timestamp.""" # Implementación simplificada # En producción usar biblioteca de verificación CMS completa signer_infos = signed_data['signer_infos'] if not signer_infos: return False # Verificar que hay al menos un firmante válido return len(signer_infos) > 0 def _verify_tsa_certificate(self, cert) -> bool: """Verifica el certificado del TSA.""" from cryptography import x509 try: cert_obj = x509.load_der_x509_certificate( cert.dump(), default_backend() ) # Verificar validez temporal now = datetime.now(timezone.utc) if not (cert_obj.not_valid_before_utc <= now <= cert_obj.not_valid_after_utc): return False # Verificar extended key usage para timestamping try: eku = cert_obj.extensions.get_extension_for_oid( x509.oid.ExtensionOID.EXTENDED_KEY_USAGE ) if x509.oid.ExtendedKeyUsageOID.TIME_STAMPING not in eku.value: return False except x509.extensions.ExtensionNotFound: pass # EKU no es obligatorio return True except Exception: return False ``` ### 4.3 Servicio de Constancia de Conservación ```python @dataclass class PreservationResult: """Resultado de solicitud de Constancia de Conservación.""" success: bool constancia_number: Optional[str] = None preservation_date: Optional[datetime] = None asn1_evidence: Optional[bytes] = None expiration_date: Optional[datetime] = None error: Optional[str] = None class PreservationService: """ Servicio para gestionar Constancias de Conservación NOM-151. """ PRESERVATION_YEARS = 5 # Mínimo legal según CFF Art. 30 def __init__( self, psc_connector, signature_repository, preservation_repository, timestamp_service ): self.psc = psc_connector self.sig_repo = signature_repository self.pres_repo = preservation_repository self.ts_svc = timestamp_service def request_preservation( self, signature_id: str, psc_provider_id: Optional[str] = None ) -> PreservationResult: """ Solicita una Constancia de Conservación para un documento firmado. Args: signature_id: ID de la firma del documento psc_provider_id: ID del PSC a usar (opcional, usa default) Returns: PreservationResult con la constancia o error """ try: # Obtener firma signature = self.sig_repo.get(signature_id) if not signature: return PreservationResult( success=False, error="Firma no encontrada" ) # Obtener PSC psc = self._get_psc_provider(psc_provider_id) if not psc: return PreservationResult( success=False, error="No hay PSC disponible" ) # Solicitar timestamp si no existe if not signature.timestamp_token_id: ts_result = self.ts_svc.get_timestamp( message_imprint=signature.document_hash, hash_algorithm=signature.hash_algorithm ) if not ts_result.success: return PreservationResult( success=False, error=f"Error obteniendo timestamp: {ts_result.error}" ) # Guardar timestamp timestamp_record = self._save_timestamp(ts_result, psc.id) signature.timestamp_token_id = timestamp_record.id self.sig_repo.update(signature) else: timestamp_record = self._get_timestamp(signature.timestamp_token_id) # Generar Constancia constancia = self._generate_constancia( signature=signature, timestamp=timestamp_record, psc=psc ) # Guardar registro de preservación preservation_record = self._save_preservation( signature_id=signature_id, timestamp_id=timestamp_record.id, psc_id=psc.id, constancia=constancia ) return PreservationResult( success=True, constancia_number=preservation_record.constancia_number, preservation_date=preservation_record.preservation_date, asn1_evidence=preservation_record.asn1_evidence, expiration_date=preservation_record.expiration_date ) except Exception as e: return PreservationResult( success=False, error=str(e) ) def validate_preservation( self, preservation_id: str, original_document: Optional[bytes] = None ) -> dict: """ Valida una Constancia de Conservación existente. Returns: Dict con resultado de validación detallado """ result = { 'valid': False, 'preservation_valid': False, 'timestamp_valid': False, 'document_integrity': None, 'expiration_status': None, 'psc_signature_valid': False, 'errors': [], 'warnings': [] } try: # Obtener registro preservation = self.pres_repo.get(preservation_id) if not preservation: result['errors'].append("Registro de preservación no encontrado") return result # Verificar expiración now = datetime.now(timezone.utc) if preservation.expiration_date: if now > preservation.expiration_date: result['expiration_status'] = 'expired' result['warnings'].append( f"Constancia expirada el {preservation.expiration_date}" ) elif now > preservation.expiration_date - timedelta(days=90): result['expiration_status'] = 'expiring_soon' result['warnings'].append( f"Constancia expira el {preservation.expiration_date}" ) else: result['expiration_status'] = 'valid' # Verificar timestamp ts_result = self.ts_svc.verify_timestamp(preservation.timestamp_token_id) result['timestamp_valid'] = ts_result.get('valid', False) # Verificar firma del PSC result['psc_signature_valid'] = self._verify_psc_signature(preservation) # Verificar integridad del documento si se proporciona if original_document: signature = self.sig_repo.get(preservation.document_signature_id) current_hash = self._calculate_hash( original_document, signature.hash_algorithm ) result['document_integrity'] = (current_hash == signature.document_hash) if not result['document_integrity']: result['errors'].append("El documento ha sido modificado") # Determinar validez general result['preservation_valid'] = ( result['timestamp_valid'] and result['psc_signature_valid'] and result['expiration_status'] != 'expired' ) result['valid'] = result['preservation_valid'] if result['document_integrity'] is not None: result['valid'] = result['valid'] and result['document_integrity'] return result except Exception as e: result['errors'].append(str(e)) return result def renew_preservation( self, preservation_id: str, psc_provider_id: Optional[str] = None ) -> PreservationResult: """ Renueva una Constancia de Conservación antes de su expiración. Crea un nuevo registro vinculado al anterior. """ try: # Obtener registro actual current = self.pres_repo.get(preservation_id) if not current: return PreservationResult( success=False, error="Registro de preservación no encontrado" ) # Verificar que aún es válido validation = self.validate_preservation(preservation_id) if not validation['valid']: return PreservationResult( success=False, error="No se puede renovar una constancia inválida" ) # Obtener la firma original signature = self.sig_repo.get(current.document_signature_id) # Solicitar nueva constancia result = self.request_preservation( signature_id=str(signature.id), psc_provider_id=psc_provider_id ) if result.success: # Vincular con registro anterior new_record = self.pres_repo.get_by_constancia(result.constancia_number) new_record.previous_record_id = current.id new_record.renewal_count = current.renewal_count + 1 self.pres_repo.update(new_record) return result except Exception as e: return PreservationResult( success=False, error=str(e) ) def _generate_constancia( self, signature, timestamp, psc ) -> dict: """ Genera la estructura de la Constancia de Conservación. """ from asn1crypto import core, cms # Crear estructura ASN.1 de evidencia preservation_date = datetime.now(timezone.utc) expiration_date = preservation_date + timedelta(days=365 * self.PRESERVATION_YEARS) # Número de constancia único constancia_number = self._generate_constancia_number(psc.code) evidence = { 'version': 1, 'constancia_number': constancia_number, 'preservation_date': preservation_date.isoformat(), 'document_hash': signature.document_hash, 'hash_algorithm': signature.hash_algorithm, 'timestamp_serial': timestamp.serial_number, 'timestamp_value': timestamp.timestamp_value.isoformat(), 'psc_code': psc.code, 'psc_name': psc.name, 'policy_oid': timestamp.policy_oid, 'expiration_date': expiration_date.isoformat() } # Serializar a ASN.1 (simplificado - en producción usar estructura completa) evidence_bytes = core.UTF8String(str(evidence)).dump() # Firmar con certificado del PSC psc_signature = self._sign_with_psc(evidence_bytes, psc) return { 'constancia_number': constancia_number, 'preservation_date': preservation_date, 'expiration_date': expiration_date, 'asn1_evidence': evidence_bytes, 'psc_signature': psc_signature } def _generate_constancia_number(self, psc_code: str) -> str: """Genera número único de constancia.""" import uuid timestamp = datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S') unique = uuid.uuid4().hex[:8].upper() return f"NOM151-{psc_code.upper()}-{timestamp}-{unique}" ``` --- ## 5. API REST ### 5.1 Endpoints ```yaml openapi: 3.0.3 info: title: Electronic Signature and Preservation API version: 1.0.0 description: API para firma electrónica NOM-151 y Constancias de Conservación paths: /api/v1/legal/certificates: get: summary: Listar certificados parameters: - name: type in: query schema: type: string enum: [efirma, csd, psc, tsa, generic] - name: is_valid in: query schema: type: boolean - name: rfc in: query schema: type: string responses: '200': description: Lista de certificados content: application/json: schema: type: array items: $ref: '#/components/schemas/Certificate' post: summary: Cargar certificado requestBody: content: multipart/form-data: schema: type: object properties: name: type: string certificate_type: type: string enum: [efirma, csd, psc, tsa, generic] certificate_file: type: string format: binary private_key_file: type: string format: binary password: type: string format: password required: - name - certificate_file responses: '201': description: Certificado cargado content: application/json: schema: $ref: '#/components/schemas/Certificate' /api/v1/legal/certificates/{id}: get: summary: Obtener certificado responses: '200': description: Detalle del certificado content: application/json: schema: $ref: '#/components/schemas/CertificateDetail' delete: summary: Eliminar certificado responses: '204': description: Certificado eliminado /api/v1/legal/certificates/{id}/validate: post: summary: Validar certificado responses: '200': description: Resultado de validación content: application/json: schema: $ref: '#/components/schemas/CertificateValidation' /api/v1/legal/signatures: get: summary: Listar firmas parameters: - name: document_model in: query schema: type: string - name: document_id in: query schema: type: string format: uuid - name: date_from in: query schema: type: string format: date - name: date_to in: query schema: type: string format: date responses: '200': description: Lista de firmas content: application/json: schema: type: array items: $ref: '#/components/schemas/DocumentSignature' post: summary: Firmar documento requestBody: content: application/json: schema: type: object properties: document: type: string format: byte description: Documento en base64 document_model: type: string description: Modelo del documento (ej. invoicing.cfdi) document_id: type: string format: uuid certificate_id: type: string format: uuid signature_format: type: string enum: [pkcs1, pkcs7, cades, xades, pades] default: cades signature_level: type: string enum: [baseline_b, baseline_t, baseline_lt, baseline_lta] default: baseline_t request_preservation: type: boolean default: true signer_info: type: object properties: name: type: string role: type: string reason: type: string location: type: string required: - document - certificate_id responses: '201': description: Documento firmado content: application/json: schema: $ref: '#/components/schemas/SignatureResult' /api/v1/legal/signatures/{id}: get: summary: Obtener firma responses: '200': description: Detalle de la firma content: application/json: schema: $ref: '#/components/schemas/DocumentSignatureDetail' /api/v1/legal/signatures/{id}/verify: post: summary: Verificar firma requestBody: content: application/json: schema: type: object properties: document: type: string format: byte description: Documento original en base64 verify_timestamp: type: boolean default: true verify_certificate_chain: type: boolean default: true responses: '200': description: Resultado de verificación content: application/json: schema: $ref: '#/components/schemas/SignatureVerification' /api/v1/legal/timestamps: post: summary: Solicitar timestamp RFC 3161 requestBody: content: application/json: schema: type: object properties: document_hash: type: string description: Hash del documento (hexadecimal) hash_algorithm: type: string enum: [sha1, sha256, sha384, sha512] default: sha256 psc_provider_id: type: string format: uuid required: - document_hash responses: '201': description: Timestamp generado content: application/json: schema: $ref: '#/components/schemas/TimestampToken' /api/v1/legal/timestamps/{id}/verify: post: summary: Verificar timestamp requestBody: content: application/json: schema: type: object properties: document_hash: type: string description: Hash original del documento required: - document_hash responses: '200': description: Resultado de verificación content: application/json: schema: $ref: '#/components/schemas/TimestampVerification' /api/v1/legal/preservation: get: summary: Listar Constancias de Conservación parameters: - name: document_signature_id in: query schema: type: string format: uuid - name: validation_status in: query schema: type: string enum: [valid, invalid, expired, revoked, pending] - name: expiring_before in: query schema: type: string format: date description: Filtrar por próximas a expirar responses: '200': description: Lista de constancias content: application/json: schema: type: array items: $ref: '#/components/schemas/PreservationRecord' post: summary: Solicitar Constancia de Conservación requestBody: content: application/json: schema: type: object properties: signature_id: type: string format: uuid psc_provider_id: type: string format: uuid required: - signature_id responses: '201': description: Constancia emitida content: application/json: schema: $ref: '#/components/schemas/PreservationResult' /api/v1/legal/preservation/{id}: get: summary: Obtener Constancia de Conservación responses: '200': description: Detalle de la constancia content: application/json: schema: $ref: '#/components/schemas/PreservationRecordDetail' /api/v1/legal/preservation/{id}/validate: post: summary: Validar Constancia de Conservación requestBody: content: application/json: schema: type: object properties: document: type: string format: byte description: Documento original en base64 (opcional) responses: '200': description: Resultado de validación content: application/json: schema: $ref: '#/components/schemas/PreservationValidation' /api/v1/legal/preservation/{id}/renew: post: summary: Renovar Constancia de Conservación requestBody: content: application/json: schema: type: object properties: psc_provider_id: type: string format: uuid responses: '201': description: Constancia renovada content: application/json: schema: $ref: '#/components/schemas/PreservationResult' /api/v1/legal/preservation/{id}/download: get: summary: Descargar evidencia ASN.1 responses: '200': description: Archivo de evidencia content: application/octet-stream: schema: type: string format: binary /api/v1/legal/psc-providers: get: summary: Listar PSCs disponibles responses: '200': description: Lista de PSCs content: application/json: schema: type: array items: $ref: '#/components/schemas/PSCProvider' components: schemas: Certificate: type: object properties: id: type: string format: uuid name: type: string certificate_type: type: string enum: [efirma, csd, psc, tsa, generic] serial_number: type: string subject_cn: type: string issuer_cn: type: string date_start: type: string format: date-time date_end: type: string format: date-time is_valid: type: boolean rfc: type: string fingerprint_sha256: type: string has_private_key: type: boolean DocumentSignature: type: object properties: id: type: string format: uuid document_model: type: string document_id: type: string format: uuid document_name: type: string document_hash: type: string certificate_id: type: string format: uuid certificate_name: type: string signature_format: type: string signature_level: type: string signed_at: type: string format: date-time signer_name: type: string is_valid: type: boolean has_timestamp: type: boolean has_preservation: type: boolean SignatureResult: type: object properties: success: type: boolean signature_id: type: string format: uuid document_hash: type: string timestamp_value: type: string format: date-time preservation_requested: type: boolean constancia_number: type: string errors: type: array items: type: string SignatureVerification: type: object properties: is_valid: type: boolean hash_valid: type: boolean signature_valid: type: boolean timestamp_valid: type: boolean certificate_valid: type: boolean certificate_chain_valid: type: boolean signed_at: type: string format: date-time errors: type: array items: type: string warnings: type: array items: type: string TimestampToken: type: object properties: id: type: string format: uuid psc_provider_name: type: string timestamp_value: type: string format: date-time serial_number: type: string policy_oid: type: string hash_algorithm: type: string is_valid: type: boolean PreservationRecord: type: object properties: id: type: string format: uuid constancia_number: type: string document_signature_id: type: string format: uuid document_name: type: string psc_provider_name: type: string preservation_date: type: string format: date-time expiration_date: type: string format: date-time validation_status: type: string enum: [valid, invalid, expired, revoked, pending] renewal_count: type: integer PreservationResult: type: object properties: success: type: boolean constancia_number: type: string preservation_date: type: string format: date-time expiration_date: type: string format: date-time psc_provider_name: type: string error: type: string PreservationValidation: type: object properties: valid: type: boolean preservation_valid: type: boolean timestamp_valid: type: boolean document_integrity: type: boolean expiration_status: type: string enum: [valid, expiring_soon, expired] psc_signature_valid: type: boolean errors: type: array items: type: string warnings: type: array items: type: string PSCProvider: type: object properties: id: type: string format: uuid name: type: string code: type: string auth_type: type: string is_active: type: boolean health_status: type: string supported_services: type: array items: type: string ``` --- ## 6. Integración con CFDI ### 6.1 Flujo de Firma de CFDI ```python class CFDISignatureService: """ Servicio de firma para CFDIs cumpliendo NOM-151. """ def sign_cfdi( self, cfdi_xml: bytes, csd_certificate_id: str, request_preservation: bool = True ) -> dict: """ Firma un CFDI con el CSD y opcionalmente solicita Constancia. Args: cfdi_xml: XML del CFDI sin firmar csd_certificate_id: ID del CSD a usar request_preservation: Solicitar Constancia NOM-151 Returns: Dict con CFDI firmado y metadata """ # 1. Validar XML contra esquema SAT self._validate_cfdi_schema(cfdi_xml) # 2. Calcular cadena original cadena_original = self._build_cadena_original(cfdi_xml) # 3. Firmar con XAdES-BES enveloped signature_result = self.signature_engine.sign_document( document=cfdi_xml, certificate_id=csd_certificate_id, signature_format=SignatureFormat.XADES, signature_level=SignatureLevel.BASELINE_T, hash_algorithm='sha256', request_preservation=request_preservation ) if not signature_result.success: raise SignatureError(signature_result.errors) # 4. Insertar sello en CFDI signed_cfdi = self._insert_sello( cfdi_xml, signature_result.signature_value, csd_certificate_id ) return { 'cfdi_xml': signed_cfdi, 'signature_id': signature_result.signature_id, 'sello': base64.b64encode(signature_result.signature_value).decode(), 'cadena_original': cadena_original, 'constancia_number': signature_result.constancia_number if request_preservation else None } def _build_cadena_original(self, cfdi_xml: bytes) -> str: """Construye la cadena original del CFDI usando XSLT del SAT.""" from lxml import etree # Cargar XSLT del SAT xslt_path = self._get_sat_xslt_path() xslt = etree.parse(xslt_path) transform = etree.XSLT(xslt) # Aplicar transformación cfdi_doc = etree.fromstring(cfdi_xml) result = transform(cfdi_doc) return str(result) def _insert_sello( self, cfdi_xml: bytes, signature: bytes, certificate_id: str ) -> bytes: """Inserta el sello digital en el CFDI.""" from lxml import etree # Obtener certificado certificate = self.cert_repo.get(certificate_id) # Parsear XML root = etree.fromstring(cfdi_xml) # Insertar atributos root.set('Sello', base64.b64encode(signature).decode()) root.set('Certificado', self._get_certificate_base64(certificate)) root.set('NoCertificado', certificate.serial_number) return etree.tostring(root, xml_declaration=True, encoding='UTF-8') ``` --- ## 7. Consideraciones de Seguridad ### 7.1 Almacenamiento de Llaves Privadas ```python class SecureKeyStorage: """ Almacenamiento seguro de llaves privadas. Usa encriptación en reposo con clave maestra. """ def __init__(self, master_key: bytes): """ Args: master_key: Clave maestra de 256 bits derivada de HSM o KMS """ from cryptography.fernet import Fernet from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC # Derivar clave de encriptación kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=b'erp_suite_key_storage', # Salt fijo para este propósito iterations=100000, ) derived_key = base64.urlsafe_b64encode(kdf.derive(master_key)) self.cipher = Fernet(derived_key) def encrypt_key(self, private_key: bytes) -> bytes: """Encripta una llave privada para almacenamiento.""" return self.cipher.encrypt(private_key) def decrypt_key(self, encrypted_key: bytes) -> bytes: """Desencripta una llave privada.""" return self.cipher.decrypt(encrypted_key) ``` ### 7.2 Políticas de Acceso | Operación | Permiso Requerido | Nivel | |-----------|-------------------|-------| | Ver certificados | `legal.certificate.read` | Usuario | | Cargar certificados | `legal.certificate.create` | Administrador | | Firmar documentos | `legal.signature.create` | Usuario (con certificado asignado) | | Ver firmas | `legal.signature.read` | Usuario | | Verificar firmas | `legal.signature.verify` | Público | | Solicitar Constancia | `legal.preservation.create` | Usuario | | Validar Constancia | `legal.preservation.verify` | Público | ### 7.3 Auditoría ```sql -- Tabla de auditoría para operaciones de firma CREATE TABLE legal.signature_audit_log ( id UUID PRIMARY KEY DEFAULT uuid_generate_v7(), operation VARCHAR(32) NOT NULL, entity_type VARCHAR(32) NOT NULL, entity_id UUID NOT NULL, user_id UUID NOT NULL REFERENCES core.users(id), ip_address INET, user_agent TEXT, details JSONB, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX idx_signature_audit_entity ON legal.signature_audit_log(entity_type, entity_id); CREATE INDEX idx_signature_audit_user ON legal.signature_audit_log(user_id, created_at DESC); ``` --- ## 8. Mantenimiento y Renovación ### 8.1 Alertas de Expiración ```python class ExpirationMonitor: """ Monitor de expiraciones de certificados y constancias. """ ALERT_THRESHOLDS = [ (90, 'warning'), # 90 días antes (30, 'urgent'), # 30 días antes (7, 'critical'), # 7 días antes ] def check_expirations(self) -> List[dict]: """ Verifica expiraciones próximas y genera alertas. """ alerts = [] now = datetime.now(timezone.utc) # Verificar certificados for days, level in self.ALERT_THRESHOLDS: expiring_certs = self.cert_repo.find_expiring( before=now + timedelta(days=days) ) for cert in expiring_certs: alerts.append({ 'type': 'certificate_expiration', 'level': level, 'entity_id': cert.id, 'entity_name': cert.name, 'expires_at': cert.date_end, 'days_remaining': (cert.date_end - now).days }) # Verificar constancias de conservación for days, level in self.ALERT_THRESHOLDS: expiring_preservations = self.pres_repo.find_expiring( before=now + timedelta(days=days) ) for pres in expiring_preservations: alerts.append({ 'type': 'preservation_expiration', 'level': level, 'entity_id': pres.id, 'entity_name': pres.constancia_number, 'expires_at': pres.expiration_date, 'days_remaining': (pres.expiration_date - now).days }) return alerts def auto_renew_preservations(self): """ Renueva automáticamente constancias próximas a expirar. """ now = datetime.now(timezone.utc) threshold = now + timedelta(days=90) # Renovar con 90 días de anticipación expiring = self.pres_repo.find_expiring( before=threshold, status='valid' ) for pres in expiring: try: self.preservation_svc.renew_preservation(str(pres.id)) self.logger.info(f"Constancia {pres.constancia_number} renovada") except Exception as e: self.logger.error( f"Error renovando constancia {pres.constancia_number}: {e}" ) ``` --- ## 9. PSCs Autorizados (Referencia) ### 9.1 Lista de PSCs Acreditados por la SE | PSC | Servicios | URL | |-----|-----------|-----| | Edicom México | TSA, NOM-151, CFDI | edicom.com | | PSC World | TSA, NOM-151 | pscworld.com | | Interfactura | TSA, NOM-151, PAC | interfactura.com | | SeguriData | TSA, NOM-151 | seguridata.com | | Cincel | TSA, NOM-151 | cincel.mx | | Legalex | TSA, NOM-151 | legalex.mx | | Cecoban | TSA | cecoban.org.mx | ### 9.2 Directorio Oficial - **URL**: https://psc.economia.gob.mx/directorio.html - **Autoridad**: Secretaría de Economía --- ## 10. Referencias ### 10.1 Normativas - NOM-151-SCFI-2016 (DOF 30/03/2017) - Código de Comercio Federal, Art. 89-114 - Código Fiscal de la Federación, Art. 30 - Ley de Firma Electrónica Avanzada ### 10.2 Estándares Técnicos - RFC 3161: Time-Stamp Protocol - RFC 5652: Cryptographic Message Syntax (CMS) - ETSI TS 103 173: CAdES Baseline Profiles - ETSI TS 103 171: XAdES Baseline Profiles - X.509: Public Key Infrastructure ### 10.3 SAT - Portal e.firma: https://www.sat.gob.mx/ - Especificaciones CFDI: https://www.sat.gob.mx/factura-electronica --- **Documento generado para ERP-SUITE** **Versión**: 1.0 **Fecha**: 2025-01-09