Untitled

 avatar
unknown
plain_text
6 months ago
4.6 kB
2
Indexable
import csv
import subprocess
from django.core.management.base import BaseCommand
from django.db import transaction, models, connection
from your_app_name.models import Project_Member

# Đường dẫn tới file backup cho các bản ghi bị xóa
DELETED_BACKUP_FILE = '/tmp/project_member_deleted_backup.csv'

class Command(BaseCommand):
    help = "Test migration by removing duplicates, migrating, and restoring deleted records."

    def handle(self, *args, **kwargs):
        self.stdout.write(self.style.WARNING("Starting test process: remove duplicates, migrate, and restore."))

        # Step 1: Backup các bản ghi trùng lặp vào file CSV và xóa chúng
        deleted_count = self.backup_and_remove_duplicates()
        if deleted_count > 0:
            self.stdout.write(self.style.SUCCESS(f"Backed up and removed {deleted_count} duplicate records."))
        else:
            self.stdout.write(self.style.SUCCESS("No duplicates found."))

        # Step 2: Kiểm tra xem còn trùng lặp không, nếu không còn thì thực hiện migrate
        if self.has_duplicates():
            self.stderr.write(self.style.ERROR("Duplicate records still exist! Migration aborted."))
            return
        self.run_migrations()

        # Step 3: Restore các bản ghi từ file CSV và xóa unique constraint để có thể test lại
        self.restore_deleted_records()
        self.remove_unique_constraint()
        self.stdout.write(self.style.SUCCESS("Test process completed. Database is restored to original state."))

    def backup_and_remove_duplicates(self):
        """Backup các bản ghi trùng lặp và xóa chúng."""
        self.stdout.write(f"Backing up duplicate records to {DELETED_BACKUP_FILE} and deleting them...")
        duplicates = (
            Project_Member.objects
            .values('member', 'project')
            .annotate(count=models.Count('id'))
            .filter(count__gt=1)
        )

        # Backup các bản ghi trùng lặp vào CSV và xóa bản ghi trùng (giữ lại bản ghi đầu tiên)
        deleted_count = 0
        with open(DELETED_BACKUP_FILE, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['id', 'member_id', 'project_id'])  # Header

            with transaction.atomic():
                for duplicate in duplicates:
                    records = Project_Member.objects.filter(
                        member=duplicate['member'], project=duplicate['project']
                    )
                    for record in records[1:]:  # Giữ lại bản ghi đầu tiên
                        writer.writerow([record.id, record.member_id, record.project_id])
                        record.delete()
                        deleted_count += 1

        return deleted_count

    def has_duplicates(self):
        """Kiểm tra xem còn bản ghi trùng lặp không."""
        duplicates = (
            Project_Member.objects
            .values('member', 'project')
            .annotate(count=models.Count('id'))
            .filter(count__gt=1)
        )
        return duplicates.exists()

    def run_migrations(self):
        """Thực hiện lệnh migrate."""
        try:
            subprocess.run(["python", "manage.py", "migrate"], check=True)
            self.stdout.write(self.style.SUCCESS("Migration completed successfully."))
        except subprocess.CalledProcessError as e:
            self.stderr.write(self.style.ERROR(f"Migration failed: {str(e)}"))

    def restore_deleted_records(self):
        """Restore các bản ghi từ file CSV đã backup."""
        self.stdout.write(f"Restoring deleted records from {DELETED_BACKUP_FILE}...")
        with open(DELETED_BACKUP_FILE, 'r', newline='') as f:
            reader = csv.reader(f)
            next(reader)  # Bỏ qua header
            with transaction.atomic():
                for row in reader:
                    Project_Member.objects.create(
                        id=row[0], member_id=row[1], project_id=row[2]
                    )
        self.stdout.write(self.style.SUCCESS("Restored deleted records."))

    def remove_unique_constraint(self):
        """Xóa unique constraint để chuẩn bị cho lần test tiếp theo."""
        self.stdout.write("Removing unique constraint on (member, project) fields...")
        with connection.cursor() as cursor:
            cursor.execute("ALTER TABLE your_app_name_project_member DROP INDEX unique_member_project;")
        self.stdout.write(self.style.SUCCESS("Unique constraint removed."))
Editor is loading...
Leave a Comment