How to Write Idempotent Python Code

In software development, ensuring that your programs behave consistently is crucial, especially when dealing with large data sets or critical transactions. One of the key concepts in this regard is idempotency: the property of an operation that produces the same result no matter how many times it is executed. This is particularly important in scenarios like ETL (Extract, Transform, Load) processes, where interruptions or failures can cause data inconsistencies such as duplicate entries. In this post, we’ll explore how to write idempotent Python code, why it matters, and how it helps you avoid common pitfalls such as data duplication during operations like money transfers. Through a practical example, you’ll see how to prevent the issues that arise when a long-running process is interrupted and needs to be rerun. By understanding and applying idempotent programming, you can build more reliable and efficient Python applications that process data smoothly, without the risk of duplication or errors.
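Before getting into the ETL example, here is a tiny, self-contained illustration of the difference (the function names and the money-transfer framing are hypothetical, just to make the contrast concrete): appending a row on every call is not idempotent, while writing a row keyed by a unique identifier is, because a re-run leaves the final state unchanged.

ledger = []     # non-idempotent target: a plain list of rows
transfers = {}  # idempotent target: keyed by a unique transfer id

def append_transfer(amount):
    # NOT idempotent: every call adds another row
    ledger.append({"amount": amount})

def record_transfer(transfer_id, amount):
    # idempotent: re-running with the same transfer_id leaves the same state
    transfers[transfer_id] = {"amount": amount}

append_transfer(100)
append_transfer(100)           # duplicate row in ledger
record_transfer("tx-42", 100)
record_transfer("tx-42", 100)  # state unchanged on the re-run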

Prerequisites

I will use pickle to read and write the intermediate data, since it can serialize complex Python types. The following are the helper functions I will use for these tasks:


import pickle

def store_as_pickle(data, filename):
    # serialize the object to a binary pickle file on disk
    with open(filename, 'wb') as f:
        pickle.dump(data, f)

def read_from_pickle(filename):
    # load a previously pickled object back from disk
    with open(filename, 'rb') as f:
        data = pickle.load(f)
    return data
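As a quick sanity check, the helpers round-trip arbitrary Python objects; the throwaway filename and sample records below are made up for illustration only:

sample = [("id-1", "a", "b", "c", "d"), ("id-2", "e", "f", "g", "h")]
store_as_pickle(sample, "example.pkl")
assert read_from_pickle("example.pkl") == sample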

1st Use Case

As a use case, I will retrieve data from a table in one source, apply transformations to it, and then insert it into a table in another source. To ensure idempotency, I will handle each of these three steps separately.

Assume the source data is stored in the table_a table of the first_database. We can begin with a generic query like the following:

import psycopg2

source_connection = psycopg2.connect(
    "postgres://postgres_user:postgres_password@postgres_host:5432/first_database"
)
source_cursor = source_connection.cursor()
source_cursor.execute("""
    select
    first_column,
    second_column,
    third_column,
    fourth_column,
    fifth_column
    from public."table_a";
""")

The issue is that table_a contains millions or billions of records, and fetching all of them at once could lead to performance issues on both the application and the database side. To address this, I need to retrieve the data in smaller batches.

The following snippet fetches data from the source in batches of 10,000 records. Once all the data has been retrieved, it is stored in source_data.pkl. If the process is re-run and the file already exists at that path, the snippet reads from the file instead of querying the source again. In this way, reading data from the source is idempotent.

import os

batch_size = 10000
page = 0

source_path = "source_data.pkl"
results = []

if os.path.exists(source_path):
    # source data was already extracted in a previous run
    results = read_from_pickle(source_path)
else:
    while True:
        print(f"retrieving [{page * batch_size} - {(page + 1) * batch_size})")

        # extract the next batch
        sub_results = source_cursor.fetchmany(batch_size)
        print(f"{len(sub_results)} records extracted from source in batch {page + 1}")

        # no more data to fetch: persist everything and exit the loop
        if not sub_results:
            store_as_pickle(results, source_path)
            break

        results.extend(sub_results)
        page += 1

The source data is now held in the results variable. For simplicity, I am skipping the transformation step and feeding the data directly to the target. If you need to apply transformations, do so right after extracting the data into results, as sketched below.
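A minimal sketch, assuming each record is a five-column tuple and that normalizing the second column is the (hypothetical) transformation needed:

transformed = []
for record in results:
    first, second, third, fourth, fifth = record
    # hypothetical transformation: normalize the second column
    transformed.append((first, second.strip().lower(), third, fourth, fifth))
results = transformed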

Next, I will initialize the connection to the target data source and prepare the statement required to insert data into its table.

target_connection = psycopg2.connect(
    "postgres://user:password@another_host:5432/second_database"
)
target_cursor = target_connection.cursor()

statement = """
    insert into public."table_b"
    (
        first_column,
        second_column,
        third_column,
        fourth_column,
        fifth_column
    )
    values
    (
        %s, %s, %s, %s, %s
    );
"""

We are now ready to perform bulk inserts into the target data source. The following snippet processes the data in results in chunks of 1,000 records, inserting each chunk in a single operation. A chunk is only committed if all of its insert statements execute successfully.

After committing a chunk, an empty marker file named .checkpoint_<start index> is created to record its completion. If you re-run the snippet, it first checks whether a checkpoint file exists for the current chunk; if the checkpoint is found, that iteration is skipped, ensuring idempotency.

import os
from tqdm import tqdm

commit_interval = 1000

pbar = tqdm(range(0, len(results), commit_interval))
for i in pbar:
    valid_from = i
    valid_until = min(i + commit_interval, len(results))

    checkpoint_file = f".checkpoint_{i}"
    if os.path.exists(checkpoint_file):
        print(f"chunk [{valid_from}, {valid_until}) of {len(results)} is already inserted")
        continue

    chunk = results[valid_from:valid_until]
    pbar.set_description(f"Inserting [{valid_from}, {valid_until}) of {len(results)}")
    target_cursor.executemany(statement, chunk)
    target_connection.commit()

    # create an empty marker file for this chunk
    open(checkpoint_file, "w").close()

This use case demonstrates a one-time ETL process for a large table. But what happens if the data in the source table is dynamic and continues to change over time?

2nd Use Case

To handle dynamic data, we can perform ETL on the source periodically. If new data is added to the source after the initial ETL job, I want to ensure that only the newly added data is processed during subsequent ETL runs.

The following snippet extracts data from both the source and the target.

batch_size = 10000

source_cursor.execute("""
    select
    first_column,
    second_column,
    third_column,
    fourth_column,
    fifth_column
    from public."table_a";
""")

target_cursor.execute("""
    select
    first_column,
    second_column,
    third_column,
    fourth_column,
    fifth_column
    from public."table_b";
""")

source_data = []
while True:
    sub_results = source_cursor.fetchmany(batch_size)

    # no more data to fetch
    if not sub_results:
        break

    source_data.extend(sub_results)

target_data = []
while True:
    sub_results = target_cursor.fetchmany(batch_size)

    # no more data to fetch
    if not sub_results:
        break

    target_data.extend(sub_results)

Assume that first_column serves as the unique identifier for the data. I will store these identifiers in sets, which lets me find the records that have not yet been loaded by taking the set difference. Similarly, if a record was deleted from the source after being loaded into the target, the reverse set difference identifies it so it can be handled.

alpha = {record[0] for record in source_data}
beta = {record[0] for record in target_data}

unloaded_ids = alpha - beta
deleted_ids = beta - alpha

# collect the full source records that have not been loaded into the target yet
unloaded_records = []
for record in tqdm(source_data):
    if record[0] in unloaded_ids:
        unloaded_records.append(record)

Now we can use the same checkpoint procedure to insert the unloaded records into the target.

import os
from tqdm import tqdm

commit_interval = 1000

pbar = tqdm(range(0, len(unloaded_records), commit_interval))
for i in pbar:
    valid_from = i
    valid_until = min(i + commit_interval, len(unloaded_records))

    checkpoint_file = f".checkpoint_v2_{i}"
    if os.path.exists(checkpoint_file):
        print(
            f"chunk [{valid_from}, {valid_until}) "
            f"of {len(unloaded_records)} is already inserted"
        )
        continue

    chunk = unloaded_records[valid_from:valid_until]
    pbar.set_description(f"Inserting [{valid_from}, {valid_until}) of {len(unloaded_records)}")
    target_cursor.executemany(statement, chunk)
    target_connection.commit()

    # create an empty marker file for this chunk
    open(checkpoint_file, "w").close()

As a homework task, try implementing a function that deletes records from the target based on deleted_ids; a possible starting point is sketched below.
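If you want to check your solution against something, here is a minimal sketch of one possible approach. It assumes first_column is the key of table_b and reuses the checkpoint idea; the delete statement below is my assumption rather than part of the original pipeline.

import os

delete_statement = """
    delete from public."table_b"
    where first_column = %s;
"""

# executemany expects a sequence of parameter tuples
delete_params = [(deleted_id,) for deleted_id in deleted_ids]

checkpoint_file = ".checkpoint_deletes"
if not os.path.exists(checkpoint_file):
    target_cursor.executemany(delete_statement, delete_params)
    target_connection.commit()
    # mark the delete pass as done so a re-run skips it
    open(checkpoint_file, "w").close()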

Conclusion

In this post, we explored how to write idempotent Python code, particularly in the context of ETL processes involving large datasets. We discussed the importance of idempotency in ensuring data integrity, especially when dealing with interruptions or failures during long-running processes like data transfers. By utilizing strategies such as batch processing, checkpointing, and leveraging Python sets for identifying new or deleted data, we can maintain consistency and avoid duplication across multiple runs.

Idempotent programming not only improves the reliability of your applications but also ensures that your ETL jobs can be safely re-run without the risk of data corruption or inconsistencies. By following the practices outlined here, you can build more resilient data pipelines that handle dynamic data sources effectively.

If you’re dealing with large-scale data migrations or complex transformations, incorporating these idempotent techniques will help you achieve smooth, error-free operations every time.
