Untitled
unknown
plain_text
a year ago
7.4 kB
8
Indexable
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
/* eslint-disable @typescript-eslint/no-unsafe-return */
/* eslint-disable @typescript-eslint/no-misused-promises */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import {
Kafka,
Producer,
Consumer,
Partitioners,
logLevel,
Admin,
EachBatchPayload
} from 'kafkajs'
import { AppComponentService, BaseService, Config, LoggerService } from '../../gapo-libs-utils/libs'
import { dependency } from '@foal/core'
import { ICallbackClose } from '../../gapo-libs-utils/libs/services/app-component.service'
import {
BatchProcessingKafka
} from '../utils'
/**
* Public contract of the Kafka service.
* Currently declares no members; `KafkaService` lists it in its `implements` clause.
*/
export interface IKafkaService {
}
// Kafka connection settings, read once at module load through Config.get.
// The third argument appears to be the fallback used when the key is not configured.
// NOTE(review): the fallback for 'kafka.client' looks like a host:port broker address, while the
// fallback for 'kafka.brokers' is 'search' — presumably the two defaults are swapped; confirm.
const clientId = Config.get('kafka.client', 'string', '103.162.92.104:9095')
// Consumer group id from config. Not referenced by the class in this file: runConsumer takes its
// own `groupId` parameter, which shadows this constant.
const groupId = Config.get('kafka.group', 'string', 'search')
// Comma-separated broker list; it is split on ',' when the Kafka client is built.
const brokers = Config.get('kafka.brokers', 'string', 'search')
// SASL credentials passed to the Kafka client.
// NOTE(review): what looks like a real password is hard-coded as the fallback value — presumably it
// should only come from configuration / a secret store; confirm, remove from source and rotate.
const userName = Config.get('kafka.user_name', 'string', 'membership')
const password = Config.get('kafka.password', 'string', 'uLRwuYMMesxg4eYsPx55')
/**
 * Wrapper around a single kafkajs client that exposes a producer, per-group
 * consumers and a handful of admin helpers.
 *
 * `boot()` must have completed before any other method is used, because it is
 * the only place where the client, producer and admin handles are created.
 */
export class KafkaService extends BaseService implements IKafkaService, ICallbackClose {
  /** Underlying kafkajs client; created by `boot()`. */
  kafka: Kafka

  /** Producer shared by every `sendMessage` call. */
  private producer: Producer

  /** Consumers created by `runConsumer`, keyed by consumer group id. */
  private consumerMap: {
    [key: string]: Consumer
  }

  /** Admin client shared by all topic / group helpers. */
  private admin: Admin

  @dependency
  logger: LoggerService

  @dependency
  appComponentService: AppComponentService

  constructor() {
    super()
    this.consumerMap = {}
  }

  /**
   * Disconnects every consumer, the producer and the admin client.
   *
   * Declared as an arrow property so it stays bound when handed around as a
   * callback. Disconnect failures are logged, never thrown, so one failing
   * client cannot prevent the others from being closed.
   */
  close = async (): Promise<void> => {
    const consumers = Object.values(this.consumerMap)
    this.consumerMap = {}

    // producer / admin are undefined when close() runs before boot().
    const outcomes = await Promise.allSettled([
      ...consumers.map(consumer => consumer.disconnect()),
      this.producer?.disconnect(),
      this.admin?.disconnect(),
    ])

    for (const outcome of outcomes) {
      if (outcome.status === 'rejected') {
        console.error('Error closing Kafka client:', outcome.reason)
      }
    }
  }

  /**
   * init service after dependency injected
   *
   * Awaits the client setup so a configuration error rejects `boot()` instead
   * of surfacing as an unhandled promise rejection.
   */
  async boot(): Promise<void> {
    // this.appComponentService.addCallbackClose(this)
    await this.inject()
    console.log('KafkaService inject')
  }

