Untitled

 avatar
unknown
plain_text
2 years ago
5.6 kB
4
Indexable
import apache_beam as beam
import pandas as pd
import re
import os
import gzip
import time
import logging


## Logging configuration ##############

# Path of the file that receives this script's log records.
log_file = '/home/TCSCAPEAGENT/Python/logs/windows_event_logs_transfer_script.csv'

# Write records of level INFO and above (so both INFO and ERROR messages are
# kept) to log_file, formatted as "<time> - <level> - <message>".
logging.basicConfig(filename=log_file, format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)


# Paths of the input files handled so far: appended to by
# ConvertLogFileToDataFrameDoFn and read back by SplitAndSaveFilesDoFn.
list_of_files=[]
# Directory that is listed for input logs and that receives the output files.
folder_path = "/var/log/EventLog/"
# Text file holding the comma-separated keywords used by FilterImportantLogsDoFn.
important_words_file="/home/TCSCAPEAGENT/Python/CAPE_WINDOWS_EVENT_LOGS_INTEGRATION/Code/important_words.txt"

print("Getting Current Timestamp")
# Local time captured once at start-up; it is embedded in every output filename.
current_time = time.strftime("%Y-%m-%d-%H-%M-%S")
print(current_time)


class ConvertLogFileToDataFrameDoFn(beam.DoFn):
    """Parse one event-log text file into a pandas DataFrame.

    Each input element is the path of a log file. Every line that matches
    ``_LINE_PATTERN`` contributes one row; lines that do not match are skipped.
    """

    # Output column order. Note that 'process_name' is filled with the event
    # channel and 'process_id' with the event id (see process()).
    _COLUMNS = ['timestamp', 'hostname', 'process_name', 'process_id', 'log_text']

    # Compiled once for the class rather than being looked up for every line.
    _LINE_PATTERN = re.compile(
        r'Time: (\S+).*Computer: (\S+).*Event Id: (\d+),Message: (.*)Level: (\d+),Channel: (\S+)'
    )

    def process(self, element):
        """Read the file at ``element`` and emit a single DataFrame.

        Args:
            element: Path of the log file to parse.

        Returns:
            A one-item list holding a DataFrame with the columns in
            ``_COLUMNS``; the frame is empty when no line matches.

        Raises:
            OSError: If the file cannot be opened or read.
        """
        file_path = element
        print(file_path)
        logging.info("File Paths : %s ",file_path)
        # Remembered so SplitAndSaveFilesDoFn can rename the inputs afterwards.
        # NOTE(review): this relies on module-level state shared between
        # DoFns - presumably only valid when every step runs in the same
        # process; confirm for the runner in use.
        list_of_files.append(file_path)

        rows = []
        # Stream the file line by line instead of loading it all with readlines().
        with open(file_path, "r") as f:
            for line in f:
                match = self._LINE_PATTERN.search(line)
                if match is None:
                    continue
                # The level is captured by the pattern but not stored.
                timestamp, hostname, event_id, log_text, _level, channel = match.groups()
                rows.append((
                    timestamp.replace(',', ''),          # commas removed from the timestamp
                    hostname.split(".")[0],              # keep only the part before the first dot
                    channel,
                    event_id,
                    log_text.replace(',', '').strip(),   # commas removed, surrounding blanks trimmed
                ))

        return [pd.DataFrame(rows, columns=self._COLUMNS)]


## Filtering Function
class FilterImportantLogsDoFn(beam.DoFn):
    """Keep only the rows whose ``log_text`` contains an important keyword.

    The keywords are read from a text file as a comma-separated list and are
    combined into one regular expression (``kw1|kw2|...``).
    """

    def __init__(self, important_words_file):
        """Store the path of the comma-separated keyword file."""
        super().__init__()
        self.important_words_file = important_words_file

    def _load_keywords(self):
        """Return the non-empty, whitespace-trimmed keywords from the file."""
        with open(self.important_words_file, "r") as f:
            candidates = (word.strip() for word in f.read().split(','))
            # Drop blanks: an empty alternative in the joined pattern (from a
            # trailing comma or an empty file) would match every row.
            return [word for word in candidates if word]

    def process(self, df):
        """Filter ``df`` down to the rows that mention a keyword.

        Args:
            df: DataFrame with a ``log_text`` column.

        Returns:
            A one-item list holding the filtered DataFrame. When the keyword
            file yields no keywords the result is an empty frame with the
            same columns.

        Raises:
            OSError: If the keyword file cannot be read.
        """
        print(f"Received dataframe with shape {df.shape}")
        important_words = self._load_keywords()
        print(f"Filtering important logs using keywords: {important_words}")
        if important_words:
            # Keywords are still interpreted as regular expressions, as before.
            # na=False keeps missing log_text values from breaking the mask.
            mask = df['log_text'].str.contains('|'.join(important_words), na=False)
            filtered_df = df[mask]
        else:
            logging.warning("No keywords found in %s; nothing is treated as important",
                            self.important_words_file)
            filtered_df = df.iloc[0:0]
        print(f"Number of important logs found: {len(filtered_df)}")
        logging.info(f"Number of important logs found: {len(filtered_df)}")
        return [filtered_df]



