Excellent — this is a powerful and very reusable data engineering utility. ✅
Let’s design a Python function that:
- Reads a YAML file describing a BigQuery table schema
- Generates a BigQuery CREATE TABLE SQL statement
- Supports all major BigQuery data types, including nested STRUCT and repeated ARRAY fields
Below you’ll find a clean, copy-friendly full script (no code cells), including an example YAML file and example output.
Example YAML (table_schema.yaml)
table_name: sales_data
dataset_name: analytics
columns:
  - name: order_id
    type: INT64
    mode: REQUIRED
  - name: customer_name
    type: STRING
    mode: NULLABLE
  - name: order_date
    type: DATE
    mode: NULLABLE
  - name: total_amount
    type: FLOAT64
    mode: NULLABLE
  - name: is_priority
    type: BOOL
    mode: NULLABLE
  - name: items
    type: RECORD
    mode: REPEATED
    fields:
      - name: item_id
        type: STRING
        mode: REQUIRED
      - name: quantity
        type: INT64
        mode: NULLABLE
      - name: attributes
        type: RECORD
        mode: NULLABLE
        fields:
          - name: color
            type: STRING
            mode: NULLABLE
          - name: size
            type: STRING
            mode: NULLABLE
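As a quick sanity check before wiring up the generator, note that PyYAML's yaml.safe_load turns each entry under columns into a plain dict, with nested fields as lists of dicts. A minimal sketch, assuming PyYAML is installed and using a trimmed copy of the schema above embedded as a string:

```python
import yaml

# Trimmed copy of the schema above, embedded as a string for illustration.
SCHEMA_YAML = """
table_name: sales_data
dataset_name: analytics
columns:
  - name: order_id
    type: INT64
    mode: REQUIRED
  - name: items
    type: RECORD
    mode: REPEATED
    fields:
      - name: item_id
        type: STRING
        mode: REQUIRED
"""

schema = yaml.safe_load(SCHEMA_YAML)
print(schema["table_name"])                       # sales_data
print(schema["columns"][1]["fields"][0]["name"])  # item_id
```

This is the dict shape the functions below assume: top-level keys table_name, dataset_name, and columns, with each RECORD column carrying its own fields list.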
Python Script (generate_bigquery_create.py)
import yaml
def load_yaml_schema(file_path):
    """Load a table schema from a YAML file."""
    with open(file_path, "r") as f:
        return yaml.safe_load(f)
def generate_field_sql(field):
    """Recursively generate SQL for a field, including nested STRUCT and ARRAY."""
    name = field["name"]
    field_type = field["type"].upper()
    mode = field.get("mode", "NULLABLE").upper()

    # RECORD/STRUCT fields expand recursively into STRUCT<...>.
    if field_type in ("RECORD", "STRUCT"):
        nested_fields = field.get("fields", [])
        nested_sql_parts = [generate_field_sql(f) for f in nested_fields]
        type_sql = f"STRUCT<{', '.join(nested_sql_parts)}>"
    else:
        type_sql = field_type

    # REPEATED fields become ARRAY<...>; ARRAY columns cannot be NOT NULL.
    if mode == "REPEATED":
        return f"{name} ARRAY<{type_sql}>"
    if mode == "REQUIRED":
        return f"{name} {type_sql} NOT NULL"
    return f"{name} {type_sql}"
def generate_create_table_sql(schema_dict):
    """Generate a BigQuery CREATE TABLE statement from a schema dictionary."""
    dataset_name = schema_dict["dataset_name"]
    table_name = schema_dict["table_name"]
    columns = schema_dict["columns"]

    column_sql_parts = [generate_field_sql(col) for col in columns]
    columns_sql = ",\n  ".join(column_sql_parts)

    sql = f"""CREATE OR REPLACE TABLE `{dataset_name}.{table_name}` (
  {columns_sql}
);"""
    return sql
def main():
    yaml_path = "table_schema.yaml"  # path to your YAML file
    schema = load_yaml_schema(yaml_path)
    sql = generate_create_table_sql(schema)
    print(sql)

if __name__ == "__main__":
    main()
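To see the recursion in isolation, here is a condensed, self-contained sketch of the field-rendering logic applied to the items column, assuming REQUIRED maps to NOT NULL and REPEATED wraps the type in ARRAY<...>; the helper name field_sql and the inline dict are illustrative only:

```python
def field_sql(field):
    # Condensed copy of the recursive field renderer, for illustration only.
    t = field["type"].upper()
    mode = field.get("mode", "NULLABLE").upper()
    if t in ("RECORD", "STRUCT"):
        inner = ", ".join(field_sql(f) for f in field.get("fields", []))
        t = f"STRUCT<{inner}>"
    if mode == "REPEATED":
        return f"{field['name']} ARRAY<{t}>"
    suffix = " NOT NULL" if mode == "REQUIRED" else ""
    return f"{field['name']} {t}{suffix}"

# The "items" column from the example schema, as yaml.safe_load would parse it.
items = {
    "name": "items", "type": "RECORD", "mode": "REPEATED",
    "fields": [
        {"name": "item_id", "type": "STRING", "mode": "REQUIRED"},
        {"name": "quantity", "type": "INT64", "mode": "NULLABLE"},
    ],
}
print(field_sql(items))
# items ARRAY<STRUCT<item_id STRING NOT NULL, quantity INT64>>
```

The key design point is that recursion bottoms out at scalar fields, so arbitrarily deep RECORD nesting costs no extra code.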
Example Output
CREATE OR REPLACE TABLE `analytics.sales_data` (
  order_id INT64 NOT NULL,
  customer_name STRING,
  order_date DATE,
  total_amount FLOAT64,
  is_priority BOOL,
  items ARRAY<STRUCT<item_id STRING NOT NULL, quantity INT64, attributes STRUCT<color STRING, size STRING>>>
);
✅ Supported Data Types
- STRING
- INT64
- FLOAT64
- BOOL
- DATE, DATETIME, TIMESTAMP
- RECORD / STRUCT (nested)
- ARRAY / REPEATED (nested or primitive)
- Other scalar types (NUMERIC, BYTES, TIME, etc.) pass through unchanged, since non-RECORD type names are emitted verbatim
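Because non-RECORD type names are emitted verbatim, a misspelled type (e.g. FLOAT instead of FLOAT64) only surfaces when BigQuery rejects the DDL. A small, hypothetical validate_columns helper can catch this earlier; the allowed-type set below is an assumption, extend it to match your project:

```python
# Assumed whitelist of type names; extend as needed for your schemas.
ALLOWED_TYPES = {
    "STRING", "INT64", "FLOAT64", "BOOL", "BYTES", "NUMERIC",
    "DATE", "DATETIME", "TIME", "TIMESTAMP", "RECORD", "STRUCT",
}

def validate_columns(columns, path=""):
    """Return a list of error messages for unknown types; empty list means OK."""
    errors = []
    for col in columns:
        where = f"{path}{col['name']}"
        if col["type"].upper() not in ALLOWED_TYPES:
            errors.append(f"unknown type {col['type']!r} at {where}")
        # Recurse into nested RECORD/STRUCT fields, tracking the dotted path.
        errors.extend(validate_columns(col.get("fields", []), where + "."))
    return errors

print(validate_columns([{"name": "order_id", "type": "INT64"}]))  # []
print(validate_columns([{"name": "total", "type": "FLOAT"}]))     # one error
```

Running this right after load_yaml_schema turns a runtime DDL failure into an immediate, pinpointed error message with the dotted path to the offending field.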
✅ Advantages
- Easy to update schemas by editing YAML instead of SQL
- Supports deeply nested JSON-like structures
- Ideal for data pipeline automation or metadata-driven design