Untitled
unknown
plain_text
2 years ago
3.0 kB
1
Indexable
Never
import gzip
import logging
import os
import re

import apache_beam as beam
import pandas as pd

logger = logging.getLogger(__name__)

# Column layout shared by the line parser and the empty-input fallback.
LOG_COLUMNS = ['timestamp', 'hostname', 'process_name', 'process_id', 'log_text']

# Compiled once; applied to every log line.
_CHANNEL_RE = re.compile(r'Channel:\s*(.*)')

folder_path = "/var/log/EventLog"


def _parse_log_line(line):
    """Turn one raw log line into a 5-tuple matching LOG_COLUMNS.

    Assumes the line is a comma-separated list of at least five
    "label: value" fields and contains a "Channel:" marker -- TODO confirm
    against a real sample of the log format.

    Returns:
        The tuple (timestamp, hostname, process_name, process_id, log_text),
        or None when the line does not have the expected shape.
    """
    fields = line.strip().split(",")
    channel = _CHANNEL_RE.search(line)
    if len(fields) < 5 or channel is None:
        return None
    try:
        return (
            fields[0].split(": ")[1],
            # Keep only the part of the host name before the first dot.
            fields[1].split(": ")[1].split(".")[0],
            # NOTE(review): this captures everything after "Channel:" up to
            # the end of the line, including any later comma-separated
            # fields -- confirm that is intended.
            channel.group(1),
            fields[3].split(": ")[1],
            # fields[4] cannot contain a comma (the line was split on ","),
            # so no further comma handling is needed here.
            fields[4].split(": ", 1)[1].strip(),
        )
    except IndexError:
        # A field was present but had no ": " separator.
        return None


def _concat_frames(frames):
    """Merge an iterable of DataFrames into one; safe for empty input.

    Used as the combine function for CombineGlobally. pd.concat raises
    ValueError on an empty sequence, which is what happens when no log
    files are found, so that case returns an empty frame instead.
    """
    non_empty = [frame for frame in frames if not frame.empty]
    if not non_empty:
        return pd.DataFrame(columns=LOG_COLUMNS)
    return pd.concat(non_empty, ignore_index=True)


class ConvertLogFileToDataFrameDoFn(beam.DoFn):
    """Parse one log file into a DataFrame with the LOG_COLUMNS columns."""

    def process(self, element):
        """Read the file at `element` and emit a single DataFrame.

        Args:
            element: Path of the log file to read.

        Returns:
            A one-item list holding the DataFrame built from every
            well-formed line. Blank lines are ignored; malformed lines are
            skipped with a warning rather than aborting the pipeline.

        Raises:
            OSError: If the file cannot be opened.
        """
        file_path = element
        records = []
        with open(file_path, "r") as f:
            for line_number, line in enumerate(f, start=1):
                if not line.strip():
                    continue
                record = _parse_log_line(line)
                if record is None:
                    logger.warning(
                        "Skipping malformed line %d in %s", line_number, file_path
                    )
                    continue
                records.append(record)
        return [pd.DataFrame(records, columns=LOG_COLUMNS)]


class SplitAndSaveFilesDoFn(beam.DoFn):
    """Write one gzip-compressed CSV per distinct hostname."""

    def process(self, element):
        """Split `element` by its 'hostname' column and save each group.

        Each group is written to "<hostname>.txt.gz" in the current working
        directory, without the index column.

        Args:
            element: DataFrame containing a 'hostname' column.

        Returns:
            An empty list; this step produces files, not elements.
        """
        df = element
        for hostname, group in df.groupby('hostname'):
            gz_path = f"{hostname}.txt.gz"
            # Stream the CSV straight into the gzip file instead of writing
            # a temporary .txt, re-reading it and deleting it afterwards.
            with gzip.open(gz_path, "wt", encoding="utf-8", newline="") as f_out:
                group.to_csv(f_out, index=False)
            # NOTE(review): 0o777 makes the archive world-writable; kept
            # because the original script set it explicitly -- confirm that
            # such broad permissions are really required.
            os.chmod(gz_path, 0o777)
        return []


class ReadFileContent(beam.DoFn):
    """Emit the full text of a file.

    Not used by the pipeline below: ConvertLogFileToDataFrameDoFn opens the
    file itself, so feeding it file contents instead of a path would fail.
    """

    def process(self, file_path):
        """Return a one-item list holding the text read from `file_path`."""
        with open(file_path, "r") as f:
            content = f.read()
        return [content]


def run():
    """Build and run the pipeline over every ".log" file in `folder_path`."""
    with beam.Pipeline() as pipeline:
        (
            pipeline
            | "Read Input Files" >> beam.Create(os.listdir(folder_path))
            | "Filter Log Files" >> beam.Filter(lambda x: x.endswith(".log"))
            | "Get Full File Path" >> beam.Map(lambda x: os.path.join(folder_path, x))
            # The converter receives paths, not file contents.
            | "Convert Log Data into Dataframe" >> beam.ParDo(ConvertLogFileToDataFrameDoFn())
            | "Merge Dataframes" >> beam.CombineGlobally(_concat_frames)
            | "Split and Save Files" >> beam.ParDo(SplitAndSaveFilesDoFn())
        )


if __name__ == "__main__":
    run()