Untitled

 avatar
unknown
python
13 days ago
15 kB
5
Indexable
#!/usr/bin/env python3
"""
Replace "errored" / unregistered blocks in a Minecraft (Anvil) world with another
block (default: minecraft:stone), across every dimension.

Why: when a mod is removed, blocks it added become unknown registry keys. The game
logs lines like:
    Unknown registry key in minecraft:block: 'magnetization:magnetite_ore' -> using default
and silently swaps them for air on load. This rewrites the chunk palettes on disk so
those blocks become a real block instead, removing the recoverable-error spam and
making the change permanent.

How it works: it only renames the matching entries inside each section's
`block_states.palette` (and drops their `Properties`). The packed block-index data is
left untouched, so every other block in the world is preserved bit-for-bit.

SAFETY:
  * DRY RUN by default. Nothing is written unless you pass --apply.
  * STOP THE SERVER before running with --apply (the world must not be loaded).
  * Take a backup first. (You already have world.7z / backups_restic.)

Targets are discovered automatically from the server logs (every
"Unknown registry key in minecraft:block: 'X'" entry), and/or given explicitly with
--block / --namespace.

Examples:
    # See what would change (dry run), targets auto-read from logs/:
    python3 scripts/replace_errored_blocks.py

    # Actually apply it:
    python3 scripts/replace_errored_blocks.py --apply

    # Target a whole removed mod's namespace explicitly:
    python3 scripts/replace_errored_blocks.py --namespace magnetization --apply

    # Replace with something other than stone:
    python3 scripts/replace_errored_blocks.py --to minecraft:cobblestone --apply
"""

import argparse
import glob
import gzip
import io
import os
import re
import struct
import sys
import zlib
from multiprocessing import Pool, cpu_count

try:
    from nbtlib import File
    from nbtlib.tag import String, Compound
except Exception as e:  # pragma: no cover
    sys.exit("nbtlib is required: pip install --user nbtlib  (error: %s)" % e)

# Size in bytes of one region-file sector. Chunk offsets in the location table
# are multiplied by this value, and each of the two header tables at the start
# of the file occupies exactly one sector.
SECTOR = 4096
# Captures the block id X from log lines of the form
#   Unknown registry key in minecraft:block: 'X'
LOG_PATTERN = re.compile(r"Unknown registry key in minecraft:block: '([^']+)'")

# Legit, like-for-like replacements for known removed-mod blocks.
# Magnetite is iron ore (Fe3O4), so it maps to vanilla iron ore; petrified wood
# maps to an oak log. Anything not listed here falls back to
# the --to block (default minecraft:stone). Override per-block with --map src=dst.
DEFAULT_MAP = {
    "magnetization:magnetite_ore": "minecraft:iron_ore",
    "magnetization:deepslate_magnetite_ore": "minecraft:deepslate_iron_ore",
    "magnetization:petrified_wood": "minecraft:oak_log",
}


def discover_targets_from_logs(logs_dir):
    """Collect the unknown block ids reported in the server logs.

    Reads every ``*.log`` file and then every ``*.log.gz`` file directly inside
    `logs_dir` (each group in sorted order) and returns the set of ids captured
    by LOG_PATTERN. A missing directory yields an empty set. A file that cannot
    be read is reported with a warning and does not abort the scan.
    """
    targets = set()
    if not os.path.isdir(logs_dir):
        return targets

    plain_logs = sorted(glob.glob(os.path.join(logs_dir, "*.log")))
    gzipped_logs = sorted(glob.glob(os.path.join(logs_dir, "*.log.gz")))

    for log_path in plain_logs + gzipped_logs:
        try:
            if log_path.endswith(".gz"):
                handle = gzip.open(log_path, "rt", errors="replace")
            else:
                handle = open(log_path, "rt", errors="replace")
            with handle:
                for text in handle:
                    # Cheap substring test first; the regex only runs on candidates.
                    if "Unknown registry key" in text:
                        hit = LOG_PATTERN.search(text)
                        if hit:
                            targets.add(hit.group(1))
        except Exception as e:
            print("  (warning: could not read %s: %s)" % (log_path, e))
    return targets


def find_region_files(world_dir):
    """Return the sorted paths of all ``.mca`` files that sit directly inside a
    directory named ``region`` at or below `world_dir`.

    Only ``region`` directories are considered because block data lives only
    there; entities/ and poi/ have no block palettes.
    """
    return sorted(
        os.path.join(folder, filename)
        for folder, _subdirs, filenames in os.walk(world_dir)
        if os.path.basename(folder) == "region"
        for filename in filenames
        if filename.endswith(".mca")
    )


