Dynamic handling of Medallion layer schema





This is a common problem in data engineering pipelines following the medallion architecture (Bronze → Silver → Gold).


Let’s go through the best practices for using YAML + Python to define and generate table schemas and views for each layer (Bronze, Silver, Gold).


  1. Core Concept

Instead of hardcoding SQL or schema definitions, you maintain YAML files that describe:




  • Table name and layer (bronze/silver/gold)
  • Columns and types
  • Source or transformation logic
  • Primary/foreign keys
  • Partition and clustering details



Your Python code then reads the YAML and:


  • Generates CREATE TABLE DDL for each layer
  • Generates views for transformations (Silver, Gold)
  • Optionally executes them (e.g., via BigQuery, Databricks, or Snowflake SDKs)
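For the optional execution step, a sketch of a helper that runs the generated statements through a warehouse client. The `client` here is an assumption: any object with a BigQuery-style `query(sql)` method returning a job with a blocking `result()` works (e.g., `google.cloud.bigquery.Client`).

```python
def execute_statements(client, statements):
    """Run each generated SQL statement via the warehouse client.

    `client` is assumed to expose a BigQuery-style query(sql) method
    returning a job object with a result() method that blocks until done.
    """
    jobs = []
    for sql in statements:
        job = client.query(sql)
        job.result()  # wait for the statement to finish before the next one
        jobs.append(job)
    return jobs
```

Running statements sequentially matters here, since Silver and Gold views depend on the objects created before them.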




  2. Example YAML Schema Design



```yaml
tables:
  - name: customer_bronze
    layer: bronze
    source: gs://raw-data/customers.csv
    description: "Raw customer data from CRM system"
    columns:
      - name: customer_id
        type: STRING
        nullable: false
      - name: name
        type: STRING
      - name: created_at
        type: TIMESTAMP
    metadata:
      partition_by: created_at
      format: parquet

  - name: customer_silver
    layer: silver
    source_table: customer_bronze
    description: "Cleaned and deduplicated customer data"
    transformations:
      - "SELECT DISTINCT * FROM {{ source_table }}"
    columns:
      - name: customer_id
        type: STRING
      - name: name
        type: STRING
      - name: created_at
        type: TIMESTAMP

  - name: customer_gold
    layer: gold
    source_table: customer_silver
    description: "Customer aggregated metrics"
    transformations:
      - |
        SELECT
          customer_id,
          COUNT(*) AS transaction_count,
          MAX(created_at) AS last_transaction
        FROM {{ source_table }}
        GROUP BY customer_id
    columns:
      - name: customer_id
        type: STRING
      - name: transaction_count
        type: INTEGER
      - name: last_transaction
        type: TIMESTAMP
```




  3. Python Script to Generate DDL and Views



```python
import yaml
from jinja2 import Template


def load_yaml(yaml_file):
    with open(yaml_file, "r") as f:
        return yaml.safe_load(f)


def generate_create_table_sql(table):
    cols = ",\n  ".join(
        f"{col['name']} {col['type']}{'' if col.get('nullable', True) else ' NOT NULL'}"
        for col in table["columns"]
    )
    sql = f"CREATE OR REPLACE TABLE {table['name']} (\n  {cols}\n)"
    metadata = table.get("metadata", {})
    if metadata.get("partition_by"):
        sql += f"\nPARTITION BY {metadata['partition_by']}"
    if metadata.get("format"):
        sql += f"\nOPTIONS (format='{metadata['format']}')"
    return sql + ";"


def generate_view_sql(table):
    if "transformations" not in table:
        return None
    transformation_sql = "\n".join(table["transformations"])
    rendered_sql = Template(transformation_sql).render(
        source_table=table.get("source_table")
    )
    return f"CREATE OR REPLACE VIEW {table['name']} AS\n{rendered_sql};"


if __name__ == "__main__":
    schema = load_yaml("tables.yaml")
    for table in schema["tables"]:
        if table["layer"] == "bronze":
            ddl = generate_create_table_sql(table)
        else:
            ddl = generate_view_sql(table)
        print(f"\n-- {table['name'].upper()} ({table['layer']})\n{ddl}\n")
```


  4. Best Practices Summary



YAML Design


  • Keep each layer’s YAML separate (bronze.yaml, silver.yaml, gold.yaml) for modularity.
  • Use Jinja templating inside YAML for reusable transformations.
  • Add metadata for partitioning, file format, and owner/team.
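One way to apply the Jinja-in-YAML advice is to render the YAML text itself before parsing it, so a single file can serve multiple environments. A minimal sketch (the `env` variable and table name are illustrative assumptions):

```python
import yaml
from jinja2 import Template

yaml_text = """
tables:
  - name: {{ env }}_customer_bronze
    layer: bronze
"""

# Render the Jinja variables first, then parse the result as YAML.
config = yaml.safe_load(Template(yaml_text).render(env="dev"))
print(config["tables"][0]["name"])  # dev_customer_bronze
```

The same pattern works for dataset names, storage paths, or any value that differs between dev and prod.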



Code Design


  • Use a class-based model (Table, Column) to represent tables.
  • Validate YAML schema using pydantic or cerberus.
  • Add unit tests to verify SQL generation.
  • Optionally, store schema history (for auditing schema evolution).
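A minimal sketch of the class-based model, using stdlib dataclasses with hand-rolled validation; pydantic would replace the `from_dict` checks with declarative field types, and all names here are illustrative:

```python
from dataclasses import dataclass

VALID_LAYERS = {"bronze", "silver", "gold"}


@dataclass
class Column:
    name: str
    type: str
    nullable: bool = True


@dataclass
class Table:
    name: str
    layer: str
    columns: list

    @classmethod
    def from_dict(cls, d):
        # Fail fast on malformed YAML entries instead of deep inside SQL generation.
        for key in ("name", "layer", "columns"):
            if key not in d:
                raise ValueError(f"table entry missing required key: {key}")
        if d["layer"] not in VALID_LAYERS:
            raise ValueError(f"unknown layer: {d['layer']}")
        cols = [Column(c["name"], c["type"], c.get("nullable", True))
                for c in d["columns"]]
        return cls(name=d["name"], layer=d["layer"], columns=cols)
```

`Table.from_dict` can then wrap each entry in `schema['tables']` before SQL generation, and a unit test can assert that a misspelled layer raises `ValueError`.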



Layer Logic


  • Bronze: create physical tables directly from raw sources; no transformations.
  • Silver: define views that clean and deduplicate Bronze data.
  • Gold: define views that aggregate Silver data into business-level metrics.