64 KiB
64 KiB
SPEC-TWO-FACTOR-AUTHENTICATION: Autenticación de Dos Factores (2FA)
Metadata
- Código: SPEC-MGN-001
- Módulo: Seguridad / Autenticación
- Gap Relacionado: GAP-MGN-001-004
- Prioridad: P1
- Esfuerzo Estimado: 8 SP
- Versión: 1.0
- Última Actualización: 2025-01-09
- Referencias: RFC 6238 (TOTP), OWASP Authentication Guidelines
1. Resumen Ejecutivo
1.1 Objetivo
Implementar un sistema de autenticación de dos factores (2FA) que proporcione:
- TOTP (Time-based One-Time Password) como método principal
- Códigos de respaldo para recuperación
- Dispositivos de confianza para mejorar UX
- Múltiples métodos de verificación (TOTP, SMS, Email)
- Protección contra fuerza bruta y replay attacks
1.2 Alcance
- Configuración y gestión de 2FA por usuario
- Generación y validación de códigos TOTP
- Gestión de códigos de respaldo (backup codes)
- Sistema de dispositivos de confianza
- Integración con el flujo de autenticación existente
- Detección de anomalías y alertas de seguridad
1.3 Métodos de 2FA Soportados
| Método | Prioridad | Descripción |
|---|---|---|
| TOTP | Principal | Códigos de 6 dígitos vía app (Google Authenticator, Authy) |
| SMS | Secundario | Códigos enviados por SMS |
| Terciario | Códigos enviados por correo | |
| WebAuthn | Futuro | Llaves de seguridad hardware (FIDO2) |
2. Modelo de Datos
2.1 Diagrama Entidad-Relación
┌─────────────────────────────────┐
│ core.users │
│─────────────────────────────────│
│ id (PK) │
│ ... │
│ mfa_enabled │
│ mfa_method │
│ mfa_secret (encrypted) │
│ backup_codes (encrypted) │
│ mfa_setup_at │
└────────────────┬────────────────┘
│
┌────────┴────────┐
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────────────┐
│ auth.sessions │ │ auth.trusted_devices │
│─────────────────│ │─────────────────────────│
│ id (PK) │ │ id (PK) │
│ user_id (FK) │ │ user_id (FK) │
│ device_id │ │ device_fingerprint │
│ is_2fa_verified │ │ device_name │
│ ip_address │ │ user_agent │
│ user_agent │ │ last_used_at │
│ location │ │ trust_expires_at │
│ created_at │ │ is_active │
│ expires_at │ └─────────────────────────┘
└─────────────────┘
│
▼
┌─────────────────────────────────┐
│ auth.verification_codes │
│─────────────────────────────────│
│ id (PK) │
│ user_id (FK) │
│ session_id (FK) │
│ code_type │
│ code_hash │
│ attempts │
│ created_at │
│ expires_at │
│ used_at │
└─────────────────────────────────┘
2.2 Definición de Tablas
2.2.1 Extensión de core.users
-- Agregar columnas de MFA a la tabla de usuarios existente
ALTER TABLE core.users ADD COLUMN IF NOT EXISTS
mfa_enabled BOOLEAN NOT NULL DEFAULT false;
ALTER TABLE core.users ADD COLUMN IF NOT EXISTS
mfa_method VARCHAR(16) DEFAULT 'none'
CHECK (mfa_method IN ('none', 'totp', 'sms', 'email'));
-- Secreto TOTP encriptado con AES-256-GCM
ALTER TABLE core.users ADD COLUMN IF NOT EXISTS
mfa_secret BYTEA;
-- Códigos de respaldo (array de hashes SHA-256)
ALTER TABLE core.users ADD COLUMN IF NOT EXISTS
backup_codes JSONB DEFAULT '[]';
ALTER TABLE core.users ADD COLUMN IF NOT EXISTS
backup_codes_count INTEGER NOT NULL DEFAULT 0;
ALTER TABLE core.users ADD COLUMN IF NOT EXISTS
mfa_setup_at TIMESTAMPTZ;
ALTER TABLE core.users ADD COLUMN IF NOT EXISTS
last_2fa_verification TIMESTAMPTZ;
-- Constraint de consistencia
ALTER TABLE core.users ADD CONSTRAINT chk_mfa_consistency CHECK (
(mfa_enabled = true AND mfa_secret IS NOT NULL AND mfa_method != 'none') OR
(mfa_enabled = false)
);
-- Índice para búsqueda de usuarios con MFA
CREATE INDEX idx_users_mfa_enabled ON core.users(mfa_enabled) WHERE mfa_enabled = true;
2.2.2 auth.trusted_devices
CREATE TABLE auth.trusted_devices (
-- Identificación
id UUID PRIMARY KEY DEFAULT uuid_generate_v7(),
-- Relación con usuario
user_id UUID NOT NULL REFERENCES core.users(id) ON DELETE CASCADE,
-- Identificación del dispositivo
device_fingerprint VARCHAR(128) NOT NULL,
device_name VARCHAR(128), -- "iPhone de Juan", "Chrome en MacBook"
device_type VARCHAR(32), -- 'mobile', 'desktop', 'tablet'
-- Información del dispositivo
user_agent TEXT,
browser_name VARCHAR(64),
browser_version VARCHAR(32),
os_name VARCHAR(64),
os_version VARCHAR(32),
-- Ubicación del registro
registered_ip INET NOT NULL,
registered_location JSONB, -- {country, city, lat, lng}
-- Estado de confianza
is_active BOOLEAN NOT NULL DEFAULT true,
trust_level VARCHAR(16) NOT NULL DEFAULT 'standard'
CHECK (trust_level IN ('standard', 'high', 'temporary')),
trust_expires_at TIMESTAMPTZ, -- NULL = no expira
-- Uso
last_used_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_used_ip INET,
use_count INTEGER NOT NULL DEFAULT 1,
-- Auditoría
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
revoked_at TIMESTAMPTZ,
revoked_reason VARCHAR(128),
-- Constraints
CONSTRAINT uk_trusted_device_user_fingerprint
UNIQUE (user_id, device_fingerprint)
);
CREATE INDEX idx_trusted_devices_user ON auth.trusted_devices(user_id) WHERE is_active;
CREATE INDEX idx_trusted_devices_fingerprint ON auth.trusted_devices(device_fingerprint);
CREATE INDEX idx_trusted_devices_expires ON auth.trusted_devices(trust_expires_at)
WHERE trust_expires_at IS NOT NULL AND is_active;
2.2.3 auth.verification_codes
CREATE TABLE auth.verification_codes (
-- Identificación
id UUID PRIMARY KEY DEFAULT uuid_generate_v7(),
-- Relaciones
user_id UUID NOT NULL REFERENCES core.users(id) ON DELETE CASCADE,
session_id UUID REFERENCES auth.sessions(id) ON DELETE CASCADE,
-- Tipo de código
code_type VARCHAR(16) NOT NULL
CHECK (code_type IN ('totp_setup', 'sms', 'email', 'backup')),
-- Código (hash SHA-256)
code_hash VARCHAR(64) NOT NULL,
code_length INTEGER NOT NULL DEFAULT 6,
-- Destino (para SMS/Email)
destination VARCHAR(256), -- Teléfono o email
-- Intentos
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 5,
-- Validez
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL,
used_at TIMESTAMPTZ,
-- Metadata
ip_address INET,
user_agent TEXT,
-- Constraint
CONSTRAINT chk_code_not_expired CHECK (used_at IS NULL OR used_at <= expires_at)
);
CREATE INDEX idx_verification_codes_user ON auth.verification_codes(user_id, code_type)
WHERE used_at IS NULL;
CREATE INDEX idx_verification_codes_expires ON auth.verification_codes(expires_at)
WHERE used_at IS NULL;
2.2.4 auth.mfa_audit_log
CREATE TABLE auth.mfa_audit_log (
-- Identificación
id UUID PRIMARY KEY DEFAULT uuid_generate_v7(),
-- Usuario
user_id UUID NOT NULL REFERENCES core.users(id),
-- Evento
event_type VARCHAR(32) NOT NULL
CHECK (event_type IN (
'mfa_setup_initiated',
'mfa_setup_completed',
'mfa_disabled',
'totp_verified',
'totp_failed',
'backup_code_used',
'backup_codes_regenerated',
'device_trusted',
'device_revoked',
'anomaly_detected',
'account_locked',
'account_unlocked'
)),
-- Resultado
success BOOLEAN NOT NULL,
failure_reason VARCHAR(128),
-- Contexto
ip_address INET,
user_agent TEXT,
device_fingerprint VARCHAR(128),
location JSONB,
-- Metadata adicional
metadata JSONB DEFAULT '{}',
-- Timestamp
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_mfa_audit_user ON auth.mfa_audit_log(user_id, created_at DESC);
CREATE INDEX idx_mfa_audit_event ON auth.mfa_audit_log(event_type, created_at DESC);
CREATE INDEX idx_mfa_audit_failures ON auth.mfa_audit_log(user_id, created_at DESC)
WHERE success = false;
3. Especificación TOTP (RFC 6238)
3.1 Parámetros del Algoritmo
| Parámetro | Valor | Descripción |
|---|---|---|
| Algoritmo | HMAC-SHA1 | Estándar RFC 6238 |
| Dígitos | 6 | Longitud del código |
| Período | 30 segundos | Ventana de tiempo |
| Ventana de tolerancia | ±1 período | ±30 segundos |
| Longitud del secreto | 32 bytes | 256 bits de entropía |
| Codificación | Base32 | Para apps authenticator |
3.2 Pseudocódigo del Algoritmo
def generate_totp(secret: bytes, time_step: int = 30, digits: int = 6) -> str:
"""
Genera un código TOTP según RFC 6238.
Args:
secret: Secreto compartido (32 bytes)
time_step: Período en segundos (default 30)
digits: Número de dígitos (default 6)
Returns:
Código TOTP de 6 dígitos
"""
import hmac
import hashlib
import time
import struct
# Calcular contador de tiempo
counter = int(time.time()) // time_step
# Convertir contador a bytes (big-endian, 8 bytes)
counter_bytes = struct.pack('>Q', counter)
# Calcular HMAC-SHA1
hmac_result = hmac.new(secret, counter_bytes, hashlib.sha1).digest()
# Truncamiento dinámico
offset = hmac_result[-1] & 0x0F
binary_code = struct.unpack('>I', hmac_result[offset:offset+4])[0]
binary_code &= 0x7FFFFFFF # Eliminar bit de signo
# Obtener dígitos
otp = binary_code % (10 ** digits)
return str(otp).zfill(digits)
def verify_totp(
secret: bytes,
code: str,
time_step: int = 30,
window: int = 1
) -> bool:
"""
Verifica un código TOTP con tolerancia de ventana.
Args:
secret: Secreto compartido
code: Código a verificar
time_step: Período en segundos
window: Ventanas de tolerancia (±)
Returns:
True si el código es válido
"""
import time
current_counter = int(time.time()) // time_step
# Verificar ventana actual y adyacentes
for offset in range(-window, window + 1):
counter = current_counter + offset
expected_code = generate_totp_from_counter(secret, counter)
if hmac.compare_digest(code, expected_code):
return True
return False
3.3 Implementación del Servicio TOTP
import secrets
import base64
import hashlib
import hmac
from dataclasses import dataclass
from typing import Optional, List, Tuple
from datetime import datetime, timezone, timedelta
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from urllib.parse import quote
@dataclass
class TOTPSetupResult:
"""Resultado de configuración de TOTP."""
secret_base32: str
qr_code_url: str
backup_codes: List[str]
otpauth_uri: str
@dataclass
class TOTPVerifyResult:
"""Resultado de verificación de TOTP."""
valid: bool
method_used: str # 'totp' o 'backup_code'
remaining_backup_codes: Optional[int] = None
class TOTPService:
"""
Servicio de gestión de TOTP para 2FA.
"""
ALGORITHM = 'SHA1'
DIGITS = 6
PERIOD = 30
WINDOW = 1
SECRET_LENGTH = 32 # bytes
BACKUP_CODE_COUNT = 10
BACKUP_CODE_LENGTH = 8 # caracteres hex
def __init__(self, encryption_key: bytes, issuer: str = "ERP-Suite"):
"""
Args:
encryption_key: Clave AES-256 para encriptar secretos
issuer: Nombre del emisor para apps authenticator
"""
self.cipher = AESGCM(encryption_key)
self.issuer = issuer
def setup_totp(self, user_id: str, user_email: str) -> TOTPSetupResult:
"""
Inicia la configuración de TOTP para un usuario.
Returns:
TOTPSetupResult con secreto, QR y códigos de respaldo
"""
# Generar secreto aleatorio (32 bytes = 256 bits)
secret = secrets.token_bytes(self.SECRET_LENGTH)
# Codificar en Base32 para apps authenticator
secret_base32 = base64.b32encode(secret).decode('ascii')
# Remover padding
secret_base32 = secret_base32.rstrip('=')
# Generar URI OTPAuth
otpauth_uri = self._generate_otpauth_uri(
secret_base32=secret_base32,
account_name=user_email,
issuer=self.issuer
)
# Generar QR Code
qr_code_url = self._generate_qr_code(otpauth_uri)
# Generar códigos de respaldo
backup_codes = self._generate_backup_codes()
return TOTPSetupResult(
secret_base32=secret_base32,
qr_code_url=qr_code_url,
backup_codes=backup_codes,
otpauth_uri=otpauth_uri
)
def enable_totp(
self,
user_id: str,
secret_base32: str,
verification_code: str,
backup_codes: List[str]
) -> Tuple[bool, Optional[bytes], Optional[List[str]]]:
"""
Habilita TOTP después de verificar el código de configuración.
Returns:
(success, encrypted_secret, hashed_backup_codes)
"""
# Decodificar secreto
secret = base64.b32decode(secret_base32 + '=' * (8 - len(secret_base32) % 8))
# Verificar código
if not self._verify_code(secret, verification_code):
return False, None, None
# Encriptar secreto para almacenamiento
encrypted_secret = self._encrypt_secret(secret)
# Hashear códigos de respaldo
hashed_backup_codes = [
hashlib.sha256(code.encode()).hexdigest()
for code in backup_codes
]
return True, encrypted_secret, hashed_backup_codes
def verify_totp(
self,
encrypted_secret: bytes,
code: str,
hashed_backup_codes: List[str]
) -> TOTPVerifyResult:
"""
Verifica un código TOTP o de respaldo.
Returns:
TOTPVerifyResult con estado de verificación
"""
# Primero intentar como código TOTP
secret = self._decrypt_secret(encrypted_secret)
if self._verify_code(secret, code):
return TOTPVerifyResult(
valid=True,
method_used='totp'
)
# Intentar como código de respaldo
code_normalized = code.upper().replace('-', '')
code_hash = hashlib.sha256(code_normalized.encode()).hexdigest()
if code_hash in hashed_backup_codes:
# Remover código usado
remaining_codes = [c for c in hashed_backup_codes if c != code_hash]
return TOTPVerifyResult(
valid=True,
method_used='backup_code',
remaining_backup_codes=len(remaining_codes)
)
return TOTPVerifyResult(valid=False, method_used='none')
def regenerate_backup_codes(
self,
encrypted_secret: bytes,
verification_code: str
) -> Optional[List[str]]:
"""
Regenera códigos de respaldo después de verificar TOTP.
Returns:
Lista de nuevos códigos o None si verificación falla
"""
secret = self._decrypt_secret(encrypted_secret)
if not self._verify_code(secret, verification_code):
return None
return self._generate_backup_codes()
def _verify_code(self, secret: bytes, code: str) -> bool:
"""Verifica un código TOTP con tolerancia de ventana."""
import time
import struct
# Normalizar código
code = code.strip()
if len(code) != self.DIGITS or not code.isdigit():
return False
current_counter = int(time.time()) // self.PERIOD
for offset in range(-self.WINDOW, self.WINDOW + 1):
counter = current_counter + offset
expected = self._generate_code(secret, counter)
if hmac.compare_digest(code, expected):
return True
return False
def _generate_code(self, secret: bytes, counter: int) -> str:
"""Genera código TOTP para un contador específico."""
import struct
counter_bytes = struct.pack('>Q', counter)
hmac_result = hmac.new(secret, counter_bytes, hashlib.sha1).digest()
offset = hmac_result[-1] & 0x0F
binary_code = struct.unpack('>I', hmac_result[offset:offset+4])[0]
binary_code &= 0x7FFFFFFF
otp = binary_code % (10 ** self.DIGITS)
return str(otp).zfill(self.DIGITS)
def _generate_otpauth_uri(
self,
secret_base32: str,
account_name: str,
issuer: str
) -> str:
"""Genera URI OTPAuth para QR code."""
return (
f"otpauth://totp/{quote(issuer)}:{quote(account_name)}"
f"?secret={secret_base32}"
f"&issuer={quote(issuer)}"
f"&algorithm={self.ALGORITHM}"
f"&digits={self.DIGITS}"
f"&period={self.PERIOD}"
)
def _generate_qr_code(self, uri: str) -> str:
"""Genera QR code como Data URL."""
import qrcode
import io
import base64
qr = qrcode.QRCode(
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=10,
border=4,
)
qr.add_data(uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffer = io.BytesIO()
img.save(buffer, format='PNG')
img_str = base64.b64encode(buffer.getvalue()).decode()
return f"data:image/png;base64,{img_str}"
def _generate_backup_codes(self) -> List[str]:
"""Genera códigos de respaldo formateados."""
codes = []
for _ in range(self.BACKUP_CODE_COUNT):
# 8 caracteres hex = 32 bits de entropía
raw = secrets.token_hex(4).upper()
# Formato XXXX-XXXX
formatted = f"{raw[:4]}-{raw[4:]}"
codes.append(formatted)
return codes
def _encrypt_secret(self, secret: bytes) -> bytes:
"""Encripta secreto con AES-256-GCM."""
nonce = secrets.token_bytes(12) # 96-bit nonce
ciphertext = self.cipher.encrypt(nonce, secret, None)
return nonce + ciphertext
def _decrypt_secret(self, encrypted: bytes) -> bytes:
"""Desencripta secreto."""
nonce = encrypted[:12]
ciphertext = encrypted[12:]
return self.cipher.decrypt(nonce, ciphertext, None)
4. Sistema de Dispositivos de Confianza
4.1 Generación de Fingerprint
import hashlib
from dataclasses import dataclass
from typing import Optional
@dataclass
class DeviceInfo:
"""Información del dispositivo."""
fingerprint: str
device_name: str
device_type: str # 'mobile', 'desktop', 'tablet'
browser_name: str
browser_version: str
os_name: str
os_version: str
user_agent: str
class DeviceFingerprintService:
"""
Servicio para generar y gestionar fingerprints de dispositivos.
"""
def generate_fingerprint(
self,
user_agent: str,
accept_language: str,
accept_encoding: str,
client_hints: Optional[dict] = None
) -> DeviceInfo:
"""
Genera fingerprint del dispositivo basado en headers HTTP.
Args:
user_agent: Header User-Agent
accept_language: Header Accept-Language
accept_encoding: Header Accept-Encoding
client_hints: Headers Sec-CH-* (opcional)
Returns:
DeviceInfo con fingerprint y metadata
"""
from user_agents import parse
# Parsear User-Agent
ua = parse(user_agent)
# Componentes del fingerprint
components = [
user_agent,
accept_language,
accept_encoding,
]
# Agregar client hints si están disponibles
if client_hints:
components.extend([
client_hints.get('sec-ch-ua', ''),
client_hints.get('sec-ch-ua-platform', ''),
client_hints.get('sec-ch-ua-mobile', ''),
])
# Generar hash
fingerprint = hashlib.sha256(
'|'.join(components).encode()
).hexdigest()[:64]
# Determinar tipo de dispositivo
if ua.is_mobile:
device_type = 'mobile'
elif ua.is_tablet:
device_type = 'tablet'
else:
device_type = 'desktop'
# Generar nombre amigable
device_name = f"{ua.browser.family} en {ua.os.family}"
return DeviceInfo(
fingerprint=fingerprint,
device_name=device_name,
device_type=device_type,
browser_name=ua.browser.family,
browser_version=ua.browser.version_string,
os_name=ua.os.family,
os_version=ua.os.version_string,
user_agent=user_agent
)
class TrustedDeviceService:
"""
Servicio para gestionar dispositivos de confianza.
"""
TRUST_DURATION_DAYS = 30 # Duración de confianza por defecto
MAX_TRUSTED_DEVICES = 5 # Máximo dispositivos por usuario
def __init__(self, db_session, fingerprint_service: DeviceFingerprintService):
self.db = db_session
self.fp_service = fingerprint_service
def check_device_trusted(
self,
user_id: str,
device_fingerprint: str
) -> bool:
"""
Verifica si un dispositivo es de confianza.
Returns:
True si el dispositivo es de confianza y no ha expirado
"""
result = self.db.execute("""
SELECT id FROM auth.trusted_devices
WHERE user_id = :user_id
AND device_fingerprint = :fingerprint
AND is_active = true
AND (trust_expires_at IS NULL OR trust_expires_at > NOW())
""", {
'user_id': user_id,
'fingerprint': device_fingerprint
})
return result.fetchone() is not None
def trust_device(
self,
user_id: str,
device_info: DeviceInfo,
ip_address: str,
location: Optional[dict] = None,
trust_level: str = 'standard',
duration_days: Optional[int] = None
) -> str:
"""
Marca un dispositivo como de confianza.
Returns:
ID del dispositivo de confianza
"""
duration = duration_days or self.TRUST_DURATION_DAYS
expires_at = datetime.now(timezone.utc) + timedelta(days=duration)
# Verificar límite de dispositivos
count = self.db.execute("""
SELECT COUNT(*) FROM auth.trusted_devices
WHERE user_id = :user_id AND is_active = true
""", {'user_id': user_id}).scalar()
if count >= self.MAX_TRUSTED_DEVICES:
# Remover el dispositivo más antiguo
self.db.execute("""
UPDATE auth.trusted_devices
SET is_active = false,
revoked_at = NOW(),
revoked_reason = 'max_devices_exceeded'
WHERE id = (
SELECT id FROM auth.trusted_devices
WHERE user_id = :user_id AND is_active = true
ORDER BY last_used_at ASC
LIMIT 1
)
""", {'user_id': user_id})
# Insertar o actualizar dispositivo
result = self.db.execute("""
INSERT INTO auth.trusted_devices (
user_id, device_fingerprint, device_name, device_type,
user_agent, browser_name, browser_version,
os_name, os_version, registered_ip, registered_location,
trust_level, trust_expires_at
) VALUES (
:user_id, :fingerprint, :name, :type,
:user_agent, :browser, :browser_version,
:os, :os_version, :ip, :location,
:trust_level, :expires_at
)
ON CONFLICT (user_id, device_fingerprint)
DO UPDATE SET
is_active = true,
trust_expires_at = :expires_at,
last_used_at = NOW(),
use_count = auth.trusted_devices.use_count + 1,
revoked_at = NULL,
revoked_reason = NULL
RETURNING id
""", {
'user_id': user_id,
'fingerprint': device_info.fingerprint,
'name': device_info.device_name,
'type': device_info.device_type,
'user_agent': device_info.user_agent,
'browser': device_info.browser_name,
'browser_version': device_info.browser_version,
'os': device_info.os_name,
'os_version': device_info.os_version,
'ip': ip_address,
'location': location,
'trust_level': trust_level,
'expires_at': expires_at
})
return result.fetchone()[0]
def revoke_device(
self,
user_id: str,
device_id: str,
reason: str = 'user_requested'
) -> bool:
"""Revoca la confianza de un dispositivo."""
result = self.db.execute("""
UPDATE auth.trusted_devices
SET is_active = false,
revoked_at = NOW(),
revoked_reason = :reason
WHERE id = :device_id AND user_id = :user_id
RETURNING id
""", {
'device_id': device_id,
'user_id': user_id,
'reason': reason
})
return result.fetchone() is not None
def revoke_all_devices(self, user_id: str, reason: str = 'security_reset'):
"""Revoca todos los dispositivos de confianza de un usuario."""
self.db.execute("""
UPDATE auth.trusted_devices
SET is_active = false,
revoked_at = NOW(),
revoked_reason = :reason
WHERE user_id = :user_id AND is_active = true
""", {
'user_id': user_id,
'reason': reason
})
def get_user_devices(self, user_id: str) -> List[dict]:
"""Obtiene la lista de dispositivos de un usuario."""
result = self.db.execute("""
SELECT
id,
device_name,
device_type,
browser_name,
os_name,
registered_ip,
trust_level,
last_used_at,
trust_expires_at,
is_active,
created_at
FROM auth.trusted_devices
WHERE user_id = :user_id
ORDER BY last_used_at DESC
""", {'user_id': user_id})
return [dict(row) for row in result.fetchall()]
5. Flujo de Autenticación con 2FA
5.1 Diagrama de Flujo
┌─────────────────────────────────────────────────────────────────────┐
│ FLUJO DE LOGIN CON 2FA │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────┐ │
│ │ Usuario ingresa │ │
│ │ email/password │ │
│ └────────┬─────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Verificar │ │
│ │ credenciales │ │
│ └────────┬─────────┘ │
│ │ │
│ ¿Válidas? │
│ / \ │
│ NO SI │
│ │ │ │
│ ▼ ▼ │
│ ┌────────┐ ┌──────────────────┐ │
│ │ Error │ │ ¿MFA habilitado? │ │
│ │ login │ └────────┬─────────┘ │
│ └────────┘ / \ │
│ NO SI │
│ │ │ │
│ │ ▼ │
│ │ ┌──────────────────┐ │
│ │ │ ¿Dispositivo de │ │
│ │ │ confianza? │ │
│ │ └────────┬─────────┘ │
│ │ / \ │
│ │ SI NO │
│ │ │ │ │
│ │ │ ▼ │
│ │ │ ┌──────────────────┐ │
│ │ │ │ Emitir token │ │
│ │ │ │ temporal (5 min) │ │
│ │ │ └────────┬─────────┘ │
│ │ │ │ │
│ │ │ ▼ │
│ │ │ ┌──────────────────┐ │
│ │ │ │ Solicitar código │ │
│ │ │ │ TOTP/Backup │ │
│ │ │ └────────┬─────────┘ │
│ │ │ │ │
│ │ │ ▼ │
│ │ │ ┌──────────────────┐ │
│ │ │ │ Verificar código │ │
│ │ │ └────────┬─────────┘ │
│ │ │ / \ │
│ │ │ OK FAIL │
│ │ │ │ │ │
│ │ │ │ ▼ │
│ │ │ │ ┌──────────┐ │
│ │ │ │ │ ¿Reintentos │ │
│ │ │ │ │ disponibles? │ │
│ │ │ │ └─────┬────────┘ │
│ │ │ │ / \ │
│ │ │ │ SI NO │
│ │ │ │ │ │ │
│ │ │ │ │ ▼ │
│ │ │ │ │ Bloquear │
│ │ │ │ │ cuenta │
│ │ │ │ │ │ │
│ │ │ │ ▼ │ │
│ │ │ │ Volver a │ │
│ │ │ │ solicitar │ │
│ │ │ │ │ │
│ │ └──────┼────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────────────┐ │
│ │ Emitir tokens de │ │
│ │ sesión (access + │ │
│ │ refresh) │ │
│ └──────────┬───────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ ¿Confiar en este │ │
│ │ dispositivo? │◄── Opcional (checkbox) │
│ └──────────┬───────────┘ │
│ / \ │
│ SI NO │
│ │ │ │
│ ▼ │ │
│ Marcar como │ │
│ confianza │ │
│ │ │ │
│ └────────┤ │
│ ▼ │
│ ┌───────────┐ │
│ │ Login OK │ │
│ └───────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
5.2 Implementación del Controlador
from dataclasses import dataclass
from typing import Optional
from datetime import datetime, timezone, timedelta
import jwt
@dataclass
class LoginRequest:
email: str
password: str
totp_code: Optional[str] = None
trust_device: bool = False
@dataclass
class LoginResponse:
success: bool
requires_2fa: bool = False
temp_token: Optional[str] = None
access_token: Optional[str] = None
refresh_token: Optional[str] = None
user: Optional[dict] = None
error: Optional[str] = None
class AuthController:
"""
Controlador de autenticación con soporte 2FA.
"""
TEMP_TOKEN_EXPIRY = 300 # 5 minutos
MAX_2FA_ATTEMPTS = 5
LOCKOUT_DURATION = 900 # 15 minutos
def __init__(
self,
user_service,
totp_service: TOTPService,
trusted_device_service: TrustedDeviceService,
token_service,
rate_limiter
):
self.user_svc = user_service
self.totp_svc = totp_service
self.device_svc = trusted_device_service
self.token_svc = token_service
self.rate_limiter = rate_limiter
async def login(
self,
request: LoginRequest,
ip_address: str,
user_agent: str
) -> LoginResponse:
"""
Procesa solicitud de login con soporte 2FA.
"""
# Rate limiting por IP
if not await self.rate_limiter.check(f"login:{ip_address}"):
return LoginResponse(
success=False,
error="Demasiados intentos. Intenta más tarde."
)
# Verificar credenciales
user = await self.user_svc.authenticate(
request.email,
request.password
)
if not user:
await self.rate_limiter.increment(f"login:{ip_address}")
return LoginResponse(
success=False,
error="Credenciales inválidas"
)
# Verificar si cuenta está bloqueada
if user.locked_until and user.locked_until > datetime.now(timezone.utc):
return LoginResponse(
success=False,
error=f"Cuenta bloqueada hasta {user.locked_until}"
)
# Si no tiene 2FA, login directo
if not user.mfa_enabled:
return await self._complete_login(user, ip_address, user_agent)
# Verificar dispositivo de confianza
device_info = self.device_svc.fp_service.generate_fingerprint(user_agent, "", "")
if self.device_svc.check_device_trusted(user.id, device_info.fingerprint):
# Dispositivo confiable, no requiere 2FA
return await self._complete_login(
user, ip_address, user_agent,
device_info=device_info
)
# Si se proporciona código TOTP, verificar
if request.totp_code:
return await self._verify_2fa_and_login(
user=user,
code=request.totp_code,
ip_address=ip_address,
user_agent=user_agent,
device_info=device_info,
trust_device=request.trust_device
)
# Requiere 2FA, emitir token temporal
temp_token = self.token_svc.generate_temp_token(
user_id=user.id,
purpose='2fa_verification',
expires_in=self.TEMP_TOKEN_EXPIRY
)
return LoginResponse(
success=True,
requires_2fa=True,
temp_token=temp_token
)
async def verify_2fa(
self,
temp_token: str,
code: str,
trust_device: bool,
ip_address: str,
user_agent: str
) -> LoginResponse:
"""
Verifica código 2FA después de login inicial.
"""
# Validar token temporal
try:
payload = self.token_svc.verify_temp_token(temp_token)
if payload.get('purpose') != '2fa_verification':
raise ValueError("Token inválido")
user_id = payload['user_id']
except Exception:
return LoginResponse(
success=False,
error="Sesión expirada. Inicia sesión nuevamente."
)
# Obtener usuario
user = await self.user_svc.get_by_id(user_id)
if not user:
return LoginResponse(
success=False,
error="Usuario no encontrado"
)
device_info = self.device_svc.fp_service.generate_fingerprint(user_agent, "", "")
return await self._verify_2fa_and_login(
user=user,
code=code,
ip_address=ip_address,
user_agent=user_agent,
device_info=device_info,
trust_device=trust_device
)
async def _verify_2fa_and_login(
self,
user,
code: str,
ip_address: str,
user_agent: str,
device_info: DeviceInfo,
trust_device: bool
) -> LoginResponse:
"""Verifica código 2FA y completa login."""
# Verificar intentos de 2FA
attempts_key = f"2fa_attempts:{user.id}"
attempts = await self.rate_limiter.get_count(attempts_key)
if attempts >= self.MAX_2FA_ATTEMPTS:
# Bloquear cuenta
await self.user_svc.lock_account(
user.id,
duration_seconds=self.LOCKOUT_DURATION,
reason='2fa_max_attempts'
)
return LoginResponse(
success=False,
error="Demasiados intentos. Cuenta bloqueada temporalmente."
)
# Verificar código
result = self.totp_svc.verify_totp(
encrypted_secret=user.mfa_secret,
code=code,
hashed_backup_codes=user.backup_codes
)
if not result.valid:
await self.rate_limiter.increment(attempts_key)
remaining = self.MAX_2FA_ATTEMPTS - attempts - 1
# Log intento fallido
await self._log_mfa_event(
user_id=user.id,
event_type='totp_failed',
success=False,
ip_address=ip_address,
device_fingerprint=device_info.fingerprint
)
return LoginResponse(
success=False,
error=f"Código inválido. {remaining} intentos restantes."
)
# Si usó código de respaldo, actualizar
if result.method_used == 'backup_code':
await self.user_svc.update_backup_codes(
user.id,
remaining_count=result.remaining_backup_codes
)
await self._log_mfa_event(
user_id=user.id,
event_type='backup_code_used',
success=True,
ip_address=ip_address,
metadata={'remaining_codes': result.remaining_backup_codes}
)
# Limpiar intentos fallidos
await self.rate_limiter.reset(attempts_key)
# Marcar dispositivo como confianza si se solicita
if trust_device:
await self.device_svc.trust_device(
user_id=user.id,
device_info=device_info,
ip_address=ip_address
)
await self._log_mfa_event(
user_id=user.id,
event_type='device_trusted',
success=True,
ip_address=ip_address,
device_fingerprint=device_info.fingerprint
)
return await self._complete_login(
user, ip_address, user_agent,
device_info=device_info
)
async def _complete_login(
self,
user,
ip_address: str,
user_agent: str,
device_info: Optional[DeviceInfo] = None
) -> LoginResponse:
"""Completa el login emitiendo tokens."""
# Generar tokens
tokens = self.token_svc.generate_session_tokens(
user_id=user.id,
device_fingerprint=device_info.fingerprint if device_info else None
)
# Actualizar último login
await self.user_svc.update_last_login(
user_id=user.id,
ip_address=ip_address
)
return LoginResponse(
success=True,
requires_2fa=False,
access_token=tokens.access_token,
refresh_token=tokens.refresh_token,
user={
'id': user.id,
'email': user.email,
'name': user.name,
'mfa_enabled': user.mfa_enabled
}
)
async def _log_mfa_event(
self,
user_id: str,
event_type: str,
success: bool,
ip_address: str,
device_fingerprint: Optional[str] = None,
metadata: Optional[dict] = None
):
"""Registra evento de MFA en log de auditoría."""
await self.db.execute("""
INSERT INTO auth.mfa_audit_log (
user_id, event_type, success, ip_address,
device_fingerprint, metadata
) VALUES (
:user_id, :event_type, :success, :ip_address,
:device_fingerprint, :metadata
)
""", {
'user_id': user_id,
'event_type': event_type,
'success': success,
'ip_address': ip_address,
'device_fingerprint': device_fingerprint,
'metadata': metadata or {}
})
6. API REST
6.1 Endpoints
openapi: 3.0.3
info:
title: Two-Factor Authentication API
version: 1.0.0
paths:
/api/v1/auth/login:
post:
summary: Iniciar sesión
requestBody:
content:
application/json:
schema:
type: object
properties:
email:
type: string
format: email
password:
type: string
format: password
totp_code:
type: string
pattern: '^\d{6}$'
description: Código TOTP (opcional si 2FA habilitado)
trust_device:
type: boolean
default: false
required:
- email
- password
responses:
'200':
description: Login exitoso o requiere 2FA
content:
application/json:
schema:
oneOf:
- $ref: '#/components/schemas/LoginSuccess'
- $ref: '#/components/schemas/Requires2FA'
/api/v1/auth/verify-2fa:
post:
summary: Verificar código 2FA
requestBody:
content:
application/json:
schema:
type: object
properties:
temp_token:
type: string
code:
type: string
description: Código TOTP o código de respaldo
trust_device:
type: boolean
default: false
required:
- temp_token
- code
responses:
'200':
description: Verificación exitosa
content:
application/json:
schema:
$ref: '#/components/schemas/LoginSuccess'
/api/v1/auth/2fa/setup:
post:
summary: Iniciar configuración de 2FA
security:
- bearerAuth: []
responses:
'200':
description: Datos para configuración
content:
application/json:
schema:
$ref: '#/components/schemas/TOTPSetup'
/api/v1/auth/2fa/enable:
post:
summary: Habilitar 2FA
security:
- bearerAuth: []
requestBody:
content:
application/json:
schema:
type: object
properties:
code:
type: string
pattern: '^\d{6}$'
required:
- code
responses:
'200':
description: 2FA habilitado
content:
application/json:
schema:
type: object
properties:
success:
type: boolean
message:
type: string
backup_codes_remaining:
type: integer
/api/v1/auth/2fa/disable:
post:
summary: Deshabilitar 2FA
security:
- bearerAuth: []
requestBody:
content:
application/json:
schema:
type: object
properties:
code:
type: string
description: Código TOTP o código de respaldo
password:
type: string
format: password
required:
- code
- password
responses:
'200':
description: 2FA deshabilitado
/api/v1/auth/2fa/backup-codes:
post:
summary: Regenerar códigos de respaldo
security:
- bearerAuth: []
requestBody:
content:
application/json:
schema:
type: object
properties:
code:
type: string
pattern: '^\d{6}$'
required:
- code
responses:
'200':
description: Nuevos códigos de respaldo
content:
application/json:
schema:
type: object
properties:
backup_codes:
type: array
items:
type: string
pattern: '^[A-F0-9]{4}-[A-F0-9]{4}$'
/api/v1/auth/2fa/status:
get:
summary: Obtener estado de 2FA
security:
- bearerAuth: []
responses:
'200':
description: Estado de 2FA
content:
application/json:
schema:
$ref: '#/components/schemas/MFAStatus'
/api/v1/auth/trusted-devices:
get:
summary: Listar dispositivos de confianza
security:
- bearerAuth: []
responses:
'200':
description: Lista de dispositivos
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/TrustedDevice'
/api/v1/auth/trusted-devices/{id}:
delete:
summary: Revocar dispositivo de confianza
security:
- bearerAuth: []
parameters:
- name: id
in: path
required: true
schema:
type: string
format: uuid
responses:
'204':
description: Dispositivo revocado
/api/v1/auth/trusted-devices/revoke-all:
post:
summary: Revocar todos los dispositivos
security:
- bearerAuth: []
requestBody:
content:
application/json:
schema:
type: object
properties:
code:
type: string
description: Código TOTP para confirmar
required:
- code
responses:
'200':
description: Todos los dispositivos revocados
components:
schemas:
LoginSuccess:
type: object
properties:
success:
type: boolean
example: true
requires_2fa:
type: boolean
example: false
access_token:
type: string
refresh_token:
type: string
expires_in:
type: integer
example: 3600
user:
type: object
properties:
id:
type: string
format: uuid
email:
type: string
name:
type: string
mfa_enabled:
type: boolean
Requires2FA:
type: object
properties:
success:
type: boolean
example: true
requires_2fa:
type: boolean
example: true
temp_token:
type: string
description: Token temporal para verificación 2FA
expires_in:
type: integer
example: 300
description: Segundos hasta expiración
TOTPSetup:
type: object
properties:
secret:
type: string
description: Secreto en Base32 para entrada manual
qr_code_url:
type: string
description: Data URL del código QR
otpauth_uri:
type: string
description: URI OTPAuth para apps authenticator
backup_codes:
type: array
items:
type: string
pattern: '^[A-F0-9]{4}-[A-F0-9]{4}$'
description: Códigos de respaldo (mostrar una sola vez)
MFAStatus:
type: object
properties:
mfa_enabled:
type: boolean
mfa_method:
type: string
enum: [none, totp, sms, email]
setup_at:
type: string
format: date-time
backup_codes_remaining:
type: integer
trusted_devices_count:
type: integer
last_verification:
type: string
format: date-time
TrustedDevice:
type: object
properties:
id:
type: string
format: uuid
device_name:
type: string
example: "Chrome en Windows"
device_type:
type: string
enum: [mobile, desktop, tablet]
browser_name:
type: string
os_name:
type: string
last_used_at:
type: string
format: date-time
trust_expires_at:
type: string
format: date-time
is_current:
type: boolean
description: Si es el dispositivo actual
7. Seguridad y Protecciones
7.1 Rate Limiting
# Configuración de rate limiting para 2FA
RATE_LIMITS = {
'login_attempt': {
'window': 900, # 15 minutos
'max_requests': 5,
'key_format': 'login:{ip}:{email}'
},
'2fa_verification': {
'window': 300, # 5 minutos
'max_requests': 5,
'key_format': '2fa:{user_id}'
},
'2fa_setup': {
'window': 3600, # 1 hora
'max_requests': 3,
'key_format': 'setup:{user_id}'
},
'backup_regenerate': {
'window': 86400, # 24 horas
'max_requests': 2,
'key_format': 'backup:{user_id}'
}
}
7.2 Protección Contra Replay Attacks
class ReplayProtection:
"""
Previene reutilización de códigos TOTP dentro del mismo período.
"""
def __init__(self, redis_client):
self.redis = redis_client
async def check_and_mark_used(
self,
user_id: str,
code: str,
ttl_seconds: int = 30
) -> bool:
"""
Verifica si el código ya fue usado y lo marca.
Returns:
True si el código es nuevo, False si ya fue usado
"""
key = f"totp_used:{user_id}:{code}"
# SETNX retorna True si la clave no existía
is_new = await self.redis.setnx(key, "1")
if is_new:
# Establecer TTL igual al período TOTP
await self.redis.expire(key, ttl_seconds)
return True
return False
7.3 Bloqueo Progresivo
LOCKOUT_PROGRESSION = [
{'attempts': 5, 'duration': 15 * 60}, # 5 intentos: 15 minutos
{'attempts': 10, 'duration': 60 * 60}, # 10 intentos: 1 hora
{'attempts': 15, 'duration': 24 * 60 * 60} # 15 intentos: 24 horas
]
async def get_lockout_duration(failed_attempts: int) -> int:
"""Determina duración de bloqueo según intentos fallidos."""
for level in reversed(LOCKOUT_PROGRESSION):
if failed_attempts >= level['attempts']:
return level['duration']
return 0
7.4 Notificaciones de Seguridad
class SecurityNotifications:
"""Notificaciones de eventos de seguridad."""
async def notify_2fa_enabled(self, user, device_info, ip_address):
"""Notifica al usuario que 2FA fue habilitado."""
await self.email_service.send(
to=user.email,
template='security/2fa_enabled',
context={
'user_name': user.name,
'device': device_info.device_name,
'ip': ip_address,
'timestamp': datetime.now(timezone.utc).isoformat(),
'disable_link': self._generate_disable_link(user.id)
}
)
async def notify_backup_code_used(self, user, remaining_codes):
"""Alerta sobre uso de código de respaldo."""
await self.email_service.send(
to=user.email,
template='security/backup_code_used',
context={
'user_name': user.name,
'remaining_codes': remaining_codes,
'regenerate_link': self._generate_regenerate_link(user.id)
}
)
async def notify_new_device_login(self, user, device_info, location):
"""Notifica login desde nuevo dispositivo."""
await self.email_service.send(
to=user.email,
template='security/new_device_login',
context={
'user_name': user.name,
'device': device_info.device_name,
'location': f"{location.get('city')}, {location.get('country')}",
'timestamp': datetime.now(timezone.utc).isoformat(),
'revoke_link': self._generate_revoke_link(user.id, device_info.fingerprint)
}
)
8. Consideraciones de UX
8.1 Flujo de Configuración de 2FA
- Mostrar beneficios antes de iniciar configuración
- Código QR prominente con opción de entrada manual
- Verificación inmediata del código antes de habilitar
- Mostrar códigos de respaldo con opción de descarga/copia
- Confirmación explícita de que los códigos fueron guardados
8.2 Mensajes de Error Claros
ERROR_MESSAGES = {
'invalid_code': 'Código incorrecto. Verifica que coincida con tu app.',
'code_expired': 'El código ha expirado. Usa el código actual de tu app.',
'too_many_attempts': 'Demasiados intentos. Tu cuenta está bloqueada por {minutes} minutos.',
'session_expired': 'Tu sesión expiró. Inicia sesión nuevamente.',
'backup_code_used': 'Este código de respaldo ya fue utilizado.',
'no_backup_codes': 'No tienes códigos de respaldo disponibles. Contacta soporte.',
}
8.3 Opciones de Recuperación
┌─────────────────────────────────────────────────────────────────────┐
│ OPCIONES DE RECUPERACIÓN │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 1. Código de Respaldo │
│ └─ Usuario ingresa uno de los 10 códigos generados │
│ │
│ 2. Email de Recuperación (si configurado) │
│ └─ Enviar código temporal al email alternativo │
│ │
│ 3. SMS de Recuperación (si configurado) │
│ └─ Enviar código temporal al teléfono registrado │
│ │
│ 4. Soporte Técnico │
│ └─ Verificación de identidad manual │
│ └─ Requiere documentación (ID, selfie, etc.) │
│ └─ Proceso de 24-48 horas │
│ │
└─────────────────────────────────────────────────────────────────────┘
9. Índices y Optimización
-- Índices para rendimiento de autenticación 2FA
CREATE INDEX idx_users_mfa_lookup
ON core.users(email, mfa_enabled, mfa_secret)
WHERE is_active = true;
CREATE INDEX idx_trusted_devices_lookup
ON auth.trusted_devices(user_id, device_fingerprint)
WHERE is_active = true;
CREATE INDEX idx_verification_codes_lookup
ON auth.verification_codes(user_id, code_type, expires_at)
WHERE used_at IS NULL;
CREATE INDEX idx_mfa_audit_recent
ON auth.mfa_audit_log(user_id, created_at DESC)
WHERE created_at > NOW() - INTERVAL '30 days';
-- Cleanup job para códigos expirados
CREATE OR REPLACE FUNCTION auth.cleanup_expired_codes()
RETURNS void AS $$
BEGIN
DELETE FROM auth.verification_codes
WHERE expires_at < NOW() - INTERVAL '1 day';
DELETE FROM auth.trusted_devices
WHERE trust_expires_at < NOW() - INTERVAL '7 days'
AND trust_expires_at IS NOT NULL;
END;
$$ LANGUAGE plpgsql;
10. Checklist de Implementación
10.1 Componentes Requeridos
- Servicio TOTP (generación, verificación)
- Generación de códigos de respaldo
- Encriptación AES-256-GCM para secretos
- Rate limiting en endpoints
- Protección contra replay attacks
- Sistema de dispositivos de confianza
- Generación de fingerprint de dispositivo
- Bloqueo progresivo de cuenta
- Log de auditoría MFA
- Notificaciones por email
- API REST completa
- Validación de entrada
10.2 Tests Requeridos
- Generación y verificación TOTP
- Tolerancia de ventana de tiempo (±30s)
- Uso y consumo de códigos de respaldo
- Prevención de replay attacks
- Rate limiting
- Bloqueo progresivo
- Encriptación/desencriptación de secretos
- Flujo completo de login con 2FA
- Configuración y deshabilitación de 2FA
11. Referencias
11.1 RFCs y Estándares
- RFC 6238: TOTP Algorithm
- RFC 4226: HOTP Algorithm
- RFC 4648: Base32 Encoding
11.2 OWASP
- OWASP Authentication Cheat Sheet
- OWASP Multi-Factor Authentication Cheat Sheet
- OWASP Session Management Cheat Sheet
11.3 Librerías Recomendadas
- Python:
pyotp,cryptography - JavaScript:
speakeasy,otplib - QR Code:
qrcode,qr-image
Documento generado para ERP-SUITE Versión: 1.0 Fecha: 2025-01-09