7 days ago
7.4 kB
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
/* eslint-disable @typescript-eslint/no-unsafe-return */
/* eslint-disable @typescript-eslint/no-misused-promises */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { Kafka, Producer, Consumer, Partitioners, logLevel, Admin, EachBatchPayload } from 'kafkajs'
import { AppComponentService, BaseService, Config, LoggerService } from '../../gapo-libs-utils/libs'
import { dependency } from '@foal/core'
import { ICallbackClose } from '../../gapo-libs-utils/libs/services/app-component.service'
import { BatchProcessingKafka } from '../utils'

/**
 * define feature for kafka
 */
export interface IKafkaService { }

// Connection settings, read once at module load; the third argument is the fallback value.
const clientId = Config.get('kafka.client', 'string', '')
// NOTE(review): not referenced by the class below (runConsumer takes its own `groupId` parameter).
const groupId = Config.get('kafka.group', 'string', 'search')
// Comma-separated broker list; split into an array when the client is built.
const brokers = Config.get('kafka.brokers', 'string', 'search')
const userName = Config.get('kafka.user_name', 'string', 'membership')
// SECURITY(review): a real-looking credential is committed here as the fallback default.
// It should be rotated and supplied through configuration only; the value is left in place
// so that environments relying on the fallback keep working until that is done.
const password = Config.get('kafka.password', 'string', 'uLRwuYMMesxg4eYsPx55')

/**
 * Thin wrapper around a KafkaJS client: one shared producer, one shared admin client and
 * one consumer per consumer-group id.
 */
export class KafkaService extends BaseService implements IKafkaService, ICallbackClose {
  kafka: Kafka
  private producer: Producer
  /** Consumers created by `runConsumer`, keyed by consumer-group id. */
  private consumerMap: { [key: string]: Consumer }
  private admin: Admin

  @dependency
  logger: LoggerService

  @dependency
  appComponentService: AppComponentService

  constructor() {
    super()
    this.consumerMap = {}
  }

  /**
   * Disconnects the producer, the admin client and every consumer created by this service.
   *
   * A failure to disconnect one client is logged and does not prevent the remaining clients
   * from being disconnected. Safe to call before `boot()` (clients that do not exist yet are
   * skipped).
   */
  close = async (): Promise<void> => {
    const clients = [this.producer, this.admin, ...Object.values(this.consumerMap)]
    for (const client of clients) {
      if (!client) {
        continue
      }
      try {
        await client.disconnect()
      } catch (error) {
        console.error('Error disconnecting Kafka client:', error)
      }
    }
    // Forget the disconnected consumers so a later runConsumer() builds fresh ones.
    this.consumerMap = {}
  }

  /**
   * init service after dependency injected
   */
  async boot(): Promise<void> {
    // this.appComponentService.addCallbackClose(this)
    // Awaited (rather than fire-and-forget) so a failure while building the clients
    // rejects boot() instead of surfacing as an unhandled rejection.
    await this.inject()
    console.log('KafkaService inject')
  }

  /**
   * warning: only inject using this function
   *
   * Builds the Kafka client (SASL/PLAIN credentials from config), the shared producer and
   * the shared admin client. Only constructs objects; no connection is opened here.
   */
  private async inject() {
    this.kafka = new Kafka({
      clientId,
      brokers: brokers.split(','),
      logLevel: logLevel.WARN,
      sasl: {
        mechanism: 'plain',
        username: userName,
        password: password,
      },
    })
    this.producer = this.kafka.producer({
      createPartitioner: Partitioners.LegacyPartitioner,
    })
    this.admin = this.kafka.admin()
  }

  /**
   * Sends `messages` to `topic` through the shared producer.
   *
   * Best-effort: a failure is logged to the console and NOT rethrown, so callers cannot
   * tell from the returned promise whether the send succeeded.
   *
   * @param topic destination topic
   * @param messages records to send; `value` is required, `key` and `partition` are optional
   */
  async sendMessage(topic: string, messages: { key?: string, value: string, partition?: number }[]) {
    try {
      // connect() is called on every send; the producer is never disconnected here.
      await this.producer.connect()
      await this.producer.send({
        topic,
        messages,
      })
      // console.log(`Message sent to topic ${topic} messages ${JSON.stringify(messages)}`)
    } catch (error) {
      console.error(`Error sending message to topic ${topic}:`, error)
    }
    // await this.producer.disconnect()
  }

