Untitled

 avatar
unknown
plain_text
a month ago
13 kB
2
Indexable
import csv
import re
from google.cloud import storage, bigquery
from datetime import datetime
from io import StringIO

# Definición de encabezados esperados
expected_headers_ajustes_financeiro = [
    "NÚMERO SEQUENCIAL", "FILIAÇÃO DO ESTABELECIMENTO", "NOME ESTABELECIMENTO EQUALS",
    "CÓDIGO ESTABELECIMENTO DO CLIENTE", "ADQUIRENTE",
    "NOME ADQUIRENTE", "TIPO DO MOVIMENTO", "DESCRIÇÃO DO MOVIMENTO", "DATA MOVIMENTO", "LOTE", "LOTE ÚNICO",
    "NÚMERO DA PARCELA", "BANCO",
    "AGÊNCIA", "CONTA", "CRÉDITO OU DÉBITO", "VALOR BRUTO", "VALOR COMISSÃO", "VALOR LÍQUIDO", "BANDEIRA",
    "NOME BANDEIRA",
    "PRODUTO", "DESCRIÇÃO DO PRODUTO", "DATA DA VENDA", "HORA DA VENDA", "AUTORIZAÇÃO", "NSU", "NÚMERO TID", "NRCARTAO",
    "TERMINAL",
    "TOTAL DE PARCELAS", "STATUS DA VENDA", "SITUAÇÃO DE CONCILIAÇÃO", "DATA DE VENCIMENTO DA PARCELA",
    "CÓDIGO DO CLIENTE",
    "NOME DO CLIENTE", "NÚMERO ÚNICO ERP", "NÚMERO ÚNICO NEGÓCIO", "SINAL DIFERENÇA", "DIFERENÇA", "MOTIVO ANULAÇÃO",
    "ID REMESSA", "PAYMENT ID",
    "OPERATION ID", "PRODUCT ID", "VALOR TAXA ANTECIPAÇÃO", "ID_PROCESSAMENTO", "ID_EQUALS", "% TAXA APLICADA",
    "% TAXA CONTRATADA", "MOTIVO_AJUSTE",
    "CATEGORIA_AJUSTE", "NR_OPERACAO_ANTECIP", "DS_INST_FINAN_CESSAO", "ID_UNICO_VENDA", "ID_UNICO_PARCELA", "DS_BANCO"
]

expected_headers_vendas = expected_headers_ajustes_financeiro + ["DATA DO PAGAMENTO"]

# Definición de patrones para nombres de archivos
patterns = {
    'Ajustes': r'^fluxo_caixa-ajustes-(bradesco|caixa|safra)-\d+-\d{8}\.csv$',
    'Financeiro': r'^fluxo_caixa-finan-(bradesco|caixa|safra)-\d+-\d{8}\.csv$',
    'Vendas': r'^fluxo_caixa-vendas-(bradesco|caixa|safra)-\d+-\d{8}\.csv$',
    'Excluidos': r'^mercadolivreboletos_processamentos_excluidos_\d+-\d{8}\.csv$'
}

# Tipos de datos esperados por columna
expected_data_types = {
    "NÚMERO SEQUENCIAL": (int, str),
    "FILIAÇÃO DO ESTABELECIMENTO": (int, float),
    "NOME ESTABELECIMENTO EQUALS": (str,),
    "CÓDIGO ESTABELECIMENTO DO CLIENTE": (str,),
    "ADQUIRENTE": (int, float),
    "NOME ADQUIRENTE": (str,),
    "TIPO DO MOVIMENTO": (str,),
    "DESCRIÇÃO DO MOVIMENTO": (str,),
    "DATA MOVIMENTO": (int, float),
    "LOTE": (int, float),
    "LOTE ÚNICO": (int, float),
    "NÚMERO DA PARCELA": (int, float),
    "BANCO": (int, float),
    "AGÊNCIA": (int, float),
    "CONTA": (str,),
    "CRÉDITO OU DÉBITO": (str,),
    "VALOR BRUTO": (int, float),
    "VALOR COMISSÃO": (int, float),
    "VALOR LÍQUIDO": (int, float),
    "BANDEIRA": (int, float),
    "NOME BANDEIRA": (str,),
    "PRODUTO": (int, float),
    "DESCRIÇÃO DO PRODUTO": (str,),
    "DATA DA VENDA": (int, float),
    "HORA DA VENDA": (int, float),
    "AUTORIZAÇÃO": (str,),
    "NSU": (int, float),
    "NÚMERO TID": (str,),
    "NRCARTAO": (str,),
    "TERMINAL": (str,),
    "TOTAL DE PARCELAS": (int, float),
    "STATUS DA VENDA": (int, float),
    "SITUAÇÃO DE CONCILIAÇÃO": (int, float),
    "DATA DE VENCIMENTO DA PARCELA": (int, float),
    "CÓDIGO DO CLIENTE": (str,),
    "NOME DO CLIENTE": (str,),
    "NÚMERO ÚNICO ERP": (str,),
    "NÚMERO ÚNICO NEGÓCIO": (str, int),
    "SINAL DIFERENÇA": (str,),
    "DIFERENÇA": (int, float),
    "MOTIVO ANULAÇÃO": (str,),
    "ID REMESSA": (int, float),
    "PAYMENT ID": (str,),
    "OPERATION ID": (str, int),
    "PRODUCT ID": (str,),
    "VALOR TAXA ANTECIPAÇÃO": (int, float),
    "ID_PROCESSAMENTO": (int, float),
    "ID_EQUALS": (int, float),
    "% TAXA APLICADA": (int, float),
    "% TAXA CONTRATADA": (int, float),
    "MOTIVO_AJUSTE": (str,),
    "CATEGORIA_AJUSTE": (str,),
    "NR_OPERACAO_ANTECIP": (str,),
    "DS_INST_FINAN_CESSAO": (str,),
    "ID_UNICO_VENDA": (int, float),
    "ID_UNICO_PARCELA": (int, float),
    "DS_BANCO": (str,)
}


