Untitled

 avatar
unknown
plain_text
3 months ago
12 kB
8
Indexable
package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	proxyproto "github.com/pires/go-proxyproto"
)

type Config struct {
	ListenAddr         string
	FlespiAddr         string
	HealthAddr         string
	ConnectTimeout     time.Duration
	HandshakeTimeout   time.Duration
	IdleTimeout        time.Duration
	LogHexBody         bool
	LogMaxHexChars     int
	StoreRawChunks     bool
	StoreDir           string
	AcceptedIMEIPrefix string
}

func main() {
	cfg := Config{
		ListenAddr:         getEnv("LISTEN_ADDR", ":9000"),
		HealthAddr:         getEnv("HEALTH_ADDR", ":9001"),
		FlespiAddr:         getEnv("FLESPI_ADDR", "ch1237324.flespi.gw:22973"),
		ConnectTimeout:     getEnvDurationSeconds("CONNECT_TIMEOUT_SEC", 10),
		HandshakeTimeout:   getEnvDurationSeconds("HANDSHAKE_TIMEOUT_SEC", 20),
		IdleTimeout:        getEnvDurationSeconds("IDLE_TIMEOUT_SEC", 3600),
		LogHexBody:         getEnvBool("LOG_HEX_BODY", true),
		LogMaxHexChars:     getEnvInt("LOG_MAX_HEX_CHARS", 4000),
		StoreRawChunks:     getEnvBool("STORE_RAW_CHUNKS", false),
		StoreDir:           getEnv("STORE_DIR", "./data"),
		AcceptedIMEIPrefix: getEnv("ACCEPTED_IMEI_PREFIX", ""),
	}

	go startHealthServer(cfg.HealthAddr)

	if cfg.FlespiAddr == "" {
		log.Fatal("FLESPI_ADDR is required")
	}

	if cfg.StoreRawChunks {
		if err := os.MkdirAll(cfg.StoreDir, 0o755); err != nil {
			log.Fatalf("failed to create store dir: %v", err)
		}
	}

	baseLn, err := net.Listen("tcp", cfg.ListenAddr)
	if err != nil {
		log.Fatalf("listen error: %v", err)
	}
	defer baseLn.Close()

	ln := &proxyproto.Listener{
		Listener:          baseLn,
		ReadHeaderTimeout: 5 * time.Second,
	}
	defer ln.Close()

	log.Printf("teltonika transparent proxy listening on %s", cfg.ListenAddr)
	log.Printf("flespi upstream: %s", cfg.FlespiAddr)

	for {
		deviceConn, err := ln.Accept()
		if err != nil {
			log.Printf("accept error: %v", err)
			continue
		}

		go handleDeviceConnection(deviceConn, cfg)
	}
}

func startHealthServer(addr string) {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatalf("health listen error: %v", err)
	}
	defer ln.Close()

	for {
		conn, err := ln.Accept()
		if err != nil {
			log.Printf("health accept error: %v", err)
			continue
		}

		go func(c net.Conn) {
			defer c.Close()
			_ = c.SetWriteDeadline(time.Now().Add(3 * time.Second))
			_, _ = c.Write([]byte("OK\n"))
		}(conn)
	}
}

