Untitled

 avatar
unknown
python
10 days ago
20 kB
13
Indexable
"""IMAP server-identity spoofing proxy for eM Client tag display.

Background
----------
eM Client classifies Migadu's IMAP front-end (which identifies itself via
the ``ID`` command as ``"Sora"`` from ``"Migadu-Mail GmbH"``) as server type
*Other*.  On *Other* servers eM Client refuses to render custom IMAP
keywords, so the tags this daemon applies (``INFO``, ``IMPORTANT``,
…) become invisible -- even though the server stores them correctly and
advertises ``\\*`` in ``PERMANENTFLAGS``.

The tags are *received* by eM Client; they are simply not *displayed*.  The
fix therefore is not to add tags differently but to make eM Client believe
it is talking to a server type it already trusts for custom keywords.

Testing against eM Client pinned down exactly how it decides the type:

  * It fingerprints the server from the **greeting banner**.  Rewriting only
    the banner text to ``Dovecot ready.`` flips detection to *Dovecot* even
    though the ``* ID`` response and ``CAPABILITY`` list stay 100% Migadu's
    own -- so neither ``ID`` nor capabilities play a role.
  * (Spoofing ``ID`` to "IceWarp" alone did *nothing*; injecting the
    ``X-ICEWARP-SERVER`` capability flips detection to *IceWarp*.  Both are
    kept as fallback levers but are off by default.)

Default behaviour is therefore the **minimal, lowest-risk** spoof: change one
string (the greeting banner) to ``Dovecot ready.`` and forward everything else
untouched.  Dovecot is the natural choice because Migadu's real capability set
(CONDSTORE/QRESYNC/MOVE/ESEARCH/...) is already Dovecot-shaped, and Dovecot has
no proprietary command extensions for eM Client to attempt against us.

What it does
------------
A transparent, line-oriented IMAP proxy:

  eM Client  --(plaintext loopback)-->  this proxy  --(TLS)-->  imap.migadu.com

By default everything is forwarded verbatim **except** the greeting banner,
which becomes ``Dovecot ready.`` (``PROXY_SPOOF_GREETING``).

Optional levers, all off by default:

  * ``PROXY_ADD_CAPABILITY`` -- append tokens (e.g. ``X-ICEWARP-SERVER``) to
    every advertised ``CAPABILITY`` list, preserving the real ones.
  * ``PROXY_SPOOF_ID_NAME`` / ``PROXY_SPOOF_ID_VENDOR`` -- rewrite ``* ID``.
  * ``PROXY_SPOOF_CAPABILITY`` -- fully replace the capability list.
  * ``PROXY_MIRROR_PERMANENTFLAGS=1`` -- mirror custom keywords from
    ``* FLAGS`` into ``* OK [PERMANENTFLAGS (...)]``.

The server->client stream is parsed with full IMAP literal awareness so
that binary message bodies (``BODY[] {1234}``) are never mis-framed or
corrupted; only complete control lines are eligible for rewriting.

eM Client setup (loopback PoC)
------------------------------
  IMAP host   : 127.0.0.1
  IMAP port   : 1430   (PROXY_LISTEN_PORT)
  Security    : None / No encryption
  Username/pw : unchanged (forwarded to Migadu as-is)

Loopback plaintext never leaves the machine, so it is safe.  eM Client
caches the detected server type per account, so after pointing it at the
proxy you will likely need to remove and re-add the account to force
re-detection.

To terminate TLS on the client side instead (so you need not disable
encryption), set ``PROXY_TLS_CERT`` and ``PROXY_TLS_KEY`` to a cert/key eM
Client trusts; the listener then speaks TLS.

Run
---
  python -m migadu_tag.proxy

Environment
-----------
  PROXY_LISTEN_HOST           default 127.0.0.1
  PROXY_LISTEN_PORT           default 1430
  PROXY_UPSTREAM_HOST         default imap.migadu.com
  PROXY_UPSTREAM_PORT         default 993
  PROXY_SPOOF_GREETING        greeting banner text; default "Dovecot ready."
                              (set empty to leave the greeting untouched)
  PROXY_SPOOF_ID_NAME         default empty (no ID spoof; cosmetic fallback)
  PROXY_SPOOF_ID_VENDOR       default empty
  PROXY_ADD_CAPABILITY        tokens to append to every CAPABILITY list;
                              default empty. Set "X-ICEWARP-SERVER" to force
                              eM Client's IceWarp detection instead. Space-sep.
  PROXY_SPOOF_CAPABILITY      optional FULL CAPABILITY replacement token list
                              (overrides the server's real capabilities)
  PROXY_MIRROR_PERMANENTFLAGS default off       (1/true/yes to enable)
  PROXY_TLS_CERT / PROXY_TLS_KEY  optional client-side TLS termination
  LOG_LEVEL                   default INFO
"""

