Untitled

mail@pastecode.io avatar
unknown
python
2 months ago
3.0 kB
1
Indexable
Never




'''
Table:users
Index: id
+-------------+--------------+---------------------+-------------+---------------+---------------+------------+
| id (PK)     | uid          | created_at          | is_deleted  | email         | phone_number  | social_id  |
+-------------+--------------+---------------------+-------------+---------------+---------------+------------+
| 1           | abcdef123456 | 2023-01-01 00:00:00 | 0           | user1@mail.com| 1234567890    | sid123     |
| 2           | defabc654321 | 2023-01-02 12:30:45 | 0           | user2@mail.com| 0987654321    | sid456     |
| ...         | ...          | ...                 | ...         | ...           | ...           | ...        |
+-------------+--------------+---------------------+-------------+---------------+---------------+------------+

Table:user_phone_number_temp
(UID,phone_number)-> Composite Index

+--------------+---------------+------------------------+
| uid          | phone_number  | created_at             |
+--------------+---------------+------------------------+
| abcdef123456 | 1234567890    |  2023-01-01 00:00:00   |           
| defabc654321 | 0987654321    |  2023-01-01 00:00:00   |
| ...          | ...           |  ...                   |
+--------------+---------------+------------------------+

As noted above, this table has a composite index on (uid, phone_number).
Data retention is currently handled by a script that deletes records whose created_at timestamp is more than one day old.

'''

import mysql.connector
from kafka import KafkaConsumer


# Connection parameters for the MASTER database.
# NOTE(review): the credentials are hard-coded here; consider loading them
# from the environment or a secrets store instead.
db_config = dict(
    host="masterdb",
    user="masterdb",
    password="password",
    database="masterdb",
)

# Kafka consumer for 'uids_topic' in consumer group 'my-group'
# (auto_offset_reset='earliest', automatic offset commits enabled).
consumer = KafkaConsumer(
    "uids_topic",
    group_id="my-group",
    bootstrap_servers=["localhost:9092"],
    enable_auto_commit=True,
    auto_offset_reset="earliest",
)

# Open the database connection and begin a SERIALIZABLE transaction on it.
db_connection = mysql.connector.connect(**db_config)
db_connection.start_transaction(isolation_level="SERIALIZABLE")

# Module-level cursor shared by the code below.
cursor = db_connection.cursor()

def update():
    """Backfill missing phone numbers in ``users`` from ``user_phone_number_temp``.

    For every ``users`` row whose ``phone_number`` is NULL, look up the most
    recently created ``user_phone_number_temp`` row with the same ``uid`` and
    copy its phone number into ``users``. Users without a matching temp row
    are left untouched.

    Uses the module-level ``cursor`` and ``db_connection``. All updates are
    committed together after the loop; if any statement in the loop fails,
    the pending changes are rolled back and the exception is re-raised.
    """
    # Let the database filter for NULL instead of pulling the whole users
    # table into memory. The result is fetched completely up front because
    # the same cursor is reused for the statements inside the loop.
    cursor.execute("SELECT uid FROM users WHERE phone_number IS NULL")
    uids_missing_phone = [row[0] for row in cursor.fetchall()]

    try:
        for uid in uids_missing_phone:
            # Nothing visible guarantees a single temp row per uid (the index
            # is on (uid, phone_number)), so pick the newest row explicitly.
            # LIMIT 1 also ensures fetchone() drains the result set; leftover
            # rows would make the next execute() on this cursor fail with
            # "Unread result found".
            cursor.execute(
                "SELECT phone_number FROM user_phone_number_temp "
                "WHERE uid = %s ORDER BY created_at DESC LIMIT 1",
                (uid,),
            )
            temp_row = cursor.fetchone()
            if not temp_row:
                continue

            # Re-check for NULL in the UPDATE itself so that a phone number
            # stored after the initial SELECT is never overwritten.
            cursor.execute(
                "UPDATE users SET phone_number = %s "
                "WHERE uid = %s AND phone_number IS NULL",
                (temp_row[0], uid),
            )
    except Exception:
        # Leave the table as it was rather than partially backfilled.
        db_connection.rollback()
        raise

    # Commit once: a commit inside the loop would end the surrounding
    # transaction after the first updated row and pay one round trip per row.
    db_connection.commit()

# Run the backfill and release the database connection even when update()
# raises, so a failure does not leave the connection open.
try:
    update()
finally:
    db_connection.close()







Leave a Comment