func handleDeviceConnection(deviceConn net.Conn, cfg Config) {
	defer deviceConn.Close()

	deviceRemote := deviceConn.RemoteAddr().String()

	if err := deviceConn.SetDeadline(time.Now().Add(cfg.HandshakeTimeout)); err != nil {
		log.Printf("[WARN] set device handshake deadline failed remote=%s err=%v", deviceRemote, err)
	}

	deviceReader := NewConnReader(deviceConn)

	imei, imeiPacket, err := readIMEIPacket(deviceReader)
	if err != nil {
		log.Printf("[IMEI] failed remote=%s err=%v", deviceRemote, err)
		return
	}

	log.Printf("[CONN] device connected remote=%s imei=%s", deviceRemote, imei)

	if !acceptIMEI(imei, cfg.AcceptedIMEIPrefix) {
		log.Printf("[IMEI] rejected locally imei=%s", imei)
		if err := writeExact(deviceConn, []byte{0x00}, cfg.HandshakeTimeout); err != nil {
			log.Printf("[IMEI] local reject write failed imei=%s err=%v", imei, err)
		}
		return
	}

	flespiConn, err := net.DialTimeout("tcp", cfg.FlespiAddr, cfg.ConnectTimeout)
	if err != nil {
		log.Printf("[FLESPI] connect failed imei=%s upstream=%s err=%v", imei, cfg.FlespiAddr, err)
		return
	}
	defer flespiConn.Close()

	if err := flespiConn.SetDeadline(time.Now().Add(cfg.HandshakeTimeout)); err != nil {
		log.Printf("[WARN] set flespi handshake deadline failed imei=%s err=%v", imei, err)
		return
	}

	flespiReader := NewConnReader(flespiConn)

	// Send IMEI packet upstream exactly as received.
	if err := writeExact(flespiConn, imeiPacket, cfg.HandshakeTimeout); err != nil {
		log.Printf("[FLESPI] send imei failed imei=%s err=%v", imei, err)
		return
	}

	// Read 1-byte accept/reject from flespi and pass it back to device.
	flespiIMEIAck, err := readFixed(flespiReader, 1, cfg.HandshakeTimeout, flespiConn)
	if err != nil {
		if isConnClosed(err) {
			log.Printf("[FLESPI] upstream closed while waiting imei ack imei=%s", imei)
		} else {
			log.Printf("[FLESPI] read imei ack failed imei=%s err=%v", imei, err)
		}
		return
	}

	if err := writeExact(deviceConn, flespiIMEIAck, cfg.HandshakeTimeout); err != nil {
		log.Printf("[DEVICE] send imei ack failed imei=%s err=%v", imei, err)
		return
	}

	log.Printf("[IMEI] passthrough ack imei=%s ack=0x%02X", imei, flespiIMEIAck[0])

	if flespiIMEIAck[0] != 0x01 {
		log.Printf("[IMEI] upstream rejected imei=%s", imei)
		return
	}

	// Clear handshake deadlines. For long-lived relay, prefer idle deadlines refreshed on every read/write.
	_ = deviceConn.SetDeadline(time.Time{})
	_ = flespiConn.SetDeadline(time.Time{})

	sessionLogger := NewSessionLogger(cfg, imei, deviceRemote)

	log.Printf("[RELAY] starting bidirectional relay imei=%s remote=%s upstream=%s", imei, deviceRemote, cfg.FlespiAddr)

	errCh := make(chan error, 2)

	go relayOneWay(
		"device->flespi",
		flespiConn,
		deviceConn,
		cfg.IdleTimeout,
		sessionLogger,
		errCh,
	)

	go relayOneWay(
		"flespi->device",
		deviceConn,
		flespiConn,
		cfg.IdleTimeout,
		sessionLogger,
		errCh,
	)

	firstErr := <-errCh

	// Force both directions to break quickly.
	_ = deviceConn.Close()
	_ = flespiConn.Close()

	secondErr := <-errCh

	log.Printf(
		"[RELAY] finished imei=%s remote=%s first_err=%v second_err=%v",
		imei,
		deviceRemote,
		normalizeRelayErr(firstErr),
		normalizeRelayErr(secondErr),
	)
}

func relayOneWay(
	direction string,
	dst net.Conn,
	src net.Conn,
	idleTimeout time.Duration,
	sessionLogger *SessionLogger,
	errCh chan<- error,
) {
	buf := make([]byte, 32*1024)

	for {
		if idleTimeout > 0 {
			_ = src.SetReadDeadline(time.Now().Add(idleTimeout))
		}

		n, readErr := src.Read(buf)
		if n > 0 {
			chunk := make([]byte, n)
			copy(chunk, buf[:n])

			sessionLogger.LogChunk(direction, chunk)

			if idleTimeout > 0 {
				_ = dst.SetWriteDeadline(time.Now().Add(idleTimeout))
			}

			if err := writeAll(dst, chunk); err != nil {
				errCh <- fmt.Errorf("%s write failed: %w", direction, err)
				return
			}
		}

		if readErr != nil {
			if readErr == io.EOF {
				errCh <- fmt.Errorf("%s closed: %w", direction, readErr)
				return
			}
			errCh <- fmt.Errorf("%s read failed: %w", direction, readErr)
			return
		}
	}
}

func writeAll(conn net.Conn, data []byte) error {
	total := 0
	for total < len(data) {
		n, err := conn.Write(data[total:])
		if err != nil {
			return err
		}
		total += n
	}
	return nil
}

type SessionLogger struct {
	cfg    Config
	imei   string
	remote string
	mu     sync.Mutex
}

func NewSessionLogger(cfg Config, imei, remote string) *SessionLogger {
	return &SessionLogger{
		cfg:    cfg,
		imei:   imei,
		remote: remote,
	}
}

func (s *SessionLogger) LogChunk(direction string, payload []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.cfg.LogHexBody {
		log.Printf(
			"[CHUNK] imei=%s dir=%s bytes=%d hex=%s",
			s.imei,
			direction,
			len(payload),
			truncateString(hex.EncodeToString(payload), s.cfg.LogMaxHexChars),
		)
	} else {
		log.Printf(
			"[CHUNK] imei=%s dir=%s bytes=%d",
			s.imei,
			direction,
			len(payload),
		)
	}

	if s.cfg.StoreRawChunks {
		if err := storeChunk(s.cfg.StoreDir, s.imei, s.remote, direction, payload); err != nil {
			log.Printf("[STORE] failed imei=%s dir=%s err=%v", s.imei, direction, err)
		}
	}
}

