
Pipelines

DAG-based workflows composed of pure Python functions.

Pipelines in OptimusKG are collections of nodes wired into a directed acyclic graph (DAG). Each node is a pure Python function that takes one or more Polars DataFrames as input and returns DataFrames.
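The DAG idea can be illustrated without Kedro at all. Below is a minimal, dependency-free sketch (not OptimusKG code): each "node" is a pure function declaring named inputs and an output, and a tiny runner resolves them in dependency order, much as Kedro does for real pipelines.

```python
from graphlib import TopologicalSorter

def run_dag(nodes, data):
    """nodes: list of (func, input_names, output_name) tuples."""
    # Map each output name to the node that produces it.
    producers = {out: (func, ins) for func, ins, out in nodes}
    # Build output -> prerequisite-outputs edges for topological sorting.
    graph = {out: {i for i in ins if i in producers} for func, ins, out in nodes}
    store = dict(data)
    # Execute nodes in dependency order, threading results through the store.
    for out in TopologicalSorter(graph).static_order():
        func, ins = producers[out]
        store[out] = func(*(store[i] for i in ins))
    return store

# Two chained "nodes" operating on plain lists in place of DataFrames.
clean = lambda rows: [r.strip() for r in rows]
upper = lambda rows: [r.upper() for r in rows]

result = run_dag(
    [(clean, ["raw"], "cleaned"), (upper, ["cleaned"], "final")],
    {"raw": [" asthma ", " diabetes "]},
)
```

Because each function is pure, the runner is free to execute independent branches in any valid order, which is the same property Kedro exploits for parallel runs.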

Pipeline Registration

Each layer has a pipeline.py that registers its nodes with namespace prefixes:

from kedro.pipeline import Pipeline, node, pipeline

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=process_disease,
                inputs="landing.opentargets.disease",
                outputs="bronze.opentargets.disease",
                name="process_disease",
            ),
        ],
        namespace="bronze.opentargets",
    )

Node Functions

Nodes are pure functions in optimuskg/pipelines/*/nodes/. They receive and return data through catalog-defined datasets:

import polars as pl

def process_disease(raw: pl.DataFrame) -> pl.DataFrame:
    return raw.select(
        pl.col("id").alias("disease_id"),
        pl.col("name").alias("disease_name"),
    )
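Purity is what makes these nodes trivially unit-testable: the same transform logic can be exercised without any catalog or I/O. The following dict-based stand-in mirrors the select/rename above (the field names match the node; the sample row and the helper name are made up for illustration):

```python
def process_disease_rows(raw: list[dict]) -> list[dict]:
    # Equivalent of the Polars select: keep two columns, rename both.
    return [
        {"disease_id": row["id"], "disease_name": row["name"]}
        for row in raw
    ]

# A made-up sample row; extra fields are dropped by the transform.
rows = [{"id": "EFO_0000270", "name": "asthma", "score": 0.9}]
out = process_disease_rows(rows)
```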

Running Pipelines

# Run the full pipeline (everything up to the final export node)
uv run kedro run --to-nodes gold.export_kg

# Run a specific layer
uv run kedro run --pipeline bronze

# Run from a specific node downstream
uv run kedro run --from-nodes=<node_name>

# Visualize the pipeline DAG
make kedro-viz

Namespaces

Nodes are organized using Kedro namespaces that mirror the medallion layers and data sources:

  • bronze.opentargets.* - OpenTargets processing nodes
  • bronze.drugbank.* - DrugBank processing nodes
  • silver.nodes.* - Entity consolidation nodes
  • silver.edges.* - Relationship building nodes
  • gold.* - Export nodes
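Since namespaced node names are plain dotted strings, selecting a layer or a source reduces to prefix matching. A small sketch (the node names below are hypothetical examples in the naming scheme above):

```python
node_names = [
    "bronze.opentargets.process_disease",
    "bronze.drugbank.process_drug",
    "silver.nodes.consolidate_entities",
    "gold.export_kg",
]

def in_namespace(name: str, namespace: str) -> bool:
    # Match the namespace itself or anything nested under it.
    return name == namespace or name.startswith(namespace + ".")

bronze_nodes = [n for n in node_names if in_namespace(n, "bronze")]
```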