import logging
import os
import re
import socket
import ssl
import threading

from .logging import setup_logging

log = logging.getLogger(__name__)

# Literal marker at the end of a physical line: {123} or {123+} (non-sync).
# When present, exactly that many raw bytes follow the CRLF and must be
# forwarded untouched.
_LITERAL_RE = re.compile(rb"\{(\d+)\+?\}$")

_RECV = 65536


def _truthy(value: str | None) -> bool:
    return (value or "").strip().lower() in {"1", "true", "yes", "on"}


# ---------------------------------------------------------------------------
# Server -> client stream rewriter (literal-aware)
# ---------------------------------------------------------------------------

class ServerStreamRewriter:
    """Reframe the server->client byte stream and rewrite control lines.

    ``feed(chunk)`` returns the (possibly modified) bytes to forward to the
    client.  State is carried across calls so chunk boundaries are handled
    transparently.

    Only *complete control lines* with no trailing literal are passed to the
    per-line rewriter; literal payloads (message bodies, etc.) are forwarded
    byte-for-byte and never inspected.
    """

    def __init__(self, spoof_id_line: bytes | None, mirror_permflags: bool,
                 greeting_banner: bytes | None = None,
                 capability: bytes | None = None,
                 add_capability: list[bytes] | None = None):
        self._spoof_id_line = spoof_id_line
        self._mirror_permflags = mirror_permflags
        # Human-readable banner text to substitute after the bracketed
        # response code in the greeting (the part eM Client fingerprints).
        self._greeting_banner = greeting_banner
        # Full replacement CAPABILITY token list (space-joined, no
        # "CAPABILITY" prefix); applied to greeting code, login OK code, and
        # untagged "* CAPABILITY" responses.
        self._capability = capability
        # Tokens to append to (rather than replace) every advertised
        # CAPABILITY list -- e.g. b"X-ICEWARP-SERVER" to trigger eM Client's
        # IceWarp detection while keeping the server's real capabilities.
        self._add_capability = add_capability or []
        self._greeting_pending = True
        self._mode = "line"          # "line" | "literal"
        self._literal_remaining = 0
        self._line = bytearray()     # partial physical line in "line" mode
        # Custom keywords captured from the most recent * FLAGS line, used to
        # mirror into the following PERMANENTFLAGS line.
        self._last_custom_flags: list[bytes] = []

    def feed(self, chunk: bytes) -> bytes:
        out = bytearray()
        i, n = 0, len(chunk)
        while i < n:
            if self._mode == "literal":
                take = min(self._literal_remaining, n - i)
                out += chunk[i:i + take]
                i += take
                self._literal_remaining -= take
                if self._literal_remaining == 0:
                    self._mode = "line"
                continue

            # "line" mode: accumulate up to and including the next LF.
            j = chunk.find(b"\n", i)
            if j == -1:
                self._line += chunk[i:]
                break
            self._line += chunk[i:j + 1]
            i = j + 1

            phys = bytes(self._line)
            self._line.clear()

            body = phys[:-2] if phys.endswith(b"\r\n") else phys.rstrip(b"\n")
            m = _LITERAL_RE.search(body)
            if m:
                # A literal follows this physical line; forward it verbatim
                # (such lines -- FETCH responses -- are never rewrite targets).
                out += phys
                self._literal_remaining = int(m.group(1))
                if self._literal_remaining > 0:
                    self._mode = "literal"
            else:
                out += self._rewrite_line(phys)
        return bytes(out)

    # -- per-line rewriting ------------------------------------------------

    def _rewrite_line(self, line: bytes) -> bytes:
        upper = line.upper()

        # Greeting is the first untagged status line the server sends.
        if self._greeting_pending and (
            upper.startswith(b"* OK") or upper.startswith(b"* PREAUTH")
        ):
            self._greeting_pending = False
            return self._rewrite_greeting(line)

        if (self._capability is not None or self._add_capability) and (
            b"[CAPABILITY " in upper or upper.startswith(b"* CAPABILITY ")
        ):
            return self._sub_capability(line)

        if self._spoof_id_line is not None and upper.startswith(b"* ID ("):
            log.info(
                "Spoofing ID response",
                extra={"event": "id_spoofed", "original": line.rstrip().decode(
                    "ascii", "replace")},
            )
            return self._spoof_id_line

        if upper.startswith(b"* FLAGS ("):
            self._capture_flags(line)
            return line

        if self._mirror_permflags and b"[PERMANENTFLAGS (" in upper:
            return self._mirror(line)

        return line

    def _capture_flags(self, line: bytes) -> None:
        inside = self._between(line, b"(", b")")
        if inside is None:
            self._last_custom_flags = []
            return
        # Custom keywords are those not beginning with "\" (system flags) and
        # not the "\*" wildcard.
        self._last_custom_flags = [
            tok for tok in inside.split() if not tok.startswith(b"\\")
        ]

    def _mirror(self, line: bytes) -> bytes:
        if not self._last_custom_flags:
            return line
        inside = self._between(line, b"(", b")")
        if inside is None:
            return line
        existing = set(inside.split())
        additions = [f for f in self._last_custom_flags if f not in existing]
        if not additions:
            return line
        # Insert keywords before the trailing "\*" wildcard if present so the
        # wildcard stays last; otherwise append.
        tokens = inside.split()
        if tokens and tokens[-1] == b"\\*":
            new_inside = b" ".join(tokens[:-1] + additions + [b"\\*"])
        else:
            new_inside = b" ".join(tokens + additions)
        rebuilt = line.replace(b"(" + inside + b")", b"(" + new_inside + b")", 1)
        log.info(
            "Mirrored keywords into PERMANENTFLAGS",
            extra={"event": "permflags_mirrored",
                   "added": [f.decode("ascii", "replace") for f in additions]},
        )
        return rebuilt

    def _rewrite_greeting(self, line: bytes) -> bytes:
        # Apply capability override/augment first so it covers the greeting's
        # bracketed [CAPABILITY ...] code as well.
        if self._capability is not None or self._add_capability:
            line = self._sub_capability(line)
        if self._greeting_banner is None:
            return line
        eol = b"\r\n" if line.endswith(b"\r\n") else b"\n"
        core = line[:-len(eol)]
        # Replace the human-readable banner that follows the response code.
        # "* OK [CODE...] <banner>"  ->  keep through "]", swap the banner.
        close = core.rfind(b"]")
        if close != -1:
            head = core[:close + 1]
        else:
            head = b"* OK" if core.upper().startswith(b"* OK") else b"* PREAUTH"
        rebuilt = head + b" " + self._greeting_banner + eol
        log.info(
            "Spoofing greeting banner",
            extra={"event": "greeting_spoofed",
                   "original": core.decode("ascii", "replace")},
        )
        return rebuilt

    def _sub_capability(self, line: bytes) -> bytes:
        eol = b"\r\n" if line.endswith(b"\r\n") else (
            b"\n" if line.endswith(b"\n") else b"")
        core = line[:len(line) - len(eol)] if eol else line
        # Bracketed response code: [CAPABILITY ...]
        bstart = core.upper().find(b"[CAPABILITY ")
        if bstart != -1:
            bend = core.find(b"]", bstart)
            if bend != -1:
                tokens = core[bstart + len(b"[CAPABILITY "):bend].split()
                core = (core[:bstart] + b"[CAPABILITY "
                        + self._caps(tokens) + core[bend:])
        # Untagged response: * CAPABILITY ...
        elif core.upper().startswith(b"* CAPABILITY "):
            tokens = core[len(b"* CAPABILITY "):].split()
            core = b"* CAPABILITY " + self._caps(tokens)
        return core + eol

    def _caps(self, tokens: list[bytes]) -> bytes:
        """Return the new space-joined capability token list."""
        if self._capability is not None:
            tokens = self._capability.split()
        for tok in self._add_capability:
            if tok not in tokens:
                tokens = tokens + [tok]
        return b" ".join(tokens)

    @staticmethod
    def _between(line: bytes, open_b: bytes, close_b: bytes) -> bytes | None:
        start = line.find(open_b)
        if start == -1:
            return None
        end = line.rfind(close_b)
        if end == -1 or end < start:
            return None
        return line[start + 1:end]


