Untitled
unknown
plain_text
2 years ago
2.5 kB
4
Indexable
"""Split comma-separated log files by hostname into gzip-compressed CSVs.

Every regular file directly under ``folder_path`` is parsed into a pandas
DataFrame, and the rows of each DataFrame are written to
``<hostname>-windows.log.gz`` (one file per hostname) in the current working
directory.
"""
import gzip
import logging
import os
import re

import apache_beam as beam
import pandas as pd

# Column order of the DataFrame produced for every parsed log file.
LOG_COLUMNS = ['timestamp', 'hostname', 'process_name', 'process_id', 'log_text']


def _parse_log_line(line):
    """Extract one ``LOG_COLUMNS`` row from a raw log line.

    The line is split on ``","``; the individual fields are then split on
    ``": "`` (or ``":"`` for the process name) to pull out their values.

    Args:
        line: One raw line of a log file, with or without trailing newline.

    Returns:
        A 5-tuple ``(timestamp, hostname, process_name, process_id,
        log_text)``, or ``None`` when the line has too few fields or a field
        lacks the expected ``": "`` separator.
    """
    fields = line.strip().split(",")
    try:
        return (
            fields[0].split(": ")[1],
            # Keep only the part of the hostname before the first dot.
            fields[1].split(": ")[1].split(".")[0],
            # The process name is read from the LAST field, after its last ":".
            fields[-1].split(":")[-1].strip(),
            fields[3].split(": ")[1],
            # The line was split on ",", so a field never contains a comma;
            # any further per-comma handling of this value would be a no-op.
            # NOTE(review): this also means log text that itself contains
            # commas is cut at the first one - confirm that is acceptable.
            fields[4].split(": ", 1)[1].strip(),
        )
    except IndexError:
        return None


class ConvertLogFileToDataFrameDoFn(beam.DoFn):
    """Parse one log file into a DataFrame with the ``LOG_COLUMNS`` columns."""

    def process(self, element):
        """Read the file at ``element`` and emit a single DataFrame.

        Args:
            element: Path of the log file to read.

        Returns:
            A one-element list holding the DataFrame of parsed rows. Blank
            lines are skipped; lines that cannot be parsed are logged at
            WARNING level and skipped, so one bad line does not abort the
            whole pipeline.
        """
        file_path = element
        print(file_path)

        rows = []
        # errors="replace" keeps a stray undecodable byte in a log file from
        # raising UnicodeDecodeError and failing the whole file.
        with open(file_path, "r", errors="replace") as f:
            # Stream the file instead of loading every line with readlines().
            for line_number, line in enumerate(f, start=1):
                if not line.strip():
                    continue
                row = _parse_log_line(line)
                if row is None:
                    logging.warning(
                        "Skipping malformed line %d in %s", line_number, file_path
                    )
                    continue
                rows.append(row)

        return [pd.DataFrame(rows, columns=LOG_COLUMNS)]


class SplitAndSaveFilesDoFn(beam.DoFn):
    """Write each hostname's rows of a DataFrame to its own gzipped CSV."""

    def process(self, element):
        """Group ``element`` by ``hostname`` and save one ``.gz`` per group.

        Args:
            element: DataFrame containing at least a ``hostname`` column.

        Returns:
            An empty list; the only effect is the files written to the
            current working directory.
        """
        df = element
        for hostname, group in df.groupby('hostname'):
            # NOTE(review): hostname comes straight from the log contents and
            # is used as a file name - confirm the input is trusted.
            # NOTE(review): the name depends only on the hostname, so a later
            # input file with the same hostname overwrites this output.
            gz_filename = f"{hostname}-windows.log.gz"
            # Compress while writing, so no uncompressed temporary file is
            # created (and none is left behind if compression fails).
            group.to_csv(gz_filename, index=False, compression="gzip")
            # NOTE(review): 0o777 makes the file world-writable; kept as in
            # the original, but consider tightening (e.g. 0o644).
            os.chmod(gz_filename, 0o777)
        return []


folder_path = "/var/log/EventLog/"

with beam.Pipeline() as pipeline:
    data_frames = (
        pipeline
        | "Read Input Files" >> beam.Create(os.listdir(folder_path))
        | "Get Full File Path" >> beam.Map(lambda x: os.path.join(folder_path, x))
        # os.listdir also returns sub-directories, which open() cannot read.
        | "Skip Non-Files" >> beam.Filter(os.path.isfile)
        | "Convert Log Data into Dataframe" >> beam.ParDo(ConvertLogFileToDataFrameDoFn())
        | "Split and Save Files" >> beam.ParDo(SplitAndSaveFilesDoFn())
    )
Editor is loading...