Untitled
unknown
rust
a month ago
1.7 kB
4
Indexable
use std::{path::PathBuf, str::FromStr}; use tokio::io::{self, AsyncBufReadExt, BufReader}; use tokio::fs::File; use crate::models::{raw_data::JsonData,trade_data::TradeData}; use crate::kafka::*; use rdkafka::producer::FutureProducer; use std::time::Instant; pub mod models; pub mod kafka; #[tokio::main] async fn main() ->io::Result<()> { let config = KafkaOutputConfig{ brokers:vec!["192.168.1.239:9092".to_string()], topic:String::from("my-topic"), key:None, client_id:None, compression:Some(CompressionType::Lz4), acks:Some("1".to_string()) }; let start = Instant::now(); let kafka_output:KafkaOutput<FutureProducer> =KafkaOutput::new(config); kafka_output.connect().await; let path = PathBuf::from_str("1000.txt").unwrap(); let file = File::open(path).await.expect("Failed to open file"); let reader = BufReader::with_capacity(4 * 1024 * 1024,file); let mut lines = reader.lines(); let mut count = 0; while let Some(line) = lines.next_line().await? { if let Some((_,data)) = line.split_once(' '){ let json = JsonData::from(data); for raw in json.raw_data.iter(){ let pro:TradeData = raw.into(); //println!("{:?}", &pro); kafka_output.write(pro).await; count+=1; } } } let duration = start.elapsed(); //Execution time: 13.4353252s println!("Execution time: {:?}", duration); if let Err(e) = kafka_output.close().await{ println!("Close Error: {}",e); } //Count: 1593 println!("Count: {}",count); Ok(()) }
Editor is loading...
Leave a Comment