Advanced Customization#
This page covers the main extension points in OpenSTEF for power users who need to go beyond the built-in presets. You will learn how to create custom transforms for feature engineering, build custom preprocessing and postprocessing pipelines, implement workflow callbacks for monitoring, and compose custom forecasting workflows.
If you haven’t yet run your first forecast, start with the Your First Forecast tutorial.
graph TD
A[Core Forecasting Flow] --> B[Custom Transforms]
A --> C[Custom Pipelines]
A --> D[Workflow Callbacks]
A --> E[Custom Workflows]
B --> F[Feature Engineering]
C --> G[Pre and Post Processing]
D --> H[Monitoring and Logging]
E --> I[Compose New Flows]
classDef primary fill:#00D9C5,stroke:#1E3A5F,stroke-width:2px,color:#000
classDef secondary fill:#1E3A5F,stroke:#00D9C5,stroke-width:2px,color:#fff
classDef accent fill:#e6f7f5,stroke:#00D9C5,stroke-width:2px,color:#000
class A secondary
class B,C,D,E primary
class F,G,H,I accent
Custom Transforms#
The transform system is OpenSTEF’s primary mechanism for feature engineering and data preparation. All transforms implement a common interface with fit, transform, and fit_transform methods.
Creating a Custom TimeSeriesTransform#
To create a custom feature transform, subclass TimeSeriesTransform and implement the transform and features_added methods:
from openstef_core.datasets import TimeSeriesDataset
from openstef_models.transforms.energy_domain import TimeSeriesTransform
class TemperatureGradientAdder(TimeSeriesTransform):
"""Adds temperature rate-of-change as a feature."""
@property
def is_fitted(self) -> bool:
# Stateless transform - always fitted
return True
def fit(self, data: TimeSeriesDataset) -> None:
pass
def transform(self, data: TimeSeriesDataset) -> TimeSeriesDataset:
df = data.data.copy()
if "temperature" in df.columns:
df["temperature_gradient"] = df["temperature"].diff()
return TimeSeriesDataset(df, data.sample_interval)
def features_added(self) -> list[str]:
return ["temperature_gradient"]
The features_added method declares which columns the transform introduces, enabling pipeline introspection and validation.
Stateful Transforms#
For transforms that learn parameters from training data, override is_fitted and fit:
from openstef_core.datasets import TimeSeriesDataset
from openstef_models.transforms.energy_domain import TimeSeriesTransform
class MinMaxScaler(TimeSeriesTransform):
"""Scales the target column to [0, 1] range."""
scale_factor: float | None = None
@property
def is_fitted(self) -> bool:
return self.scale_factor is not None
def fit(self, data: TimeSeriesDataset) -> None:
self.scale_factor = data.data.max().max()
def transform(self, data: TimeSeriesDataset) -> TimeSeriesDataset:
if not self.is_fitted:
raise RuntimeError("Transform not fitted. Call fit() first.")
scaled_data = data.data / self.scale_factor
return TimeSeriesDataset(scaled_data, data.sample_interval)
def features_added(self) -> list[str]:
return []
Stateful transforms are fitted during training and their learned parameters are persisted with the model.
Custom Pipelines#
The TransformPipeline chains multiple transforms into a sequential processing flow. Each transform receives the output of the previous one.
from openstef_core.datasets import TimeSeriesDataset
from openstef_models.transforms import TransformPipeline
from openstef_models.transforms.energy_domain.wind_power_feature_adder import WindPowerFeatureAdder
preprocessing = TransformPipeline[TimeSeriesDataset](
transforms=[
WindPowerFeatureAdder(),
TemperatureGradientAdder(),
]
)
# Fit and transform in one step
processed_data = preprocessing.fit_transform(raw_data)
# Or separately for train/test splits
preprocessing.fit(train_data)
processed_train = preprocessing.transform(train_data)
processed_test = preprocessing.transform(test_data)
An empty pipeline acts as a no-op, which is useful for conditional processing:
# No-op pipeline
passthrough = TransformPipeline[TimeSeriesDataset](transforms=[])
Built-in Transform Domains#
OpenSTEF organizes transforms into domain-specific subpackages:
openstef_models.transforms.energy_domain— Energy-specific features (e.g., wind power curves)openstef_models.transforms.weather_domain— Weather-derived featuresopenstef_models.transforms.time_domain— Temporal features (lags, calendar effects)openstef_models.transforms.general— General-purpose transformsopenstef_models.transforms.validation— Data quality and validation transforms
Custom Forecasting Workflows#
The CustomForecastingWorkflow combines a model with preprocessing, postprocessing, and lifecycle callbacks into a complete forecasting system.
from openstef_models.workflows.custom_forecasting_workflow import CustomForecastingWorkflow
from openstef_models.models import ForecastingModel
from openstef_models.transforms import TransformPipeline
workflow = CustomForecastingWorkflow(
model=ForecastingModel(
preprocessing=TransformPipeline(transforms=[
WindPowerFeatureAdder(),
TemperatureGradientAdder(),
]),
forecaster=my_forecaster,
postprocessing=TransformPipeline(transforms=[
ConfidenceIntervalApplicator(quantiles=[0.1, 0.5, 0.9]),
]),
target_column="load",
data_splitter=my_splitter,
cutoff_history=timedelta(days=365),
evaluation_metrics=my_metrics,
tags={"location": "substation_A"},
),
model_id="custom-model-001",
run_name="experiment-v1",
callbacks=[],
)
# Train
result = workflow.fit(train_data, data_val=val_data)
# Predict
forecasts = workflow.predict(new_data)
You can create a variant of an existing workflow with a different run name using with_run_name:
experiment_b = workflow.with_run_name("experiment-v2")
Lifecycle Callbacks#
Callbacks let you hook into workflow lifecycle events without modifying core logic. This is the recommended pattern for logging, monitoring, model storage, and custom validation.
from openstef_models.workflows.custom_forecasting_workflow import ForecastingCallback
class LoggingCallback(ForecastingCallback):
"""Logs training and prediction events."""
def on_fit_start(self, pipeline, dataset):
print(f"Starting training with {len(dataset.data)} samples")
def on_fit_end(self, pipeline, dataset, result):
print(f"Training complete. Metrics: {result}")
def on_predict_start(self, pipeline, dataset):
print(f"Generating forecast for {len(dataset.data)} time steps")
def on_predict_end(self, pipeline, dataset, forecasts):
print(f"Forecast generated: {len(forecasts.data)} predictions")
Register callbacks when constructing the workflow:
workflow = CustomForecastingWorkflow(
model=my_model,
model_id="custom-model-001",
callbacks=[LoggingCallback()],
)
OpenSTEF includes a built-in MLFlowStorageCallback for model persistence and selection:
from openstef_models.workflows.custom_forecasting_workflow import MLFlowStorageCallback
mlflow_callback = MLFlowStorageCallback(
storage=my_mlflow_storage,
model_reuse_enable=True,
model_reuse_max_age=timedelta(days=7),
model_selection_enable=True,
model_selection_metric="rmse",
model_selection_old_model_penalty=0.05,
)
graph TD
A[on_fit_start] --> B[fit]
B --> C[on_fit_end]
E[on_predict_start] --> F[predict]
F --> G[on_predict_end]
H([Workflow Executes]) --> A
H --> E
classDef primary fill:#00D9C5,stroke:#1E3A5F,stroke-width:2px,color:#000
classDef secondary fill:#1E3A5F,stroke:#00D9C5,stroke-width:2px,color:#fff
classDef accent fill:#e6f7f5,stroke:#00D9C5,stroke-width:2px,color:#000
class B,F secondary
class A,C,E,G primary
class H accent
Putting It All Together#
Here is a complete example combining custom transforms, a pipeline, and callbacks into a production-ready workflow:
from datetime import timedelta
from openstef_core.datasets import TimeSeriesDataset
from openstef_models.models import ForecastingModel
from openstef_models.transforms import TransformPipeline
from openstef_models.workflows.custom_forecasting_workflow import (
CustomForecastingWorkflow,
ForecastingCallback,
)
# 1. Define custom transforms
class PeakIndicatorAdder(TimeSeriesTransform):
"""Flags time steps during typical peak hours."""
def transform(self, data: TimeSeriesDataset) -> TimeSeriesDataset:
df = data.data.copy()
df["is_peak_hour"] = df.index.hour.isin(range(17, 21)).astype(int)
return TimeSeriesDataset(df, data.sample_interval)
def features_added(self) -> list[str]:
return ["is_peak_hour"]
# 2. Define a monitoring callback
class MetricsCallback(ForecastingCallback):
def on_fit_end(self, pipeline, dataset, result):
if result is not None:
print(f"Model trained successfully: {result}")
# 3. Assemble the workflow
workflow = CustomForecastingWorkflow(
model=ForecastingModel(
preprocessing=TransformPipeline(transforms=[
PeakIndicatorAdder(),
]),
forecaster=my_forecaster,
postprocessing=TransformPipeline(transforms=[]),
target_column="load",
cutoff_history=timedelta(days=180),
tags={"use_case": "congestion_management"},
),
model_id="peak-aware-model",
callbacks=[MetricsCallback()],
)
# 4. Train and predict
workflow.fit(train_data, data_val=val_data)
forecasts = workflow.predict(live_data)
Summary of Extension Points#
Extension Point |
Base Class |
Use Case |
|---|---|---|
Feature engineering |
|
Add domain-specific features |
Data preprocessing |
|
Chain transforms sequentially |
Workflow monitoring |
|
Logging, storage, validation |
Full workflow |
|
End-to-end custom forecasting |
Next Steps#
Evaluate your custom models with backtesting — see Backtesting Your Models
Explore the built-in transforms in the ../api/openstef_models/transforms reference
Review the presets for common configurations before building from scratch