Untitled

 avatar
unknown
plain_text
a year ago
2.2 kB
1
Indexable
!/opt/kafka/bin/kafka-topics.sh --create --topic cyclists-data --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1






import csv
from kafka import KafkaProducer
import json
import random
import time

def produce(file,
                           hostname='localhost',
                           port='9092',
                           topic_name='cyclists-data',
                           max_waiting_time_in_sec=5):

    producer = KafkaProducer(
        bootstrap_servers=hostname + ":" + port,
        value_serializer=lambda v: json.dumps(v).encode('ascii'),
        key_serializer=lambda v: json.dumps(v).encode('ascii')
    )

    with open(file, 'r') as csv_file:
        csv_reader = csv.DictReader(csv_file)
        for row in csv_reader:
            message = dict(row)

            print("Sending: {}".format(message))
            # mengirim pesan ke kafka
            producer.send(topic_name,
                          value=message)
           
            sleep_time = random.randint(0, max_waiting_time_in_sec * 10) / 10
            print("Sleeping for..." + str(sleep_time) + 's')
            time.sleep(sleep_time)

            
            producer.flush()

    producer.flush()

#file
file = '/home/ubuntu/gabungan.csv'

#menampilkan
produce(file)






from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
 bootstrap_servers='localhost:9092',
 value_deserializer = lambda v: json.loads(v.decode('ascii')),
 auto_offset_reset='earliest'
)





consumer.subscribe(topics='cyclists-data')
for message in consumer:
  print ("%d:%d: v=%s" % (message.partition,
                      	message.offset,
                      	message.value))








import pandas as pd

# Membaca dua file CSV
df_202305 = pd.read_csv("202305.csv")
df_202304 = pd.read_csv("202304.csv")

# Menggabungkan DataFrame
merged_df = pd.concat([df_202305, df_202304], ignore_index=True)

# Menyimpan hasil penggabungan ke file CSV baru dengan nama "gabungan.csv"
merged_df.to_csv("gabungan.csv", index=False)

# Menampilkan informasi tentang DataFrame hasil penggabungan
print("Jumlah baris dan kolom setelah penggabungan:", merged_df.shape)

Leave a Comment