SPEC-FIRMA-ELECTRONICA-NOM151: Electronic Signature and Preservation of Data Messages
Metadata
- Code: SPEC-MGN-013
- Module: Legal / Compliance
- Related Gap: GAP-MGN-013-001
- Priority: P1
- Estimated Effort: 13 SP
- Version: 1.0
- Last Updated: 2025-01-09
- Normative References: NOM-151-SCFI-2016, RFC 3161, CAdES, XAdES
1. Executive Summary
1.1 Objective
Implement an advanced electronic signature and data message preservation system that complies with NOM-151-SCFI-2016 and provides:
- Electronic signing of documents with legal validity in Mexico
- Issuance of Constancias de Conservación (preservation certificates) through authorized PSCs (Prestadores de Servicios de Certificación, certification service providers)
- Integration with the SAT's e.firma/FIEL
- Time stamping via the RFC 3161 protocol
- Document preservation for the legally required period
1.2 Scope
- Management of X.509 certificates (e.firma, CSD)
- Integration with PSCs authorized by the Secretaría de Economía (SE)
- Obtaining RFC 3161 time stamps
- CAdES/XAdES signatures for documents
- NOM-151 Constancia de Conservación
- Validation of signatures and certificates
- Legal archiving for at least 5 years
1.3 Regulatory Framework
| Standard/Law | Description | Scope |
|---|---|---|
| NOM-151-SCFI-2016 | Preservation of data messages | National |
| Código de Comercio (federal) | Recognition of electronic signatures | National |
| Código Fiscal de la Federación | Art. 30: 5-year retention | Tax |
| RFC 3161 | Time-Stamp Protocol | International |
| ETSI TS 103 173 | CAdES Baseline Profile | International |
| ETSI TS 103 171 | XAdES Baseline Profile | International |
2. Data Model
2.1 Entity-Relationship Diagram
┌─────────────────────────────────┐ ┌─────────────────────────────────┐
│ certificates │ │ certificate_keys │
│─────────────────────────────────│ │─────────────────────────────────│
│ id (PK) │ │ id (PK) │
│ name │ │ name │
│ certificate_type │◄───┐│ key_type │
│ content (encrypted) │ ││ content (encrypted) │
│ private_key_id (FK)───────────┼────┘│ password_hash │
│ serial_number │ │ is_private │
│ subject_cn │ │ algorithm │
│ issuer_cn │ │ key_size │
│ date_start │ │ company_id (FK) │
│ date_end │ └─────────────────────────────────┘
│ fingerprint_sha256 │
│ rfc │ ┌─────────────────────────────────┐
│ company_id (FK) │ │ psc_providers │
└────────────────┬────────────────┘ │─────────────────────────────────│
│ │ id (PK) │
│ │ name │
│ │ code │
│ │ tsa_url │
│ │ api_url │
│ │ auth_type │
│ │ api_key (encrypted) │
│ │ client_certificate_id (FK) │
│ │ is_active │
│ └─────────────────────────────────┘
│
▼
┌─────────────────────────────────┐ ┌─────────────────────────────────┐
│ document_signatures │ │ preservation_records │
│─────────────────────────────────│ │─────────────────────────────────│
│ id (PK) │ │ id (PK) │
│ document_model │ │ document_signature_id (FK) │
│ document_id │ │ psc_provider_id (FK) │
│ document_hash │ │ constancia_number │
│ hash_algorithm │ │ timestamp_token (binary) │
│ certificate_id (FK) │ │ timestamp_value (UTC) │
│ signature_value (binary) │ │ tst_info (binary) │
│ signature_format │ │ asn1_evidence (binary) │
│ signature_level │ │ policy_oid │
│ timestamp_token_id (FK) │ │ serial_number │
│ signed_at │ │ psc_signature │
│ is_valid │ │ psc_certificate_chain │
│ validation_errors │ │ created_at │
│ company_id (FK) │ │ expires_at │
└─────────────────────────────────┘ │ validation_status │
└─────────────────────────────────┘
▼
┌─────────────────────────────────┐
│ timestamp_tokens │
│─────────────────────────────────│
│ id (PK) │
│ document_signature_id (FK) │
│ psc_provider_id (FK) │
│ request_hash │
│ hash_algorithm │
│ tst_info (binary) │
│ timestamp_value (UTC) │
│ serial_number │
│ policy_oid │
│ tsa_certificate │
│ accuracy_seconds │
│ nonce │
│ is_valid │
│ created_at │
└─────────────────────────────────┘
2.2 Table Definitions
2.2.1 legal.certificates
CREATE TABLE legal.certificates (
    -- Identification
    -- Assumes a uuid_generate_v7() function is installed (for example via the pg_uuidv7 extension)
    id UUID PRIMARY KEY DEFAULT uuid_generate_v7(),
    name VARCHAR(256) NOT NULL,
    -- Certificate type
    certificate_type VARCHAR(32) NOT NULL DEFAULT 'generic'
        CHECK (certificate_type IN (
            'efirma',  -- SAT e.firma/FIEL
            'csd',     -- Certificado de Sello Digital (digital seal certificate)
            'psc',     -- PSC certificate
            'tsa',     -- TSA certificate
            'generic'  -- Generic X.509 certificate
        )),
    -- Certificate content (encrypted at rest)
    content BYTEA NOT NULL,
    content_format VARCHAR(16) NOT NULL DEFAULT 'pem'
        CHECK (content_format IN ('der', 'pem', 'pkcs12')),
    -- Associated private key (legal.certificate_keys must be created before this table)
    private_key_id UUID REFERENCES legal.certificate_keys(id),
    -- Metadata extracted from the certificate
    serial_number VARCHAR(64) NOT NULL,
    subject_cn VARCHAR(256),
    subject_o VARCHAR(256),
    subject_email VARCHAR(256),
    issuer_cn VARCHAR(256),
    issuer_o VARCHAR(256),
    -- Validity
    date_start TIMESTAMPTZ NOT NULL,
    date_end TIMESTAMPTZ NOT NULL,
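    -- Current validity is evaluated at query time (NOW() BETWEEN date_start AND date_end).
    -- It cannot be a stored generated column: PostgreSQL only accepts immutable
    -- expressions there, and NOW() is not immutable.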
    -- Cryptographic identifiers
    fingerprint_sha1 VARCHAR(64),
    fingerprint_sha256 VARCHAR(128) NOT NULL,
    public_key_hash VARCHAR(128),
    -- Tax data (for e.firma/CSD)
    rfc VARCHAR(13),
    curp VARCHAR(18),
    -- Certification chain
    issuer_certificate_id UUID REFERENCES legal.certificates(id),
    is_root_ca BOOLEAN NOT NULL DEFAULT false,
    -- Relations
    company_id UUID NOT NULL REFERENCES core.companies(id),
    -- Status
    is_active BOOLEAN NOT NULL DEFAULT true,
    loading_error TEXT,
    -- Audit
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_by UUID REFERENCES core.users(id),
    -- Constraints
    CONSTRAINT uk_certificate_serial_issuer UNIQUE (serial_number, issuer_cn),
    CONSTRAINT chk_date_range CHECK (date_end > date_start)
);
-- Indexes
CREATE INDEX idx_certificates_company ON legal.certificates(company_id);
CREATE INDEX idx_certificates_type ON legal.certificates(certificate_type) WHERE is_active;
CREATE INDEX idx_certificates_rfc ON legal.certificates(rfc) WHERE rfc IS NOT NULL;
CREATE INDEX idx_certificates_validity ON legal.certificates(date_start, date_end);
CREATE INDEX idx_certificates_fingerprint ON legal.certificates(fingerprint_sha256);
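The fingerprint_sha256 column and the date_start/date_end window can be derived with the standard library alone. The following is a minimal sketch, not the module's actual loader: the function names are hypothetical, and it assumes the fingerprint is stored as lowercase hex of the SHA-256 digest of the DER encoding.

```python
import hashlib
import ssl
from datetime import datetime, timezone
from typing import Optional


def fingerprint_sha256(pem_certificate: str) -> str:
    """Lowercase hex SHA-256 of the certificate's DER encoding."""
    # PEM_cert_to_DER_cert only strips the PEM armor and base64-decodes the body
    der = ssl.PEM_cert_to_DER_cert(pem_certificate)
    return hashlib.sha256(der).hexdigest()


def is_within_validity(
    date_start: datetime,
    date_end: datetime,
    now: Optional[datetime] = None,
) -> bool:
    """True when `now` (default: current UTC time) lies inside the validity window."""
    if now is None:
        now = datetime.now(timezone.utc)
    return date_start <= now <= date_end
```

Evaluating validity at read time keeps the answer correct as the clock advances, which a value stored at insert time would not.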
2.2.2 legal.certificate_keys
CREATE TABLE legal.certificate_keys (
    -- Identification
    id UUID PRIMARY KEY DEFAULT uuid_generate_v7(),
    name VARCHAR(256) NOT NULL,
    -- Key type
    key_type VARCHAR(16) NOT NULL DEFAULT 'private'
        CHECK (key_type IN ('private', 'public')),
    -- Algorithm and size
    algorithm VARCHAR(16) NOT NULL DEFAULT 'rsa'
        CHECK (algorithm IN ('rsa', 'ec', 'ed25519')),
    key_size INTEGER,      -- 2048, 4096 for RSA; 256, 384 for EC
    ec_curve VARCHAR(32),  -- P-256, P-384, P-521
    -- Content (encrypted at rest with the master key)
    content BYTEA NOT NULL,
    content_format VARCHAR(16) NOT NULL DEFAULT 'pem'
        CHECK (content_format IN ('der', 'pem', 'pkcs8')),
    -- Private key password (PBKDF2 hash)
    password_hash VARCHAR(256),
    password_salt VARCHAR(64),
    -- Validation
    is_valid BOOLEAN NOT NULL DEFAULT true,
    loading_error TEXT,
    -- Relations
    company_id UUID NOT NULL REFERENCES core.companies(id),
    -- Audit
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_by UUID REFERENCES core.users(id),
    -- Constraints
    CONSTRAINT chk_rsa_key_size CHECK (
        algorithm != 'rsa' OR key_size >= 2048
    )
);
CREATE INDEX idx_certificate_keys_company ON legal.certificate_keys(company_id);
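The password_hash and password_salt columns can be filled with a PBKDF2 derivation from the standard library. This is a sketch under assumptions the specification does not state: PBKDF2-HMAC-SHA256, a 16-byte random salt, hex storage, and an iteration count of 600,000. The helper names are hypothetical.

```python
import hashlib
import hmac
import secrets
from typing import Optional, Tuple

PBKDF2_ITERATIONS = 600_000  # assumption: tune to the deployment's hardware


def hash_key_password(password: str, salt: Optional[bytes] = None) -> Tuple[str, str]:
    """Return (password_hash, password_salt) as hex strings."""
    if salt is None:
        salt = secrets.token_bytes(16)
    derived = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS
    )
    return derived.hex(), salt.hex()


def verify_key_password(password: str, password_hash: str, password_salt: str) -> bool:
    """Re-derive with the stored salt and compare in constant time."""
    candidate, _ = hash_key_password(password, bytes.fromhex(password_salt))
    return hmac.compare_digest(candidate, password_hash)
```

With these parameters the hash is 64 hex characters and the salt 32, which fits the VARCHAR(256) and VARCHAR(64) columns.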
2.2.3 legal.psc_providers
CREATE TABLE legal.psc_providers (
    -- Identification
    id UUID PRIMARY KEY DEFAULT uuid_generate_v7(),
    name VARCHAR(128) NOT NULL,
    code VARCHAR(32) NOT NULL UNIQUE,
    -- Service URLs
    tsa_url VARCHAR(512),         -- RFC 3161 endpoint
    api_url VARCHAR(512),         -- PSC REST API
    validation_url VARCHAR(512),  -- Validation endpoint
    -- Authentication
    auth_type VARCHAR(32) NOT NULL DEFAULT 'api_key'
        CHECK (auth_type IN (
            'none',
            'api_key',
            'basic',
            'oauth2',
            'mtls'  -- Mutual TLS with a client certificate
        )),
    -- Credentials (encrypted)
    api_key BYTEA,
    api_secret BYTEA,
    client_certificate_id UUID REFERENCES legal.certificates(id),
    -- PSC root certificate
    root_certificate_id UUID REFERENCES legal.certificates(id),
    -- Configuration
    timeout_seconds INTEGER NOT NULL DEFAULT 30,
    retry_count INTEGER NOT NULL DEFAULT 3,
    hash_algorithm VARCHAR(16) NOT NULL DEFAULT 'sha256',
    -- Supported policy OIDs
    supported_policies TEXT[],
    -- Status
    is_active BOOLEAN NOT NULL DEFAULT true,
    last_health_check TIMESTAMPTZ,
    health_status VARCHAR(16) DEFAULT 'unknown',
    -- Relations
    company_id UUID REFERENCES core.companies(id),  -- NULL = global
    -- Audit
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Predefined authorized PSCs (sample data)
INSERT INTO legal.psc_providers (name, code, tsa_url, auth_type, hash_algorithm) VALUES
    ('Edicom México', 'edicom', 'https://tsa.edicom.com/timestamp', 'mtls', 'sha256'),
    ('PSC World', 'pscworld', 'https://tsa.pscworld.com/api/v1/timestamp', 'api_key', 'sha256'),
    ('Interfactura', 'interfactura', 'https://tsa.interfactura.com/rfc3161', 'api_key', 'sha256'),
    ('SeguriData', 'seguridata', 'https://tsa.seguridata.com/timestamp', 'mtls', 'sha256'),
    ('Cincel', 'cincel', 'https://api.cincel.mx/v1/timestamp', 'oauth2', 'sha256');
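One way a caller might honor the retry_count setting is a small retry wrapper with exponential backoff. The sketch below is illustrative only: the helper name, the backoff schedule, and the reading of retry_count as "retries after the first attempt" are assumptions, not something the table definition prescribes.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    operation: Callable[[], T],
    retry_count: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `operation`, retrying up to `retry_count` times on transient errors."""
    for attempt in range(retry_count + 1):
        try:
            return operation()
        except retry_on:
            if attempt == retry_count:
                raise  # retries exhausted: surface the last error
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * (2 ** attempt))
    raise RuntimeError("unreachable")  # the loop always returns or raises
```

Injecting `sleep` keeps the wrapper testable without real waiting.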
2.2.4 legal.document_signatures
CREATE TABLE legal.document_signatures (
    -- Identification
    id UUID PRIMARY KEY DEFAULT uuid_generate_v7(),
    -- Signed document (polymorphic reference)
    document_model VARCHAR(128) NOT NULL,  -- 'invoicing.cfdi', 'documents.file', etc.
    document_id UUID NOT NULL,
    document_name VARCHAR(256),
    -- Document hash
    document_hash VARCHAR(128) NOT NULL,
    hash_algorithm VARCHAR(16) NOT NULL DEFAULT 'sha256'
        CHECK (hash_algorithm IN ('sha1', 'sha256', 'sha384', 'sha512')),
    -- Certificate used
    certificate_id UUID NOT NULL REFERENCES legal.certificates(id),
    -- Digital signature
    signature_value BYTEA NOT NULL,
    signature_format VARCHAR(16) NOT NULL DEFAULT 'cades'
        CHECK (signature_format IN (
            'pkcs1',  -- RSA PKCS#1 v1.5 (basic)
            'pkcs7',  -- Basic CMS
            'cades',  -- CAdES (CMS Advanced)
            'xades',  -- XAdES (XML Advanced)
            'pades'   -- PAdES (PDF Advanced)
        )),
    -- Signature level (per ETSI)
    signature_level VARCHAR(16) NOT NULL DEFAULT 'baseline_t'
        CHECK (signature_level IN (
            'baseline_b',   -- Basic (no timestamp)
            'baseline_t',   -- With timestamp
            'baseline_lt',  -- Long-term (with certificates)
            'baseline_lta'  -- Long-term archival
        )),
    -- Associated timestamp (legal.timestamp_tokens must be created before this table)
    timestamp_token_id UUID REFERENCES legal.timestamp_tokens(id),
    -- Signature metadata
    signed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    signer_name VARCHAR(256),
    signer_role VARCHAR(128),
    signature_reason TEXT,
    signature_location VARCHAR(256),
    -- Validation
    is_valid BOOLEAN NOT NULL DEFAULT true,
    last_validation TIMESTAMPTZ,
    validation_errors JSONB DEFAULT '[]',
    -- Relations
    company_id UUID NOT NULL REFERENCES core.companies(id),
    -- Audit
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- At most one signature per document and certificate
    CONSTRAINT uk_document_signature UNIQUE (document_model, document_id, certificate_id)
);
CREATE INDEX idx_document_signatures_document
    ON legal.document_signatures(document_model, document_id);
CREATE INDEX idx_document_signatures_certificate
    ON legal.document_signatures(certificate_id);
CREATE INDEX idx_document_signatures_hash
    ON legal.document_signatures(document_hash);
CREATE INDEX idx_document_signatures_date
    ON legal.document_signatures(signed_at DESC);
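The document_hash column stores a hex digest, and the CHECK constraint limits hash_algorithm to four names. A possible way to compute that value for large files without loading them fully into memory is sketched below; the function name and chunk size are illustrative assumptions.

```python
import hashlib
import io
from typing import BinaryIO

ALLOWED_ALGORITHMS = ("sha1", "sha256", "sha384", "sha512")  # mirrors the CHECK constraint


def compute_document_hash(
    stream: BinaryIO, algorithm: str = "sha256", chunk_size: int = 65536
) -> str:
    """Hash a binary stream in chunks and return the lowercase hex digest."""
    if algorithm not in ALLOWED_ALGORITHMS:
        raise ValueError(f"Unsupported hash algorithm: {algorithm}")
    hasher = hashlib.new(algorithm)
    # iter() with a sentinel stops at the first empty read (end of stream)
    for chunk in iter(lambda: stream.read(chunk_size), b""):
        hasher.update(chunk)
    return hasher.hexdigest()


# Usage with an in-memory document; a file opened in "rb" mode works the same way
digest = compute_document_hash(io.BytesIO(b"<cfdi/>"))
```

A SHA-512 digest is 128 hex characters, which is exactly the width of the VARCHAR(128) column.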
2.2.5 legal.timestamp_tokens
CREATE TABLE legal.timestamp_tokens (
    -- Identification
    id UUID PRIMARY KEY DEFAULT uuid_generate_v7(),
    -- Origin
    psc_provider_id UUID NOT NULL REFERENCES legal.psc_providers(id),
    -- Request
    request_hash VARCHAR(128) NOT NULL,
    hash_algorithm VARCHAR(16) NOT NULL DEFAULT 'sha256',
    request_nonce VARCHAR(64),
    -- Response (RFC 3161 TSTInfo)
    tst_info BYTEA NOT NULL,                -- ASN.1 DER encoded
    timestamp_value TIMESTAMPTZ NOT NULL,   -- genTime, in UTC
    serial_number VARCHAR(64) NOT NULL,
    policy_oid VARCHAR(64) NOT NULL,
    -- TSA certificate
    tsa_certificate BYTEA,
    tsa_certificate_chain BYTEA,
    -- Accuracy
    accuracy_seconds DECIMAL(10,6),
    ordering BOOLEAN NOT NULL DEFAULT false,
    -- Validation
    is_valid BOOLEAN NOT NULL DEFAULT true,
    validation_errors JSONB DEFAULT '[]',
    -- Audit
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Constraints
    CONSTRAINT uk_timestamp_serial_provider UNIQUE (serial_number, psc_provider_id)
);
CREATE INDEX idx_timestamp_tokens_hash ON legal.timestamp_tokens(request_hash);
CREATE INDEX idx_timestamp_tokens_date ON legal.timestamp_tokens(timestamp_value DESC);
CREATE INDEX idx_timestamp_tokens_psc ON legal.timestamp_tokens(psc_provider_id);
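RFC 3161 expresses the optional Accuracy field as up to three integers (seconds, millis, micros), while accuracy_seconds is a single DECIMAL(10,6). A minimal conversion sketch follows; the function name is hypothetical, and mapping an absent Accuracy to NULL is an assumption.

```python
from decimal import Decimal
from typing import Optional


def accuracy_to_seconds(
    seconds: Optional[int] = None,
    millis: Optional[int] = None,
    micros: Optional[int] = None,
) -> Optional[Decimal]:
    """Collapse the RFC 3161 Accuracy components into seconds with microsecond precision."""
    if seconds is None and millis is None and micros is None:
        return None  # Accuracy absent from TSTInfo: store NULL
    total = (
        Decimal(seconds or 0)
        + Decimal(millis or 0) / Decimal(1000)
        + Decimal(micros or 0) / Decimal(1_000_000)
    )
    # Six decimal places match the DECIMAL(10,6) column
    return total.quantize(Decimal("0.000001"))
```

For example, seconds=1, millis=500, micros=250 yields 1.500250.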
2.2.6 legal.preservation_records (Constancia de Conservación, the preservation certificate)
CREATE TABLE legal.preservation_records (
    -- Identification
    id UUID PRIMARY KEY DEFAULT uuid_generate_v7(),
    constancia_number VARCHAR(64) NOT NULL UNIQUE,
    -- Preserved document
    document_signature_id UUID NOT NULL REFERENCES legal.document_signatures(id),
    -- PSC that issues the constancia
    psc_provider_id UUID NOT NULL REFERENCES legal.psc_providers(id),
    -- RFC 3161 timestamp
    timestamp_token_id UUID NOT NULL REFERENCES legal.timestamp_tokens(id),
    -- ASN.1 evidence (file stored separately from the original document)
    asn1_evidence BYTEA NOT NULL,
    evidence_format VARCHAR(16) NOT NULL DEFAULT 'der',
    -- Constancia data
    preservation_date TIMESTAMPTZ NOT NULL,  -- Fecha cierta (legally certain date)
    expiration_date TIMESTAMPTZ,             -- Typically preservation_date + 5 years
    -- PSC signature
    psc_signature BYTEA NOT NULL,
    psc_certificate_chain BYTEA,
    signature_algorithm VARCHAR(32) NOT NULL,
    -- Applied policy
    preservation_policy_oid VARCHAR(64),
    preservation_policy_name VARCHAR(128),
    -- Validation
    validation_status VARCHAR(16) NOT NULL DEFAULT 'valid'
        CHECK (validation_status IN ('valid', 'invalid', 'expired', 'revoked', 'pending')),
    last_validation TIMESTAMPTZ,
    validation_result JSONB,
    -- Renewal (for long-term archives)
    previous_record_id UUID REFERENCES legal.preservation_records(id),
    renewal_count INTEGER NOT NULL DEFAULT 0,
    -- Audit
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_by UUID REFERENCES core.users(id)
);
CREATE INDEX idx_preservation_records_document
    ON legal.preservation_records(document_signature_id);
CREATE INDEX idx_preservation_records_date
    ON legal.preservation_records(preservation_date DESC);
CREATE INDEX idx_preservation_records_expiration
    ON legal.preservation_records(expiration_date)
    WHERE validation_status = 'valid';
CREATE INDEX idx_preservation_records_psc
    ON legal.preservation_records(psc_provider_id);
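The expiration_date is described as typically five years after preservation_date, and the renewal columns imply that records nearing expiry are re-preserved. The sketch below shows one way to compute both. The helper names, the 90-day renewal margin, and the handling of 29 February (falling back to 28 February) are assumptions for illustration.

```python
from datetime import datetime, timedelta, timezone


def expiration_date(preservation_date: datetime, retention_years: int = 5) -> datetime:
    """Add whole calendar years to the preservation date."""
    target_year = preservation_date.year + retention_years
    try:
        return preservation_date.replace(year=target_year)
    except ValueError:
        # 29 February with a non-leap target year: fall back to 28 February (assumption)
        return preservation_date.replace(year=target_year, day=28)


def needs_renewal(expiration: datetime, now: datetime, margin_days: int = 90) -> bool:
    """True once the record is within `margin_days` of expiring, or already expired."""
    return expiration - now <= timedelta(days=margin_days)


# Usage: a constancia issued on 2025-01-09 expires on 2030-01-09
issued = datetime(2025, 1, 9, tzinfo=timezone.utc)
expires = expiration_date(issued)
```

A scheduled job could apply needs_renewal to the rows selected through idx_preservation_records_expiration.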
3. System Architecture
3.1 Signing and Preservation Flow
┌────────────────────────────────────────────────────────────┐
│                  NOM-151 SIGNING PROCESS                   │
├────────────────────────────────────────────────────────────┤
│                                                            │
│  ┌──────────────────┐                                      │
│  │ Document         │                                      │
│  │ (PDF/XML/Binary) │                                      │
│  └────────┬─────────┘                                      │
│           ▼                                                │
│  ┌──────────────────┐                                      │
│  │ [1] Compute      │                                      │
│  │ SHA-256 hash     │                                      │
│  └────────┬─────────┘                                      │
│           ├───────────────────────────────┐                │
│           ▼                               ▼                │
│  ┌──────────────────┐            ┌──────────────────┐      │
│  │ [2] Sign with    │            │ [3] Request      │      │
│  │ private key      │            │ TSA timestamp    │      │
│  │ (e.firma/CSD)    │            │ (RFC 3161)       │      │
│  └────────┬─────────┘            └────────┬─────────┘      │
│           │        ┌─────────────┐        │                │
│           └───────►│ [4] Build   │◄───────┘                │
│                    │ CAdES/XAdES │                         │
│                    │ signature   │                         │
│                    └──────┬──────┘                         │
│                           ▼                                │
│                  ┌──────────────────┐                      │
│                  │ [5] Send to      │                      │
│                  │ authorized PSC   │                      │
│                  └────────┬─────────┘                      │
│                           ▼                                │
│                  ┌──────────────────┐                      │
│                  │ [6] Receive      │                      │
│                  │ Constancia de    │                      │
│                  │ Conservación     │                      │
│                  └────────┬─────────┘                      │
│                           ▼                                │
│        ┌──────────────────────────────────────┐            │
│        │ [7] Store                            │            │
│        │  ├─ Original document                │            │
│        │  ├─ Digital signature                │            │
│        │  ├─ Timestamp token                  │            │
│        │  └─ ASN.1 constancia (evidence)      │            │
│        └──────────────────────────────────────┘            │
│                                                            │
└────────────────────────────────────────────────────────────┘
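The seven steps of the flow can be read as a small pipeline. The sketch below only illustrates the ordering and the data passed between steps: every callable is injected and hypothetical, and none of this is the module's actual API (the real engine appears in section 4).

```python
import hashlib
from dataclasses import dataclass
from typing import Callable


@dataclass
class PreservedDocument:
    document: bytes         # original document
    document_hash: str      # [1] SHA-256 hex digest
    envelope: bytes         # [4] CAdES/XAdES signature container
    timestamp_token: bytes  # [3] RFC 3161 token
    constancia: bytes       # [6] ASN.1 evidence returned by the PSC


def sign_and_preserve(
    document: bytes,
    sign: Callable[[str], bytes],
    request_timestamp: Callable[[str], bytes],
    build_envelope: Callable[[bytes, bytes], bytes],
    request_constancia: Callable[[bytes], bytes],
    store: Callable[[PreservedDocument], None],
) -> PreservedDocument:
    digest = hashlib.sha256(document).hexdigest()  # [1] compute hash
    signature = sign(digest)                       # [2] sign with private key
    token = request_timestamp(digest)              # [3] request TSA timestamp
    envelope = build_envelope(signature, token)    # [4] build CAdES/XAdES
    constancia = request_constancia(envelope)      # [5]-[6] send to PSC, receive constancia
    record = PreservedDocument(document, digest, envelope, token, constancia)
    store(record)                                  # [7] store all four artifacts
    return record
```

Passing the steps in as callables makes the ordering testable with stand-ins, without keys or network access.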
3.2 System Components
┌─────────────────────────────────────────────────────────────────────┐
│ ERP-SUITE │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ legal.signature module │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │ Certificate │ │ Signature │ │ Preservation │ │ │
│ │ │ Manager │ │ Engine │ │ Service │ │ │
│ │ └──────┬──────┘ └──────┬──────┘ └──────────┬──────────┘ │ │
│ │ │ │ │ │ │
│ │ └────────────────┼─────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────┐ │ │
│ │ │ PSC Connector │ │ │
│ │ │ (RFC 3161 Client) │ │ │
│ │ └──────────┬──────────┘ │ │
│ └──────────────────────────┼───────────────────────────────────┘ │
│ │ │
└─────────────────────────────┼────────────────────────────────────────┘
│ HTTPS/mTLS
▼
┌─────────────────────────────────────────┐
│ Authorized PSC (SE) │
│ ┌───────────┐ ┌───────────────────┐ │
│ │ TSA │ │ Constancia │ │
│ │ RFC 3161 │ │ Generator │ │
│ └─────┬─────┘ └─────────┬─────────┘ │
│ │ │ │
│ └────────┬─────────┘ │
│ ▼ │
│ ┌───────────────┐ │
│ │ CENAM Atomic │ │
│ │ Clock (UTC) │ │
│ └───────────────┘ │
└─────────────────────────────────────────┘
4. Signature Engine Implementation
4.1 Main Class: SignatureEngine
from dataclasses import dataclass, field
from typing import Optional, List, Union
from datetime import datetime, timezone
from enum import Enum
import hashlib

from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa, ec
from cryptography.x509 import load_pem_x509_certificate, load_der_x509_certificate
from cryptography.hazmat.backends import default_backend


class SignatureFormat(Enum):
    PKCS1 = "pkcs1"
    PKCS7 = "pkcs7"
    CADES = "cades"
    XADES = "xades"
    PADES = "pades"


class SignatureLevel(Enum):
    BASELINE_B = "baseline_b"      # Basic
    BASELINE_T = "baseline_t"      # With timestamp
    BASELINE_LT = "baseline_lt"    # Long-term
    BASELINE_LTA = "baseline_lta"  # Long-term archival


@dataclass
class SignatureResult:
    """Result of a signing operation."""
    success: bool
    signature_id: Optional[str] = None
    signature_value: Optional[bytes] = None
    document_hash: Optional[str] = None
    timestamp_token: Optional[bytes] = None
    signed_at: Optional[datetime] = None
    certificate_serial: Optional[str] = None
    # default_factory gives every instance its own list
    errors: List[str] = field(default_factory=list)
class SignatureEngine:
    """
    Electronic signature engine compatible with NOM-151.
    Supports multiple formats (CAdES, XAdES, PAdES) and signature levels.
    """
    HASH_ALGORITHMS = {
        'sha1': hashlib.sha1,
        'sha256': hashlib.sha256,
        'sha384': hashlib.sha384,
        'sha512': hashlib.sha512,
    }

    def __init__(
        self,
        certificate_repository,
        timestamp_service,
        preservation_service
    ):
        self.cert_repo = certificate_repository
        self.timestamp_svc = timestamp_service
        self.preservation_svc = preservation_service
    def sign_document(
        self,
        document: bytes,
        certificate_id: str,
        signature_format: SignatureFormat = SignatureFormat.CADES,
        signature_level: SignatureLevel = SignatureLevel.BASELINE_T,
        hash_algorithm: str = 'sha256',
        signer_info: Optional[dict] = None,
        request_preservation: bool = True
    ) -> SignatureResult:
        """
        Signs a document with the specified certificate.

        Args:
            document: Binary content of the document
            certificate_id: ID of the certificate to use
            signature_format: Signature format (CAdES, XAdES, etc.)
            signature_level: ETSI signature level
            hash_algorithm: Hash algorithm (sha256 recommended)
            signer_info: Additional information about the signer
            request_preservation: Whether to request a Constancia de Conservación

        Returns:
            SignatureResult with the outcome of the operation
        """
        try:
            # 1. Fetch and validate the certificate
            certificate = self._get_and_validate_certificate(certificate_id)
            if not certificate:
                return SignatureResult(
                    success=False,
                    errors=["Certificado no válido o no encontrado"]
                )

            # 2. Compute the document hash
            document_hash = self._calculate_hash(document, hash_algorithm)

            # 3. Create the signature for the requested format
            if signature_format == SignatureFormat.CADES:
                signature_value = self._create_cades_signature(
                    document_hash, certificate, hash_algorithm
                )
            elif signature_format == SignatureFormat.XADES:
                signature_value = self._create_xades_signature(
                    document, certificate, hash_algorithm
                )
            elif signature_format == SignatureFormat.PADES:
                signature_value = self._create_pades_signature(
                    document, certificate, hash_algorithm
                )
            else:
                signature_value = self._create_basic_signature(
                    document_hash, certificate
                )

            # 4. Obtain a timestamp when the signature level requires one
            timestamp_token = None
            if signature_level in [
                SignatureLevel.BASELINE_T,
                SignatureLevel.BASELINE_LT,
                SignatureLevel.BASELINE_LTA
            ]:
                timestamp_result = self.timestamp_svc.get_timestamp(
                    message_imprint=document_hash,
                    hash_algorithm=hash_algorithm
                )
                if timestamp_result.success:
                    timestamp_token = timestamp_result.token
                else:
                    return SignatureResult(
                        success=False,
                        errors=[f"Error obteniendo timestamp: {timestamp_result.error}"]
                    )

            # 5. Save the signature in the database
            signature_record = self._save_signature(
                document_hash=document_hash,
                hash_algorithm=hash_algorithm,
                certificate_id=certificate_id,
                signature_value=signature_value,
                signature_format=signature_format.value,
                signature_level=signature_level.value,
                timestamp_token=timestamp_token,
                signer_info=signer_info
            )

            # 6. Request the Constancia de Conservación if required
            if request_preservation:
                self.preservation_svc.request_preservation(
                    signature_id=signature_record.id
                )

            return SignatureResult(
                success=True,
                signature_id=str(signature_record.id),
                signature_value=signature_value,
                document_hash=document_hash,
                timestamp_token=timestamp_token,
                signed_at=datetime.now(timezone.utc),
                certificate_serial=certificate.serial_number
            )
        except Exception as e:
            return SignatureResult(
                success=False,
                errors=[str(e)]
            )
    def verify_signature(
        self,
        document: bytes,
        signature_id: str,
        verify_timestamp: bool = True,
        verify_certificate_chain: bool = True
    ) -> dict:
        """
        Verifies an existing signature.

        Returns:
            Dict with the detailed validation result
        """
        result = {
            'is_valid': False,
            'hash_valid': False,
            'signature_valid': False,
            'timestamp_valid': None,
            'certificate_valid': False,
            'certificate_chain_valid': None,
            'errors': [],
            'warnings': []
        }
        try:
            # Fetch the signature record
            signature_record = self._get_signature_record(signature_id)
            if not signature_record:
                result['errors'].append("Firma no encontrada")
                return result

            # 1. Verify the document hash
            current_hash = self._calculate_hash(
                document, signature_record.hash_algorithm
            )
            result['hash_valid'] = (current_hash == signature_record.document_hash)
            if not result['hash_valid']:
                result['errors'].append("El documento ha sido modificado")
                return result

            # 2. Verify the cryptographic signature
            certificate = self.cert_repo.get(signature_record.certificate_id)
            result['signature_valid'] = self._verify_signature_value(
                signature_record.signature_value,
                signature_record.document_hash,
                certificate
            )
            if not result['signature_valid']:
                result['errors'].append("Firma criptográfica inválida")
                return result

            # 3. Verify the certificate
            result['certificate_valid'] = self._validate_certificate(certificate)
            if not result['certificate_valid']:
                result['warnings'].append(
                    "El certificado ha expirado o no es válido actualmente"
                )

            # 4. Verify the certification chain
            if verify_certificate_chain:
                chain_result = self._verify_certificate_chain(certificate)
                result['certificate_chain_valid'] = chain_result['valid']
                if not chain_result['valid']:
                    result['warnings'].extend(chain_result['errors'])

            # 5. Verify the timestamp
            if verify_timestamp and signature_record.timestamp_token_id:
                timestamp_result = self.timestamp_svc.verify_timestamp(
                    signature_record.timestamp_token_id
                )
                result['timestamp_valid'] = timestamp_result['valid']
                if not timestamp_result['valid']:
                    result['warnings'].append(
                        f"Timestamp inválido: {timestamp_result['error']}"
                    )

            # Overall validity depends only on the hash and the signature;
            # certificate, chain and timestamp problems are reported as warnings
            result['is_valid'] = (
                result['hash_valid'] and
                result['signature_valid']
            )
            return result
        except Exception as e:
            result['errors'].append(str(e))
            return result
    def _calculate_hash(self, data: bytes, algorithm: str) -> str:
        """Computes the document hash as a hex digest."""
        hasher = self.HASH_ALGORITHMS.get(algorithm)
        if not hasher:
            raise ValueError(f"Algoritmo de hash no soportado: {algorithm}")
        return hasher(data).hexdigest()

    def _get_and_validate_certificate(self, certificate_id: str):
        """Fetches the certificate and validates it for signing."""
        cert = self.cert_repo.get(certificate_id)
        if not cert:
            return None
        # Check that it is within its validity period
        now = datetime.now(timezone.utc)
        if not (cert.date_start <= now <= cert.date_end):
            return None
        # Check that it has a private key
        if not cert.private_key_id:
            return None
        return cert
    def _create_cades_signature(
        self,
        document_hash: str,
        certificate,
        hash_algorithm: str
    ) -> bytes:
        """
        Creates a detached CMS SignedData in the style of CAdES-BES
        (Basic Electronic Signature).

        Note: a conformant CAdES-BES also carries the ESS signing-certificate
        signed attribute, which this implementation does not add.
        """
        from asn1crypto import cms, core, algos, x509 as asn1_x509

        # Resolve the digest; unknown names fail instead of falling back to SHA-1
        digest_classes = {
            'sha1': hashes.SHA1,
            'sha256': hashes.SHA256,
            'sha384': hashes.SHA384,
            'sha512': hashes.SHA512,
        }
        if hash_algorithm not in digest_classes:
            raise ValueError(f"Algoritmo de hash no soportado: {hash_algorithm}")
        digest = digest_classes[hash_algorithm]()

        # Load the private key
        private_key = self._load_private_key(certificate.private_key_id)
        hash_bytes = bytes.fromhex(document_hash)

        # Signed attributes: content type, signing time and message digest
        signed_attrs = cms.CMSAttributes([
            cms.CMSAttribute({'type': 'content_type', 'values': ['data']}),
            cms.CMSAttribute({
                'type': 'signing_time',
                'values': [cms.Time({'utc_time': datetime.now(timezone.utc)})]
            }),
            cms.CMSAttribute({'type': 'message_digest', 'values': [hash_bytes]}),
        ])

        # Sign the DER encoding of the attributes with the same digest
        attrs_der = signed_attrs.dump()
        if isinstance(private_key, rsa.RSAPrivateKey):
            signature = private_key.sign(attrs_der, padding.PKCS1v15(), digest)
            signature_algorithm = f'{hash_algorithm}_rsa'
        elif isinstance(private_key, ec.EllipticCurvePrivateKey):
            signature = private_key.sign(attrs_der, ec.ECDSA(digest))
            signature_algorithm = f'{hash_algorithm}_ecdsa'
        else:
            raise ValueError("Tipo de llave no soportado")

        # Build SignerInfo
        cert_der = self._get_certificate_der(certificate)
        cert_asn1 = asn1_x509.Certificate.load(cert_der)
        signer_info = cms.SignerInfo({
            'version': 'v1',
            'sid': cms.SignerIdentifier({
                'issuer_and_serial_number': cms.IssuerAndSerialNumber({
                    'issuer': cert_asn1['tbs_certificate']['issuer'],
                    'serial_number': cert_asn1['tbs_certificate']['serial_number']
                })
            }),
            'digest_algorithm': algos.DigestAlgorithm({
                'algorithm': hash_algorithm
            }),
            'signed_attrs': signed_attrs,
            'signature_algorithm': algos.SignedDigestAlgorithm({
                'algorithm': signature_algorithm
            }),
            'signature': signature
        })

        # Build SignedData (detached: no encapsulated content)
        signed_data = cms.SignedData({
            'version': 'v1',
            'digest_algorithms': [
                algos.DigestAlgorithm({'algorithm': hash_algorithm})
            ],
            'encap_content_info': cms.ContentInfo({
                'content_type': 'data'
            }),
            'certificates': [cert_asn1],
            'signer_infos': [signer_info]
        })

        # Wrap in ContentInfo
        content_info = cms.ContentInfo({
            'content_type': 'signed_data',
            'content': signed_data
        })
        return content_info.dump()
    def _create_xades_signature(
        self,
        document: bytes,
        certificate,
        hash_algorithm: str
    ) -> bytes:
        """
        Creates an enveloped XML signature for an XML document.

        Note: XMLSigner produces a plain XML-DSig signature. A XAdES-BES
        signature additionally requires the signed QualifyingProperties
        element, which this method does not add.
        """
        from lxml import etree
        from signxml import XMLSigner, methods

        # Parse the XML document
        try:
            root = etree.fromstring(document)
        except etree.XMLSyntaxError:
            raise ValueError("El documento no es XML válido para XAdES")

        # Load the private key and certificate
        private_key_pem = self._get_private_key_pem(certificate.private_key_id)
        cert_pem = self._get_certificate_pem(certificate)

        # Create the signer
        signer = XMLSigner(
            method=methods.enveloped,
            digest_algorithm=hash_algorithm,
            signature_algorithm='rsa-sha256',
            c14n_algorithm='http://www.w3.org/2001/10/xml-exc-c14n#'
        )

        # Sign
        signed_root = signer.sign(
            root,
            key=private_key_pem,
            cert=cert_pem
        )
        return etree.tostring(signed_root, xml_declaration=True, encoding='UTF-8')
    def _create_pades_signature(
        self,
        document: bytes,
        certificate,
        hash_algorithm: str
    ) -> bytes:
        """
        Creates a PAdES (PDF Advanced Electronic Signatures) signature.
        """
        from endesive import pdf as endesive_pdf

        # endesive works with cryptography key and certificate objects rather
        # than PEM bytes (assumes _get_certificate_pem returns PEM bytes)
        private_key = self._load_private_key(certificate.private_key_id)
        signing_cert = load_pem_x509_certificate(self._get_certificate_pem(certificate))

        # Signature settings
        dct = {
            'sigflags': 3,
            'contact': certificate.subject_email or '',
            'location': 'México',
            'signingdate': datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S+00\'00\''),
            'reason': 'Firma electrónica NOM-151',
        }

        # Sign the PDF; endesive returns only the incremental update that
        # carries the signature
        signature_update = endesive_pdf.cms.sign(
            document,
            dct,
            private_key,
            signing_cert,
            [],  # Intermediate certificates
            hash_algorithm
        )
        # The signed PDF is the original bytes followed by that update
        return document + signature_update
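    def _create_basic_signature(
        self,
        document_hash: str,
        certificate
    ) -> bytes:
        """Creates a basic PKCS#1 signature over the precomputed document hash."""
        from cryptography.hazmat.primitives.asymmetric import utils

        private_key = self._load_private_key(certificate.private_key_id)
        hash_bytes = bytes.fromhex(document_hash)

        # Infer the digest from its length and mark it as prehashed, so the
        # document hash is signed directly instead of being hashed a second time
        digest_by_length = {
            20: hashes.SHA1,
            32: hashes.SHA256,
            48: hashes.SHA384,
            64: hashes.SHA512,
        }
        if len(hash_bytes) not in digest_by_length:
            raise ValueError("Longitud de hash no soportada")
        prehashed = utils.Prehashed(digest_by_length[len(hash_bytes)]())

        if isinstance(private_key, rsa.RSAPrivateKey):
            signature = private_key.sign(
                hash_bytes,
                padding.PKCS1v15(),
                prehashed
            )
        elif isinstance(private_key, ec.EllipticCurvePrivateKey):
            signature = private_key.sign(
                hash_bytes,
                ec.ECDSA(prehashed)
            )
        else:
            raise ValueError("Tipo de llave no soportado")
        return signature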
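The hash check that opens verify_signature can be looked at in isolation. The sketch below is a hypothetical standalone helper, not part of SignatureEngine: it recomputes the digest, normalizes the stored hex value, and compares in constant time. Case-insensitive matching is an assumption about how hashes may have been stored.

```python
import hashlib
import hmac


def document_matches_hash(
    document: bytes, expected_hex: str, algorithm: str = "sha256"
) -> bool:
    """Recompute the digest of `document` and compare it with the stored hex value."""
    actual = hashlib.new(algorithm, document).hexdigest()
    # Normalize whitespace and case, then compare without early exit
    return hmac.compare_digest(actual, expected_hex.strip().lower())
```

Any single-byte change in the document produces a different digest, so a False result corresponds to the "document has been modified" error path.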
4.2 RFC 3161 Client (Timestamp)
import requests
from asn1crypto import tsp, core, algos
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import hashlib
import secrets


@dataclass
class TimestampResult:
    """Result of a timestamp request."""
    success: bool
    token: Optional[bytes] = None
    timestamp_value: Optional[datetime] = None
    serial_number: Optional[str] = None
    policy_oid: Optional[str] = None
    tst_info: Optional[bytes] = None
    error: Optional[str] = None
class RFC3161Client:
    """
    Client for obtaining RFC 3161 timestamps from authorized PSCs.
    """

    def __init__(self, psc_provider):
        self.psc = psc_provider
        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        """Creates an HTTP session with the appropriate authentication."""
        session = requests.Session()
        # requests has no session-wide timeout setting, so timeout_seconds
        # has to be passed on each request (timeout=...)
        if self.psc.auth_type == 'api_key':
            session.headers['Authorization'] = f'Bearer {self._decrypt_api_key()}'
        elif self.psc.auth_type == 'basic':
            session.auth = (
                self._decrypt_api_key(),
                self._decrypt_api_secret()
            )
        elif self.psc.auth_type == 'mtls':
            # Mutual TLS with a client certificate
            session.cert = self._get_client_cert_path()
        return session
    def get_timestamp(
        self,
        message_imprint: str,
        hash_algorithm: str = 'sha256',
        policy_oid: Optional[str] = None,
        include_cert: bool = True
    ) -> TimestampResult:
        """
        Requests an RFC 3161 timestamp from the PSC.

        Args:
            message_imprint: Document hash (hexadecimal)
            hash_algorithm: Algorithm used to compute the hash
            policy_oid: Policy OID (optional)
            include_cert: Include the TSA certificate in the response

        Returns:
            TimestampResult with the token or an error
        """
        try:
            # Reject hash algorithms the request cannot express
            if hash_algorithm not in ('sha1', 'sha256', 'sha384', 'sha512'):
                return TimestampResult(
                    success=False,
                    error=f"Algoritmo de hash no soportado: {hash_algorithm}"
                )

            # Build the TimeStampReq
            hash_bytes = bytes.fromhex(message_imprint)
            nonce = secrets.randbits(64)

            # MessageImprint (asn1crypto resolves the algorithm name to its OID)
            message_imprint_obj = tsp.MessageImprint({
                'hash_algorithm': algos.DigestAlgorithm({
                    'algorithm': hash_algorithm
                }),
                'hashed_message': hash_bytes
            })

            # TimeStampReq
            ts_req = tsp.TimeStampReq({
                'version': 1,
                'message_imprint': message_imprint_obj,
                'nonce': core.Integer(nonce),
                'cert_req': include_cert
            })
            if policy_oid:
                ts_req['req_policy'] = core.ObjectIdentifier(policy_oid)

            # Send the request with an explicit timeout
            request_bytes = ts_req.dump()
            response = self.session.post(
                self.psc.tsa_url,
                data=request_bytes,
                headers={
                    'Content-Type': 'application/timestamp-query',
                    'Accept': 'application/timestamp-reply'
                },
                timeout=self.psc.timeout_seconds
            )
            if response.status_code != 200:
                return TimestampResult(
                    success=False,
                    error=f"Error HTTP {response.status_code}: {response.text}"
                )

            # Parse the response
            ts_resp = tsp.TimeStampResp.load(response.content)

            # Check the status; absent optional fields yield None via .native
            status_info = ts_resp['status']
            status = status_info['status'].native
            if status != 'granted' and status != 'granted_with_mods':
                status_string = status_info['status_string'].native
                fail_info = status_info['fail_info'].native
                return TimestampResult(
                    success=False,
                    error=f"TSA rechazó solicitud: {status_string or fail_info}"
                )

            # Extract TSTInfo
            time_stamp_token = ts_resp['time_stamp_token']
            tst_info = self._extract_tst_info(time_stamp_token)

            # Verify the nonce
            if tst_info['nonce'].native != nonce:
                return TimestampResult(
                    success=False,
                    error="Nonce en respuesta no coincide con solicitud"
                )

            # Verify the hash
            resp_hash = tst_info['message_imprint']['hashed_message'].native
            if resp_hash != hash_bytes:
                return TimestampResult(
                    success=False,
                    error="Hash en respuesta no coincide con solicitud"
                )

            return TimestampResult(
                success=True,
                # Full TimeStampResp bytes, the form verify_timestamp loads
                token=response.content,
                timestamp_value=tst_info['gen_time'].native,
                serial_number=str(tst_info['serial_number'].native),
                policy_oid=tst_info['policy'].native,
                tst_info=tst_info.dump()
            )
        except Exception as e:
            return TimestampResult(
                success=False,
                error=str(e)
            )
def verify_timestamp(
self,
timestamp_token: bytes,
original_hash: str,
hash_algorithm: str = 'sha256'
) -> dict:
"""
Verifica un timestamp token existente.
Returns:
Dict con resultado de validación
"""
result = {
'valid': False,
'hash_matches': False,
'signature_valid': False,
'certificate_valid': False,
'timestamp_value': None,
'error': None
}
try:
ts_resp = tsp.TimeStampResp.load(timestamp_token)
time_stamp_token = ts_resp['time_stamp_token']
tst_info = self._extract_tst_info(time_stamp_token)
# Verificar hash
hash_bytes = bytes.fromhex(original_hash)
resp_hash = tst_info['message_imprint']['hashed_message'].native
result['hash_matches'] = (resp_hash == hash_bytes)
if not result['hash_matches']:
result['error'] = "Hash no coincide"
return result
# Extraer timestamp
result['timestamp_value'] = tst_info['gen_time'].native
# Verificar firma del TSA
signed_data = time_stamp_token['content']
result['signature_valid'] = self._verify_cms_signature(signed_data)
            # Verify the TSA certificate
            certs = signed_data['certificates']
            if certs:
                result['certificate_valid'] = self._verify_tsa_certificate(certs[0])
            # The token is valid only when all three checks pass
            result['valid'] = (
                result['hash_matches'] and
                result['signature_valid'] and
                result['certificate_valid']
            )
return result
except Exception as e:
result['error'] = str(e)
return result
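    def _extract_tst_info(self, time_stamp_token) -> tsp.TSTInfo:
        """Extract the TSTInfo structure from the token."""
        signed_data = time_stamp_token['content']
        encap_content = signed_data['encap_content_info']['content']
        # The content is an octet string wrapping the DER-encoded TSTInfo;
        # .parsed decodes it using the spec registered for the tst_info content type
        return encap_content.parsed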
    def _verify_cms_signature(self, signed_data) -> bool:
        """Placeholder check for the CMS signature of the timestamp."""
        # WARNING: this performs no cryptographic verification. It only confirms
        # that the SignedData carries at least one SignerInfo, so a forged token
        # would pass. Production code must validate the signature with a
        # complete CMS verification library.
        signer_infos = signed_data['signer_infos']
        return len(signer_infos) > 0
def _verify_tsa_certificate(self, cert) -> bool:
"""Verifica el certificado del TSA."""
from cryptography import x509
try:
cert_obj = x509.load_der_x509_certificate(
cert.dump(),
default_backend()
)
# Verificar validez temporal
now = datetime.now(timezone.utc)
if not (cert_obj.not_valid_before_utc <= now <= cert_obj.not_valid_after_utc):
return False
            # RFC 3161 requires the TSA certificate to carry an Extended Key Usage
            # extension with id-kp-timeStamping, so a missing EKU is a failure
            try:
                eku = cert_obj.extensions.get_extension_for_oid(
                    x509.oid.ExtensionOID.EXTENDED_KEY_USAGE
                )
                if x509.oid.ExtendedKeyUsageOID.TIME_STAMPING not in eku.value:
                    return False
            except x509.ExtensionNotFound:
                return False
return True
except Exception:
return False
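`get_timestamp` expects `message_imprint` as a hexadecimal digest whose length matches `hash_algorithm`. The following is a minimal sketch of how a caller might prepare and sanity-check that value with the standard library. `compute_message_imprint`, `is_valid_imprint` and `DIGEST_SIZES` are illustrative names, not part of the specified service.

```python
import hashlib

# Digest sizes in bytes for the algorithms accepted by get_timestamp
DIGEST_SIZES = {'sha1': 20, 'sha256': 32, 'sha384': 48, 'sha512': 64}


def compute_message_imprint(document: bytes, hash_algorithm: str = 'sha256') -> str:
    """Return the hex digest to pass as message_imprint (hypothetical helper)."""
    if hash_algorithm not in DIGEST_SIZES:
        raise ValueError(f"Unsupported hash algorithm: {hash_algorithm}")
    return hashlib.new(hash_algorithm, document).hexdigest()


def is_valid_imprint(message_imprint: str, hash_algorithm: str = 'sha256') -> bool:
    """Check that a hex string decodes to a digest of the expected length."""
    try:
        raw = bytes.fromhex(message_imprint)
    except ValueError:
        return False
    return len(raw) == DIGEST_SIZES.get(hash_algorithm)
```

Validating the imprint before building the `TimeStampReq` avoids a round trip to the TSA for requests it would reject anyway.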
4.3 Constancia de Conservación Service
@dataclass
class PreservationResult:
"""Resultado de solicitud de Constancia de Conservación."""
success: bool
constancia_number: Optional[str] = None
preservation_date: Optional[datetime] = None
asn1_evidence: Optional[bytes] = None
expiration_date: Optional[datetime] = None
error: Optional[str] = None
class PreservationService:
"""
Servicio para gestionar Constancias de Conservación NOM-151.
"""
PRESERVATION_YEARS = 5 # Mínimo legal según CFF Art. 30
def __init__(
self,
psc_connector,
signature_repository,
preservation_repository,
timestamp_service
):
self.psc = psc_connector
self.sig_repo = signature_repository
self.pres_repo = preservation_repository
self.ts_svc = timestamp_service
def request_preservation(
self,
signature_id: str,
psc_provider_id: Optional[str] = None
) -> PreservationResult:
"""
Solicita una Constancia de Conservación para un documento firmado.
Args:
signature_id: ID de la firma del documento
psc_provider_id: ID del PSC a usar (opcional, usa default)
Returns:
PreservationResult con la constancia o error
"""
try:
# Obtener firma
signature = self.sig_repo.get(signature_id)
if not signature:
return PreservationResult(
success=False,
error="Firma no encontrada"
)
# Obtener PSC
psc = self._get_psc_provider(psc_provider_id)
if not psc:
return PreservationResult(
success=False,
error="No hay PSC disponible"
)
# Solicitar timestamp si no existe
if not signature.timestamp_token_id:
ts_result = self.ts_svc.get_timestamp(
message_imprint=signature.document_hash,
hash_algorithm=signature.hash_algorithm
)
if not ts_result.success:
return PreservationResult(
success=False,
error=f"Error obteniendo timestamp: {ts_result.error}"
)
# Guardar timestamp
timestamp_record = self._save_timestamp(ts_result, psc.id)
signature.timestamp_token_id = timestamp_record.id
self.sig_repo.update(signature)
else:
timestamp_record = self._get_timestamp(signature.timestamp_token_id)
# Generar Constancia
constancia = self._generate_constancia(
signature=signature,
timestamp=timestamp_record,
psc=psc
)
# Guardar registro de preservación
preservation_record = self._save_preservation(
signature_id=signature_id,
timestamp_id=timestamp_record.id,
psc_id=psc.id,
constancia=constancia
)
return PreservationResult(
success=True,
constancia_number=preservation_record.constancia_number,
preservation_date=preservation_record.preservation_date,
asn1_evidence=preservation_record.asn1_evidence,
expiration_date=preservation_record.expiration_date
)
except Exception as e:
return PreservationResult(
success=False,
error=str(e)
)
def validate_preservation(
self,
preservation_id: str,
original_document: Optional[bytes] = None
) -> dict:
"""
Valida una Constancia de Conservación existente.
Returns:
Dict con resultado de validación detallado
"""
result = {
'valid': False,
'preservation_valid': False,
'timestamp_valid': False,
'document_integrity': None,
'expiration_status': None,
'psc_signature_valid': False,
'errors': [],
'warnings': []
}
try:
# Obtener registro
preservation = self.pres_repo.get(preservation_id)
if not preservation:
result['errors'].append("Registro de preservación no encontrado")
return result
# Verificar expiración
now = datetime.now(timezone.utc)
if preservation.expiration_date:
if now > preservation.expiration_date:
result['expiration_status'] = 'expired'
result['warnings'].append(
f"Constancia expirada el {preservation.expiration_date}"
)
elif now > preservation.expiration_date - timedelta(days=90):
result['expiration_status'] = 'expiring_soon'
result['warnings'].append(
f"Constancia expira el {preservation.expiration_date}"
)
else:
result['expiration_status'] = 'valid'
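            # Verify the timestamp. verify_timestamp() takes the token bytes and the
            # original document hash, not the ID of the timestamp record
            signature = self.sig_repo.get(preservation.document_signature_id)
            timestamp_record = self._get_timestamp(preservation.timestamp_token_id)
            ts_result = self.ts_svc.verify_timestamp(
                timestamp_token=timestamp_record.token,  # assumed name of the field holding the raw token
                original_hash=signature.document_hash,
                hash_algorithm=signature.hash_algorithm
            )
            result['timestamp_valid'] = ts_result.get('valid', False)
            # Verify the PSC signature
            result['psc_signature_valid'] = self._verify_psc_signature(preservation)
            # Check document integrity when the original document is supplied
            if original_document:
                current_hash = self._calculate_hash(
                    original_document,
                    signature.hash_algorithm
                )
                result['document_integrity'] = (current_hash == signature.document_hash)
                if not result['document_integrity']:
                    result['errors'].append("The document has been modified")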
# Determinar validez general
result['preservation_valid'] = (
result['timestamp_valid'] and
result['psc_signature_valid'] and
result['expiration_status'] != 'expired'
)
result['valid'] = result['preservation_valid']
if result['document_integrity'] is not None:
result['valid'] = result['valid'] and result['document_integrity']
return result
except Exception as e:
result['errors'].append(str(e))
return result
def renew_preservation(
self,
preservation_id: str,
psc_provider_id: Optional[str] = None
) -> PreservationResult:
"""
Renueva una Constancia de Conservación antes de su expiración.
Crea un nuevo registro vinculado al anterior.
"""
try:
# Obtener registro actual
current = self.pres_repo.get(preservation_id)
if not current:
return PreservationResult(
success=False,
error="Registro de preservación no encontrado"
)
# Verificar que aún es válido
validation = self.validate_preservation(preservation_id)
if not validation['valid']:
return PreservationResult(
success=False,
error="No se puede renovar una constancia inválida"
)
# Obtener la firma original
signature = self.sig_repo.get(current.document_signature_id)
# Solicitar nueva constancia
result = self.request_preservation(
signature_id=str(signature.id),
psc_provider_id=psc_provider_id
)
if result.success:
# Vincular con registro anterior
new_record = self.pres_repo.get_by_constancia(result.constancia_number)
new_record.previous_record_id = current.id
new_record.renewal_count = current.renewal_count + 1
self.pres_repo.update(new_record)
return result
except Exception as e:
return PreservationResult(
success=False,
error=str(e)
)
def _generate_constancia(
self,
signature,
timestamp,
psc
) -> dict:
"""
Genera la estructura de la Constancia de Conservación.
"""
from asn1crypto import core, cms
# Crear estructura ASN.1 de evidencia
preservation_date = datetime.now(timezone.utc)
expiration_date = preservation_date + timedelta(days=365 * self.PRESERVATION_YEARS)
# Número de constancia único
constancia_number = self._generate_constancia_number(psc.code)
evidence = {
'version': 1,
'constancia_number': constancia_number,
'preservation_date': preservation_date.isoformat(),
'document_hash': signature.document_hash,
'hash_algorithm': signature.hash_algorithm,
'timestamp_serial': timestamp.serial_number,
'timestamp_value': timestamp.timestamp_value.isoformat(),
'psc_code': psc.code,
'psc_name': psc.name,
'policy_oid': timestamp.policy_oid,
'expiration_date': expiration_date.isoformat()
}
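        # Serialize to ASN.1 (simplified; production code should use the full structure).
        # json.dumps with sorted keys yields a stable byte sequence to sign, unlike str(dict)
        import json
        evidence_bytes = core.UTF8String(
            json.dumps(evidence, sort_keys=True, ensure_ascii=False)
        ).dump()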
# Firmar con certificado del PSC
psc_signature = self._sign_with_psc(evidence_bytes, psc)
return {
'constancia_number': constancia_number,
'preservation_date': preservation_date,
'expiration_date': expiration_date,
'asn1_evidence': evidence_bytes,
'psc_signature': psc_signature
}
def _generate_constancia_number(self, psc_code: str) -> str:
"""Genera número único de constancia."""
import uuid
timestamp = datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')
unique = uuid.uuid4().hex[:8].upper()
return f"NOM151-{psc_code.upper()}-{timestamp}-{unique}"
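`_generate_constancia` computes the expiration as `365 * PRESERVATION_YEARS` days, which falls short of the calendar anniversary whenever the window contains a leap day. The sketch below shows a calendar-based alternative, assuming the retention period is meant to be counted in calendar years. `add_years` is a hypothetical helper, not part of the service.

```python
from datetime import datetime, timedelta, timezone


def add_years(moment: datetime, years: int) -> datetime:
    """Shift a datetime by whole calendar years (hypothetical helper).

    February 29 maps to February 28 when the target year is not a leap year.
    """
    try:
        return moment.replace(year=moment.year + years)
    except ValueError:
        return moment.replace(year=moment.year + years, day=28)


preservation_date = datetime(2025, 1, 9, 12, 0, tzinfo=timezone.utc)

# Day-count approach used in _generate_constancia: one day short because of 2028-02-29
day_count_expiration = preservation_date + timedelta(days=365 * 5)
# Calendar approach: lands exactly on the fifth anniversary
calendar_expiration = add_years(preservation_date, 5)

print(day_count_expiration.date(), calendar_expiration.date())
```

Whichever rule is chosen, the same one should drive both `expiration_date` and the renewal threshold so that alerts and validation agree.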
5. REST API
5.1 Endpoints
openapi: 3.0.3
info:
title: Electronic Signature and Preservation API
version: 1.0.0
description: API para firma electrónica NOM-151 y Constancias de Conservación
paths:
/api/v1/legal/certificates:
get:
summary: Listar certificados
parameters:
- name: type
in: query
schema:
type: string
enum: [efirma, csd, psc, tsa, generic]
- name: is_valid
in: query
schema:
type: boolean
- name: rfc
in: query
schema:
type: string
responses:
'200':
description: Lista de certificados
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/Certificate'
post:
summary: Cargar certificado
requestBody:
content:
multipart/form-data:
schema:
type: object
properties:
name:
type: string
certificate_type:
type: string
enum: [efirma, csd, psc, tsa, generic]
certificate_file:
type: string
format: binary
private_key_file:
type: string
format: binary
password:
type: string
format: password
required:
- name
- certificate_file
responses:
'201':
description: Certificado cargado
content:
application/json:
schema:
$ref: '#/components/schemas/Certificate'
/api/v1/legal/certificates/{id}:
get:
summary: Obtener certificado
responses:
'200':
description: Detalle del certificado
content:
application/json:
schema:
$ref: '#/components/schemas/CertificateDetail'
delete:
summary: Eliminar certificado
responses:
'204':
description: Certificado eliminado
/api/v1/legal/certificates/{id}/validate:
post:
summary: Validar certificado
responses:
'200':
description: Resultado de validación
content:
application/json:
schema:
$ref: '#/components/schemas/CertificateValidation'
/api/v1/legal/signatures:
get:
summary: Listar firmas
parameters:
- name: document_model
in: query
schema:
type: string
- name: document_id
in: query
schema:
type: string
format: uuid
- name: date_from
in: query
schema:
type: string
format: date
- name: date_to
in: query
schema:
type: string
format: date
responses:
'200':
description: Lista de firmas
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/DocumentSignature'
post:
summary: Firmar documento
requestBody:
content:
application/json:
schema:
type: object
properties:
document:
type: string
format: byte
description: Documento en base64
document_model:
type: string
description: Modelo del documento (ej. invoicing.cfdi)
document_id:
type: string
format: uuid
certificate_id:
type: string
format: uuid
signature_format:
type: string
enum: [pkcs1, pkcs7, cades, xades, pades]
default: cades
signature_level:
type: string
enum: [baseline_b, baseline_t, baseline_lt, baseline_lta]
default: baseline_t
request_preservation:
type: boolean
default: true
signer_info:
type: object
properties:
name:
type: string
role:
type: string
reason:
type: string
location:
type: string
required:
- document
- certificate_id
responses:
'201':
description: Documento firmado
content:
application/json:
schema:
$ref: '#/components/schemas/SignatureResult'
/api/v1/legal/signatures/{id}:
get:
summary: Obtener firma
responses:
'200':
description: Detalle de la firma
content:
application/json:
schema:
$ref: '#/components/schemas/DocumentSignatureDetail'
/api/v1/legal/signatures/{id}/verify:
post:
summary: Verificar firma
requestBody:
content:
application/json:
schema:
type: object
properties:
document:
type: string
format: byte
description: Documento original en base64
verify_timestamp:
type: boolean
default: true
verify_certificate_chain:
type: boolean
default: true
responses:
'200':
description: Resultado de verificación
content:
application/json:
schema:
$ref: '#/components/schemas/SignatureVerification'
/api/v1/legal/timestamps:
post:
summary: Solicitar timestamp RFC 3161
requestBody:
content:
application/json:
schema:
type: object
properties:
document_hash:
type: string
description: Hash del documento (hexadecimal)
hash_algorithm:
type: string
enum: [sha1, sha256, sha384, sha512]
default: sha256
psc_provider_id:
type: string
format: uuid
required:
- document_hash
responses:
'201':
description: Timestamp generado
content:
application/json:
schema:
$ref: '#/components/schemas/TimestampToken'
/api/v1/legal/timestamps/{id}/verify:
post:
summary: Verificar timestamp
requestBody:
content:
application/json:
schema:
type: object
properties:
document_hash:
type: string
description: Hash original del documento
required:
- document_hash
responses:
'200':
description: Resultado de verificación
content:
application/json:
schema:
$ref: '#/components/schemas/TimestampVerification'
/api/v1/legal/preservation:
get:
summary: Listar Constancias de Conservación
parameters:
- name: document_signature_id
in: query
schema:
type: string
format: uuid
- name: validation_status
in: query
schema:
type: string
enum: [valid, invalid, expired, revoked, pending]
- name: expiring_before
in: query
schema:
type: string
format: date
description: Filtrar por próximas a expirar
responses:
'200':
description: Lista de constancias
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/PreservationRecord'
post:
summary: Solicitar Constancia de Conservación
requestBody:
content:
application/json:
schema:
type: object
properties:
signature_id:
type: string
format: uuid
psc_provider_id:
type: string
format: uuid
required:
- signature_id
responses:
'201':
description: Constancia emitida
content:
application/json:
schema:
$ref: '#/components/schemas/PreservationResult'
/api/v1/legal/preservation/{id}:
get:
summary: Obtener Constancia de Conservación
responses:
'200':
description: Detalle de la constancia
content:
application/json:
schema:
$ref: '#/components/schemas/PreservationRecordDetail'
/api/v1/legal/preservation/{id}/validate:
post:
summary: Validar Constancia de Conservación
requestBody:
content:
application/json:
schema:
type: object
properties:
document:
type: string
format: byte
description: Documento original en base64 (opcional)
responses:
'200':
description: Resultado de validación
content:
application/json:
schema:
$ref: '#/components/schemas/PreservationValidation'
/api/v1/legal/preservation/{id}/renew:
post:
summary: Renovar Constancia de Conservación
requestBody:
content:
application/json:
schema:
type: object
properties:
psc_provider_id:
type: string
format: uuid
responses:
'201':
description: Constancia renovada
content:
application/json:
schema:
$ref: '#/components/schemas/PreservationResult'
/api/v1/legal/preservation/{id}/download:
get:
summary: Descargar evidencia ASN.1
responses:
'200':
description: Archivo de evidencia
content:
application/octet-stream:
schema:
type: string
format: binary
/api/v1/legal/psc-providers:
get:
summary: Listar PSCs disponibles
responses:
'200':
description: Lista de PSCs
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/PSCProvider'
components:
schemas:
Certificate:
type: object
properties:
id:
type: string
format: uuid
name:
type: string
certificate_type:
type: string
enum: [efirma, csd, psc, tsa, generic]
serial_number:
type: string
subject_cn:
type: string
issuer_cn:
type: string
date_start:
type: string
format: date-time
date_end:
type: string
format: date-time
is_valid:
type: boolean
rfc:
type: string
fingerprint_sha256:
type: string
has_private_key:
type: boolean
DocumentSignature:
type: object
properties:
id:
type: string
format: uuid
document_model:
type: string
document_id:
type: string
format: uuid
document_name:
type: string
document_hash:
type: string
certificate_id:
type: string
format: uuid
certificate_name:
type: string
signature_format:
type: string
signature_level:
type: string
signed_at:
type: string
format: date-time
signer_name:
type: string
is_valid:
type: boolean
has_timestamp:
type: boolean
has_preservation:
type: boolean
SignatureResult:
type: object
properties:
success:
type: boolean
signature_id:
type: string
format: uuid
document_hash:
type: string
timestamp_value:
type: string
format: date-time
preservation_requested:
type: boolean
constancia_number:
type: string
errors:
type: array
items:
type: string
SignatureVerification:
type: object
properties:
is_valid:
type: boolean
hash_valid:
type: boolean
signature_valid:
type: boolean
timestamp_valid:
type: boolean
certificate_valid:
type: boolean
certificate_chain_valid:
type: boolean
signed_at:
type: string
format: date-time
errors:
type: array
items:
type: string
warnings:
type: array
items:
type: string
TimestampToken:
type: object
properties:
id:
type: string
format: uuid
psc_provider_name:
type: string
timestamp_value:
type: string
format: date-time
serial_number:
type: string
policy_oid:
type: string
hash_algorithm:
type: string
is_valid:
type: boolean
PreservationRecord:
type: object
properties:
id:
type: string
format: uuid
constancia_number:
type: string
document_signature_id:
type: string
format: uuid
document_name:
type: string
psc_provider_name:
type: string
preservation_date:
type: string
format: date-time
expiration_date:
type: string
format: date-time
validation_status:
type: string
enum: [valid, invalid, expired, revoked, pending]
renewal_count:
type: integer
PreservationResult:
type: object
properties:
success:
type: boolean
constancia_number:
type: string
preservation_date:
type: string
format: date-time
expiration_date:
type: string
format: date-time
psc_provider_name:
type: string
error:
type: string
PreservationValidation:
type: object
properties:
valid:
type: boolean
preservation_valid:
type: boolean
timestamp_valid:
type: boolean
document_integrity:
type: boolean
expiration_status:
type: string
enum: [valid, expiring_soon, expired]
psc_signature_valid:
type: boolean
errors:
type: array
items:
type: string
warnings:
type: array
items:
type: string
PSCProvider:
type: object
properties:
id:
type: string
format: uuid
name:
type: string
code:
type: string
auth_type:
type: string
is_active:
type: boolean
health_status:
type: string
supported_services:
type: array
items:
type: string
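As an illustration of the request schema for `POST /api/v1/legal/signatures`, the following sketch builds (without sending) a request using only the standard library. The host in `BASE_URL`, the bearer-token header and the function name are assumptions made for the example; this section of the specification does not define the API's authentication scheme.

```python
import base64
import json
import urllib.request

BASE_URL = "https://erp.example.com"  # placeholder host, not defined by the spec


def build_sign_request(document: bytes, certificate_id: str, token: str) -> urllib.request.Request:
    """Build a POST /api/v1/legal/signatures request (hypothetical client helper)."""
    payload = {
        # 'format: byte' in OpenAPI means base64-encoded content
        "document": base64.b64encode(document).decode("ascii"),
        "certificate_id": certificate_id,
        "signature_format": "cades",      # schema default
        "signature_level": "baseline_t",  # schema default
        "request_preservation": True,     # schema default
    }
    return urllib.request.Request(
        url=f"{BASE_URL}/api/v1/legal/signatures",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",  # assumed auth scheme
        },
        method="POST",
    )
```

Only `document` and `certificate_id` are required by the schema; the remaining fields are spelled out here to make the defaults visible.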
6. CFDI Integration
6.1 CFDI Signing Flow
class CFDISignatureService:
"""
Servicio de firma para CFDIs cumpliendo NOM-151.
"""
def sign_cfdi(
self,
cfdi_xml: bytes,
csd_certificate_id: str,
request_preservation: bool = True
) -> dict:
"""
Firma un CFDI con el CSD y opcionalmente solicita Constancia.
Args:
cfdi_xml: XML del CFDI sin firmar
csd_certificate_id: ID del CSD a usar
request_preservation: Solicitar Constancia NOM-151
Returns:
Dict con CFDI firmado y metadata
"""
        # 1. Validate the XML against the SAT schema
        self._validate_cfdi_schema(cfdi_xml)
        # 2. Build the cadena original
        cadena_original = self._build_cadena_original(cfdi_xml)
        # 3. Sign the cadena original. The SAT sello is an RSA-SHA256 signature
        #    over the cadena original, not an XAdES signature over the XML.
        #    SignatureFormat.PKCS1 is assumed to mirror the API's 'pkcs1' value.
        signature_result = self.signature_engine.sign_document(
            document=cadena_original.encode('utf-8'),
            certificate_id=csd_certificate_id,
            signature_format=SignatureFormat.PKCS1,
            signature_level=SignatureLevel.BASELINE_T,
            hash_algorithm='sha256',
            request_preservation=request_preservation
        )
        if not signature_result.success:
            raise SignatureError(signature_result.errors)
        # 4. Insert the sello into the CFDI
signed_cfdi = self._insert_sello(
cfdi_xml,
signature_result.signature_value,
csd_certificate_id
)
return {
'cfdi_xml': signed_cfdi,
'signature_id': signature_result.signature_id,
'sello': base64.b64encode(signature_result.signature_value).decode(),
'cadena_original': cadena_original,
'constancia_number': signature_result.constancia_number if request_preservation else None
}
def _build_cadena_original(self, cfdi_xml: bytes) -> str:
"""Construye la cadena original del CFDI usando XSLT del SAT."""
from lxml import etree
# Cargar XSLT del SAT
xslt_path = self._get_sat_xslt_path()
xslt = etree.parse(xslt_path)
transform = etree.XSLT(xslt)
# Aplicar transformación
cfdi_doc = etree.fromstring(cfdi_xml)
result = transform(cfdi_doc)
return str(result)
def _insert_sello(
self,
cfdi_xml: bytes,
signature: bytes,
certificate_id: str
) -> bytes:
"""Inserta el sello digital en el CFDI."""
from lxml import etree
# Obtener certificado
certificate = self.cert_repo.get(certificate_id)
# Parsear XML
root = etree.fromstring(cfdi_xml)
# Insertar atributos
root.set('Sello', base64.b64encode(signature).decode())
root.set('Certificado', self._get_certificate_base64(certificate))
root.set('NoCertificado', certificate.serial_number)
return etree.tostring(root, xml_declaration=True, encoding='UTF-8')
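`_insert_sello` writes `certificate.serial_number` straight into `NoCertificado`. SAT-issued certificates carry the 20-digit certificate number as ASCII digits inside the X.509 serial, so a serial stored as hexadecimal or as an integer needs decoding first. The sketch below shows that conversion under the assumption that `serial_number` holds the raw serial; both helper names are hypothetical.

```python
def no_certificado_from_serial(serial_hex: str) -> str:
    """Decode a SAT certificate serial given in hex into the NoCertificado digits."""
    raw = bytes.fromhex(serial_hex)
    digits = raw.decode("ascii")  # raises a ValueError subclass on non-ASCII bytes
    if not digits.isdigit():
        raise ValueError("Serial does not decode to ASCII digits")
    return digits


def no_certificado_from_int(serial_int: int) -> str:
    """Same conversion, starting from the integer serial exposed by X.509 parsers."""
    hex_serial = format(serial_int, "x")
    if len(hex_serial) % 2:
        hex_serial = "0" + hex_serial  # bytes.fromhex needs an even number of digits
    return no_certificado_from_serial(hex_serial)
```

If the repository already stores the decoded 20-digit value, no conversion is needed and `serial_number` can be used as is.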
7. Security Considerations
7.1 Private Key Storage
class SecureKeyStorage:
"""
Almacenamiento seguro de llaves privadas.
Usa encriptación en reposo con clave maestra.
"""
    def __init__(self, master_key: bytes):
        """
        Args:
            master_key: 256-bit master key obtained from an HSM or KMS
        """
        import base64
        from cryptography.fernet import Fernet
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
        # Derive the Fernet encryption key from the master key
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=b'erp_suite_key_storage',  # Fixed, application-wide salt
            iterations=100000,
        )
        derived_key = base64.urlsafe_b64encode(kdf.derive(master_key))
        self.cipher = Fernet(derived_key)
def encrypt_key(self, private_key: bytes) -> bytes:
"""Encripta una llave privada para almacenamiento."""
return self.cipher.encrypt(private_key)
def decrypt_key(self, encrypted_key: bytes) -> bytes:
"""Desencripta una llave privada."""
return self.cipher.decrypt(encrypted_key)
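The constructor above derives the Fernet key with a fixed salt. That is workable when `master_key` is already a high-entropy secret from an HSM or KMS, but a random salt per deployment keeps the derived keys distinct across installations that share a master key. The following standard-library sketch shows the same PBKDF2-HMAC-SHA256 derivation with a caller-supplied salt. `new_salt`, `derive_fernet_key` and the 16-byte salt size are illustrative choices, and the salt would have to be stored alongside the encrypted keys.

```python
import base64
import hashlib
import secrets


def new_salt() -> bytes:
    """Generate a random 16-byte salt (size chosen for illustration)."""
    return secrets.token_bytes(16)


def derive_fernet_key(master_key: bytes, salt: bytes, iterations: int = 100_000) -> bytes:
    """Derive a 32-byte key and encode it in the url-safe base64 form Fernet expects."""
    raw = hashlib.pbkdf2_hmac("sha256", master_key, salt, iterations, dklen=32)
    return base64.urlsafe_b64encode(raw)


salt = new_salt()
key = derive_fernet_key(b"\x00" * 32, salt)  # dummy master key for the example
```

The resulting `key` can be passed to `Fernet(...)` exactly as `derived_key` is in the constructor.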
7.2 Access Policies
| Operation | Required Permission | Level |
|---|---|---|
| View certificates | legal.certificate.read | User |
| Upload certificates | legal.certificate.create | Administrator |
| Sign documents | legal.signature.create | User (with an assigned certificate) |
| View signatures | legal.signature.read | User |
| Verify signatures | legal.signature.verify | Public |
| Request a Constancia | legal.preservation.create | User |
| Validate a Constancia | legal.preservation.verify | Public |
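One possible way to enforce the table above is sketched below. It is an interpretation, not the specified behavior: it assumes "public" operations need no permission at all, that administrator operations need both the permission and an admin flag, and that signing needs an assigned certificate. The operation keys and the `is_allowed` function are hypothetical names.

```python
from typing import Set

# Operation -> (permission, level), following the access policy table
ACCESS_POLICY = {
    "view_certificates": ("legal.certificate.read", "user"),
    "upload_certificates": ("legal.certificate.create", "administrator"),
    "sign_documents": ("legal.signature.create", "user_with_certificate"),
    "view_signatures": ("legal.signature.read", "user"),
    "verify_signatures": ("legal.signature.verify", "public"),
    "request_constancia": ("legal.preservation.create", "user"),
    "validate_constancia": ("legal.preservation.verify", "public"),
}


def is_allowed(
    operation: str,
    granted: Set[str],
    is_admin: bool = False,
    has_certificate: bool = False,
) -> bool:
    """Decide whether a caller may perform an operation (illustrative rules)."""
    permission, level = ACCESS_POLICY[operation]
    if level == "public":
        return True  # assumption: public operations skip the permission check
    if permission not in granted:
        return False
    if level == "administrator":
        return is_admin
    if level == "user_with_certificate":
        return has_certificate
    return True
```

Keeping the policy in a single mapping makes it straightforward to audit the code against the table.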
7.3 Auditing
-- Audit table for signature operations
CREATE TABLE legal.signature_audit_log (
id UUID PRIMARY KEY DEFAULT uuid_generate_v7(),
operation VARCHAR(32) NOT NULL,
entity_type VARCHAR(32) NOT NULL,
entity_id UUID NOT NULL,
user_id UUID NOT NULL REFERENCES core.users(id),
ip_address INET,
user_agent TEXT,
details JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_signature_audit_entity
ON legal.signature_audit_log(entity_type, entity_id);
CREATE INDEX idx_signature_audit_user
ON legal.signature_audit_log(user_id, created_at DESC);
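To show how application code might feed this table, the sketch below builds a parameterized `INSERT` for `legal.signature_audit_log`. The `%s` placeholder style assumes a psycopg-like driver, and `build_audit_insert` is a hypothetical helper. `id` and `created_at` are left to the column defaults.

```python
import json
import uuid
from typing import Any, Dict, Optional, Tuple

AUDIT_INSERT_SQL = (
    "INSERT INTO legal.signature_audit_log "
    "(operation, entity_type, entity_id, user_id, ip_address, user_agent, details) "
    "VALUES (%s, %s, %s, %s, %s, %s, %s)"
)


def build_audit_insert(
    operation: str,
    entity_type: str,
    entity_id: uuid.UUID,
    user_id: uuid.UUID,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None,
    details: Optional[Dict[str, Any]] = None,
) -> Tuple[str, tuple]:
    """Return the SQL and parameters for one audit row (illustrative helper)."""
    # Mirror the VARCHAR(32) limits so bad input fails before reaching the database
    if len(operation) > 32 or len(entity_type) > 32:
        raise ValueError("operation and entity_type are limited to 32 characters")
    params = (
        operation,
        entity_type,
        str(entity_id),
        str(user_id),
        ip_address,
        user_agent,
        json.dumps(details or {}, sort_keys=True),  # stored in the JSONB column
    )
    return AUDIT_INSERT_SQL, params
```

Passing values as parameters rather than formatting them into the SQL string keeps user-controlled fields such as `user_agent` from being interpreted as SQL.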
8. Maintenance and Renewal
8.1 Expiration Alerts
class ExpirationMonitor:
"""
Monitor de expiraciones de certificados y constancias.
"""
ALERT_THRESHOLDS = [
(90, 'warning'), # 90 días antes
(30, 'urgent'), # 30 días antes
(7, 'critical'), # 7 días antes
]
    def check_expirations(self) -> List[dict]:
        """
        Check upcoming expirations and generate one alert per entity.
        """
        alerts = []
        now = datetime.now(timezone.utc)
        # Query once with the widest window, then give each entity the most
        # severe level it qualifies for. Looping over every threshold would
        # report the same entity up to three times.
        horizon = now + timedelta(days=max(d for d, _ in self.ALERT_THRESHOLDS))
        # Check certificates
        for cert in self.cert_repo.find_expiring(before=horizon):
            days_remaining = (cert.date_end - now).days
            alerts.append({
                'type': 'certificate_expiration',
                'level': self._alert_level(days_remaining),
                'entity_id': cert.id,
                'entity_name': cert.name,
                'expires_at': cert.date_end,
                'days_remaining': days_remaining
            })
        # Check Constancias de Conservación
        for pres in self.pres_repo.find_expiring(before=horizon):
            days_remaining = (pres.expiration_date - now).days
            alerts.append({
                'type': 'preservation_expiration',
                'level': self._alert_level(days_remaining),
                'entity_id': pres.id,
                'entity_name': pres.constancia_number,
                'expires_at': pres.expiration_date,
                'days_remaining': days_remaining
            })
        return alerts

    def _alert_level(self, days_remaining: int) -> str:
        """Return the most severe level whose threshold covers days_remaining."""
        for days, level in sorted(self.ALERT_THRESHOLDS):
            if days_remaining <= days:
                return level
        return self.ALERT_THRESHOLDS[0][1]
def auto_renew_preservations(self):
"""
Renueva automáticamente constancias próximas a expirar.
"""
now = datetime.now(timezone.utc)
threshold = now + timedelta(days=90) # Renovar con 90 días de anticipación
expiring = self.pres_repo.find_expiring(
before=threshold,
status='valid'
)
for pres in expiring:
try:
self.preservation_svc.renew_preservation(str(pres.id))
self.logger.info(f"Constancia {pres.constancia_number} renovada")
except Exception as e:
self.logger.error(
f"Error renovando constancia {pres.constancia_number}: {e}"
)
9. Authorized PSCs (Reference)
9.1 PSCs Accredited by the SE
| PSC | Services | URL |
|---|---|---|
| Edicom México | TSA, NOM-151, CFDI | edicom.com |
| PSC World | TSA, NOM-151 | pscworld.com |
| Interfactura | TSA, NOM-151, PAC | interfactura.com |
| SeguriData | TSA, NOM-151 | seguridata.com |
| Cincel | TSA, NOM-151 | cincel.mx |
| Legalex | TSA, NOM-151 | legalex.mx |
| Cecoban | TSA | cecoban.org.mx |
9.2 Official Directory
- URL: https://psc.economia.gob.mx/directorio.html
- Authority: Secretaría de Economía
10. References
10.1 Regulations
- NOM-151-SCFI-2016 (DOF 30/03/2017)
- Código de Comercio Federal, Art. 89-114
- Código Fiscal de la Federación, Art. 30
- Ley de Firma Electrónica Avanzada
10.2 Technical Standards
- RFC 3161: Time-Stamp Protocol
- RFC 5652: Cryptographic Message Syntax (CMS)
- ETSI TS 103 173: CAdES Baseline Profiles
- ETSI TS 103 171: XAdES Baseline Profiles
- X.509: Public Key Infrastructure
10.3 SAT
- e.firma portal: https://www.sat.gob.mx/
- CFDI specifications: https://www.sat.gob.mx/factura-electronica
Document generated for ERP-SUITE. Version: 1.0. Date: 2025-01-09