func readIMEIPacket(r *ConnReader) (string, []byte, error) {
	lengthBytes, err := r.ReadExact(2)
	if err != nil {
		return "", nil, fmt.Errorf("read imei length: %w", err)
	}

	imeiLen := binary.BigEndian.Uint16(lengthBytes)
	if imeiLen == 0 || imeiLen > 32 {
		return "", nil, fmt.Errorf("invalid imei length: %d", imeiLen)
	}

	imeiBytes, err := r.ReadExact(int(imeiLen))
	if err != nil {
		return "", nil, fmt.Errorf("read imei body: %w", err)
	}

	packet := make([]byte, 2+len(imeiBytes))
	copy(packet[0:2], lengthBytes)
	copy(packet[2:], imeiBytes)

	return string(imeiBytes), packet, nil
}

func acceptIMEI(imei string, acceptedPrefix string) bool {
	if len(imei) < 8 {
		return false
	}

	if acceptedPrefix != "" && !strings.HasPrefix(imei, acceptedPrefix) {
		return false
	}

	for _, ch := range imei {
		if ch < '0' || ch > '9' {
			return false
		}
	}

	return true
}

type ConnReader struct {
	conn net.Conn
	mu   sync.Mutex
}

func NewConnReader(conn net.Conn) *ConnReader {
	return &ConnReader{conn: conn}
}

func (r *ConnReader) ReadExact(n int) ([]byte, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	buf := make([]byte, n)
	_, err := io.ReadFull(r.conn, buf)
	return buf, err
}

func readFixed(r *ConnReader, n int, timeout time.Duration, conn net.Conn) ([]byte, error) {
	if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
		return nil, err
	}
	return r.ReadExact(n)
}

func writeExact(conn net.Conn, data []byte, timeout time.Duration) error {
	if err := conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
		return err
	}
	return writeAll(conn, data)
}

func storeChunk(storeDir, imei, remote, direction string, payload []byte) error {
	now := time.Now().UTC()
	dateDir := filepath.Join(storeDir, now.Format("2006-01-02"), imei)

	if err := os.MkdirAll(dateDir, 0o755); err != nil {
		return err
	}

	baseName := fmt.Sprintf(
		"%s_dir-%s_from-%s",
		now.Format("150405.000000000"),
		sanitizeFilename(direction),
		sanitizeFilename(remote),
	)

	binPath := filepath.Join(dateDir, baseName+".bin")
	metaPath := filepath.Join(dateDir, baseName+".meta")

	if err := os.WriteFile(binPath, payload, 0o644); err != nil {
		return err
	}

	meta := strings.Join([]string{
		"imei=" + imei,
		"remote_addr=" + remote,
		"direction=" + direction,
		fmt.Sprintf("received_at_utc=%s", now.Format(time.RFC3339Nano)),
		fmt.Sprintf("payload_size=%d", len(payload)),
		"payload_hex=" + hex.EncodeToString(payload),
	}, "\n")

	if err := os.WriteFile(metaPath, []byte(meta), 0o644); err != nil {
		return err
	}

	return nil
}

func sanitizeFilename(s string) string {
	replacer := strings.NewReplacer(
		":", "_",
		"/", "_",
		"\\", "_",
		" ", "_",
		"\t", "_",
		">", "_",
		"<", "_",
	)
	return replacer.Replace(s)
}

func truncateString(s string, max int) string {
	if max <= 0 || len(s) <= max {
		return s
	}
	return s[:max] + "...(truncated)"
}

func isConnClosed(err error) bool {
	if err == nil {
		return false
	}

	if err == io.EOF {
		return true
	}

	msg := strings.ToLower(err.Error())
	return strings.Contains(msg, "use of closed network connection") ||
		strings.Contains(msg, "connection reset by peer") ||
		strings.Contains(msg, "broken pipe") ||
		strings.Contains(msg, "i/o timeout") ||
		strings.Contains(msg, "eof")
}

func normalizeRelayErr(err error) string {
	if err == nil {
		return "nil"
	}
	if isConnClosed(err) {
		return "connection closed"
	}
	return err.Error()
}

func getEnv(key, fallback string) string {
	v := strings.TrimSpace(os.Getenv(key))
	if v == "" {
		return fallback
	}
	return v
}

func getEnvBool(key string, fallback bool) bool {
	v := strings.TrimSpace(strings.ToLower(os.Getenv(key)))
	switch v {
	case "1", "true", "yes", "y":
		return true
	case "0", "false", "no", "n":
		return false
	default:
		return fallback
	}
}

func getEnvInt(key string, fallback int) int {
	v := strings.TrimSpace(os.Getenv(key))
	if v == "" {
		return fallback
	}

	n, err := strconv.Atoi(v)
	if err != nil {
		return fallback
	}
	return n
}

func getEnvDurationSeconds(key string, fallbackSec int) time.Duration {
	return time.Duration(getEnvInt(key, fallbackSec)) * time.Second
}
Editor is loading...
Leave a Comment