# DIRECTIVE: ML Services - Python and Miniconda Standards

**ID**: DOQ-003
**Type**: ML Development
**Scope**: Trading Platform (trading-platform) - ML Engine
**Version**: 1.1
**Date**: 2025-12-07
**Migrated from**: trading

## Purpose

Establish standards and best practices for developing Machine Learning services in Python, including environment management with Miniconda.

## Environment Management with Miniconda

### Verified Installation

```bash
# Miniconda is already installed at: ~/miniconda3/bin/conda
# Verify the installation
$HOME/miniconda3/bin/conda --version
```

### Create the Project Environment

```bash
# Create the environment from the file
conda env create -f apps/ml-engine/environment.yml

# Or create it manually
conda create -n trading-ml python=3.11 -y
conda activate trading-ml
```

### environment.yml File

```yaml
name: trading-ml
channels:
  - pytorch
  - nvidia
  - conda-forge
  - defaults
dependencies:
  - python=3.11
  - pytorch=2.5.0
  - pytorch-cuda=12.1    # pulls in the CUDA 12.1 runtime
  # - cudatoolkit=12.1   # legacy package with no 12.x builds; pytorch-cuda covers it
  - numpy=1.26
  - pandas=2.2
  - scikit-learn=1.5
  - matplotlib=3.9
  - seaborn=0.13
  - jupyter=1.0
  - pip
  - pip:
    - fastapi>=0.115.0
    - uvicorn[standard]>=0.32.0
    - websockets>=13.0
    - xgboost>=2.1.0
    - ta-lib>=0.4.32
    - pandas-ta>=0.3.14b
    - pymysql>=1.1.0
    - sqlalchemy>=2.0.0
    - pydantic>=2.10.0
    - pydantic-settings>=2.0.0   # BaseSettings lives here under Pydantic v2
    - loguru>=0.7.0
    - python-dotenv>=1.0.0
    - httpx>=0.28.0
    - joblib>=1.4.0
    - pyyaml>=6.0.0
```

### Environment Commands

```bash
# Activate
conda activate trading-ml

# Deactivate
conda deactivate

# List environments
conda env list

# Export the environment
conda env export > environment.yml

# Update the environment
conda env update -f environment.yml --prune

# Remove the environment
conda env remove -n trading-ml
```

## Python Code Structure

### Directory Structure

```
apps/ml-engine/
├── src/
│   ├── __init__.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── server.py            # Main FastAPI app
│   │   ├── routes/
│   │   │   ├── __init__.py
│   │   │   ├── predict.py       # /api/predict endpoints
│   │   │   ├── signals.py       # /api/signals endpoints
│   │   │   ├── indicators.py    # /api/indicators endpoints
│   │   │   └── backtest.py      # /api/backtest endpoints
│   │   ├── schemas/
│   │   │   ├── __init__.py
│   │   │   ├── prediction.py    # Pydantic models
│   │   │   ├── signal.py
│   │   │   └── common.py
│   │   └── middleware/
│   │       └── __init__.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── base/
│   │   │   ├── __init__.py
│   │   │   ├── base_model.py
│   │   │   ├── xgboost_model.py
│   │   │   ├── gru_model.py
│   │   │   └── transformer_model.py
│   │   ├── ensemble/
│   │   │   ├── __init__.py
│   │   │   └── meta_model.py
│   │   └── predictors/
│   │       ├── __init__.py
│   │       ├── price_predictor.py
│   │       └── signal_generator.py
│   ├── data/
│   │   ├── __init__.py
│   │   ├── pipeline.py          # ETL pipeline
│   │   ├── database.py          # MySQL connection
│   │   ├── indicators.py        # Technical indicators
│   │   └── features.py          # Feature engineering
│   ├── training/
│   │   ├── __init__.py
│   │   └── walk_forward.py
│   ├── strategies/
│   │   ├── __init__.py
│   │   └── amd_detector.py
│   ├── visualization/
│   │   ├── __init__.py
│   │   └── dashboard.py
│   └── utils/
│       ├── __init__.py
│       ├── logging.py
│       └── validators.py
├── config/
│   ├── trading.yaml
│   ├── models.yaml
│   └── database.yaml
├── tests/
│   ├── __init__.py
│   ├── test_api/
│   ├── test_models/
│   └── test_data/
├── notebooks/
│   └── exploration.ipynb
├── .env
├── .env.example
├── environment.yml
├── requirements.txt
├── pyproject.toml
└── README.md
```

## Python Code Standards

### Code Style

```toml
# PEP 8 + Black + isort
# Configuration in pyproject.toml

[tool.black]
line-length = 100
target-version = ['py311']
include = '\.pyi?$'
exclude = '''
/(
    \.git
  | \.hg
  | \.mypy_cache
  | \.tox
  | \.venv
  | _build
  | buck-out
  | build
  | dist
)/
'''

[tool.isort]
profile = "black"
line_length = 100
skip_gitignore = true

[tool.mypy]
python_version = "3.11"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
```

### Naming Conventions

```python
# Variables and functions: snake_case
prediction_result = get_price_prediction(symbol)

# Classes: PascalCase
class MaxMinPricePredictor:
    pass

# Constants: UPPER_SNAKE_CASE
DEFAULT_HORIZONS = ["scalping", "intraday", "swing", "position"]
MAX_SEQUENCE_LENGTH = 128

# Files: snake_case.py
# price_predictor.py, data_pipeline.py

# Packages: snake_case
# src/models/base/

# Private: _ prefix
def _calculate_internal_metric(self):
    pass
```

### Mandatory Type Hints

```python
from typing import Any, Dict


def predict(
    symbol: str,
    horizon: str = "all",
    confidence_threshold: float = 0.5
) -> Dict[str, Any]:
    """
    Generate a price prediction for a symbol.

    Args:
        symbol: Asset symbol (e.g., "BTCUSDT")
        horizon: Time horizon ("scalping", "intraday", etc.)
        confidence_threshold: Minimum confidence threshold

    Returns:
        Dictionary with predictions per horizon

    Raises:
        ValueError: If the symbol is not valid
        ModelNotLoadedError: If the model is not loaded
    """
    pass
```

### Pydantic for Validation

```python
from enum import Enum
from typing import Dict

from pydantic import BaseModel, ConfigDict, Field, field_validator


class HorizonEnum(str, Enum):
    SCALPING = "scalping"
    INTRADAY = "intraday"
    SWING = "swing"
    POSITION = "position"
    ALL = "all"


class PredictionRequest(BaseModel):
    symbol: str = Field(..., min_length=1, max_length=20)
    horizon: HorizonEnum = HorizonEnum.ALL
    confidence_threshold: float = Field(0.5, ge=0.0, le=1.0)

    # Pydantic v2 (pinned in environment.yml) replaces @validator with @field_validator
    @field_validator("symbol")
    @classmethod
    def symbol_uppercase(cls, v: str) -> str:
        return v.upper()


class HorizonPrediction(BaseModel):
    # Not defined in the original directive; fields inferred from the example payload below
    high: float
    low: float
    confidence: float


class PredictionResponse(BaseModel):
    symbol: str
    timestamp: str
    predictions: Dict[str, HorizonPrediction]
    model_version: str

    # Pydantic v2 style; replaces the nested `class Config`
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "symbol": "BTCUSDT",
                "timestamp": "2025-12-05T10:30:00Z",
                "predictions": {
                    "scalping": {
                        "high": 98500.0,
                        "low": 97800.0,
                        "confidence": 0.72
                    }
                },
                "model_version": "1.0.0"
            }
        }
    )
```

### Logging with Loguru

```python
from loguru import logger
import sys

# Configuration
logger.remove()
logger.add(
    sys.stderr,
    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
    level="INFO"
)
logger.add(
    "logs/ml_services_{time}.log",
    rotation="100 MB",
    retention="30 days",
    level="DEBUG"
)

# Usage
logger.info(f"Prediction requested for {symbol}")
logger.debug(f"Model inputs: {features}")
logger.warning(f"Low confidence prediction: {confidence}")
logger.error(f"Model failed: {error}")
```

### YAML Configuration

```python
from pathlib import Path

import yaml
# Under Pydantic v2, BaseSettings is provided by the pydantic-settings package
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Database
    mysql_host: str = "localhost"
    mysql_port: int = 3306
    mysql_database: str = "trading_trading"
    mysql_user: str = "trading_user"
    mysql_password: str = ""

    # API
    api_host: str = "0.0.0.0"
    api_port: int = 8000

    # Models
    model_path: str = "models/"

    # extra="ignore" lets .env carry keys that are not declared here
    # (e.g. LOG_LEVEL, USE_GPU) without failing validation
    model_config = SettingsConfigDict(env_file=".env", extra="ignore")


def load_yaml_config(config_name: str) -> dict:
    config_path = Path("config") / f"{config_name}.yaml"
    with open(config_path, encoding="utf-8") as f:
        return yaml.safe_load(f)
```

### Error Handling

```python
# Custom exceptions
class MLServiceError(Exception):
    """Base exception for ML Service"""
    pass

class ModelNotLoadedError(MLServiceError):
    """Model is not loaded"""
    pass

class InvalidSymbolError(MLServiceError):
    """Invalid trading symbol"""
    pass

class PredictionError(MLServiceError):
    """Error during prediction"""
    pass


# Usage in an endpoint
# (`app`, `predictor` and `logger` are the objects defined in src/api/server.py)
from fastapi import HTTPException, status

@app.get("/api/predict/{symbol}")
async def predict(symbol: str):
    try:
        result = predictor.predict(symbol)
        return result
    except InvalidSymbolError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e)
        )
    except ModelNotLoadedError:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Model not available"
        )
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error"
        )
```

## FastAPI Server Template

```python
# src/api/server.py
import asyncio
from contextlib import asynccontextmanager
from typing import Optional

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from loguru import logger

from src.api.routes import predict, signals, indicators, backtest
from src.models.predictors.price_predictor import MaxMinPricePredictor
from src.data.database import DatabaseManager
from src.data.pipeline import DataPipeline

# Global instances (populated during startup)
predictor: Optional[MaxMinPricePredictor] = None
db_manager: Optional[DatabaseManager] = None
pipeline: Optional[DataPipeline] = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize and cleanup resources"""
    global predictor, db_manager, pipeline

    logger.info("Starting ML Services...")

    # Initialize
    db_manager = DatabaseManager()
    pipeline = DataPipeline()
    predictor = MaxMinPricePredictor()
    predictor.load_models()

    logger.info("ML Services ready")
    yield

    # Cleanup
    logger.info("Shutting down ML Services...")
    if db_manager:
        db_manager.close()


app = FastAPI(
    title="Trading PlatformIA ML Services",
    description="Machine Learning services for trading predictions",
    version="1.0.0",
    lifespan=lifespan
)

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Routes
app.include_router(predict.router, prefix="/api", tags=["Predictions"])
app.include_router(signals.router, prefix="/api", tags=["Signals"])
app.include_router(indicators.router, prefix="/api", tags=["Indicators"])
app.include_router(backtest.router, prefix="/api", tags=["Backtesting"])


@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "model_loaded": predictor is not None,
        "db_connected": db_manager.is_connected() if db_manager else False
    }


@app.websocket("/ws/{symbol}")
async def websocket_endpoint(websocket: WebSocket, symbol: str):
    await websocket.accept()
    try:
        while True:
            # Stream predictions
            prediction = predictor.predict(symbol)
            await websocket.send_json(prediction)
            await asyncio.sleep(5)  # Update every 5 seconds
    except WebSocketDisconnect:
        logger.info(f"WebSocket disconnected for {symbol}")
```

## Testing

### Test Structure

```python
# tests/test_api/test_predict.py
import pytest
from fastapi.testclient import TestClient

from src.api.server import app


@pytest.fixture(scope="module")
def client():
    # Using TestClient as a context manager runs the lifespan handler,
    # so the models are loaded before the tests execute
    with TestClient(app) as test_client:
        yield test_client


class TestPredictEndpoint:
    def test_predict_valid_symbol(self, client):
        response = client.get("/api/predict/BTCUSDT")
        assert response.status_code == 200
        data = response.json()
        assert "predictions" in data
        assert "symbol" in data

    def test_predict_invalid_symbol(self, client):
        response = client.get("/api/predict/INVALID")
        assert response.status_code == 400

    def test_predict_with_horizon(self, client):
        response = client.get("/api/predict/BTCUSDT?horizon=scalping")
        assert response.status_code == 200
        data = response.json()
        assert "scalping" in data["predictions"]


# tests/test_models/test_predictor.py
import pytest
import numpy as np

from src.models.predictors.price_predictor import MaxMinPricePredictor


class TestMaxMinPricePredictor:
    @pytest.fixture
    def predictor(self):
        return MaxMinPricePredictor(config={})

    def test_predict_shape(self, predictor):
        # Mock data
        features = np.random.randn(1, 14)
        result = predictor._predict_raw(features)
        assert result.shape == (1, 8)  # 4 horizons * 2 (high/low)
```

### Test Commands

```bash
# Run all tests
pytest tests/ -v

# With coverage
pytest tests/ --cov=src --cov-report=html

# API tests only
pytest tests/test_api/ -v

# Specific marker
pytest -m "slow" -v
```

## Development Commands

```bash
# Activate the environment
conda activate trading-ml

# Start the development server
uvicorn src.api.server:app --reload --host 0.0.0.0 --port 8000

# Format code
black src/ tests/
isort src/ tests/

# Type checking
mypy src/

# Linting
flake8 src/ tests/

# Tests
pytest tests/ -v

# Generate requirements.txt from the active conda environment
# (conda-installed packages may show up as local file paths;
#  `pip list --format=freeze` avoids that)
pip freeze > requirements.txt
```

## Environment Variables (.env)

```env
# Database - MySQL (Trading Data)
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_DATABASE=trading_trading
MYSQL_USER=trading_user
MYSQL_PASSWORD=secure_password

# API Configuration
API_HOST=0.0.0.0
API_PORT=8000
API_WORKERS=4

# Models
MODEL_PATH=./models
MODEL_VERSION=1.0.0

# Logging
LOG_LEVEL=INFO
LOG_PATH=./logs

# GPU
CUDA_VISIBLE_DEVICES=0
USE_GPU=true

# Security (communication with NestJS)
INTERNAL_API_KEY=your_internal_api_key
NESTJS_URL=http://localhost:3000
```

## ML Development Checklist

- [ ] Conda environment created and activated
- [ ] Dependencies installed (environment.yml)
- [ ] Directory structure complete
- [ ] Type hints on all public functions
- [ ] Pydantic schemas for request/response
- [ ] Logging configured with loguru
- [ ] Unit tests written
- [ ] Code formatted with black/isort
- [ ] API documentation with FastAPI/Swagger
- [ ] Environment variables configured
- [ ] Health check endpoint working
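
Two of the checklist items above, configured environment variables and a working health check, lend themselves to an automated pre-flight check. The sketch below is one possible way to do that using only the standard library. The helper names (`missing_env_vars`, `is_service_ready`) and the choice of which variables count as required are assumptions made for illustration and are not part of the existing codebase. The variable names come from the `.env` section, and the payload keys from the `/health` endpoint in the server template.

```python
import os
from typing import Any, Iterable, List, Mapping, Optional

# Names taken from the ".env" section of this directive. Treating exactly
# these as mandatory is an assumption of this sketch.
REQUIRED_VARS = (
    "MYSQL_HOST",
    "MYSQL_PORT",
    "MYSQL_DATABASE",
    "MYSQL_USER",
    "MYSQL_PASSWORD",
    "API_HOST",
    "API_PORT",
    "MODEL_PATH",
)


def missing_env_vars(
    required: Iterable[str] = REQUIRED_VARS,
    environ: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Return the required names that are unset or blank, in input order."""
    env = os.environ if environ is None else environ
    return [name for name in required if not env.get(name, "").strip()]


def is_service_ready(health_payload: Mapping[str, Any]) -> bool:
    """Interpret the JSON body returned by GET /health.

    The service counts as ready only when the status is "healthy" and both
    the model and the database flags are True.
    """
    return (
        health_payload.get("status") == "healthy"
        and health_payload.get("model_loaded") is True
        and health_payload.get("db_connected") is True
    )
```

A startup script could call `missing_env_vars()` and abort when the returned list is non-empty, and a test could apply `is_service_ready` to the decoded body of a `/health` response. Passing `environ` explicitly keeps the check easy to unit-test without touching the real process environment.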