Untitled
unknown
plain_text
a year ago
2.3 kB
2
Indexable
Never
# Extract unique primary key values from source_df source_pk_values = source_df[pk].unique() # Extract unique primary key values from target_df target_pk_values = target_df[pk].unique() # Find primary key values that are in source but not in target new_pk_values = set(source_pk_values) - set(target_pk_values) # Insert new primary key values into target for pk_value in new_pk_values: # Get the source row corresponding to pk_value source_row = source_df[source_df[pk] == pk_value] # Generate an INSERT query dynamically insert_query = f"INSERT INTO {schema_ext}.{table_name} (" insert_columns = [] insert_values = [] for column_name, source_val in source_row.items(): insert_columns.append(column_name) insert_values.append(f"'{source_val}'") insert_query += ", ".join(insert_columns) insert_query += ") VALUES (" insert_query += ", ".join(insert_values) insert_query += ")" # Execute the INSERT query try: print(insert_query) # cursor_ext.execute(insert_query) except Exception as e: continue # Iterate through primary key values that exist in both source and target existing_pk_values = set(source_pk_values) & set(target_pk_values) for pk_value in existing_pk_values: # Get the source row corresponding to pk_value source_row = source_df[source_df[pk] == pk_value] # Get the target row corresponding to pk_value target_row = target_df[target_df[pk] == pk_value].iloc[0] # Compare source and target rows to find differences and generate update query update_query = f"UPDATE {schema_ext}.{table_name} SET " column_updates = [] for column_name, source_val in source_row.items(): if column_name not in ('created_by', 'created_on', 'updated_on', 'start_date', 'end_date'): target_val = target_row[column_name] if source_val != target_val: column_updates.append(f"{column_name} = '{source_val}'") if column_updates: update_query += ", ".join(column_updates) update_query += f" WHERE {eprm_table_col_pk} = '{pk_value}'" # Execute the UPDATE query try: print(update_query) # cursor_ext.execute(update_query) except Exception as e: continue