# Untitled
#
# mail@pastecode.io avatar
# unknown
# plain_text
# 2 years ago
# 3.0 kB
# 1
# Indexable
# Never
import apache_beam as beam
import pandas as pd
import re
import os
import gzip

class ConvertLogFileToDataFrameDoFn(beam.DoFn):
    """Parse one comma-separated log file into a pandas DataFrame.

    The input element is the path of a log file. Each line is split on
    commas into ``key: value`` style fields, from which five columns are
    extracted:

    * ``timestamp``    - value part of field 0
    * ``hostname``     - value part of field 1, truncated at the first "."
    * ``process_name`` - text following ``Channel:`` up to the end of the line
    * ``process_id``   - value part of field 3
    * ``log_text``     - value part of field 4, stripped of whitespace
    """

    def process(self, element):
        """Read the file at ``element`` and emit a single DataFrame.

        Blank lines are ignored. Lines that do not have the expected shape
        are logged and skipped so that one bad line does not abort the
        whole file.

        Returns:
            A one-element list holding the DataFrame built from the file.
        """
        import logging  # local import: the file's import block does not provide it

        file_path = element
        columns = ['timestamp', 'hostname', 'process_name', 'process_id', 'log_text']

        rows = []
        with open(file_path, "r") as f:
            for line_number, line in enumerate(f, start=1):
                if not line.strip():
                    continue
                row = self._parse_line(line)
                if row is None:
                    logging.warning(
                        "Skipping malformed line %d in %s", line_number, file_path
                    )
                    continue
                rows.append(row)

        # An empty ``rows`` still yields a frame with the expected columns.
        return [pd.DataFrame(rows, columns=columns)]

    @staticmethod
    def _parse_line(line):
        """Return the five column values for ``line``, or None if malformed."""
        fields = line.strip().split(",")
        # NOTE(review): ``(.*)`` captures everything after "Channel:" to the end
        # of the line, including any later comma-separated fields - confirm
        # this is the intended process_name.
        channel = re.search(r'Channel:\s*(.*)', line)
        if channel is None or len(fields) < 5:
            return None
        try:
            return (
                fields[0].split(": ")[1],
                fields[1].split(": ")[1].split(".")[0],
                channel.group(1),
                fields[3].split(": ")[1],
                # fields[4] cannot contain a comma (the line was split on
                # commas), so no further comma handling is needed here.
                fields[4].split(": ", 1)[1].strip(),
            )
        except IndexError:
            # A field was missing its ": " separator.
            return None

class SplitAndSaveFilesDoFn(beam.DoFn):
    """Write one gzip-compressed CSV file per hostname in a DataFrame."""

    def process(self, element):
        """Split ``element`` by its ``hostname`` column and save each group.

        Every group is written as CSV (without the index) to
        ``<hostname>.txt.gz``, a path relative to the current working
        directory, and the resulting file is given mode 0o777.

        Returns:
            An empty list; this step emits no elements downstream.
        """
        df = element
        for hostname, group in df.groupby('hostname'):
            gz_filename = f"{hostname}.txt.gz"
            # Compress while writing instead of going through a temporary
            # "<hostname>.txt": nothing is left behind if compression fails
            # and an unrelated file with that name is never overwritten.
            group.to_csv(gz_filename, index=False, compression="gzip")
            # Readable, writable and executable by everyone.
            os.chmod(gz_filename, 0o777)

        return []


# Directory whose immediate entries are listed with os.listdir() to find the
# ".log" files fed into the pipeline below.
folder_path = "/var/log/EventLog"

class ReadFileContent(beam.DoFn):
    """DoFn that maps a file path to the full text of that file."""

    def process(self, file_path):
        """Read the file at ``file_path`` in text mode.

        Returns:
            A one-element list holding the entire file contents as a single
            string.
        """
        with open(file_path, "r") as source:
            return [source.read()]

# Build and run the pipeline: list the folder, keep the ".log" entries, parse
# each file into a DataFrame, merge the frames, then save one file per hostname.
with beam.Pipeline() as pipeline:
    data_frames = (pipeline
                   | "Read Input Files" >> beam.Create(os.listdir(folder_path))
                   | "Filter Log Files" >> beam.Filter(lambda x: x.endswith(".log"))
                   | "Get Full File Path" >> beam.Map(lambda x: os.path.join(folder_path, x))
                   # ConvertLogFileToDataFrameDoFn opens the file itself, so it
                   # must receive the path rather than the file's text.
                   | "Convert Log Data into Dataframe" >> beam.ParDo(ConvertLogFileToDataFrameDoFn())
                   # without_defaults(): with no input frames the combiner would
                   # otherwise evaluate pd.concat([]) for its default value,
                   # which raises ValueError.
                   | "Merge Dataframes" >> beam.CombineGlobally(pd.concat).without_defaults()
                   | "Split and Save Files" >> beam.ParDo(SplitAndSaveFilesDoFn()))