Untitled
unknown
plain_text
10 months ago
5.6 kB
2
Indexable
Never
# Windows event log transfer script.
#
# Reads forwarded Windows event log files from ``folder_path``, parses each
# line into a row, keeps only rows whose message matches one of the keywords
# listed in ``important_words_file``, writes one gzip-compressed CSV per host
# back into ``folder_path`` and finally renames the consumed input files.

import gzip
import logging
import os
import re
import time

import apache_beam as beam
import pandas as pd

# ---- Logging configuration ----------------------------------------------
# Define the log file path
log_file = '/home/TCSCAPEAGENT/Python/logs/windows_event_logs_transfer_script.csv'

# Log at INFO level, which also records WARNING and ERROR messages.
logging.basicConfig(
    filename=log_file,
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Paths of every input file seen by ConvertLogFileToDataFrameDoFn.
# NOTE(review): this is process-global state shared between DoFns; it is only
# visible to the later stage when every stage runs in the same Python process
# -- confirm the runner in use before distributing this pipeline.
list_of_files = []
folder_path = "/var/log/EventLog/"
important_words_file = "/home/TCSCAPEAGENT/Python/CAPE_WINDOWS_EVENT_LOGS_INTEGRATION/Code/important_words.txt"

# File-name prefix of the input files and the prefix they are renamed to.
INPUT_PREFIX = "Processed_forwarded_logs"
OUTPUT_PREFIX = "Processed_postgre_logs"

# Column layout of the output rows. The event channel is stored in
# ``process_name`` and the event id in ``process_id``.
LOG_COLUMNS = ['timestamp', 'hostname', 'process_name', 'process_id', 'log_text']

# Compiled once instead of once per input line.
LOG_LINE_RE = re.compile(
    r'Time: (\S+).*Computer: (\S+).*Event Id: (\d+),Message: (.*)Level: (\d+),Channel: (\S+)'
)

print("Getting Current Timestamp")
# Timestamp taken once at start-up; it is embedded in every output file name.
current_time = time.strftime("%Y-%m-%d-%H-%M-%S")
print(current_time)


class ConvertLogFileToDataFrameDoFn(beam.DoFn):
    """Parse one event log file into a DataFrame with ``LOG_COLUMNS``."""

    def process(self, element):
        """Read the file at path ``element`` and emit one DataFrame.

        Lines that do not match ``LOG_LINE_RE`` are skipped. Commas are
        removed from the timestamp and the message, and only the part of the
        host name before the first dot is kept.

        Returns:
            A one-element list holding the parsed DataFrame (possibly empty).
        """
        file_path = element
        print(file_path)
        logging.info("File Paths : %s ", file_path)
        list_of_files.append(file_path)

        data = []
        # errors="replace" keeps a single undecodable byte from aborting the run.
        with open(file_path, "r", errors="replace") as f:
            for line in f:
                match = LOG_LINE_RE.search(line)
                if match is None:
                    continue
                timestamp, hostname, event_id, log_text, _level, channel = match.groups()
                data.append((
                    timestamp.replace(',', ''),
                    hostname.split(".")[0],
                    channel,
                    event_id,
                    log_text.replace(',', '').strip(),
                ))

        return [pd.DataFrame(data, columns=LOG_COLUMNS)]


## Filtering Function
class FilterImportantLogsDoFn(beam.DoFn):
    """Keep only the rows whose ``log_text`` matches a configured keyword."""

    def __init__(self, important_words_file):
        """Remember the path of the comma-separated keyword file."""
        self.important_words_file = important_words_file

    def process(self, df):
        """Filter ``df`` by the keywords read from ``important_words_file``.

        Keywords are joined with ``|`` and applied as a regular expression,
        so regex metacharacters in the keyword file are interpreted as such.
        Blank keywords (for example from a trailing comma) are ignored; an
        empty alternative would otherwise match every row. If no keyword
        remains, filtering is disabled and every row is kept.

        Returns:
            A one-element list holding the filtered DataFrame.
        """
        print(f"Received dataframe with shape {df.shape}")
        with open(self.important_words_file, "r") as f:
            raw_words = f.read().split(',')
        important_words = [word.strip() for word in raw_words if word.strip()]
        print(f"Filtering important logs using keywords: {important_words}")

        if not important_words:
            logging.warning("No keywords found in %s ; keeping all rows", self.important_words_file)
            filtered_df = df
        elif df.empty:
            filtered_df = df
        else:
            # na=False keeps missing messages from producing a NaN mask.
            mask = df['log_text'].str.contains('|'.join(important_words), na=False)
            filtered_df = df[mask]

        print(f"Number of important logs found: {len(filtered_df)}")
        logging.info(f"Number of important logs found: {len(filtered_df)}")
        return [filtered_df]


class SplitAndSaveFilesDoFn(beam.DoFn):
    """Write one gzip CSV per host and rename the consumed input files."""

    def __init__(self, output_dir):
        """Remember the output directory and start with no renamed files."""
        self.output_dir = output_dir
        # Input paths already renamed by this instance.
        self.processed_files = set()

    def process(self, element):
        """Save ``element`` grouped by ``hostname`` and rename the inputs.

        For each host a header-less, index-less CSV is written to
        ``<output_dir>/<host>-windows-postgre-T<current_time>.log.gz``.
        Afterwards every path in ``list_of_files`` is renamed from the
        ``Processed_forwarded_logs`` prefix to ``Processed_postgre_logs``.

        Returns:
            An empty list; this step emits nothing downstream.
        """
        print(list_of_files)
        df = element

        for hostname, group in df.groupby('hostname'):
            # get the first part of the hostname before the first dot
            hostname_prefix = hostname.split(".")[0]
            print(hostname_prefix)
            filename = str(hostname_prefix) + "-windows-postgre" + "-T" + str(current_time) + ".log"
            print(filename)
            gz_filename = filename + ".gz"
            print(gz_filename)
            output_filename = os.path.join(self.output_dir, gz_filename)
            print("Output Filename : ", output_filename)

            # Stream the CSV straight into the gzip file so no uncompressed
            # temporary file is left in the working directory on failure.
            with gzip.open(output_filename, "wt", encoding="utf-8", newline="") as f_out:
                group.to_csv(f_out, header=False, index=False)

            # NOTE(review): 0o777 makes the output world-writable; kept because
            # downstream consumers may rely on it -- confirm and tighten.
            os.chmod(output_filename, 0o777)

        # Rename the successfully processed files, once all outputs are written.
        for file1 in list_of_files:
            if file1 in self.processed_files:
                continue
            remainder = os.path.basename(file1)[len(INPUT_PREFIX):]
            processed_filename = folder_path + OUTPUT_PREFIX + remainder
            try:
                os.rename(file1, processed_filename)
            except FileNotFoundError:
                print(f"File not found: {file1}")
                logging.error("File not found : %s ", file1)
                continue
            self.processed_files.add(file1)

        return []


def _concat_frames(frames):
    """Concatenate DataFrames, returning an empty frame when there are none.

    ``pd.concat`` raises ``ValueError`` on an empty sequence, which is what
    the global combiner receives when no input file matched.
    """
    frames = list(frames)
    if not frames:
        return pd.DataFrame(columns=LOG_COLUMNS)
    return pd.concat(frames)


with beam.Pipeline() as pipeline:
    data_frames = (
        pipeline
        | "Read Input Files" >> beam.Create(os.listdir(folder_path))
        | "Filter Log Files" >> beam.Filter(lambda x: x.startswith(INPUT_PREFIX))
        | "Get Full File Path" >> beam.Map(lambda x: os.path.join(folder_path, x))
        | "Convert Log Data into Dataframe" >> beam.ParDo(ConvertLogFileToDataFrameDoFn())
        | "Merge Dataframes" >> beam.CombineGlobally(_concat_frames)
        | "Filter Important Logs" >> beam.ParDo(FilterImportantLogsDoFn(important_words_file))
        | "Split and Save Files" >> beam.ParDo(SplitAndSaveFilesDoFn(folder_path))
    )
Leave a Comment