# Funciones de cliente y registro en BigQuery
def get_bigquery_client():
    return connections['BigQuery_Default_DME'].bigquery_client


def register_error_on_bigquery(filename, error_types):
    client = get_bigquery_client()
    table_id = '<ESQUEMA_TABLA>LK_ERROR_FILES_BOLETOS_EQUALS'

    # Si error_types es una lista, concatenar los errores en una sola cadena
    if isinstance(error_types, list):
        error_types = "; ".join(error_types)  # Concatenar errores con punto y coma

    # Crear la fila a insertar
    row_to_insert = {
        "FILENAME": filename,
        "ERROR_TYPE": error_types,
        "AUD_INS_DTTM": datetime.now().strftime("%Y-%m-%d"),
        "AUD_UPD_DTTM": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }

    # Intentar insertar la fila en BigQuery
    errors = client.insert_rows_json(table_id, [row_to_insert])
    if errors:
        print(f"Error al insertar en BigQuery: {errors}")
    else:
        print(f"Error registrado en BigQuery: {filename}, Tipo de error: {error_types}")


def move_to_error_path(bucket, blob, filename, error_type):
    register_error_on_bigquery(filename, error_type)
    error_path = f'FLUXO_DE_CAIXA_BOLETOS/Error/{filename}'
    bucket_destino_blob = bucket.blob(error_path)
    bucket_destino_blob.rewrite(blob)
    if blob.exists():
        blob.delete()
        print(f'Archivo original eliminado: {filename}')


# Función para validar tipos de datos y registrar todos los errores
def valid_expected_data_types(data, filename):
    # Asegurarse de que hay datos para validar
    if len(data) < 2:  # No hay datos si solo hay encabezado
        return False, [f"El archivo {filename} no tiene datos válidos para validar tipos."]

    errors_list = []  # Lista para almacenar todos los mensajes de error

    # Validar cada fila excepto la última
    for row in data[1:-1]:  # Omitir la última fila
        if len(row) != len(data[0]):
            errors_list.append(f"Fila con longitud inesperada en {filename}: {row}")
            continue  # Continuar con la siguiente fila

        for i, columna in enumerate(data[0]):
            expected_types = expected_data_types.get(columna)
            value = row[i]

            # Permitir valores nulos o vacíos
            if value == "" or value is None:
                continue

            if expected_types is not None:
                # Reemplazar comas por puntos para manejar decimales
                value = value.replace(",", ".")

                try:
                    # Manejar valores en notación científica
                    if re.match(r"^-?\d+(\.\d+)?[eE][-+]?\d+$", value):
                        value = float(value)  # Convertir a float directamente

                    # Intentar convertir a entero si el tipo esperado es int
                    if int in expected_types:
                        # Convertir a float primero y luego a int si es posible
                        value_float = float(value)
                        if value_float.is_integer():
                            value = int(value_float)  # Convertir a entero si es un número entero exacto
                        else:
                            if float in expected_types:
                                value = value_float  # Aceptar como float si también es un tipo permitido
                            else:
                                errors_list.append(
                                    f"Tipo de dato incorrecto en columna '{columna}': se esperaba un entero, pero se encontró '{row[i]}'")

                    elif float in expected_types:
                        float(value)  # Intento de conversión a float

                    elif str in expected_types:
                        if not isinstance(value, str):
                            errors_list.append(
                                f"Tipo de dato incorrecto en columna '{columna}': se esperaba una cadena")

                except ValueError:
                    errors_list.append(
                        f"Error de conversión en columna '{columna}': valor '{row[i]}' no es del tipo esperado")

    if errors_list:
        return False, errors_list  # Devolver False y la lista de errores
    return True, None  # Si no hay errores, devolver True


