import re
import logging

import pandas as pd
import tqdm

# Note: parser, test, logical_extractor, cbl_files, cpy_files, cpy_files_names
# and append_to_file are assumed to be defined earlier in the script.
unavl_fun = []
err = []
group_errors = []
unordered_ev_errors = []
scalar_errors = []
file_number = 0
index_error = []
unbound_error = []
generic_error = []
attr_db = pd.DataFrame()
logc_db = pd.DataFrame()
for file in tqdm.tqdm(cbl_files):  # optionally slice, e.g. cbl_files[0:300]
    # try:
    file_number = file_number + 1
    print("\n\n\n\n PROCESSING FILE NUMBER : ", file_number)
    logging.debug(f'Start processing COBOL program {file}')
    # Get the function (copybook) list and the program as text
    functions_list, prog_txt = parser.get_functions(file)
    print("FUNC LIST :", functions_list)
    # Find functions inside functions
    # *****************
    # Get replacement words for copybooks
    replacements_dict = parser.parse_cobol_replacements(prog_txt)
    print("\n\n\n\nDICT : ", replacements_dict)
    # Combine COPY statements whose REPLACING and BY clauses span multiple lines
    prog_txt = parser.combine_copy_statements(prog_txt)
    # *****************
    # Merge functions
    if functions_list == []:
        logging.debug('No functions available')
    else:
        logging.debug(f'No. of functions found {len(functions_list)} in {file}')
        logging.debug(f'List of functions ==> {functions_list}')
        available_fun = [i for i in functions_list if i in cpy_files_names]
        missing_fun = list(set(functions_list).difference(set(available_fun)))
        unavl_fun.append((file, missing_fun))
        if missing_fun != []:
            logging.debug(f'Unable to find function files for ==> {missing_fun}')
            continue
        for fun in available_fun:
            logging.debug(f'Function ==> {fun} --> merge started')
            prog_txt = parser.merge_functions(
                replacement_dict=replacements_dict.get(fun),
                program_text=prog_txt,
                function_name=fun,
                # function_path = f'./Data/oscar_benefits_mainframe/Copylib/{fun}.cpy'
                # function_path = f'V:\CORPDATA02\Thryve\MCOE\Repos_Old\Reference\Copylib\{fun}.cpy'
                function_path=[i for i in cpy_files if fun + '.cpy' in i][0],
            )
            logging.debug(f'Function ==> {fun} --> merge completed')
    # Preprocess data
    pre_processed_data = parser.preprocess_data(prog_txt)
    # ============================================================
    # Handle part 1: extract data attributes
    for divisions in pre_processed_data:
        part1 = divisions[0]
        part2 = divisions[1]
        # print("PROCEDURE DIVISION")
        # print(part2)
        program_name = divisions[2]
        # Get section data of part 1
        section_data = parser.get_section(part1)
        # Extract attribute definitions for this program
        file_working_storage_linking_df, file_control_df = test.get_attrs(program_name=program_name, section_data=section_data).extract()
        attr_db = pd.concat([attr_db, file_working_storage_linking_df])
        # ============================================================
        # Handle part 2: extract the procedural logic
        logic_df = logical_extractor.logic_extact(data=part2, program_name=program_name)
        logc_db = pd.concat([logc_db, logic_df])
        logc_db = logc_db[logc_db['Statement'] != '']
    logging.debug(f'Completed processing COBOL program {file}')
    logging.debug('=========================================')
    # except AttributeError as e:
    #     group_errors.append(file)
    #     print(f"\n\n\n!!!!!!!! WARNING !!!!!!!!!!!\n\n GROUP ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {e}")
    #     append_to_file(f"\n\nGROUP ERROR IGNORED FILE NAME : {file} ERROR : {e}")
    #     continue
    # except ValueError as ve:
    #     scalar_errors.append(file)
    #     print(f"\n\n\n !!!!!!! WARNING !!!!!!!!!! \n\n\n SCALAR ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {ve}")
    #     append_to_file(f"\n\nSCALAR ERROR IGNORED FILE NAME : {file} ERROR : {ve}")
    #     continue
    # except AssertionError as ae:
    #     unordered_ev_errors.append(file)
    #     print(f"\n\n\n !!!!!!! WARNING !!!!!!!!!! \n\n\n UNORDERED EV ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {ae}")
    #     append_to_file(f"\n\nUNORDERED EV ERROR IGNORED FILE NAME : {file} ERROR : {ae}")
    #     continue
    # except IndexError as ie:
    #     index_error.append(file)
    #     print(f"\n\n\n !!!!!!! WARNING !!!!!!!!!! \n\n\n INDEX ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {ie}")
    #     append_to_file(f"\n\n INDEX ERROR IGNORED FILE NAME : {file} ERROR : {ie}")
    #     continue
    # except UnboundLocalError as ule:
    #     unbound_error.append(file)
    #     print(f"\n\n\n !!!!!!! WARNING !!!!!!!!!! \n\n\n UNBOUND LOCAL ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {ule}")
    #     append_to_file(f"\n\n UNBOUND LOCAL ERROR IGNORED FILE NAME : {file} ERROR : {ule}")
    #     continue
    # except Exception as ge:
    #     generic_error.append(file)
    #     print(f"\n\n\n !!!!!!! WARNING !!!!!!!!!! \n\n\n GENERIC ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {ge}")
    #     append_to_file(f"\n\n GENERIC ERROR IGNORED FILE NAME : {file} ERROR : {ge}")
    #     continue
def combine_copy_statements(self, cobol_code):
    """Combine consecutive COPY statements with REPLACING
    and BY clauses into a single line."""
    combined_code = []
    in_copy_block = False
    current_copy_line = ""
    for line in cobol_code:
        if re.search(r"\bCOPY\s+", line):
            parts = line.split("COPY", 1)
            before_copy = parts[0]
            after_copy = parts[1]
            padding = " " * len(before_copy)  # Pad so COPY keeps its original column
            padded_after_copy = padding + "COPY" + after_copy
            if in_copy_block:
                # Flush the previous COPY statement before starting a new one
                combined_code.append(current_copy_line)
            combined_code.append(before_copy)
            in_copy_block = True
            current_copy_line = padded_after_copy
        elif in_copy_block and re.search(r"\bREPLACING\s+|\bBY\s+", line):
            current_copy_line += " " + line  # Continuation of the COPY statement
        elif in_copy_block:
            # The COPY statement ended on the previous line
            combined_code.append(current_copy_line)
            in_copy_block = False
            combined_code.append(line)
            current_copy_line = ""
        else:
            combined_code.append(line)
    if in_copy_block:
        combined_code.append(current_copy_line)
    return combined_code
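# Illustrative sketch (not part of the original script): how a COPY statement
# whose REPLACING/BY clause spans two lines gets folded into one entry,
# assuming the method is exposed on the same `parser` object used in the loop
# above. The COBOL lines and the copybook name PAYREC are made up.
sample_lines = [
    "       COPY PAYREC",
    "           REPLACING ==PAY== BY ==WS-PAY==.",
    "       MOVE WS-PAY-AMT TO OUT-AMT.",
]
combined = parser.combine_copy_statements(sample_lines)
# Expected shape of `combined`: the text before COPY as its own entry, then the
# COPY line with the REPLACING continuation appended to it, then the MOVE line
# passed through unchanged.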
def parse_cobol_replacements(self, cobol_statements: list) -> dict:
    """
    Parses COBOL COPY ... REPLACING statements to extract replacement information.

    Args:
        cobol_statements: The COBOL program as a list of source lines.

    Returns:
        A nested dictionary of replacements, or an empty dict if the
        REPLACING keyword is not present.
        Structure: {<copybook_name>: {<original_word>: <new_word>, ...}}
    """
    data = pd.Series(cobol_statements)
    data = [i for i in data if i.strip() != ""]
    # Keep columns 8-72 of each line, skipping comment lines ('*' or '/' in
    # column 7) and EJECT directives
    data = [line[7:72].strip('\n').rstrip() for line in data
            if line[6] not in ['*', '/', '\n'] and 'EJECT' not in line]
    data = ' '.join(data)
    replacements_dict = {}
    copy_statements = re.findall(r'COPY(.*?)\.', data, re.DOTALL)
    for i in copy_statements:
        parts = i.split("REPLACING", 1)
        if len(parts) == 2:
            copy_book_name = parts[0].split()[0].strip()
            replacements_str = parts[1].strip()
        else:
            continue  # No REPLACING clause for this COPY statement
        replacements = {}
        pairs = re.findall(r'(\S+)\s+BY\s+(\S+)', replacements_str, re.IGNORECASE)
        for original, new in pairs:
            replacements[original.strip().strip('=')] = new.strip().strip('=')
        replacements_dict[copy_book_name] = replacements
    return replacements_dict
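# Illustrative sketch (not part of the original script): the dictionary
# returned by parse_cobol_replacements for a fixed-format COPY ... REPLACING
# statement, again assuming the method lives on `parser`. The sequence
# numbers, copybook name and replacement pairs are made up.
sample_statements = [
    "000100     COPY PAYREC REPLACING ==PAY== BY ==WS-PAY==",
    "000200         ==RATE== BY ==WS-RATE==.",
]
print(parser.parse_cobol_replacements(sample_statements))
# Expected: {'PAYREC': {'PAY': 'WS-PAY', 'RATE': 'WS-RATE'}}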
def get_functions(self, cbl_file_path: str) -> tuple:
    """
    Read a COBOL file and return the list of copybooks it references
    via COPY statements, together with the raw source lines.

    Parameters
    ----------
    cbl_file_path : str : COBOL file path

    Returns
    -------
    (function_list, data) : list of copybook names and the source lines
    """
    with open(cbl_file_path, 'r') as f:
        data = [i for i in f.readlines() if len(i) > 6]
    data_df = pd.Series(data).astype('str')
    function_list = (data_df[data_df.str.contains(r"\sCOPY\s+[A-Z0-9]+", regex=True)]
                     .str.extract(r"COPY\s+([A-Z0-9]+)")
                     .iloc[:, 0]
                     .to_list())
    return function_list, data
Here, data is the main program file read as a list of source lines, and function_list holds the names of the copybook files whose contents must be merged into the main code.
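For example (an illustrative call, not from the original script; the path and copybook names are made up, and parser is the same object used in the loop above):

function_list, data = parser.get_functions('./Data/Programs/SAMPLE.cbl')
print(function_list)  # e.g. ['PAYREC', 'EMPREC'] - copybooks referenced via COPY
print(len(data))      # number of source lines longer than 6 characters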