Untitled
unknown
plain_text
5 months ago
9.0 kB
4
Indexable
import tqdm

# Driver script: for every COBOL program in `cbl_files`, inline the copybooks it
# references, split it into divisions, and accumulate the extracted attribute
# rows in `attr_db` and logic rows in `logc_db`.
#
# NOTE(review): the block nesting below was reconstructed from a
# whitespace-collapsed paste -- confirm it against the original notebook.

# Bookkeeping lists. Only `unavl_fun` is filled by the live code; the others
# are referenced solely by the commented-out exception handlers at the bottom.
unavl_fun = []
err = []
group_errors = []
unordered_ev_erros = []
scalar_errors = []
file_number = 0
index_error = []
unbound_error = []
generic_error = []

attr_db = pd.DataFrame()
logc_db = pd.DataFrame()

for file in tqdm.tqdm(cbl_files):#[0:300]):
    # try:
    file_number += 1
    print("\n\n\n\n PROCESSING FILE NUMBER : ",file_number)
    logging.debug(f'Start processing COBOL Program {file}')

    # Get function (copybook) list and program as text
    functions_list, prog_txt = parser.get_functions(file)
    print("FUNC LIST :",functions_list)

    #Find Functions inside functions
    #*****************
    # Get replacement words for copy books (parsed BEFORE the COPY lines are
    # combined, exactly as in the original order of operations)
    replacements_dict = parser.parse_cobol_replacements(prog_txt)
    print("\n\n\n\nDICT : ",replacements_dict)
    # combine copy statements with REPLACING and BY keywords spanning multiple lines
    prog_txt = parser.combine_copy_statements(prog_txt)
    #*****************

    # Merging functions
    if not functions_list:
        logging.debug('No functions Available')
    else:
        logging.debug(f'No. of functions found {len(functions_list)} in {file}')
        logging.debug(f'list of functions ==> {functions_list}')

    available_fun = [i for i in functions_list if i in cpy_files_names]
    missing_fun = set(functions_list).difference(set(available_fun))
    unavl_fun.append((file, list(missing_fun)))
    if missing_fun:
        # Log BEFORE skipping: previously this message sat after the
        # `continue`, so missing copybooks were never actually reported.
        logging.debug(f'Unable to find function files for ==> {missing_fun}')
        continue

    for fun in available_fun:
        logging.debug(f'Function ==>{fun}-->Merge Started')
        prog_txt = parser.merge_functions(
            replacement_dict = replacements_dict.get(fun),
            program_text = prog_txt,
            function_name = fun,
            # function_path =f'./Data/oscar_benefits_mainframe/Copylib/{fun}.cpy'
            #function_path =f'V:\CORPDATA02\Thryve\MCOE\Repos_Old\Reference\Copylib\{fun}.cpy'
            # NOTE(review): substring match -- 'ABC.cpy' also matches a path
            # ending in 'XABC.cpy'; confirm copybook names cannot collide.
            function_path = [i for i in cpy_files if fun+'.cpy' in i][0]
        )
        logging.debug(f'Function ==>{fun}-->Merge Completed')

    # preprocess data
    pre_processed_data = parser.preprocess_data(prog_txt)

    #============================================================
    for divisions in pre_processed_data:
        # Each entry is indexed positionally: [0] is passed to get_section,
        # [1] to the logic extractor, [2] is used as the program name.
        part1 = divisions[0]
        part2 = divisions[1]
        # print("PROCEDURE DIVISION")
        # print(part2)
        program_name = divisions[2]

        # Handle part 1: section data -> attribute frame
        section_data = parser.get_section(part1)
        file_working_storage_linking_df, file_contorl_df = test.get_attrs(program_name=program_name, section_data=section_data).extract()
        attr_db = pd.concat([attr_db, file_working_storage_linking_df])

        #============================================================
        # Handle part 2: logic frame, dropping rows with an empty 'Statement'
        logic_df = logical_extractor.logic_extact(data=part2, program_name=program_name)
        logc_db = pd.concat([logc_db, logic_df])
        logc_db = logc_db[logc_db['Statement']!='']

    logging.debug(f'Completed processing COBOL Program {file}')
    logging.debug('=========================================')

    # Disabled per-file error handling (kept for reference; pairs with the
    # commented-out `try:` at the top of the loop body).
    # except AttributeError as e:
    #     group_errors.append(file)
    #     print(f"\n\n\n!!!!!!!! WARNING !!!!!!!!!!!\n\n GROUP ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {e}")
    #     append_to_file(f"\n\nGROUP ERROR IGNORED FILE NAME : {file} ERROR : {e}")
    #     continue
    # except ValueError as ve:
    #     scalar_errors.append(file)
    #     print(f"\n\n\n !!!!!!! WARNING !!!!!!!!!! \n\n\n SCALAR ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {ve}")
    #     append_to_file(f"\n\nSCALAR ERROR IGNORED FILE NAME : {file} ERROR : {ve}")
    #     continue
    # except AssertionError as ae:
    #     unordered_ev_erros.append(file)
    #     print(f"\n\n\n !!!!!!! WARNING !!!!!!!!!! \n\n\n UNORDERED EV ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {ae}")
    #     append_to_file(f"\n\nUNORDERED EV ERROR IGNORED FILE NAME : {file} ERROR : {ae}")
    #     continue
    # except IndexError as ie:
    #     index_error.append(file)
    #     print(f"\n\n\n !!!!!!! WARNING !!!!!!!!!! 
