Untitled
unknown
plain_text
2 years ago
797 B
3
Indexable
import os

# NOTE(review): `beam` (presumably `import apache_beam as beam`),
# `ConvertLogFileToDataFrameDoFn` and `SplitAndSaveFilesDoFn` are used below
# but are not defined in this snippet; they must be imported/defined
# elsewhere -- confirm.

# Directory scanned for ``*.log`` files.
folder_path = "/var/log/EventLog"


class ReadFileContent(beam.DoFn):
    """Beam DoFn that emits the entire text of each input file path."""

    def process(self, file_path):
        """Read *file_path* and yield its whole contents as one string.

        Args:
            file_path: Path of a file readable where the DoFn executes.

        Yields:
            The file's full text as a single ``str`` element.
        """
        # Decode explicitly as UTF-8 instead of the platform's locale default,
        # and replace undecodable bytes so a single corrupt byte in one log
        # cannot abort the entire pipeline run.
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            yield f.read()


def run(folder=folder_path):
    """Build and execute the log-processing pipeline over *folder*.

    Every regular file directly inside *folder* whose name ends in ``.log``
    is read in full, then passed through ``ConvertLogFileToDataFrameDoFn``
    and finally ``SplitAndSaveFilesDoFn``.

    Args:
        folder: Directory to scan; defaults to ``folder_path``.
    """
    # The directory is listed once, while the pipeline is being constructed.
    # Sorting makes the input order deterministic (os.listdir order is
    # arbitrary).
    file_names = sorted(os.listdir(folder))

    with beam.Pipeline() as pipeline:
        (
            pipeline
            | "Read Input Files" >> beam.Create(file_names)
            | "Filter Log Files" >> beam.Filter(lambda x: x.endswith(".log"))
            | "Get Full File Path" >> beam.Map(lambda x: os.path.join(folder, x))
            # Drop directories and other non-regular entries whose names
            # happen to end in ".log"; open() would raise on them.
            | "Skip Non-Files" >> beam.Filter(os.path.isfile)
            | "Read File Content" >> beam.ParDo(ReadFileContent())
            | "Convert Log Data into Dataframe"
            >> beam.ParDo(ConvertLogFileToDataFrameDoFn())
            | "Split and Save Files" >> beam.ParDo(SplitAndSaveFilesDoFn())
        )


if __name__ == "__main__":
    run()
Editor is loading...