Untitled
unknown
plain_text
2 years ago
1.4 kB
16
Indexable
import csv
from kafka import KafkaProducer
import json
import random
import time
def produce_msgs_from_csv(csv_file_path,
hostname='localhost',
port='9092',
topic_name='pubg-matches',
max_waiting_time_in_sec=5):
# Function for Kafka Producer with certain settings related to Kafka's Server
producer = KafkaProducer(
bootstrap_servers=hostname + ":" + port,
value_serializer=lambda v: json.dumps(v).encode('ascii'),
key_serializer=lambda v: json.dumps(v).encode('ascii')
)
with open(csv_file_path, 'r') as csv_file:
csv_reader = csv.DictReader(csv_file)
for row in csv_reader:
message = dict(row)
print("Sending: {}".format(message))
# Sending the message to Kafka
producer.send(topic_name,
value=message)
# Sleeping time
sleep_time = random.randint(0, max_waiting_time_in_sec * 10) / 10
print("Sleeping for..." + str(sleep_time) + 's')
time.sleep(sleep_time)
# Force flushing of all messages
producer.flush()
producer.flush()
# Specify the path to your CSV file
csv_file_path = '/home/ubuntu/PUBG_Game_Prediction_data.csv'
# Call the modified function with the CSV file path
produce_msgs_from_csv(csv_file_path)Editor is loading...
Leave a Comment