Untitled

 avatar
unknown
plain_text
a month ago
17 kB
2
Indexable
import csv
import re
from google.cloud import storage, bigquery
from datetime import datetime
from io import StringIO

# Definición de encabezados esperados
expected_headers_ajustes_financeiro = [
    "NÚMERO SEQUENCIAL", "FILIAÇÃO DO ESTABELECIMENTO", "NOME ESTABELECIMENTO EQUALS",
    "CÓDIGO ESTABELECIMENTO DO CLIENTE", "ADQUIRENTE",
    "NOME ADQUIRENTE", "TIPO DO MOVIMENTO", "DESCRIÇÃO DO MOVIMENTO", "DATA MOVIMENTO", "LOTE", "LOTE ÚNICO",
    "NÚMERO DA PARCELA", "BANCO",
    "AGÊNCIA", "CONTA", "CRÉDITO OU DÉBITO", "VALOR BRUTO", "VALOR COMISSÃO", "VALOR LÍQUIDO", "BANDEIRA",
    "NOME BANDEIRA",
    "PRODUTO", "DESCRIÇÃO DO PRODUTO", "DATA DA VENDA", "HORA DA VENDA", "AUTORIZAÇÃO", "NSU", "NÚMERO TID", "NRCARTAO",
    "TERMINAL",
    "TOTAL DE PARCELAS", "STATUS DA VENDA", "SITUAÇÃO DE CONCILIAÇÃO", "DATA DE VENCIMENTO DA PARCELA",
    "CÓDIGO DO CLIENTE",
    "NOME DO CLIENTE", "NÚMERO ÚNICO ERP", "NÚMERO ÚNICO NEGÓCIO", "SINAL DIFERENÇA", "DIFERENÇA", "MOTIVO ANULAÇÃO",
    "ID REMESSA", "PAYMENT ID",
    "OPERATION ID", "PRODUCT ID", "VALOR TAXA ANTECIPAÇÃO", "ID_PROCESSAMENTO", "ID_EQUALS", "% TAXA APLICADA",
    "% TAXA CONTRATADA", "MOTIVO_AJUSTE",
    "CATEGORIA_AJUSTE", "NR_OPERACAO_ANTECIP", "DS_INST_FINAN_CESSAO", "ID_UNICO_VENDA", "ID_UNICO_PARCELA", "DS_BANCO"
]

expected_headers_vendas = expected_headers_ajustes_financeiro + ["DATA DO PAGAMENTO"]

# Definición de patrones para nombres de archivos
patterns = {
    'Ajustes': r'^fluxo_caixa-ajustes-(bradesco|caixa|safra)-\d+-\d{8}\.csv$',
    'Financeiro': r'^fluxo_caixa-finan-(bradesco|caixa|safra)-\d+-\d{8}\.csv$',
    'Vendas': r'^fluxo_caixa-vendas-(bradesco|caixa|safra)-\d+-\d{8}\.csv$',
    'Excluidos': r'^mercadolivreboletos_processamentos_excluidos_\d+-\d{8}\.csv$'
}

