Untitled
unknown
golang
4 years ago
1.9 kB
9
Indexable
package kafka
import (
"context"
"github.com/segmentio/kafka-go"
"log"
"os"
"os/signal"
"syscall"
)
type Handler func(message []byte) error
type ConsumerFactory func(topic string, group string, handler Handler) *Consumer
func CreateConsumerFactory(brokers []string) ConsumerFactory {
return func(topic string, group string, handler Handler) *Consumer {
run := func(ctx context.Context, done chan bool) error {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: group,
Topic: topic,
})
defer func() {
log.Printf("Closing consumer %s...", group)
if err := r.Close(); err != nil {
log.Printf("Failed to close consumer %s", group)
}
done <- true
}()
for {
m, err := r.FetchMessage(ctx)
if err != nil {
log.Printf("[%s] consuming message faied: %s", group, err.Error())
return err
}
if err = handler(m.Value); err != nil {
log.Printf("[%s] handling message faied: %s", group, err.Error())
} else {
if err = r.CommitMessages(ctx, m); err != nil {
log.Printf("[%s] committing message faied: %s", group, err.Error())
return err
}
}
}
}
return &Consumer{
Topic: topic,
Group: group,
Run: run,
}
}
}
type Consumer struct {
Topic string
Group string
Run func(ctx context.Context, done chan bool) error
}
func (c *Consumer) Start(ctx context.Context) {
sigChan := make(chan os.Signal, 1)
done := make(chan bool)
signal.Notify(sigChan,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
ctx, cancel := context.WithCancel(ctx)
go func() {
err := c.Run(ctx, done)
log.Print(err)
}()
go func() {
<-sigChan
log.Printf("[%s] Sutdown signal received, shuttingdown consumer...", c.Group)
cancel()
}()
<-done
log.Printf("[%s] consumer has beend stopped", c.Group)
}
Editor is loading...