Untitled
unknown
plain_text
a year ago
2.1 kB
1
Indexable
Never
from fastapi import HTTPException from src.app_functions.S3_funcs import parquetExists, s3Setup import pytest @pytest.fixture def bucketName(): return "test" @pytest.fixture def fileName(): return "userdata1.parquet" # @pytest.fixture # def listFiles(bucketName): # client=s3Setup() # objects=client.list_objects(Bucket=bucketName)['Contents'] # list_files=[] # for obj in objects: # list_files.append(obj['Key']) # return listFiles def tests3Setup (): client = s3Setup() assert client._client_config.signature_version == 's3v4' assert client._request_signer._credentials.access_key == 'admin12345' assert client._request_signer._credentials.secret_key == 'admin12345' # Test if the parquetFile exists in AWS def testParquetExists(getConnection,fileName,bucketName): # fileName and bucketName are True assert parquetExists(getConnection,fileName,bucketName)==True # assert parquetExists(getConnection,"gzeucf.parquet",bucketName)==pytest.raises # Test when the file does not exist in the bucket with pytest.raises(HTTPException) as exc_info: parquetExists(getConnection,"gzeucf.parquet",bucketName) assert isinstance(exc_info.value, HTTPException) assert exc_info.value.status_code == 404 assert exc_info.value.detail == f"File gzeucf.parquet does not exist in S3 bucket {bucketName}" # Test when bucketName does not exist with pytest.raises(HTTPException) as exc_info: parquetExists(getConnection,fileName,"test1") assert isinstance(exc_info.value, HTTPException) assert exc_info.value.status_code == 404 assert exc_info.value.detail == f"S3 bucket test1 does not exist" # Test when bucket is empty # with pytest.raises(HTTPException) as exc_info: # parquetExists(getConnection,"",bucketName) # assert isinstance(exc_info.value, HTTPException) # assert exc_info.value.status_code == 404 # assert exc_info.value.detail == f"S3 bucket {bucketName} is empty"