Production Deployment#

This page covers patterns for deploying OpenSTEF forecasting pipelines in production environments. Whether you run a simple cron job on a single server or orchestrate hundreds of forecast targets across a cloud platform, OpenSTEF’s library design gives you full control over how pipelines are scheduled, containerized, and monitored.

For data source configuration, see Data Integration. For use-case-specific patterns, see Common Use Cases.

        graph LR
    A[(Weather APIs)] --> D[OpenSTEF Pipeline]
    B[(SCADA / Meters)] --> D
    C[(Market Data)] --> D
    D --> E{Train or Predict}
    E -->|train| F[(MLflow Storage)]
    F -->|load model| E
    E -->|predict| G[Forecast Output]
    G --> H[(Dashboard)]
    G --> I[(Energy Platform)]
    classDef primary fill:#00D9C5,stroke:#1E3A5F,stroke-width:2px,color:#000
    classDef secondary fill:#1E3A5F,stroke:#00D9C5,stroke-width:2px,color:#fff
    classDef accent fill:#e6f7f5,stroke:#00D9C5,stroke-width:2px,color:#000
    class D,E primary
    class A,B,C,F secondary
    class G,H,I accent
    

Deployment Approaches#

OpenSTEF is a library, not a service. You embed it in your own application and choose how to trigger execution. The three most common patterns are:

  • Scheduled scripts — cron or systemd timers calling a Python entrypoint

  • Task queues — Celery, Airflow, or Dagster orchestrating train/predict tasks

  • Serverless functions — AWS Lambda or Azure Functions triggered on a schedule

All patterns share the same core: configure a workflow, call fit() on a schedule (e.g., daily), and call predict() at forecast cadence (e.g., every 15 minutes).

Minimal Production Script#

A production entrypoint typically loads configuration, builds the workflow, fetches data, and runs prediction:

import logging
from datetime import timedelta
from pathlib import Path

from openstef_core.datasets import ForecastDataset
from openstef_core.types import LeadTime, Q
from openstef_models.integrations.mlflow import MLFlowStorage
from openstef_models.presets import ForecastingWorkflowConfig, create_forecasting_workflow

logging.basicConfig(level=logging.INFO, format="[%(asctime)s][%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

def create_workflow(model_dir: Path):
    return create_forecasting_workflow(
        config=ForecastingWorkflowConfig(
            model_id="production_forecaster_v1",
            model="gblinear",
            horizons=[LeadTime.from_string("PT36H")],
            quantiles=[Q(0.5), Q(0.1), Q(0.9)],
            mlflow_storage=MLFlowStorage(
                tracking_uri=str(model_dir / "mlflow_tracking"),
                local_artifacts_path=model_dir / "mlflow_artifacts",
            ),
        )
    )

def run_predict(workflow, dataset):
    """Run prediction and return forecast."""
    forecast: ForecastDataset = workflow.predict(dataset)
    logger.info("Forecast generated: %d rows", len(forecast.data))
    return forecast

def run_train(workflow, dataset):
    """Retrain model on latest data."""
    result = workflow.fit(dataset)
    if result is not None:
        logger.info("Training complete. Metrics:\n%s", result.metrics_full.to_dataframe())
    return result

Containerization#

Package your forecasting application in a Docker container for reproducible deployments:

FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY src/ ./src/
COPY configs/ ./configs/

# Model artifacts volume
VOLUME /models

ENV MODEL_DIR=/models
ENV PYTHONUNBUFFERED=1

ENTRYPOINT ["python", "-m", "src.main"]

Your requirements.txt should pin OpenSTEF packages:

openstef-core>=4.0,<5.0
openstef-models>=4.0,<5.0
openstef-beam>=4.0,<5.0

Warning

Always pin major versions. OpenSTEF V4 has breaking changes from V3 — see Migrating from V3 to V4 for details.

Scheduling with Cron#

The simplest production deployment uses cron to trigger training and prediction separately:

# /etc/cron.d/openstef-forecasting

# Retrain models daily at 02:00 UTC
0 2 * * * appuser /app/venv/bin/python -m src.main --mode train

# Generate forecasts every 15 minutes
*/15 * * * * appuser /app/venv/bin/python -m src.main --mode predict

Structure your entrypoint to accept a mode argument:

import argparse
from pathlib import Path

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", choices=["train", "predict"], required=True)
    parser.add_argument("--model-dir", type=Path, default=Path("/models"))
    args = parser.parse_args()

    workflow = create_workflow(args.model_dir)
    dataset = load_latest_data()  # Your data loading logic

    if args.mode == "train":
        run_train(workflow, dataset)
    else:
        run_predict(workflow, dataset)

if __name__ == "__main__":
    main()

Orchestration with Airflow#

For multi-target deployments, Apache Airflow provides dependency management, retries, and monitoring:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=30),
}

