Untitled

 avatar
unknown
plain_text
3 years ago
6.3 kB
7
Indexable
#Necessary imports
import string
import pandas as pd
import regex as re
import datetime
import datefinder
import sys
import subprocess
import linecache
from itertools import islice
import time
import gzip
import os
import apache_beam as beam


# Input files: the agent configuration that the parsers below read, and the
# bookmark file holding one "<ip>_bookmark_log=<n>" record per VM.
agent_file = '/Analytics/venv/Jup/CAPE_Apache_Beam/agent.properties'
bookmark_file = '/Analytics/venv/Jup/CAPE_Apache_Beam/bookmark_log.properties'


# Get VM List from agent.properties file
def get_VMList(properties_path=None):
    """Return the VM ids listed on the 'VMList' line of the properties file.

    Every line containing the text 'VMList' is considered; if several lines
    match, the last one wins. The ids are every run of digits found after
    the first '=' on that line, converted to int.

    Args:
        properties_path: File to read. Defaults to the module-level
            ``agent_file`` when omitted.

    Returns:
        A list of int VM ids (empty if the entry has no digits).

    Raises:
        ValueError: If no line in the file contains 'VMList'.
    """
    if properties_path is None:
        properties_path = agent_file

    vm_ids = None
    with open(properties_path, 'rt') as file:
        for line in file:
            if 'VMList' in line:
                # Only the value side is scanned, so digits that happen to
                # appear in the key are never mistaken for VM ids.
                _, _, value = line.partition('=')
                vm_ids = [int(number) for number in re.findall(r'\d+', value)]

    if vm_ids is None:
        # Without this guard a missing entry surfaced as UnboundLocalError.
        raise ValueError(f"No 'VMList' entry found in {properties_path}")
    return vm_ids
# Parse the VM list once and reuse it for both the log line and the
# module-level VMList, instead of reading the properties file twice.
VMList = get_VMList()
print("The VM List is : ", VMList)


#Get number of lines to be copied
def get_log_block_size(properties_path=None):
    """Return the configured log block size from the properties file.

    Every line containing 'vmmetricskpi.log_block' is considered; if several
    lines match, the last one with a value wins. The value is the text
    between the first and second '=' on the line, with surrounding
    whitespace (including the trailing newline) removed.

    Args:
        properties_path: File to read. Defaults to the module-level
            ``agent_file`` when omitted.

    Returns:
        The block size as a string, exactly as written in the file apart
        from the stripped whitespace.

    Raises:
        ValueError: If no matching line with an '=' is found.
    """
    if properties_path is None:
        properties_path = agent_file

    no_of_lines = None
    with open(properties_path, 'rt') as file:
        for line in file:
            if 'vmmetricskpi.log_block' in line:
                print("Getting the Log Block size")
                parts = line.split('=')
                if len(parts) > 1:
                    # Strip so the result does not carry the line's newline,
                    # matching how get_vm_details treats its values.
                    no_of_lines = parts[1].strip()

    if no_of_lines is None:
        # Without this guard a missing entry surfaced as UnboundLocalError.
        raise ValueError(
            f"No 'vmmetricskpi.log_block' entry found in {properties_path}"
        )
    return no_of_lines

# Report the configured block size at start-up.
print("Number of Lines : ", get_log_block_size())


# Record the local wall-clock time at which the script was loaded.
print("Getting Current Timestamp")
current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
print(current_time)

class CheckBookmarkRecord(beam.DoFn):
    """Beam DoFn that finds, or creates, the bookmark record for one VM.

    Input element: a VM id that is a key of the mapping returned by
    ``get_vm_details``.

    Output elements: ``(bookmark_line, vm_id)`` tuples. Lines read from the
    bookmark file are yielded exactly as read (including any trailing
    newline); a newly created record is yielded without one.
    """

    def process(self, element):
        vm_id = element
        # get_vm_details() maps each vm_id straight to that VM's field dict,
        # so one lookup reaches 'vm_ip'; indexing by vm_id twice raised
        # KeyError.
        vm_ip = get_vm_details()[vm_id]['vm_ip'][0]
        line_to_search = str(vm_ip) + "_bookmark_log="

        # Read everything and close the handle before any append below.
        with open(bookmark_file, "r") as f:
            lines = f.readlines()

        # The lookbehind stops an address such as 10.0.0.1 from matching the
        # record that belongs to 110.0.0.1.
        pattern = r'(?<![0-9])' + re.escape(line_to_search)
        matching_lines = [line for line in lines if re.search(pattern, line)]

        if matching_lines:
            for bookmark_line in matching_lines:
                yield bookmark_line, vm_id
            return

        # No record yet for this VM: start its bookmark at 1 and persist it.
        bookmark_line = line_to_search + "1"
        with open(bookmark_file, 'a') as file:
            file.write("\n")
            file.write(bookmark_line)
        yield bookmark_line, vm_id