  /**
   * warning: only inject using this function
   *
   * Builds the kafkajs client from the module-level configuration and derives
   * the producer and admin handles from it. No network connection is opened here.
   */
  private async inject() {
    this.kafka = new Kafka({
      clientId,
      // Tolerate "host1:9092, host2:9092" and trailing commas in the configured list.
      brokers: brokers
        .split(',')
        .map(broker => broker.trim())
        .filter(broker => broker.length > 0),
      logLevel: logLevel.WARN,
      sasl: {
        mechanism: 'plain',
        username: userName,
        password: password,
      },
    })
    this.producer = this.kafka.producer({
      createPartitioner: Partitioners.LegacyPartitioner,
    })
    this.admin = this.kafka.admin()
  }

  /**
   * Sends `messages` to `topic`, connecting the producer first.
   *
   * Best effort: failures are logged and swallowed, so the returned promise
   * always resolves.
   *
   * @param topic - Destination topic.
   * @param messages - Records to publish.
   */
  async sendMessage(
    topic: string,
    messages: { key?: string, value: string, partition?: number }[]
  ): Promise<void> {
    try {
      await this.producer.connect()
      await this.producer.send({
        topic,
        messages,
      })
      // console.log(`Message sent to topic ${topic} messages ${JSON.stringify(messages)}`)
    } catch (error) {
      console.error(`Error sending message to topic ${topic}:`, error)
    }
    // await this.producer.disconnect()
  }

  /**
   * Processes one kafkajs batch: decodes every message value to a string,
   * hands the whole list to `batchProcessingKafka`, and only then marks the
   * offsets as resolved and asks kafkajs to commit.
   *
   * Offsets are resolved after processing so that a failing batch is not
   * recorded as consumed. The error is logged and re-thrown: with
   * `eachBatchAutoResolve` enabled, a handler that returns normally is treated
   * as success, so swallowing the error would drop the batch.
   *
   * @param payload - kafkajs `eachBatch` payload.
   * @param batchProcessingKafka - Callback that receives the decoded values.
   * @throws Whatever `batchProcessingKafka`, `heartbeat` or the commit throws.
   */
  async handleMessage(
    { batch, resolveOffset, heartbeat, commitOffsetsIfNecessary }: EachBatchPayload,
    batchProcessingKafka: BatchProcessingKafka
  ): Promise<void> {
    try {
      console.log(`topictopic ${batch.topic} total ${batch.messages.length}`)
      const data: string[] = []
      for (const message of batch.messages) {
        const value = message.value?.toString() ?? ''
        console.log(`received offset ${message.offset} key ${message.key?.toString() ?? ''} message ${value}`)
        data.push(value)
      }
      console.log(`topic ${batch.topic} total ${batch.messages.length} data >>>: `, data)

      await heartbeat()
      await batchProcessingKafka(data)

      // Processing succeeded: the offsets may now be recorded as handled.
      for (const message of batch.messages) {
        resolveOffset(message.offset)
      }
      await heartbeat()
      // NOTE(review): the consumer runs with autoCommit disabled and no autoCommitInterval /
      // autoCommitThreshold — confirm against the kafkajs docs that this call actually commits.
      await commitOffsetsIfNecessary()
      // Short pause between batches, as in the original implementation.
      await this.delay(500)
    } catch (error) {
      this.logger.e(`Error processing batch, topic: ${batch.topic}, will retry: `, error)
      throw error
    }
  }