class SplitAndSaveFilesDoFn(beam.DoFn):
    """Write one gzip-compressed CSV per hostname, then rename the input files.

    Output files are named
    ``<hostname>-windows-postgre-T<current_time>.log.gz`` inside ``output_dir``.
    """

    # Length of this prefix is stripped from each input basename on rename
    # (the original code used the equivalent magic slice [24:]).
    _INPUT_PREFIX = "Processed_forwarded_logs"
    # Prefix given to an input file once it has been handled.
    _DONE_PREFIX = "Processed_postgre_logs"

    def __init__(self, output_dir):
        """Remember the output directory and start with no renamed inputs."""
        super().__init__()
        self.output_dir = output_dir
        # Inputs that were already renamed, so they are not renamed twice.
        self.processed_files = set()

    def process(self, element):
        """Save ``element`` grouped by hostname and mark the inputs as done.

        Args:
            element: DataFrame with a ``hostname`` column.

        Returns:
            An empty list; this step only has side effects.

        Raises:
            OSError: If an output file cannot be written or chmod-ed.
        """
        print(list_of_files)
        df = element
        wrote_output = False
        for hostname, group in df.groupby('hostname'):
            # get the first part of the hostname before the first dot
            hostname_prefix = hostname.split(".")[0]
            print(hostname_prefix)
            filename = str(hostname_prefix)+ "-windows-postgre" +"-T" + str(current_time) + ".log"
            print(filename)
            gz_filename=filename + ".gz"
            print(gz_filename)
            output_filename = os.path.join(self.output_dir, gz_filename)
            print("Output Filename : ",output_filename)
            # Compress while writing, so no uncompressed temporary file is
            # left in the current working directory.
            group.to_csv(output_filename, header=False, index=False, compression='gzip')
            # NOTE(review): 0o777 makes the file world-writable; kept because
            # the original did so on purpose - confirm consumers need it.
            os.chmod(output_filename, 0o777)
            wrote_output = True

        # Rename the inputs only after every group has been written, so a
        # failure part-way through does not leave them marked as processed.
        # As before, nothing is renamed when there was nothing to write.
        if wrote_output:
            self._rename_inputs()

        return []

    def _rename_inputs(self):
        """Rename each not-yet-renamed input file to carry the done prefix."""
        for source in list_of_files:
            if source in self.processed_files:
                continue
            suffix = os.path.basename(source)[len(self._INPUT_PREFIX):]
            # Rename within the file's own directory; avoids depending on a
            # trailing slash in a separately configured folder string.
            target = os.path.join(os.path.dirname(source), self._DONE_PREFIX + suffix)
            try:
                os.rename(source, target)
            except FileNotFoundError:
                print(f"File not found: {source}")
                # Log the path itself (the original passed a set literal).
                logging.error("File not found : %s ", source)
                continue
            self.processed_files.add(source)

# Pipeline: list the input directory, keep the files carrying the
# "Processed_forwarded_logs" prefix, parse each into a DataFrame, merge them,
# keep the important rows and write one compressed file per hostname.
with beam.Pipeline() as pipeline:
    data_frames = (pipeline
                   | "Read Input Files" >> beam.Create(os.listdir(folder_path))
                   | "Filter Log Files" >> beam.Filter(lambda x: x.startswith("Processed_forwarded_logs"))
                   | "Get Full File Path" >> beam.Map(lambda x: os.path.join(folder_path, x))
                   | "Convert Log Data into Dataframe" >> beam.ParDo(ConvertLogFileToDataFrameDoFn())
                   # without_defaults(): when no input file matches, emit nothing
                   # instead of evaluating pd.concat([]), which raises ValueError.
                   | "Merge Dataframes" >> beam.CombineGlobally(pd.concat).without_defaults()
                   | "Filter Important Logs" >> beam.ParDo(FilterImportantLogsDoFn(important_words_file))
                   | "Split and Save Files" >> beam.ParDo(SplitAndSaveFilesDoFn(folder_path)))

Editor is loading...
Leave a Comment