MLOps Tracking and Empirical Benchmarking (Engineering)¶
Navigation:
Theory introduction: See the Intro
Related mathematical theory: See the Mathematical Theory
This chapter details the software engineering behind the evaluation/ module (metrics.py, performance_analyzer.py, plotting.py, and tracker.py). While the mathematical solvers handle the internal physics of the model, these scripts act as the external “MLOps wrapper,” providing robust, production-safe telemetry to track how models actually perform on out-of-sample data.
Object-Oriented Telemetry (BenchmarkTracker)¶
In complex pipelines (especially evolutionary AutoML where hundreds of models are trained and discarded), passing around loose dictionaries of metrics quickly becomes unmanageable.
Architectural Choice (The State Holder): The framework introduces the BenchmarkTracker class. This Object-Oriented component acts as a unified state holder for a specific model’s lifecycle. It stores the model’s metadata (time_fit, time_predict), the raw predictions across all data splits (Train, Validation, Test), and the computed diagnostics.
When slice_and_evaluate is called, the tracker automatically partitions the contiguous prediction tensor into the correct cross-validation folds and routes them to the metric and diagnostic sub-modules, centralizing the telemetry logic in a single, auditable object.
def slice_and_evaluate(self, data_dict: dict, target_col: str = 'value'):
"""
Slices continuous predictions into respective folds and computes
standard metrics and MLOps diagnostics.
"""
if self.y_pred_full is None:
print(f"Warning: '{self.model_name}' - 'y_pred_full' not found. Cannot evaluate.")
return
current_idx = 0
for split_name, df in data_dict.items():
length = len(df)
y_true = df[target_col].values
y_pred = self.y_pred_full[current_idx : current_idx + length]
self.predictions[split_name] = y_pred
self.metrics[split_name] = calculate_regression_metrics(y_true, y_pred)
self.diagnostics[split_name] = analyze_residuals(y_true, y_pred)
if split_name == 'test':
drift = detect_temporal_degradation(y_true, y_pred, metric='RMSE')
self.diagnostics['test']['RMSE_Drift_H2_vs_H1 (%)'] = drift
current_idx += length
components = ['fit', 'dev', 'val']
if all(k in data_dict for k in components) and 'train' not in data_dict:
yt_train = np.concatenate([data_dict[k][target_col].values for k in components])
yp_train = np.concatenate([self.predictions[k] for k in components])
self.predictions['train'] = yp_train
self.metrics['train'] = calculate_regression_metrics(yt_train, yp_train)
Robust Metric Calculation (Pipeline Safety)¶
When evaluating edge-case models generated by evolutionary pipelines, predictions can occasionally explode (producing Inf) or fail entirely (producing NaN). If a standard metric function attempts to compute the Mean Squared Error on an array containing a single NaN, the entire pipeline will crash.
Architectural Choice (Safe NumPy Vectorization): The calculate_regression_metrics function relies exclusively on pure, vectorized NumPy operations without heavy dependencies like sklearn. Crucially, it enforces a strict boolean mask (~np.isnan(y_true) & ~np.isnan(y_pred) & ~np.isinf(y_pred)) before any math is executed.
If an evolutionary algorithm proposes a mathematically unstable formula, this module safely isolates the failure, returning empty or constrained metrics rather than crashing the global optimization loop.
def calculate_regression_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> dict:
"""
Computes a comprehensive suite of regression and forecasting metrics.
Args:
y_true (np.ndarray): The ground truth target values.
y_pred (np.ndarray): The predicted values from the model.
Returns:
dict: A dictionary containing RMSE, MAE, NMAE, SMAPE, R2, and MAPE.
"""
if y_pred is None or len(y_pred) == 0:
return {}
mask = ~np.isnan(y_true) & ~np.isnan(y_pred) & ~np.isinf(y_pred)
if mask.sum() == 0:
return {}
yt, yp = y_true[mask], y_pred[mask]
mae = np.mean(np.abs(yt - yp))
rmse = np.sqrt(np.mean((yt - yp) ** 2))
ss_res = np.sum((yt - yp) ** 2)
ss_tot = np.sum((yt - np.mean(yt)) ** 2)
r2 = 1.0 - (ss_res / ss_tot) if ss_tot != 0 else np.nan
smape = np.mean(2.0 * np.abs(yp - yt) / (np.abs(yt) + np.abs(yp) + 1e-8)) * 100.0
nmae = mae / (np.mean(np.abs(yt)) + 1e-8)
metrics = {
'RMSE': rmse,
'MAE': mae,
'NMAE': nmae,
'SMAPE': smape,
'R2': r2
}
if not np.any(yt == 0):
metrics['MAPE'] = np.mean(np.abs((yt - yp) / yt)) * 100.0
else:
metrics['MAPE'] = np.nan
return metrics
MLOps Diagnostics: Residuals and Degradation¶
To move beyond simple point-accuracy metrics, the performance_analyzer.py module computes structural diagnostics.
Architectural Choice (Fast Proxy Statistics): Instead of importing heavy statistical libraries (like statsmodels) to run a formal Durbin-Watson test for serial correlation, the analyze_residuals function computes the Lag-1 Autocorrelation natively via np.corrcoef(res[:-1], res[1:]). This provides the exact same diagnostic signal (missing temporal features) but executes orders of magnitude faster, which is critical when analyzing hundreds of models in an AutoTAM loop.
def analyze_residuals(y_true: np.ndarray, y_pred: np.ndarray) -> dict:
"""
Analyzes prediction residuals to check for bias, skewness, and autocorrelation.
Calculates the Lag-1 Autocorrelation as a simplified Durbin-Watson proxy.
High autocorrelation indicates the model missed a temporal signal.
Args:
y_true (np.ndarray): The ground truth values.
y_pred (np.ndarray): The predicted values.
Returns:
dict: Dictionary of calculated residual statistics.
"""
mask = ~np.isnan(y_true) & ~np.isnan(y_pred)
if mask.sum() == 0:
return {}
res = y_true[mask] - y_pred[mask]
if len(res) > 1:
lag1_corr = np.corrcoef(res[:-1], res[1:])[0, 1]
else:
lag1_corr = np.nan
return {
'Mean Error (Bias)': np.mean(res),
'Std Error': np.std(res),
'Skewness': skew(res),
'Kurtosis': kurtosis(res),
'Lag-1 AutoCorr': lag1_corr
}
To measure Concept Drift, the detect_temporal_degradation function slices the target arrays into strict chronological halves (midpoint = len(yt) // 2). It re-routes these sub-arrays back through the safe metric calculator to determine if the test-set error is statistically stationary or actively exploding.
def detect_temporal_degradation(y_true: np.ndarray, y_pred: np.ndarray, metric: str = 'RMSE') -> float:
"""
Splits the test set in half to detect if performance is decaying over time.
Args:
y_true (np.ndarray): The ground truth values.
y_pred (np.ndarray): The predicted values.
metric (str): The metric to evaluate degradation against. Defaults to RMSE.
Returns:
float: The degradation ratio percentage. A value of +20.0 means
errors grew by 20% in the second half of the data.
"""
from .metrics import calculate_regression_metrics
mask = ~np.isnan(y_true) & ~np.isnan(y_pred)
yt, yp = y_true[mask], y_pred[mask]
if len(yt) < 4:
return np.nan
midpoint = len(yt) // 2
h1_metrics = calculate_regression_metrics(yt[:midpoint], yp[:midpoint])
h2_metrics = calculate_regression_metrics(yt[midpoint:], yp[midpoint:])
h1_val = h1_metrics.get(metric, np.nan)
h2_val = h2_metrics.get(metric, np.nan)
if np.isnan(h1_val) or h1_val == 0:
return np.nan
degradation_pct = ((h2_val - h1_val) / h1_val) * 100.0
return degradation_pct
The Dashboard Facade (plotting.py)¶
Visualizing the results of multiple competing models requires a dynamic layout system.
Architectural Choice (Dynamic GridSpec): The plot_benchmark_dashboard acts as a universal Facade. It dynamically adjusts its internal Matplotlib GridSpec layout based on the nature of the data.
If
is_timeseries=True, it allocates massive wide panels to render the temporal prediction curves against the ground truth.If it is cross-sectional data, it reconfigures the grid to prioritize Parity Plots (Predicted vs True scatter plots) and Residual Distribution histograms.
This abstraction ensures that data scientists can invoke a single reporting command (plot_benchmark_dashboard) and receive a presentation-ready, multi-pane diagnostic view perfectly tailored to the topology of their dataset.
def plot_benchmark_dashboard(data_dict, trackers_dict, target_col='value', date_col='date',
is_timeseries=True, primary_metric='MAPE', title="Benchmark Comparison",
heatmap_col='month', summary_text=None, forecast_smoothing=None):
"""
Generates a universal benchmarking dashboard using Matplotlib.
Dynamically adjusts layout based on whether the data is a time-series
or cross-sectional, ranking models by the specified primary metric.
"""
import matplotlib.pyplot as plt
plt.rcParams.update({'axes.grid': True, 'grid.alpha': 0.5, 'axes.facecolor': '#fcfcfc'})
if not trackers_dict:
return
trackers_list = list(trackers_dict.values())
models_to_plot = [t.model_name for t in trackers_list]
tab10_colors = plt.cm.tab10.colors
color_map = {name: tab10_colors[i % len(tab10_colors)] for i, name in enumerate(models_to_plot)}
ascending = any(m in primary_metric.upper() for m in ['RMSE', 'MAE', 'MAPE', 'SMAPE', 'NMAE', 'ERROR'])
sorted_trackers = sorted(
[t for t in trackers_list if not np.isnan(t.get_metric('test', primary_metric))],
key=lambda x: x.get_metric('test', primary_metric),
reverse=not ascending
)
if not sorted_trackers:
return
fig = plt.figure(figsize=(18, 12))
if is_timeseries:
gs = fig.add_gridspec(2, 2, height_ratios=[1, 1.2])
ax_bar = fig.add_subplot(gs[0, 0])
ax_main = fig.add_subplot(gs[0, 1])
ax_bottom = fig.add_subplot(gs[1, :])
else:
gs = fig.add_gridspec(2, 2, height_ratios=[1, 1])
ax_bar = fig.add_subplot(gs[0, 0])
ax_main = fig.add_subplot(gs[0, 1])
ax_bottom = fig.add_subplot(gs[1, 0])
ax_text = fig.add_subplot(gs[1, 1])
ax_text.axis('off')
names = [t.model_name for t in sorted_trackers]
scores = [t.get_metric('test', primary_metric) for t in sorted_trackers]
y_pos = np.arange(len(names))
bar_colors = [color_map[name] for name in names]
ax_bar.barh(y_pos, scores, color=bar_colors, edgecolor='none')
ax_bar.set_yticks(y_pos)
ax_bar.set_yticklabels(names)
ax_bar.invert_yaxis()
ax_bar.set_xlabel(f"Test {primary_metric}")
ax_bar.set_title(f"Model Ranking ({'Lower' if ascending else 'Higher'} is Better)", fontweight='bold')
ax_bar.grid(axis='y', visible=False)
for i, v in enumerate(scores):
ax_bar.text(v + (max(scores)*0.02), i, f"{v:.3f}", va='center', fontweight='bold')
top_n = min(4, len(sorted_trackers))
if is_timeseries:
df_full = pd.concat(data_dict.values())
plot_df = df_full.copy()
if forecast_smoothing:
plot_df = plot_df.set_index(date_col)
if isinstance(forecast_smoothing, int):
plot_df = plot_df.rolling(window=forecast_smoothing, min_periods=1).mean(numeric_only=True)
elif isinstance(forecast_smoothing, str):
plot_df = plot_df.resample(forecast_smoothing).mean(numeric_only=True)
plot_df = plot_df.reset_index()
ax_main.plot(plot_df[date_col], plot_df[target_col], color='gray', alpha=0.5, label='Actuals', lw=1)
for i in range(top_n):
t = sorted_trackers[i]
if t.y_pred_full is not None:
temp_df = df_full.copy()
temp_df['pred'] = t.y_pred_full
if forecast_smoothing:
temp_df = temp_df.set_index(date_col)
if isinstance(forecast_smoothing, int):
temp_df = temp_df.rolling(window=forecast_smoothing, min_periods=1).mean(numeric_only=True)
elif isinstance(forecast_smoothing, str):
temp_df = temp_df.resample(forecast_smoothing).mean(numeric_only=True)
temp_df = temp_df.reset_index()
ax_main.plot(temp_df[date_col], temp_df['pred'], color=color_map[t.model_name], linestyle='-', lw=1.5, label=t.model_name)
current_len = 0
splits_dates = {}
splits_keys = list(data_dict.keys())
y_bottom_lim, y_top_lim = ax_main.get_ylim()
y_pos_main = y_bottom_lim + (y_top_lim - y_bottom_lim) * 0.95
y_pos_sub = y_bottom_lim + (y_top_lim - y_bottom_lim) * 0.88
for i, (split_name, df) in enumerate(data_dict.items()):
split_len = len(df)
if i < len(splits_keys) - 1:
mid_idx = min(current_len + (split_len // 2), len(df_full) - 1)
mid_date = df_full.iloc[mid_idx][date_col]
ax_main.text(mid_date, y_pos_sub, split_name.upper(), ha='center', va='center',
fontweight='bold', fontsize=10, color='#495057')
current_len += split_len
if i < len(splits_keys) - 1:
boundary_date = df_full.iloc[current_len][date_col]
splits_dates[split_name] = boundary_date
if split_name == splits_keys[-2]:
ax_main.axvline(x=boundary_date, color='black', linestyle='-', lw=3)
ax_main.text(boundary_date, y_pos_main, "← STAGE 1: SEARCH & REFIT ", ha='right', va='center', fontweight='bold', fontsize=11)
ax_main.text(boundary_date, y_pos_main, " STAGE 2: TEST →", ha='left', va='center', fontweight='bold', fontsize=11, color='#1d3557')
else:
ax_main.axvline(x=boundary_date, color='black', linestyle=':', lw=2, alpha=0.7)
if splits_dates:
start_date = df_full[date_col].iloc[0]
test_start_date = list(splits_dates.values())[-1]
end_date = df_full[date_col].iloc[-1]
ax_main.axvspan(start_date, test_start_date, color='gray', alpha=0.1)
ax_main.axvspan(test_start_date, end_date, color='#457b9d', alpha=0.1)
smoothing_label = f" [Smoothed: {forecast_smoothing}]" if forecast_smoothing else ""
ax_main.set_title(f"Chronological Forecast (Top {top_n}){smoothing_label}", fontweight='bold')
ax_main.legend(loc='upper left')
if summary_text:
ax_bottom.clear()
ax_bottom.axis('off')
wrapped_text = textwrap.fill(summary_text, width=130)
ax_bottom.text(0.5, 0.5, wrapped_text, transform=ax_bottom.transAxes,
fontsize=13, va='center', ha='center', family='monospace',
bbox=dict(boxstyle="round,pad=1.5", fc="#f8f9fa", ec="#dee2e6", lw=1.5))
else:
try:
df_test = data_dict.get('test')
if df_test is not None and heatmap_col in df_test.columns:
ape_records = []
for t in sorted_trackers:
if 'test' in t.predictions:
trues, preds = df_test[target_col].values, t.predictions['test']
if 'RMSE' in primary_metric.upper() or 'MSE' in primary_metric.upper():
metric_vals = (trues - preds)**2
elif 'MAPE' in primary_metric.upper():
metric_vals = np.abs((trues - preds) / np.maximum(np.abs(trues), 1e-8)) * 100
else:
metric_vals = np.abs(trues - preds)
groups = df_test[heatmap_col].values
for grp, val in zip(groups, metric_vals):
ape_records.append({'Model': t.model_name, 'Group': grp, 'Error': val})
df_err = pd.DataFrame(ape_records)
pivot_err = df_err.groupby(['Model', 'Group'])['Error'].mean().reset_index().pivot(index='Model', columns='Group', values='Error')
pivot_err = pivot_err.reindex(sorted(pivot_err.columns), axis=1)
pivot_err = pivot_err.reindex(names)
if 'RMSE' in primary_metric.upper():
pivot_err = np.sqrt(pivot_err)
data = pivot_err.values
masked_data = np.ma.masked_invalid(data)
im = ax_bottom.imshow(masked_data, cmap="Reds", aspect="auto")
cbar = fig.colorbar(im, ax=ax_bottom)
cbar.set_label(primary_metric)
ax_bottom.set_xticks(np.arange(len(pivot_err.columns)))
ax_bottom.set_yticks(np.arange(len(pivot_err.index)))
ax_bottom.set_xticklabels(pivot_err.columns)
ax_bottom.set_yticklabels(pivot_err.index)
for i in range(len(pivot_err.index)):
for j in range(len(pivot_err.columns)):
val = data[i, j]
if not np.isnan(val):
text_color = "white" if val > np.nanmax(data)*0.5 else "black"
ax_bottom.text(j, i, f"{val:.2f}", ha="center", va="center", color=text_color)
ax_bottom.grid(False)
ax_bottom.set_title(f"Test Set Vulnerability: {primary_metric} by '{heatmap_col}'", fontweight='bold', pad=15)
else:
ax_bottom.set_title(f"Temporal Heatmap unavailable. Column '{heatmap_col}' not found in Test set.")
except Exception as e:
ax_bottom.set_title(f"Temporal Heatmap error: {e}")
else:
df_test = data_dict.get('test')
if df_test is not None:
ax_main.plot([df_test[target_col].min(), df_test[target_col].max()],
[df_test[target_col].min(), df_test[target_col].max()],
color='black', linestyle='--', lw=2, label='Perfect Fit')
for i in range(top_n):
t = sorted_trackers[i]
if 'test' in t.predictions:
ax_main.scatter(df_test[target_col], t.predictions['test'], color=color_map[t.model_name], alpha=0.6, label=t.model_name, s=15)
ax_main.set_title(f"True vs Predicted (Top {top_n})", fontweight='bold')
ax_main.set_xlabel("True Values"); ax_main.set_ylabel("Predicted Values"); ax_main.legend()
for i in range(top_n):
t = sorted_trackers[i]
if 'test' in t.predictions:
residuals = df_test[target_col].values - t.predictions['test']
ax_bottom.hist(residuals, bins=30, density=True, color=color_map[t.model_name], alpha=0.3, label=t.model_name, histtype='stepfilled')
ax_bottom.hist(residuals, bins=30, density=True, color=color_map[t.model_name], lw=2, histtype='step')
ax_bottom.axvline(0, color='black', linestyle='--', lw=2)
ax_bottom.set_title("Residual Density Distribution", fontweight='bold')
ax_bottom.set_xlabel("Residual Error (True - Pred)"); ax_bottom.legend()
if summary_text:
wrapped_text = textwrap.fill(summary_text, width=70)
ax_text.text(0.05, 0.90, wrapped_text, transform=ax_text.transAxes,
fontsize=12, va='top', ha='left', family='monospace',
bbox=dict(boxstyle="round,pad=1.5", fc="#f8f9fa", ec="#dee2e6", lw=1.5))
else:
ax_text.text(0.5, 0.5, "Pass 'summary_text' to render diagnostics here.",
transform=ax_text.transAxes, fontsize=12, va='center', ha='center', color='gray')
plt.suptitle(title, fontsize=18, y=1.02)
plt.tight_layout()
plt.show()