  /**
   * Subscribes the consumer of `groupId` to `topic` and starts batch
   * consumption, creating and connecting the consumer on first use.
   *
   * Start-up failures are logged and swallowed. A consumer whose connection
   * attempt failed is removed from the cache, so a later call connects again
   * instead of reusing a never-connected instance.
   *
   * @param topic - Topic to subscribe to (new messages only).
   * @param groupId - Consumer group id; also the cache key.
   * @param batchProcessingKafka - Callback invoked with each decoded batch.
   */
  async runConsumer(
    topic: string,
    groupId: string,
    batchProcessingKafka: BatchProcessingKafka
  ): Promise<void> {
    try {
      let consumer = this.consumerMap[groupId]
      if (!consumer) {
        consumer = this.kafka.consumer({
          maxBytes: 1024 * 1024, // 1MB per batch
          minBytes: 1,
          maxWaitTimeInMs: 500, // Wait up to 500ms for a batch
          groupId
        })
        // Register before connecting so a concurrent call for the same group reuses this instance.
        this.consumerMap[groupId] = consumer
        try {
          await consumer.connect()
        } catch (error) {
          delete this.consumerMap[groupId]
          throw error
        }
      }

      await consumer.subscribe({ topic: topic, fromBeginning: false })
      await consumer.run({
        eachBatchAutoResolve: true,
        eachBatch: async (payload: EachBatchPayload) => {
          await this.handleMessage(payload, batchProcessingKafka)
        },
        autoCommit: false
      })
      console.log(`Kafka Consumer topic ${topic} is running`)
    } catch (error) {
      console.error('Error running Kafka Consumer:', error)
    }
  }

  /**
   * Logs every topic whose name contains `KAFKA`, with its partition count and
   * the replica count of its first partition, followed by the number of matches.
   * Errors are logged and swallowed.
   */
  async getListTopic(): Promise<void> {
    console.log('admin >>>: ', !!this.admin)
    try {
      await this.admin.connect()
      const topicsMetadata = await this.admin.fetchTopicMetadata()
      let count = 0
      console.log('Danh sách các topic:')
      for (const topic of topicsMetadata.topics) {
        if (!topic.name.includes('KAFKA')) {
          continue
        }
        const numPartitions = topic.partitions.length
        // A topic without partition metadata must not abort the whole listing.
        const replicationFactor = topic.partitions[0]?.replicas.length ?? 0
        count++
        console.log(`- Topic: ${topic.name}`)
        console.log(` - Số phân vùng (numPartitions): ${numPartitions}`)
        console.log(` - Số bản sao (replicationFactor): ${replicationFactor}`)
      }
      console.log('Tổng topic: ', count)
    } catch (error) {
      console.error('Error listing topics:', error)
    }
  }

  /**
   * Creates a topic through the shared admin client.
   *
   * @param topicName - Name of the topic to create.
   * @param numPartitions - Partition count (default 1).
   * @param replicationFactor - Replication factor (default 1).
   * @throws When the admin client cannot connect or the creation fails.
   */
  async createTopic(topicName: string, numPartitions = 1, replicationFactor = 1): Promise<void> {
    // Reuse the shared admin client; a fresh client per call was never disconnected.
    await this.admin.connect()
    await this.admin.createTopics({
      topics: [
        {
          topic: topicName,
          numPartitions: numPartitions,
          replicationFactor: replicationFactor,
        }
      ]
    })
  }

  /**
   * Logs the group id and client id of every member of every consumer group
   * returned by the admin client.
   *
   * @throws When listing or describing the groups fails.
   */
  async listClientIds(): Promise<void> {
    await this.admin.connect()
    const { groups } = await this.admin.listGroups()
    for (const { groupId } of groups) {
      const details = await this.admin.describeGroups([groupId])
      for (const group of details.groups) {
        for (const member of group.members) {
          console.log(`GroupId: ${group.groupId}, ClientId: ${member.clientId}`)
        }
      }
    }
  }

  /**
   * Deletes `topic` through the shared admin client and logs the deletion.
   *
   * @param topic - Name of the topic to delete.
   * @throws When the admin client cannot connect or the deletion fails.
   */
  async deleteTopic(topic: string): Promise<void> {
    await this.admin.connect()
    await this.admin.deleteTopics({
      topics: [topic]
    })
    console.log(`Topic "${topic}" is deleted.`)
  }

  /** Resolves after `ms` milliseconds. */
  private delay(ms: number): Promise<void> {
    return new Promise<void>(resolve => setTimeout(resolve, ms))
  }
}
Editor is loading...
Leave a Comment