Untitled
unknown
plain_text
3 years ago
1.1 kB
10
Indexable
class SplitAndSaveFilesDoFn(beam.DoFn):
def __init__(self, output_dir, important_words_file):
    """Record the constructor arguments and create an empty bookkeeping set.

    Args:
        output_dir: Stored unchanged as ``self.output_dir``.
        important_words_file: Stored unchanged as
            ``self.important_words_file``.
    """
    # Starts out empty. Presumably filled in by ``process`` to remember
    # which files were already handled -- confirm against its body, which
    # is not visible in this excerpt.
    self.processed_files = set()

    # Plain pass-through of the constructor arguments; no validation or
    # normalisation happens here.
    self.important_words_file = important_words_file
    self.output_dir = output_dir
def process(self, element):
# ... rest of the code ...
# Build the log-processing pipeline. Every step label below is kept exactly as
# it was, since Beam uses the labels to identify the transforms.
with beam.Pipeline() as pipeline:
    data_frames = (
        pipeline
        # Seed the pipeline with the bare entry names found in folder_path.
        | "Read Input Files" >> beam.Create(os.listdir(folder_path))
        # Keep only entries whose name starts with the expected prefix.
        | "Filter Log Files" >> beam.Filter(
            lambda x: x.startswith("Processed_logs_cassandra"))
        # os.listdir yields names only, so re-attach the directory.
        | "Get Full File Path" >> beam.Map(
            lambda x: os.path.join(folder_path, x))
        # Defined elsewhere; going by the step label it emits one DataFrame
        # per log file -- confirm against ConvertLogFileToDataFrameDoFn.
        | "Convert Log Data into Dataframe" >> beam.ParDo(
            ConvertLogFileToDataFrameDoFn())
        # pd.concat raises ValueError when handed an empty sequence. By
        # default CombineGlobally invokes the combiner with no inputs to
        # produce a default value when nothing reaches this step (e.g. no
        # file name matched the filter), which would crash the pipeline.
        # without_defaults() makes an empty input yield an empty output
        # instead; behaviour with at least one DataFrame is unchanged.
        | "Merge Dataframes" >> beam.CombineGlobally(
            pd.concat).without_defaults()
        | "Filter Important Logs" >> beam.ParDo(
            FilterImportantLogsDoFn(important_words_file))
        # folder_path is passed as the DoFn's output_dir argument, so the
        # input folder is also the configured output location.
        | "Split and Save Files" >> beam.ParDo(
            SplitAndSaveFilesDoFn(folder_path, important_words_file))
    )
Editor is loading...