Untitled

 avatar
unknown
plain_text
2 years ago
1.4 kB
11
Indexable
import csv
from kafka import KafkaProducer
import json
import random
import time

def produce_msgs_from_csv(csv_file_path,
                           hostname='localhost',
                           port='9092',
                           topic_name='pubg-matches',
                           max_waiting_time_in_sec=5):
    """Stream every row of a CSV file to a Kafka topic as a JSON message.

    Each row is read with ``csv.DictReader`` (so the first line of the file
    supplies the field names), converted to a plain ``dict`` and sent as the
    message value. After each message the function pauses for a random
    duration between 0 and ``max_waiting_time_in_sec`` seconds, in steps of
    0.1 s.

    Args:
        csv_file_path: Path of the CSV file to read.
        hostname: Host name of the Kafka bootstrap server.
        port: Port of the Kafka bootstrap server; a string or an int.
        topic_name: Topic the messages are sent to.
        max_waiting_time_in_sec: Upper bound, in seconds, of the random
            pause after each message. Fractional values are truncated to
            a whole number of tenths of a second.
    """
    # Both serializers JSON-encode their input; json.dumps escapes non-ASCII
    # characters by default, so encoding the result as ASCII cannot fail.
    # NOTE(review): send() below is called without a key -- confirm how the
    # client applies key_serializer in that case.
    producer = KafkaProducer(
        # Formatting (rather than "+") lets callers pass the port as an int.
        bootstrap_servers=f"{hostname}:{port}",
        value_serializer=lambda v: json.dumps(v).encode('ascii'),
        key_serializer=lambda v: json.dumps(v).encode('ascii')
    )

    # randint() needs integer bounds; work in tenths of a second and
    # truncate so a fractional maximum is accepted as well.
    max_tenths = int(max_waiting_time_in_sec * 10)

    try:
        # newline='' is what the csv module requires so that quoted fields
        # containing line breaks are parsed correctly.
        with open(csv_file_path, 'r', newline='') as csv_file:
            for row in csv.DictReader(csv_file):
                message = dict(row)

                print(f"Sending: {message}")
                producer.send(topic_name, value=message)
                # Push the record out before pausing, so the pause is a gap
                # between delivered messages rather than a buffering delay.
                producer.flush()

                sleep_time = random.randint(0, max_tenths) / 10
                print(f"Sleeping for...{sleep_time}s")
                time.sleep(sleep_time)
    finally:
        # Always release the producer's connections and background thread,
        # even if reading the file or sending a message raised.
        producer.close()

# Path of the CSV file that is streamed when this file is run as a script.
csv_file_path = '/home/ubuntu/PUBG_Game_Prediction_data.csv'

# Start producing only when executed directly, so that importing this module
# (for example to reuse produce_msgs_from_csv) does not connect to Kafka and
# begin sending messages as a side effect.
if __name__ == "__main__":
    produce_msgs_from_csv(csv_file_path)
Editor is loading...
Leave a Comment