# (continuation of the commented-out exception handlers of the driver loop)
#     \n\n\n INDEX ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {ie}")
#     append_to_file(f"\n\n INDEX ERROR IGNORED FILE NAME : {file} ERROR : {ie}")
#     continue
# except UnboundLocalError as ule:
#     unbound_error.append(file)
#     print(f"\n\n\n !!!!!!! WARNING !!!!!!!!!! \n\n\n UNBOUND LOCAL ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {ule}")
#     append_to_file(f"\n\n UNBOUND LOCAL ERROR IGNORED g FILE NAME : {file} ERROR : {ule}")
#     continue
# except Exception as ge:
#     generic_error.append(file)
#     print(f"\n\n\n !!!!!!! WARNING !!!!!!!!!! \n\n\n GENERAL ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {ge}")
#     append_to_file(f"\n\n GENREIC ERROR IGNORED g FILE NAME : {file} ERROR : {ge}")
#     continue


# NOTE(review): the three functions below take `self`, so they are presumably
# methods of the class behind the `parser` object; the enclosing class header
# is not part of this paste.

def combine_copy_statements(self, cobol_code):
    """Fold each COPY statement and its REPLACING/BY continuation lines into one entry.

    For every line containing ``COPY`` followed by whitespace, the text before
    the keyword is emitted as its own entry and the statement itself is
    re-emitted with that prefix replaced by spaces (so the keyword keeps its
    column). Following lines that contain ``REPLACING`` or ``BY`` are appended
    to the statement, separated by a single space, until a line without those
    keywords (or another COPY) is seen.

    Args:
        cobol_code: Iterable of source lines (strings).

    Returns:
        list: The lines with each multi-line COPY statement collapsed into a
        single list entry. Line terminators inside the joined lines are kept.
    """
    copy_start = re.compile(r"\bCOPY\s+")
    continuation = re.compile(r"\bREPLACING\s+|\bBY\s+")

    combined_code = []
    in_copy_block = False
    current_copy_line = ""

    for line in cobol_code:
        copy_match = copy_start.search(line)
        if copy_match:
            if in_copy_block:
                # A new COPY directly after another one: flush the previous
                # statement before starting the new one.
                combined_code.append(current_copy_line)
            # Split where the pattern matched, not at the first literal
            # "COPY" (which could sit inside a longer word earlier on the line).
            before_copy = line[:copy_match.start()]
            combined_code.append(before_copy)
            current_copy_line = " " * len(before_copy) + line[copy_match.start():]
            in_copy_block = True
        elif in_copy_block and continuation.search(line):
            current_copy_line += " " + line  # add to existing statement
        elif in_copy_block:
            combined_code.append(current_copy_line)
            combined_code.append(line)
            in_copy_block = False
            current_copy_line = ""
        else:
            combined_code.append(line)

    if in_copy_block:
        # Input ended while a COPY statement was still open.
        combined_code.append(current_copy_line)
    return combined_code


def parse_cobol_replacements(self, cobol_statements: list) -> dict:
    """Extract the ``COPY ... REPLACING a BY b`` substitutions of a program.

    Only the text in columns 8-72 of each line is considered; blank lines and
    lines whose column 7 holds ``*``, ``/`` or a newline are ignored. The
    remaining text is joined with spaces and every ``COPY ... .`` statement
    that contains ``REPLACING`` is parsed.

    Args:
        cobol_statements: List of source lines (strings) of the program.

    Returns:
        dict: ``{<copybook name>: {<original word>: <new word>, ...}}``.
        Surrounding ``=`` characters are stripped from both words. Empty when
        no COPY statement uses REPLACING. A later statement for the same
        copybook overwrites an earlier one.
    """
    code_parts = []
    for line in cobol_statements:
        if not line.strip():
            continue
        if len(line) < 7:
            # Too short to have an indicator column or any code area text;
            # indexing line[6] here used to raise IndexError.
            continue
        if line[6] in ('*', '/', '\n'):
            continue
        code_parts.append(line[7:72].strip('\n').rstrip())
    data = ' '.join(code_parts)

    replacements_dict = {}
    for statement in re.findall(r'COPY(.*?)\.', data, re.DOTALL):
        name_part, keyword, replacements_str = statement.partition("REPLACING")
        if not keyword:
            continue  # REPLACING keyword is not present
        name_tokens = name_part.split()
        if not name_tokens:
            continue  # malformed statement without a copybook name
        copy_book_name = name_tokens[0]

        pairs = re.findall(r'(\S+)\s+BY\s+(\S+)', replacements_str.strip(), re.IGNORECASE)
        replacements_dict[copy_book_name] = {
            original.strip('='): new.strip('=') for original, new in pairs
        }
    return replacements_dict


def get_functions(self, cbl_file_path: str) -> tuple:
    """Read a COBOL file and list the copybooks it references.

    Parameters
    ----------
    cbl_file_path : str
        COBOL file path.

    Returns
    -------
    tuple
        ``(function_list, data)`` where ``data`` is the list of file lines
        longer than 6 characters (terminators included) and ``function_list``
        holds, for every line containing whitespace + ``COPY`` + a name made of
        ``A-Z0-9``, the name following the first ``COPY`` on that line.
    """
    with open(cbl_file_path, 'r') as source:
        data = [line for line in source if len(line) > 6]

    has_copy = re.compile(r"\sCOPY\s+[A-Z0-9]+")
    copy_name = re.compile(r"COPY\s+([A-Z0-9]+)")
    # `has_copy` matching guarantees `copy_name` matches too.
    function_list = [copy_name.search(line).group(1) for line in data if has_copy.search(line)]
    return function_list, data


# Note: `data` is the main program file here; `function_list` holds the names
# of the files whose contents must be copy-pasted into the main code.
Editor is loading...
Leave a Comment