#!/usr/bin/env python3
"""
Replace "errored" / unregistered blocks in a Minecraft (Anvil) world with another
block (default: minecraft:stone), across every dimension.
Why: when a mod is removed, blocks it added become unknown registry keys. The game
logs lines like:
Unknown registry key in minecraft:block: 'magnetization:magnetite_ore' -> using default
and silently swaps them for air on load. This rewrites the chunk palettes on disk so
those blocks become a real block instead, removing the recoverable-error spam and
making the change permanent.
How it works: it only renames the matching entries inside each section's
`block_states.palette` (and drops their `Properties`). The packed block-index data is
left untouched, so every other block in the world is preserved bit-for-bit.
SAFETY:
* DRY RUN by default. Nothing is written unless you pass --apply.
* STOP THE SERVER before running with --apply (the world must not be loaded).
* Take a backup first. (You already have world.7z / backups_restic.)
Targets are discovered automatically from the server logs (every
"Unknown registry key in minecraft:block: 'X'" entry), and/or given explicitly with
--block / --namespace.
Examples:
# See what would change (dry run), targets auto-read from logs/:
python3 scripts/replace_errored_blocks.py
# Actually apply it:
python3 scripts/replace_errored_blocks.py --apply
# Target a whole removed mod's namespace explicitly:
python3 scripts/replace_errored_blocks.py --namespace magnetization --apply
# Replace with something other than stone:
python3 scripts/replace_errored_blocks.py --to minecraft:cobblestone --apply
"""
import argparse
import glob
import gzip
import io
import os
import re
import struct
import sys
import zlib
from multiprocessing import Pool, cpu_count
# nbtlib is a third-party package: if importing it fails for any reason, exit
# with an install hint (and the underlying error) instead of a raw traceback.
try:
    from nbtlib import File
    from nbtlib.tag import String, Compound
except Exception as e: # pragma: no cover
    sys.exit("nbtlib is required: pip install --user nbtlib (error: %s)" % e)
# Size in bytes of one region-file sector. The two header tables occupy one
# sector each, and chunk records are addressed and padded in units of this size.
SECTOR = 4096
# Captures the block id X from log lines of the form
# "Unknown registry key in minecraft:block: 'X'".
LOG_PATTERN = re.compile(r"Unknown registry key in minecraft:block: '([^']+)'")
# Legit, like-for-like replacements for known removed-mod blocks.
# Magnetite is iron ore (Fe3O4), so it maps to vanilla iron ore; petrified wood
# maps to an oak log. Anything not listed here falls back to
# the --to block (default minecraft:stone). Override per-block with --map src=dst.
DEFAULT_MAP = {
    "magnetization:magnetite_ore": "minecraft:iron_ore",
    "magnetization:deepslate_magnetite_ore": "minecraft:deepslate_iron_ore",
    "magnetization:petrified_wood": "minecraft:oak_log",
}
def discover_targets_from_logs(logs_dir):
    """Collect block ids that the logs report as unknown registry keys.

    Every ``*.log`` file and then every ``*.log.gz`` file directly inside
    `logs_dir` is read line by line (each group in sorted order), and the id
    captured by LOG_PATTERN is recorded for each matching line.

    Returns a set of block-id strings. The set is empty when `logs_dir` is not
    a directory. A file that cannot be read is reported with a warning and
    skipped; ids gathered before the failure are kept.
    """
    targets = set()
    if not os.path.isdir(logs_dir):
        return targets

    log_paths = []
    for pattern in ("*.log", "*.log.gz"):
        log_paths.extend(sorted(glob.glob(os.path.join(logs_dir, pattern))))

    for log_path in log_paths:
        try:
            # Rotated logs are gzip-compressed; both kinds are read as text.
            if log_path.endswith(".gz"):
                handle = gzip.open(log_path, "rt", errors="replace")
            else:
                handle = open(log_path, "rt", errors="replace")
            with handle:
                for text in handle:
                    # Cheap substring test first; the regex only runs on hits.
                    if "Unknown registry key" in text:
                        hit = LOG_PATTERN.search(text)
                        if hit:
                            targets.add(hit.group(1))
        except Exception as e:
            print(" (warning: could not read %s: %s)" % (log_path, e))
    return targets
def find_region_files(world_dir):
    """Return the sorted paths of all ``.mca`` files under `world_dir` whose
    parent directory is named ``region``.

    Block data lives only in ``region`` directories; ``entities/`` and ``poi/``
    have no block palettes, so their files are not returned.
    """
    matches = [
        os.path.join(folder, filename)
        for folder, _subdirs, filenames in os.walk(world_dir)
        if os.path.basename(folder) == "region"
        for filename in filenames
        if filename.endswith(".mca")
    ]
    matches.sort()
    return matches