# ---------------------------------------------------------------------------
# Connection plumbing
# ---------------------------------------------------------------------------

def _pump_raw(src: socket.socket, dst: socket.socket) -> None:
    """Forward bytes src->dst verbatim until EOF, then half-close dst."""
    try:
        while True:
            chunk = src.recv(_RECV)
            if not chunk:
                break
            dst.sendall(chunk)
    except OSError:
        pass
    finally:
        _safe_shutdown(dst)


def _pump_rewrite(src: socket.socket, dst: socket.socket,
                  rewriter: ServerStreamRewriter) -> None:
    """Forward src->dst applying the rewriter, until EOF, then half-close."""
    try:
        while True:
            chunk = src.recv(_RECV)
            if not chunk:
                break
            out = rewriter.feed(chunk)
            if out:
                dst.sendall(out)
    except OSError:
        pass
    finally:
        _safe_shutdown(dst)


def _safe_shutdown(sock: socket.socket) -> None:
    try:
        sock.shutdown(socket.SHUT_WR)
    except OSError:
        pass


def _build_spoof_id_line(name: str, vendor: str) -> bytes | None:
    if not name:
        return None
    return (
        f'* ID ("name" "{name}" "version" "1.0" "vendor" "{vendor}")\r\n'
    ).encode("ascii", "replace")


