Untitled
# NOTE: create the Kafka topic before running this script (shell command, not Python):
#   /opt/kafka/bin/kafka-topics.sh --create --topic cyclists-data \
#       --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

import csv
import json
import random
import time

import pandas as pd
from kafka import KafkaConsumer, KafkaProducer


def merge_csv_files(inputs=("202305.csv", "202304.csv"), output="gabungan.csv"):
    """Concatenate the monthly CSV files and write the result to *output*.

    Parameters
    ----------
    inputs : iterable of str
        Paths of the CSV files to merge (row-wise).
    output : str
        Path of the merged CSV file to write (no index column).

    Returns
    -------
    pandas.DataFrame
        The merged frame, so callers can inspect its shape.
    """
    frames = [pd.read_csv(path) for path in inputs]
    merged_df = pd.concat(frames, ignore_index=True)
    merged_df.to_csv(output, index=False)
    # "Number of rows and columns after merging:"
    print("Jumlah baris dan kolom setelah penggabungan:", merged_df.shape)
    return merged_df


def produce(file, hostname='localhost', port='9092', topic_name='cyclists-data',
            max_waiting_time_in_sec=5):
    """Stream every row of *file* (a CSV) to *topic_name* as a JSON message.

    A random pause of 0..max_waiting_time_in_sec seconds (0.1 s resolution)
    is inserted between messages to simulate a live feed.

    Parameters
    ----------
    file : str
        Path to the CSV file; each row is sent as one JSON-encoded message.
    hostname, port : str
        Kafka bootstrap server address.
    topic_name : str
        Topic to publish to.
    max_waiting_time_in_sec : int
        Upper bound (seconds) of the random delay between messages.
    """
    producer = KafkaProducer(
        bootstrap_servers=hostname + ":" + port,
        value_serializer=lambda v: json.dumps(v).encode('ascii'),
        key_serializer=lambda v: json.dumps(v).encode('ascii'),
    )
    try:
        with open(file, 'r') as csv_file:
            for row in csv.DictReader(csv_file):
                message = dict(row)
                print("Sending: {}".format(message))
                # send the message to Kafka (async; flushed below)
                producer.send(topic_name, value=message)
                sleep_time = random.randint(0, max_waiting_time_in_sec * 10) / 10
                print("Sleeping for..." + str(sleep_time) + 's')
                time.sleep(sleep_time)
        # Original called flush() twice back to back; once is sufficient.
        producer.flush()
    finally:
        # Release network resources even if reading/sending fails.
        producer.close()


def consume(hostname='localhost', port='9092', topic_name='cyclists-data'):
    """Print every message on *topic_name*, starting from the earliest offset.

    Blocks forever reading the topic; interrupt with Ctrl-C.
    """
    consumer = KafkaConsumer(
        bootstrap_servers=hostname + ":" + port,
        value_deserializer=lambda v: json.loads(v.decode('ascii')),
        auto_offset_reset='earliest',
    )
    # subscribe() expects a list of topic names, not a bare string.
    consumer.subscribe(topics=[topic_name])
    for message in consumer:
        print("%d:%d: v=%s" % (message.partition, message.offset, message.value))


if __name__ == "__main__":
    # The producer reads gabungan.csv, so the merge must run FIRST.
    # (In the original paste the merge came last, after an infinite
    # consumer loop, and could never execute.)
    merge_csv_files()
    produce('/home/ubuntu/gabungan.csv')
    consume()
Leave a Comment