MLOps Tracking and Empirical Benchmarking (Engineering)

Navigation:

This chapter details the software engineering behind the evaluation/ module (metrics.py, performance_analyzer.py, plotting.py, and tracker.py). While the mathematical solvers handle the internal physics of the model, these scripts act as the external “MLOps wrapper,” providing robust, production-safe telemetry to track how models actually perform on out-of-sample data.


Object-Oriented Telemetry (BenchmarkTracker)

In complex pipelines (especially evolutionary AutoML where hundreds of models are trained and discarded), passing around loose dictionaries of metrics quickly becomes unmanageable.

Architectural Choice (The State Holder): The framework introduces the BenchmarkTracker class. This Object-Oriented component acts as a unified state holder for a specific model’s lifecycle. It stores the model’s metadata (time_fit, time_predict), the raw predictions across all data splits (Train, Validation, Test), and the computed diagnostics.

When slice_and_evaluate is called, the tracker automatically partitions the contiguous prediction tensor into the correct cross-validation folds and routes them to the metric and diagnostic sub-modules, centralizing the telemetry logic in a single, auditable object.

src/tam/evaluation/tracker.py (Object-Oriented Telemetry Slicing)
    def slice_and_evaluate(self, data_dict: dict, target_col: str = 'value'):
        """
        Slices continuous predictions into respective folds and computes 
        standard metrics and MLOps diagnostics.
        """
        if self.y_pred_full is None:
            print(f"Warning: '{self.model_name}' - 'y_pred_full' not found. Cannot evaluate.")
            return

        current_idx = 0
        for split_name, df in data_dict.items():
            length = len(df)
            y_true = df[target_col].values
            y_pred = self.y_pred_full[current_idx : current_idx + length]
            
            self.predictions[split_name] = y_pred
            self.metrics[split_name] = calculate_regression_metrics(y_true, y_pred)
            self.diagnostics[split_name] = analyze_residuals(y_true, y_pred)
            
            if split_name == 'test':
                drift = detect_temporal_degradation(y_true, y_pred, metric='RMSE')
                self.diagnostics['test']['RMSE_Drift_H2_vs_H1 (%)'] = drift
                
            current_idx += length
            
        components = ['fit', 'dev', 'val']
        if all(k in data_dict for k in components) and 'train' not in data_dict:
            yt_train = np.concatenate([data_dict[k][target_col].values for k in components])
            yp_train = np.concatenate([self.predictions[k] for k in components])
            self.predictions['train'] = yp_train
            self.metrics['train'] = calculate_regression_metrics(yt_train, yp_train)

Robust Metric Calculation (Pipeline Safety)

When evaluating edge-case models generated by evolutionary pipelines, predictions can occasionally explode (producing Inf) or fail entirely (producing NaN). If a standard metric function attempts to compute the Mean Squared Error on an array containing a single NaN, the entire pipeline will crash.

Architectural Choice (Safe NumPy Vectorization): The calculate_regression_metrics function relies exclusively on pure, vectorized NumPy operations without heavy dependencies like sklearn. Crucially, it enforces a strict boolean mask (~np.isnan(y_true) & ~np.isnan(y_pred) & ~np.isinf(y_pred)) before any math is executed.

If an evolutionary algorithm proposes a mathematically unstable formula, this module safely isolates the failure, returning empty or constrained metrics rather than crashing the global optimization loop.

src/tam/evaluation/metrics.py (NaN-Safe Metric Vectorization)
def calculate_regression_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> dict:
    """
    Computes a comprehensive suite of regression and forecasting metrics.
    
    Args:
        y_true (np.ndarray): The ground truth target values.
        y_pred (np.ndarray): The predicted values from the model.
        
    Returns:
        dict: A dictionary containing RMSE, MAE, NMAE, SMAPE, R2, and MAPE.
    """
    if y_pred is None or len(y_pred) == 0: 
        return {}
        
    mask = ~np.isnan(y_true) & ~np.isnan(y_pred) & ~np.isinf(y_pred)
    if mask.sum() == 0: 
        return {}
    
    yt, yp = y_true[mask], y_pred[mask]
    
    mae = np.mean(np.abs(yt - yp))
    rmse = np.sqrt(np.mean((yt - yp) ** 2))
    
    ss_res = np.sum((yt - yp) ** 2)
    ss_tot = np.sum((yt - np.mean(yt)) ** 2)
    r2 = 1.0 - (ss_res / ss_tot) if ss_tot != 0 else np.nan
    
    smape = np.mean(2.0 * np.abs(yp - yt) / (np.abs(yt) + np.abs(yp) + 1e-8)) * 100.0
    nmae = mae / (np.mean(np.abs(yt)) + 1e-8)
    
    metrics = {
        'RMSE': rmse,
        'MAE': mae,
        'NMAE': nmae,
        'SMAPE': smape,
        'R2': r2
    }
    
    if not np.any(yt == 0):
        metrics['MAPE'] = np.mean(np.abs((yt - yp) / yt)) * 100.0
    else:
        metrics['MAPE'] = np.nan
        
    return metrics

