Untitled
unknown
plain_text
3 years ago
1.3 kB
8
Indexable
class SplitAndSaveFilesDoFn(beam.DoFn):
    """Beam DoFn that writes one gzipped CSV file per hostname group.

    For every distinct value in the element's ``hostname`` column, the rows
    of that group are written as CSV, gzip-compressed, made world-accessible
    and finally renamed with a ``Processed_Logs-`` prefix. All files are
    created in the current working directory.
    """

    def process(self, element):
        """Split ``element`` by hostname and save each group as a gzipped CSV.

        Args:
            element: Object supporting ``groupby('hostname')`` whose groups
                support ``to_csv`` (presumably a pandas DataFrame -- confirm
                against the upstream transform).

        Returns:
            An empty list; this step emits nothing downstream and is run only
            for its file-system side effects.

        Raises:
            OSError: If writing, compressing, chmod-ing or renaming a file
                fails.
        """
        # Function-scope import so this block does not depend on a
        # module-level ``import shutil`` being present.
        import shutil

        df = element
        for hostname, group in df.groupby('hostname'):
            # Use only the first label of the hostname (text before the first
            # dot). ``str()`` guards against non-string group keys.
            # NOTE(review): two hostnames sharing the same first label will
            # map to the same output file and overwrite each other.
            hostname_prefix = str(hostname).split(".")[0]
            filename = f"{hostname_prefix}-windows.log"
            gz_filename = f"{filename}.gz"

            group.to_csv(filename, index=False)
            try:
                with open(filename, 'rb') as f_in, gzip.open(gz_filename, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            finally:
                # The uncompressed CSV is only an intermediate artifact;
                # remove it even if compression fails.
                os.remove(filename)

            # NOTE(review): 0o777 makes the file world-writable; kept because
            # downstream consumers may rely on it -- confirm it is required.
            os.chmod(gz_filename, 0o777)

            # Rename the processed (gzipped) file. The original code tried to
            # rename the uncompressed file *after* deleting it, which always
            # raised FileNotFoundError.
            # NOTE(review): assumes the gzipped file is the intended rename
            # target -- confirm.
            current_file_path = os.path.join(os.getcwd(), gz_filename)
            print(current_file_path)
            processed_file_name = f"Processed_Logs-{os.path.basename(current_file_path)}"
            processed_file_path = os.path.join(os.path.dirname(current_file_path), processed_file_name)
            print(processed_file_path)
            os.rename(current_file_path, processed_file_path)
        return []
Editor is loading...