Untitled

 avatar
unknown
rust
a month ago
1.7 kB
4
Indexable
use std::{path::PathBuf, str::FromStr};
use tokio::io::{self, AsyncBufReadExt, BufReader};
use tokio::fs::File;
use crate::models::{raw_data::JsonData,trade_data::TradeData};
use crate::kafka::*;

use rdkafka::producer::FutureProducer;
use std::time::Instant;

pub mod models;
pub mod kafka;

/// Streams records from a local text file into Kafka and prints timing and count.
///
/// Each input line is split at its first space; the text after the space is
/// passed to `JsonData::from`, and every entry of the resulting `raw_data` is
/// converted into a `TradeData` and written through `KafkaOutput`. Lines that
/// contain no space are skipped.
///
/// # Errors
///
/// Returns an `io::Error` if the input file cannot be opened or if reading a
/// line from it fails.
#[tokio::main]
async fn main() -> io::Result<()> {
    /// Input file, resolved relative to the current working directory.
    const INPUT_PATH: &str = "1000.txt";
    /// Capacity of the buffered reader wrapped around the input file (4 MiB).
    const READ_BUFFER_BYTES: usize = 4 * 1024 * 1024;

    let config = KafkaOutputConfig {
        brokers: vec!["192.168.1.239:9092".to_string()],
        topic: String::from("my-topic"),
        key: None,
        client_id: None,
        compression: Some(CompressionType::Lz4),
        acks: Some("1".to_string()),
    };

    // The timer starts before the producer is connected, so the reported
    // duration covers connecting as well as reading and writing.
    let start = Instant::now();
    let kafka_output: KafkaOutput<FutureProducer> = KafkaOutput::new(config);
    // NOTE(review): the value produced by `connect()` is discarded; if it is a
    // `Result`, a failed connection goes unnoticed here — confirm and handle.
    kafka_output.connect().await;

    let path = PathBuf::from_str(INPUT_PATH).expect("PathBuf::from_str is infallible");
    // Report a missing or unreadable input file as an error (with the path)
    // instead of panicking; `main` already returns `io::Result`.
    let file = File::open(&path).await.map_err(|e| {
        io::Error::new(
            e.kind(),
            format!("Failed to open file {}: {}", path.display(), e),
        )
    })?;
    let reader = BufReader::with_capacity(READ_BUFFER_BYTES, file);

    let mut lines = reader.lines();
    let mut count = 0;
    while let Some(line) = lines.next_line().await? {
        // Only the part after the first space is parsed; the prefix is ignored.
        if let Some((_, data)) = line.split_once(' ') {
            let json = JsonData::from(data);
            for raw in json.raw_data.iter() {
                let pro: TradeData = raw.into();
                // NOTE(review): the value produced by `write()` is discarded, so
                // `count` is the number of attempted writes — confirm whether
                // failures should be surfaced.
                kafka_output.write(pro).await;
                count += 1;
            }
        }
    }

    let duration = start.elapsed();
    // Figure left by the original author from a sample run: 13.4353252s
    println!("Execution time: {:?}", duration);
    if let Err(e) = kafka_output.close().await {
        println!("Close Error: {}", e);
    }
    // Figure left by the original author from a sample run: 1593
    println!("Count: {}", count);
    Ok(())
}
Editor is loading...
Leave a Comment