Untitled
unknown
plain_text
2 years ago
3.0 kB
4
Indexable
'''
Table:users
Index: id
+-------------+--------------+---------------------+-------------+---------------+---------------+------------+
| id (PK) | uid | created_at | is_deleted | email | phone_number | social_id |
+-------------+--------------+---------------------+-------------+---------------+---------------+------------+
| 1 | abcdef123456 | 2023-01-01 00:00:00 | 0 | user1@mail.com| 1234567890 | sid123 |
| 2 | defabc654321 | 2023-01-02 12:30:45 | 0 | user2@mail.com| 0987654321 | sid456 |
| ... | ... | ... | ... | ... | ... | ... |
+-------------+--------------+---------------------+-------------+---------------+---------------+------------+
Table:user_phone_number_temp
(UID,phone_number)-> Composite Index
+--------------+---------------+------------------------+
| uid | phone_number | created_at |
+--------------+---------------+------------------------+
| abcdef123456 | 1234567890 | 2023-01-01 00:00:00 |
| defabc654321 | 0987654321 | 2023-01-01 00:00:00 |
| ... | ... | ... |
+--------------+---------------+------------------------+
This table also utilizes a composite index on uid and phone_number.
The current operation to manage data retention involves a script that deletes records older than a day based on the created_at timestamp.
'''
import mysql.connector
from kafka import KafkaConsumer
# connection details for MASTER DB
db_config = {
"host": "masterdb",
"user": "masterdb",
"password": "password",
"database": "masterdb"
}
consumer = KafkaConsumer(
'uids_topic',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group'
)
# Connect to the database
db_connection = mysql.connector.connect(**db_config)
db_connection.start_transaction(isolation_level='SERIALIZABLE')
cursor = db_connection.cursor()
def update():
# Fetch all UIDs and phone numbers from the users table
cursor.execute("SELECT uid, phone_number FROM users")
user_records = cursor.fetchall()
# Find UIDs with a NULL phone number
uids_with_null_phones = [uid for uid, phone in user_records if phone is None]
for uid in uids_with_null_phones:
# For every UID, fetch the new phone number from user_phone_number_temp
cursor.execute("SELECT phone_number FROM user_phone_number_temp WHERE uid = %s", (uid,))
new_phone_number_record = cursor.fetchone()
if new_phone_number_record:
new_phone_number = new_phone_number_record[0]
# Update users table with the new phone number for each UID with a NULL phone number
cursor.execute("UPDATE users SET phone_number = %s WHERE uid = %s", (new_phone_number, uid))
db_connection.commit()
update()
db_connection.close()
Editor is loading...
Leave a Comment