with DAG(
    "openstef_forecast",
    default_args=default_args,
    schedule_interval="*/15 * * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    def predict_target(target_id: str, **kwargs):
        from src.forecasting import create_workflow, load_data, publish_forecast
        from pathlib import Path

        workflow = create_workflow(Path(f"/models/{target_id}"))
        dataset = load_data(target_id)
        forecast = workflow.predict(dataset)
        publish_forecast(target_id, forecast)

    # Create a task per forecast target
    target_ids = ["substation_north", "substation_south", "industrial_park"]

    for target_id in target_ids:
        PythonOperator(
            task_id=f"predict_{target_id}",
            python_callable=predict_target,
            op_kwargs={"target_id": target_id},
        )
        graph TD
    A([DAG Trigger]) --> B[Fetch Shared Data]
    B --> C[Predict Substation North]
    B --> D[Predict Substation South]
    B --> E[Predict Industrial Park]
    C --> F[Publish Forecasts]
    D --> F
    E --> F
    F --> G[(Forecast Store)]
    classDef primary fill:#00D9C5,stroke:#1E3A5F,stroke-width:2px,color:#000
    classDef secondary fill:#1E3A5F,stroke:#00D9C5,stroke-width:2px,color:#fff
    classDef accent fill:#e6f7f5,stroke:#00D9C5,stroke-width:2px,color:#000
    class A accent
    class B secondary
    class C,D,E primary
    class F primary
    class G secondary
    

Cloud Deployment Options#

AWS#

  • ECS Fargate — run containerized predict/train tasks on a schedule via EventBridge rules

  • Lambda — suitable for lightweight predictions with models under 250 MB (use Lambda layers or container images)

  • SageMaker — use for training if GPU acceleration is needed; deploy OpenSTEF as a custom inference container

Azure#

  • Container Instances — on-demand containers triggered by Logic Apps or Azure Functions timer triggers

  • Azure Functions — Python functions on a timer schedule; use Durable Functions for long-running training

  • Azure ML — managed training pipelines with model registry integration

GCP#

  • Cloud Run Jobs — scheduled containerized workloads with automatic scaling to zero

  • Cloud Functions — lightweight prediction triggers

  • Vertex AI — managed training with custom containers

Model Storage in Production#

OpenSTEF uses MLflow for model versioning and artifact storage. In production, point the tracking URI to a shared backend:

from openstef_models.integrations.mlflow import MLFlowStorage

# Local filesystem (single-server deployments)
storage = MLFlowStorage(
    tracking_uri="file:///models/mlflow",
    local_artifacts_path=Path("/models/artifacts"),
)

# Remote MLflow server (multi-service deployments)
storage = MLFlowStorage(
    tracking_uri="http://mlflow-server:5000",
    local_artifacts_path=Path("/tmp/mlflow_cache"),
)

For S3-backed artifact storage, configure MLflow’s artifact root on the server side. OpenSTEF’s MLFlowStorage integration handles model serialization and versioning transparently.

Monitoring and Alerting#

Production forecasting systems need monitoring at three levels:

Pipeline health#

Track whether jobs complete successfully and on time:

import time
import logging

logger = logging.getLogger(__name__)

def monitored_predict(workflow, dataset, target_id: str):
    start = time.time()
    try:
        forecast = workflow.predict(dataset)
        duration = time.time() - start
        logger.info(
            "forecast_complete",
            extra={
                "target_id": target_id,
                "duration_seconds": duration,
                "rows": len(forecast.data),
            },
        )
        # Emit metric to your monitoring system
        emit_metric("forecast.duration_seconds", duration, tags={"target": target_id})
        emit_metric("forecast.success", 1, tags={"target": target_id})
        return forecast
    except Exception as e:
        emit_metric("forecast.success", 0, tags={"target": target_id})
        logger.exception("Forecast failed for target %s", target_id)
        raise

Forecast quality#

Use OpenSTEF’s evaluation framework to track model degradation over time. Schedule periodic backtests and compare metrics against baselines. The benchmarking pipeline (openstef_beam.benchmarking) automates this:

from openstef_beam.benchmarking import (
    BenchmarkPipeline,
    LocalBenchmarkStorage,
)

# Run periodic benchmark to detect model drift
storage = LocalBenchmarkStorage(base_path=Path("/benchmarks"))

Data quality#

Monitor input data for missing values, stale timestamps, or out-of-range values before feeding it to the pipeline. Catch issues early to avoid silent forecast degradation.

Production Checklist#

Before going live, verify:

  • Model trained and validated — run backtests covering representative periods

  • Data pipeline reliable — input data arrives on time with alerting on gaps

  • Retry logic — transient failures don’t cause missed forecasts

  • Model versioning — MLflow tracks which model version produced each forecast

  • Logging structured — JSON logs with target IDs for easy filtering

  • Resource limits — memory and CPU limits set on containers to prevent runaway jobs

  • Graceful degradation — fallback to last-known-good forecast if prediction fails

  • Secrets management — API keys and credentials stored in vault/secrets manager, not in code

Scaling to Many Targets#

When forecasting hundreds of grid points or assets, parallelize across targets:

from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

def predict_single_target(target_id: str) -> dict:
    workflow = create_workflow(Path(f"/models/{target_id}"))
    dataset = load_data(target_id)
    forecast = workflow.predict(dataset)
    publish_forecast(target_id, forecast)
    return {"target_id": target_id, "status": "success"}

target_ids = load_active_targets()  # e.g., from database

with ProcessPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(predict_single_target, tid): tid for tid in target_ids}
    for future in as_completed(futures):
        result = future.result()
        logger.info("Completed: %s", result["target_id"])

For larger scale (1000+ targets), use distributed task queues (Celery) or Kubernetes Jobs with a job-per-target pattern.

        graph TD
    A[(Target Database)] --> B[Job Scheduler]
    B --> C[Worker 1]
    B --> D[Worker 2]
    B --> E[Worker 3]
    B --> F[Worker N]
    G[(Shared Model Storage)] --> C
    G --> D
    G --> E
    G --> F
    C --> H[(Output Sink)]
    D --> H
    E --> H
    F --> H
    classDef primary fill:#00D9C5,stroke:#1E3A5F,stroke-width:2px,color:#000
    classDef secondary fill:#1E3A5F,stroke:#00D9C5,stroke-width:2px,color:#fff
    classDef accent fill:#e6f7f5,stroke:#00D9C5,stroke-width:2px,color:#000
    class B primary
    class C,D,E,F accent
    class A,G,H secondary
    

Note

[DIAGRAM: Scaling architecture showing job scheduler distributing targets across worker pool, with shared model storage and output sink]