Untitled

 avatar
unknown
golang
3 years ago
1.9 kB
6
Indexable
package kafka

import (
	"context"
	"github.com/segmentio/kafka-go"
	"log"
	"os"
	"os/signal"
	"syscall"
)

type (
	// Handler processes the raw value of a single consumed message. A
	// non-nil error tells the consume loop that the message was not handled.
	Handler func(message []byte) error

	// ConsumerFactory builds a Consumer for the given topic and consumer
	// group; handler is the callback the consumer passes message values to.
	ConsumerFactory func(topic, group string, handler Handler) *Consumer
)

// CreateConsumerFactory returns a ConsumerFactory whose consumers read from
// the given Kafka brokers.
//
// The Run function of every Consumer produced by the factory opens a
// kafka.Reader for the topic/group and then loops: fetch a message, pass its
// value to handler, and commit the message when the handler succeeds. Run
// returns the first fetch or commit error (including the error caused by ctx
// being cancelled). Before returning it closes the reader and, when done is
// non-nil, sends true on done; the caller must receive that value or Run
// blocks in its cleanup.
func CreateConsumerFactory(brokers []string) ConsumerFactory {
	return func(topic string, group string, handler Handler) *Consumer {
		run := func(ctx context.Context, done chan bool) error {
			r := kafka.NewReader(kafka.ReaderConfig{
				Brokers: brokers,
				GroupID: group,
				Topic:   topic,
			})
			defer func() {
				log.Printf("Closing consumer %s...", group)
				if err := r.Close(); err != nil {
					log.Printf("Failed to close consumer %s: %s", group, err)
				}
				// A send on a nil channel blocks forever, so only signal
				// completion when the caller actually supplied a channel.
				if done != nil {
					done <- true
				}
			}()

			for {
				m, err := r.FetchMessage(ctx)
				if err != nil {
					if ctx.Err() != nil {
						// The context ended (e.g. shutdown was requested);
						// this is not a consume failure.
						log.Printf("[%s] context done, stopping consumer: %s", group, err)
						return err
					}
					log.Printf("[%s] consuming message failed: %s", group, err)
					return err
				}

				if err := handler(m.Value); err != nil {
					// The message is left uncommitted and the loop moves on.
					// NOTE(review): a later successful commit on the same
					// partition presumably advances the group offset past
					// this message — confirm that skipping is intended.
					log.Printf("[%s] handling message failed: %s", group, err)
					continue
				}

				if err := r.CommitMessages(ctx, m); err != nil {
					log.Printf("[%s] committing message failed: %s", group, err)
					return err
				}
			}
		}
		return &Consumer{
			Topic: topic,
			Group: group,
			Run:   run,
		}
	}
}

// Consumer couples a topic and consumer group with the function that
// performs the actual consumption.
type Consumer struct {
	// Topic and Group identify what is consumed and on behalf of which
	// consumer group; Group is also used as the prefix of log lines.
	Topic, Group string

	// Run executes the consume loop until it fails or ctx ends. Start
	// invokes it in its own goroutine and waits for a value on done.
	Run func(ctx context.Context, done chan bool) error
}

// Start runs the consumer and blocks until c.Run signals completion by
// sending on the done channel it is handed.
//
// Run is executed in its own goroutine with a context derived from ctx. The
// derived context is cancelled when the process receives SIGHUP, SIGINT,
// SIGTERM or SIGQUIT, and in any case when Start returns, so neither the
// context nor the signal-watching goroutine outlives the call.
func (c *Consumer) Start(ctx context.Context) {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan,
		syscall.SIGHUP,
		syscall.SIGINT,
		syscall.SIGTERM,
		syscall.SIGQUIT)
	// Stop relaying signals to sigChan once this consumer is finished.
	defer signal.Stop(sigChan)

	ctx, cancel := context.WithCancel(ctx)
	// Release the derived context even when Run ends without a signal.
	defer cancel()

	done := make(chan bool)

	go func() {
		// A nil error carries no information; avoid logging "<nil>".
		if err := c.Run(ctx, done); err != nil {
			log.Printf("[%s] consumer exited: %s", c.Group, err)
		}
	}()

	go func() {
		select {
		case <-sigChan:
			log.Printf("[%s] Shutdown signal received, shutting down consumer...", c.Group)
			cancel()
		case <-ctx.Done():
			// The consumer stopped (or the parent context ended) without a
			// signal; exit instead of waiting on sigChan forever.
		}
	}()

	<-done
	log.Printf("[%s] consumer has been stopped", c.Group)
}
Editor is loading...