Untitled
unknown
plain_text
3 years ago
2.5 kB
12
Indexable
import apache_beam as beam
import pandas as pd
import re
import os
import gzip
class ConvertLogFileToDataFrameDoFn(beam.DoFn):
    """Parse one comma-separated event-log text file into a pandas DataFrame.

    Each input element is a path to a text file. Every non-blank line is split
    on commas and turned into one row with the columns listed in ``COLUMNS``.
    Exactly one DataFrame is emitted per input file.
    """

    # Column order of the emitted DataFrame.
    COLUMNS = ['timestamp', 'hostname', 'process_name', 'process_id', 'log_text']

    # Value following "Channel:" up to the next comma or end of line. Stopping
    # at the comma keeps the fields that come after the channel (process id,
    # message) from being swallowed into it.
    _CHANNEL_RE = re.compile(r'Channel:\s*([^,\r\n]*)')

    def process(self, element):
        """Read the file at ``element`` and return a one-item list of DataFrame.

        Args:
            element: Path of the log file to parse.

        Returns:
            ``[df]`` where ``df`` has one row per non-blank line and the columns
            in ``COLUMNS`` (empty, but with those columns, if the file has no
            non-blank lines).

        Raises:
            ValueError: If a non-blank line lacks a ``Channel:`` entry or does
                not have the expected ``label: value`` fields.
        """
        file_path = element
        rows = []
        with open(file_path, "r") as f:
            for line_no, line in enumerate(f, start=1):
                stripped = line.strip()
                if not stripped:
                    # Blank lines (e.g. a trailing newline) hold no record.
                    continue
                rows.append(self._parse_line(stripped, line_no, file_path))
        return [pd.DataFrame(rows, columns=self.COLUMNS)]

    @classmethod
    def _parse_line(cls, line, line_no, file_path):
        """Map one stripped, non-blank log line to a row tuple in COLUMNS order."""
        fields = line.split(",")
        channel = cls._CHANNEL_RE.search(line)
        if channel is None:
            raise ValueError(
                f"{file_path}:{line_no}: no 'Channel:' entry in line {line!r}"
            )
        try:
            return (
                # Text after the first ": " of the first field.
                fields[0].split(": ")[1],
                # Value after ": " in the second field, cut at its first dot.
                fields[1].split(": ")[1].split(".")[0],
                # NOTE(review): stored as process_name, but this is the
                # Channel value — confirm the column naming is intended.
                channel.group(1),
                fields[3].split(": ")[1],
                # Everything after the first ": " of the fifth field, trimmed.
                # Because the line is split on every comma, message text after
                # its first comma is dropped — NOTE(review): confirm intended.
                fields[4].split(": ", 1)[1].strip(),
            )
        except IndexError as err:
            raise ValueError(
                f"{file_path}:{line_no}: malformed log line {line!r}"
            ) from err
class SplitAndSaveFilesDoFn(beam.DoFn):
    """Write one gzip-compressed CSV file per hostname found in a DataFrame.

    For each distinct value of the ``hostname`` column, the matching rows are
    written as CSV (without the index) to ``<hostname>.txt.gz`` in the current
    working directory, and that file's mode is set to 0o777. Nothing is
    emitted downstream.
    """

    def process(self, element):
        """Split ``element`` by hostname and save each group as a gzipped CSV.

        Args:
            element: DataFrame containing a ``hostname`` column.

        Returns:
            An empty list; this DoFn only produces files.
        """
        df = element
        for hostname, group in df.groupby('hostname'):
            gz_path = f"{hostname}.txt.gz"
            # Stream the CSV straight into the gzip archive. This avoids an
            # uncompressed "<hostname>.txt" temporary that could overwrite an
            # existing file of that name or be left behind if compression
            # fails. newline="" leaves pandas' own line terminators
            # untranslated, matching what to_csv writes to a plain path.
            with gzip.open(gz_path, "wt", encoding="utf-8", newline="") as f_out:
                group.to_csv(f_out, index=False)
            # NOTE(review): 0o777 makes a data file world-writable and
            # executable; something like 0o644 is probably sufficient. Kept
            # as-is because the permissions were explicitly requested.
            os.chmod(gz_path, 0o777)
        return []
# Input event-log file handed to the pipeline as its single element.
filename = "/Analytics/venv/Jup/CAPE_Apache_Beam/windows_server_event_logs.txt"

# Three stages: wrap the path in a PCollection, parse the file into a
# DataFrame, then write one gzipped CSV per hostname. Beam executes the
# pipeline when the ``with`` block exits.
with beam.Pipeline() as pipeline:
    input_paths = pipeline | "Read Input File" >> beam.Create([filename])
    parsed_frames = input_paths | "Convert Log Data into Dataframe" >> beam.ParDo(
        ConvertLogFileToDataFrameDoFn()
    )
    data_frame = parsed_frames | "Split and Save Files" >> beam.ParDo(
        SplitAndSaveFilesDoFn()
    )
Editor is loading...