def decompress(ctype, raw):
    """Decode a chunk payload according to its compression-type byte.

    Type 1 is gzip, type 2 is zlib, and type 3 means the payload is stored
    uncompressed and is returned as-is. Any other type raises ValueError.
    """
    if ctype == 3:
        return raw
    codecs = {1: gzip.decompress, 2: zlib.decompress}
    decoder = codecs.get(ctype)
    if decoder is None:
        raise ValueError("unsupported compression type %d" % ctype)
    return decoder(raw)
def resolve(name, mapping, namespaces, fallback):
    """Look up the replacement block id for `name`.

    An exact entry in `mapping` takes priority. Otherwise, if the namespace of
    `name` (the text before the first ':', or "minecraft" when there is no ':')
    is in `namespaces`, `fallback` is returned. Returns None when the block
    should be left alone.
    """
    try:
        return mapping[name]
    except KeyError:
        pass
    prefix, colon, _rest = name.partition(":")
    namespace = prefix if colon else "minecraft"
    return fallback if namespace in namespaces else None
def process_chunk_nbt(nbt, mapping, namespaces, fallback, counts):
    """Rewrite matching palette entries of one chunk in place.

    For every entry in each section's ``block_states.palette`` that `resolve`
    maps to a replacement, the entry's ``Name`` is set to the replacement, its
    ``Properties`` tag (if any) is removed, and ``counts[<old name>]`` is
    incremented. Sections without ``block_states`` or a palette are skipped.

    Returns True if at least one palette entry was changed, else False.
    """
    sections = nbt.get("sections")
    if not sections:
        return False

    renamed = 0
    for section in sections:
        block_states = section.get("block_states")
        palette = block_states.get("palette") if block_states else None
        if not palette:
            continue
        for state in palette:
            old_name = str(state.get("Name", ""))
            new_name = resolve(old_name, mapping, namespaces, fallback)
            if new_name is None:
                continue
            state["Name"] = String(new_name)
            # Properties belonged to the old block and are dropped with it.
            state.pop("Properties", None)
            counts[old_name] = counts.get(old_name, 0) + 1
            renamed += 1
    return renamed > 0
def process_region(path, mapping, namespaces, fallback, apply, counts):
    """Scan one region (.mca) file and rename matching palette entries.

    Parameters:
        path: region file to read (and to rewrite when `apply` is true).
        mapping, namespaces, fallback: replacement rules, passed through to
            process_chunk_nbt.
        apply: when true and at least one chunk changed, the region file is
            rebuilt on disk via _write_region; otherwise nothing is written.
        counts: dict updated in place with per-block-id replacement counts.

    Returns (chunks_changed, replacements_in_file). A file shorter than the two
    header sectors yields (0, 0). Chunks that cannot be decompressed or parsed
    are reported with a warning and carried over unchanged.
    """
    with open(path, "rb") as f:
        data = f.read()
    if len(data) < 2 * SECTOR:
        return 0, 0
    loc = data[:SECTOR]
    timestamps = data[SECTOR:2 * SECTOR]

    # entries[i] = (compression_type_byte, payload_bytes) for present chunks, else None
    entries = [None] * 1024
    for i in range(1024):
        # Location entry: 3-byte big-endian sector offset + 1-byte sector count.
        off = struct.unpack(">I", b"\x00" + loc[i * 4:i * 4 + 3])[0]
        cnt = loc[i * 4 + 3]
        # Sectors 0 and 1 hold the header tables, so an offset below 2 cannot
        # point at chunk data (0 is the "chunk absent" marker).
        if off < 2 or cnt == 0:
            continue
        start = off * SECTOR
        # The record header is 5 bytes: 4-byte length + 1-byte compression type.
        # Require all five so the compression byte read below cannot run past
        # the end of a truncated file.
        if start + 5 > len(data):
            continue
        length = struct.unpack(">I", data[start:start + 4])[0]
        if length == 0:
            continue
        ctype = data[start + 4]
        # `length` counts the compression byte, so the payload ends at start+4+length.
        payload = data[start + 5:start + 4 + length]
        entries[i] = (ctype, payload)

    chunks_changed = 0
    before = sum(counts.values())
    new_payloads = {}  # i -> (ctype, payload) for changed chunks
    for i, ent in enumerate(entries):
        if ent is None:
            continue
        ctype, payload = ent
        # External (.mcc) chunks have bit 0x80 set; leave them untouched.
        if ctype & 0x80:
            continue
        try:
            raw = decompress(ctype, payload)
            nbt = File.parse(io.BytesIO(raw), byteorder="big")
        except Exception as e:
            print(" (warning: skip chunk %d in %s: %s)" % (i, os.path.basename(path), e))
            continue
        if process_chunk_nbt(nbt, mapping, namespaces, fallback, counts):
            chunks_changed += 1
            # Serialised in dry runs as well, so a serialisation failure
            # surfaces before --apply is used.
            buf = io.BytesIO()
            nbt.write(buf, byteorder="big")
            new_payloads[i] = (2, zlib.compress(buf.getvalue()))  # re-store as zlib

    replacements = sum(counts.values()) - before
    if apply and chunks_changed:
        for i, replacement in new_payloads.items():
            entries[i] = replacement
        _write_region(path, entries, timestamps)
    return chunks_changed, replacements
