Untitled

 avatar
unknown
plain_text
5 months ago
9.0 kB
4
Indexable
import tqdm

# Run-wide state, initialised once before the per-file loop starts.
file_number = 0  # running count of COBOL files visited

# (file, [copybook names with no matching .cpy file]) tuples, one per program
unavl_fun = []

# Buckets of file names per error category. Presumably filled by the
# (currently commented-out) except handlers of the processing loop -- confirm.
err, group_errors, unordered_ev_erros = [], [], []
scalar_errors, index_error = [], []
unbound_error, generic_error = [], []

# Accumulated extraction results across all programs.
attr_db, logc_db = pd.DataFrame(), pd.DataFrame()

# Main driver: for every COBOL program in cbl_files, merge the copybooks it
# references into the program text, then extract attribute rows into attr_db
# and logic rows into logc_db. Programs that reference a copybook with no
# matching entry in cpy_files_names are recorded in unavl_fun and skipped.
for file in tqdm.tqdm(cbl_files):#[0:300]):
    # try:
    file_number += 1
    print("\n\n\n\n PROCESSING FILE NUMBER : ",file_number)
    logging.debug(f'Start processing COBOL Program {file}')

    #Get function list and program as text
    functions_list, prog_txt = parser.get_functions(file)
    print("FUNC LIST :",functions_list)

    #Find Functions inside functions


    #*****************
    #Get replacement words for copy books
    replacements_dict = parser.parse_cobol_replacements(prog_txt)
    print("\n\n\n\nDICT : ",replacements_dict)

    #combine copystatements with REPLACING and BY keywords spanning multiple lines
    prog_txt = parser.combine_copy_statements(prog_txt)
    #*****************
    #Merging Function
    if not functions_list:
        logging.debug('No functions Available')
    else:
        logging.debug(f'No. of functions found {len(functions_list)} in {file}')
        logging.debug(f'list of functions ==> {functions_list}')

        available_fun = [i for i in functions_list if i in cpy_files_names]
        # Computed once: referenced copybooks that have no known .cpy file.
        missing_fun = set(functions_list).difference(available_fun)
        unavl_fun.append((file, list(missing_fun)))

        if missing_fun:
            # Log BEFORE skipping; previously this message was emitted only
            # after the skip check, so it could never name a missing file.
            logging.debug(f'Unable to find function files for ==> {missing_fun}')
            continue

        for fun in available_fun:
            logging.debug(f'Function ==>{fun}-->Merge Started')
            prog_txt = parser.merge_functions(
                # None when the COPY statement for this copybook has no REPLACING clause
                replacement_dict = replacements_dict.get(fun),
                program_text  = prog_txt,
                function_name = fun,
                # function_path =f'./Data/oscar_benefits_mainframe/Copylib/{fun}.cpy'
                #function_path =f'V:\CORPDATA02\Thryve\MCOE\Repos_Old\Reference\Copylib\{fun}.cpy'
                # First path containing "<fun>.cpy"; raises IndexError if none does.
                function_path = [i for i in cpy_files if fun+'.cpy' in i][0]
                )
            logging.debug(f'Function ==>{fun}-->Merge Completed')

    #preprocess data
    pre_processed_data = parser.preprocess_data(prog_txt)

    #============================================================
    #Handle part1

    for divisions in pre_processed_data:
        # Each entry is indexed as (part1, part2, program_name).
        part1 = divisions[0]
        part2 = divisions[1]
        # print("PROCEDURE DIVISION")
        # print(part2)
        program_name = divisions[2]
        #Get section data of part1
        section_data = parser.get_section(part1)

        #get attributes; only the first returned frame is accumulated
        file_working_storage_linking_df, file_contorl_df = test.get_attrs(program_name=program_name, section_data=section_data).extract()
        attr_db                                          = pd.concat([attr_db, file_working_storage_linking_df])

        #============================================================
        #Handle part 2
        logic_df = logical_extractor.logic_extact(data=part2, program_name=program_name)
        logc_db  = pd.concat([logc_db, logic_df])
        # Drop rows whose 'Statement' column is the empty string.
        logc_db  = logc_db[logc_db['Statement']!='']

        # NOTE(review): logged once per entry of pre_processed_data, not once
        # per file -- confirm that is intended.
        logging.debug(f'Completed processing COBOL Program {file}')
        logging.debug('=========================================')
    # except AttributeError as e:
    #     group_errors.append(file)
    #     print(f"\n\n\n!!!!!!!! WARNING !!!!!!!!!!!\n\n GROUP ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {e}")
    #     append_to_file(f"\n\nGROUP ERROR IGNORED     FILE NAME : {file}     ERROR : {e}")
    #     continue
    # except ValueError as ve:
    #     scalar_errors.append(file)
    #     print(f"\n\n\n !!!!!!! WARNING !!!!!!!!!! \n\n\n SCALAR ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {ve}")
    #     append_to_file(f"\n\nSCALAR ERROR IGNORED     FILE NAME : {file}     ERROR : {ve}")
    #     continue
    # except AssertionError as ae:
    #     unordered_ev_erros.append(file)
    #     print(f"\n\n\n !!!!!!! WARNING !!!!!!!!!! \n\n\n UNORDERED EV ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {ae}")
    #     append_to_file(f"\n\nUNORDERED EV ERROR IGNORED     FILE NAME : {file}     ERROR : {ae}")
    #     continue
    # except IndexError as ie:
    #     index_error.append(file)
    #     print(f"\n\n\n !!!!!!! WARNING !!!!!!!!!! \n\n\n INDEX ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {ie}")
    #     append_to_file(f"\n\n INDEX ERROR IGNORED     FILE NAME : {file}     ERROR : {ie}")
    #     continue
    # except UnboundLocalError as ule:
    #     unbound_error.append(file)
    #     print(f"\n\n\n !!!!!!! WARNING !!!!!!!!!! \n\n\n UNBOUND LOCAL ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {ule}")
    #     append_to_file(f"\n\n UNBOUND LOCAL ERROR IGNORED   g  FILE NAME : {file}     ERROR : {ule}")
    #     continue
    # except Exception as ge:
    #     generic_error.append(file)
    #     print(f"\n\n\n !!!!!!! WARNING !!!!!!!!!! \n\n\n GENERAL ERROR IGNORED \n\n FILE NAME : {file} \n\n ERROR : {ge}")
    #     append_to_file(f"\n\n GENREIC ERROR IGNORED   g  FILE NAME : {file}     ERROR : {ge}")
    #     continue


    def combine_copy_statements(self,cobol_code):
        """Fold each COPY statement and its REPLACING/BY continuation lines
        into a single list element.

        For every line containing ``COPY`` followed by whitespace, the text
        before the first ``COPY`` is emitted as its own element and the rest
        (re-padded, starting at ``COPY``) is held back. Directly following
        lines that contain ``REPLACING`` or ``BY`` followed by whitespace are
        appended to the held element, separated by one space; the original
        newline characters of the joined lines are kept. Any other line
        flushes the held element and is passed through unchanged.

        Parameters
        ----------
        cobol_code : iterable of str : program lines

        Returns
        -------
        List[str] : lines with COPY statements combined
        """
        combined_code = []
        pending_copy = None  # COPY statement still collecting continuations
        for line in cobol_code:
            if re.search(r"\bCOPY\s+", line):
                before_copy, after_copy = line.split("COPY", 1)
                if pending_copy is not None:
                    # A new COPY ends the previous statement.
                    combined_code.append(pending_copy)
                # Emit the prefix exactly once. It used to be appended twice
                # when one COPY directly followed another, duplicating any
                # code that shared the line with the COPY.
                combined_code.append(before_copy)
                # NOTE(review): two spaces per prefix character doubles the
                # indentation of the COPY text -- confirm this is intended.
                padding = "  " * len(before_copy)
                pending_copy = padding + "COPY" + after_copy
            elif pending_copy is not None and re.search(r"\bREPLACING\s+|\bBY\s+", line):
                pending_copy += " " + line  # continuation of the held COPY
            else:
                if pending_copy is not None:
                    combined_code.append(pending_copy)
                    pending_copy = None
                combined_code.append(line)

        if pending_copy is not None:
            combined_code.append(pending_copy)

        return combined_code

    def parse_cobol_replacements(self,cobol_statements:list) -> dict:
        """
        Extract the REPLACING pairs of every COPY statement in a program.

        Blank lines and lines whose character at index 6 is ``*``, ``/`` or a
        newline are dropped. Characters 7..71 of the remaining lines are
        joined with single spaces, and every ``COPY ... .`` span (up to the
        first period) is inspected for a ``REPLACING`` clause.

        Args:
            cobol_statements: The program as a list of source lines.

        Returns:
            A nested dictionary representing the replacements, or empty dict if
            no COPY statement carries a REPLACING clause.
            Structure: {<copybook name>: {<original_word>: <new_word>, ...}}
            Leading/trailing ``=`` characters are stripped from both words.
        """
        code_lines = []
        for line in cobol_statements:
            if line.strip() == "":
                continue
            # Slice instead of index so lines shorter than 7 characters do not
            # raise IndexError; they simply contribute no code text.
            # NOTE(review): the original exclusion list also held 'EJECT',
            # which a single character can never equal, so EJECT lines were
            # (and still are) not filtered here.
            if line[6:7] in ('*', '/', '\n'):
                continue
            code_lines.append(line[7:72].strip('\n').rstrip())
        data = ' '.join(code_lines)

        replacements_dict = {}
        for statement in re.findall(r'COPY(.*?)\.', data, re.DOTALL):
            parts = statement.split("REPLACING", 1)
            if len(parts) != 2:
                continue  # Replacement keyword is not present
            name_tokens = parts[0].split()
            if not name_tokens:
                continue  # nothing between COPY and REPLACING to name the copybook
            copy_book_name = name_tokens[0]
            pairs = re.findall(r'(\S+)\s+BY\s+(\S+)', parts[1].strip(), re.IGNORECASE)
            replacements_dict[copy_book_name] = {
                original.strip('='): new.strip('=') for original, new in pairs
            }
        return replacements_dict

    def get_functions(self, cbl_file_path:str) -> tuple:
        """
        Read a COBOL file and list the names that follow its COPY keywords.

        Parameters
        ----------
        cbl_file_path : str : COBOL file path

        Returns
        -------
        Tuple[List, List] : (names found after COPY, file lines longer than
                             6 characters, in file order)
        """
        # Context manager so the handle is closed even if reading fails.
        with open(cbl_file_path, 'r') as cbl_file:
            data = [i for i in cbl_file if len(i)>6]
        if not data:
            return [], data

        data_df = pd.Series(data).astype('str')
        # No capture groups in the filter pattern: pandas warns when
        # str.contains receives a regex with match groups.
        has_copy      = data_df.str.contains(r"\sCOPY\s+[A-Z0-9]+", regex=True)
        function_list = data_df[has_copy].str.extract(r"COPY\s+([A-Z0-9]+)").iloc[:,0].to_list()
        return function_list, data



So `data` is the main program file here (its lines, as read from disk), and
`function_list` holds the names of the copybook files whose contents must be copy-pasted into the main code.


Editor is loading...
Leave a Comment