Untitled
unknown
plain_text
2 years ago
6.3 kB
6
Indexable
# Necessary imports
import datetime
import gzip
import linecache
import os
import string
import subprocess
import sys
import time
from itertools import islice

import apache_beam as beam
import datefinder
import pandas as pd
import regex as re

# Declaring Files
agent_file = '/Analytics/venv/Jup/CAPE_Apache_Beam/agent.properties'
bookmark_file = '/Analytics/venv/Jup/CAPE_Apache_Beam/bookmark_log.properties'

# Dotted-quad IPv4 address as it appears in "<vm>.privatip=<ip>" lines.
_IPV4_PATTERN = re.compile(r'[0-9]+(?:\.[0-9]+){3}')


def _property_value(line):
    """Return the text after the first '=' of a ``key=value`` line, stripped.

    Splitting only on the first '=' keeps values that themselves contain '='
    intact, and stripping drops the trailing newline left by file iteration.
    A line without '=' yields ''.
    """
    return line.partition('=')[2].strip()


def _line_has_vm_key(line, vm, suffix):
    """Return True if *line* contains the key ``<vm><suffix>`` for this exact VM.

    A plain substring test lets VM 1 pick up VM 11's or VM 21's settings
    ("1.privatip" is a substring of "11.privatip"). The negative lookbehind
    rejects a match that is directly preceded by another digit.
    """
    return re.search(r'(?<!\d)' + re.escape(str(vm) + suffix), line) is not None


# Get VM List from agent.properties file
def get_VMList():
    """Return the integer VM ids listed on the ``VMList`` line of ``agent_file``.

    Every run of digits in the value is taken as one id, so any separator
    (',', ' ', ...) is accepted. Only the first line mentioning ``VMList``
    is used. Returns ``[]`` when no such line exists, so callers that loop
    over the result do not crash on ``None``.
    """
    with open(agent_file, 'rt') as file:
        for line in file:
            if 'VMList' in line:
                return [int(vm) for vm in re.findall(r'\d+', _property_value(line))]
    return []


VMList = get_VMList()
print("The VM List is : ", VMList)


# Get number of lines to be copied
def get_log_block_size():
    """Return the ``vmmetricskpi.log_block`` value from ``agent_file`` as a string.

    The value is stripped of surrounding whitespace, including the trailing
    newline. Returns None when no line mentions the key.
    """
    with open(agent_file, 'rt') as file:
        for line in file:
            if 'vmmetricskpi.log_block' in line:
                print("Getting the Log Block size")
                return _property_value(line)
    return None


print("Number of Lines : ", get_log_block_size())
print("Getting Current Timestamp")
# Checking Current Timestamp
current_time = time.strftime("%Y-%m-%d-%H-%M-%S")
print(current_time)


class CheckBookmarkRecord(beam.DoFn):
    """Look up, or create, the bookmark line for each VM id.

    Input elements are VM ids (keys of ``get_vm_details()``).

    For each id, the DoFn emits ``(bookmark_line, vm_id)`` for every line of
    ``bookmark_file`` that holds ``"<vm_ip>_bookmark_log="``. When no such
    line exists, it appends ``"<vm_ip>_bookmark_log=1"`` to the file and
    emits that line instead. In both cases the emitted line carries no
    trailing newline.
    """

    def process(self, element):
        vm_id = element
        vm_ip = get_vm_details()[vm_id]['vm_ip'][0]
        line_to_search = str(vm_ip) + "_bookmark_log="
        # Reject a match preceded by a digit or '.', so the record for
        # 10.0.0.1 is not confused with the one for 110.0.0.1.
        pattern = re.compile(r'(?<![\d.])' + re.escape(line_to_search))
        with open(bookmark_file, "r") as f:
            filtered_lines = [line.rstrip("\r\n") for line in f if pattern.search(line)]

        if filtered_lines:
            for bookmark_line in filtered_lines:
                yield bookmark_line, vm_id
        else:
            # No bookmark yet for this VM: start at 1 and persist it.
            bookmark_index = 1
            bookmark_line = line_to_search + str(bookmark_index)
            with open(bookmark_file, 'a') as file:
                file.write("\n")
                file.write(bookmark_line)
            yield bookmark_line, vm_id


