Untitled
unknown
python
10 days ago
20 kB
13
Indexable
"""IMAP server-identity spoofing proxy for eM Client tag display.
Background
----------
eM Client classifies Migadu's IMAP front-end (which identifies itself via
the ``ID`` command as ``"Sora"`` from ``"Migadu-Mail GmbH"``) as server type
*Other*. On *Other* servers eM Client refuses to render custom IMAP
keywords, so the tags this daemon applies (``INFO``, ``IMPORTANT``,
…) become invisible -- even though the server stores them correctly and
advertises ``\\*`` in ``PERMANENTFLAGS``.
The tags are *received* by eM Client; they are simply not *displayed*. The
fix therefore is not to add tags differently but to make eM Client believe
it is talking to a server type it already trusts for custom keywords.
Testing against eM Client pinned down exactly how it decides the type:
* It fingerprints the server from the **greeting banner**. Rewriting only
the banner text to ``Dovecot ready.`` flips detection to *Dovecot* even
though the ``* ID`` response and ``CAPABILITY`` list stay 100% Migadu's
own -- so neither ``ID`` nor capabilities play a role.
* (Spoofing ``ID`` to "IceWarp" alone did *nothing*; injecting the
``X-ICEWARP-SERVER`` capability flips detection to *IceWarp*. Both are
kept as fallback levers but are off by default.)
Default behaviour is therefore the **minimal, lowest-risk** spoof: change one
string (the greeting banner) to ``Dovecot ready.`` and forward everything else
untouched. Dovecot is the natural choice because Migadu's real capability set
(CONDSTORE/QRESYNC/MOVE/ESEARCH/...) is already Dovecot-shaped, and Dovecot has
no proprietary command extensions for eM Client to attempt against us.
What it does
------------
A transparent, line-oriented IMAP proxy:
eM Client --(plaintext loopback)--> this proxy --(TLS)--> imap.migadu.com
By default everything is forwarded verbatim **except** the greeting banner,
which becomes ``Dovecot ready.`` (``PROXY_SPOOF_GREETING``).
Optional levers, all off by default:
* ``PROXY_ADD_CAPABILITY`` -- append tokens (e.g. ``X-ICEWARP-SERVER``) to
every advertised ``CAPABILITY`` list, preserving the real ones.
* ``PROXY_SPOOF_ID_NAME`` / ``PROXY_SPOOF_ID_VENDOR`` -- rewrite ``* ID``.
* ``PROXY_SPOOF_CAPABILITY`` -- fully replace the capability list.
* ``PROXY_MIRROR_PERMANENTFLAGS=1`` -- mirror custom keywords from
``* FLAGS`` into ``* OK [PERMANENTFLAGS (...)]``.
The server->client stream is parsed with full IMAP literal awareness so
that binary message bodies (``BODY[] {1234}``) are never mis-framed or
corrupted; only complete control lines are eligible for rewriting.
eM Client setup (loopback PoC)
------------------------------
IMAP host : 127.0.0.1
IMAP port : 1430 (PROXY_LISTEN_PORT)
Security : None / No encryption
Username/pw : unchanged (forwarded to Migadu as-is)
Loopback plaintext never leaves the machine, so it is safe. eM Client
caches the detected server type per account, so after pointing it at the
proxy you will likely need to remove and re-add the account to force
re-detection.
To terminate TLS on the client side instead (so you need not disable
encryption), set ``PROXY_TLS_CERT`` and ``PROXY_TLS_KEY`` to a cert/key eM
Client trusts; the listener then speaks TLS.
Run
---
python -m migadu_tag.proxy
Environment
-----------
PROXY_LISTEN_HOST default 127.0.0.1
PROXY_LISTEN_PORT default 1430
PROXY_UPSTREAM_HOST default imap.migadu.com
PROXY_UPSTREAM_PORT default 993
PROXY_SPOOF_GREETING greeting banner text; default "Dovecot ready."
(set empty to leave the greeting untouched)
PROXY_SPOOF_ID_NAME default empty (no ID spoof; cosmetic fallback)
PROXY_SPOOF_ID_VENDOR default empty
PROXY_ADD_CAPABILITY tokens to append to every CAPABILITY list;
default empty. Set "X-ICEWARP-SERVER" to force
eM Client's IceWarp detection instead. Space-sep.
PROXY_SPOOF_CAPABILITY optional FULL CAPABILITY replacement token list
(overrides the server's real capabilities)
PROXY_MIRROR_PERMANENTFLAGS default off (1/true/yes to enable)
PROXY_TLS_CERT / PROXY_TLS_KEY optional client-side TLS termination
LOG_LEVEL default INFO
"""
import logging
import os
import re
import socket
import ssl
import threading
from .logging import setup_logging
log = logging.getLogger(__name__)
# Literal marker at the end of a physical line: {123} or {123+} (non-sync).
# When present, exactly that many raw bytes follow the CRLF and must be
# forwarded untouched.
_LITERAL_RE = re.compile(rb"\{(\d+)\+?\}$")
_RECV = 65536
def _truthy(value: str | None) -> bool:
return (value or "").strip().lower() in {"1", "true", "yes", "on"}
# ---------------------------------------------------------------------------
# Server -> client stream rewriter (literal-aware)
# ---------------------------------------------------------------------------
class ServerStreamRewriter:
"""Reframe the server->client byte stream and rewrite control lines.
``feed(chunk)`` returns the (possibly modified) bytes to forward to the
client. State is carried across calls so chunk boundaries are handled
transparently.
Only *complete control lines* with no trailing literal are passed to the
per-line rewriter; literal payloads (message bodies, etc.) are forwarded
byte-for-byte and never inspected.
"""
def __init__(self, spoof_id_line: bytes | None, mirror_permflags: bool,
greeting_banner: bytes | None = None,
capability: bytes | None = None,
add_capability: list[bytes] | None = None):
self._spoof_id_line = spoof_id_line
self._mirror_permflags = mirror_permflags
# Human-readable banner text to substitute after the bracketed
# response code in the greeting (the part eM Client fingerprints).
self._greeting_banner = greeting_banner
# Full replacement CAPABILITY token list (space-joined, no
# "CAPABILITY" prefix); applied to greeting code, login OK code, and
# untagged "* CAPABILITY" responses.
self._capability = capability
# Tokens to append to (rather than replace) every advertised
# CAPABILITY list -- e.g. b"X-ICEWARP-SERVER" to trigger eM Client's
# IceWarp detection while keeping the server's real capabilities.
self._add_capability = add_capability or []
self._greeting_pending = True
self._mode = "line" # "line" | "literal"
self._literal_remaining = 0
self._line = bytearray() # partial physical line in "line" mode
# Custom keywords captured from the most recent * FLAGS line, used to
# mirror into the following PERMANENTFLAGS line.
self._last_custom_flags: list[bytes] = []
def feed(self, chunk: bytes) -> bytes:
out = bytearray()
i, n = 0, len(chunk)
while i < n:
if self._mode == "literal":
take = min(self._literal_remaining, n - i)
out += chunk[i:i + take]
i += take
self._literal_remaining -= take
if self._literal_remaining == 0:
self._mode = "line"
continue
# "line" mode: accumulate up to and including the next LF.
j = chunk.find(b"\n", i)
if j == -1:
self._line += chunk[i:]
break
self._line += chunk[i:j + 1]
i = j + 1
phys = bytes(self._line)
self._line.clear()
body = phys[:-2] if phys.endswith(b"\r\n") else phys.rstrip(b"\n")
m = _LITERAL_RE.search(body)
if m:
# A literal follows this physical line; forward it verbatim
# (such lines -- FETCH responses -- are never rewrite targets).
out += phys
self._literal_remaining = int(m.group(1))
if self._literal_remaining > 0:
self._mode = "literal"
else:
out += self._rewrite_line(phys)
return bytes(out)
# -- per-line rewriting ------------------------------------------------
def _rewrite_line(self, line: bytes) -> bytes:
upper = line.upper()
# Greeting is the first untagged status line the server sends.
if self._greeting_pending and (
upper.startswith(b"* OK") or upper.startswith(b"* PREAUTH")
):
self._greeting_pending = False
return self._rewrite_greeting(line)
if (self._capability is not None or self._add_capability) and (
b"[CAPABILITY " in upper or upper.startswith(b"* CAPABILITY ")
):
return self._sub_capability(line)
if self._spoof_id_line is not None and upper.startswith(b"* ID ("):
log.info(
"Spoofing ID response",
extra={"event": "id_spoofed", "original": line.rstrip().decode(
"ascii", "replace")},
)
return self._spoof_id_line
if upper.startswith(b"* FLAGS ("):
self._capture_flags(line)
return line
if self._mirror_permflags and b"[PERMANENTFLAGS (" in upper:
return self._mirror(line)
return line
def _capture_flags(self, line: bytes) -> None:
inside = self._between(line, b"(", b")")
if inside is None:
self._last_custom_flags = []
return
# Custom keywords are those not beginning with "\" (system flags) and
# not the "\*" wildcard.
self._last_custom_flags = [
tok for tok in inside.split() if not tok.startswith(b"\\")
]
def _mirror(self, line: bytes) -> bytes:
if not self._last_custom_flags:
return line
inside = self._between(line, b"(", b")")
if inside is None:
return line
existing = set(inside.split())
additions = [f for f in self._last_custom_flags if f not in existing]
if not additions:
return line
# Insert keywords before the trailing "\*" wildcard if present so the
# wildcard stays last; otherwise append.
tokens = inside.split()
if tokens and tokens[-1] == b"\\*":
new_inside = b" ".join(tokens[:-1] + additions + [b"\\*"])
else:
new_inside = b" ".join(tokens + additions)
rebuilt = line.replace(b"(" + inside + b")", b"(" + new_inside + b")", 1)
log.info(
"Mirrored keywords into PERMANENTFLAGS",
extra={"event": "permflags_mirrored",
"added": [f.decode("ascii", "replace") for f in additions]},
)
return rebuilt
def _rewrite_greeting(self, line: bytes) -> bytes:
# Apply capability override/augment first so it covers the greeting's
# bracketed [CAPABILITY ...] code as well.
if self._capability is not None or self._add_capability:
line = self._sub_capability(line)
if self._greeting_banner is None:
return line
eol = b"\r\n" if line.endswith(b"\r\n") else b"\n"
core = line[:-len(eol)]
# Replace the human-readable banner that follows the response code.
# "* OK [CODE...] <banner>" -> keep through "]", swap the banner.
close = core.rfind(b"]")
if close != -1:
head = core[:close + 1]
else:
head = b"* OK" if core.upper().startswith(b"* OK") else b"* PREAUTH"
rebuilt = head + b" " + self._greeting_banner + eol
log.info(
"Spoofing greeting banner",
extra={"event": "greeting_spoofed",
"original": core.decode("ascii", "replace")},
)
return rebuilt
def _sub_capability(self, line: bytes) -> bytes:
eol = b"\r\n" if line.endswith(b"\r\n") else (
b"\n" if line.endswith(b"\n") else b"")
core = line[:len(line) - len(eol)] if eol else line
# Bracketed response code: [CAPABILITY ...]
bstart = core.upper().find(b"[CAPABILITY ")
if bstart != -1:
bend = core.find(b"]", bstart)
if bend != -1:
tokens = core[bstart + len(b"[CAPABILITY "):bend].split()
core = (core[:bstart] + b"[CAPABILITY "
+ self._caps(tokens) + core[bend:])
# Untagged response: * CAPABILITY ...
elif core.upper().startswith(b"* CAPABILITY "):
tokens = core[len(b"* CAPABILITY "):].split()
core = b"* CAPABILITY " + self._caps(tokens)
return core + eol
def _caps(self, tokens: list[bytes]) -> bytes:
"""Return the new space-joined capability token list."""
if self._capability is not None:
tokens = self._capability.split()
for tok in self._add_capability:
if tok not in tokens:
tokens = tokens + [tok]
return b" ".join(tokens)
@staticmethod
def _between(line: bytes, open_b: bytes, close_b: bytes) -> bytes | None:
start = line.find(open_b)
if start == -1:
return None
end = line.rfind(close_b)
if end == -1 or end < start:
return None
return line[start + 1:end]
# ---------------------------------------------------------------------------
# Connection plumbing
# ---------------------------------------------------------------------------
def _pump_raw(src: socket.socket, dst: socket.socket) -> None:
"""Forward bytes src->dst verbatim until EOF, then half-close dst."""
try:
while True:
chunk = src.recv(_RECV)
if not chunk:
break
dst.sendall(chunk)
except OSError:
pass
finally:
_safe_shutdown(dst)
def _pump_rewrite(src: socket.socket, dst: socket.socket,
rewriter: ServerStreamRewriter) -> None:
"""Forward src->dst applying the rewriter, until EOF, then half-close."""
try:
while True:
chunk = src.recv(_RECV)
if not chunk:
break
out = rewriter.feed(chunk)
if out:
dst.sendall(out)
except OSError:
pass
finally:
_safe_shutdown(dst)
def _safe_shutdown(sock: socket.socket) -> None:
try:
sock.shutdown(socket.SHUT_WR)
except OSError:
pass
def _build_spoof_id_line(name: str, vendor: str) -> bytes | None:
if not name:
return None
return (
f'* ID ("name" "{name}" "version" "1.0" "vendor" "{vendor}")\r\n'
).encode("ascii", "replace")
def handle_client(client: socket.socket, peer, cfg: dict) -> None:
upstream = None
try:
ctx = ssl.create_default_context()
raw = socket.create_connection(
(cfg["upstream_host"], cfg["upstream_port"]), timeout=30)
upstream = ctx.wrap_socket(raw, server_hostname=cfg["upstream_host"])
log.info(
"Client connected; upstream established",
extra={"event": "client_connected", "peer": f"{peer[0]}:{peer[1]}",
"upstream": f"{cfg['upstream_host']}:{cfg['upstream_port']}"},
)
rewriter = ServerStreamRewriter(
cfg["spoof_id_line"], cfg["mirror_permflags"],
greeting_banner=cfg["greeting_banner"],
capability=cfg["capability"],
add_capability=cfg["add_capability"])
t_up = threading.Thread(
target=_pump_raw, args=(client, upstream), daemon=True)
t_down = threading.Thread(
target=_pump_rewrite, args=(upstream, client, rewriter), daemon=True)
t_up.start()
t_down.start()
t_up.join()
t_down.join()
except OSError as exc:
log.warning(
"Connection error",
extra={"event": "connection_error", "peer": f"{peer[0]}:{peer[1]}",
"error": str(exc)},
)
finally:
for s in (client, upstream):
if s is not None:
try:
s.close()
except OSError:
pass
log.info(
"Client disconnected",
extra={"event": "client_disconnected", "peer": f"{peer[0]}:{peer[1]}"},
)
def _load_config() -> dict:
cfg = {
"listen_host": os.environ.get("PROXY_LISTEN_HOST", "127.0.0.1"),
"listen_port": int(os.environ.get("PROXY_LISTEN_PORT", "1430")),
"upstream_host": os.environ.get("PROXY_UPSTREAM_HOST", "imap.migadu.com"),
"upstream_port": int(os.environ.get("PROXY_UPSTREAM_PORT", "993")),
"mirror_permflags": _truthy(os.environ.get("PROXY_MIRROR_PERMANENTFLAGS")),
"tls_cert": os.environ.get("PROXY_TLS_CERT"),
"tls_key": os.environ.get("PROXY_TLS_KEY"),
}
# Default: minimal "Dovecot" spoof -- only the greeting banner is changed,
# no ID lie and no fake capabilities, because eM Client fingerprints the
# server type from the greeting banner alone (verified against Migadu).
# The IceWarp levers (ID spoof / X-ICEWARP-SERVER) remain available as
# overrides for environments where Dovecot detection regresses.
cfg["spoof_id_line"] = _build_spoof_id_line(
os.environ.get("PROXY_SPOOF_ID_NAME", ""),
os.environ.get("PROXY_SPOOF_ID_VENDOR", ""),
)
banner = os.environ.get("PROXY_SPOOF_GREETING", "Dovecot ready.")
cfg["greeting_banner"] = banner.encode("ascii", "replace") if banner else None
caps = os.environ.get("PROXY_SPOOF_CAPABILITY")
cfg["capability"] = caps.encode("ascii", "replace") if caps else None
add = os.environ.get("PROXY_ADD_CAPABILITY", "")
cfg["add_capability"] = [t.encode("ascii", "replace") for t in add.split()]
return cfg
def main() -> int:
setup_logging()
cfg = _load_config()
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind((cfg["listen_host"], cfg["listen_port"]))
listener.listen(16)
# Optional client-side TLS termination.
if cfg["tls_cert"] and cfg["tls_key"]:
srv_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
srv_ctx.load_cert_chain(cfg["tls_cert"], cfg["tls_key"])
else:
srv_ctx = None
log.info(
"Proxy listening",
extra={
"event": "proxy_started",
"listen": f"{cfg['listen_host']}:{cfg['listen_port']}",
"upstream": f"{cfg['upstream_host']}:{cfg['upstream_port']}",
"spoof_id": cfg["spoof_id_line"] is not None,
"mirror_permflags": cfg["mirror_permflags"],
"client_tls": srv_ctx is not None,
},
)
try:
while True:
client, peer = listener.accept()
if srv_ctx is not None:
try:
client = srv_ctx.wrap_socket(client, server_side=True)
except ssl.SSLError as exc:
log.warning("Client TLS handshake failed",
extra={"event": "client_tls_error",
"error": str(exc)})
client.close()
continue
threading.Thread(
target=handle_client, args=(client, peer, cfg), daemon=True
).start()
except KeyboardInterrupt:
log.info("Shutting down", extra={"event": "proxy_stopping"})
finally:
listener.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())
Editor is loading...
Leave a Comment