The Engineering of Normalization and Padding¶
This chapter covers the technical aspects of data preparation. It explains how the theoretical principles of N-Dim Broadcasting are implemented in Python via PyTorch and Pandas, ensuring numerical stability and compatibility with massively parallel GPU architectures.
Tensor Uniformization: Group Balancing¶
The Primal solver operates exclusively on structured tensors of shape \((G \times N \times D)\) for base static training. A major technical constraint emerges when dealing with panel data econometrics [Arellano, 2003]: in reality, isolated time series (groups) rarely share the exact same length \(N\).
The utils.py script resolves this issue via the _balance_groups function, which enforces the mathematical uniformity of the temporal dimension through two distinct strategies:
The drop method (Truncation): Truncates all series to the length of the shortest series present in the dataset, formally guaranteeing a perfectly balanced panel.
The fill method (Temporal Padding): Identifies the longest series and pads shorter series by cloning their last known row. To prevent indexing conflicts, the script calculates a time step (delta) and generates incremental fake dates (fake_date).
def _balance_groups(
    dataset: pd.DataFrame,
    group_col: str,
    date_col: str,
    method: str = 'drop'
) -> Tuple[pd.Series, pd.DataFrame]:
    r"""
    Balances groups in a DataFrame to ensure consistent sizes.

    Args:
        dataset: The source DataFrame.
        group_col: Column defining the groups.
        date_col: Date column used for padding logic.
        method: Strategy to use ('drop' to truncate, 'fill' to pad).

    Returns:
        Tuple[pd.Series, pd.DataFrame]: A boolean mask indicating original rows,
        and the balanced DataFrame.
    """
    if method not in ['drop', 'fill']:
        raise ValueError("Method must be 'drop' or 'fill'.")

    if dataset.empty:
        return pd.Series(dtype=bool), dataset.copy()

    group_counts = dataset[group_col].value_counts()
    if group_counts.empty:
        return pd.Series(dtype=bool), dataset.copy()

    min_count, max_count = group_counts.min(), group_counts.max()

    # Already balanced: every row is an original row.
    if min_count == max_count:
        return pd.Series(True, index=dataset.index), dataset.copy()

    if method == 'drop':
        # Keep the first `min_count` rows of every group.
        balanced_df = dataset.groupby(group_col).head(min_count)
        # Wrap in a Series so the return type matches the signature.
        mask = pd.Series(dataset.index.isin(balanced_df.index), index=dataset.index)
        return mask, balanced_df.copy()

    if method == 'fill':
        rows_to_add = []
        groups_to_fill = group_counts[group_counts < max_count].index

        # Average time step over the whole dataset, with a one-day fallback.
        if len(dataset) > 1 and dataset[date_col].max() != dataset[date_col].min():
            delta = (dataset[date_col].max() - dataset[date_col].min()) / len(dataset)
        else:
            delta = pd.Timedelta(days=1)
        fake_date = dataset[date_col].max() + delta

        for value in groups_to_fill:
            num_missing = max_count - group_counts[value]
            group_df = dataset[dataset[group_col] == value]
            if group_df.empty:
                continue
            # Clone the last known row once per missing time step.
            last_row = group_df.iloc[-1:].copy()
            for _ in range(num_missing):
                last_row[date_col] = fake_date
                rows_to_add.append(last_row.copy())
                fake_date += delta

        if not rows_to_add:
            return pd.Series(True, index=dataset.index), dataset.copy()

        new_rows_df = pd.concat(rows_to_add, ignore_index=True)
        balanced_df = pd.concat([dataset, new_rows_df], ignore_index=True)

        # True for original rows, False for padded rows.
        original_mask = pd.Series(True, index=dataset.index)
        new_mask = pd.Series(False, index=new_rows_df.index)
        mask = pd.concat([original_mask, new_mask])
        return mask.reset_index(drop=True), balanced_df.reset_index(drop=True)
A boolean mask is strictly preserved during this padding phase. Once the model outputs its final predictions, this mask is applied to strip away the artificially padded rows, ensuring the final output exactly matches the user’s initial input dimensions.
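To make the role of the mask concrete, here is a minimal sketch of the strip-after-prediction step. It does not call _balance_groups; the padded frame, the mask, and the prediction values are built by hand as hypothetical stand-ins for what the 'fill' strategy and the model would produce.

```python
import pandas as pd

# Toy panel: group "A" has 3 rows, group "B" has 2, so "B" needs one padded row.
original = pd.DataFrame({
    "group": ["A", "A", "A", "B", "B"],
    "date": pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03",
                            "2024-01-01", "2024-01-02"]),
    "value": [1.0, 2.0, 3.0, 10.0, 20.0],
})

# Hand-built stand-in for the 'fill' output: clone B's last row with a fake date.
padding = original.iloc[[-1]].copy()
padding["date"] = pd.Timestamp("2024-01-04")
balanced = pd.concat([original, padding], ignore_index=True)

# True marks rows supplied by the user, False marks padded rows.
mask = pd.Series([True] * len(original) + [False] * len(padding))

# Hypothetical model output: one prediction per balanced row.
balanced["prediction"] = [1.1, 2.1, 3.1, 10.1, 20.1, 20.1]

# Applying the mask removes the padded rows again.
restored = balanced[mask].reset_index(drop=True)
```

After the mask is applied, restored has the same number of rows as original, so the padded clone never reaches the user.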
Affine Normalization and 3D Stacking¶
Once balanced, the data must be projected into the \([-1, 1]\) interval. This affine transformation is not merely numerical scaling; it is a geometric requirement of the basis. Orthogonal bases like Chebyshev polynomials are bounded, with \(|T_n(x)| \le 1\), only on \([-1, 1]\) and grow rapidly outside it, and it is on this domain that Chebyshev-based approximation avoids the Runge phenomenon [Rivlin, 1990]. Furthermore, standardizing the input space helps keep the resulting global covariance matrix numerically well-conditioned during the exact Primal inversion [LeCun et al., 1998].
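As an illustration, a min-max affine map onto \([-1, 1]\) can be sketched as follows. The helper names fit_min_max and affine_to_unit_interval, and the layout of the parameter dictionary, are assumptions made for this example; they are not the library's normalize function or its norm_params format.

```python
import pandas as pd

def fit_min_max(df, features):
    """Return per-feature minima and maxima (hypothetical parameter layout)."""
    return {"min": df[features].min(), "max": df[features].max()}

def affine_to_unit_interval(df, params):
    """Map each column affinely so that its fitted min -> -1 and max -> +1."""
    # Guard constant columns: a zero span would divide by zero.
    span = (params["max"] - params["min"]).replace(0, 1)
    return 2.0 * (df - params["min"]) / span - 1.0

frame = pd.DataFrame({"temp": [10.0, 15.0, 20.0], "load": [100.0, 300.0, 200.0]})
params = fit_min_max(frame, ["temp", "load"])
scaled = affine_to_unit_interval(frame[["temp", "load"]], params)
```

With parameters fitted this way, values outside the fitted range land outside \([-1, 1]\), which is worth keeping in mind when the same parameters are reused on new data.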
The _transform_data_stacked function iterates over the fitted groups and, within each group, applies the affine transformation to all feature columns at once with vectorized Pandas operations (via normalize) rather than looping over individual rows. After normalization, each group's flat 2D data is converted to a tensor, and the per-group tensors are stacked into a single 3D PyTorch tensor of shape (n_groups, n_time_steps, n_features):
def _transform_data_stacked(
    data: pd.DataFrame,
    features: List[str],
    group_col: str,
    norm_params: Dict,
    unique_groups: List,
    target_col: Optional[str] = None
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
    r"""
    Normalizes data per group and stacks it into 3D tensors.
    Used for standard (non-adaptive) batch training.

    Args:
        data: The DataFrame to transform.
        features: List of feature columns.
        group_col: The grouping column name.
        norm_params: Fitted normalization parameters.
        unique_groups: Fitted list of unique group names.
        target_col: Target variable name (optional).

    Returns:
        Tuple[torch.Tensor, Optional[torch.Tensor]]:
            - x_stacked: (n_groups, n_samples_per_group, n_features).
            - y_stacked: (n_groups, n_samples_per_group, 1) or None.

    Raises:
        ValueError: If `unique_groups` is None.
    """
    formatted_data = []
    if unique_groups is None:
        raise ValueError("`unique_groups` cannot be None.")

    for group_name in unique_groups:
        if group_name not in norm_params:
            continue
        group_data = data[data[group_col] == group_name].reset_index(drop=True)
        if group_data.empty:
            continue

        params = norm_params[group_name]
        normalized_features = normalize(df_to_normalize=group_data[features], params=params)
        x_tensor = torch.tensor(normalized_features.values, dtype=torch.get_default_dtype(), device=TORCH_DEVICE)

        y_tensor = None
        if target_col in data.columns:
            y_vals = group_data[target_col].values
            y_tensor = torch.tensor(y_vals, dtype=torch.get_default_dtype(), device=TORCH_DEVICE)
            y_tensor = y_tensor.view(-1, 1)

        formatted_data.append([x_tensor, y_tensor])

    if not formatted_data:
        x_empty = torch.empty(0, 0, len(features), device=TORCH_DEVICE)
        y_empty = torch.empty(0, 0, 1, device=TORCH_DEVICE, dtype=torch.get_default_dtype()) if target_col else None
        return x_empty, y_empty

    x_stacked = torch.stack([entry[0] for entry in formatted_data])
    y_stacked = torch.stack([entry[1] for entry in formatted_data]) if target_col in data.columns else None
    return x_stacked, y_stacked
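The final torch.stack call is the reason group balancing has to happen first: stacking only works when every per-group tensor has the same shape. The following toy sketch, with made-up tensors, shows both the successful case and the failure on an unbalanced group.

```python
import torch

# Two groups with equal length (3 time steps, 2 features) stack cleanly.
group_a = torch.zeros(3, 2)
group_b = torch.ones(3, 2)
stacked = torch.stack([group_a, group_b])  # (n_groups, n_time_steps, n_features)

# A shorter group cannot be stacked: torch.stack requires identical shapes.
group_short = torch.ones(2, 2)
try:
    torch.stack([group_a, group_short])
    stack_failed = False
except RuntimeError:
    stack_failed = True
```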
Vectorized Sliding Windows (Adaptive Online)¶
For the Adaptive meta-learner, managing the continuous shift of historical bounds is essential for online GAM selection and adaptation to concept drift [Das et al., 2025]. Constructing rolling historical windows iteratively in Python would create a massive CPU bottleneck.
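The _transform_data_adaptive function avoids this by using PyTorch advanced (integer-tensor) indexing to gather all windows of a group in a single operation, which formalizes the adaptive online approach [Doumèche et al., 2025]. In the listing below, the windows are assembled on the CPU (prep_device = 'cpu') and only the stacked result is moved to TORCH_DEVICE.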
Index Calculation: It calculates the valid starting points by stepping backward from the end of the series.
Offset Broadcasting: It builds 1D train_offsets and predict_offsets tensors using torch.arange.
Advanced Indexing: All windows of a group are extracted in a single indexing operation by broadcasting the offsets against the reshaped start indices (start_indices.view(-1, 1) + train_offsets); stacking the groups then yields the complete 4D tensors.
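The three steps above can be sketched on a toy series. The sizes below are illustrative values chosen for the example, and horizon_steps is taken to be 1 so that each training window ends right where its prediction window begins.

```python
import torch

series = torch.arange(10.0).view(-1, 1)  # 10 time steps, 1 feature: values 0..9
learning_size = 4                         # training window length (toy value)
window_size = 2                           # prediction window length (toy value)

start_indices = torch.tensor([4, 6, 8])            # first step of each prediction window
train_offsets = torch.arange(-learning_size, 0)    # [-4, -3, -2, -1]
predict_offsets = torch.arange(0, window_size)     # [0, 1]

# (n_windows, 1) + (window_len,) broadcasts to (n_windows, window_len).
train_indices = start_indices.view(-1, 1) + train_offsets
predict_indices = start_indices.view(-1, 1) + predict_offsets

# Indexing with an integer tensor gathers every window in one call.
x_train = series[train_indices]       # shape (3, 4, 1)
x_predict = series[predict_indices]   # shape (3, 2, 1)
```

Each row of train_indices is one training window, so the first window covers steps 0 to 3 and predicts steps 4 and 5, the second covers steps 2 to 5, and so on.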
def _transform_data_adaptive(
    data: pd.DataFrame,
    features: List[str],
    group_col: str,
    norm_params: Dict,
    unique_groups: List,
    target_col: str,
    update_interval_periods: int,
    training_window_periods: int,
    steps_per_period: int,
    horizon_steps: int = 1
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
    r"""
    Prepares data for adaptive learning using vectorized sliding window indexing.
    Returns tensors for (X_train, Y_train, X_predict) for each simulation step.

    Args:
        data: Validation/Test DataFrame.
        features: Feature list.
        group_col: Grouping column.
        norm_params: Normalization parameters.
        unique_groups: Group names.
        target_col: Target column.
        update_interval_periods: Prediction window size.
        training_window_periods: Training history size.
        steps_per_period: Samples per period.
        horizon_steps: Horizon of forecasting

    Returns:
        Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
            (x_stacked, y_stacked, x_to_predict)
    """
    learning_size_steps = training_window_periods * steps_per_period
    window_size_steps = update_interval_periods * steps_per_period

    all_groups_x_train = []
    all_groups_y_train = []
    all_groups_x_predict = []

    if unique_groups is None:
        raise ValueError("`unique_groups` cannot be None.")

    prep_device = 'cpu'

    for group_name in unique_groups:
        if group_name not in norm_params:
            continue
        data_group = data[data[group_col] == group_name].reset_index(drop=True)
        params = norm_params.get(group_name)
        if params is None:
            continue

        total_available_steps = len(data_group)
        required_history = learning_size_steps + (horizon_steps - 1)
        if total_available_steps <= required_history:
            continue

        # Normalize group data
        data_group[features] = normalize(df_to_normalize=data_group[features], params=params)
        x_group = torch.tensor(data_group[features].values, dtype=torch.float32, device=prep_device)
        y_group = torch.tensor(data_group[target_col].values, dtype=torch.float32, device=prep_device).view(-1, 1)

        # Calculate valid start indices (reverse chronological)
        start_indices_list = []
        first_predict_start = total_available_steps - (total_available_steps - learning_size_steps) % window_size_steps
        if first_predict_start == total_available_steps and total_available_steps > learning_size_steps:
            first_predict_start -= window_size_steps

        current_predict_start = first_predict_start
        while current_predict_start >= learning_size_steps:
            if current_predict_start + window_size_steps <= total_available_steps:
                start_indices_list.append(current_predict_start)
            current_predict_start -= window_size_steps

        if not start_indices_list:
            continue

        start_indices_list.reverse()
        start_indices = torch.tensor(start_indices_list, device=prep_device, dtype=torch.long)

        # Vectorized Window Indexing
        train_end_offset = -(horizon_steps - 1) if horizon_steps > 1 else 0
        train_start_offset = train_end_offset - learning_size_steps
        train_offsets = torch.arange(train_start_offset, train_end_offset, device=prep_device)
        predict_offsets = torch.arange(0, window_size_steps, device=prep_device)

        train_indices = start_indices.view(-1, 1) + train_offsets
        predict_indices = start_indices.view(-1, 1) + predict_offsets

        # Gather
        group_x_train = x_group[train_indices]
        group_y_train = y_group[train_indices]
        group_x_predict = x_group[predict_indices]

        all_groups_x_train.append(group_x_train)
        all_groups_y_train.append(group_y_train)
        all_groups_x_predict.append(group_x_predict)

    if not all_groups_x_train:
        raise ValueError("No simulation data could be generated. Check dataset length/window sizes.")

    x_stacked = torch.stack(all_groups_x_train).to(TORCH_DEVICE)
    y_stacked = torch.stack(all_groups_y_train).to(TORCH_DEVICE)
    x_to_predict = torch.stack(all_groups_x_predict).to(TORCH_DEVICE)
    return x_stacked, y_stacked, x_to_predict
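The start-index arithmetic in the listing is easier to follow in isolation. The sketch below copies that loop into a standalone helper (the name prediction_window_starts is made up for this example) and runs it on two small cases.

```python
def prediction_window_starts(total_steps, learning_size, window_size):
    """Standalone copy of the start-index loop from the listing above.

    Windows start at learning_size, learning_size + window_size, ...;
    a trailing window that would run past the series end is discarded.
    """
    first = total_steps - (total_steps - learning_size) % window_size
    if first == total_steps and total_steps > learning_size:
        first -= window_size
    starts = []
    current = first
    while current >= learning_size:
        if current + window_size <= total_steps:  # keep only complete windows
            starts.append(current)
        current -= window_size
    starts.reverse()                               # back to chronological order
    return starts

even_case = prediction_window_starts(total_steps=10, learning_size=4, window_size=2)
ragged_case = prediction_window_starts(total_steps=11, learning_size=4, window_size=3)
```

In the first case the series divides evenly and the starts are 4, 6 and 8. In the second case the starts are 4 and 7, and the single leftover step at index 10 is dropped because it cannot fill a complete prediction window.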
The Flattening Optimization for VRAM Management: The core mathematical solver (_math.py) is intrinsically dimension-agnostic. It relies on PyTorch’s N-dimensional broadcasting (...) and dynamic shape expansion to resolve linear systems over tensors of any rank.
However, before sending the 4D tensors to the solver, the orchestrator explicitly flattens the Groups and Windows dimensions into a single combined batch dimension (total_items = n_groups * n_windows) using a .view() operation, which reinterprets the existing storage without copying it.
This was a deliberate engineering choice for performance and memory safety. By collapsing the data into a pseudo-3D shape, the hardware manager can chunk the historical workload into contiguous 1D slices (start_idx:end_idx) sized to fit in memory, avoiding the 2D indexing logic that chunking over groups and windows separately would require to prevent Out-Of-Memory (OOM) errors.
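A minimal sketch of this flatten-then-chunk pattern is shown below. The tensor sizes and chunk_size are arbitrary illustrative values, and the loop body stands in for whatever the solver would do with each chunk.

```python
import torch

n_groups, n_windows, n_steps, n_features = 3, 4, 5, 2
x = torch.arange(n_groups * n_windows * n_steps * n_features, dtype=torch.float32)
x = x.view(n_groups, n_windows, n_steps, n_features)  # 4D adaptive tensor

# Merge groups and windows into one batch dimension; .view() shares storage.
total_items = n_groups * n_windows
x_flat = x.view(total_items, n_steps, n_features)
shares_storage = x_flat.data_ptr() == x.data_ptr()

# Walk the batch in fixed-size chunks (chunk_size is a hypothetical memory budget).
chunk_size = 5
chunk_shapes = []
for start_idx in range(0, total_items, chunk_size):
    end_idx = min(start_idx + chunk_size, total_items)
    chunk = x_flat[start_idx:end_idx]      # a plain 1D slice over the batch axis
    chunk_shapes.append(tuple(chunk.shape))

# Restore the (groups, windows) layout once all chunks are processed.
x_restored = x_flat.view(n_groups, n_windows, n_steps, n_features)
```

Because the batch axis is one-dimensional, the last chunk is simply shorter than the others; no bookkeeping about which group or window a chunk boundary falls in is needed.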
Reassembly and Decomposition¶
After the core engine computes the predictions (or decomposes them per-effect), the multidimensional PyTorch tensors must be safely mapped back to the user’s original 2D Pandas DataFrame.
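The _reassemble_predictions function listed below reverses the stacking process (_reassemble_decomposed_predictions is named as its per-effect counterpart). It flattens each group's predicted tensor, follows the fitted unique_groups order, and aligns the predictions to the last rows of that group's index in the original DataFrame, so rows without a prediction are left as NaN rather than being misaligned.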
def _reassemble_predictions(
    original_data: pd.DataFrame,
    predictions_stacked: torch.Tensor,
    group_col: str,
    unique_groups: List,
    target_col: str
) -> pd.DataFrame:
    r"""
    Reassembles stacked tensor predictions back into the original DataFrame structure.

    Args:
        original_data: The source DataFrame.
        predictions_stacked: Tensor of predictions (n_groups, n_samples).
        group_col: Grouping column.
        unique_groups: List of group names corresponding to tensor dimensions.
        target_col: Original target name (used to name the prediction column).

    Returns:
        pd.DataFrame: Original data with a new `Estimated{target_col}` column.
    """
    is_3d_input = predictions_stacked.dim() == 3
    all_predictions_series = []

    for i, group_name in enumerate(unique_groups):
        group_indices_full = original_data.index[original_data[group_col] == group_name]
        if i >= predictions_stacked.shape[0]:
            continue

        if is_3d_input:
            preds_group = predictions_stacked[i].cpu().numpy().flatten()
        else:
            preds_group = predictions_stacked[i].cpu().numpy()

        if len(preds_group) == 0:
            continue

        # Align to the end of the group's indices (handling potential truncation)
        group_indices_aligned = group_indices_full[-len(preds_group):]
        preds_series = pd.Series(preds_group, index=group_indices_aligned)
        all_predictions_series.append(preds_series)

    result_df = original_data.copy()
    if not all_predictions_series:
        result_df[f"Estimated{target_col}"] = np.nan
        return result_df

    final_predictions = pd.concat(all_predictions_series)
    result_df[f"Estimated{target_col}"] = final_predictions
    return result_df
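The tail-alignment step in the listing can be illustrated without tensors. In this sketch the predictions are plain lists with made-up values, and only the last two rows of each group are assumed to have been predicted.

```python
import pandas as pd

target_col = "y"
original = pd.DataFrame({
    "group": ["A", "A", "A", "B", "B", "B"],
    "y": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
})

# Hypothetical predictions: only the last two rows of each group were predicted.
preds_by_group = {"A": [2.5, 3.5], "B": [5.5, 6.5]}

pieces = []
for name, preds in preds_by_group.items():
    group_index = original.index[original["group"] == name]
    aligned_index = group_index[-len(preds):]   # tail of the group's index
    pieces.append(pd.Series(preds, index=aligned_index))

result = original.copy()
# Assignment aligns on the index, so rows without a prediction become NaN.
result[f"Estimated{target_col}"] = pd.concat(pieces)
```

Because the assignment aligns on index labels rather than on position, the first row of each group receives NaN and every prediction lands on the row it belongs to.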