class PerformFunctionsWithBookmarkRecord(beam.DoFn):
    """Copy log lines for a VM starting from its bookmark and advance the bookmark.

    Input elements are the ``(bookmark_line, vm_id)`` pairs produced by
    ``CheckBookmarkRecord``. The DoFn emits a single ``None`` per element.
    """

    def process(self, element):
        bookmark_record, vm_id = element
        number_of_lines = get_log_block_size()
        vm_ip = get_vm_details()[vm_id]['vm_ip'][0]
        # Everything after "<ip>_bookmark_log=" is the stored bookmark value.
        syslog_record_no = bookmark_record.partition(str(vm_ip) + '_bookmark_log=')[2].strip()
        # NOTE(review): copying_loglines_and_updating_bookmark is not defined
        # anywhere in this file as shown. Confirm it is provided elsewhere
        # before running the pipeline, otherwise this raises NameError.
        copying_loglines_and_updating_bookmark(bookmark_record, syslog_record_no, vm_id, number_of_lines)
        yield None


# Making the Dictionary Global
enabled_vm_details = {}


def _parse_vm_details(vm_id, lines):
    """Collect the settings that belong to one VM id from the lines of ``agent_file``.

    Returns a dict of lists, with one entry per matching line, under the keys
    vm_ip, vm_output_log_filename, vm_log_enable, vm_log_filename,
    bookmark, bookmark_ip and bookmark_log.
    """
    details = {
        'vm_ip': [],
        'vm_output_log_filename': [],
        'vm_log_enable': [],
        'vm_log_filename': [],
        'bookmark': [],
        'bookmark_ip': [],
        'bookmark_log': [],
    }
    for line in lines:
        if _line_has_vm_key(line, vm_id, ".privatip"):
            # Raises IndexError if the value holds no dotted-quad address.
            vm_ip = _IPV4_PATTERN.findall(_property_value(line))[0]
            details['vm_ip'].append(vm_ip)
            details['bookmark_ip'].append(vm_ip)
            details['bookmark_log'].append(vm_ip + "_bookmark_log=")
        if _line_has_vm_key(line, vm_id, ".vmmetricskpi.output_log.filename"):
            details['vm_output_log_filename'].append(_property_value(line))
        if _line_has_vm_key(line, vm_id, ".vmmetricskpi.log_enable"):
            details['vm_log_enable'].append(_property_value(line))
        if _line_has_vm_key(line, vm_id, ".vmmetricskpi.log.filename"):
            vm_log_filename = _property_value(line)
            # The bookmark is the second '-'-separated field of the filename
            # (raises IndexError if the filename contains no '-').
            bookmark = vm_log_filename.split("-")[1].strip()
            details['vm_log_filename'].append(vm_log_filename)
            details['bookmark'].append(bookmark)
    return details


# Getting All VM Details
def get_vm_details():
    """Parse per-VM settings from ``agent_file`` and return the enabled VMs.

    For each id in ``VMList`` whose first ``<id>.vmmetricskpi.log_enable``
    value is ``'y'``, the module-level ``enabled_vm_details`` dict gets the
    entry ``vm_id -> {setting name: [values]}`` (see ``_parse_vm_details``).
    That same dict is returned. Entries are added or overwritten but never
    removed. A VM with no log_enable entry is treated as disabled.
    """
    with open(agent_file, 'rt') as file:
        lines = file.readlines()  # read once instead of once per VM
    for vm_id in VMList:
        details = _parse_vm_details(vm_id, lines)
        if details['vm_log_enable'][:1] == ['y']:
            enabled_vm_details[vm_id] = details
    return enabled_vm_details


def run_pipeline(enabled_vm_details):
    """Run the bookmark pipeline for every VM id in *enabled_vm_details*.

    Args:
        enabled_vm_details: mapping keyed by VM id (as returned by
            ``get_vm_details()``), or any iterable of VM ids.
    """
    with beam.Pipeline() as pipeline:
        # beam.Create expands a dict into (key, value) pairs, but the DoFns
        # expect bare VM ids, so pass the keys explicitly.
        vm_ids = pipeline | beam.Create(list(enabled_vm_details))
        bookmark_records = vm_ids | beam.ParDo(CheckBookmarkRecord())
        bookmark_records | beam.ParDo(PerformFunctionsWithBookmarkRecord())
Editor is loading...