Architecture
Pipelines
DAG-based workflows composed of pure Python functions.
Pipelines in OptimusKG are sequences of nodes wired into a directed acyclic graph (DAG). Each node is a pure Python function that takes Polars DataFrames as input and returns DataFrames.
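To make the execution model concrete, here is a stdlib-only sketch of how a DAG of node functions runs in dependency order. This is not Kedro's actual runner, and the node functions (`load`, `double`, `total`) and dataset names are hypothetical stand-ins for the real Polars-based nodes:

```python
from graphlib import TopologicalSorter

# Hypothetical stand-ins for real pipeline nodes (the real ones use Polars).
def load():
    return [1, 2, 3]

def double(values):
    return [v * 2 for v in values]

def total(values):
    return sum(values)

# Each entry: output dataset -> (node function, input datasets it consumes).
nodes = {
    "landing.numbers": (load, []),
    "bronze.numbers": (double, ["landing.numbers"]),
    "gold.total": (total, ["bronze.numbers"]),
}

def run(nodes):
    """Execute node functions in topological order; a toy model of a DAG runner."""
    order = TopologicalSorter({out: ins for out, (_, ins) in nodes.items()}).static_order()
    results = {}
    for out in order:
        func, ins = nodes[out]
        results[out] = func(*(results[i] for i in ins))
    return results

print(run(nodes)["gold.total"])  # 12
```

Because every node is pure and the wiring is data, the runner can derive execution order from the graph alone, which is exactly what makes partial runs (`--from-nodes`, `--to-nodes`) possible.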
Pipeline Registration
Each layer has a pipeline.py that registers its nodes with namespace prefixes:
```python
from kedro.pipeline import Pipeline, node, pipeline


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=process_disease,
                inputs="landing.opentargets.disease",
                outputs="bronze.opentargets.disease",
                name="process_disease",
            ),
        ],
        namespace="bronze.opentargets",
    )
```

Node Functions
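The namespace determines the fully qualified name each node is registered under. A minimal sketch of that naming convention (the helper `namespaced` and the node name `process_target` are illustrative, not Kedro internals):

```python
# Hypothetical sketch of how a namespace prefixes node names; not Kedro internals.
def namespaced(node_names, namespace):
    """Return fully qualified names, Kedro-style: '<namespace>.<name>'."""
    return [f"{namespace}.{name}" for name in node_names]

bronze_opentargets = namespaced(["process_disease", "process_target"], "bronze.opentargets")
print(bronze_opentargets)
# ['bronze.opentargets.process_disease', 'bronze.opentargets.process_target']
```

These qualified names are what you pass to flags like `--from-nodes` when running a subset of the pipeline.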
Nodes are pure functions in optimuskg/pipelines/*/nodes/. They receive and return data through catalog-defined datasets:
```python
import polars as pl


def process_disease(raw: pl.DataFrame) -> pl.DataFrame:
    return raw.select(
        pl.col("id").alias("disease_id"),
        pl.col("name").alias("disease_name"),
    )
```

Running Pipelines
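Because nodes are pure functions, they can be unit-tested without a Kedro session or the catalog. The sketch below mirrors the select-and-rename logic using a plain column-dict stand-in for the Polars DataFrame (the IDs and names are made up):

```python
# Column-dict stand-in for the Polars node above; same select-and-rename logic.
def process_disease(raw: dict) -> dict:
    return {
        "disease_id": raw["id"],
        "disease_name": raw["name"],
    }

# No catalog or Kedro session needed: pass data in, assert on what comes out.
raw = {"id": ["D001", "D002"], "name": ["disease A", "disease B"]}
out = process_disease(raw)
assert out == {"disease_id": ["D001", "D002"], "disease_name": ["disease A", "disease B"]}
```

In the real test suite you would construct a small `pl.DataFrame` instead, but the point stands: purity keeps node tests free of I/O and framework setup.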
```sh
# Run the full pipeline, up to and including the final export node
uv run kedro run --to-nodes gold.export_kg

# Run a specific layer
uv run kedro run --pipeline bronze

# Run a node and everything downstream of it
uv run kedro run --from-nodes=<node_name>

# Visualize the pipeline DAG
make kedro-viz
```

Namespaces
Nodes are organized using Kedro namespaces that mirror the medallion layers and data sources:
- `bronze.opentargets.*` - OpenTargets processing nodes
- `bronze.drugbank.*` - DrugBank processing nodes
- `silver.nodes.*` - Entity consolidation nodes
- `silver.edges.*` - Relationship building nodes
- `gold.*` - Export nodes
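The `*` patterns above read like globs over fully qualified node names. Kedro's own node selection is richer than this, but the idea can be sketched with `fnmatch` (the node names listed are illustrative):

```python
from fnmatch import fnmatch

# A few fully qualified node names (illustrative).
node_names = [
    "bronze.opentargets.process_disease",
    "bronze.drugbank.process_drug",
    "silver.nodes.consolidate_entities",
    "gold.export_kg",
]

def matching(pattern):
    """Glob-match namespaced node names, e.g. 'bronze.*' selects a whole layer."""
    return [n for n in node_names if fnmatch(n, pattern)]

print(matching("bronze.*"))
```

Dotted namespaces make layer- and source-level selection a simple prefix match, which is why the medallion layers double as the top-level namespace segments.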