def _write_region(path, entries, timestamps):
    """Rebuild the region file from scratch with recomputed sector offsets.

    Parameters:
        path: region file to replace.
        entries: 1024 slots, each None or a (compression_type, payload) tuple.
        timestamps: the timestamp table bytes, written back unchanged.

    The new content is written to ``path + ".tmp"`` and then moved over `path`
    with os.replace. If writing or replacing fails, the temp file is removed
    and the error is re-raised.

    Raises ValueError if a chunk record needs more than 255 sectors (the sector
    count is stored in a single byte); this is checked before anything is
    written.
    """
    new_loc = bytearray(SECTOR)
    body = bytearray()
    next_sector = 2  # sectors 0,1 are the two header tables
    for i in range(1024):
        ent = entries[i]
        if ent is None:
            continue
        ctype, payload = ent
        length = len(payload) + 1  # +1 for the compression byte
        record = struct.pack(">I", length) + bytes([ctype]) + payload
        # Pad every record up to a whole number of sectors.
        pad = (-len(record)) % SECTOR
        record += b"\x00" * pad
        sectors = len(record) // SECTOR
        if sectors > 255:
            raise ValueError("chunk %d too large to store (%d sectors)" % (i, sectors))
        # Location entry: 3-byte big-endian sector offset + 1-byte sector count.
        new_loc[i * 4:i * 4 + 3] = struct.pack(">I", next_sector)[1:]
        new_loc[i * 4 + 3] = sectors
        body += record
        next_sector += sectors

    tmp = path + ".tmp"
    try:
        with open(tmp, "wb") as f:
            f.write(new_loc)
            f.write(timestamps)
            f.write(body)
        os.replace(tmp, path)
    except BaseException:
        # Do not leave a partial temp file next to the region file (for
        # example after a full disk or an interrupt). Cleanup is best-effort;
        # the original error is what gets reported.
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise
# --- multiprocessing: region files are independent of one another, so each
# worker handles one file at a time and writes (with --apply) never collide. ---
# Per-process settings, filled in by _init_worker and read by _work.
_W = {}


def _init_worker(mapping, namespaces, fallback, apply):
    """Store the replacement settings in this process's `_W` dict."""
    _W.update(
        mapping=mapping,
        namespaces=namespaces,
        fallback=fallback,
        apply=apply,
    )
def _work(path):
    """Process one region file using the settings stored in `_W`.

    Returns (path, chunks_changed, replacements, counts, error). On success
    `error` is None. If processing raises, the numbers are 0, `counts` is an
    empty dict and `error` is the string "ERROR: <message>".
    """
    tally = {}
    try:
        changed, replaced = process_region(
            path,
            _W["mapping"],
            _W["namespaces"],
            _W["fallback"],
            _W["apply"],
            tally,
        )
        return (path, changed, replaced, tally, None)
    except Exception as e:
        return (path, 0, 0, {}, "ERROR: %s" % e)