# Tipos de datos esperados por columna
expected_data_types = {
    "NÚMERO SEQUENCIAL": (int, str),
    "FILIAÇÃO DO ESTABELECIMENTO": (int, float),
    "NOME ESTABELECIMENTO EQUALS": (str,),
    "CÓDIGO ESTABELECIMENTO DO CLIENTE": (str,),
    "ADQUIRENTE": (int, float),
    "NOME ADQUIRENTE": (str,),
    "TIPO DO MOVIMENTO": (str,),
    "DESCRIÇÃO DO MOVIMENTO": (str,),
    "DATA MOVIMENTO": (int, float),
    "LOTE": (int, float),
    "LOTE ÚNICO": (int, float),
    "NÚMERO DA PARCELA": (int, float),
    "BANCO": (int, float),
    "AGÊNCIA": (int, float),
    "CONTA": (str,),
    "CRÉDITO OU DÉBITO": (str,),
    "VALOR BRUTO": (int, float),
    "VALOR COMISSÃO": (int, float),
    "VALOR LÍQUIDO": (int, float),
    "BANDEIRA": (int, float),
    "NOME BANDEIRA": (str,),
    "PRODUTO": (int, float),
    "DESCRIÇÃO DO PRODUTO": (str,),
    "DATA DA VENDA": (int, float),
    "HORA DA VENDA": (int, float),
    "AUTORIZAÇÃO": (str,),
    "NSU": (int, float),
    "NÚMERO TID": (str,),
    "NRCARTAO": (str,),
    "TERMINAL": (str,),
    "TOTAL DE PARCELAS": (int, float),
    "STATUS DA VENDA": (int, float),
    "SITUAÇÃO DE CONCILIAÇÃO": (int, float),
    "DATA DE VENCIMENTO DA PARCELA": (int, float),
    "CÓDIGO DO CLIENTE": (str,),
    "NOME DO CLIENTE": (str,),
    "NÚMERO ÚNICO ERP": (str,),
    "NÚMERO ÚNICO NEGÓCIO": (str, int),
    "SINAL DIFERENÇA": (str,),
    "DIFERENÇA": (int, float),
    "MOTIVO ANULAÇÃO": (str,),
    "ID REMESSA": (int, float),
    "PAYMENT ID": (str,),
    "OPERATION ID": (str, int),
    "PRODUCT ID": (str,),
    "VALOR TAXA ANTECIPAÇÃO": (int, float),
    "ID_PROCESSAMENTO": (int, float),
    "ID_EQUALS": (int, float),
    "% TAXA APLICADA": (int, float),
    "% TAXA CONTRATADA": (int, float),
    "MOTIVO_AJUSTE": (str,),
    "CATEGORIA_AJUSTE": (str,),
    "NR_OPERACAO_ANTECIP": (str,),
    "DS_INST_FINAN_CESSAO": (str,),
    "ID_UNICO_VENDA": (int, float),
    "ID_UNICO_PARCELA": (int, float),
    "DS_BANCO": (str,)
}


# Funciones de cliente y registro en BigQuery
def get_bigquery_client():
    return connections['BigQuery_Default_DME'].bigquery_client


def register_error_on_bigquery(filename, error_types):
    client = get_bigquery_client()
    table_id = '<ESQUEMA_TABLA>LK_ERROR_FILES_BOLETOS_EQUALS'

    # Si error_types es una lista, concatenar los errores en una sola cadena
    if isinstance(error_types, list):
        error_types = "; ".join(error_types)  # Concatenar errores con punto y coma

    # Crear la fila a insertar
    row_to_insert = {
        "FILENAME": filename,
        "ERROR_TYPE": error_types,
        "AUD_INS_DTTM": datetime.now().strftime("%Y-%m-%d"),
        "AUD_UPD_DTTM": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }

    # Intentar insertar la fila en BigQuery
    errors = client.insert_rows_json(table_id, [row_to_insert])
    if errors:
        print(f"Error al insertar en BigQuery: {errors}")
    else:
        print(f"Error registrado en BigQuery: {filename}, Tipo de error: {error_types}")


def move_to_error_path(bucket, blob, filename, error_type):
    register_error_on_bigquery(filename, error_type)
    error_path = f'FLUXO_DE_CAIXA_BOLETOS/Error/{filename}'
    bucket_destino_blob = bucket.blob(error_path)
    bucket_destino_blob.rewrite(blob)
    if blob.exists():
        blob.delete()
        print(f'Archivo original eliminado: {filename}')


# Función para validar tipos de datos y registrar todos los errores
def valid_expected_data_types(data, filename):
    # Asegurarse de que hay datos para validar
    if len(data) < 2:  # No hay datos si solo hay encabezado
        return False, [f"El archivo {filename} no tiene datos válidos para validar tipos."]

    errors_list = []  # Lista para almacenar todos los mensajes de error

    # Validar cada fila excepto la última
    for row in data[1:-1]:  # Omitir la última fila
        if len(row) != len(data[0]):
            errors_list.append(f"Fila con longitud inesperada en {filename}: {row}")
            continue  # Continuar con la siguiente fila

        for i, columna in enumerate(data[0]):
            expected_types = expected_data_types.get(columna)
            value = row[i]

            # Permitir valores nulos o vacíos
            if value == "" or value is None:
                continue

            if expected_types is not None:
                # Reemplazar comas por puntos para manejar decimales
                value = value.replace(",", ".")

                try:
                    # Manejar valores en notación científica
                    if re.match(r"^-?\d+(\.\d+)?[eE][-+]?\d+$", value):
                        value = float(value)  # Convertir a float directamente

                    # Intentar convertir a entero si el tipo esperado es int
                    if int in expected_types:
                        # Convertir a float primero y luego a int si es posible
                        value_float = float(value)
                        if value_float.is_integer():
                            value = int(value_float)  # Convertir a entero si es un número entero exacto
                        else:
                            if float in expected_types:
                                value = value_float  # Aceptar como float si también es un tipo permitido
                            else:
                                errors_list.append(
                                    f"Tipo de dato incorrecto en columna '{columna}': se esperaba un entero, pero se encontró '{row[i]}'")

                    elif float in expected_types:
                        float(value)  # Intento de conversión a float

                    elif str in expected_types:
                        if not isinstance(value, str):
                            errors_list.append(
                                f"Tipo de dato incorrecto en columna '{columna}': se esperaba una cadena")

                except ValueError:
                    errors_list.append(
                        f"Error de conversión en columna '{columna}': valor '{row[i]}' no es del tipo esperado")

    if errors_list:
        return False, errors_list  # Devolver False y la lista de errores
    return True, None  # Si no hay errores, devolver True


