Untitled

 avatar
unknown
plain_text
2 years ago
1.3 kB
5
Indexable
class SplitAndSaveFilesDoFn(beam.DoFn):
    def process(self, element):
        df = element
        for hostname, group in df.groupby('hostname'):
            # get the first part of the hostname before the first dot
            hostname_prefix = hostname.split(".")[0]
            #print(hostname_prefix)
            filename = f"{hostname_prefix}-windows.log"
            group.to_csv(filename, index=False)
            # Gzip the file and give it chmod 777 permissions
            with open(filename, 'rb') as f_in:
                with gzip.open(f"{hostname_prefix}-windows.log.gz", 'wb') as f_out:
                    f_out.writelines(f_in)
            os.chmod(f"{hostname_prefix}-windows.log.gz", 0o777)

            # Remove the original file
            os.remove(filename)

            # Rename the processed file
            current_file_path = os.path.join(os.getcwd(), filename)
            print(current_file_path)
            processed_file_name = f"Processed_Logs-{os.path.basename(current_file_path)}"
            processed_file_path = os.path.join(os.path.dirname(current_file_path), processed_file_name)
            print(processed_file_path)
            os.rename(current_file_path, processed_file_path)

        return []

Editor is loading...