def main():
    """Command-line entry point.

    Parses the arguments, builds the src->dst replacement mapping, prints the
    plan, then processes every region file under --world (in parallel when more
    than one worker is used) and prints a summary.

    Exits with a message when no targets are given or the world directory is
    missing, and with status 1 when one or more region files failed to process.
    Nothing is written to the world unless --apply is given.
    """
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--world", default="world", help="world directory (default: world)")
    ap.add_argument("--logs", default="logs", help="logs directory to auto-discover targets (default: logs)")
    ap.add_argument("--to", default="minecraft:stone",
                    help="fallback block for targets without a specific mapping (default: minecraft:stone)")
    ap.add_argument("--map", action="append", default=[], metavar="SRC=DST",
                    help="explicit replacement, e.g. magnetization:magnetite_ore=minecraft:iron_ore "
                         "(repeatable; overrides the built-in defaults)")
    ap.add_argument("--block", action="append", default=[],
                    help="block id to replace with the --to fallback (repeatable)")
    ap.add_argument("--namespace", action="append", default=[],
                    help="replace every block in this namespace with --to, e.g. magnetization (repeatable)")
    ap.add_argument("--no-defaults", action="store_true",
                    help="ignore the built-in like-for-like map (DEFAULT_MAP)")
    ap.add_argument("--no-logs", action="store_true", help="do not auto-discover targets from logs")
    ap.add_argument("--jobs", type=int, default=0,
                    help="parallel worker processes (default: all CPU cores)")
    ap.add_argument("--apply", action="store_true",
                    help="actually write changes (default is a DRY RUN that writes nothing)")
    args = ap.parse_args()

    fallback = args.to
    namespaces = set(args.namespace)

    # Build the src->dst mapping. Precedence: built-in defaults < --block fallback
    # < log-discovered fallback < explicit --map.
    mapping = {} if args.no_defaults else dict(DEFAULT_MAP)
    for b in args.block:
        mapping[b] = fallback
    if not args.no_logs:
        from_logs = discover_targets_from_logs(args.logs)
        if from_logs:
            print("Discovered %d errored block id(s) from logs:" % len(from_logs))
            for b in sorted(from_logs):
                print(" %s" % b)
        for b in from_logs:
            mapping.setdefault(b, fallback)  # don't override a known good mapping
    for pair in args.map:
        if "=" not in pair:
            sys.exit("--map expects SRC=DST, got: %s" % pair)
        src, dst = pair.split("=", 1)
        mapping[src.strip()] = dst.strip()

    if not mapping and not namespaces:
        sys.exit("No target blocks. Pass --map / --block / --namespace, or ensure logs/ "
                 "has 'Unknown registry key' lines.")

    print()
    print("Replacement plan:")
    for src in sorted(mapping):
        # Label where each replacement came from: built-in default (no tag),
        # the --to fallback, or a custom --map entry.
        if src in DEFAULT_MAP and mapping[src] == DEFAULT_MAP[src]:
            tag = ""
        elif mapping[src] == fallback:
            tag = " (fallback)"
        else:
            tag = " (custom)"
        print(" %-50s -> %s%s" % (src, mapping[src], tag))
    if namespaces:
        print(" namespace(s) %s -> %s" % (", ".join(sorted(namespaces)), fallback))
    print("Mode: %s" % ("APPLY (writing changes)" if args.apply else "DRY RUN (no changes written)"))
    print()

    if not os.path.isdir(args.world):
        sys.exit("World directory not found: %s" % args.world)

    region_files = find_region_files(args.world)
    total = len(region_files)
    jobs = max(1, args.jobs if args.jobs else cpu_count())
    # Never start more workers than there are files to hand out.
    jobs = max(1, min(jobs, total))
    print("Scanning %d region files under %s using %d worker(s) ..."
          % (total, args.world, jobs))
    print()

    total_counts = {}
    total_chunks = 0
    files_touched = 0
    files_failed = 0

    def consume(result):
        """Fold one _work() result into the running totals and report it."""
        nonlocal total_chunks, files_touched, files_failed
        path, chunks_changed, replacements, counts, err = result
        if err:
            files_failed += 1
            print(" (warning: %s: %s)" % (os.path.relpath(path, args.world), err), flush=True)
            return
        if chunks_changed:
            files_touched += 1
            total_chunks += chunks_changed
            for k, v in counts.items():
                total_counts[k] = total_counts.get(k, 0) + v
            rel = os.path.relpath(path, args.world)
            print(" %-55s %5d palette entries in %4d chunks" %
                  (rel, replacements, chunks_changed), flush=True)

    def drain(results):
        """Consume every result, printing a progress line every 500 files."""
        for done, result in enumerate(results, 1):
            consume(result)
            if done % 500 == 0:
                print(" ... %d/%d region files scanned" % (done, total), flush=True)

    if jobs == 1:
        # Run in-process: no pool start-up cost and simpler tracebacks.
        _init_worker(mapping, namespaces, fallback, args.apply)
        drain(map(_work, region_files))
    else:
        with Pool(jobs, initializer=_init_worker,
                  initargs=(mapping, namespaces, fallback, args.apply)) as pool:
            drain(pool.imap_unordered(_work, region_files, chunksize=8))

    print()
    print("=" * 70)
    print("Summary (%s):" % ("APPLIED" if args.apply else "DRY RUN"))
    print(" region files affected : %d" % files_touched)
    if files_failed:
        print(" region files FAILED : %d (see warnings above)" % files_failed)
    print(" chunks affected : %d" % total_chunks)
    print(" palette entries fixed :")
    for k in sorted(total_counts):
        dst = mapping.get(k, fallback)
        print(" %-50s -> %-28s %d" % (k, dst, total_counts[k]))
    if not total_counts:
        print(" (none found — nothing to do)")
    print("=" * 70)
    if not args.apply and total_counts:
        print("\nThis was a DRY RUN. Re-run with --apply to write the changes")
        print("(stop the server and back up the world first).")
    if files_failed:
        # Make failures visible to calling scripts, not only in the output.
        sys.exit(1)
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()