Untitled
unknown
plain_text
3 months ago
12 kB
7
Indexable
package main
import (
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"log"
"net"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
proxyproto "github.com/pires/go-proxyproto"
)
type Config struct {
ListenAddr string
FlespiAddr string
HealthAddr string
ConnectTimeout time.Duration
HandshakeTimeout time.Duration
IdleTimeout time.Duration
LogHexBody bool
LogMaxHexChars int
StoreRawChunks bool
StoreDir string
AcceptedIMEIPrefix string
}
func main() {
cfg := Config{
ListenAddr: getEnv("LISTEN_ADDR", ":9000"),
HealthAddr: getEnv("HEALTH_ADDR", ":9001"),
FlespiAddr: getEnv("FLESPI_ADDR", "ch1237324.flespi.gw:22973"),
ConnectTimeout: getEnvDurationSeconds("CONNECT_TIMEOUT_SEC", 10),
HandshakeTimeout: getEnvDurationSeconds("HANDSHAKE_TIMEOUT_SEC", 20),
IdleTimeout: getEnvDurationSeconds("IDLE_TIMEOUT_SEC", 3600),
LogHexBody: getEnvBool("LOG_HEX_BODY", true),
LogMaxHexChars: getEnvInt("LOG_MAX_HEX_CHARS", 4000),
StoreRawChunks: getEnvBool("STORE_RAW_CHUNKS", false),
StoreDir: getEnv("STORE_DIR", "./data"),
AcceptedIMEIPrefix: getEnv("ACCEPTED_IMEI_PREFIX", ""),
}
go startHealthServer(cfg.HealthAddr)
if cfg.FlespiAddr == "" {
log.Fatal("FLESPI_ADDR is required")
}
if cfg.StoreRawChunks {
if err := os.MkdirAll(cfg.StoreDir, 0o755); err != nil {
log.Fatalf("failed to create store dir: %v", err)
}
}
baseLn, err := net.Listen("tcp", cfg.ListenAddr)
if err != nil {
log.Fatalf("listen error: %v", err)
}
defer baseLn.Close()
ln := &proxyproto.Listener{
Listener: baseLn,
ReadHeaderTimeout: 5 * time.Second,
}
defer ln.Close()
log.Printf("teltonika transparent proxy listening on %s", cfg.ListenAddr)
log.Printf("flespi upstream: %s", cfg.FlespiAddr)
for {
deviceConn, err := ln.Accept()
if err != nil {
log.Printf("accept error: %v", err)
continue
}
go handleDeviceConnection(deviceConn, cfg)
}
}
func startHealthServer(addr string) {
ln, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalf("health listen error: %v", err)
}
defer ln.Close()
for {
conn, err := ln.Accept()
if err != nil {
log.Printf("health accept error: %v", err)
continue
}
go func(c net.Conn) {
defer c.Close()
_ = c.SetWriteDeadline(time.Now().Add(3 * time.Second))
_, _ = c.Write([]byte("OK\n"))
}(conn)
}
}
func handleDeviceConnection(deviceConn net.Conn, cfg Config) {
defer deviceConn.Close()
deviceRemote := deviceConn.RemoteAddr().String()
if err := deviceConn.SetDeadline(time.Now().Add(cfg.HandshakeTimeout)); err != nil {
log.Printf("[WARN] set device handshake deadline failed remote=%s err=%v", deviceRemote, err)
}
deviceReader := NewConnReader(deviceConn)
imei, imeiPacket, err := readIMEIPacket(deviceReader)
if err != nil {
log.Printf("[IMEI] failed remote=%s err=%v", deviceRemote, err)
return
}
log.Printf("[CONN] device connected remote=%s imei=%s", deviceRemote, imei)
if !acceptIMEI(imei, cfg.AcceptedIMEIPrefix) {
log.Printf("[IMEI] rejected locally imei=%s", imei)
if err := writeExact(deviceConn, []byte{0x00}, cfg.HandshakeTimeout); err != nil {
log.Printf("[IMEI] local reject write failed imei=%s err=%v", imei, err)
}
return
}
flespiConn, err := net.DialTimeout("tcp", cfg.FlespiAddr, cfg.ConnectTimeout)
if err != nil {
log.Printf("[FLESPI] connect failed imei=%s upstream=%s err=%v", imei, cfg.FlespiAddr, err)
return
}
defer flespiConn.Close()
if err := flespiConn.SetDeadline(time.Now().Add(cfg.HandshakeTimeout)); err != nil {
log.Printf("[WARN] set flespi handshake deadline failed imei=%s err=%v", imei, err)
return
}
flespiReader := NewConnReader(flespiConn)
// Send IMEI packet upstream exactly as received.
if err := writeExact(flespiConn, imeiPacket, cfg.HandshakeTimeout); err != nil {
log.Printf("[FLESPI] send imei failed imei=%s err=%v", imei, err)
return
}
// Read 1-byte accept/reject from flespi and pass it back to device.
flespiIMEIAck, err := readFixed(flespiReader, 1, cfg.HandshakeTimeout, flespiConn)
if err != nil {
if isConnClosed(err) {
log.Printf("[FLESPI] upstream closed while waiting imei ack imei=%s", imei)
} else {
log.Printf("[FLESPI] read imei ack failed imei=%s err=%v", imei, err)
}
return
}
if err := writeExact(deviceConn, flespiIMEIAck, cfg.HandshakeTimeout); err != nil {
log.Printf("[DEVICE] send imei ack failed imei=%s err=%v", imei, err)
return
}
log.Printf("[IMEI] passthrough ack imei=%s ack=0x%02X", imei, flespiIMEIAck[0])
if flespiIMEIAck[0] != 0x01 {
log.Printf("[IMEI] upstream rejected imei=%s", imei)
return
}
// Clear handshake deadlines. For long-lived relay, prefer idle deadlines refreshed on every read/write.
_ = deviceConn.SetDeadline(time.Time{})
_ = flespiConn.SetDeadline(time.Time{})
sessionLogger := NewSessionLogger(cfg, imei, deviceRemote)
log.Printf("[RELAY] starting bidirectional relay imei=%s remote=%s upstream=%s", imei, deviceRemote, cfg.FlespiAddr)
errCh := make(chan error, 2)
go relayOneWay(
"device->flespi",
flespiConn,
deviceConn,
cfg.IdleTimeout,
sessionLogger,
errCh,
)
go relayOneWay(
"flespi->device",
deviceConn,
flespiConn,
cfg.IdleTimeout,
sessionLogger,
errCh,
)
firstErr := <-errCh
// Force both directions to break quickly.
_ = deviceConn.Close()
_ = flespiConn.Close()
secondErr := <-errCh
log.Printf(
"[RELAY] finished imei=%s remote=%s first_err=%v second_err=%v",
imei,
deviceRemote,
normalizeRelayErr(firstErr),
normalizeRelayErr(secondErr),
)
}
func relayOneWay(
direction string,
dst net.Conn,
src net.Conn,
idleTimeout time.Duration,
sessionLogger *SessionLogger,
errCh chan<- error,
) {
buf := make([]byte, 32*1024)
for {
if idleTimeout > 0 {
_ = src.SetReadDeadline(time.Now().Add(idleTimeout))
}
n, readErr := src.Read(buf)
if n > 0 {
chunk := make([]byte, n)
copy(chunk, buf[:n])
sessionLogger.LogChunk(direction, chunk)
if idleTimeout > 0 {
_ = dst.SetWriteDeadline(time.Now().Add(idleTimeout))
}
if err := writeAll(dst, chunk); err != nil {
errCh <- fmt.Errorf("%s write failed: %w", direction, err)
return
}
}
if readErr != nil {
if readErr == io.EOF {
errCh <- fmt.Errorf("%s closed: %w", direction, readErr)
return
}
errCh <- fmt.Errorf("%s read failed: %w", direction, readErr)
return
}
}
}
func writeAll(conn net.Conn, data []byte) error {
total := 0
for total < len(data) {
n, err := conn.Write(data[total:])
if err != nil {
return err
}
total += n
}
return nil
}
type SessionLogger struct {
cfg Config
imei string
remote string
mu sync.Mutex
}
func NewSessionLogger(cfg Config, imei, remote string) *SessionLogger {
return &SessionLogger{
cfg: cfg,
imei: imei,
remote: remote,
}
}
func (s *SessionLogger) LogChunk(direction string, payload []byte) {
s.mu.Lock()
defer s.mu.Unlock()
if s.cfg.LogHexBody {
log.Printf(
"[CHUNK] imei=%s dir=%s bytes=%d hex=%s",
s.imei,
direction,
len(payload),
truncateString(hex.EncodeToString(payload), s.cfg.LogMaxHexChars),
)
} else {
log.Printf(
"[CHUNK] imei=%s dir=%s bytes=%d",
s.imei,
direction,
len(payload),
)
}
if s.cfg.StoreRawChunks {
if err := storeChunk(s.cfg.StoreDir, s.imei, s.remote, direction, payload); err != nil {
log.Printf("[STORE] failed imei=%s dir=%s err=%v", s.imei, direction, err)
}
}
}
func readIMEIPacket(r *ConnReader) (string, []byte, error) {
lengthBytes, err := r.ReadExact(2)
if err != nil {
return "", nil, fmt.Errorf("read imei length: %w", err)
}
imeiLen := binary.BigEndian.Uint16(lengthBytes)
if imeiLen == 0 || imeiLen > 32 {
return "", nil, fmt.Errorf("invalid imei length: %d", imeiLen)
}
imeiBytes, err := r.ReadExact(int(imeiLen))
if err != nil {
return "", nil, fmt.Errorf("read imei body: %w", err)
}
packet := make([]byte, 2+len(imeiBytes))
copy(packet[0:2], lengthBytes)
copy(packet[2:], imeiBytes)
return string(imeiBytes), packet, nil
}
func acceptIMEI(imei string, acceptedPrefix string) bool {
if len(imei) < 8 {
return false
}
if acceptedPrefix != "" && !strings.HasPrefix(imei, acceptedPrefix) {
return false
}
for _, ch := range imei {
if ch < '0' || ch > '9' {
return false
}
}
return true
}
type ConnReader struct {
conn net.Conn
mu sync.Mutex
}
func NewConnReader(conn net.Conn) *ConnReader {
return &ConnReader{conn: conn}
}
func (r *ConnReader) ReadExact(n int) ([]byte, error) {
r.mu.Lock()
defer r.mu.Unlock()
buf := make([]byte, n)
_, err := io.ReadFull(r.conn, buf)
return buf, err
}
func readFixed(r *ConnReader, n int, timeout time.Duration, conn net.Conn) ([]byte, error) {
if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
return nil, err
}
return r.ReadExact(n)
}
func writeExact(conn net.Conn, data []byte, timeout time.Duration) error {
if err := conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
return err
}
return writeAll(conn, data)
}
func storeChunk(storeDir, imei, remote, direction string, payload []byte) error {
now := time.Now().UTC()
dateDir := filepath.Join(storeDir, now.Format("2006-01-02"), imei)
if err := os.MkdirAll(dateDir, 0o755); err != nil {
return err
}
baseName := fmt.Sprintf(
"%s_dir-%s_from-%s",
now.Format("150405.000000000"),
sanitizeFilename(direction),
sanitizeFilename(remote),
)
binPath := filepath.Join(dateDir, baseName+".bin")
metaPath := filepath.Join(dateDir, baseName+".meta")
if err := os.WriteFile(binPath, payload, 0o644); err != nil {
return err
}
meta := strings.Join([]string{
"imei=" + imei,
"remote_addr=" + remote,
"direction=" + direction,
fmt.Sprintf("received_at_utc=%s", now.Format(time.RFC3339Nano)),
fmt.Sprintf("payload_size=%d", len(payload)),
"payload_hex=" + hex.EncodeToString(payload),
}, "\n")
if err := os.WriteFile(metaPath, []byte(meta), 0o644); err != nil {
return err
}
return nil
}
func sanitizeFilename(s string) string {
replacer := strings.NewReplacer(
":", "_",
"/", "_",
"\\", "_",
" ", "_",
"\t", "_",
">", "_",
"<", "_",
)
return replacer.Replace(s)
}
func truncateString(s string, max int) string {
if max <= 0 || len(s) <= max {
return s
}
return s[:max] + "...(truncated)"
}
func isConnClosed(err error) bool {
if err == nil {
return false
}
if err == io.EOF {
return true
}
msg := strings.ToLower(err.Error())
return strings.Contains(msg, "use of closed network connection") ||
strings.Contains(msg, "connection reset by peer") ||
strings.Contains(msg, "broken pipe") ||
strings.Contains(msg, "i/o timeout") ||
strings.Contains(msg, "eof")
}
func normalizeRelayErr(err error) string {
if err == nil {
return "nil"
}
if isConnClosed(err) {
return "connection closed"
}
return err.Error()
}
func getEnv(key, fallback string) string {
v := strings.TrimSpace(os.Getenv(key))
if v == "" {
return fallback
}
return v
}
func getEnvBool(key string, fallback bool) bool {
v := strings.TrimSpace(strings.ToLower(os.Getenv(key)))
switch v {
case "1", "true", "yes", "y":
return true
case "0", "false", "no", "n":
return false
default:
return fallback
}
}
func getEnvInt(key string, fallback int) int {
v := strings.TrimSpace(os.Getenv(key))
if v == "" {
return fallback
}
n, err := strconv.Atoi(v)
if err != nil {
return fallback
}
return n
}
func getEnvDurationSeconds(key string, fallbackSec int) time.Duration {
return time.Duration(getEnvInt(key, fallbackSec)) * time.Second
}Editor is loading...
Leave a Comment