Untitled
unknown
plain_text
a year ago
1.1 kB
2
Indexable
Never
class SplitAndSaveFilesDoFn(beam.DoFn):
    """Final pipeline stage: receives the filtered log DataFrame.

    Args:
        output_dir: Directory handed in by the pipeline (``folder_path``).
        important_words_file: Path to the important-words file; the same
            path is also given to ``FilterImportantLogsDoFn``.
    """

    def __init__(self, output_dir, important_words_file):
        super().__init__()
        self.output_dir = output_dir
        self.important_words_file = important_words_file
        # NOTE(review): this set is per-DoFn-instance state. Beam serializes
        # DoFn instances out to workers, so presumably each worker/bundle gets
        # its own copy -- it cannot be relied on to deduplicate globally.
        self.processed_files = set()

    def process(self, element):
        # ... rest of the code ...
        # TODO: the body of this method was elided in the original snippet;
        # restore it here. As written it returns None, i.e. emits nothing.
        ...


def _concat_dataframes(frames):
    """Concatenate DataFrames for ``beam.CombineGlobally``.

    ``pd.concat`` raises ``ValueError`` when given no objects, and
    ``CombineGlobally`` applies its combiner to zero inputs to produce a
    default when the input PCollection is empty (e.g. no file matched the
    log-name filter). Return an empty DataFrame in that case instead.
    """
    frames = list(frames)
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames)


# NOTE(review): `folder_path`, `important_words_file`,
# `ConvertLogFileToDataFrameDoFn` and `FilterImportantLogsDoFn` are not
# defined in this snippet; they are assumed to come from elsewhere.
with beam.Pipeline() as pipeline:
    data_frames = (
        pipeline
        | "Read Input Files" >> beam.Create(os.listdir(folder_path))
        | "Filter Log Files" >> beam.Filter(
            lambda name: name.startswith("Processed_logs_cassandra"))
        | "Get Full File Path" >> beam.Map(
            lambda name: os.path.join(folder_path, name))
        | "Convert Log Data into Dataframe" >> beam.ParDo(
            ConvertLogFileToDataFrameDoFn())
        # A single global merge: every per-file DataFrame ends up in one frame.
        | "Merge Dataframes" >> beam.CombineGlobally(_concat_dataframes)
        | "Filter Important Logs" >> beam.ParDo(
            FilterImportantLogsDoFn(important_words_file))
        | "Split and Save Files" >> beam.ParDo(
            SplitAndSaveFilesDoFn(folder_path, important_words_file))
    )