def handle_client(client: socket.socket, peer, cfg: dict) -> None:
    upstream = None
    try:
        ctx = ssl.create_default_context()
        raw = socket.create_connection(
            (cfg["upstream_host"], cfg["upstream_port"]), timeout=30)
        upstream = ctx.wrap_socket(raw, server_hostname=cfg["upstream_host"])
        log.info(
            "Client connected; upstream established",
            extra={"event": "client_connected", "peer": f"{peer[0]}:{peer[1]}",
                   "upstream": f"{cfg['upstream_host']}:{cfg['upstream_port']}"},
        )

        rewriter = ServerStreamRewriter(
            cfg["spoof_id_line"], cfg["mirror_permflags"],
            greeting_banner=cfg["greeting_banner"],
            capability=cfg["capability"],
            add_capability=cfg["add_capability"])

        t_up = threading.Thread(
            target=_pump_raw, args=(client, upstream), daemon=True)
        t_down = threading.Thread(
            target=_pump_rewrite, args=(upstream, client, rewriter), daemon=True)
        t_up.start()
        t_down.start()
        t_up.join()
        t_down.join()
    except OSError as exc:
        log.warning(
            "Connection error",
            extra={"event": "connection_error", "peer": f"{peer[0]}:{peer[1]}",
                   "error": str(exc)},
        )
    finally:
        for s in (client, upstream):
            if s is not None:
                try:
                    s.close()
                except OSError:
                    pass
        log.info(
            "Client disconnected",
            extra={"event": "client_disconnected", "peer": f"{peer[0]}:{peer[1]}"},
        )


def _load_config() -> dict:
    cfg = {
        "listen_host": os.environ.get("PROXY_LISTEN_HOST", "127.0.0.1"),
        "listen_port": int(os.environ.get("PROXY_LISTEN_PORT", "1430")),
        "upstream_host": os.environ.get("PROXY_UPSTREAM_HOST", "imap.migadu.com"),
        "upstream_port": int(os.environ.get("PROXY_UPSTREAM_PORT", "993")),
        "mirror_permflags": _truthy(os.environ.get("PROXY_MIRROR_PERMANENTFLAGS")),
        "tls_cert": os.environ.get("PROXY_TLS_CERT"),
        "tls_key": os.environ.get("PROXY_TLS_KEY"),
    }
    # Default: minimal "Dovecot" spoof -- only the greeting banner is changed,
    # no ID lie and no fake capabilities, because eM Client fingerprints the
    # server type from the greeting banner alone (verified against Migadu).
    # The IceWarp levers (ID spoof / X-ICEWARP-SERVER) remain available as
    # overrides for environments where Dovecot detection regresses.
    cfg["spoof_id_line"] = _build_spoof_id_line(
        os.environ.get("PROXY_SPOOF_ID_NAME", ""),
        os.environ.get("PROXY_SPOOF_ID_VENDOR", ""),
    )
    banner = os.environ.get("PROXY_SPOOF_GREETING", "Dovecot ready.")
    cfg["greeting_banner"] = banner.encode("ascii", "replace") if banner else None
    caps = os.environ.get("PROXY_SPOOF_CAPABILITY")
    cfg["capability"] = caps.encode("ascii", "replace") if caps else None
    add = os.environ.get("PROXY_ADD_CAPABILITY", "")
    cfg["add_capability"] = [t.encode("ascii", "replace") for t in add.split()]
    return cfg


def main() -> int:
    setup_logging()
    cfg = _load_config()

    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((cfg["listen_host"], cfg["listen_port"]))
    listener.listen(16)

    # Optional client-side TLS termination.
    if cfg["tls_cert"] and cfg["tls_key"]:
        srv_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        srv_ctx.load_cert_chain(cfg["tls_cert"], cfg["tls_key"])
    else:
        srv_ctx = None

    log.info(
        "Proxy listening",
        extra={
            "event": "proxy_started",
            "listen": f"{cfg['listen_host']}:{cfg['listen_port']}",
            "upstream": f"{cfg['upstream_host']}:{cfg['upstream_port']}",
            "spoof_id": cfg["spoof_id_line"] is not None,
            "mirror_permflags": cfg["mirror_permflags"],
            "client_tls": srv_ctx is not None,
        },
    )

    try:
        while True:
            client, peer = listener.accept()
            if srv_ctx is not None:
                try:
                    client = srv_ctx.wrap_socket(client, server_side=True)
                except ssl.SSLError as exc:
                    log.warning("Client TLS handshake failed",
                                extra={"event": "client_tls_error",
                                       "error": str(exc)})
                    client.close()
                    continue
            threading.Thread(
                target=handle_client, args=(client, peer, cfg), daemon=True
            ).start()
    except KeyboardInterrupt:
        log.info("Shutting down", extra={"event": "proxy_stopping"})
    finally:
        listener.close()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
Editor is loading...
Leave a Comment