Untitled
unknown
golang
a year ago
7.9 kB
9
Indexable
package main
import (
"database/sql"
"fmt"
"log"
"sync"
"time"
_ "github.com/lib/pq" // 使用PostgreSQL
)
const (
MaxOrders = 100 // 最大不同订单数的阈值
BatchSize = 100 // 每次批量处理的订单数量
KafkaTopic = "order_reports"
)
// 订单报告结构体
type Report struct {
OrderID string
Sequence int
Data string
Status string // unprocessed, failed, sent
}
// 数据库连接
var db *sql.DB
var mu sync.Mutex // 保护共享资源的锁
// 初始化数据库
func initDB() {
var err error
db, err = sql.Open("postgres", "user=postgres password=example dbname=orders sslmode=disable")
if err != nil {
log.Fatal("Failed to connect to database:", err)
}
}
// 模拟从 Kafka 消费数据
func consumeFromKafka() Report {
// 模拟 Kafka 消息消费,实际应用中替换成 Kafka 消费逻辑
time.Sleep(time.Second) // 模拟消费延迟
return Report{OrderID: fmt.Sprintf("order-%d", time.Now().UnixNano()%1000), Sequence: 1, Data: "some data", Status: "unprocessed"}
}
// 模拟发送批量订单报告到下游
func sendToDownstream(reports []Report) (bool, []Report) {
// 模拟下游接口调用成功与失败
time.Sleep(500 * time.Millisecond)
success := len(reports) % 2 == 0 // 模拟每次发送偶尔失败
var failedReports []Report
if !success {
failedReports = reports // 假设所有报告都失败了
}
return success, failedReports
}
// 检查订单报告是否已经存在于数据库中
func reportExists(orderID string, sequence int) (bool, error) {
var exists bool
err := db.QueryRow("SELECT EXISTS(SELECT 1 FROM reports WHERE order_id = $1 AND sequence = $2)", orderID, sequence).Scan(&exists)
return exists, err
}
// 保存订单报告到数据库
func saveToDatabase(report Report) error {
// 检查报告是否已经存在
exists, err := reportExists(report.OrderID, report.Sequence)
if err != nil {
return err
}
if exists {
log.Printf("Report for OrderID %s with Sequence %d already exists. Skipping database insert.", report.OrderID, report.Sequence)
return nil // 如果已存在,返回 nil,不进行插入
}
_, err = db.Exec("INSERT INTO reports (order_id, sequence, data, status) VALUES ($1, $2, $3, $4)", report.OrderID, report.Sequence, report.Data, report.Status)
return err
}
// 更新订单报告的状态
func updateReportStatus(orderID string, sequence int, status string) error {
_, err := db.Exec("UPDATE reports SET status = $1 WHERE order_id = $2 AND sequence = $3", status, orderID, sequence)
return err
}
// 从数据库中恢复未处理的订单报告
func recoverReports(pendingReports map[string][]Report) {
rows, err := db.Query("SELECT order_id, sequence, data, status FROM reports WHERE status = 'unprocessed'")
if err != nil {
log.Println("Failed to retrieve unprocessed reports:", err)
return
}
defer rows.Close()
for rows.Next() {
var report Report
if err := rows.Scan(&report.OrderID, &report.Sequence, &report.Data, &report.Status); err != nil {
log.Println("Failed to scan report:", err)
continue
}
mu.Lock()
pendingReports[report.OrderID] = append(pendingReports[report.OrderID], report)
mu.Unlock()
}
}
// 消息处理逻辑
func processReports(reportChan chan Report, closeCh chan bool, doneCh chan bool) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
pendingReports := make(map[string][]Report)
reportCount := 0
isShuttingDown := false
// 恢复未处理的报告
recoverReports(pendingReports)
for {
select {
case report := <-reportChan:
if isShuttingDown {
log.Println("Received report after shutdown signal, skipping:", report.OrderID)
continue
}
// 将报告加入内存队列
mu.Lock()
pendingReports[report.OrderID] = append(pendingReports[report.OrderID], report)
reportCount++
mu.Unlock()
// 先保存到数据库以防止丢失
err := saveToDatabase(report)
if err != nil {
log.Println("Failed to save report:", err)
}
// 达到100个不同订单,处理批量发送
if len(pendingReports) >= MaxOrders {
processBatch(pendingReports, &reportCount)
}
case <-ticker.C: // 使用 Ticker 定时触发
// 定时器每5秒触发一次批量处理
if reportCount > 0 {
processBatch(pendingReports, &reportCount)
}
case <-closeCh:
log.Println("Shutdown signal received, finishing pending reports...")
isShuttingDown = true
continue
default:
// 如果正在关闭并且没有剩余报告,退出循环
if isShuttingDown && reportCount == 0 {
log.Println("All reports processed, sending done signal.")
doneCh <- true // 通知主程序已安全关闭
return
}
}
}
}
// 批量处理并发送订单报告
func processBatch(pendingReports map[string][]Report, reportCount *int) {
var batch []Report
// 收集100个不同订单的最小序号报告
for orderID, reports := range pendingReports {
if len(reports) > 0 {
batch = append(batch, reports[0]) // 获取每个订单的第一个报告(最小序号)
if len(batch) >= BatchSize {
break
}
}
}
if len(batch) == 0 {
return // 没有可以发送的报告
}
// 发送批量报告到下游
success, failedReports := sendToDownstream(batch)
if success {
removeFromMemory(pendingReports, batch, reportCount) // 成功后移除内存中的报告
} else {
reQueueFailedReports(failedReports, pendingReports) // 将失败的报告重新入队
}
}
// 从内存中移除已成功发送的报告
func removeFromMemory(pendingReports map[string][]Report, reports []Report, reportCount *int) {
for _, report := range reports {
mu.Lock()
orderReports, exists := pendingReports[report.OrderID]
if exists {
pendingReports[report.OrderID] = orderReports[1:] // 移除已发送的报告
if len(pendingReports[report.OrderID]) == 0 {
delete(pendingReports, report.OrderID) // 如果订单所有报告都已发送,移除订单
(*reportCount)--
}
// 更新数据库状态
updateReportStatus(report.OrderID, report.Sequence, "sent")
}
mu.Unlock()
}
}
// 将失败的报告重新入队,确保序号的顺序
func reQueueFailedReports(reports []Report, pendingReports map[string][]Report) {
for _, report := range reports {
mu.Lock()
existingReports := pendingReports[report.OrderID]
// 需要插入到正确的位置,保持序号顺序
insertIndex := 0
for insertIndex < len(existingReports) && existingReports[insertIndex].Sequence < report.Sequence {
insertIndex++
}
// 在合适的位置插入报告
pendingReports[report.OrderID] = append(existingReports[:insertIndex], append([]Report{report}, existingReports[insertIndex:]...)...)
mu.Unlock()
}
}
// 主函数
func main() {
initDB()
defer db.Close()
// 创建报告通道和关闭信号通道
reportChan := make(chan Report)
closeCh := make(chan bool)
doneCh := make(chan bool)
// 启动报告处理器 Goroutine
go process
Editor is loading...
Leave a Comment