stacksats.runner
Strategy execution orchestration services.
StrategyRunner(data_provider=None)
Bases: StrategyRunnerValidationMixin
Single orchestration path for strategy lifecycle operations.
from_dataframe(df: pl.DataFrame, *, column_map: dict[str, str] | None = None) -> 'StrategyRunner'
classmethod
Construct a StrategyRunner backed by a user-supplied DataFrame.
This is the primary entry point for using StackSats without a BRK parquet file.
Parameters
df:
    A Polars DataFrame with a canonical date column. At minimum it
    must contain a column that maps to price_usd.
column_map:
    Mapping from library column names to DataFrame column names.
    Example: {"price_usd": "Close", "mvrv": "MVRV_Ratio"}.
    If None, the DataFrame columns are used as-is.
Returns
StrategyRunner
    A fully configured runner using ColumnMapDataProvider.
Example
::
    runner = StrategyRunner.from_dataframe(
        df,
        column_map={"price_usd": "Close"},
    )
    result = runner.backtest(MyStrategy(), BacktestConfig())
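The direction of column_map (library name to DataFrame name) is easy to get backwards. The following is a minimal sketch of the renaming it implies, written in plain Python over a list of column names. `resolve_columns` is a hypothetical helper for illustration only; it is not part of StackSats, and ColumnMapDataProvider may validate or rename differently.

```python
from __future__ import annotations


def resolve_columns(
    df_columns: list[str],
    column_map: dict[str, str] | None = None,
) -> dict[str, str]:
    """Return a {DataFrame column: library column} rename mapping.

    Hypothetical helper: it only illustrates the documented direction of
    column_map (library name -> DataFrame name).
    """
    if column_map is None:
        # No mapping supplied: the DataFrame columns are used as-is.
        rename = {name: name for name in df_columns}
    else:
        missing = [src for src in column_map.values() if src not in df_columns]
        if missing:
            raise KeyError(f"column_map refers to missing columns: {missing}")
        # Invert the map so it reads "DataFrame name -> library name".
        rename = {src: lib for lib, src in column_map.items()}
        # Columns not mentioned in column_map keep their original names.
        for name in df_columns:
            rename.setdefault(name, name)
    if "price_usd" not in rename.values():
        raise ValueError("no column maps to price_usd")
    return rename


rename = resolve_columns(
    ["date", "Close", "MVRV_Ratio"],
    {"price_usd": "Close", "mvrv": "MVRV_Ratio"},
)
```

A mapping of this shape is what a Polars `DataFrame.rename` call expects, so the inverted dictionary could be applied to the frame directly if the canonical names are wanted up front.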
WeightTimeSeriesBatch(strategy_id: str, strategy_version: str, run_id: str, config_hash: str, windows: tuple['WeightTimeSeries', ...], schema_version: str = PUBLIC_ARTIFACT_SCHEMA_VERSION, generated_at: dt.datetime = _utc_now(), extra_schema: tuple[ColumnSpec, ...] = ())
dataclass
Collection of single-window strategy weight time-series outputs.
date_span() -> tuple[dt.datetime, dt.datetime]
Return the full date span covered by the batch.
for_window(start_date: str | dt.datetime, end_date: str | dt.datetime) -> 'WeightTimeSeries'
Return the window object for a specific date range.
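Because for_window accepts either strings or datetimes, a lookup of this kind has to normalize both arguments before comparing them with the stored window keys. The sketch below shows one way to do that with the standard library. `to_datetime` and `find_window` are hypothetical names, and the assumption that string dates are ISO-8601 is not confirmed by the source.

```python
from __future__ import annotations

import datetime as dt


def to_datetime(value: str | dt.datetime) -> dt.datetime:
    """Normalize an ISO-8601 string or a datetime to a datetime."""
    if isinstance(value, dt.datetime):
        return value
    # Assumption: string dates are ISO-8601, e.g. "2024-01-01".
    return dt.datetime.fromisoformat(value)


def find_window(windows: dict, start_date, end_date):
    """Look up a window by its (start, end) key; hypothetical helper."""
    key = (to_datetime(start_date), to_datetime(end_date))
    try:
        return windows[key]
    except KeyError:
        raise KeyError(
            f"no window for {key[0]:%Y-%m-%d} to {key[1]:%Y-%m-%d}"
        ) from None


# Stand-in for a batch: window keys mapped to placeholder window objects.
windows = {(dt.datetime(2024, 1, 1), dt.datetime(2024, 12, 31)): "window-2024"}

# Mixed argument types resolve to the same key.
found = find_window(windows, "2024-01-01", dt.datetime(2024, 12, 31))
```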
from_artifact_dir(path: str | Path, *, extra_schema: tuple[ColumnSpec, ...] = ()) -> 'WeightTimeSeriesBatch'
classmethod
Load a batch object from a strategy export artifact directory.
from_csv(path: str | Path, *, strategy_id: str, strategy_version: str, run_id: str, config_hash: str, schema_version: str = PUBLIC_ARTIFACT_SCHEMA_VERSION, generated_at: dt.datetime | None = None, extra_schema: tuple[ColumnSpec, ...] = ()) -> 'WeightTimeSeriesBatch'
classmethod
Load a batch object from a flattened CSV export.
from_flat_dataframe(data: pl.DataFrame, *, strategy_id: str, strategy_version: str, run_id: str, config_hash: str, schema_version: str = PUBLIC_ARTIFACT_SCHEMA_VERSION, generated_at: dt.datetime | None = None, extra_schema: tuple[ColumnSpec, ...] = ()) -> 'WeightTimeSeriesBatch'
classmethod
Build a batch object from a flattened export dataframe.
iter_windows() -> Iterable['WeightTimeSeries']
Yield windows in batch order.
schema_markdown() -> str
Render the shared window schema as markdown.
to_csv(path: str | Path, *, index: bool = False) -> None
Write the canonical flattened batch dataframe to CSV.
to_dataframe() -> pl.DataFrame
Flatten the batch into one canonical dataframe.
validate() -> None
Validate cross-window metadata and uniqueness invariants.
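To make "cross-window metadata and uniqueness invariants" concrete, here is a rough sketch of the kind of checks such a method might perform. It is an assumption, not the library's implementation: windows are modeled as plain dicts, the shared fields are taken from the batch constructor signature, and raising ValueError is a guess at the failure mode.

```python
from __future__ import annotations

import datetime as dt
from collections import Counter

# Metadata fields taken from the batch constructor signature.
SHARED_FIELDS = ("strategy_id", "strategy_version", "run_id", "config_hash")


def check_batch(batch_meta: dict[str, str], windows: list[dict]) -> None:
    """Raise ValueError on mismatched metadata or duplicate window keys."""
    for window in windows:
        for field in SHARED_FIELDS:
            if window[field] != batch_meta[field]:
                raise ValueError(
                    f"{field} mismatch: {window[field]!r} != {batch_meta[field]!r}"
                )
    # Each (start_date, end_date) pair may appear at most once.
    counts = Counter((w["start_date"], w["end_date"]) for w in windows)
    duplicates = [key for key, n in counts.items() if n > 1]
    if duplicates:
        raise ValueError(f"duplicate window keys: {duplicates}")


meta = {
    "strategy_id": "demo",
    "strategy_version": "1.0.0",
    "run_id": "run-1",
    "config_hash": "abc123",
}
window = {
    **meta,
    "start_date": dt.datetime(2024, 1, 1),
    "end_date": dt.datetime(2024, 12, 31),
}

check_batch(meta, [window])  # a single consistent window passes silently

try:
    check_batch(meta, [window, dict(window)])  # same key twice
except ValueError as exc:
    duplicate_error = str(exc)
```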
window_keys() -> tuple[tuple[dt.datetime, dt.datetime], ...]
Return all batch window keys in batch order.
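Each window key is a (start, end) pair of datetimes, so the overall span reported by date_span can plausibly be derived from the keys alone. The sketch below assumes the span is the earliest start and the latest end across all windows; the source does not state this definition, and `span_of` is a hypothetical helper.

```python
from __future__ import annotations

import datetime as dt

WindowKey = tuple[dt.datetime, dt.datetime]


def span_of(keys: tuple[WindowKey, ...]) -> WindowKey:
    """Return (earliest start, latest end) over all window keys."""
    if not keys:
        raise ValueError("an empty batch has no date span")
    starts, ends = zip(*keys)
    return min(starts), max(ends)


# Two overlapping one-year windows, in batch order.
keys = (
    (dt.datetime(2023, 1, 1), dt.datetime(2023, 12, 31)),
    (dt.datetime(2023, 7, 1), dt.datetime(2024, 6, 30)),
)
span = span_of(keys)
```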
WeightValidationError
Bases: ValueError
Raised when strategy weights violate required constraints.
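Since the exception derives from ValueError, callers that already handle ValueError will also catch it. The sketch below demonstrates this with a local stand-in class of the same name. The constraints checked here (non-negative weights that sum to 1) are assumptions chosen for illustration; the source does not list the actual required constraints.

```python
from __future__ import annotations

import math


class WeightValidationError(ValueError):
    """Local stand-in mirroring the documented base class."""


def check_weights(weights: list[float], tol: float = 1e-9) -> None:
    """Hypothetical validator; the real constraints may differ."""
    if any(w < 0 for w in weights):
        raise WeightValidationError("weights must be non-negative")
    total = math.fsum(weights)
    if abs(total - 1.0) > tol:
        raise WeightValidationError(f"weights sum to {total}, expected 1.0")


check_weights([0.25, 0.75])  # valid under the assumed constraints

caught = None
try:
    check_weights([0.5, 0.6])  # sums to more than 1
except ValueError as exc:  # the broader handler still catches the subclass
    caught = exc
```

Catching the specific class first and ValueError second lets a caller separate weight problems from other invalid-input errors.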