Untitled

 avatar
unknown
plain_text
2 years ago
797 B
2
Indexable
# Directory whose ``*.log`` entries are fed into the Beam pipeline below.
folder_path: str = "/var/log/EventLog"

class ReadFileContent(beam.DoFn):
    """Emit the entire text content of each input file as one element.

    Each input element is a path that is passed straight to ``open()``, so it
    must be readable from wherever this DoFn executes (e.g. a local path when
    running on the DirectRunner -- confirm for other runners). Exactly one
    output element is produced per input path.
    """

    def __init__(self, encoding="utf-8", errors="strict"):
        """Configure how file bytes are decoded into text.

        Args:
            encoding: Codec used to decode every file. An explicit default is
                used so the result does not depend on the worker's locale.
            errors: Decoding error policy forwarded to ``open()``, e.g.
                ``"strict"`` (raise), ``"replace"`` or ``"ignore"``.
        """
        # DoFn subclasses that define __init__ must initialise the base class.
        super().__init__()
        self._encoding = encoding
        self._errors = errors

    def process(self, file_path):
        """Yield the full decoded content of ``file_path`` as a single string.

        Raises:
            OSError: if the file cannot be opened or read.
            UnicodeDecodeError: if the bytes are invalid for the configured
                encoding and ``errors`` is ``"strict"``.
        """
        with open(
            file_path, "r", encoding=self._encoding, errors=self._errors
        ) as f:
            yield f.read()

# Build the log-processing pipeline; Beam's Pipeline context manager runs it
# when the ``with`` block exits.
with beam.Pipeline() as pipeline:
    # The directory is listed once, at pipeline-construction time, on the
    # machine that builds the pipeline. Only regular files are kept, so a
    # sub-directory whose name ends in ".log" can no longer reach open() in
    # ReadFileContent (where it would raise IsADirectoryError). Sorting keeps
    # the Create payload reproducible, since os.listdir order is arbitrary.
    log_dir_entries = sorted(
        name
        for name in os.listdir(folder_path)
        if os.path.isfile(os.path.join(folder_path, name))
    )
    # NOTE(review): despite its name, this is the PCollection produced by
    # SplitAndSaveFilesDoFn, not a collection of DataFrames.
    data_frames = (pipeline
                   | "Read Input Files" >> beam.Create(log_dir_entries)
                   | "Filter Log Files" >> beam.Filter(lambda x: x.endswith(".log"))
                   | "Get Full File Path" >> beam.Map(lambda x: os.path.join(folder_path, x))
                   | "Read File Content" >> beam.ParDo(ReadFileContent())
                   | "Convert Log Data into Dataframe" >> beam.ParDo(ConvertLogFileToDataFrameDoFn())
                   | "Split and Save Files" >> beam.ParDo(SplitAndSaveFilesDoFn()))