Untitled
unknown
plain_text
2 years ago
2.2 kB
6
Indexable
from fastapi import HTTPException from src.app_functions.core.S3_Funcs import LoadParquet from pathlib import Path import pytest import duckdb import os import boto3 from moto import mock_s3 # def test_S3_funcs(): # TestPath = S3_funcs("data/ftegoldenimage.db/orderitemcustomfieldsdim/PARTITION_ID=120/part-00000-bf77a4a6-c7fc-4f45-9a86-be989892826c.c000.snappy.parquet") # noqa: E501 # assert TestPath.get_data_file() == "data/ftegoldenimage.db/orderitemcustomfieldsdim/PARTITION_ID=120/part-00000-bf77a4a6-c7fc-4f45-9a86-be989892826c.c000.snappy.parquet" # noqa: E501 # assert TestPath.get_filename() == "part-00000-bf77a4a6-c7fc-4f45-9a86-be989892826c.c000.snappy.parquet" # noqa: E501 # assert TestPath.get_bucketpath() == "s3://infor-sandbox-gtn-gtndndbxusw2-dbx-datastore-us-west-2" # noqa: E501 # assert TestPath.loadParquetIntoDuckDB() == "part-00000-bf77a4a6-c7fc-4f45-9a86-be989892826c.c000.snappy.parquet has been created." # noqa: E501 # TestWrongPath = S3_funcs("ThisPathDoesntExist") # assert TestWrongPath.loadParquetIntoDuckDB() == "error" @pytest.fixture def dummy_db(): DUCKDB_PATH: Path = Path("/database/Flock_db_test.db") con = duckdb.connect( database=DUCKDB_PATH.as_posix(), ) return con def test_s3_funcs(): testPath = "data/ftegoldenimage.db/orderitemcustomfieldsdim/PARTITION_ID=120/part-00000-bf77a4a6-c7fc-4f45-9a86-be989892826c.c000.snappy.parquet" # noqa: E501 loadParquet= LoadParquet(dummy_db,testPath) filename=loadParquet.extractFilename() assert filename == "part-00000-bf77a4a6-c7fc-4f45-9a86-be989892826c.c000.snappy.parquet" bucketpath= loadParquet.s3BucketPath() assert bucketpath=="s3://infor-sandbox-gtn-gtndndbxusw2-dbx-datastore-us-west-2" testWrongPath="data/ftegoldenimage.db/orderitemcustomfieldsdim/PARTITION_ID=120/part-00000-bf77a4a6-c7fc-4f45-9a86-be989892826c.snappy.parquet" loadParquetTest=LoadParquet(dummy_db,testWrongPath) with pytest.raises(HTTPException) as exc_info: loadParquetTest.loadParquetIntoDuckDB() assert exc_info.value.detail == "File not found.." assert exc_info.value.status_code== 400
Editor is loading...