def decompress(ctype, raw):
    """Decode one chunk payload according to its compression-type byte.

    1 is gzip, 2 is zlib, 3 means the payload is stored uncompressed and is
    returned as-is. Any other value raises ValueError.
    """
    if ctype == 3:
        return raw
    decoder = {1: gzip.decompress, 2: zlib.decompress}.get(ctype)
    if decoder is None:
        raise ValueError("unsupported compression type %d" % ctype)
    return decoder(raw)


def resolve(name, mapping, namespaces, fallback):
    """Pick the replacement block id for `name`.

    An exact entry in `mapping` always wins. Otherwise the id's namespace (the
    text before the first ':', or "minecraft" when there is no ':') is looked up
    in `namespaces`, and a hit yields `fallback`. Returns None when the block
    should be left alone.
    """
    if name in mapping:
        return mapping[name]

    namespace, separator, _rest = name.partition(":")
    if not separator:
        # Un-namespaced ids are treated as belonging to "minecraft".
        namespace = "minecraft"

    return fallback if namespace in namespaces else None


def process_chunk_nbt(nbt, mapping, namespaces, fallback, counts):
    """Rename matching palette entries of one chunk in place.

    Walks ``sections[*].block_states.palette``. Every entry whose ``Name``
    resolves to a replacement gets the new name, loses its ``Properties`` and is
    tallied in `counts` under its original name. Sections without block states
    or without a palette are skipped.

    Returns True if at least one palette entry was rewritten.
    """
    section_list = nbt.get("sections")
    if not section_list:
        return False

    touched = False
    for section in section_list:
        states = section.get("block_states")
        palette = states.get("palette") if states else None
        if not palette:
            continue
        for block in palette:
            old_id = str(block.get("Name", ""))
            new_id = resolve(old_id, mapping, namespaces, fallback)
            if new_id is None:
                continue
            block["Name"] = String(new_id)
            # The old block's state properties do not apply to the new block.
            block.pop("Properties", None)
            counts[old_id] = counts.get(old_id, 0) + 1
            touched = True
    return touched


def process_region(path, mapping, namespaces, fallback, apply, counts):
    """Scan one region (.mca) file and rename matching palette entries.

    The whole file is read into memory, every stored chunk is decompressed and
    parsed, and process_chunk_nbt() renames palette entries in place. When
    `apply` is true and at least one chunk changed, the file is rebuilt through
    _write_region(); otherwise nothing is written.

    Args:
        path: path of the region file.
        mapping: exact block id -> replacement block id.
        namespaces: namespaces whose blocks all receive `fallback`.
        fallback: replacement id used for namespace matches.
        apply: write the result back when true; dry run when false.
        counts: dict updated in place with per-block-id palette-entry tallies.

    Returns:
        (chunks_changed, replacements_in_file). A file shorter than the two
        header sectors yields (0, 0).
    """
    with open(path, "rb") as f:
        data = f.read()
    if len(data) < 2 * SECTOR:
        return 0, 0

    loc = data[:SECTOR]
    timestamps = data[SECTOR:2 * SECTOR]

    # entries[i] = (compression_type_byte, payload_bytes) for present chunks, else None
    entries = [None] * 1024
    for i in range(1024):
        # Location entry: 3-byte big-endian sector offset + 1-byte sector count.
        off = struct.unpack(">I", b"\x00" + loc[i * 4:i * 4 + 3])[0]
        cnt = loc[i * 4 + 3]
        if off == 0 or cnt == 0:
            continue
        start = off * SECTOR
        # The record header is 5 bytes: a 4-byte length plus the compression
        # byte. Requiring all 5 keeps a truncated file from raising IndexError
        # on the compression byte and failing the whole region.
        if start + 5 > len(data):
            continue
        length = struct.unpack(">I", data[start:start + 4])[0]
        if length == 0:
            continue
        ctype = data[start + 4]
        # `length` counts the compression byte, so the payload ends at start+4+length.
        payload = data[start + 5:start + 4 + length]
        entries[i] = (ctype, payload)

    chunks_changed = 0
    before = sum(counts.values())
    new_payloads = {}  # i -> (ctype, payload) for changed chunks

    for i, ent in enumerate(entries):
        if ent is None:
            continue
        ctype, payload = ent
        # External (.mcc) chunks have bit 0x80 set; leave them untouched.
        if ctype & 0x80:
            continue
        try:
            raw = decompress(ctype, payload)
            nbt = File.parse(io.BytesIO(raw), byteorder="big")
        except Exception as e:
            # Unreadable chunks are reported and kept byte-for-byte as stored.
            print("  (warning: skip chunk %d in %s: %s)" % (i, os.path.basename(path), e))
            continue
        if process_chunk_nbt(nbt, mapping, namespaces, fallback, counts):
            chunks_changed += 1
            buf = io.BytesIO()
            nbt.write(buf, byteorder="big")
            new_payloads[i] = (2, zlib.compress(buf.getvalue()))  # re-store as zlib

    replacements = sum(counts.values()) - before

    if apply and chunks_changed:
        for i, rewritten in new_payloads.items():
            entries[i] = rewritten
        _write_region(path, entries, timestamps)

    return chunks_changed, replacements


