Row-based batch processing

To efficiently load large Oracle datasets into pandas DataFrames while avoiding the ORA-02399 error (exceeded maximum connect time), you can break one long-running query into many smaller, short-running ones. Below are several approaches:


1. Row-Based Batch Processing


Retrieve data in fixed-size chunks using nested ROWNUM subqueries (the OFFSET/FETCH alternative is covered in approach 4).


import cx_Oracle
import pandas as pd


def fetch_data_in_batches(connection, query, batch_size):
    offset = 0
    data_frames = []
    cursor = connection.cursor()

    while True:
        # Classic ROWNUM pagination: ROWNUM is assigned in the middle query,
        # then filtered in the outer one. Give the inner query an ORDER BY if
        # you need batches that neither overlap nor skip rows.
        batch_query = f"""
        SELECT * FROM (
            SELECT a.*, ROWNUM rnum
            FROM ({query}) a
            WHERE ROWNUM <= {offset + batch_size}
        )
        WHERE rnum > {offset}
        """
        cursor.execute(batch_query)
        rows = cursor.fetchall()

        if not rows:
            break

        columns = [col[0] for col in cursor.description]
        df = pd.DataFrame(rows, columns=columns)
        # Drop the helper RNUM column added by the pagination wrapper.
        data_frames.append(df.drop(columns=["RNUM"], errors="ignore"))
        offset += batch_size

    cursor.close()
    # Guard against an empty result set; pd.concat([]) raises a ValueError.
    return pd.concat(data_frames, ignore_index=True) if data_frames else pd.DataFrame()


# Example usage
dsn = cx_Oracle.makedsn("hostname", 1521, service_name="service_name")  # 1521 is the default listener port
connection = cx_Oracle.connect(user="username", password="password", dsn=dsn)

query = "SELECT * FROM your_table"
batch_size = 1000

final_df = fetch_data_in_batches(connection, query, batch_size)
connection.close()  # the later examples assume a connection opened the same way


Advantages:

• Works on any Oracle version (ROWNUM predates 12c's OFFSET/FETCH), provided the inner query has a deterministic ORDER BY.

• Keeps each individual query small and short-running, which helps stay under connection time limits.


2. Time-Based Partitioning


Split the query on a date or timestamp column, fetching one time window at a time.


import datetime


def fetch_data_by_date_range(connection, table, start_date, end_date, interval_days):
    current_date = start_date
    data_frames = []
    cursor = connection.cursor()

    while current_date < end_date:
        # Clamp the window so the final chunk does not run past end_date.
        next_date = min(current_date + datetime.timedelta(days=interval_days), end_date)
        # Dates are interpolated here for readability; prefer bind variables
        # (e.g. WHERE date_column >= :start_dt) in production code.
        query = f"""
        SELECT * FROM {table}
        WHERE date_column >= TO_DATE('{current_date.strftime('%Y-%m-%d')}', 'YYYY-MM-DD')
          AND date_column < TO_DATE('{next_date.strftime('%Y-%m-%d')}', 'YYYY-MM-DD')
        """
        cursor.execute(query)
        rows = cursor.fetchall()

        if rows:
            columns = [col[0] for col in cursor.description]
            data_frames.append(pd.DataFrame(rows, columns=columns))

        current_date = next_date

    cursor.close()
    return pd.concat(data_frames, ignore_index=True) if data_frames else pd.DataFrame()


# Example usage (reuses a connection opened as in the first example)
start_date = datetime.date(2023, 1, 1)
end_date = datetime.date(2023, 12, 31)

final_df = fetch_data_by_date_range(connection, "your_table", start_date, end_date, 30)


Advantages:

• Suitable for time-series data.

• Easily parallelizable when multiple workers query different ranges, as sketched below.
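
Since the ranges are independent, they can be fetched concurrently. A minimal sketch, assuming one connection per worker (reusing the dsn, username, and password placeholders from the first example) and the fetch_data_by_date_range helper defined above:


from concurrent.futures import ThreadPoolExecutor


def fetch_one_range(bounds):
    start, end = bounds
    # Each worker opens its own connection; cx_Oracle connections should not
    # be shared across threads.
    conn = cx_Oracle.connect(user="username", password="password", dsn=dsn)
    try:
        return fetch_data_by_date_range(conn, "your_table", start, end, (end - start).days)
    finally:
        conn.close()


# Hypothetical split of 2023 into quarters, fetched concurrently.
quarters = [
    (datetime.date(2023, 1, 1), datetime.date(2023, 4, 1)),
    (datetime.date(2023, 4, 1), datetime.date(2023, 7, 1)),
    (datetime.date(2023, 7, 1), datetime.date(2023, 10, 1)),
    (datetime.date(2023, 10, 1), datetime.date(2024, 1, 1)),
]
with ThreadPoolExecutor(max_workers=4) as pool:
    final_df = pd.concat(pool.map(fetch_one_range, quarters), ignore_index=True)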


3. Key-Based Partitioning


Split on ranges of the primary key or another indexed numeric column.


def fetch_data_by_key_ranges(connection, table, key_column, min_key, max_key, step):
    data_frames = []
    current_key = min_key
    cursor = connection.cursor()

    while current_key < max_key:
        next_key = current_key + step
        # Half-open range [current_key, next_key) so adjacent chunks never overlap.
        query = f"""
        SELECT * FROM {table}
        WHERE {key_column} >= {current_key} AND {key_column} < {next_key}
        """
        cursor.execute(query)
        rows = cursor.fetchall()

        if rows:
            columns = [col[0] for col in cursor.description]
            data_frames.append(pd.DataFrame(rows, columns=columns))

        current_key = next_key

    cursor.close()
    return pd.concat(data_frames, ignore_index=True) if data_frames else pd.DataFrame()


# Example usage
final_df = fetch_data_by_key_ranges(connection, "your_table", "id", 1, 100000, 1000)


Advantages:

• Effective for evenly distributed numeric keys.

• Can process non-time-series data efficiently.


4. Query with Pagination (OFFSET/FETCH)


Use Oracle’s OFFSET/FETCH syntax for pagination (available in Oracle 12c and later). Include an ORDER BY in the query so page boundaries are deterministic.


def fetch_data_with_pagination(connection, query, batch_size):
    offset = 0
    data_frames = []
    cursor = connection.cursor()

    while True:
        # OFFSET/FETCH requires Oracle 12c+; the base query should end with
        # an ORDER BY so that each page is well defined.
        paginated_query = f"""
        {query}
        OFFSET {offset} ROWS FETCH NEXT {batch_size} ROWS ONLY
        """
        cursor.execute(paginated_query)
        rows = cursor.fetchall()

        if not rows:
            break

        columns = [col[0] for col in cursor.description]
        data_frames.append(pd.DataFrame(rows, columns=columns))
        offset += batch_size

    cursor.close()
    return pd.concat(data_frames, ignore_index=True) if data_frames else pd.DataFrame()


# Example usage
query = "SELECT * FROM your_table ORDER BY id"
final_df = fetch_data_with_pagination(connection, query, 1000)


Advantages:

• Straightforward and modern approach.

• Supported in Oracle 12c+.


5. Partitioned Queries with Oracle Features


Use Oracle’s built-in partitioning or virtual column capabilities.

• If the table is partitioned, query each partition individually:


SELECT * FROM your_table PARTITION (partition_name);



• If using virtual columns, filter based on calculated values:


SELECT * FROM your_table WHERE virtual_column = some_value;




In Python, loop over the partitions or virtual-column ranges dynamically, as in the sketch below.
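
A minimal sketch of that loop for a partitioned table, assuming the connected user owns it (partition names are read from the USER_TAB_PARTITIONS data dictionary view):


def fetch_data_by_partition(connection, table):
    cursor = connection.cursor()
    # List this table's partitions from the data dictionary.
    cursor.execute(
        "SELECT partition_name FROM user_tab_partitions WHERE table_name = :t",
        t=table.upper(),
    )
    partition_names = [row[0] for row in cursor.fetchall()]

    data_frames = []
    for name in partition_names:
        # Partition names come from the dictionary, so interpolation is safe here.
        cursor.execute(f"SELECT * FROM {table} PARTITION ({name})")
        rows = cursor.fetchall()
        if rows:
            columns = [col[0] for col in cursor.description]
            data_frames.append(pd.DataFrame(rows, columns=columns))

    cursor.close()
    return pd.concat(data_frames, ignore_index=True) if data_frames else pd.DataFrame()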


6. Use Database Views or Materialized Views


Create materialized views that pre-slice the table on the database side, so the Python side only ever queries one manageable slice at a time. For example, split on MOD of the key, one view per remainder:


CREATE MATERIALIZED VIEW mv_table_part_0 AS
SELECT * FROM your_table WHERE MOD(id, 10) = 0;
-- repeat for remainders 1 through 9 (mv_table_part_1 ... mv_table_part_9)


Then query each view from Python in turn, as sketched below.
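
A minimal sketch of that loop, assuming the ten views created above (mv_table_part_0 through mv_table_part_9) and an open connection as in the earlier examples:


data_frames = []
cursor = connection.cursor()
for bucket in range(10):
    # One short query per materialized view keeps every round trip small.
    cursor.execute(f"SELECT * FROM mv_table_part_{bucket}")
    rows = cursor.fetchall()
    if rows:
        columns = [col[0] for col in cursor.description]
        data_frames.append(pd.DataFrame(rows, columns=columns))
cursor.close()

final_df = pd.concat(data_frames, ignore_index=True) if data_frames else pd.DataFrame()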


7. Dynamic SQL for Custom Query Generation


Use Python to generate the chunked SQL statements themselves and execute each one iteratively, as in the sketch below.
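
For instance, a small generator (a sketch; the table, key column, and bounds are placeholders) can emit one half-open key-range query per chunk, each of which can be executed with any of the fetch loops above:


def generate_chunk_queries(table, key_column, min_key, max_key, step):
    """Yield one SELECT statement per key range [low, high)."""
    for low in range(min_key, max_key, step):
        high = min(low + step, max_key)
        yield f"SELECT * FROM {table} WHERE {key_column} >= {low} AND {key_column} < {high}"


# Example usage
for chunk_query in generate_chunk_queries("your_table", "id", 1, 100000, 1000):
    pass  # execute chunk_query with a cursor and append the result to a list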


Summary of Methods


Method                       Best For                      Complexity
Row-Based Batches            Generic datasets              Easy
Time-Based Partitioning      Time-series data              Moderate
Key-Based Partitioning       Indexed or numeric columns    Moderate
Pagination (OFFSET/FETCH)    Oracle 12c and later          Easy
Oracle Partitioning          Large partitioned tables      Advanced
Materialized Views           Repeated queries              Advanced


By selecting the appropriate method based on your dataset and requirements, you can avoid connection timeouts and process large datasets efficiently.


