Untitled
unknown
golang
3 years ago
1.9 kB
6
Indexable
// Package kafka provides a thin consumer-group wrapper around
// github.com/segmentio/kafka-go: it fetches messages from a topic, hands each
// message value to a Handler and commits the message once it was handled
// without error.
package kafka

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/segmentio/kafka-go"
)

// Handler processes the value of a single fetched message. When it returns a
// non-nil error the failure is logged, the message is not committed, and the
// consumer moves on to fetching the next message.
type Handler func(message []byte) error

// ConsumerFactory builds a Consumer for the given topic and consumer group
// that dispatches every fetched message value to handler.
type ConsumerFactory func(topic string, group string, handler Handler) *Consumer

// CreateConsumerFactory returns a ConsumerFactory whose consumers all connect
// to the given brokers.
func CreateConsumerFactory(brokers []string) ConsumerFactory {
	return func(topic string, group string, handler Handler) *Consumer {
		// run is the consume loop. It only returns when fetching or committing
		// fails (which includes ctx being cancelled), and it always sends on
		// done after the reader has been closed.
		run := func(ctx context.Context, done chan bool) error {
			r := kafka.NewReader(kafka.ReaderConfig{
				Brokers: brokers,
				GroupID: group,
				Topic:   topic,
			})

			defer func() {
				log.Printf("Closing consumer %s...", group)
				if err := r.Close(); err != nil {
					// Include the cause; previously the close error was dropped.
					log.Printf("Failed to close consumer %s: %v", group, err)
				}
				// done is unbuffered in Start, so this blocks until the
				// caller receives the completion signal.
				done <- true
			}()

			for {
				m, err := r.FetchMessage(ctx)
				if err != nil {
					log.Printf("[%s] consuming message faied: %s", group, err.Error())
					return err
				}

				if err = handler(m.Value); err != nil {
					// A handler failure is not fatal: skip the commit and
					// keep consuming.
					log.Printf("[%s] handling message faied: %s", group, err.Error())
					continue
				}

				if err = r.CommitMessages(ctx, m); err != nil {
					log.Printf("[%s] committing message faied: %s", group, err.Error())
					return err
				}
			}
		}

		return &Consumer{
			Topic: topic,
			Group: group,
			Run:   run,
		}
	}
}

// Consumer describes a consumer bound to one topic and one consumer group.
//
// Run is the blocking consume loop. Start relies on Run sending a value on
// done when it finishes; a Run that never sends on done makes Start block
// forever.
type Consumer struct {
	Topic string
	Group string
	Run   func(ctx context.Context, done chan bool) error
}

// Start runs the consumer and blocks until Run signals completion on its done
// channel. The consumer is stopped when ctx is cancelled or when the process
// receives SIGHUP, SIGINT, SIGTERM or SIGQUIT. The error returned by Run is
// only logged.
func (c *Consumer) Start(ctx context.Context) {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	// Unregister the channel on exit so signals are no longer relayed to it
	// once this consumer is gone.
	defer signal.Stop(sigChan)

	ctx, cancel := context.WithCancel(ctx)
	// Release the derived context on every exit path, not only on a signal;
	// this also unblocks the signal watcher below.
	defer cancel()

	done := make(chan bool)

	go func() {
		err := c.Run(ctx, done)
		log.Print(err)
	}()

	go func() {
		select {
		case <-sigChan:
			log.Printf("[%s] Sutdown signal received, shuttingdown consumer...", c.Group)
			cancel()
		case <-ctx.Done():
			// The consumer stopped for another reason (Run ended or the
			// parent context was cancelled); exit instead of waiting for a
			// signal that may never come.
		}
	}()

	<-done
	log.Printf("[%s] consumer has beend stopped", c.Group)
}
Editor is loading...