MLOps Diagnostics: Residuals and Degradation

To move beyond simple point-accuracy metrics, the performance_analyzer.py module computes structural diagnostics.

Architectural Choice (Fast Proxy Statistics): Instead of importing heavy statistical libraries (like statsmodels) to run a formal Durbin-Watson test for serial correlation, the analyze_residuals function computes the Lag-1 Autocorrelation natively via np.corrcoef(res[:-1], res[1:]). This provides the exact same diagnostic signal (missing temporal features) but executes orders of magnitude faster, which is critical when analyzing hundreds of models in an AutoTAM loop.

src/tam/evaluation/performance_analyzer.py (Vectorized Residual Analysis)
def analyze_residuals(y_true: np.ndarray, y_pred: np.ndarray) -> dict:
    """
    Analyzes prediction residuals to check for bias, skewness, and autocorrelation.
    
    Calculates the Lag-1 Autocorrelation as a simplified Durbin-Watson proxy. 
    High autocorrelation indicates the model missed a temporal signal.
    
    Args:
        y_true (np.ndarray): The ground truth values.
        y_pred (np.ndarray): The predicted values.
        
    Returns:
        dict: Dictionary of calculated residual statistics.
    """
    mask = ~np.isnan(y_true) & ~np.isnan(y_pred)
    if mask.sum() == 0: 
        return {}
    
    res = y_true[mask] - y_pred[mask]
    
    if len(res) > 1:
        lag1_corr = np.corrcoef(res[:-1], res[1:])[0, 1]
    else:
        lag1_corr = np.nan
        
    return {
        'Mean Error (Bias)': np.mean(res),
        'Std Error': np.std(res),
        'Skewness': skew(res),
        'Kurtosis': kurtosis(res),
        'Lag-1 AutoCorr': lag1_corr
    }

