Untitled

mail@pastecode.io avatar
unknown
golang
11 days ago
7.9 kB
2
Indexable
Never
package main

import (
    "database/sql"
    "fmt"
    "log"
    "sync"
    "time"

    _ "github.com/lib/pq" // 使用PostgreSQL
)

const (
    MaxOrders  = 100 // 最大不同订单数的阈值
    BatchSize  = 100 // 每次批量处理的订单数量
    KafkaTopic = "order_reports"
)

// 订单报告结构体
type Report struct {
    OrderID  string
    Sequence int
    Data     string
    Status   string // unprocessed, failed, sent
}

// 数据库连接
var db *sql.DB
var mu sync.Mutex // 保护共享资源的锁

// 初始化数据库
func initDB() {
    var err error
    db, err = sql.Open("postgres", "user=postgres password=example dbname=orders sslmode=disable")
    if err != nil {
        log.Fatal("Failed to connect to database:", err)
    }
}

// 模拟从 Kafka 消费数据
func consumeFromKafka() Report {
    // 模拟 Kafka 消息消费,实际应用中替换成 Kafka 消费逻辑
    time.Sleep(time.Second) // 模拟消费延迟
    return Report{OrderID: fmt.Sprintf("order-%d", time.Now().UnixNano()%1000), Sequence: 1, Data: "some data", Status: "unprocessed"}
}

// 模拟发送批量订单报告到下游
func sendToDownstream(reports []Report) (bool, []Report) {
    // 模拟下游接口调用成功与失败
    time.Sleep(500 * time.Millisecond)
    success := len(reports) % 2 == 0 // 模拟每次发送偶尔失败
    var failedReports []Report
    if !success {
        failedReports = reports // 假设所有报告都失败了
    }
    return success, failedReports
}

// 检查订单报告是否已经存在于数据库中
func reportExists(orderID string, sequence int) (bool, error) {
    var exists bool
    err := db.QueryRow("SELECT EXISTS(SELECT 1 FROM reports WHERE order_id = $1 AND sequence = $2)", orderID, sequence).Scan(&exists)
    return exists, err
}

// 保存订单报告到数据库
func saveToDatabase(report Report) error {
    // 检查报告是否已经存在
    exists, err := reportExists(report.OrderID, report.Sequence)
    if err != nil {
        return err
    }
    if exists {
        log.Printf("Report for OrderID %s with Sequence %d already exists. Skipping database insert.", report.OrderID, report.Sequence)
        return nil // 如果已存在,返回 nil,不进行插入
    }

    _, err = db.Exec("INSERT INTO reports (order_id, sequence, data, status) VALUES ($1, $2, $3, $4)", report.OrderID, report.Sequence, report.Data, report.Status)
    return err
}

// 更新订单报告的状态
func updateReportStatus(orderID string, sequence int, status string) error {
    _, err := db.Exec("UPDATE reports SET status = $1 WHERE order_id = $2 AND sequence = $3", status, orderID, sequence)
    return err
}

// 从数据库中恢复未处理的订单报告
func recoverReports(pendingReports map[string][]Report) {
    rows, err := db.Query("SELECT order_id, sequence, data, status FROM reports WHERE status = 'unprocessed'")
    if err != nil {
        log.Println("Failed to retrieve unprocessed reports:", err)
        return
    }
    defer rows.Close()

    for rows.Next() {
        var report Report
        if err := rows.Scan(&report.OrderID, &report.Sequence, &report.Data, &report.Status); err != nil {
            log.Println("Failed to scan report:", err)
            continue
        }

        mu.Lock()
        pendingReports[report.OrderID] = append(pendingReports[report.OrderID], report)
        mu.Unlock()
    }
}

// 消息处理逻辑
func processReports(reportChan chan Report, closeCh chan bool, doneCh chan bool) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    pendingReports := make(map[string][]Report)
    reportCount := 0
    isShuttingDown := false

    // 恢复未处理的报告
    recoverReports(pendingReports)

    for {
        select {
        case report := <-reportChan:
            if isShuttingDown {
                log.Println("Received report after shutdown signal, skipping:", report.OrderID)
                continue
            }

            // 将报告加入内存队列
            mu.Lock()
            pendingReports[report.OrderID] = append(pendingReports[report.OrderID], report)
            reportCount++
            mu.Unlock()

            // 先保存到数据库以防止丢失
            err := saveToDatabase(report)
            if err != nil {
                log.Println("Failed to save report:", err)
            }

            // 达到100个不同订单,处理批量发送
            if len(pendingReports) >= MaxOrders {
                processBatch(pendingReports, &reportCount)
            }

        case <-ticker.C: // 使用 Ticker 定时触发
            // 定时器每5秒触发一次批量处理
            if reportCount > 0 {
                processBatch(pendingReports, &reportCount)
            }

        case <-closeCh:
            log.Println("Shutdown signal received, finishing pending reports...")
            isShuttingDown = true
            continue

        default:
            // 如果正在关闭并且没有剩余报告,退出循环
            if isShuttingDown && reportCount == 0 {
                log.Println("All reports processed, sending done signal.")
                doneCh <- true // 通知主程序已安全关闭
                return
            }
        }
    }
}

// 批量处理并发送订单报告
func processBatch(pendingReports map[string][]Report, reportCount *int) {
    var batch []Report

    // 收集100个不同订单的最小序号报告
    for orderID, reports := range pendingReports {
        if len(reports) > 0 {
            batch = append(batch, reports[0]) // 获取每个订单的第一个报告(最小序号)
            if len(batch) >= BatchSize {
                break
            }
        }
    }

    if len(batch) == 0 {
        return // 没有可以发送的报告
    }

    // 发送批量报告到下游
    success, failedReports := sendToDownstream(batch)
    if success {
        removeFromMemory(pendingReports, batch, reportCount) // 成功后移除内存中的报告
    } else {
        reQueueFailedReports(failedReports, pendingReports) // 将失败的报告重新入队
    }
}

// 从内存中移除已成功发送的报告
func removeFromMemory(pendingReports map[string][]Report, reports []Report, reportCount *int) {
    for _, report := range reports {
        mu.Lock()
        orderReports, exists := pendingReports[report.OrderID]
        if exists {
            pendingReports[report.OrderID] = orderReports[1:] // 移除已发送的报告
            if len(pendingReports[report.OrderID]) == 0 {
                delete(pendingReports, report.OrderID) // 如果订单所有报告都已发送,移除订单
                (*reportCount)--
            }
            // 更新数据库状态
            updateReportStatus(report.OrderID, report.Sequence, "sent")
        }
        mu.Unlock()
    }
}

// 将失败的报告重新入队,确保序号的顺序
func reQueueFailedReports(reports []Report, pendingReports map[string][]Report) {
    for _, report := range reports {
        mu.Lock()
        existingReports := pendingReports[report.OrderID]
        // 需要插入到正确的位置,保持序号顺序
        insertIndex := 0
        for insertIndex < len(existingReports) && existingReports[insertIndex].Sequence < report.Sequence {
            insertIndex++
        }
        // 在合适的位置插入报告
        pendingReports[report.OrderID] = append(existingReports[:insertIndex], append([]Report{report}, existingReports[insertIndex:]...)...)
        mu.Unlock()
    }
}

// 主函数
func main() {
    initDB()
    defer db.Close()

    // 创建报告通道和关闭信号通道
    reportChan := make(chan Report)
    closeCh := make(chan bool)
    doneCh := make(chan bool)

    // 启动报告处理器 Goroutine
    go process
Leave a Comment