Untitled
import csv import re from google.cloud import storage, bigquery from datetime import datetime from io import StringIO # Definición de encabezados esperados expected_headers_ajustes_financeiro = [ "NÚMERO SEQUENCIAL", "FILIAÇÃO DO ESTABELECIMENTO", "NOME ESTABELECIMENTO EQUALS", "CÓDIGO ESTABELECIMENTO DO CLIENTE", "ADQUIRENTE", "NOME ADQUIRENTE", "TIPO DO MOVIMENTO", "DESCRIÇÃO DO MOVIMENTO", "DATA MOVIMENTO", "LOTE", "LOTE ÚNICO", "NÚMERO DA PARCELA", "BANCO", "AGÊNCIA", "CONTA", "CRÉDITO OU DÉBITO", "VALOR BRUTO", "VALOR COMISSÃO", "VALOR LÍQUIDO", "BANDEIRA", "NOME BANDEIRA", "PRODUTO", "DESCRIÇÃO DO PRODUTO", "DATA DA VENDA", "HORA DA VENDA", "AUTORIZAÇÃO", "NSU", "NÚMERO TID", "NRCARTAO", "TERMINAL", "TOTAL DE PARCELAS", "STATUS DA VENDA", "SITUAÇÃO DE CONCILIAÇÃO", "DATA DE VENCIMENTO DA PARCELA", "CÓDIGO DO CLIENTE", "NOME DO CLIENTE", "NÚMERO ÚNICO ERP", "NÚMERO ÚNICO NEGÓCIO", "SINAL DIFERENÇA", "DIFERENÇA", "MOTIVO ANULAÇÃO", "ID REMESSA", "PAYMENT ID", "OPERATION ID", "PRODUCT ID", "VALOR TAXA ANTECIPAÇÃO", "ID_PROCESSAMENTO", "ID_EQUALS", "% TAXA APLICADA", "% TAXA CONTRATADA", "MOTIVO_AJUSTE", "CATEGORIA_AJUSTE", "NR_OPERACAO_ANTECIP", "DS_INST_FINAN_CESSAO", "ID_UNICO_VENDA", "ID_UNICO_PARCELA", "DS_BANCO" ] expected_headers_vendas = expected_headers_ajustes_financeiro + ["DATA DO PAGAMENTO"] # Definición de patrones para nombres de archivos patterns = { 'Ajustes': r'^fluxo_caixa-ajustes-(bradesco|caixa|safra)-\d+-\d{8}\.csv$', 'Financeiro': r'^fluxo_caixa-finan-(bradesco|caixa|safra)-\d+-\d{8}\.csv$', 'Vendas': r'^fluxo_caixa-vendas-(bradesco|caixa|safra)-\d+-\d{8}\.csv$', 'Excluidos': r'^mercadolivreboletos_processamentos_excluidos_\d+-\d{8}\.csv$' } # Tipos de datos esperados por columna expected_data_types = { "NÚMERO SEQUENCIAL": (int, str), "FILIAÇÃO DO ESTABELECIMENTO": (int, float), "NOME ESTABELECIMENTO EQUALS": (str,), "CÓDIGO ESTABELECIMENTO DO CLIENTE": (str,), "ADQUIRENTE": (int, float), "NOME ADQUIRENTE": (str,), "TIPO DO MOVIMENTO": (str,), "DESCRIÇÃO DO MOVIMENTO": (str,), "DATA MOVIMENTO": (int, float), "LOTE": (int, float), "LOTE ÚNICO": (int, float), "NÚMERO DA PARCELA": (int, float), "BANCO": (int, float), "AGÊNCIA": (int, float), "CONTA": (str,), "CRÉDITO OU DÉBITO": (str,), "VALOR BRUTO": (int, float), "VALOR COMISSÃO": (int, float), "VALOR LÍQUIDO": (int, float), "BANDEIRA": (int, float), "NOME BANDEIRA": (str,), "PRODUTO": (int, float), "DESCRIÇÃO DO PRODUTO": (str,), "DATA DA VENDA": (int, float), "HORA DA VENDA": (int, float), "AUTORIZAÇÃO": (str,), "NSU": (int, float), "NÚMERO TID": (str,), "NRCARTAO": (str,), "TERMINAL": (str,), "TOTAL DE PARCELAS": (int, float), "STATUS DA VENDA": (int, float), "SITUAÇÃO DE CONCILIAÇÃO": (int, float), "DATA DE VENCIMENTO DA PARCELA": (int, float), "CÓDIGO DO CLIENTE": (str,), "NOME DO CLIENTE": (str,), "NÚMERO ÚNICO ERP": (str,), "NÚMERO ÚNICO NEGÓCIO": (str, int), "SINAL DIFERENÇA": (str,), "DIFERENÇA": (int, float), "MOTIVO ANULAÇÃO": (str,), "ID REMESSA": (int, float), "PAYMENT ID": (str,), "OPERATION ID": (str, int), "PRODUCT ID": (str,), "VALOR TAXA ANTECIPAÇÃO": (int, float), "ID_PROCESSAMENTO": (int, float), "ID_EQUALS": (int, float), "% TAXA APLICADA": (int, float), "% TAXA CONTRATADA": (int, float), "MOTIVO_AJUSTE": (str,), "CATEGORIA_AJUSTE": (str,), "NR_OPERACAO_ANTECIP": (str,), "DS_INST_FINAN_CESSAO": (str,), "ID_UNICO_VENDA": (int, float), "ID_UNICO_PARCELA": (int, float), "DS_BANCO": (str,) } # Funciones de cliente y registro en BigQuery def get_bigquery_client(): return connections['BigQuery_Default_DME'].bigquery_client def register_error_on_bigquery(filename, error_types): client = get_bigquery_client() table_id = '<ESQUEMA_TABLA>LK_ERROR_FILES_BOLETOS_EQUALS' # Si error_types es una lista, concatenar los errores en una sola cadena if isinstance(error_types, list): error_types = "; ".join(error_types) # Concatenar errores con punto y coma # Crear la fila a insertar row_to_insert = { "FILENAME": filename, "ERROR_TYPE": error_types, "AUD_INS_DTTM": datetime.now().strftime("%Y-%m-%d"), "AUD_UPD_DTTM": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } # Intentar insertar la fila en BigQuery errors = client.insert_rows_json(table_id, [row_to_insert]) if errors: print(f"Error al insertar en BigQuery: {errors}") else: print(f"Error registrado en BigQuery: {filename}, Tipo de error: {error_types}") def move_to_error_path(bucket, blob, filename, error_type): register_error_on_bigquery(filename, error_type) error_path = f'FLUXO_DE_CAIXA_BOLETOS/Error/{filename}' bucket_destino_blob = bucket.blob(error_path) bucket_destino_blob.rewrite(blob) if blob.exists(): blob.delete() print(f'Archivo original eliminado: {filename}') # Función para validar tipos de datos y registrar todos los errores def valid_expected_data_types(data, filename): # Asegurarse de que hay datos para validar if len(data) < 2: # No hay datos si solo hay encabezado return False, [f"El archivo {filename} no tiene datos válidos para validar tipos."] errors_list = [] # Lista para almacenar todos los mensajes de error # Validar cada fila excepto la última for row in data[1:-1]: # Omitir la última fila if len(row) != len(data[0]): errors_list.append(f"Fila con longitud inesperada en {filename}: {row}") continue # Continuar con la siguiente fila for i, columna in enumerate(data[0]): expected_types = expected_data_types.get(columna) value = row[i] # Permitir valores nulos o vacíos if value == "" or value is None: continue if expected_types is not None: # Reemplazar comas por puntos para manejar decimales value = value.replace(",", ".") try: # Manejar valores en notación científica if re.match(r"^-?\d+(\.\d+)?[eE][-+]?\d+$", value): value = float(value) # Convertir a float directamente # Intentar convertir a entero si el tipo esperado es int if int in expected_types: # Convertir a float primero y luego a int si es posible value_float = float(value) if value_float.is_integer(): value = int(value_float) # Convertir a entero si es un número entero exacto else: if float in expected_types: value = value_float # Aceptar como float si también es un tipo permitido else: errors_list.append( f"Tipo de dato incorrecto en columna '{columna}': se esperaba un entero, pero se encontró '{row[i]}'") elif float in expected_types: float(value) # Intento de conversión a float elif str in expected_types: if not isinstance(value, str): errors_list.append( f"Tipo de dato incorrecto en columna '{columna}': se esperaba una cadena") except ValueError: errors_list.append( f"Error de conversión en columna '{columna}': valor '{row[i]}' no es del tipo esperado") if errors_list: return False, errors_list # Devolver False y la lista de errores return True, None # Si no hay errores, devolver True # Validación de encabezados y acumulación de errores si es inválido def validate_headers(data, filename): real_headers = data[0] # Determina el encabezado esperado basado en el nombre del archivo if "finan" in filename or "ajustes" in filename: expected_headers = expected_headers_ajustes_financeiro elif "vendas" in filename: expected_headers = expected_headers_vendas else: expected_headers = expected_headers_ajustes_financeiro # Encuentra columnas que no coinciden for i in range(len(expected_headers)): if expected_headers[i] != real_headers[i]: return False, "Columnas erróneas o faltantes" return True, f"El archivo {filename} pasó la validación de tipos encabezos." # Validación del trailer def validate_trailer(data, filename): trailer_found = False register_number = len(data) - 2 # Número de registros sin contar la cabecera y el trailer for row in data: if row[0] == "TRAILER": trailer_found = True # Verificar si el valor de trailer está vacío if not row[1]: return False, "Trailer incorrecto: valor vacío" trailer_value = int(row[1]) if trailer_value != register_number: return False, "Trailer no coincide con cantidad de registros" return True, None return False, "Trailer no encontrado" if not trailer_found else None def listar_blobs(bucket_name, path, folder): filename = blob_name.split('/')[-1] # Validar el delimitador del archivo reader = csv.reader(infile, delimiter=";") data = list(reader) # Agregar la columna FILENAME y convertir valores científicos a float o int print(f"{filename} pasó todas las validaciones.") if "ajustes" in filename or "finan" in filename: if "FILENAME" in data[0]: filename_index = data[0].index("FILENAME") data[0].insert(filename_index, "DATA_DO_PAGAMENTO") for row in data[1:]: row.insert(filename_index, "") else: data[0].append("DATA_DO_PAGAMENTO") for row in data[1:]: row.append("") # Asegúrate de que el encabezado tenga la columna 'FILENAME' if data[0][-1] != 'FILENAME': print("Agregando columna FILENAME") data[0].append('FILENAME') print("Cargando a 'Procesar'.") # Convertir valores científicos a float o int donde sea necesario for row in data[1:]: for i, value in enumerate(row): if value not in ["TRAILER", ""]: # Excluir la fila de TRAILER y valores vacíos try: # Reemplazar comas por puntos para manejar decimales value = value.replace(",", ".") # Convertir en notación científica a float if re.match(r"^-?\d+(\.\d+)?[eE][-+]?\d+$", value): value = float(value) # Convertir a int si el valor es entero if isinstance(value, float) and value.is_integer(): value = int(value) row[i] = value # Reemplazar el valor original por el convertido except ValueError: # Si no se puede convertir, dejar el valor tal cual pass if row[0] == "TRAILER": # Si es la fila de TRAILER, rellena con valores vacíos hasta la última columna y agrega el filename row.extend([''] * (len(data[0]) - len(row) - 1)) row.append(filename) else: # Si es una fila normal, simplemente agrega el filename al final row.append(filename) filename_nw_path = path + 'Procesar/' + filename new_blob = bucket.blob(filename_nw_path) output = StringIO() writer = csv.writer(output, delimiter=';') writer.writerows(data) new_blob.upload_from_string(output.getvalue(), content_type='text/csv') blob.delete() print(f'Archivo {filename} cargado y archivo original eliminado.') # Configuración y ejecución bucket_name = "<BUCKET>" folders = ['Financeiro', 'Ajustes', 'Vendas'] for folder in folders: path = f'FLUXO_DE_CAIXA_BOLETOS/{folder}/' listar_blobs(bucket_name, path, folder)
Leave a Comment