# Validación de encabezados y acumulación de errores si es inválido
def validate_headers(data, filename):
    real_headers = data[0]

    # Determina el encabezado esperado basado en el nombre del archivo
    if "finan" in filename or "ajustes" in filename:
        expected_headers = expected_headers_ajustes_financeiro
    elif "vendas" in filename:
        expected_headers = expected_headers_vendas
    else:
        expected_headers = expected_headers_ajustes_financeiro

    # Encuentra columnas que no coinciden
    for i in range(len(expected_headers)):
        if expected_headers[i] != real_headers[i]:
            return False, "Columnas erróneas o faltantes"

    return True, f"El archivo {filename} pasó la validación de tipos encabezos."


# Validación del trailer
def validate_trailer(data, filename):
    trailer_found = False
    register_number = len(data) - 2  # Número de registros sin contar la cabecera y el trailer
    for row in data:
        if row[0] == "TRAILER":
            trailer_found = True
            # Verificar si el valor de trailer está vacío
            if not row[1]:
                return False, "Trailer incorrecto: valor vacío"
            trailer_value = int(row[1])
            if trailer_value != register_number:
                return False, "Trailer no coincide con cantidad de registros"
            return True, None
    return False, "Trailer no encontrado" if not trailer_found else None


def listar_blobs(bucket_name, path, folder):
    storage_client = connections['BigQuery_Default_DME'].storage_client
    bucket = storage_client.bucket(bucket_name)
    blobs = list(bucket.list_blobs(prefix=path))

    if not blobs:
        print(f'No se encontraron archivos en la carpeta {folder}. No se procesará nada.')
        return

    pattern = patterns[folder]

    for blob in blobs:
        blob_name = blob.name
        filename = blob_name.split('/')[-1]

        # Validar el formato del nombre del archivo
        if blob_name.count('/') == path.count('/'):
            if not re.match(pattern, filename):
                print(f'El archivo {filename} no cumple con el formato esperado.')
                move_to_error_path(bucket, blob, filename, "Nombre o path de origen no válido")
                continue

            # Validar la fecha en el nombre del archivo
            date_str = filename[-12:-4]
            try:
                datetime.strptime(date_str, "%Y%m%d")
            except ValueError:
                print(f'El archivo {filename} tiene una fecha no válida en su nombre.')
                move_to_error_path(bucket, blob, filename, "Fecha no válida en el nombre")
                continue

            # Validar que sea un archivo CSV
            if not filename.endswith('.csv'):
                print(f'El archivo {filename} no es un CSV.')
                move_to_error_path(bucket, blob, filename, "Archivo no es CSV")
                continue

            # Procesar el archivo
            print(f'Procesando archivo: {filename}')
            file_contents = blob.download_as_text(encoding='latin-1')
            infile = StringIO(file_contents)

            # Validar el delimitador del archivo
            reader = csv.reader(infile, delimiter=";")
            data = list(reader)

            # Verificar si el archivo está vacío o tiene solo una fila vacía
            if not data or len(data) < 2:  # Debe tener al menos encabezado y una fila de datos
                print(
                    f"El archivo {filename} está vacío o no tiene encabezados suficientes. Moviendo a carpeta de errores.")
                move_to_error_path(bucket, blob, filename, "Archivo vacío o sin encabezados")
                continue

            # Comprobar si la primera fila tiene el número correcto de columnas
            if len(data[0]) != len(expected_headers_ajustes_financeiro) and len(data[0]) != len(
                    expected_headers_vendas):
                print(f"El archivo {filename} no tiene el delimitador correcto ';'. Moviendo a carpeta de errores.")
                move_to_error_path(bucket, blob, filename, "Delimitador incorrecto")
                continue

            # Verificar si el archivo está vacío
            if not data or len(data) < 2:  # Debe tener al menos encabezado y una fila de datos
                print(
                    f"El archivo {filename} está vacío o no tiene encabezados suficientes. Moviendo a carpeta de errores.")
                move_to_error_path(bucket, blob, filename, "Archivo vacío o sin encabezados")
                continue

            # Validar encabezados
            valid_header, error_header = validate_headers(data, filename)
            if not valid_header:
                print(f"El archivo {filename} tiene un encabezado no válido. Moviendo a carpeta de errores.")
                move_to_error_path(bucket, blob, filename, error_header)
                continue

            # Validar tráiler
            trailer_valido, trailer_error = validate_trailer(data, filename)
            if not trailer_valido:
                print(f"El archivo {filename} tiene un tráiler no válido. Moviendo a carpeta de errores.")
                move_to_error_path(bucket, blob, filename, trailer_error)
                continue

            # Validar tipos de datos solo si los encabezados son válidos
            valid_data_types, error_data_types = valid_expected_data_types(data, filename)
            if not valid_data_types:
                print(f"El archivo {filename} tiene errores de tipo de datos. Moviendo a carpeta de errores.")
                move_to_error_path(bucket, blob, filename, error_data_types)
                continue

            # Agregar la columna FILENAME y convertir valores científicos a float o int
            print(f"{filename} pasó todas las validaciones.")

            if "ajustes" in filename or "finan" in filename:
                if "FILENAME" in data[0]:
                    filename_index = data[0].index("FILENAME")
                    data[0].insert(filename_index, "DATA_DO_PAGAMENTO")
                    
                    for row in data[1:]:
                        row.insert(filename_index, "")
                    data[0].append('FILENAME')
                else:
                    data[0].append("DATA_DO_PAGAMENTO")
                    for row in data[1:]:
                        row.append("")

            # Asegúrate de que el encabezado tenga la columna 'FILENAME'
            if data[0][-1] != 'FILENAME':
                print("Agregando columna FILENAME")
                data[0].append('FILENAME')

            print("Cargando a 'Procesar'.")
            # Convertir valores científicos a float o int donde sea necesario
            for row in data[1:]:
                for i, value in enumerate(row):
                    if value not in ["TRAILER", ""]:  # Excluir la fila de TRAILER y valores vacíos
                        try:
                            # Reemplazar comas por puntos para manejar decimales
                            value = value.replace(",", ".")

                            # Convertir en notación científica a float
                            if re.match(r"^-?\d+(\.\d+)?[eE][-+]?\d+$", value):
                                value = float(value)

                            # Convertir a int si el valor es entero
                            if isinstance(value, float) and value.is_integer():
                                value = int(value)

                            row[i] = value  # Reemplazar el valor original por el convertido
                        except ValueError:
                            # Si no se puede convertir, dejar el valor tal cual
                            pass

                if row[0] == "TRAILER":
                    # Si es la fila de TRAILER, rellena con valores vacíos hasta la última columna y agrega el filename
                    row.extend([''] * (len(data[0]) - len(row) - 1))
                    row.append(filename)
                else:
                    # Si es una fila normal, simplemente agrega el filename al final
                    row.append(filename)

            filename_nw_path = path + 'Procesar/' + filename
            new_blob = bucket.blob(filename_nw_path)
            output = StringIO()
            writer = csv.writer(output, delimiter=';')
            writer.writerows(data)
            new_blob.upload_from_string(output.getvalue(), content_type='text/csv')
            blob.delete()
            print(f'Archivo {filename} cargado y archivo original eliminado.')


# Configuración y ejecución
bucket_name = "<BUCKET>"
folders = ['Financeiro', 'Ajustes', 'Vendas']
for folder in folders:
    path = f'FLUXO_DE_CAIXA_BOLETOS/{folder}/'
    listar_blobs(bucket_name, path, folder)
Leave a Comment