Untitled

 avatar
unknown
plain_text
2 years ago
2.2 kB
6
Indexable
from fastapi import HTTPException
from src.app_functions.core.S3_Funcs import LoadParquet
from pathlib import Path
import pytest
import duckdb
import os
import boto3
from moto import mock_s3


# def test_S3_funcs():
#     TestPath = S3_funcs("data/ftegoldenimage.db/orderitemcustomfieldsdim/PARTITION_ID=120/part-00000-bf77a4a6-c7fc-4f45-9a86-be989892826c.c000.snappy.parquet")  # noqa: E501
#     assert TestPath.get_data_file() == "data/ftegoldenimage.db/orderitemcustomfieldsdim/PARTITION_ID=120/part-00000-bf77a4a6-c7fc-4f45-9a86-be989892826c.c000.snappy.parquet"  # noqa: E501
#     assert TestPath.get_filename() == "part-00000-bf77a4a6-c7fc-4f45-9a86-be989892826c.c000.snappy.parquet"  # noqa: E501
#     assert TestPath.get_bucketpath() == "s3://infor-sandbox-gtn-gtndndbxusw2-dbx-datastore-us-west-2" # noqa: E501
#     assert TestPath.loadParquetIntoDuckDB() == "part-00000-bf77a4a6-c7fc-4f45-9a86-be989892826c.c000.snappy.parquet has been created."  # noqa: E501
#     TestWrongPath = S3_funcs("ThisPathDoesntExist")
#     assert TestWrongPath.loadParquetIntoDuckDB() == "error"
@pytest.fixture
def dummy_db():
    DUCKDB_PATH: Path = Path("/database/Flock_db_test.db")
    con = duckdb.connect(
        database=DUCKDB_PATH.as_posix(),
    )
    return con

def test_s3_funcs():
   
    testPath = "data/ftegoldenimage.db/orderitemcustomfieldsdim/PARTITION_ID=120/part-00000-bf77a4a6-c7fc-4f45-9a86-be989892826c.c000.snappy.parquet"  # noqa: E501
    loadParquet= LoadParquet(dummy_db,testPath)
    filename=loadParquet.extractFilename()
    assert  filename == "part-00000-bf77a4a6-c7fc-4f45-9a86-be989892826c.c000.snappy.parquet" 
    bucketpath= loadParquet.s3BucketPath()
    assert bucketpath=="s3://infor-sandbox-gtn-gtndndbxusw2-dbx-datastore-us-west-2"
    testWrongPath="data/ftegoldenimage.db/orderitemcustomfieldsdim/PARTITION_ID=120/part-00000-bf77a4a6-c7fc-4f45-9a86-be989892826c.snappy.parquet"
    loadParquetTest=LoadParquet(dummy_db,testWrongPath) 
    with pytest.raises(HTTPException) as exc_info:
        loadParquetTest.loadParquetIntoDuckDB()
    assert exc_info.value.detail == "File not found.."
    assert exc_info.value.status_code== 400
Editor is loading...