Untitled
unknown
plain_text
a year ago
3.0 kB
2
Indexable
"""Backfill missing phone numbers in ``users`` from ``user_phone_number_temp``.

Table: users
Index: id

+-------------+--------------+---------------------+-------------+---------------+---------------+------------+
| id (PK)     | uid          | created_at          | is_deleted  | email         | phone_number  | social_id  |
+-------------+--------------+---------------------+-------------+---------------+---------------+------------+
| 1           | abcdef123456 | 2023-01-01 00:00:00 | 0           | user1@mail.com| 1234567890    | sid123     |
| 2           | defabc654321 | 2023-01-02 12:30:45 | 0           | user2@mail.com| 0987654321    | sid456     |
| ...         | ...          | ...                 | ...         | ...           | ...           | ...        |
+-------------+--------------+---------------------+-------------+---------------+---------------+------------+

Table: user_phone_number_temp
(uid, phone_number) -> Composite Index

+--------------+---------------+------------------------+
| uid          | phone_number  | created_at             |
+--------------+---------------+------------------------+
| abcdef123456 | 1234567890    | 2023-01-01 00:00:00    |
| defabc654321 | 0987654321    | 2023-01-01 00:00:00    |
| ...          | ...           | ...                    |
+--------------+---------------+------------------------+

This table also utilizes a composite index on uid and phone_number.
The current operation to manage data retention involves a script that deletes
records older than a day based on the created_at timestamp.
"""

import mysql.connector
from kafka import KafkaConsumer

# Connection details for the MASTER DB.
# NOTE(review): credentials are hard-coded; consider loading them from the
# environment or a secrets store instead of committing them to source.
db_config = {
    "host": "masterdb",
    "user": "masterdb",
    "password": "password",
    "database": "masterdb",
}

# NOTE(review): this consumer is created but never read from anywhere in this
# script - confirm whether it is still required.
consumer = KafkaConsumer(
    'uids_topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
)

# Connect to the database and run the whole backfill in one SERIALIZABLE
# transaction.
db_connection = mysql.connector.connect(**db_config)
db_connection.start_transaction(isolation_level='SERIALIZABLE')
cursor = db_connection.cursor()

# Candidate rows: users without a phone number that have at least one entry in
# the temp table. Ordered oldest-first so that, when a uid has several temp
# rows, the most recent one is the last seen for that uid.
_SELECT_BACKFILL_CANDIDATES = (
    "SELECT u.uid, t.phone_number "
    "FROM users AS u "
    "JOIN user_phone_number_temp AS t ON t.uid = u.uid "
    "WHERE u.phone_number IS NULL AND t.phone_number IS NOT NULL "
    "ORDER BY t.created_at"
)

# The IS NULL guard keeps the statement from ever overwriting a phone number
# that is already set.
_UPDATE_MISSING_PHONE = (
    "UPDATE users SET phone_number = %s "
    "WHERE uid = %s AND phone_number IS NULL"
)


def update():
    """Fill in NULL ``users.phone_number`` values from the temp table.

    Uses the module-level ``cursor`` and ``db_connection``. Only users whose
    phone number is NULL are considered; the filtering is done in SQL rather
    than by loading the full ``users`` table into memory. When a uid has
    several rows in ``user_phone_number_temp``, the one with the latest
    ``created_at`` is used.

    All updates are committed together once at the end, so the work stays
    inside the single transaction opened at connection time.

    Raises:
        mysql.connector.Error: If any statement fails. The transaction is
            rolled back before the error is re-raised.
    """
    try:
        cursor.execute(_SELECT_BACKFILL_CANDIDATES)
        # fetchall() drains the result set, so no unread rows are left on the
        # cursor before the next statement is executed.
        # Later (newer) rows overwrite earlier ones for the same uid.
        phone_by_uid = {uid: phone for uid, phone in cursor.fetchall()}

        if phone_by_uid:
            cursor.executemany(
                _UPDATE_MISSING_PHONE,
                [(phone, uid) for uid, phone in phone_by_uid.items()],
            )

        db_connection.commit()
    except mysql.connector.Error:
        db_connection.rollback()
        raise


try:
    update()
finally:
    # Release database resources even when the backfill fails.
    cursor.close()
    db_connection.close()
Editor is loading...
Leave a Comment