Untitled
unknown
rust
9 months ago
1.7 kB
7
Indexable
use std::{path::PathBuf, str::FromStr};
use tokio::io::{self, AsyncBufReadExt, BufReader};
use tokio::fs::File;
use crate::models::{raw_data::JsonData,trade_data::TradeData};
use crate::kafka::*;
use rdkafka::producer::FutureProducer;
use std::time::Instant;
pub mod models;
pub mod kafka;
/// Streams records from `1000.txt` into Kafka and reports how long it took.
///
/// Each input line is split at its first space; the text after the space is
/// converted into `JsonData`, and every entry of its `raw_data` is converted
/// into a `TradeData` and written to the Kafka output. Lines that contain no
/// space are skipped.
///
/// # Errors
///
/// Returns the underlying I/O error if the input file cannot be opened or if
/// reading a line fails.
#[tokio::main]
async fn main() -> io::Result<()> {
    let config = KafkaOutputConfig {
        brokers: vec!["192.168.1.239:9092".to_string()],
        topic: String::from("my-topic"),
        key: None,
        client_id: None,
        compression: Some(CompressionType::Lz4),
        acks: Some("1".to_string()),
    };

    // The timer deliberately starts before the producer is created, so the
    // reported duration covers connecting, reading and writing.
    let start = Instant::now();
    let kafka_output: KafkaOutput<FutureProducer> = KafkaOutput::new(config);
    // NOTE(review): the outcome of `connect` is discarded here — confirm
    // whether it can report a failure that should abort the run.
    kafka_output.connect().await;

    let path = PathBuf::from_str("1000.txt").expect("PathBuf::from_str is infallible");
    // Propagate open failures through the `io::Result` return value instead
    // of panicking; the original message is kept as context.
    let file = File::open(&path).await.map_err(|e| {
        io::Error::new(
            e.kind(),
            format!("Failed to open file {}: {}", path.display(), e),
        )
    })?;

    // 4 MiB read buffer.
    let reader = BufReader::with_capacity(4 * 1024 * 1024, file);
    let mut lines = reader.lines();

    // Explicit 64-bit counter: the inferred default (`i32`) could overflow on
    // large inputs.
    let mut count: u64 = 0;
    while let Some(line) = lines.next_line().await? {
        // Only the part after the first space is parsed; the prefix is ignored.
        if let Some((_, data)) = line.split_once(' ') {
            let json = JsonData::from(data);
            for raw in json.raw_data.iter() {
                let pro: TradeData = raw.into();
                // NOTE(review): the outcome of `write` is discarded — confirm
                // whether delivery failures should be surfaced.
                kafka_output.write(pro).await;
                count += 1;
            }
        }
    }

    // Measured before `close`, so shutdown time is not included.
    let duration = start.elapsed();
    // Sample run: Execution time: 13.4353252s
    println!("Execution time: {:?}", duration);

    if let Err(e) = kafka_output.close().await {
        println!("Close Error: {}", e);
    }

    // Sample run: Count: 1593
    println!("Count: {}", count);
    Ok(())
}
Editor is loading...
Leave a Comment