Untitled
unknown
plain_text
2 years ago
2.5 kB
2
Indexable
Never
"""Split a comma-separated event log into one gzipped CSV file per hostname.

The pipeline reads a single log file, parses every line into the columns
listed in ``LOG_COLUMNS``, groups the rows by hostname and writes each group
to ``<hostname>.txt.gz`` in the current working directory.
"""
import gzip
import logging
import os
import re

import apache_beam as beam
import pandas as pd

logger = logging.getLogger(__name__)

# Column order of the DataFrame produced by ConvertLogFileToDataFrameDoFn.
LOG_COLUMNS = ['timestamp', 'hostname', 'process_name', 'process_id', 'log_text']

# Compiled once instead of on every log line.
# NOTE(review): the capture group runs to the end of the line, so process_name
# also contains whatever follows the channel value (including later
# comma-separated fields). Kept as-is; confirm this is intended.
_CHANNEL_PATTERN = re.compile(r'Channel:\s*(.*)')

# Permissions applied to every generated archive.
# NOTE(review): 0o777 makes the archives world-writable and executable; kept
# because the original script set it explicitly. Consider 0o644 if nothing
# depends on it.
OUTPUT_FILE_MODE = 0o777


def _parse_log_line(line):
    """Parse one raw log line into a tuple ordered like ``LOG_COLUMNS``.

    The line is split on commas; fields 0, 1, 3 and 4 are expected to look
    like ``"<label>: <value>"``. The hostname is cut at its first ``"."``.

    Returns:
        A 5-tuple of strings, or ``None`` when the line has no ``Channel:``
        marker or lacks one of the expected ``"<label>: <value>"`` fields.
    """
    channel_match = _CHANNEL_PATTERN.search(line)
    if channel_match is None:
        return None

    fields = line.strip().split(",")
    try:
        return (
            fields[0].split(": ")[1],
            fields[1].split(": ")[1].split(".")[0],
            channel_match.group(1),
            fields[3].split(": ")[1],
            # NOTE(review): the line was already split on commas, so a message
            # that itself contains a comma is truncated at that comma. The
            # original code behaved the same way; confirm this is acceptable.
            fields[4].split(": ", 1)[1].strip(),
        )
    except IndexError:
        # Too few comma-separated fields, or a field without ": ".
        return None


class ConvertLogFileToDataFrameDoFn(beam.DoFn):
    """Turn the path of a log file into a single pandas DataFrame."""

    def process(self, element):
        """Read the log file at ``element`` and parse it line by line.

        Blank lines are ignored. Lines that cannot be parsed are skipped with
        a warning instead of aborting the whole pipeline.

        Args:
            element: Path of the log file to read.

        Returns:
            A one-element list holding a DataFrame with ``LOG_COLUMNS``
            columns (empty when no line could be parsed).
        """
        file_path = element
        rows = []
        # Iterate the file object directly so the raw lines are not all held
        # in memory at once.
        with open(file_path, "r") as log_file:
            for line_number, line in enumerate(log_file, start=1):
                if not line.strip():
                    continue
                row = _parse_log_line(line)
                if row is None:
                    logger.warning(
                        "Skipping malformed log line %d in %s",
                        line_number,
                        file_path,
                    )
                    continue
                rows.append(row)
        return [pd.DataFrame(rows, columns=LOG_COLUMNS)]


class SplitAndSaveFilesDoFn(beam.DoFn):
    """Write one gzipped CSV file per hostname found in a DataFrame."""

    def process(self, element):
        """Group ``element`` by hostname and save each group as a ``.txt.gz``.

        Args:
            element: DataFrame that has a ``hostname`` column.

        Returns:
            An empty list; this step emits nothing downstream.
        """
        df = element
        for hostname, group in df.groupby('hostname'):
            archive_path = f"{hostname}.txt.gz"
            # Write the CSV straight into the gzip stream: no intermediate
            # "<hostname>.txt" is created, so nothing is left behind if a
            # later step fails. newline="" keeps the line endings pandas
            # writes from being translated a second time.
            with gzip.open(archive_path, "wt", encoding="utf-8", newline="") as f_out:
                group.to_csv(f_out, index=False)
            os.chmod(archive_path, OUTPUT_FILE_MODE)
        return []


filename = "/Analytics/venv/Jup/CAPE_Apache_Beam/windows_server_event_logs.txt"


def main():
    """Build and run the pipeline on ``filename``."""
    with beam.Pipeline() as pipeline:
        (
            pipeline
            | "Read Input File" >> beam.Create([filename])
            | "Convert Log Data into Dataframe" >> beam.ParDo(ConvertLogFileToDataFrameDoFn())
            | "Split and Save Files" >> beam.ParDo(SplitAndSaveFilesDoFn())
        )


# Only run the pipeline when executed as a script, not when imported.
if __name__ == "__main__":
    main()