---
id: ET-ML-avm
title: Technical Specification - Automated Valuation Model (AVM)
type: Technical Specification
epic: IAI-008
status: Draft
version: 1.0
project: inmobiliaria-analytics
created_date: 2026-01-04
updated_date: 2026-01-04
---

ET-IA-008-avm: Automated Valuation Model (AVM)


1. Summary

Machine Learning system that estimates the market value of residential properties based on physical characteristics, location, market conditions, and recent comparable sales.


2. System Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                        AVM PIPELINE                                  │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐             │
│  │   Feature   │───▶│   Model     │───▶│   Post      │             │
│  │   Engine    │    │   Ensemble  │    │   Process   │             │
│  └─────────────┘    └─────────────┘    └─────────────┘             │
│        │                   │                   │                    │
│        ▼                   ▼                   ▼                    │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐             │
│  │  Property   │    │  XGBoost    │    │  Confidence │             │
│  │  Features   │    │  LightGBM   │    │  Intervals  │             │
│  │  Location   │    │  CatBoost   │    │  SHAP       │             │
│  │  Market     │    │  Averaging  │    │  Comparable │             │
│  └─────────────┘    └─────────────┘    └─────────────┘             │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘
                              │
                              ▼
                    ┌─────────────────┐
                    │    FastAPI      │
                    │    Endpoint     │
                    └─────────────────┘
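
The stages in the diagram map onto the modules specified in the rest of this document. The following is a minimal orchestration sketch under that module layout; the package path `src.ml.avm`, the function name `run_avm_pipeline`, and the DataFrame argument are illustrative assumptions, not part of the spec.

# Illustrative wiring of the pipeline stages (names below are examples only)
import pandas as pd

from src.ml.avm.feature_engineering import create_feature_pipeline
from src.ml.avm.ensemble import AVMEnsemble

def run_avm_pipeline(listings: pd.DataFrame, market_service):
    # Feature Engine: raw listing data -> model-ready matrix
    feature_pipeline = create_feature_pipeline(market_service)
    X = feature_pipeline.fit_transform(listings)

    # Model Ensemble: XGBoost + LightGBM + CatBoost averaging
    ensemble = AVMEnsemble()
    ensemble.fit(X, listings['price'].values)

    # Post Process: point estimate plus 95% interval per property
    mean_pred, lower, upper = ensemble.predict_with_uncertainty(X)
    return mean_pred, lower, upper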

3. Model Features

3.1 Feature Categories

# src/ml/avm/features.py
from dataclasses import dataclass
from typing import List, Optional
from enum import Enum

class FeatureCategory(Enum):
    PHYSICAL = "physical"
    LOCATION = "location"
    TEMPORAL = "temporal"
    MARKET = "market"
    DERIVED = "derived"

@dataclass
class PropertyFeatures:
    """Features de la propiedad fisica"""
    property_type: str           # casa, departamento, etc.
    constructed_area_m2: float
    land_area_m2: Optional[float]
    bedrooms: int
    bathrooms: float
    parking_spaces: int
    floors: int
    year_built: Optional[int]
    has_pool: bool
    has_garden: bool
    has_gym: bool
    has_security: bool
    has_elevator: bool
    amenities_count: int

@dataclass
class LocationFeatures:
    """Features de ubicacion"""
    neighborhood: str
    municipality: str
    latitude: float
    longitude: float
    distance_to_center_km: float
    distance_to_metro_km: Optional[float]
    distance_to_park_km: float
    distance_to_school_km: float
    distance_to_hospital_km: float
    walk_score: float            # 0-100
    crime_index: float           # 0-100 (higher = safer)
    noise_level: float           # 0-100
    avg_income_zone: float       # average income in the zone

@dataclass
class TemporalFeatures:
    """Features temporales"""
    month: int
    quarter: int
    year: int
    days_since_listing: int
    is_holiday_season: bool
    inflation_rate: float
    interest_rate: float

@dataclass
class MarketFeatures:
    """Features de mercado"""
    avg_price_m2_neighborhood: float
    median_price_m2_neighborhood: float
    price_trend_3m: float        # % cambio ultimos 3 meses
    price_trend_12m: float       # % cambio ultimo ano
    inventory_count: int         # propiedades activas en zona
    absorption_rate: float       # ventas/inventario
    days_on_market_avg: float    # dias promedio en mercado
    comparable_count: int        # num comparables encontrados
    supply_demand_ratio: float

@dataclass
class DerivedFeatures:
    """Features derivados/calculados"""
    price_per_m2_estimated: float
    age_years: Optional[int]
    bathroom_bedroom_ratio: float
    parking_per_bedroom: float
    area_per_bedroom: float
    is_new_construction: bool    # < 2 anos
    is_premium_zone: bool
    relative_size: float         # vs promedio zona
    quality_score: float         # 0-100 basado en amenidades
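
To make the relationship between the categories concrete, this is a small illustrative helper (not part of the spec) showing how some DerivedFeatures values follow from a PropertyFeatures instance; the production pipeline in 3.2 computes the same quantities in pandas.

# Illustrative only: deriving a few DerivedFeatures values from PropertyFeatures
from datetime import date

def derive_basic_features(p: PropertyFeatures) -> dict:
    age = date.today().year - p.year_built if p.year_built else None
    amenities = sum([p.has_pool, p.has_garden, p.has_gym, p.has_security, p.has_elevator])
    return {
        "age_years": age,
        "bathroom_bedroom_ratio": p.bathrooms / max(p.bedrooms, 1),
        "parking_per_bedroom": p.parking_spaces / max(p.bedrooms, 1),
        "area_per_bedroom": p.constructed_area_m2 / max(p.bedrooms, 1),
        "is_new_construction": age is not None and age <= 2,
        "quality_score": amenities * 20,  # 0-100, 20 points per amenity
    }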

3.2 Feature Engineering Pipeline

# src/ml/avm/feature_engineering.py
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from geopy.distance import geodesic

class PropertyFeatureEngineer(BaseEstimator, TransformerMixin):
    """Pipeline de ingenieria de features"""

    def __init__(self, market_data_service):
        self.market_data = market_data_service
        self.city_center = (20.6736, -103.3927)  # Guadalajara city center

    def fit(self, X, y=None):
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        df = X.copy()

        # 1. Property-derived features
        df = self._add_property_derived(df)

        # 2. Location features
        df = self._add_location_features(df)

        # 3. Market features
        df = self._add_market_features(df)

        # 4. Temporal features
        df = self._add_temporal_features(df)

        # 5. Categorical encoding
        df = self._encode_categoricals(df)

        return df

    def _add_property_derived(self, df: pd.DataFrame) -> pd.DataFrame:
        # Property age
        current_year = pd.Timestamp.now().year
        df['age_years'] = np.where(
            df['year_built'].notna(),
            current_year - df['year_built'],
            np.nan
        )

        # Ratios
        df['bathroom_bedroom_ratio'] = df['bathrooms'] / df['bedrooms'].clip(lower=1)
        df['parking_per_bedroom'] = df['parking_spaces'] / df['bedrooms'].clip(lower=1)
        df['area_per_bedroom'] = df['constructed_area_m2'] / df['bedrooms'].clip(lower=1)

        # Flags
        df['is_new_construction'] = df['age_years'] <= 2
        df['has_land'] = df['land_area_m2'].notna() & (df['land_area_m2'] > 0)

        # Quality score based on amenities
        amenity_cols = ['has_pool', 'has_garden', 'has_gym', 'has_security', 'has_elevator']
        df['quality_score'] = df[amenity_cols].sum(axis=1) * 20

        # Log transforms for areas
        df['log_constructed_area'] = np.log1p(df['constructed_area_m2'])
        df['log_land_area'] = np.log1p(df['land_area_m2'].fillna(0))

        return df

    def _add_location_features(self, df: pd.DataFrame) -> pd.DataFrame:
        # Distance to the city center
        def calc_distance(row):
            if pd.isna(row['latitude']) or pd.isna(row['longitude']):
                return np.nan
            return geodesic(
                (row['latitude'], row['longitude']),
                self.city_center
            ).kilometers

        df['distance_to_center_km'] = df.apply(calc_distance, axis=1)

        # Premium zone (specific neighborhoods)
        premium_zones = [
            'providencia', 'americana', 'lafayette', 'country',
            'puerta de hierro', 'real', 'bugambilias', 'chapalita'
        ]
        df['is_premium_zone'] = df['neighborhood'].str.lower().isin(premium_zones)

        # Geographic cluster (using municipality as a proxy)
        municipality_encoding = {
            'zapopan': 1.2,
            'guadalajara': 1.0,
            'tlaquepaque': 0.85,
            'tonala': 0.75,
            'tlajomulco': 0.8,
        }
        df['municipality_factor'] = df['municipality'].str.lower().map(municipality_encoding).fillna(0.9)

        return df

    def _add_market_features(self, df: pd.DataFrame) -> pd.DataFrame:
        # Fetch market data per zone
        for idx, row in df.iterrows():
            market_stats = self.market_data.get_zone_stats(
                neighborhood=row['neighborhood'],
                property_type=row['property_type']
            )

            df.at[idx, 'avg_price_m2_zone'] = market_stats.get('avg_price_m2', np.nan)
            df.at[idx, 'median_price_m2_zone'] = market_stats.get('median_price_m2', np.nan)
            df.at[idx, 'price_trend_3m'] = market_stats.get('trend_3m', 0)
            df.at[idx, 'inventory_zone'] = market_stats.get('inventory', 0)
            df.at[idx, 'days_on_market_zone'] = market_stats.get('avg_dom', 60)

        # Relative size vs. zone
        df['relative_size'] = df['constructed_area_m2'] / df['avg_price_m2_zone'].clip(lower=1)

        return df

    def _add_temporal_features(self, df: pd.DataFrame) -> pd.DataFrame:
        now = pd.Timestamp.now()

        df['month'] = now.month
        df['quarter'] = now.quarter
        df['year'] = now.year

        # High season (January-March, September-November)
        df['is_high_season'] = df['month'].isin([1, 2, 3, 9, 10, 11])

        # Current macroeconomic indicators (mock - replace with a real API)
        df['inflation_rate'] = 4.5  # %
        df['interest_rate'] = 11.0  # %

        return df

    def _encode_categoricals(self, df: pd.DataFrame) -> pd.DataFrame:
        # Target encoding for neighborhood (use precomputed values)
        neighborhood_means = self.market_data.get_neighborhood_price_means()
        df['neighborhood_encoded'] = df['neighborhood'].map(neighborhood_means)
        df['neighborhood_encoded'] = df['neighborhood_encoded'].fillna(
            neighborhood_means.mean()
        )

        # One-hot encoding for property_type
        property_dummies = pd.get_dummies(
            df['property_type'],
            prefix='type',
            drop_first=True
        )
        df = pd.concat([df, property_dummies], axis=1)

        return df


def create_feature_pipeline(market_service) -> Pipeline:
    """Crear pipeline completo de features"""

    # Numeric features
    numeric_features = [
        'constructed_area_m2', 'land_area_m2', 'bedrooms', 'bathrooms',
        'parking_spaces', 'latitude', 'longitude', 'distance_to_center_km',
        'age_years', 'quality_score', 'avg_price_m2_zone', 'price_trend_3m'
    ]

    # Categorical features
    categorical_features = ['property_type', 'municipality']

    # Preprocessor
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', StandardScaler(), numeric_features),
            ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)
        ],
        remainder='passthrough'
    )

    pipeline = Pipeline([
        ('feature_engineer', PropertyFeatureEngineer(market_service)),
        ('preprocessor', preprocessor),
    ])

    return pipeline
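
Usage sketch for the pipeline. `market_service` is assumed to be an instance of the market data service that implements `get_zone_stats` and `get_neighborhood_price_means`, as referenced by `PropertyFeatureEngineer`; the listing values are illustrative.

# Example usage of the feature pipeline (single illustrative listing)
import pandas as pd

listings = pd.DataFrame([{
    "property_type": "casa",
    "constructed_area_m2": 180.0, "land_area_m2": 250.0,
    "bedrooms": 3, "bathrooms": 2.5, "parking_spaces": 2,
    "year_built": 2018,
    "has_pool": False, "has_garden": True, "has_gym": False,
    "has_security": True, "has_elevator": False,
    "latitude": 20.6736, "longitude": -103.3927,
    "neighborhood": "Providencia", "municipality": "Guadalajara",
}])

pipeline = create_feature_pipeline(market_service)
X = pipeline.fit_transform(listings)  # numeric matrix ready for the ensemble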

4. Ensemble Model

4.1 Ensemble Architecture

# src/ml/avm/ensemble.py
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.model_selection import cross_val_predict
import xgboost as xgb
import lightgbm as lgb
from catboost import CatBoostRegressor
import joblib

class AVMEnsemble(BaseEstimator, RegressorMixin):
    """Ensemble de modelos para valuacion automatizada"""

    def __init__(
        self,
        weights: Optional[Dict[str, float]] = None,
        use_stacking: bool = False
    ):
        self.weights = weights or {
            'xgboost': 0.35,
            'lightgbm': 0.35,
            'catboost': 0.30,
        }
        self.use_stacking = use_stacking
        self.models = {}
        self.meta_model = None

    def _create_models(self) -> Dict[str, BaseEstimator]:
        """Crear modelos base del ensemble"""

        models = {
            'xgboost': xgb.XGBRegressor(
                n_estimators=500,
                max_depth=8,
                learning_rate=0.05,
                subsample=0.8,
                colsample_bytree=0.8,
                reg_alpha=0.1,
                reg_lambda=1.0,
                random_state=42,
                n_jobs=-1,
            ),

            'lightgbm': lgb.LGBMRegressor(
                n_estimators=500,
                max_depth=10,
                learning_rate=0.05,
                num_leaves=31,
                subsample=0.8,
                colsample_bytree=0.8,
                reg_alpha=0.1,
                reg_lambda=1.0,
                random_state=42,
                n_jobs=-1,
                verbose=-1,
            ),

            'catboost': CatBoostRegressor(
                iterations=500,
                depth=8,
                learning_rate=0.05,
                l2_leaf_reg=3,
                random_seed=42,
                verbose=False,
            ),
        }

        return models

    def fit(self, X: np.ndarray, y: np.ndarray) -> 'AVMEnsemble':
        """Entrenar ensemble"""

        self.models = self._create_models()

        if self.use_stacking:
            # Stacking: usar predicciones OOF como meta-features
            meta_features = np.zeros((len(y), len(self.models)))

            for i, (name, model) in enumerate(self.models.items()):
                # Predicciones out-of-fold
                oof_preds = cross_val_predict(
                    model, X, y, cv=5, n_jobs=-1
                )
                meta_features[:, i] = oof_preds

                # Entrenar en todo el dataset
                model.fit(X, y)

            # Meta-modelo
            from sklearn.linear_model import Ridge
            self.meta_model = Ridge(alpha=1.0)
            self.meta_model.fit(meta_features, y)

        else:
            # Simple averaging
            for name, model in self.models.items():
                model.fit(X, y)

        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Prediccion del ensemble"""

        predictions = {}
        for name, model in self.models.items():
            predictions[name] = model.predict(X)

        if self.use_stacking:
            meta_features = np.column_stack(list(predictions.values()))
            return self.meta_model.predict(meta_features)
        else:
            # Weighted average
            weighted_sum = np.zeros(len(X))
            for name, preds in predictions.items():
                weighted_sum += preds * self.weights[name]
            return weighted_sum

    def predict_with_uncertainty(
        self,
        X: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Prediccion con intervalos de confianza"""

        predictions = []
        for name, model in self.models.items():
            predictions.append(model.predict(X))

        predictions = np.array(predictions)

        # Mean and standard deviation across models
        mean_pred = np.mean(predictions, axis=0)
        std_pred = np.std(predictions, axis=0)

        # Confidence intervals (95%)
        lower = mean_pred - 1.96 * std_pred
        upper = mean_pred + 1.96 * std_pred

        return mean_pred, lower, upper

    def get_feature_importance(self) -> pd.DataFrame:
        """Obtener importancia de features promediada"""

        importances = []

        for name, model in self.models.items():
            if hasattr(model, 'feature_importances_'):
                imp = model.feature_importances_
                importances.append(imp)

        if not importances:
            return pd.DataFrame()

        avg_importance = np.mean(importances, axis=0)
        return pd.DataFrame({
            'importance': avg_importance
        }).sort_values('importance', ascending=False)

    def save(self, path: str):
        """Guardar modelo"""
        joblib.dump({
            'models': self.models,
            'weights': self.weights,
            'meta_model': self.meta_model,
            'use_stacking': self.use_stacking,
        }, path)

    @classmethod
    def load(cls, path: str) -> 'AVMEnsemble':
        """Cargar modelo"""
        data = joblib.load(path)
        ensemble = cls(
            weights=data['weights'],
            use_stacking=data['use_stacking']
        )
        ensemble.models = data['models']
        ensemble.meta_model = data['meta_model']
        return ensemble
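
A short usage sketch of the ensemble on synthetic data (shapes and values are illustrative only):

# Fitting the ensemble and querying it with uncertainty (synthetic data)
import numpy as np

X = np.random.randn(500, 12)
y = 3_000_000 + 500_000 * X[:, 0] + 100_000 * np.random.randn(500)

ensemble = AVMEnsemble()          # default weights: 0.35 / 0.35 / 0.30
ensemble.fit(X, y)

mean_pred, lower, upper = ensemble.predict_with_uncertainty(X[:5])
# predict() returns 0.35*xgboost + 0.35*lightgbm + 0.30*catboost per row
point_estimates = ensemble.predict(X[:5])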

4.2 Training Pipeline

# src/ml/avm/training.py
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error
import mlflow
from datetime import datetime

from .ensemble import AVMEnsemble
from .feature_engineering import create_feature_pipeline
from .data_loader import PropertyDataLoader

class AVMTrainer:
    """Pipeline de entrenamiento para AVM"""

    def __init__(
        self,
        data_loader: PropertyDataLoader,
        market_service,
        mlflow_experiment: str = "avm-training"
    ):
        self.data_loader = data_loader
        self.market_service = market_service
        mlflow.set_experiment(mlflow_experiment)

    def train(
        self,
        property_types: Optional[List[str]] = None,
        min_samples: int = 1000,
        test_size: float = 0.2
    ) -> Dict:
        """Train the AVM model"""

        with mlflow.start_run(run_name=f"avm-{datetime.now().strftime('%Y%m%d_%H%M')}"):
            # 1. Load data
            df = self.data_loader.load_training_data(
                property_types=property_types,
                min_samples=min_samples
            )

            mlflow.log_param("n_samples", len(df))
            mlflow.log_param("property_types", property_types)

            # 2. Feature engineering
            feature_pipeline = create_feature_pipeline(self.market_service)
            X = feature_pipeline.fit_transform(df)
            y = df['price'].values

            # Log feature names
            feature_names = self._get_feature_names(feature_pipeline, df)
            mlflow.log_param("n_features", len(feature_names))

            # 3. Split
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=test_size, random_state=42
            )

            # 4. Train the ensemble
            ensemble = AVMEnsemble(use_stacking=True)
            ensemble.fit(X_train, y_train)

            # 5. Evaluate
            metrics = self._evaluate(ensemble, X_train, y_train, X_test, y_test)

            for name, value in metrics.items():
                mlflow.log_metric(name, value)

            # 6. Save the model
            model_path = f"models/avm_{datetime.now().strftime('%Y%m%d')}.joblib"
            ensemble.save(model_path)
            mlflow.log_artifact(model_path)

            # 7. Save the feature pipeline
            import joblib
            pipeline_path = "models/feature_pipeline.joblib"
            joblib.dump(feature_pipeline, pipeline_path)
            mlflow.log_artifact(pipeline_path)

            return {
                'metrics': metrics,
                'model_path': model_path,
                'feature_importance': ensemble.get_feature_importance()
            }

    def _evaluate(
        self,
        model: AVMEnsemble,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_test: np.ndarray,
        y_test: np.ndarray
    ) -> Dict[str, float]:
        """Evaluar modelo"""

        y_pred_train = model.predict(X_train)
        y_pred_test = model.predict(X_test)

        # MAE
        mae_train = mean_absolute_error(y_train, y_pred_train)
        mae_test = mean_absolute_error(y_test, y_pred_test)

        # MAPE
        mape_train = mean_absolute_percentage_error(y_train, y_pred_train)
        mape_test = mean_absolute_percentage_error(y_test, y_pred_test)

        # Median absolute error
        median_ae_test = np.median(np.abs(y_test - y_pred_test))

        # Share of predictions within 10% / 15% of the actual value
        within_10pct = np.mean(np.abs(y_test - y_pred_test) / y_test < 0.10)
        within_15pct = np.mean(np.abs(y_test - y_pred_test) / y_test < 0.15)

        # R2
        from sklearn.metrics import r2_score
        r2_test = r2_score(y_test, y_pred_test)

        return {
            'mae_train': mae_train,
            'mae_test': mae_test,
            'mape_train': mape_train * 100,
            'mape_test': mape_test * 100,
            'median_ae_test': median_ae_test,
            'within_10pct': within_10pct * 100,
            'within_15pct': within_15pct * 100,
            'r2_test': r2_test,
        }

    def _get_feature_names(self, pipeline, df) -> List[str]:
        """Obtener nombres de features del pipeline"""
        # Simplificado - en produccion seria mas robusto
        return list(df.columns)
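
An illustrative trainer invocation; how the loader and market service are constructed is an assumption here, only the `train()` interface above is normative.

# Illustrative: running a training job and reading the headline metrics
loader = PropertyDataLoader()
trainer = AVMTrainer(data_loader=loader, market_service=market_service)

result = trainer.train(property_types=["casa", "departamento"], min_samples=1000)

print(result["metrics"]["mape_test"])     # MAPE on the test split, in %
print(result["metrics"]["within_10pct"])  # share of valuations within +/-10%, in %
print(result["feature_importance"].head())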

5. Explainability with SHAP

# src/ml/avm/explainability.py
import shap
import numpy as np
import pandas as pd
from typing import Any, Dict, List, Optional

class AVMExplainer:
    """Explicabilidad de valuaciones usando SHAP"""

    def __init__(self, model, feature_names: List[str]):
        self.model = model
        self.feature_names = feature_names
        self.explainer = None

    def initialize_explainer(self, X_background: np.ndarray):
        """Inicializar SHAP explainer con datos de background"""
        # Usar TreeExplainer para modelos basados en arboles
        if hasattr(self.model, 'models'):
            # Para ensemble, usar el primer modelo
            base_model = list(self.model.models.values())[0]
            self.explainer = shap.TreeExplainer(base_model)
        else:
            self.explainer = shap.TreeExplainer(self.model)

    def explain(
        self,
        X: np.ndarray,
        feature_values: Optional[Dict[str, Any]] = None
    ) -> Dict:
        """Generate an explanation for a prediction"""

        if self.explainer is None:
            raise ValueError("Explainer not initialized. Call initialize_explainer first.")

        # Compute SHAP values
        shap_values = self.explainer.shap_values(X)

        if len(X.shape) == 1:
            X = X.reshape(1, -1)
            shap_values = shap_values.reshape(1, -1)

        # Base value (average prediction)
        base_value = self.explainer.expected_value
        if isinstance(base_value, np.ndarray):
            base_value = base_value[0]

        # Build the structured explanation
        explanations = []

        for i in range(len(X)):
            feature_impacts = []

            for j, (name, shap_val) in enumerate(
                zip(self.feature_names, shap_values[i])
            ):
                feature_val = X[i, j] if feature_values is None else feature_values.get(name, X[i, j])

                feature_impacts.append({
                    'feature': name,
                    'value': float(feature_val),
                    'shap_value': float(shap_val),
                    'impact': 'positive' if shap_val > 0 else 'negative',
                    'impact_formatted': self._format_impact(shap_val),
                })

            # Sort by absolute impact
            feature_impacts.sort(key=lambda x: abs(x['shap_value']), reverse=True)

            explanations.append({
                'base_value': float(base_value),
                'predicted_value': float(base_value + sum(shap_values[i])),
                'top_positive': [f for f in feature_impacts if f['impact'] == 'positive'][:5],
                'top_negative': [f for f in feature_impacts if f['impact'] == 'negative'][:5],
                'all_impacts': feature_impacts,
            })

        return explanations[0] if len(explanations) == 1 else explanations

    def generate_natural_language(self, explanation: Dict) -> str:
        """Generar explicacion en lenguaje natural"""

        lines = []
        predicted = explanation['predicted_value']
        base = explanation['base_value']

        lines.append(f"El valor estimado es ${predicted:,.0f} MXN")
        lines.append(f"(Valor base promedio: ${base:,.0f} MXN)")
        lines.append("")

        # Positive factors
        if explanation['top_positive']:
            lines.append("Factores que AUMENTAN el valor:")
            for factor in explanation['top_positive'][:3]:
                lines.append(f"  + {self._humanize_feature(factor['feature'])}: "
                           f"{factor['impact_formatted']}")

        # Negative factors
        if explanation['top_negative']:
            lines.append("")
            lines.append("Factores que REDUCEN el valor:")
            for factor in explanation['top_negative'][:3]:
                lines.append(f"  - {self._humanize_feature(factor['feature'])}: "
                           f"{factor['impact_formatted']}")

        return "\n".join(lines)

    def _format_impact(self, shap_value: float) -> str:
        """Formatear impacto en pesos"""
        prefix = "+" if shap_value > 0 else ""
        return f"{prefix}${shap_value:,.0f}"

    def _humanize_feature(self, feature: str) -> str:
        """Convertir nombre de feature a texto legible"""
        mappings = {
            'constructed_area_m2': 'Superficie construida',
            'land_area_m2': 'Superficie de terreno',
            'bedrooms': 'Numero de recamaras',
            'bathrooms': 'Numero de banos',
            'parking_spaces': 'Estacionamientos',
            'is_premium_zone': 'Ubicacion premium',
            'distance_to_center_km': 'Distancia al centro',
            'age_years': 'Antiguedad',
            'quality_score': 'Calidad/amenidades',
            'avg_price_m2_zone': 'Precio promedio de zona',
            'price_trend_3m': 'Tendencia de mercado',
            'has_pool': 'Cuenta con alberca',
            'has_garden': 'Cuenta con jardin',
        }
        return mappings.get(feature, feature.replace('_', ' ').title())
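
Usage sketch for the explainer; `ensemble`, `feature_names`, `X_train` and `X_valuate` stand in for objects produced by the training pipeline and the API layer.

# Illustrative: explaining one valuation
explainer = AVMExplainer(model=ensemble, feature_names=feature_names)
explainer.initialize_explainer(X_train)

explanation = explainer.explain(X_valuate)      # one-row matrix -> single dict
print(explanation["predicted_value"])
print(explanation["top_positive"][:3])
print(explainer.generate_natural_language(explanation))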

6. Valuation API

# src/ml/avm/api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import List, Optional
import numpy as np

from .ensemble import AVMEnsemble
from .explainability import AVMExplainer
from .comparables import ComparablesFinder

app = FastAPI(title="AVM API", version="1.0.0")

# Models and services loaded at application startup
avm_model: AVMEnsemble = None
explainer: AVMExplainer = None
comparables_finder: ComparablesFinder = None
market_service = None

class PropertyInput(BaseModel):
    property_type: str = Field(..., example="casa")
    constructed_area_m2: float = Field(..., gt=0, example=180)
    land_area_m2: Optional[float] = Field(None, example=250)
    bedrooms: int = Field(..., ge=1, example=3)
    bathrooms: float = Field(..., ge=1, example=2.5)
    parking_spaces: int = Field(0, ge=0, example=2)
    latitude: float = Field(..., example=20.6736)
    longitude: float = Field(..., example=-103.3927)
    neighborhood: str = Field(..., example="Providencia")
    municipality: str = Field(..., example="Guadalajara")
    year_built: Optional[int] = Field(None, example=2018)
    amenities: List[str] = Field(default_factory=list)

class ValuationResponse(BaseModel):
    estimated_value: float
    confidence: float
    range_low: float
    range_high: float
    price_per_m2: float
    explanation: dict
    comparables: List[dict]

@app.post("/valuate", response_model=ValuationResponse)
async def valuate_property(property_data: PropertyInput):
    """Obtener valuacion de una propiedad"""

    try:
        # 1. Prepare features
        features = prepare_features(property_data)

        # 2. Prediction with uncertainty
        mean_pred, lower, upper = avm_model.predict_with_uncertainty(features)

        estimated_value = float(mean_pred[0])
        range_low = float(lower[0])
        range_high = float(upper[0])

        # 3. Compute confidence
        confidence = calculate_confidence(
            features,
            estimated_value,
            range_low,
            range_high
        )

        # 4. Generate explanation
        explanation = explainer.explain(features)

        # 5. Find comparables
        comparables = comparables_finder.find(
            property_data.dict(),
            limit=5
        )

        # 6. Price per m2
        price_per_m2 = estimated_value / property_data.constructed_area_m2

        return ValuationResponse(
            estimated_value=round(estimated_value, -3),  # Round to thousands
            confidence=round(confidence, 2),
            range_low=round(range_low, -3),
            range_high=round(range_high, -3),
            price_per_m2=round(price_per_m2, 0),
            explanation=explanation,
            comparables=comparables,
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/market-stats/{neighborhood}")
async def get_market_stats(neighborhood: str):
    """Obtener estadisticas de mercado por zona"""

    stats = market_service.get_zone_stats(neighborhood)

    return {
        "neighborhood": neighborhood,
        "avg_price_m2": stats.get('avg_price_m2'),
        "median_price_m2": stats.get('median_price_m2'),
        "inventory": stats.get('inventory'),
        "trend_3m": stats.get('trend_3m'),
        "trend_12m": stats.get('trend_12m'),
        "avg_days_on_market": stats.get('avg_dom'),
    }

def prepare_features(property_data: PropertyInput) -> np.ndarray:
    """Convertir input a features del modelo"""
    # Implementar transformacion
    pass

def calculate_confidence(
    features: np.ndarray,
    prediction: float,
    lower: float,
    upper: float
) -> float:
    """Calcular score de confianza 0-100"""

    # Factores que afectan confianza:
    # 1. Ancho del intervalo de confianza
    interval_width = (upper - lower) / prediction
    interval_score = max(0, 100 - interval_width * 200)

    # 2. Cantidad de comparables disponibles
    # (se calcularía con datos reales)
    comparables_score = 80  # placeholder

    # 3. Calidad de datos de entrada
    data_quality_score = 90  # placeholder

    # Promedio ponderado
    confidence = (
        interval_score * 0.4 +
        comparables_score * 0.35 +
        data_quality_score * 0.25
    )

    return min(100, max(0, confidence))
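
As a worked example of the confidence formula above: a $3.5M prediction with a 95% range of $3.2M-$3.8M gives an interval width of about 0.17, an interval score of roughly 66, and, combined with the placeholder comparables (80) and data-quality (90) scores, an overall confidence near 77. A client-side call to the endpoint could look like the sketch below, assuming the service runs locally on port 8000; the payload fields mirror PropertyInput.

# Illustrative request to the /valuate endpoint
import requests

payload = {
    "property_type": "casa",
    "constructed_area_m2": 180,
    "land_area_m2": 250,
    "bedrooms": 3,
    "bathrooms": 2.5,
    "parking_spaces": 2,
    "latitude": 20.6736,
    "longitude": -103.3927,
    "neighborhood": "Providencia",
    "municipality": "Guadalajara",
    "year_built": 2018,
}

response = requests.post("http://localhost:8000/valuate", json=payload, timeout=10)
valuation = response.json()
print(valuation["estimated_value"], valuation["range_low"], valuation["range_high"])
print(valuation["confidence"], len(valuation["comparables"]))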

7. Comparable Search

# src/ml/avm/comparables.py
from typing import List, Dict
import numpy as np
from sqlalchemy import text
from .database import get_db_connection

class ComparablesFinder:
    """Buscar propiedades comparables"""

    def __init__(self, db_connection):
        self.db = db_connection

    def find(
        self,
        property_data: Dict,
        limit: int = 5,
        max_distance_km: float = 2.0,
        max_age_days: int = 180
    ) -> List[Dict]:
        """Encontrar propiedades comparables"""

        query = text("""
            WITH target AS (
                SELECT
                    ST_SetSRID(ST_MakePoint(:lng, :lat), 4326)::geography as location,
                    :property_type as ptype,
                    :bedrooms as beds,
                    :bathrooms as baths,
                    :area as area
            )
            SELECT
                p.id,
                p.title,
                p.price,
                p.constructed_area_m2,
                p.bedrooms,
                p.bathrooms,
                p.neighborhood,
                p.source_url,
                p.last_seen_at,
                ST_Distance(p.coordinates::geography, t.location) / 1000 as distance_km,

                -- Similarity score
                (
                    -- Penalize by distance
                    (1 - LEAST(ST_Distance(p.coordinates::geography, t.location) / 2000, 1)) * 30 +

                    -- Area similarity (within 30%)
                    CASE WHEN ABS(p.constructed_area_m2 - t.area) / t.area < 0.3
                         THEN (1 - ABS(p.constructed_area_m2 - t.area) / t.area) * 25
                         ELSE 0 END +

                    -- Bedroom match
                    CASE WHEN p.bedrooms = t.beds THEN 20
                         WHEN ABS(p.bedrooms - t.beds) = 1 THEN 10
                         ELSE 0 END +

                    -- Bathroom match
                    CASE WHEN p.bathrooms = t.baths THEN 15
                         WHEN ABS(p.bathrooms - t.baths) <= 0.5 THEN 8
                         ELSE 0 END +

                    -- Recency (more recent = better), up to 10 points
                    LEAST(10, GREATEST(0, 180 - EXTRACT(DAY FROM NOW() - p.last_seen_at)) / 18)
                ) as similarity_score

            FROM properties p, target t
            WHERE p.property_type = t.ptype
              AND p.status IN ('active', 'sold')
              AND ST_DWithin(
                  p.coordinates::geography,
                  t.location,
                  :max_distance * 1000
              )
              AND p.last_seen_at > NOW() - make_interval(days => :max_age)
              AND p.constructed_area_m2 BETWEEN t.area * 0.7 AND t.area * 1.3

            ORDER BY similarity_score DESC
            LIMIT :limit
        """)

        result = self.db.execute(query, {
            'lat': property_data['latitude'],
            'lng': property_data['longitude'],
            'property_type': property_data['property_type'],
            'bedrooms': property_data['bedrooms'],
            'bathrooms': property_data['bathrooms'],
            'area': property_data['constructed_area_m2'],
            'max_distance': max_distance_km,
            'max_age': max_age_days,
            'limit': limit,
        })

        comparables = []
        for row in result:
            comparables.append({
                'id': row.id,
                'title': row.title,
                'price': float(row.price),
                'price_per_m2': float(row.price / row.constructed_area_m2),
                'area_m2': float(row.constructed_area_m2),
                'bedrooms': row.bedrooms,
                'bathrooms': float(row.bathrooms),
                'neighborhood': row.neighborhood,
                'distance_km': round(row.distance_km, 2),
                'similarity_score': round(row.similarity_score, 1),
                'url': row.source_url,
                'last_seen': row.last_seen_at.isoformat(),
            })

        return comparables
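
Usage sketch; it assumes `get_db_connection()` (imported above) yields a SQLAlchemy connection usable as a context manager, and the target values are illustrative.

# Illustrative: finding comparables for a target property
with get_db_connection() as conn:
    finder = ComparablesFinder(conn)
    comps = finder.find(
        {
            "latitude": 20.6736,
            "longitude": -103.3927,
            "property_type": "casa",
            "bedrooms": 3,
            "bathrooms": 2.5,
            "constructed_area_m2": 180,
        },
        limit=5,
        max_distance_km=2.0,
    )

for c in comps:
    print(c["similarity_score"], c["price_per_m2"], c["distance_km"])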

8. Tests

# src/ml/avm/__tests__/test_ensemble.py
import pytest
import numpy as np
from ..ensemble import AVMEnsemble

class TestAVMEnsemble:

    @pytest.fixture
    def sample_data(self):
        np.random.seed(42)
        X = np.random.randn(100, 10)
        y = np.random.randn(100) * 1000000 + 3000000  # Prices roughly between 2M and 4M
        return X, y

    def test_fit_predict(self, sample_data):
        X, y = sample_data
        model = AVMEnsemble()
        model.fit(X, y)

        predictions = model.predict(X)

        assert len(predictions) == len(y)
        assert predictions.min() > 0

    def test_predict_with_uncertainty(self, sample_data):
        X, y = sample_data
        model = AVMEnsemble()
        model.fit(X, y)

        mean, lower, upper = model.predict_with_uncertainty(X)

        assert len(mean) == len(y)
        assert all(lower <= mean)
        assert all(mean <= upper)

    def test_stacking_ensemble(self, sample_data):
        X, y = sample_data
        model = AVMEnsemble(use_stacking=True)
        model.fit(X, y)

        predictions = model.predict(X)
        assert model.meta_model is not None

    def test_save_load(self, sample_data, tmp_path):
        X, y = sample_data
        model = AVMEnsemble()
        model.fit(X, y)

        path = tmp_path / "model.joblib"
        model.save(str(path))

        loaded = AVMEnsemble.load(str(path))
        preds_original = model.predict(X)
        preds_loaded = loaded.predict(X)

        np.testing.assert_array_almost_equal(preds_original, preds_loaded)

9. Metrics and Monitoring

# Metrics to track in production
metrics:
  model_performance:
    - name: avm_mape
      type: gauge
      description: "Mean Absolute Percentage Error"

    - name: avm_predictions_total
      type: counter
      description: "Total valuations performed"
      labels: [property_type, zone]

    - name: avm_prediction_latency_seconds
      type: histogram
      description: "Prediction latency"
      buckets: [0.1, 0.25, 0.5, 1, 2]

    - name: avm_confidence_score
      type: histogram
      description: "Confidence scores distribution"
      buckets: [50, 60, 70, 80, 90, 100]

  data_quality:
    - name: avm_missing_features
      type: counter
      description: "Predictions with missing features"
      labels: [feature]

    - name: avm_comparables_found
      type: histogram
      description: "Number of comparables found"
      buckets: [0, 1, 3, 5, 10]

  drift:
    - name: avm_feature_drift_score
      type: gauge
      description: "Feature distribution drift"
      labels: [feature]

    - name: avm_prediction_drift_score
      type: gauge
      description: "Prediction distribution drift"

Next: ET-IA-008-survival.md - Time-to-sale prediction model