Untitled

mail@pastecode.io avatar
unknown
plain_text
2 years ago
1.2 kB
1
Indexable
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Queue processes integer requests in batches while limiting how many
// requests run at the same time.
//
// workers holds the concurrency limit passed to NewQueue. semaphore is a
// buffered channel created with that capacity: processBatch sends one token
// into it before starting a request goroutine and the goroutine receives one
// token back when it finishes, so the channel's capacity bounds the number of
// requests in flight.
type Queue struct {
	workers   int
	semaphore chan bool
}

// NewQueue returns a Queue that lets at most workers requests be processed
// concurrently.
//
// Values of workers below 1 are raised to 1. A capacity of zero would give an
// unbuffered semaphore, and the first acquire (a send performed before any
// receiver exists) would then block forever; a negative capacity makes the
// channel allocation panic.
func NewQueue(workers int) *Queue {
	if workers < 1 {
		workers = 1
	}
	return &Queue{
		workers:   workers,
		semaphore: make(chan bool, workers),
	}
}

// Enqueue reads values from ch, groups them into batches of up to ten, and
// hands each batch to processBatch. It blocks until ch is closed.
//
// When ch is closed, any values collected since the last full batch are
// processed as a final, shorter batch before Enqueue returns, so no received
// value is dropped.
func (q *Queue) Enqueue(ch chan int) {
	const batchSize = 10
	batch := make([]int, 0, batchSize)

	for val := range ch {
		batch = append(batch, val)
		if len(batch) >= batchSize {
			q.processBatch(batch)
			// Start a fresh slice rather than reusing the old backing array,
			// so the batch just handed off is never overwritten.
			batch = make([]int, 0, batchSize)
		}
	}

	// The channel was closed: flush whatever is left over.
	if len(batch) > 0 {
		q.processBatch(batch)
	}
}

// processBatch runs processRequest once for every value in batch, each call
// in its own goroutine, and returns only after all of them have completed.
//
// A token is pushed into q.semaphore before each goroutine is launched and
// pulled back out when that goroutine ends, so the loop stalls whenever the
// semaphore is full.
func (q *Queue) processBatch(batch []int) {
	var pending sync.WaitGroup
	pending.Add(len(batch))

	for i := range batch {
		q.semaphore <- true // acquire a slot; blocks while all slots are taken
		go func(item int) {
			// Deferred calls run last-in first-out: the wait group is
			// signalled first, then the slot is released.
			defer func() { <-q.semaphore }()
			defer pending.Done()
			q.processRequest(item)
		}(batch[i])
	}

	pending.Wait()
}

// processRequest stands in for a call to an external service: it sleeps for
// a random whole number of seconds in the range [0, 5) and then prints which
// request finished and how long it took.
func (q *Queue) processRequest(val int) {
	// NOTE(review): the global generator is reseeded on every call. rand.Seed
	// is deprecated as of Go 1.20, where the generator is seeded randomly at
	// startup — consider dropping this once the minimum Go version allows.
	rand.Seed(time.Now().UnixNano())

	seconds := rand.Intn(5)
	delay := time.Duration(seconds) * time.Second
	time.Sleep(delay)

	fmt.Printf("Processed request %d after %d seconds\n", val, seconds)
}

// main feeds the values 1 through 50 into a Queue and waits until the queue
// has consumed them all before exiting.
func main() {
	ch := make(chan int)
	queue := NewQueue(2) // Change the number of workers as needed

	// done is closed once Enqueue has returned. Without waiting on it, main
	// would return right after close(ch) and the program would exit while the
	// consumer goroutine was still working on the last values.
	done := make(chan struct{})
	go func() {
		defer close(done)
		queue.Enqueue(ch)
	}()

	for i := 1; i <= 50; i++ {
		ch <- i
	}

	close(ch)
	<-done
}