Architecture¶
The Hamilton DAG¶
OpenMedallion uses Hamilton to declare the pipeline as a DAG (directed acyclic graph). Each pipeline layer is a Hamilton node — a Python function whose argument names encode its dependencies.
```mermaid
flowchart TD
    cfg["cfg\n(injected at runtime)"]
    config["config(cfg) → dict"]
    bronze["bronze(config) → dict[str, Path]"]
    silver["silver(config, bronze) → dict[str, Path]"]
    gold["gold(config, silver) → dict[str, list[Path]]"]
    bi_export["bi_export(config, gold) → None"]
    cfg --> config
    config --> bronze
    config --> silver
    config --> gold
    config --> bi_export
    bronze --> silver
    silver --> gold
    gold --> bi_export
```
Hamilton reads the argument list of each function and builds the execution graph automatically. No explicit wiring code is needed — the names are the edges.
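The name-based wiring can be sketched in plain Python: `inspect.signature` recovers each function's parameter names, and any parameter that matches another node's name becomes an edge. The node bodies below are illustrative stand-ins, not OpenMedallion's real implementations.

```python
import inspect

# Illustrative stand-ins for three pipeline nodes; bodies are placeholders.
def config(cfg: dict) -> dict:
    return cfg

def bronze(config: dict) -> dict:
    return {"ORDERS": config["paths"]["bronze"] + "/ORDERS.parquet"}

def silver(config: dict, bronze: dict) -> dict:
    return {table.lower(): path for table, path in bronze.items()}

def edges(*nodes):
    """Recover DAG edges the way Hamilton does: a parameter whose name
    matches another node's name becomes a dependency on that node."""
    names = {fn.__name__ for fn in nodes}
    return {
        fn.__name__: [p for p in inspect.signature(fn).parameters if p in names]
        for fn in nodes
    }

edges(config, bronze, silver)
# {'config': [], 'bronze': ['config'], 'silver': ['config', 'bronze']}
```

Note that `config`'s own parameter, `cfg`, matches no node, so it surfaces as an external input rather than an edge.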
Layer Execution¶
The CLI selects which Hamilton node to execute as the final variable. Hamilton then computes only the transitive dependencies required for that node.
| `--layer` | `final_vars` | Nodes executed |
|---|---|---|
| `bronze` | `["bronze"]` | config → bronze |
| `silver` | `["silver"]` | config → bronze → silver |
| `gold` | `["gold"]` | config → bronze → silver → gold |
| `export` | `["bi_export"]` | config → bronze → silver → gold → bi_export |
Skipping layers with overrides¶
When you run `--layer silver` or `--layer gold`, you do not want to re-run bronze from scratch. OpenMedallion uses Hamilton's `overrides` mechanism to inject pre-computed values for upstream nodes.
```python
# cli/main.py — simplified
inputs = {"cfg": cfg}
overrides = {}
if layer in ("silver", "gold"):
    overrides["bronze"] = _discover_bronze_paths(cfg)  # dict[str, Path]
if layer == "gold":
    overrides["silver"] = _discover_silver_paths(cfg)  # dict[str, Path]
dr.execute(final_vars=final_vars, inputs=inputs, overrides=overrides)
```
**`inputs` vs `overrides`**

- `inputs` are for leaf nodes — values that have no dependencies in the DAG (e.g. `cfg`).
- `overrides` are for intermediate nodes — they replace the result of a node that would be computed, regardless of its position in the DAG.

`bronze` depends on `config`, so it is an intermediate node. Passing it in `inputs` is silently ignored by Hamilton. It must be passed in `overrides` to actually skip execution.
Config Loading¶
Before Hamilton executes, `load_project()` reads the four YAML files and merges them into a single dict:
```mermaid
flowchart LR
    main["main.yaml\npipeline · paths · bi_export · includes"]
    bronze["bronze.yaml\nsource · destination"]
    silver["silver.yaml\nbronze_to_silver"]
    gold["gold.yaml\nsilver_to_gold"]
    merged["merged cfg dict"]
    main -->|deep merge| merged
    bronze -->|deep merge| merged
    silver -->|deep merge| merged
    gold -->|deep merge| merged
    merged -->|"${VAR} expansion"| expanded["cfg ready for Hamilton"]
```
The merged dict is injected into Hamilton as `cfg` (a leaf-node input). The `config` node passes it through unchanged — this gives Hamilton a typed node to depend on rather than a raw external value.
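The merge-then-expand behavior can be sketched as follows. `deep_merge` and `expand_env` are illustrative helpers written for this page, not the actual `load_project()` internals, and the YAML fragments are stand-ins.

```python
import os
import re

def deep_merge(base: dict, extra: dict) -> dict:
    """Recursively merge extra into a copy of base; extra wins on scalar conflicts."""
    out = dict(base)
    for key, value in extra.items():
        if isinstance(value, dict) and isinstance(out.get(key), dict):
            out[key] = deep_merge(out[key], value)
        else:
            out[key] = value
    return out

def expand_env(value):
    """Expand ${VAR} references in string leaves, recursing into containers.
    Unset variables are left untouched."""
    if isinstance(value, dict):
        return {k: expand_env(v) for k, v in value.items()}
    if isinstance(value, list):
        return [expand_env(v) for v in value]
    if isinstance(value, str):
        return re.sub(r"\$\{(\w+)\}",
                      lambda m: os.environ.get(m.group(1), m.group(0)), value)
    return value

# Stand-ins for two of the YAML files after parsing:
main_yaml = {"paths": {"bronze": "${DATA_ROOT}/bronze"}}
bronze_yaml = {"source": {"kind": "sql_database"}}

os.environ["DATA_ROOT"] = "/tmp/lake"
cfg = expand_env(deep_merge(main_yaml, bronze_yaml))
# cfg["paths"]["bronze"] is now "/tmp/lake/bronze"
```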
Bronze Layer¶
`BronzeLoader` wraps dlt to ingest data from any configured source into Parquet files in `paths.bronze/`.
```mermaid
flowchart LR
    src["Source\nsql_database / rest_api / filesystem"]
    dlt["dlt pipeline\n(incremental state)"]
    shards["Parquet shards\nbronze/bronze/<table>/*.parquet"]
    merged["Merged Parquet\nbronze/<TABLE>.parquet"]
    src --> dlt --> shards --> merged
```
dlt writes sharded Parquet files. `BronzeLoader._collect_parquets()` merges them into a single file per table so silver always reads one clean file.
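The shard-discovery half of that step can be sketched with the standard library. The function name is an assumption for illustration, and the merge itself (read each shard, concatenate, write `bronze/<TABLE>.parquet` with a DataFrame library) is omitted.

```python
import tempfile
from pathlib import Path

def discover_shards(bronze_root: Path) -> dict[str, list[Path]]:
    """Group dlt's sharded output by table, following the
    bronze/bronze/<table>/*.parquet layout shown in the diagram above."""
    root = Path(bronze_root) / "bronze"
    return {
        table_dir.name: sorted(table_dir.glob("*.parquet"))
        for table_dir in sorted(root.iterdir())
        if table_dir.is_dir()
    }

# Demo on a throwaway directory tree:
tmp = Path(tempfile.mkdtemp())
(tmp / "bronze" / "orders").mkdir(parents=True)
(tmp / "bronze" / "orders" / "000.parquet").touch()
shards = discover_shards(tmp)
# {'orders': [.../bronze/orders/000.parquet]}
```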
Silver Layer¶
`SilverTransformer` runs in two sequential phases:
```mermaid
flowchart TD
    subgraph phase1["Phase 1 — base tables"]
        b1["bronze/TABLE.parquet"] -->|"rename · cast · drop"| t1["transforms"]
        t1 -->|"optional UDF (df) → df"| s1["silver/table.parquet"]
    end
    subgraph phase2["Phase 2 — derived tables"]
        s1 & s2["silver/other.parquet"] -->|"UDF (silver_dir) → df"| d1["silver/derived.parquet"]
    end
    phase1 --> phase2
```
**Phase 1** — each entry in `bronze_to_silver.tables` is processed independently: structural transforms first, then an optional in-row UDF.

**Phase 2** — each `derived_tables` entry calls a UDF that reads freely from the silver directory and returns a new DataFrame. Derived tables see all base tables written in Phase 1.
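The two UDF shapes can be illustrated with plain-Python stand-ins — rows as dicts instead of real DataFrames, and every function and table name below is hypothetical:

```python
from pathlib import Path

# Hypothetical silver tables, standing in for Parquet files on disk.
TABLES = {
    "orders": [{"id": 1, "customer_id": 10, "status": "NEW"}],
    "customers": [{"id": 10, "name": "Acme"}],
}

def read_table(silver_dir: Path, name: str) -> list[dict]:
    """Stand-in for reading silver/<name>.parquet."""
    return TABLES[name]

def normalize_status(df: list[dict]) -> list[dict]:
    """Phase 1 in-row UDF, shape (df) -> df: runs after rename/cast/drop."""
    return [{**row, "status": row["status"].lower()} for row in df]

def orders_enriched(silver_dir: Path) -> list[dict]:
    """Phase 2 derived UDF, shape (silver_dir) -> df: may read any base table."""
    customers = {c["id"]: c for c in read_table(silver_dir, "customers")}
    return [
        {**o, "customer_name": customers[o["customer_id"]]["name"]}
        for o in read_table(silver_dir, "orders")
    ]
```

The key contract is the signature: Phase 1 UDFs see only their own table, while Phase 2 UDFs receive the silver directory and join whatever they need.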
Gold Layer¶
`GoldAggregator` processes each aggregation block:
```mermaid
flowchart LR
    src["silver/<source_file>.parquet"]
    udf["optional pre_agg_udf\n(df, silver_dir) → df"]
    agg["group_by + metrics\n(YAML-declared)"]
    out["gold/<project>/<output_file>.parquet"]
    src --> udf --> agg --> out
```
The `pre_agg_udf` step runs before the `group_by` aggregation. It is the correct place to derive columns (e.g. `order_month` from `order_date`) or join additional tables that should affect the grouping.
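A minimal sketch of the pre-agg-then-group flow, again with rows as plain dicts. The YAML keys in the comment and the sum-only `aggregate` helper are illustrative assumptions, not the real GoldAggregator code.

```python
from collections import defaultdict
from pathlib import Path

# Hypothetical aggregation block this sketch mirrors:
#   source_file: orders
#   pre_agg_udf: add_order_month
#   group_by: [order_month]
#   metrics: {revenue: sum}

def add_order_month(df: list[dict], silver_dir: Path) -> list[dict]:
    """pre_agg_udf sketch, shape (df, silver_dir) -> df:
    derive order_month before grouping."""
    return [{**r, "order_month": r["order_date"][:7]} for r in df]

def aggregate(rows: list[dict], group_by: list[str], metrics: dict) -> list[dict]:
    """Minimal group_by + metrics step; only 'sum' is implemented here."""
    buckets = defaultdict(list)
    for r in rows:
        buckets[tuple(r[k] for k in group_by)].append(r)
    out = []
    for key, group in buckets.items():
        row = dict(zip(group_by, key))
        for col, fn in metrics.items():
            assert fn == "sum", "only sum is sketched"
            row[col] = sum(r[col] for r in group)
        out.append(row)
    return out

orders = [
    {"order_date": "2024-01-03", "revenue": 10.0},
    {"order_date": "2024-01-20", "revenue": 5.0},
    {"order_date": "2024-02-01", "revenue": 7.0},
]
result = aggregate(add_order_month(orders, Path("silver")),
                   ["order_month"], {"revenue": "sum"})
# two rows: 2024-01 with revenue 15.0 and 2024-02 with revenue 7.0
```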
Storage Abstraction¶
All pipeline code calls `openmedallion.storage` functions instead of `pathlib` or `pl.read_parquet` directly. This is what makes pipelines portable between local and S3:
| Call | Local | S3 |
|---|---|---|
| `read_parquet(path)` | `pl.read_parquet(path)` | `pl.read_parquet(path, storage_options=...)` |
| `write_parquet(df, path)` | `df.write_parquet(path)` | `df.write_parquet(path, storage_options=...)` |
| `join(base, *parts)` | `os.path.join(base, *parts)` | `base.rstrip("/") + "/" + ...` |
| `exists(path)` | `Path(path).exists()` | `s3fs.S3FileSystem().exists(path)` |
To switch from local storage to S3, change `paths.*` in `main.yaml` to `s3://your-bucket/...` and set the four `AWS_*` environment variables. No code changes required.
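The dispatch behind the table can be sketched as follows; the `s3://` prefix check and the function bodies are illustrative assumptions, not the actual `openmedallion.storage` source.

```python
import os
from pathlib import Path

def _is_s3(path: str) -> bool:
    return str(path).startswith("s3://")

def join(base: str, *parts: str) -> str:
    """Join path components for either a local path or an s3:// URL."""
    if _is_s3(base):
        return "/".join([str(base).rstrip("/"), *parts])
    return os.path.join(str(base), *parts)

def exists(path: str) -> bool:
    """Existence check that dispatches on the path scheme."""
    if _is_s3(path):
        import s3fs  # deferred import so local-only runs never need it
        return s3fs.S3FileSystem().exists(path)
    return Path(path).exists()
```

Because every call site goes through these wrappers, the `s3://` prefix in `main.yaml` is the single switch that flips the whole pipeline between backends.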