Untitled
unknown
plain_text
a year ago
4.6 kB
4
Indexable
import csv
import subprocess
from django.core.management.base import BaseCommand
from django.db import transaction, models, connection
from your_app_name.models import Project_Member
# Đường dẫn tới file backup cho các bản ghi bị xóa
DELETED_BACKUP_FILE = '/tmp/project_member_deleted_backup.csv'
class Command(BaseCommand):
help = "Test migration by removing duplicates, migrating, and restoring deleted records."
def handle(self, *args, **kwargs):
self.stdout.write(self.style.WARNING("Starting test process: remove duplicates, migrate, and restore."))
# Step 1: Backup các bản ghi trùng lặp vào file CSV và xóa chúng
deleted_count = self.backup_and_remove_duplicates()
if deleted_count > 0:
self.stdout.write(self.style.SUCCESS(f"Backed up and removed {deleted_count} duplicate records."))
else:
self.stdout.write(self.style.SUCCESS("No duplicates found."))
# Step 2: Kiểm tra xem còn trùng lặp không, nếu không còn thì thực hiện migrate
if self.has_duplicates():
self.stderr.write(self.style.ERROR("Duplicate records still exist! Migration aborted."))
return
self.run_migrations()
# Step 3: Restore các bản ghi từ file CSV và xóa unique constraint để có thể test lại
self.restore_deleted_records()
self.remove_unique_constraint()
self.stdout.write(self.style.SUCCESS("Test process completed. Database is restored to original state."))
def backup_and_remove_duplicates(self):
"""Backup các bản ghi trùng lặp và xóa chúng."""
self.stdout.write(f"Backing up duplicate records to {DELETED_BACKUP_FILE} and deleting them...")
duplicates = (
Project_Member.objects
.values('member', 'project')
.annotate(count=models.Count('id'))
.filter(count__gt=1)
)
# Backup các bản ghi trùng lặp vào CSV và xóa bản ghi trùng (giữ lại bản ghi đầu tiên)
deleted_count = 0
with open(DELETED_BACKUP_FILE, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['id', 'member_id', 'project_id']) # Header
with transaction.atomic():
for duplicate in duplicates:
records = Project_Member.objects.filter(
member=duplicate['member'], project=duplicate['project']
)
for record in records[1:]: # Giữ lại bản ghi đầu tiên
writer.writerow([record.id, record.member_id, record.project_id])
record.delete()
deleted_count += 1
return deleted_count
def has_duplicates(self):
"""Kiểm tra xem còn bản ghi trùng lặp không."""
duplicates = (
Project_Member.objects
.values('member', 'project')
.annotate(count=models.Count('id'))
.filter(count__gt=1)
)
return duplicates.exists()
def run_migrations(self):
"""Thực hiện lệnh migrate."""
try:
subprocess.run(["python", "manage.py", "migrate"], check=True)
self.stdout.write(self.style.SUCCESS("Migration completed successfully."))
except subprocess.CalledProcessError as e:
self.stderr.write(self.style.ERROR(f"Migration failed: {str(e)}"))
def restore_deleted_records(self):
"""Restore các bản ghi từ file CSV đã backup."""
self.stdout.write(f"Restoring deleted records from {DELETED_BACKUP_FILE}...")
with open(DELETED_BACKUP_FILE, 'r', newline='') as f:
reader = csv.reader(f)
next(reader) # Bỏ qua header
with transaction.atomic():
for row in reader:
Project_Member.objects.create(
id=row[0], member_id=row[1], project_id=row[2]
)
self.stdout.write(self.style.SUCCESS("Restored deleted records."))
def remove_unique_constraint(self):
"""Xóa unique constraint để chuẩn bị cho lần test tiếp theo."""
self.stdout.write("Removing unique constraint on (member, project) fields...")
with connection.cursor() as cursor:
cursor.execute("ALTER TABLE your_app_name_project_member DROP INDEX unique_member_project;")
self.stdout.write(self.style.SUCCESS("Unique constraint removed."))
Editor is loading...
Leave a Comment