  /**
   * Handles one KafkaJS batch: collects every message value as a string, hands the whole
   * array to `batchProcessingKafka`, and only then marks the offsets as resolved.
   *
   * Offsets are resolved AFTER processing succeeds, and any failure is rethrown, so that a
   * failed batch is not marked as done (the consumer runs with `eachBatchAutoResolve: true`,
   * which resolves the batch whenever this handler returns without throwing).
   *
   * @param payload KafkaJS `eachBatch` payload
   * @param batchProcessingKafka callback that receives the decoded message values
   * @throws rethrows whatever `batchProcessingKafka`, `heartbeat` or the commit throws
   */
  async handleMessage({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary }: EachBatchPayload, batchProcessingKafka: BatchProcessingKafka) {
    try {
      console.log(`topictopic ${batch.topic} total ${batch.messages.length}`)
      const data: string[] = []
      for (const message of batch.messages) {
        console.log(`received offset ${message.offset} key ${message.key?.toString() ?? ''} message ${message.value?.toString() ?? ''}`)
        // A missing value is forwarded as an empty string.
        data.push(message.value?.toString() ?? '')
        await heartbeat()
      }
      console.log(`topic ${batch.topic} total ${batch.messages.length} data >>>: `, data)

      await batchProcessingKafka(data)

      // Processing succeeded: now it is safe to mark every message as handled.
      for (const message of batch.messages) {
        resolveOffset(message.offset)
      }
      // NOTE(review): the consumer is started with autoCommit: false and no
      // autoCommitInterval/autoCommitThreshold; confirm against the KafkaJS docs that this
      // call actually commits anything in that configuration.
      await commitOffsetsIfNecessary()

      // Fixed pause between batches.
      await this.delay(500)
    } catch (error) {
      this.logger.e(`Error processing batch, topic: ${batch.topic}, will retry: `, error)
      // Rethrow so KafkaJS sees the failure instead of auto-resolving the batch.
      throw error
    }
  }

  /**
   * Subscribes the consumer for `groupId` to `topic` and starts batch consumption, creating
   * and connecting the consumer on first use of that group id.
   *
   * Errors are logged to the console and not rethrown.
   *
   * NOTE(review): a second call with the same `groupId` reuses the cached consumer and calls
   * subscribe()/run() on it again — verify KafkaJS allows that on an already-running consumer.
   *
   * @param topic topic to consume (only messages produced after subscription: `fromBeginning: false`)
   * @param groupId consumer-group id; also the cache key for the consumer
   * @param batchProcessingKafka callback invoked with the decoded values of each batch
   */
  async runConsumer(topic: string, groupId: string, batchProcessingKafka: BatchProcessingKafka) {
    try {
      let consumer = this.consumerMap[groupId]
      if (!consumer) {
        consumer = this.kafka.consumer({
          maxBytes: 1024 * 1024, // 1MB per batch
          minBytes: 1,
          maxWaitTimeInMs: 500, // Wait up to 500 ms for a batch
          groupId
        })
        await consumer.connect()
        // Cache only after connect() succeeded, so a failed attempt is not remembered.
        this.consumerMap = {
          ...this.consumerMap,
          [groupId]: consumer
        }
      }

      await consumer.subscribe({ topic: topic, fromBeginning: false })
      await consumer.run({
        eachBatchAutoResolve: true,
        eachBatch: async (payload: EachBatchPayload) => {
          await this.handleMessage(payload, batchProcessingKafka)
        },
        autoCommit: false
      })
      console.log(`Kafka Consumer topic ${topic} is running`)
    } catch (error) {
      console.error('Error running Kafka Consumer:', error)
    }
  }

  /**
   * Logs every topic whose name contains 'KAFKA', with its partition count and the replica
   * count of its first partition, followed by the number of matching topics.
   *
   * Errors are logged to the console and not rethrown.
   */
  async getListTopic() {
    console.log('admin >>>: ', !!this.admin)
    try {
      const topicsMetadata = await this.admin.fetchTopicMetadata()
      let count = 0
      console.log('Danh sách các topic:')
      for (const topic of topicsMetadata.topics) {
        if (!topic.name.includes('KAFKA')) {
          continue
        }
        const numPartitions = topic.partitions.length
        // Guarded: a topic without partition metadata reports 0 replicas instead of throwing.
        const replicationFactor = topic.partitions[0]?.replicas.length ?? 0
        count++
        console.log(`- Topic: ${topic.name}`)
        console.log(` - Số phân vùng (numPartitions): ${numPartitions}`)
        console.log(` - Số bản sao (replicationFactor): ${replicationFactor}`)
      }
      console.log('Tổng topic: ', count)
    } catch (error) {
      console.error('Error listing topics:', error)
    }
  }

  /**
   * Creates a topic through the shared admin client.
   *
   * @param topicName name of the topic to create
   * @param numPartitions number of partitions (default 1)
   * @param replicationFactor replication factor (default 1)
   * @throws propagates any error from the admin client
   */
  async createTopic(topicName: string, numPartitions = 1, replicationFactor = 1): Promise<void> {
    // Reuse the shared admin client rather than creating a new, never-disconnected one per call.
    await this.admin.createTopics({
      topics: [
        {
          topic: topicName,
          numPartitions: numPartitions,
          replicationFactor: replicationFactor,
        }
      ]
    })
  }

  /**
   * Logs the group id and client id of every member of every consumer group known to the
   * admin client. Groups are described one at a time.
   */
  async listClientIds() {
    const listed = await this.admin.listGroups()
    for (const group of listed.groups) {
      const groupDetails = await this.admin.describeGroups([group.groupId])
      for (const described of groupDetails.groups) {
        for (const member of described.members) {
          console.log(`GroupId: ${described.groupId}, ClientId: ${member.clientId}`)
        }
      }
    }
  }

  /**
   * Deletes `topic` through the shared admin client and logs a confirmation.
   *
   * @throws propagates any error from the admin client
   */
  async deleteTopic(topic: string) {
    await this.admin.deleteTopics({
      topics: [topic]
    })
    console.log(`Topic "${topic}" is deleted.`)
  }

  /** Resolves after `ms` milliseconds. */
  private delay(ms: number) {
    return new Promise(resolve => setTimeout(resolve, ms))
  }
}
Leave a Comment