Untitled
unknown
plain_text
3 years ago
2.1 kB
8
Indexable
from fastapi import HTTPException
from src.app_functions.S3_funcs import parquetExists, s3Setup
import pytest
@pytest.fixture
def bucketName():
return "test"
@pytest.fixture
def fileName():
return "userdata1.parquet"
# @pytest.fixture
# def listFiles(bucketName):
# client=s3Setup()
# objects=client.list_objects(Bucket=bucketName)['Contents']
# list_files=[]
# for obj in objects:
# list_files.append(obj['Key'])
# return listFiles
def tests3Setup ():
client = s3Setup()
assert client._client_config.signature_version == 's3v4'
assert client._request_signer._credentials.access_key == 'admin12345'
assert client._request_signer._credentials.secret_key == 'admin12345'
# Test if the parquetFile exists in AWS
def testParquetExists(getConnection,fileName,bucketName):
# fileName and bucketName are True
assert parquetExists(getConnection,fileName,bucketName)==True
# assert parquetExists(getConnection,"gzeucf.parquet",bucketName)==pytest.raises
# Test when the file does not exist in the bucket
with pytest.raises(HTTPException) as exc_info:
parquetExists(getConnection,"gzeucf.parquet",bucketName)
assert isinstance(exc_info.value, HTTPException)
assert exc_info.value.status_code == 404
assert exc_info.value.detail == f"File gzeucf.parquet does not exist in S3 bucket {bucketName}"
# Test when bucketName does not exist
with pytest.raises(HTTPException) as exc_info:
parquetExists(getConnection,fileName,"test1")
assert isinstance(exc_info.value, HTTPException)
assert exc_info.value.status_code == 404
assert exc_info.value.detail == f"S3 bucket test1 does not exist"
# Test when bucket is empty
# with pytest.raises(HTTPException) as exc_info:
# parquetExists(getConnection,"",bucketName)
# assert isinstance(exc_info.value, HTTPException)
# assert exc_info.value.status_code == 404
# assert exc_info.value.detail == f"S3 bucket {bucketName} is empty"
Editor is loading...