Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
605 B
2
Indexable
Never
class FilterImportantLogsDoFn(beam.DoFn):
    """Filter each incoming dataframe down to rows mentioning a keyword.

    Keywords come from a text file with one keyword per line. A row is kept
    when its ``log_text`` value contains at least one keyword as a literal,
    case-sensitive substring. One filtered dataframe is emitted per input
    element.
    """

    def __init__(self, important_words_file):
        """Store the keyword file location; the file itself is read lazily.

        Args:
            important_words_file: Path of a text file holding one keyword
                per line. It is opened with the built-in ``open``.
                NOTE(review): on a distributed runner this path presumably
                has to be readable from every worker -- confirm.
        """
        super().__init__()
        self.important_words_file = important_words_file
        # Filled on first use so the file is read once per instance rather
        # than once per element.
        self._important_words = None

    def _load_important_words(self):
        """Return the keyword list, reading the file only on the first call."""
        # getattr keeps this working for instances whose __init__ was bypassed.
        words = getattr(self, "_important_words", None)
        if words is None:
            with open(self.important_words_file, "r", encoding="utf-8") as f:
                # Skip blank lines: an empty alternative in the pattern
                # would match every row.
                words = [line for line in f.read().splitlines() if line.strip()]
            self._important_words = words
        return words

    def process(self, df):
        """Yield the rows of ``df`` whose ``log_text`` contains a keyword.

        Args:
            df: Dataframe exposing ``shape`` and a ``log_text`` column with
                string accessors (presumably a pandas ``DataFrame``).

        Yields:
            The subset of ``df`` whose ``log_text`` contains at least one
            keyword. Rows with a missing ``log_text`` are dropped, and an
            empty keyword list yields an empty dataframe.
        """
        # Imported locally so the class does not rely on a module-level import.
        import re

        print(f"Received dataframe with shape {df.shape}")
        important_words = self._load_important_words()
        print(f"Filtering important logs using keywords: {important_words}")
        if important_words:
            # Escape each keyword so regex metacharacters are matched literally.
            pattern = "|".join(re.escape(word) for word in important_words)
            # na=False keeps the mask strictly boolean when log_text is missing.
            mask = df["log_text"].str.contains(pattern, na=False)
            filtered_df = df[mask]
        else:
            # No keywords means nothing is important; an empty pattern would
            # otherwise match every row.
            filtered_df = df.iloc[0:0]
        print(f"Number of important logs found: {len(filtered_df)}")
        yield filtered_df