In software development, ensuring that your programs behave consistently is crucial, especially when dealing with large data sets or critical transactions. One of the key concepts in this regard is idempotency: the property of an operation whereby running it multiple times produces the same result as running it once. This is particularly important in scenarios like ETL (Extract, Transform, Load) processes, where interruptions or failures can cause data inconsistencies, such as duplicate entries.

In this post, we’ll explore how to write idempotent Python code, why idempotency matters in your applications, and how it helps you avoid common pitfalls such as data duplication during processes like money transfers. Through a practical example, you’ll see how to prevent the issues that arise when a long-running process is interrupted and needs to be rerun. By understanding and implementing idempotent programming, you can build more reliable and efficient Python applications that process data smoothly, without the risk of duplication or errors.
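As a toy sketch of the idea (made up purely for illustration, not part of the ETL example that follows), compare a function that blindly appends a row with one that only inserts the row if it is missing:

def append_row(rows, row):
    # not idempotent: every call appends another copy of `row`
    rows.append(row)


def upsert_row(rows, row):
    # idempotent: once `row` is present, re-running changes nothing
    if row not in rows:
        rows.append(row)


data = []
for _ in range(3):
    upsert_row(data, ("id-1", "value"))

print(data)  # [('id-1', 'value')] -- one copy, no matter how many times it ran

No matter how many times the idempotent version runs, the end state is the same; that is exactly the guarantee we want from the ETL steps below.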
Prerequisites
I will use pickle to read and write intermediate data, since it can handle complex Python types. These are the functions I will use for those tasks:
import pickle


def store_as_pickle(data, filename):
    with open(filename, 'wb') as f:
        pickle.dump(data, f)


def read_from_pickle(filename):
    with open(filename, 'rb') as f:
        data = pickle.load(f)
    return data
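As a quick sanity check, the helpers round-trip a list of tuples shaped like the records we will extract later; the sample values here are placeholders, not real data:

# hypothetical sample data: two rows with five columns each
sample = [("id-1", "a", "b", "c", "d"), ("id-2", "e", "f", "g", "h")]
store_as_pickle(sample, "sample_data.pkl")
assert read_from_pickle("sample_data.pkl") == sample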
1st Use Case
As a use case, I will retrieve data from a table in one source, apply transformations to the retrieved data, and then insert it into a table in another source. To ensure idempotency, I will break these three functionalities into separate sections.
Assume the source data is stored in the table_a table of the first_database. We can begin with a generic query like the following:
import psycopg2

source_connection = psycopg2.connect(
    "postgres://postgres_user:postgres_password@postgres_host:5432/first_database"
)
source_cursor = source_connection.cursor()

source_cursor.execute("""
    select first_column, second_column, third_column, fourth_column, fifth_column
    from public."table_a";
""")
The issue is that table_a contains millions or billions of records, and fetching all of them at once could cause performance problems on both the application and the database side. To address this, I need to retrieve the data in smaller batches.
The following snippet fetches data from the source in 10K record batches. Once all the data is retrieved, it will be stored in source_data.pkl. If the source data has already been extracted and the file exists at that path, it will read from the file instead of querying the source, even if the process is re-run. In that way, we will satisfy idempotency while reading data from source.
import os

batch_size = 10000
page = 0
source_path = "source_data.pkl"
results = []

while True:
    # exit if source data is already extracted
    if os.path.exists(source_path):
        results = read_from_pickle(source_path)
        break

    print(f"retrieving [{page * batch_size} - {(page + 1) * batch_size})")

    # extract data
    sub_results = source_cursor.fetchmany(batch_size)
    print(f"{len(sub_results)} records extracted from source in the {page + 1}-th batch")

    # No more data to fetch
    if not sub_results:
        store_as_pickle(results, source_path)
        break  # exit loop

    results = results + sub_results
    page += 1
The source data is stored in the results variable. For simplicity, I am skipping the transformation step and feeding the data directly to the target source. If you need to apply transformations, you should do so immediately after extracting the data into results.
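If you do need a transformation step, a minimal sketch could look like the following; the transform_record helper and the specific column operations are hypothetical and only show where the step slots into the pipeline:

def transform_record(record):
    # hypothetical transformation: strip whitespace from the second column
    # and upper-case the third; adapt this to your own schema
    first, second, third, fourth, fifth = record
    return (first, second.strip(), third.upper(), fourth, fifth)


# apply the transformation right after `results` is populated, before loading
results = [transform_record(record) for record in results]

Because the pickle file stores the raw extract, the transformation is re-applied on every run, which stays safe as long as it is a pure function of each record.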
Next, I will initialize the connection to the target data source and prepare the statement required to insert data into its table.
target_connection = psycopg2.connect(
    "postgres://user:password@another_host:5432/second_database"
)
target_cursor = target_connection.cursor()

statement = """
    insert into public."table_b" (
        first_column, second_column, third_column, fourth_column, fifth_column
    ) values (
        %s, %s, %s, %s, %s
    );
"""
We are now ready to perform bulk inserts into the target data source. The following snippet processes all the results data in chunks of 1,000 records, inserting each chunk in a single operation. A chunk will only be committed if all the insert statements are executed successfully.
After committing the data for a chunk, a flat file named .checkpoint_<index> (where <index> is the chunk’s starting offset) will be created to mark its completion. If you re-run this snippet, it first checks whether a checkpoint file exists for the current chunk. If the checkpoint is found, that iteration is skipped, ensuring idempotency.
import os
from tqdm import tqdm

commit_interval = 1000
pbar = tqdm(range(0, len(results), commit_interval))

for i in pbar:
    valid_from = i
    valid_until = min(i + commit_interval, len(results))
    checkpoint_file = f".checkpoint_{i}"

    # skip chunks that were already committed in a previous run
    if os.path.exists(checkpoint_file):
        print(f"chunk [{valid_from}, {valid_until}) of {len(results)} is already performed")
        continue

    chunk = results[valid_from:valid_until]
    pbar.set_description(f"Inserting [{valid_from}, {valid_until}) of {len(results)}")

    target_cursor.executemany(statement, chunk)
    target_connection.commit()

    # create a flat file (checkpoint) for this chunk
    open(checkpoint_file, "w").close()
This use case demonstrates a one-time ETL process for a large table. But what happens if the data in the source table is dynamic and continues to change over time?
2nd Use Case
To handle dynamic data, we can perform ETL on the source periodically. If new data is added to the source after the initial ETL job, I want to ensure that only the newly added data is processed during subsequent ETL runs.
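How the periodic runs are scheduled is outside the scope of this post. As a rough sketch, assuming the extract/compare/load steps that follow are wrapped in a hypothetical run_incremental_etl() function, a simple driver loop could look like this (in production, a cron job or a workflow scheduler would normally take this role):

import time

SYNC_INTERVAL_SECONDS = 60 * 60  # hypothetical: re-run the incremental ETL every hour

while True:
    run_incremental_etl()  # assumed wrapper around the steps shown below
    time.sleep(SYNC_INTERVAL_SECONDS)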
The following snippet will extract data from both source and target.
batch_size = 10000

source_cursor.execute("""
    select first_column, second_column, third_column, fourth_column, fifth_column
    from public."table_a";
""")
target_cursor.execute("""
    select first_column, second_column, third_column, fourth_column, fifth_column
    from public."table_b";
""")

source_data = []
while True:
    sub_results = source_cursor.fetchmany(batch_size)
    # No more data to fetch
    if not sub_results:
        break  # exit loop
    source_data = source_data + sub_results

target_data = []
while True:
    sub_results = target_cursor.fetchmany(batch_size)
    # No more data to fetch
    if not sub_results:
        break  # exit loop
    target_data = target_data + sub_results
Assume that first_column serves as the unique identifier for the data. I will store these identifiers in a set, allowing me to identify unloaded data by leveraging the set’s difference functionality. Similarly, if a record is deleted from the source after being loaded into the target, I can use the set difference again to identify and handle the missing data.
alpha = {record[0] for record in source_data}
beta = {record[0] for record in target_data}

unloaded_ids = alpha - beta
deleted_ids = beta - alpha

# collect the source records whose identifiers are not yet in the target
unloaded_records = []
for record in tqdm(source_data):
    if record[0] in unloaded_ids:
        unloaded_records.append(record)
Now, we can use the same procedure to insert unloaded data into target.
import os
from tqdm import tqdm

commit_interval = 1000
pbar = tqdm(range(0, len(unloaded_records), commit_interval))

for i in pbar:
    valid_from = i
    valid_until = min(i + commit_interval, len(unloaded_records))
    checkpoint_file = f".checkpoint_v2_{i}"

    # skip chunks that were already committed in a previous run
    if os.path.exists(checkpoint_file):
        print(
            f"chunk [{valid_from}, {valid_until}) "
            f"of {len(unloaded_records)} is already performed"
        )
        continue

    chunk = unloaded_records[valid_from:valid_until]
    pbar.set_description(f"Inserting [{valid_from}, {valid_until}) of {len(unloaded_records)}")

    target_cursor.executemany(statement, chunk)
    target_connection.commit()

    # create a flat file (checkpoint) for this chunk
    open(checkpoint_file, "w").close()
As a homework task, could you please implement a function to delete data from the target source based on the deleted_ids?
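If you want to compare notes afterwards, here is one possible sketch. It assumes first_column uniquely identifies a row in table_b and reuses the chunking-plus-checkpoint pattern from the inserts; the delete_statement and delete_from_target names are mine, not part of the original pipeline.

delete_statement = """
    delete from public."table_b"
    where first_column = %s;
"""


def delete_from_target(deleted_ids, commit_interval=1000):
    # sort the ids so chunk boundaries (and checkpoint files) stay stable across re-runs
    ids = sorted(deleted_ids)
    for i in range(0, len(ids), commit_interval):
        checkpoint_file = f".checkpoint_delete_{i}"
        # skip chunks already deleted in a previous run
        if os.path.exists(checkpoint_file):
            continue
        chunk = [(deleted_id,) for deleted_id in ids[i:i + commit_interval]]
        target_cursor.executemany(delete_statement, chunk)
        target_connection.commit()
        open(checkpoint_file, "w").close()

Sorting the identifiers keeps the chunk boundaries, and therefore the checkpoint files, deterministic between runs.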
Conclusion
In this post, we explored how to write idempotent Python code, particularly in the context of ETL processes involving large datasets. We discussed the importance of idempotency in ensuring data integrity, especially when dealing with interruptions or failures during long-running processes like data transfers. By utilizing strategies such as batch processing, checkpointing, and leveraging Python sets for identifying new or deleted data, we can maintain consistency and avoid duplication across multiple runs.
Idempotent programming not only improves the reliability of your applications but also ensures that your ETL jobs can be safely re-run without the risk of data corruption or inconsistencies. By following the practices outlined here, you can build more resilient data pipelines that handle dynamic data sources effectively.
If you’re dealing with large-scale data migrations or complex transformations, incorporating these idempotent techniques will help you achieve smooth, error-free operations every time.
If you liked this post, please support the blog!