To measure Concept Drift, the detect_temporal_degradation function slices the target arrays into strict chronological halves (midpoint = len(yt) // 2). It re-routes these sub-arrays back through the safe metric calculator to determine if the test-set error is statistically stationary or actively exploding.

src/tam/evaluation/performance_analyzer.py (Temporal Degradation Tracking)
def detect_temporal_degradation(y_true: np.ndarray, y_pred: np.ndarray, metric: str = 'RMSE') -> float:
    """
    Splits the test set in half to detect if performance is decaying over time.
    
    Args:
        y_true (np.ndarray): The ground truth values.
        y_pred (np.ndarray): The predicted values.
        metric (str): The metric to evaluate degradation against. Defaults to RMSE.
        
    Returns:
        float: The degradation ratio percentage. A value of +20.0 means 
               errors grew by 20% in the second half of the data.
    """
    from .metrics import calculate_regression_metrics
    
    mask = ~np.isnan(y_true) & ~np.isnan(y_pred)
    yt, yp = y_true[mask], y_pred[mask]
    
    if len(yt) < 4: 
        return np.nan
    
    midpoint = len(yt) // 2
    h1_metrics = calculate_regression_metrics(yt[:midpoint], yp[:midpoint])
    h2_metrics = calculate_regression_metrics(yt[midpoint:], yp[midpoint:])
    
    h1_val = h1_metrics.get(metric, np.nan)
    h2_val = h2_metrics.get(metric, np.nan)
    
    if np.isnan(h1_val) or h1_val == 0: 
        return np.nan
    
    degradation_pct = ((h2_val - h1_val) / h1_val) * 100.0
    return degradation_pct

The Dashboard Facade (plotting.py)

Visualizing the results of multiple competing models requires a dynamic layout system.

Architectural Choice (Dynamic GridSpec): The plot_benchmark_dashboard acts as a universal Facade. It dynamically adjusts its internal Matplotlib GridSpec layout based on the nature of the data.

  • If is_timeseries=True, it allocates massive wide panels to render the temporal prediction curves against the ground truth.

  • If it is cross-sectional data, it reconfigures the grid to prioritize Parity Plots (Predicted vs True scatter plots) and Residual Distribution histograms.

This abstraction ensures that data scientists can invoke a single reporting command (plot_benchmark_dashboard) and receive a presentation-ready, multi-pane diagnostic view perfectly tailored to the topology of their dataset.

src/tam/evaluation/eval_plotting.py (Dynamic Matplotlib Dashboard Layout)
def plot_benchmark_dashboard(data_dict, trackers_dict, target_col='value', date_col='date', 
                             is_timeseries=True, primary_metric='MAPE', title="Benchmark Comparison",
                             heatmap_col='month', summary_text=None, forecast_smoothing=None):
    """
    Generates a universal benchmarking dashboard using Matplotlib.
    
    Dynamically adjusts layout based on whether the data is a time-series 
    or cross-sectional, ranking models by the specified primary metric.
    """
    import matplotlib.pyplot as plt
    plt.rcParams.update({'axes.grid': True, 'grid.alpha': 0.5, 'axes.facecolor': '#fcfcfc'})
    
    if not trackers_dict: 
        return
        
    trackers_list = list(trackers_dict.values())
    models_to_plot = [t.model_name for t in trackers_list]
    tab10_colors = plt.cm.tab10.colors
    color_map = {name: tab10_colors[i % len(tab10_colors)] for i, name in enumerate(models_to_plot)}
    
    ascending = any(m in primary_metric.upper() for m in ['RMSE', 'MAE', 'MAPE', 'SMAPE', 'NMAE', 'ERROR'])
    
    sorted_trackers = sorted(
        [t for t in trackers_list if not np.isnan(t.get_metric('test', primary_metric))],
        key=lambda x: x.get_metric('test', primary_metric),
        reverse=not ascending
    )
    if not sorted_trackers: 
        return

    fig = plt.figure(figsize=(18, 12))
    
    if is_timeseries:
        gs = fig.add_gridspec(2, 2, height_ratios=[1, 1.2])
        ax_bar = fig.add_subplot(gs[0, 0])
        ax_main = fig.add_subplot(gs[0, 1])
        ax_bottom = fig.add_subplot(gs[1, :])
    else:
        gs = fig.add_gridspec(2, 2, height_ratios=[1, 1])
        ax_bar = fig.add_subplot(gs[0, 0])
        ax_main = fig.add_subplot(gs[0, 1])
        ax_bottom = fig.add_subplot(gs[1, 0]) 
        ax_text = fig.add_subplot(gs[1, 1])   
        ax_text.axis('off') 

    names = [t.model_name for t in sorted_trackers]
    scores = [t.get_metric('test', primary_metric) for t in sorted_trackers]
    
    y_pos = np.arange(len(names))
    bar_colors = [color_map[name] for name in names]
    
    ax_bar.barh(y_pos, scores, color=bar_colors, edgecolor='none')
    ax_bar.set_yticks(y_pos)
    ax_bar.set_yticklabels(names)
    ax_bar.invert_yaxis()
    ax_bar.set_xlabel(f"Test {primary_metric}")
    ax_bar.set_title(f"Model Ranking ({'Lower' if ascending else 'Higher'} is Better)", fontweight='bold')
    ax_bar.grid(axis='y', visible=False)
    for i, v in enumerate(scores): 
        ax_bar.text(v + (max(scores)*0.02), i, f"{v:.3f}", va='center', fontweight='bold')
        
    top_n = min(4, len(sorted_trackers))
    
    if is_timeseries:
        df_full = pd.concat(data_dict.values())
        plot_df = df_full.copy()
        
        if forecast_smoothing:
            plot_df = plot_df.set_index(date_col)
            if isinstance(forecast_smoothing, int):
                plot_df = plot_df.rolling(window=forecast_smoothing, min_periods=1).mean(numeric_only=True)
            elif isinstance(forecast_smoothing, str):
                plot_df = plot_df.resample(forecast_smoothing).mean(numeric_only=True)
            plot_df = plot_df.reset_index()

        ax_main.plot(plot_df[date_col], plot_df[target_col], color='gray', alpha=0.5, label='Actuals', lw=1)
        
        for i in range(top_n):
            t = sorted_trackers[i]
            if t.y_pred_full is not None:
                temp_df = df_full.copy()
                temp_df['pred'] = t.y_pred_full
                
                if forecast_smoothing:
                    temp_df = temp_df.set_index(date_col)
                    if isinstance(forecast_smoothing, int):
                        temp_df = temp_df.rolling(window=forecast_smoothing, min_periods=1).mean(numeric_only=True)
                    elif isinstance(forecast_smoothing, str):
                        temp_df = temp_df.resample(forecast_smoothing).mean(numeric_only=True)
                    temp_df = temp_df.reset_index()
                
                ax_main.plot(temp_df[date_col], temp_df['pred'], color=color_map[t.model_name], linestyle='-', lw=1.5, label=t.model_name)
        
        current_len = 0
        splits_dates = {}
        splits_keys = list(data_dict.keys())
        y_bottom_lim, y_top_lim = ax_main.get_ylim()
        
        y_pos_main = y_bottom_lim + (y_top_lim - y_bottom_lim) * 0.95
        y_pos_sub = y_bottom_lim + (y_top_lim - y_bottom_lim) * 0.88
        
        for i, (split_name, df) in enumerate(data_dict.items()):
            split_len = len(df)
            
            if i < len(splits_keys) - 1: 
                mid_idx = min(current_len + (split_len // 2), len(df_full) - 1)
                mid_date = df_full.iloc[mid_idx][date_col]
                ax_main.text(mid_date, y_pos_sub, split_name.upper(), ha='center', va='center', 
                             fontweight='bold', fontsize=10, color='#495057')

            current_len += split_len
            if i < len(splits_keys) - 1:
                boundary_date = df_full.iloc[current_len][date_col]
                splits_dates[split_name] = boundary_date
                
                if split_name == splits_keys[-2]:
                    ax_main.axvline(x=boundary_date, color='black', linestyle='-', lw=3)
                    ax_main.text(boundary_date, y_pos_main, "← STAGE 1: SEARCH & REFIT  ", ha='right', va='center', fontweight='bold', fontsize=11)
                    ax_main.text(boundary_date, y_pos_main, "  STAGE 2: TEST →", ha='left', va='center', fontweight='bold', fontsize=11, color='#1d3557')
                else:
                    ax_main.axvline(x=boundary_date, color='black', linestyle=':', lw=2, alpha=0.7)

        if splits_dates:
            start_date = df_full[date_col].iloc[0]
            test_start_date = list(splits_dates.values())[-1]
            end_date = df_full[date_col].iloc[-1]
            ax_main.axvspan(start_date, test_start_date, color='gray', alpha=0.1)
            ax_main.axvspan(test_start_date, end_date, color='#457b9d', alpha=0.1)
            
        smoothing_label = f" [Smoothed: {forecast_smoothing}]" if forecast_smoothing else ""
        ax_main.set_title(f"Chronological Forecast (Top {top_n}){smoothing_label}", fontweight='bold')
        ax_main.legend(loc='upper left')

        if summary_text:
            ax_bottom.clear()
            ax_bottom.axis('off')
            wrapped_text = textwrap.fill(summary_text, width=130) 
            ax_bottom.text(0.5, 0.5, wrapped_text, transform=ax_bottom.transAxes, 
                           fontsize=13, va='center', ha='center', family='monospace',
                           bbox=dict(boxstyle="round,pad=1.5", fc="#f8f9fa", ec="#dee2e6", lw=1.5))
        else:
            try:
                df_test = data_dict.get('test')
                if df_test is not None and heatmap_col in df_test.columns:
                    ape_records = []
                    for t in sorted_trackers:
                        if 'test' in t.predictions:
                            trues, preds = df_test[target_col].values, t.predictions['test']
                            
                            if 'RMSE' in primary_metric.upper() or 'MSE' in primary_metric.upper():
                                metric_vals = (trues - preds)**2
                            elif 'MAPE' in primary_metric.upper():
                                metric_vals = np.abs((trues - preds) / np.maximum(np.abs(trues), 1e-8)) * 100
                            else:
                                metric_vals = np.abs(trues - preds)
                                
                            groups = df_test[heatmap_col].values
                            for grp, val in zip(groups, metric_vals):
                                ape_records.append({'Model': t.model_name, 'Group': grp, 'Error': val})

                    df_err = pd.DataFrame(ape_records)
                    pivot_err = df_err.groupby(['Model', 'Group'])['Error'].mean().reset_index().pivot(index='Model', columns='Group', values='Error')
                    pivot_err = pivot_err.reindex(sorted(pivot_err.columns), axis=1)
                    pivot_err = pivot_err.reindex(names) 
                    
                    if 'RMSE' in primary_metric.upper():
                        pivot_err = np.sqrt(pivot_err)
                    
                    data = pivot_err.values
                    masked_data = np.ma.masked_invalid(data) 
                    im = ax_bottom.imshow(masked_data, cmap="Reds", aspect="auto")
                    cbar = fig.colorbar(im, ax=ax_bottom)
                    cbar.set_label(primary_metric)
                    
                    ax_bottom.set_xticks(np.arange(len(pivot_err.columns)))
                    ax_bottom.set_yticks(np.arange(len(pivot_err.index)))
                    ax_bottom.set_xticklabels(pivot_err.columns)
                    ax_bottom.set_yticklabels(pivot_err.index)
                    
                    for i in range(len(pivot_err.index)):
                        for j in range(len(pivot_err.columns)):
                            val = data[i, j]
                            if not np.isnan(val):
                                text_color = "white" if val > np.nanmax(data)*0.5 else "black"
                                ax_bottom.text(j, i, f"{val:.2f}", ha="center", va="center", color=text_color)
                                
                    ax_bottom.grid(False) 
                    ax_bottom.set_title(f"Test Set Vulnerability: {primary_metric} by '{heatmap_col}'", fontweight='bold', pad=15)
                else:
                    ax_bottom.set_title(f"Temporal Heatmap unavailable. Column '{heatmap_col}' not found in Test set.")
            except Exception as e:
                ax_bottom.set_title(f"Temporal Heatmap error: {e}")

    else:
        df_test = data_dict.get('test')
        if df_test is not None:
            ax_main.plot([df_test[target_col].min(), df_test[target_col].max()], 
                         [df_test[target_col].min(), df_test[target_col].max()], 
                         color='black', linestyle='--', lw=2, label='Perfect Fit')
            for i in range(top_n):
                t = sorted_trackers[i]
                if 'test' in t.predictions: 
                    ax_main.scatter(df_test[target_col], t.predictions['test'], color=color_map[t.model_name], alpha=0.6, label=t.model_name, s=15)
            ax_main.set_title(f"True vs Predicted (Top {top_n})", fontweight='bold')
            ax_main.set_xlabel("True Values"); ax_main.set_ylabel("Predicted Values"); ax_main.legend()

            for i in range(top_n):
                t = sorted_trackers[i]
                if 'test' in t.predictions:
                    residuals = df_test[target_col].values - t.predictions['test']
                    ax_bottom.hist(residuals, bins=30, density=True, color=color_map[t.model_name], alpha=0.3, label=t.model_name, histtype='stepfilled')
                    ax_bottom.hist(residuals, bins=30, density=True, color=color_map[t.model_name], lw=2, histtype='step')
                    
            ax_bottom.axvline(0, color='black', linestyle='--', lw=2)
            ax_bottom.set_title("Residual Density Distribution", fontweight='bold')
            ax_bottom.set_xlabel("Residual Error (True - Pred)"); ax_bottom.legend()

            if summary_text:
                wrapped_text = textwrap.fill(summary_text, width=70) 
                ax_text.text(0.05, 0.90, wrapped_text, transform=ax_text.transAxes, 
                             fontsize=12, va='top', ha='left', family='monospace',
                             bbox=dict(boxstyle="round,pad=1.5", fc="#f8f9fa", ec="#dee2e6", lw=1.5))
            else:
                ax_text.text(0.5, 0.5, "Pass 'summary_text' to render diagnostics here.", 
                             transform=ax_text.transAxes, fontsize=12, va='center', ha='center', color='gray')

    plt.suptitle(title, fontsize=18, y=1.02)
    plt.tight_layout()
    plt.show()