Untitled
unknown
plain_text
3 years ago
3.5 kB
6
Indexable
class ConvertLogFileToDataFrameDoFn(beam.DoFn):
    """Parse one log file into a pandas DataFrame of extracted fields.

    Each input element is a path to a text file. Every line that matches the
    ``Time: ... Computer: ... Event Id: ...,Message: ...Level: ...,Channel: ...``
    layout contributes one row; lines that do not match are skipped.
    """

    def process(self, element):
        """Read the file at ``element`` and emit a single DataFrame.

        Args:
            element: Path of the log file to parse.

        Returns:
            A one-item list holding a DataFrame with the columns
            ``timestamp, hostname, process_name, process_id, log_text``.
            The frame is empty (columns only) when no line matches.

        Side effects:
            Prints the path and appends it to the module-level
            ``list_of_files`` so that later stages can see which files
            were read.
        """
        file_path = element
        print(file_path)
        list_of_files.append(file_path)

        columns = ['timestamp', 'hostname', 'process_name', 'process_id', 'log_text']
        # Compile once per file instead of handing the pattern string to
        # re.search on every line.
        line_pattern = re.compile(
            r'Time: (\S+).*Computer: (\S+).*Event Id: (\d+),Message: (.*)Level: (\d+),Channel: (\S+)'
        )

        data = []
        with open(file_path, "r") as f:
            # Stream the file line by line rather than loading it whole.
            for line in f:
                match = line_pattern.search(line)
                if not match:
                    continue
                timestamp, hostname, event_id, log_text, _level, channel = match.groups()
                # Commas are removed from the free-text fields; the rows are
                # later written out as CSV without quoting-sensitive content.
                filtered_timestamp = timestamp.replace(',', '')
                # Keep only the part of the host name before the first dot.
                hostname_prefix = hostname.split(".")[0]
                filtered_log_text = log_text.replace(',', '')
                # Note the column mapping: the channel is stored under
                # 'process_name' and the event id under 'process_id'; the
                # captured level is not kept.
                data.append((
                    filtered_timestamp,
                    hostname_prefix,
                    channel,
                    event_id,
                    filtered_log_text.strip(),
                ))

        # Build the frame once from the collected rows.
        return [pd.DataFrame(data, columns=columns)]
## Filtering Function
class FilterImportantLogsDoFn(beam.DoFn):
    """Keep only the log rows whose text mentions an important word.

    The words are read from ``important_words.txt`` (one entry per line,
    resolved against the current working directory). Entries are joined with
    ``|`` and used as a regular expression, so an entry may itself be a regex.
    """

    def process(self, element):
        """Concatenate the incoming frames and filter them by ``log_text``.

        Args:
            element: An iterable of DataFrames that each have a ``log_text``
                column.

        Returns:
            A one-item list with the filtered DataFrame, or an empty list
            when ``element`` contains no frames.
        """
        important_words_file = "important_words.txt"
        with open(important_words_file, "r") as f:
            # Drop blank lines: an empty alternative in the joined pattern
            # would match every row and silently disable the filter.
            important_words = [
                word for word in f.read().splitlines() if word.strip()
            ]

        frames = list(element)
        if not frames:
            # pd.concat raises on an empty sequence; nothing to filter.
            return []
        df = pd.concat(frames)

        if not important_words:
            # No words configured: pass everything through, which is what an
            # empty pattern did before.
            return [df]

        pattern = '|'.join(important_words)
        # na=False treats missing log text as "no match" instead of producing
        # a NaN mask that cannot be used for boolean indexing.
        mask = df['log_text'].str.contains(pattern, na=False)
        return [df[mask]]
class SplitAndSaveFilesDoFn(beam.DoFn):
    """Write one gzipped CSV per hostname and rename the consumed inputs.

    Args:
        output_dir: Directory that receives the ``*.log.gz`` files.
    """

    def __init__(self, output_dir):
        super().__init__()
        self.output_dir = output_dir
        # Input paths already renamed by this instance, so that repeated
        # process() calls do not try to rename them again.
        self.processed_files = set()

    def process(self, element):
        """Split ``element`` by hostname, save each group, rename the inputs.

        Args:
            element: DataFrame with a ``hostname`` column.

        Returns:
            An empty list; this step emits nothing downstream.

        Side effects:
            Writes ``<host>-windows-T<current_time>.log.gz`` files into
            ``self.output_dir`` with mode 0o777, and renames every path in
            the module-level ``list_of_files`` that has not been renamed yet.
        """
        print(list_of_files)
        df = element

        for hostname, group in df.groupby('hostname'):
            # Keep only the part of the hostname before the first dot.
            hostname_prefix = hostname.split(".")[0]
            print(hostname_prefix)
            gz_filename = f"{hostname_prefix!s}-windows-T{current_time!s}.log.gz"
            output_filename = os.path.join(self.output_dir, gz_filename)
            # Write the gzip directly. The previous version wrote an
            # uncompressed temp file into the working directory first, which
            # could collide between workers and was left behind on failure.
            group.to_csv(
                output_filename,
                header=False,
                index=False,
                compression="gzip",
            )
            # World-accessible permissions, as the original code required.
            os.chmod(output_filename, 0o777)

        # Rename the successfully processed input files.
        for file1 in list_of_files:
            if file1 in self.processed_files:
                continue
            # The first 14 characters of the base name are dropped —
            # presumably a fixed-length prefix of the input naming scheme;
            # TODO confirm against the producer of these files.
            processed_filename = (
                folder_path
                + "Processed_postgre_logs"
                + os.path.basename(file1)[14:]
            )
            try:
                os.rename(file1, processed_filename)
            except FileNotFoundError:
                # Leave the file unmarked so a later call can retry it.
                print(f"File not found: {file1}")
            else:
                self.processed_files.add(file1)

        return []
Editor is loading...