# Root directory whose top-level "*.log" files are fed into the pipeline below
# (os.listdir is not recursive, so subdirectories are not searched).
folder_path: str = "/var/log/EventLog"
class ReadFileContent(beam.DoFn):
    """Read a whole text file and emit its contents as a single string.

    Each input element is a filesystem path. For every path, exactly one
    output element (the complete decoded file text) is produced.
    """

    def __init__(self, encoding="utf-8", errors="replace"):
        """Configure how file bytes are decoded.

        Args:
            encoding: Text encoding used to decode each file. It is pinned
                explicitly so the result does not depend on the worker's
                locale.
            errors: Decoding error policy forwarded to ``open``. The default
                ``"replace"`` substitutes U+FFFD for undecodable bytes. A
                single stray byte in a log file therefore does not abort the
                whole pipeline. Pass ``"strict"`` to raise instead.
        """
        super().__init__()
        self.encoding = encoding
        self.errors = errors

    def process(self, file_path):
        """Yield the full text content of ``file_path``.

        Raises:
            OSError: If the file cannot be opened (missing, unreadable,
                or a directory).
        """
        with open(file_path, "r", encoding=self.encoding, errors=self.errors) as f:
            yield f.read()
# Build and run the pipeline. The pipeline executes when the `with` block exits.
# NOTE(review): `beam`, `os`, ConvertLogFileToDataFrameDoFn and
# SplitAndSaveFilesDoFn are expected to be imported/defined elsewhere in
# this file; confirm.
with beam.Pipeline() as pipeline:
    data_frames = (
        pipeline
        # Directory entries (names only, non-recursive) under folder_path.
        | "Read Input Files" >> beam.Create(os.listdir(folder_path))
        | "Filter Log Files" >> beam.Filter(lambda name: name.endswith(".log"))
        | "Get Full File Path" >> beam.Map(lambda name: os.path.join(folder_path, name))
        # os.listdir also returns directories (and other non-regular entries).
        # A directory named "*.log" would otherwise reach ReadFileContent and
        # fail with IsADirectoryError, so keep only regular files.
        | "Keep Regular Files" >> beam.Filter(os.path.isfile)
        | "Read File Content" >> beam.ParDo(ReadFileContent())
        | "Convert Log Data into Dataframe" >> beam.ParDo(ConvertLogFileToDataFrameDoFn())
        | "Split and Save Files" >> beam.ParDo(SplitAndSaveFilesDoFn())
    )