Untitled

mail@pastecode.io avatar
unknown
plain_text
7 days ago
7.4 kB
2
Indexable
Never
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
/* eslint-disable @typescript-eslint/no-unsafe-return */
/* eslint-disable @typescript-eslint/no-misused-promises */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */

import {
    Kafka,
    Producer,
    Consumer,
    Partitioners,
    logLevel,
    Admin,
    EachBatchPayload
} from 'kafkajs'
import { AppComponentService, BaseService, Config, LoggerService } from '../../gapo-libs-utils/libs'
import { dependency } from '@foal/core'
import { ICallbackClose } from '../../gapo-libs-utils/libs/services/app-component.service'
import {
    BatchProcessingKafka
} from '../utils'

/**
 * define feature for kafka
 */
export interface IKafkaService {

}

const clientId = Config.get('kafka.client', 'string', '103.162.92.104:9095')
const groupId = Config.get('kafka.group', 'string', 'search')
const brokers = Config.get('kafka.brokers', 'string', 'search')
const userName = Config.get('kafka.user_name', 'string', 'membership')
const password = Config.get('kafka.password', 'string', 'uLRwuYMMesxg4eYsPx55')

export class KafkaService extends BaseService implements IKafkaService, ICallbackClose {
    kafka: Kafka
    private producer: Producer
    private consumerMap: {
        [key: string]: Consumer
    }
    private admin: Admin

    @dependency
    logger: LoggerService

    @dependency
    appComponentService: AppComponentService

    constructor() {
        super()
        this.consumerMap = {}
    }

    close: () => Promise<void>

    /**
     * init service after dependency injected
     */
    async boot(): Promise<void> {
        // this.appComponentService.addCallbackClose(this)
        void this.inject()
        console.log('KafkaService inject')
    }

    /**
     * warning: only inject using this function
     */
    private async inject() {
        this.kafka = new Kafka({
            clientId,
            brokers: brokers.split(','),
            logLevel: logLevel.WARN,
            sasl: {
                mechanism: 'plain',
                username: userName,
                password: password,
            },
        })
        this.producer = this.kafka.producer({
            createPartitioner: Partitioners.LegacyPartitioner,
        })
        this.admin = this.kafka.admin()
    }

    async sendMessage(topic: string, messages: { key?: string, value: string, partition?: number }[]) {
        try {
            await this.producer.connect()
            await this.producer.send({
                topic,
                messages,
            })
            // console.log(`Message sent to topic ${topic} messages ${JSON.stringify(messages)}`)
        } catch (error) {
            console.error(`Error sending message to topic ${topic}:`, error)
        }
        // await this.producer.disconnect()
    }

    async handleMessage({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary }: EachBatchPayload, batchProcessingKafka: BatchProcessingKafka) {
        try {
            console.log(`topictopic ${batch.topic} total ${batch.messages.length}`)
            const data: string[] = [];
            for (const message of batch.messages) {
                console.log(`received offset ${message.offset} key ${message.key?.toString() || ''} message ${message.value?.toString() || ''}`)
                data.push(message.value?.toString() || '')
                resolveOffset(message.offset);
                await heartbeat()
            }
            await commitOffsetsIfNecessary()
            console.log(`topic ${batch.topic} total ${batch.messages.length} data >>>: `, data)
            await batchProcessingKafka(data)
            await new Promise(resolve => setTimeout(resolve, 500))
        } catch (error) {
            this.logger.e(`Error processing batch, topic: ${batch.topic}, will retry: `, error)
        }
    }

    async runConsumer(topic: string, groupId: string, batchProcessingKafka: BatchProcessingKafka) {
        try {
            if (!this.consumerMap[groupId]) {
                const consumer = this.kafka.consumer({
                    maxBytes: 1024 * 1024, // 1MB per batch
                    minBytes: 1,
                    maxWaitTimeInMs: 500, // Wait up to 500ms seconds for a batch
                    groupId
                })
                if (consumer) {
                    this.consumerMap = {
                        ...this.consumerMap,
                        [groupId]: consumer
                    }
                }
                await consumer.connect()
            }
            await this.consumerMap[groupId].subscribe({ topic: topic, fromBeginning: false })
            await this.consumerMap[groupId].run({
                eachBatchAutoResolve: true,
                eachBatch: async (payload: EachBatchPayload) => {
                    await this.handleMessage(payload, batchProcessingKafka)
                },
                autoCommit: false
            })
            console.log(`Kafka Consumer topic ${topic} is running`)
        } catch (error) {
            console.error('Error running Kafka Consumer:', error)
        }
    }

    async getListTopic() {
        console.log('admin >>>: ', !!this.admin)
        const listTopics = async () => {
            try {
                const topicsMetadata = await this.admin.fetchTopicMetadata()
                let count = 0
                console.log('Danh sách các topic:')
                topicsMetadata.topics.forEach(topic => {
                    const numPartitions = topic.partitions.length
                    const replicationFactor = topic.partitions[0].replicas.length
                    if (topic.name.includes('KAFKA')) {
                        count++
                        console.log(`- Topic: ${topic.name}`)
                        console.log(`  - Số phân vùng (numPartitions): ${numPartitions}`)
                        console.log(`  - Số bản sao (replicationFactor): ${replicationFactor}`)
                    }
                })
                console.log('Tổng topic: ', count)
            } catch (error) {
                console.error('Error listing topics:', error)
            }
        }
        await listTopics()
    }

    async createTopic(topicName: string, numPartitions = 1, replicationFactor = 1): Promise<void> {
        const admin = this.kafka.admin()
        await admin.createTopics({
            topics: [
                {
                    topic: topicName,
                    numPartitions: numPartitions,
                    replicationFactor: replicationFactor,
                }
            ]
        })
    }

    async listClientIds() {
        const groups = await this.admin.listGroups()
        for (const group of groups.groups) {
            const groupDetails = await this.admin.describeGroups([group.groupId])
            groupDetails.groups.forEach(group => {
                group.members.forEach(member => {
                    console.log(`GroupId: ${group.groupId}, ClientId: ${member.clientId}`)
                })
            })
        }
    }

    async deleteTopic(topic: string) {
        await this.admin.deleteTopics({
            topics: [topic]
        })
        console.log(`Topic "${topic}" is deleted.`)
    }

    private delay(ms: number) {
        return new Promise(resolve => setTimeout(resolve, ms))
    }
}
Leave a Comment