Untitled
unknown
plain_text
6 months ago
4.6 kB
2
Indexable
import csv
import subprocess
import sys

from django.core.management.base import BaseCommand
from django.db import transaction, models, connection

from your_app_name.models import Project_Member

# Path to the backup file holding the records that get deleted.
DELETED_BACKUP_FILE = '/tmp/project_member_deleted_backup.csv'


class Command(BaseCommand):
    """Dry-run the (member, project) uniqueness migration, then undo it.

    The command backs up and deletes duplicate ``Project_Member`` rows, runs
    ``migrate``, drops the ``unique_member_project`` index again and finally
    re-inserts the deleted rows from the CSV backup.
    """

    help = "Test migration by removing duplicates, migrating, and restoring deleted records."

    def handle(self, *args, **kwargs):
        """Run the full test cycle: dedupe, migrate, drop index, restore."""
        self.stdout.write(self.style.WARNING("Starting test process: remove duplicates, migrate, and restore."))

        # Step 1: back up the duplicate records to a CSV file and delete them.
        deleted_count = self.backup_and_remove_duplicates()
        if deleted_count > 0:
            self.stdout.write(self.style.SUCCESS(f"Backed up and removed {deleted_count} duplicate records."))
        else:
            self.stdout.write(self.style.SUCCESS("No duplicates found."))

        # Step 2: only migrate when no duplicates are left.
        if self.has_duplicates():
            # The rows removed in step 1 stay available in DELETED_BACKUP_FILE.
            self.stderr.write(self.style.ERROR("Duplicate records still exist! Migration aborted."))
            return

        if not self.run_migrations():
            # NOTE(review): assumes a failed migrate left no unique index
            # behind, so the duplicates can be re-inserted directly — confirm
            # for backends without transactional DDL.
            self.restore_deleted_records()
            self.stderr.write(self.style.ERROR("Migration failed; deleted records were restored."))
            return

        # Step 3: drop the unique index FIRST, then restore. Re-inserting the
        # duplicates while the index still exists would violate it.
        self.remove_unique_constraint()
        self.restore_deleted_records()

        self.stdout.write(self.style.SUCCESS("Test process completed. Database is restored to original state."))

    def _duplicate_groups(self):
        """Return a queryset of (member, project) pairs that occur more than once."""
        return (
            Project_Member.objects
            .values('member', 'project')
            .annotate(count=models.Count('id'))
            .filter(count__gt=1)
            # Clear any default ordering so it cannot leak into the GROUP BY.
            .order_by()
        )

    def backup_and_remove_duplicates(self) -> int:
        """Back up duplicate records to CSV and delete them.

        For every duplicated (member, project) pair the row with the lowest
        ``id`` is kept; all others are written to ``DELETED_BACKUP_FILE`` and
        deleted.

        Returns:
            The number of rows that were deleted.
        """
        self.stdout.write(f"Backing up duplicate records to {DELETED_BACKUP_FILE} and deleting them...")

        deleted_count = 0
        with open(DELETED_BACKUP_FILE, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['id', 'member_id', 'project_id'])  # Header

            with transaction.atomic():
                for duplicate in self._duplicate_groups():
                    # Explicit ordering makes "keep the first record"
                    # deterministic; slicing an unordered queryset is not.
                    records = Project_Member.objects.filter(
                        member=duplicate['member'],
                        project=duplicate['project'],
                    ).order_by('id')
                    for record in records[1:]:  # Keep the first record
                        writer.writerow([record.id, record.member_id, record.project_id])
                        record.delete()
                        deleted_count += 1

        return deleted_count

    def has_duplicates(self) -> bool:
        """Return True when any (member, project) pair still occurs more than once."""
        return self._duplicate_groups().exists()

    def run_migrations(self) -> bool:
        """Run ``manage.py migrate`` in a subprocess.

        Returns:
            True when the migrate command exited successfully, False otherwise.
        """
        try:
            # sys.executable guarantees the same interpreter (and virtualenv)
            # as the running command instead of whatever "python" is on PATH.
            subprocess.run([sys.executable, "manage.py", "migrate"], check=True)
        except subprocess.CalledProcessError as e:
            self.stderr.write(self.style.ERROR(f"Migration failed: {str(e)}"))
            return False
        self.stdout.write(self.style.SUCCESS("Migration completed successfully."))
        return True

    def restore_deleted_records(self) -> None:
        """Re-create the records listed in the CSV backup, keeping their ids."""
        self.stdout.write(f"Restoring deleted records from {DELETED_BACKUP_FILE}...")

        with open(DELETED_BACKUP_FILE, 'r', newline='', encoding='utf-8') as f:
            reader = csv.reader(f)
            next(reader, None)  # Skip the header (tolerates an empty file)
            with transaction.atomic():
                for row in reader:
                    Project_Member.objects.create(
                        id=row[0],
                        member_id=row[1],
                        project_id=row[2],
                    )

        self.stdout.write(self.style.SUCCESS("Restored deleted records."))

    def remove_unique_constraint(self) -> None:
        """Drop the unique index on (member, project) to prepare for the next test run.

        NOTE(review): ``ALTER TABLE ... DROP INDEX`` is MySQL-style syntax, and
        the migration stays recorded as applied after this raw DDL — confirm
        how the next run is expected to re-apply it.
        """
        self.stdout.write("Removing unique constraint on (member, project) fields...")

        # Take the table name from the model so a custom db_table keeps working.
        table = connection.ops.quote_name(Project_Member._meta.db_table)
        with connection.cursor() as cursor:
            cursor.execute(f"ALTER TABLE {table} DROP INDEX unique_member_project;")

        self.stdout.write(self.style.SUCCESS("Unique constraint removed."))
Editor is loading...
Leave a Comment