Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
1.1 kB
2
Indexable
Never
class SplitAndSaveFilesDoFn(beam.DoFn):
    """Beam ``DoFn`` applied by the "Split and Save Files" pipeline step.

    Only the constructor is visible in this excerpt; the body of ``process``
    is elided, so the actual splitting/saving behaviour is not described here.
    """

    def __init__(self, output_dir, important_words_file):
        """Store the configuration supplied when the pipeline is built.

        Args:
            output_dir: Kept as ``self.output_dir``. Presumably the directory
                the split files are written to -- confirm against ``process``.
            important_words_file: Kept as ``self.important_words_file``.
                Presumably the path of a word-list file -- confirm against
                ``process``.
        """
        # NOTE(review): ``beam.DoFn.__init__`` is not invoked via ``super()``
        # here -- confirm that is intentional.
        self.output_dir = output_dir
        self.important_words_file = important_words_file
        # Starts out empty; presumably records files already handled by
        # ``process``.
        # NOTE(review): this set lives on the DoFn instance, so it is likely
        # not shared between workers if the runner uses several copies of the
        # DoFn -- verify before relying on it for de-duplication.
        self.processed_files = set()

    def process(self, element):
        # Body not shown in this excerpt.
        # ... rest of the code ...

# Build the log-processing pipeline. Leaving the ``with`` block runs the
# pipeline and waits for it to finish.
with beam.Pipeline() as pipeline:
    data_frames = (pipeline
                   # Seed the pipeline with the entry names found in folder_path.
                   | "Read Input Files" >> beam.Create(os.listdir(folder_path))
                   # Keep only entries whose name marks them as processed Cassandra logs.
                   | "Filter Log Files" >> beam.Filter(lambda x: x.startswith("Processed_logs_cassandra"))
                   # os.listdir yields bare names; later steps need full paths.
                   | "Get Full File Path" >> beam.Map(lambda x: os.path.join(folder_path, x))
                   | "Convert Log Data into Dataframe" >> beam.ParDo(ConvertLogFileToDataFrameDoFn())
                   # pd.concat receives an iterable of frames and merges them.
                   # without_defaults(): when no file matched the filter, Beam
                   # would otherwise ask the combiner for a default value by
                   # calling pd.concat([]), which raises
                   # "ValueError: No objects to concatenate". With it, an empty
                   # input produces an empty PCollection and the remaining
                   # steps are simply not invoked.
                   | "Merge Dataframes" >> beam.CombineGlobally(pd.concat).without_defaults()
                   | "Filter Important Logs" >> beam.ParDo(FilterImportantLogsDoFn(important_words_file))
                   | "Split and Save Files" >> beam.ParDo(SplitAndSaveFilesDoFn(folder_path, important_words_file)))