Untitled
unknown
golang
11 days ago
7.9 kB
2
Indexable
Never
package main import ( "database/sql" "fmt" "log" "sync" "time" _ "github.com/lib/pq" // 使用PostgreSQL ) const ( MaxOrders = 100 // 最大不同订单数的阈值 BatchSize = 100 // 每次批量处理的订单数量 KafkaTopic = "order_reports" ) // 订单报告结构体 type Report struct { OrderID string Sequence int Data string Status string // unprocessed, failed, sent } // 数据库连接 var db *sql.DB var mu sync.Mutex // 保护共享资源的锁 // 初始化数据库 func initDB() { var err error db, err = sql.Open("postgres", "user=postgres password=example dbname=orders sslmode=disable") if err != nil { log.Fatal("Failed to connect to database:", err) } } // 模拟从 Kafka 消费数据 func consumeFromKafka() Report { // 模拟 Kafka 消息消费,实际应用中替换成 Kafka 消费逻辑 time.Sleep(time.Second) // 模拟消费延迟 return Report{OrderID: fmt.Sprintf("order-%d", time.Now().UnixNano()%1000), Sequence: 1, Data: "some data", Status: "unprocessed"} } // 模拟发送批量订单报告到下游 func sendToDownstream(reports []Report) (bool, []Report) { // 模拟下游接口调用成功与失败 time.Sleep(500 * time.Millisecond) success := len(reports) % 2 == 0 // 模拟每次发送偶尔失败 var failedReports []Report if !success { failedReports = reports // 假设所有报告都失败了 } return success, failedReports } // 检查订单报告是否已经存在于数据库中 func reportExists(orderID string, sequence int) (bool, error) { var exists bool err := db.QueryRow("SELECT EXISTS(SELECT 1 FROM reports WHERE order_id = $1 AND sequence = $2)", orderID, sequence).Scan(&exists) return exists, err } // 保存订单报告到数据库 func saveToDatabase(report Report) error { // 检查报告是否已经存在 exists, err := reportExists(report.OrderID, report.Sequence) if err != nil { return err } if exists { log.Printf("Report for OrderID %s with Sequence %d already exists. Skipping database insert.", report.OrderID, report.Sequence) return nil // 如果已存在,返回 nil,不进行插入 } _, err = db.Exec("INSERT INTO reports (order_id, sequence, data, status) VALUES ($1, $2, $3, $4)", report.OrderID, report.Sequence, report.Data, report.Status) return err } // 更新订单报告的状态 func updateReportStatus(orderID string, sequence int, status string) error { _, err := db.Exec("UPDATE reports SET status = $1 WHERE order_id = $2 AND sequence = $3", status, orderID, sequence) return err } // 从数据库中恢复未处理的订单报告 func recoverReports(pendingReports map[string][]Report) { rows, err := db.Query("SELECT order_id, sequence, data, status FROM reports WHERE status = 'unprocessed'") if err != nil { log.Println("Failed to retrieve unprocessed reports:", err) return } defer rows.Close() for rows.Next() { var report Report if err := rows.Scan(&report.OrderID, &report.Sequence, &report.Data, &report.Status); err != nil { log.Println("Failed to scan report:", err) continue } mu.Lock() pendingReports[report.OrderID] = append(pendingReports[report.OrderID], report) mu.Unlock() } } // 消息处理逻辑 func processReports(reportChan chan Report, closeCh chan bool, doneCh chan bool) { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() pendingReports := make(map[string][]Report) reportCount := 0 isShuttingDown := false // 恢复未处理的报告 recoverReports(pendingReports) for { select { case report := <-reportChan: if isShuttingDown { log.Println("Received report after shutdown signal, skipping:", report.OrderID) continue } // 将报告加入内存队列 mu.Lock() pendingReports[report.OrderID] = append(pendingReports[report.OrderID], report) reportCount++ mu.Unlock() // 先保存到数据库以防止丢失 err := saveToDatabase(report) if err != nil { log.Println("Failed to save report:", err) } // 达到100个不同订单,处理批量发送 if len(pendingReports) >= MaxOrders { processBatch(pendingReports, &reportCount) } case <-ticker.C: // 使用 Ticker 定时触发 // 定时器每5秒触发一次批量处理 if reportCount > 0 { processBatch(pendingReports, &reportCount) } case <-closeCh: log.Println("Shutdown signal received, finishing pending reports...") isShuttingDown = true continue default: // 如果正在关闭并且没有剩余报告,退出循环 if isShuttingDown && reportCount == 0 { log.Println("All reports processed, sending done signal.") doneCh <- true // 通知主程序已安全关闭 return } } } } // 批量处理并发送订单报告 func processBatch(pendingReports map[string][]Report, reportCount *int) { var batch []Report // 收集100个不同订单的最小序号报告 for orderID, reports := range pendingReports { if len(reports) > 0 { batch = append(batch, reports[0]) // 获取每个订单的第一个报告(最小序号) if len(batch) >= BatchSize { break } } } if len(batch) == 0 { return // 没有可以发送的报告 } // 发送批量报告到下游 success, failedReports := sendToDownstream(batch) if success { removeFromMemory(pendingReports, batch, reportCount) // 成功后移除内存中的报告 } else { reQueueFailedReports(failedReports, pendingReports) // 将失败的报告重新入队 } } // 从内存中移除已成功发送的报告 func removeFromMemory(pendingReports map[string][]Report, reports []Report, reportCount *int) { for _, report := range reports { mu.Lock() orderReports, exists := pendingReports[report.OrderID] if exists { pendingReports[report.OrderID] = orderReports[1:] // 移除已发送的报告 if len(pendingReports[report.OrderID]) == 0 { delete(pendingReports, report.OrderID) // 如果订单所有报告都已发送,移除订单 (*reportCount)-- } // 更新数据库状态 updateReportStatus(report.OrderID, report.Sequence, "sent") } mu.Unlock() } } // 将失败的报告重新入队,确保序号的顺序 func reQueueFailedReports(reports []Report, pendingReports map[string][]Report) { for _, report := range reports { mu.Lock() existingReports := pendingReports[report.OrderID] // 需要插入到正确的位置,保持序号顺序 insertIndex := 0 for insertIndex < len(existingReports) && existingReports[insertIndex].Sequence < report.Sequence { insertIndex++ } // 在合适的位置插入报告 pendingReports[report.OrderID] = append(existingReports[:insertIndex], append([]Report{report}, existingReports[insertIndex:]...)...) mu.Unlock() } } // 主函数 func main() { initDB() defer db.Close() // 创建报告通道和关闭信号通道 reportChan := make(chan Report) closeCh := make(chan bool) doneCh := make(chan bool) // 启动报告处理器 Goroutine go process
Leave a Comment