# Validación de encabezados y acumulación de errores si es inválido
def validate_headers(data, filename):
    real_headers = data[0]

    # Determina el encabezado esperado basado en el nombre del archivo
    if "finan" in filename or "ajustes" in filename:
        expected_headers = expected_headers_ajustes_financeiro
    elif "vendas" in filename:
        expected_headers = expected_headers_vendas
    else:
        expected_headers = expected_headers_ajustes_financeiro

    # Encuentra columnas que no coinciden
    for i in range(len(expected_headers)):
        if expected_headers[i] != real_headers[i]:
            return False, "Columnas erróneas o faltantes"

    return True, f"El archivo {filename} pasó la validación de tipos encabezos."


# Validación del trailer
def validate_trailer(data, filename):
    trailer_found = False
    register_number = len(data) - 2  # Número de registros sin contar la cabecera y el trailer
    for row in data:
        if row[0] == "TRAILER":
            trailer_found = True
            # Verificar si el valor de trailer está vacío
            if not row[1]:
                return False, "Trailer incorrecto: valor vacío"
            trailer_value = int(row[1])
            if trailer_value != register_number:
                return False, "Trailer no coincide con cantidad de registros"
            return True, None
    return False, "Trailer no encontrado" if not trailer_found else None


def listar_blobs(bucket_name, path, folder):

        filename = blob_name.split('/')[-1]


        # Validar el delimitador del archivo
        reader = csv.reader(infile, delimiter=";")
        data = list(reader)



        # Agregar la columna FILENAME y convertir valores científicos a float o int
        print(f"{filename} pasó todas las validaciones.")

        if "ajustes" in filename or "finan" in filename:
            if "FILENAME" in data[0]:
                filename_index = data[0].index("FILENAME")
                data[0].insert(filename_index, "DATA_DO_PAGAMENTO")

                for row in data[1:]:
                    row.insert(filename_index, "")

            else:
                data[0].append("DATA_DO_PAGAMENTO")
                for row in data[1:]:
                    row.append("")

        # Asegúrate de que el encabezado tenga la columna 'FILENAME'
        if data[0][-1] != 'FILENAME':
            print("Agregando columna FILENAME")
            data[0].append('FILENAME')

        print("Cargando a 'Procesar'.")
        # Convertir valores científicos a float o int donde sea necesario
        for row in data[1:]:
            for i, value in enumerate(row):
                if value not in ["TRAILER", ""]:  # Excluir la fila de TRAILER y valores vacíos
                    try:
                        # Reemplazar comas por puntos para manejar decimales
                        value = value.replace(",", ".")

                        # Convertir en notación científica a float
                        if re.match(r"^-?\d+(\.\d+)?[eE][-+]?\d+$", value):
                            value = float(value)

                        # Convertir a int si el valor es entero
                        if isinstance(value, float) and value.is_integer():
                            value = int(value)

                        row[i] = value  # Reemplazar el valor original por el convertido
                    except ValueError:
                        # Si no se puede convertir, dejar el valor tal cual
                        pass

            if row[0] == "TRAILER":
                # Si es la fila de TRAILER, rellena con valores vacíos hasta la última columna y agrega el filename
                row.extend([''] * (len(data[0]) - len(row) - 1))
                row.append(filename)
            else:
                # Si es una fila normal, simplemente agrega el filename al final
                row.append(filename)

            filename_nw_path = path + 'Procesar/' + filename
            new_blob = bucket.blob(filename_nw_path)
            output = StringIO()
            writer = csv.writer(output, delimiter=';')
            writer.writerows(data)
            new_blob.upload_from_string(output.getvalue(), content_type='text/csv')
            blob.delete()
            print(f'Archivo {filename} cargado y archivo original eliminado.')


# Configuración y ejecución
bucket_name = "<BUCKET>"
folders = ['Financeiro', 'Ajustes', 'Vendas']
for folder in folders:
    path = f'FLUXO_DE_CAIXA_BOLETOS/{folder}/'
    listar_blobs(bucket_name, path, folder)
Leave a Comment