class PerformFunctionsWithBookmarkRecord(beam.DoFn):
    """Beam DoFn that copies log lines for one VM and updates its bookmark.

    Input element: a ``(bookmark_record, vm_id)`` tuple as produced by
    ``CheckBookmarkRecord``.

    Output elements: a single ``None``; the work is done by
    ``copying_loglines_and_updating_bookmark``.
    """

    def process(self, element):
        bookmark_record, vm_id = element
        number_of_lines = get_log_block_size()
        # get_vm_details() maps each vm_id straight to that VM's field dict,
        # so one lookup reaches 'vm_ip'; indexing by vm_id twice raised
        # KeyError.
        vm_ip = get_vm_details()[vm_id]['vm_ip'][0]
        # Everything after "<ip>_bookmark_log=" is the bookmarked record
        # number; it is passed on unmodified, so it keeps any trailing
        # newline present in bookmark_record.
        syslog_record_no = bookmark_record.split(str(vm_ip) + '_bookmark_log=')[1]
        copying_loglines_and_updating_bookmark(bookmark_record, syslog_record_no, vm_id, number_of_lines)
        yield None





# Module-level mapping of vm_id -> details for VMs whose logging is enabled;
# get_vm_details() fills it in and returns it.
enabled_vm_details = dict()

# Getting All VM Details
def _has_vm_key(line, vm, suffix):
    """Return True if *line* contains the key ``<vm><suffix>`` for this VM id."""
    # The lookbehind rejects a longer id that merely ends with this one, so
    # VM 1 does not also claim the "11.privatip" line.
    pattern = r'(?<![0-9])' + re.escape(str(vm) + suffix)
    return re.search(pattern, line) is not None


def _property_value(line):
    """Return the stripped text after the first '=' of *line*, or None if there is no '='."""
    parts = line.split('=')
    if len(parts) < 2:
        return None
    # Like the original split('=')[1], this stops at a second '=' if present.
    return parts[1].strip()


def get_vm_details():
    """Collect per-VM settings from ``agent_file`` for every id in ``VMList``.

    For each VM the following list-valued fields are gathered (one entry per
    matching line in the file):

    * ``vm_ip`` / ``bookmark_ip``: first dotted-quad address on the
      ``<id>.privatip`` line ("privatip" is the spelling the file uses).
    * ``bookmark_log``: that address followed by ``"_bookmark_log="``.
    * ``vm_output_log_filename``: value of
      ``<id>.vmmetricskpi.output_log.filename``.
    * ``vm_log_enable``: value of ``<id>.vmmetricskpi.log_enable``.
    * ``vm_log_filename``: value of ``<id>.vmmetricskpi.log.filename``.
    * ``bookmark``: the part of that filename between its first and second
      '-'; skipped when the filename has no '-'.

    VMs whose first ``vm_log_enable`` value is ``'y'`` are stored in the
    module-level ``enabled_vm_details`` dict under their id. Entries from
    earlier calls are overwritten but never removed. A VM with no
    ``log_enable`` line is treated as disabled.

    Returns:
        The module-level ``enabled_vm_details`` dict, mapping
        ``vm_id -> {field name: list of values}``.
    """
    # Read the file once instead of once per VM.
    with open(agent_file, 'rt') as file:
        property_lines = file.readlines()

    for vm_id in VMList:
        details = {
            'vm_ip': [],
            'vm_output_log_filename': [],
            'vm_log_enable': [],
            'vm_log_filename': [],
            'bookmark': [],
            'bookmark_ip': [],
            'bookmark_log': [],
        }

        for line in property_lines:
            if _has_vm_key(line, vm_id, ".privatip"):
                addresses = re.findall(r'[0-9]+(?:\.[0-9]+){3}', line)
                if addresses:
                    vm_ip = addresses[0]
                    details['vm_ip'].append(vm_ip)
                    details['bookmark_ip'].append(vm_ip)
                    details['bookmark_log'].append(vm_ip + "_bookmark_log=")

            if _has_vm_key(line, vm_id, ".vmmetricskpi.output_log.filename"):
                value = _property_value(line)
                if value is not None:
                    details['vm_output_log_filename'].append(value)

            if _has_vm_key(line, vm_id, ".vmmetricskpi.log_enable"):
                value = _property_value(line)
                if value is not None:
                    details['vm_log_enable'].append(value)

            if _has_vm_key(line, vm_id, ".vmmetricskpi.log.filename"):
                value = _property_value(line)
                if value is not None:
                    details['vm_log_filename'].append(value)
                    name_parts = value.split("-")
                    if len(name_parts) > 1:
                        details['bookmark'].append(name_parts[1].strip())

        # An absent log_enable entry means "not enabled" rather than an
        # IndexError.
        if details['vm_log_enable'] and details['vm_log_enable'][0] == 'y':
            enabled_vm_details[vm_id] = details

    return enabled_vm_details


def run_pipeline(enabled_vm_details):
    """Run the bookmark pipeline once for every enabled VM.

    Args:
        enabled_vm_details: Mapping keyed by VM id (as returned by
            ``get_vm_details``). Only its keys are fed into the pipeline;
            the DoFns look the details up again themselves.
    """
    with beam.Pipeline() as pipeline:
        # beam.Create turns a dict into its (key, value) items, but
        # CheckBookmarkRecord expects a bare VM id, so pass the keys
        # explicitly.
        vm_ids = pipeline | beam.Create(list(enabled_vm_details))
        bookmark_records = vm_ids | beam.ParDo(CheckBookmarkRecord())
        bookmark_records | beam.ParDo(PerformFunctionsWithBookmarkRecord())
Editor is loading...