def _write_region(path, entries, timestamps):
    """Rewrite the region file at `path` from `entries`, packing chunks back to back.

    `entries` holds 1024 slots, each None or (compression_type, payload). Every
    present chunk is framed as length + compression byte + payload, zero-padded
    to a whole number of sectors, and given a fresh location-table entry. The
    timestamp table is written back unchanged. Output goes to ``path + ".tmp"``
    first and then replaces the original file.

    Raises ValueError if a chunk needs more than 255 sectors.
    """
    header = bytearray(SECTOR)
    chunk_area = bytearray()
    sector_cursor = 2  # sectors 0 and 1 hold the location and timestamp tables

    for slot in range(1024):
        entry = entries[slot]
        if entry is None:
            continue
        compression, blob = entry
        # The stored length covers the compression byte as well as the payload.
        framed = b"".join((struct.pack(">I", len(blob) + 1), bytes([compression]), blob))
        sector_count = -(-len(framed) // SECTOR)  # ceiling division
        if sector_count > 255:
            raise ValueError("chunk %d too large to store (%d sectors)" % (slot, sector_count))
        base = slot * 4
        # Location entry: 3-byte big-endian sector offset, then 1-byte sector count.
        header[base:base + 3] = struct.pack(">I", sector_cursor)[1:]
        header[base + 3] = sector_count
        chunk_area += framed.ljust(sector_count * SECTOR, b"\x00")
        sector_cursor += sector_count

    scratch = path + ".tmp"
    with open(scratch, "wb") as out:
        for part in (header, timestamps, chunk_area):
            out.write(part)
    os.replace(scratch, path)


# --- multiprocessing: each region file is independent, so every file is handled
# by a single _work() call and writes (when --apply) don't conflict. ---
# Per-process settings: filled in by _init_worker() and read by _work().
_W = {}


def _init_worker(mapping, namespaces, fallback, apply):
    """Store the run settings in the module-level _W dict for later _work() calls."""
    _W.update(
        mapping=mapping,
        namespaces=namespaces,
        fallback=fallback,
        apply=apply,
    )


def _work(path):
    """Process one region file using the settings stored in _W.

    Returns (path, chunks_changed, replacements, counts, error). `error` is None
    on success. On any exception it is an "ERROR: ..." string and the numeric
    fields are zero with an empty counts dict.
    """
    tally = {}
    try:
        settings = (_W["mapping"], _W["namespaces"], _W["fallback"], _W["apply"])
        changed_chunks, replaced = process_region(path, *settings, tally)
    except Exception as e:
        return (path, 0, 0, {}, "ERROR: %s" % e)
    else:
        return (path, changed_chunks, replaced, tally, None)


def main():
    """Command-line entry point.

    Parses the arguments, builds the src -> dst replacement plan, scans every
    region file under the world directory (in parallel when more than one job is
    used) and prints a per-file report followed by a summary. Nothing is written
    unless --apply is given.

    Exits with a message when an argument is malformed (--map without '=', an
    empty SRC/DST, an empty --to), when there are no targets at all, or when the
    world directory does not exist.
    """
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--world", default="world", help="world directory (default: world)")
    ap.add_argument("--logs", default="logs", help="logs directory to auto-discover targets (default: logs)")
    ap.add_argument("--to", default="minecraft:stone",
                    help="fallback block for targets without a specific mapping (default: minecraft:stone)")
    ap.add_argument("--map", action="append", default=[], metavar="SRC=DST",
                    help="explicit replacement, e.g. magnetization:magnetite_ore=minecraft:iron_ore "
                         "(repeatable; overrides the built-in defaults)")
    ap.add_argument("--block", action="append", default=[],
                    help="block id to replace with the --to fallback (repeatable)")
    ap.add_argument("--namespace", action="append", default=[],
                    help="replace every block in this namespace with --to, e.g. magnetization (repeatable)")
    ap.add_argument("--no-defaults", action="store_true",
                    help="ignore the built-in like-for-like map (DEFAULT_MAP)")
    ap.add_argument("--no-logs", action="store_true", help="do not auto-discover targets from logs")
    ap.add_argument("--jobs", type=int, default=0,
                    help="parallel worker processes (default: all CPU cores)")
    ap.add_argument("--apply", action="store_true",
                    help="actually write changes (default is a DRY RUN that writes nothing)")
    args = ap.parse_args()

    fallback = args.to
    # An empty replacement id would be written into palettes as a nameless block.
    if not fallback.strip():
        sys.exit("--to expects a block id, got an empty value")
    namespaces = set(args.namespace)

    # Build the src->dst mapping. Explicit --map always wins. --block forces the
    # fallback, overriding a built-in default for the same id. Log-discovered
    # ids only fill gaps (setdefault), so they never override a built-in default
    # or a --block entry.
    mapping = {} if args.no_defaults else dict(DEFAULT_MAP)

    for b in args.block:
        mapping[b] = fallback

    if not args.no_logs:
        from_logs = discover_targets_from_logs(args.logs)
        if from_logs:
            print("Discovered %d errored block id(s) from logs:" % len(from_logs))
            for b in sorted(from_logs):
                print("    %s" % b)
            for b in from_logs:
                mapping.setdefault(b, fallback)  # don't override a known good mapping

    for pair in args.map:
        if "=" not in pair:
            sys.exit("--map expects SRC=DST, got: %s" % pair)
        src, dst = (part.strip() for part in pair.split("=", 1))
        # Reject "SRC=" and "=DST": an empty DST would store an empty block
        # name, and an empty SRC would match palette entries without a Name.
        if not src or not dst:
            sys.exit("--map expects non-empty SRC and DST, got: %s" % pair)
        mapping[src] = dst

    if not mapping and not namespaces:
        sys.exit("No target blocks. Pass --map / --block / --namespace, or ensure logs/ "
                 "has 'Unknown registry key' lines.")

    print()
    print("Replacement plan:")
    for src in sorted(mapping):
        if src in DEFAULT_MAP and mapping[src] == DEFAULT_MAP[src]:
            tag = ""
        elif mapping[src] == fallback:
            tag = "  (fallback)"
        else:
            tag = "  (custom)"
        print("    %-50s -> %s%s" % (src, mapping[src], tag))
    if namespaces:
        print("    namespace(s) %s -> %s" % (", ".join(sorted(namespaces)), fallback))
    print("Mode: %s" % ("APPLY (writing changes)" if args.apply else "DRY RUN (no changes written)"))
    print()

    if not os.path.isdir(args.world):
        sys.exit("World directory not found: %s" % args.world)

    region_files = find_region_files(args.world)
    # --jobs 0 (the default) means "all cores"; never go below one worker.
    jobs = max(1, args.jobs if args.jobs else cpu_count())
    total = len(region_files)
    print("Scanning %d region files under %s using %d worker(s) ..."
          % (total, args.world, jobs))
    print()

    total_counts = {}
    total_chunks = 0
    files_touched = 0
    files_failed = 0
    done = 0

    def consume(result):
        """Fold one _work() result into the running totals and report it."""
        nonlocal total_chunks, files_touched, files_failed
        path, chunks_changed, replacements, counts, err = result
        if err:
            files_failed += 1
            print("  (warning: %s: %s)" % (os.path.relpath(path, args.world), err), flush=True)
            return
        if chunks_changed:
            files_touched += 1
            total_chunks += chunks_changed
            for k, v in counts.items():
                total_counts[k] = total_counts.get(k, 0) + v
            rel = os.path.relpath(path, args.world)
            print("  %-55s %5d palette entries in %4d chunks" %
                  (rel, replacements, chunks_changed), flush=True)

    def advance(result):
        """Consume a result and print a progress line every 500 files."""
        nonlocal done
        consume(result)
        done += 1
        if done % 500 == 0:
            print("  ... %d/%d region files scanned" % (done, total), flush=True)

    if jobs == 1:
        # Single job: run in this process, no pool needed.
        _init_worker(mapping, namespaces, fallback, args.apply)
        for path in region_files:
            advance(_work(path))
    else:
        with Pool(jobs, initializer=_init_worker,
                  initargs=(mapping, namespaces, fallback, args.apply)) as pool:
            for result in pool.imap_unordered(_work, region_files, chunksize=8):
                advance(result)

    print()
    print("=" * 70)
    print("Summary (%s):" % ("APPLIED" if args.apply else "DRY RUN"))
    print("  region files affected : %d" % files_touched)
    print("  chunks affected       : %d" % total_chunks)
    if files_failed:
        # Failed files were not scanned (and not rewritten); make that visible here
        # instead of only in warnings that may have scrolled away.
        print("  region files FAILED   : %d (see warnings above)" % files_failed)
    print("  palette entries fixed :")
    for k in sorted(total_counts):
        # Ids matched only through --namespace have no mapping entry.
        dst = mapping.get(k, fallback)
        print("      %-50s -> %-28s %d" % (k, dst, total_counts[k]))
    if not total_counts:
        print("      (none found — nothing to do)")
    print("=" * 70)
    if not args.apply and total_counts:
        print("\nThis was a DRY RUN. Re-run with --apply to write the changes")
        print("(stop the server and back up the world first).")

# Run the command-line tool only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Editor is loading...
Leave a Comment