Creating a BigQuery Schema at Runtime Using Python

This post walks through a small but very reusable data engineering utility.


Let’s design a Python function that:


  • Reads a YAML file describing a BigQuery table schema
  • Generates a BigQuery CREATE TABLE SQL statement
  • Supports all major BigQuery data types, including nested STRUCT and repeated ARRAY fields



Below is the full script, along with an example YAML file and example output.





Example YAML (table_schema.yaml)



table_name: sales_data
dataset_name: analytics

columns:
  - name: order_id
    type: INT64
    mode: REQUIRED
  - name: customer_name
    type: STRING
    mode: NULLABLE
  - name: order_date
    type: DATE
    mode: NULLABLE
  - name: total_amount
    type: FLOAT64
    mode: NULLABLE
  - name: is_priority
    type: BOOL
    mode: NULLABLE
  - name: items
    type: RECORD
    mode: REPEATED
    fields:
      - name: item_id
        type: STRING
        mode: REQUIRED
      - name: quantity
        type: INT64
        mode: NULLABLE
      - name: attributes
        type: RECORD
        mode: NULLABLE
        fields:
          - name: color
            type: STRING
            mode: NULLABLE
          - name: size
            type: STRING
            mode: NULLABLE
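For reference, `yaml.safe_load` turns a file like the one above into plain Python dicts and lists. The `items` column, for example, parses to the structure below (shown here as a Python literal so you can see exactly what the generator functions receive):

```python
# The `items` column from table_schema.yaml, as yaml.safe_load would return it:
items_column = {
    "name": "items",
    "type": "RECORD",
    "mode": "REPEATED",
    "fields": [
        {"name": "item_id", "type": "STRING", "mode": "REQUIRED"},
        {"name": "quantity", "type": "INT64", "mode": "NULLABLE"},
        {
            "name": "attributes",
            "type": "RECORD",
            "mode": "NULLABLE",
            "fields": [
                {"name": "color", "type": "STRING", "mode": "NULLABLE"},
                {"name": "size", "type": "STRING", "mode": "NULLABLE"},
            ],
        },
    ],
}

# Nested fields are just nested lists of dicts:
print(items_column["fields"][2]["fields"][0]["name"])  # color
```

Because RECORD fields carry their children in a `fields` list, a simple recursive walk over these dicts is all the generator needs.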







Python Script (generate_bigquery_create.py)



import yaml


def load_yaml_schema(file_path):
    """Load a table schema from a YAML file."""
    with open(file_path, "r") as f:
        return yaml.safe_load(f)


def generate_field_sql(field):
    """Recursively generate SQL for a field, including nested STRUCT and ARRAY."""
    name = field["name"]
    field_type = field["type"].upper()
    mode = field.get("mode", "NULLABLE").upper()

    # Build the base type expression; RECORD/STRUCT fields recurse into
    # their nested fields to produce STRUCT<...>.
    if field_type in ("RECORD", "STRUCT"):
        nested_fields = field.get("fields", [])
        nested_sql_parts = [generate_field_sql(f) for f in nested_fields]
        type_sql = f"STRUCT<{', '.join(nested_sql_parts)}>"
    else:
        type_sql = field_type

    # REPEATED fields become an ARRAY of the base type.
    if mode == "REPEATED":
        type_sql = f"ARRAY<{type_sql}>"

    return f"{name} {type_sql}"


def generate_create_table_sql(schema_dict):
    """Generate a BigQuery CREATE TABLE statement from a schema dictionary."""
    dataset_name = schema_dict["dataset_name"]
    table_name = schema_dict["table_name"]
    columns = schema_dict["columns"]

    column_sql_parts = [generate_field_sql(col) for col in columns]
    columns_sql = ",\n  ".join(column_sql_parts)

    sql = f"""CREATE OR REPLACE TABLE `{dataset_name}.{table_name}` (
  {columns_sql}
);"""
    return sql


def main():
    yaml_path = "table_schema.yaml"  # path to your YAML file
    schema = load_yaml_schema(yaml_path)
    sql = generate_create_table_sql(schema)
    print(sql)


if __name__ == "__main__":
    main()
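The recursion in generate_field_sql is the heart of the script: a RECORD is rendered as STRUCT<...> of its children, and a REPEATED mode wraps whatever type was built in ARRAY<...>. Here is a standalone trace of that step on a trimmed-down items column (the function is repeated so this snippet runs on its own):

```python
def generate_field_sql(field):
    """Same recursive logic as in generate_bigquery_create.py,
    repeated here so the snippet is self-contained."""
    name = field["name"]
    field_type = field["type"].upper()
    mode = field.get("mode", "NULLABLE").upper()
    if field_type in ("RECORD", "STRUCT"):
        parts = [generate_field_sql(f) for f in field.get("fields", [])]
        type_sql = f"STRUCT<{', '.join(parts)}>"
    else:
        type_sql = field_type
    if mode == "REPEATED":
        type_sql = f"ARRAY<{type_sql}>"  # REPEATED wraps the built type
    return f"{name} {type_sql}"


items = {
    "name": "items", "type": "RECORD", "mode": "REPEATED",
    "fields": [
        {"name": "item_id", "type": "STRING", "mode": "REQUIRED"},
        {"name": "quantity", "type": "INT64"},
    ],
}
print(generate_field_sql(items))
# items ARRAY<STRUCT<item_id STRING, quantity INT64>>
```

Note the order of operations: the STRUCT is built first from the nested fields, and only then wrapped in ARRAY, which is exactly how BigQuery models a repeated record.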





Example Output



CREATE OR REPLACE TABLE `analytics.sales_data` (
  order_id INT64,
  customer_name STRING,
  order_date DATE,
  total_amount FLOAT64,
  is_priority BOOL,
  items ARRAY<STRUCT<item_id STRING, quantity INT64, attributes STRUCT<color STRING, size STRING>>>
);




✅ Supported Data Types


  • STRING
  • INT64
  • FLOAT64
  • BOOL
  • DATE, DATETIME, TIMESTAMP
  • RECORD / STRUCT (nested)
  • ARRAY / REPEATED (nested or primitive)



✅ Advantages


  • Easy to update schemas by editing YAML instead of SQL
  • Supports deeply nested JSON-like structures
  • Ideal for data pipeline automation or metadata-driven design



From Blogger iPhone client