Untitled
unknown
plain_text
2 years ago
1.4 kB
11
Indexable
import csv
import json
import random
import time

from kafka import KafkaProducer


def produce_msgs_from_csv(csv_file_path, hostname='localhost', port='9092',
                          topic_name='pubg-matches', max_waiting_time_in_sec=5):
    """Send every row of a CSV file to a Kafka topic, one message per row.

    Each row is read with ``csv.DictReader`` (so it is keyed by the CSV header),
    serialized to JSON, and sent as the message value. After each send the
    function sleeps for a random delay and then flushes the producer.

    Args:
        csv_file_path: Path of the CSV file to read; the first row is the header.
        hostname: Host name of the Kafka bootstrap server.
        port: Port of the Kafka bootstrap server (str or int).
        topic_name: Topic the messages are sent to.
        max_waiting_time_in_sec: Upper bound, in seconds, of the random delay
            between two messages. The delay is drawn in steps of 0.1 s from
            ``[0, max_waiting_time_in_sec]``; fractional limits are truncated
            to the tenth of a second.

    Raises:
        OSError: If the CSV file cannot be opened.
    """
    # f-string instead of "+" so that an integer port works as well as a string.
    producer = KafkaProducer(
        bootstrap_servers=f"{hostname}:{port}",
        value_serializer=lambda v: json.dumps(v).encode('ascii'),
        # NOTE(review): send() below never passes a key -- confirm how the
        # client applies this serializer to a missing key.
        key_serializer=lambda v: json.dumps(v).encode('ascii'),
    )

    # randint() needs integer bounds; int() keeps integer limits unchanged and
    # lets a float limit such as 0.5 work too.
    max_tenths = int(max_waiting_time_in_sec * 10)

    try:
        # newline='' is what the csv module requires so that quoted fields
        # containing line breaks are parsed correctly.
        with open(csv_file_path, 'r', newline='') as csv_file:
            for row in csv.DictReader(csv_file):
                message = dict(row)
                print(f"Sending: {message}")

                # Sending the message to Kafka
                producer.send(topic_name, value=message)

                # Random pause between messages
                sleep_time = random.randint(0, max_tenths) / 10
                print(f"Sleeping for...{sleep_time}s")
                time.sleep(sleep_time)

                # Force flushing of all messages
                producer.flush()

        producer.flush()
    finally:
        # Always release the producer's connections and background thread,
        # even when opening or reading the CSV file fails.
        producer.close()


# Specify the path to your CSV file
csv_file_path = '/home/ubuntu/PUBG_Game_Prediction_data.csv'

# Call the function with the CSV file path
produce_msgs_from_csv(csv_file_path)
Editor is loading...
Leave a Comment