# Paste-site metadata (not part of the program):
# Untitled | unknown | plain_text | a year ago | 5.2 kB | 6 | Indexable
from typing import List, Optional
import requests
from fastapi import HTTPException
from app.config.settings import get_settings
from app.graphql.types import ReltioSearchType
from app.models import ReltioSearch
# Application settings are loaded once, when this module is imported.
_settings = get_settings()
# URL that get_access_token() POSTs to in order to obtain an access token.
AUTH_URL = _settings.auth_url
# URL that api_call() POSTs the filtered search request to.
TENANT_URL = _settings.tenant_url
# Credentials sent in the password-grant form body of the token request.
USERNAME = _settings.username
PASSWORD = _settings.password
# Value sent verbatim as the "Authorization" header of the token request.
AUTHORIZATION_CODE = _settings.authorization_code
def get_access_token(timeout: float = 30.0):
    """Request an access token from the auth endpoint (password grant).

    Posts the configured username and password as a form-encoded body to
    ``AUTH_URL``, sends ``AUTHORIZATION_CODE`` as the ``Authorization``
    header, and reads ``access_token`` from the JSON response.

    Args:
        timeout: Seconds to wait for the auth endpoint. ``requests`` applies
            no timeout by default, so without this a stalled endpoint would
            block the caller indefinitely.

    Returns:
        The ``access_token`` value of the JSON response (``None`` when the
        field is absent). When the request fails with a
        ``requests.RequestException`` (connection error, timeout, non-2xx
        status) nothing is raised; a one-element list
        ``["Authentication error: ..."]`` is returned instead, so callers
        must check the result before using it as a token.
    """
    try:
        response = requests.post(
            AUTH_URL,
            data={
                "grant_type": "password",
                "username": USERNAME,
                "password": PASSWORD,
            },
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "Authorization": AUTHORIZATION_CODE,
            },
            verify=True,  # TLS certificate verification stays enabled
            timeout=timeout,
        )
        response.raise_for_status()  # Turn 4xx/5xx into RequestException
        return response.json().get("access_token")
    except requests.RequestException as e:
        # Kept for backward compatibility: failures are reported as a list.
        return [f"Authentication error: {e}"]
def extract_string_from_dict(arr):
    """Return the ``"value"`` entry of the first element of ``arr``.

    ``None`` is returned when ``arr`` is empty or falsy, is not a ``list``,
    when its first element is not a ``dict``, or when that dict has no
    ``"value"`` key. The value is returned as-is; it is not checked to be
    a string.
    """
    if not arr or not isinstance(arr, list):
        return None
    head = arr[0]
    if not isinstance(head, dict):
        return None
    return head.get("value")
def map_json(response_data) -> List[ReltioSearch]:
    """Convert a search response into a list of ``ReltioSearch`` records.

    Each element of ``response_data["objects"]`` yields one record built
    from the first ``"value"`` of selected entries of its ``"attributes"``
    mapping plus its ``"uri"``. Records whose mapped fields are all
    ``None`` are dropped. An empty list is returned when ``response_data``
    is falsy or does not contain ``"objects"``.
    """
    if not response_data or "objects" not in response_data:
        return []

    def first_value(attrs, key):
        # First "value" stored under `key`, or None when it is missing.
        return extract_string_from_dict(attrs.get(key, []))

    records = []
    for entity in response_data["objects"]:
        attrs = entity.get("attributes", {})
        record = ReltioSearch(
            display_name=first_value(attrs, "display_name"),
            # NOTE(review): reads the same "display_name" key as the field
            # above -- confirm this is intended.
            display_name_product=first_value(attrs, "display_name"),
            therapeutic_area=first_value(attrs, "therapeutic_area"),
            group_status=first_value(attrs, "group_status"),
            pgjn_flag=first_value(attrs, "pgjn_flag"),
            group_type=first_value(attrs, "group_type"),
            uri=entity.get("uri", None),
        )

        # Drop the first 9 characters of a non-empty uri (presumably an
        # "entities/" prefix -- TODO confirm against real responses).
        if record.uri:
            record.uri = record.uri[9:]

        candidates = (
            record.display_name,
            record.therapeutic_area,
            record.group_status,
            record.pgjn_flag,
            record.uri,
            record.display_name_product,
            record.group_type,
        )
        if all(candidate is None for candidate in candidates):
            continue  # Nothing was mapped for this entity.
        records.append(record)
    return records
def api_call(
    product_name: Optional[str] = None,
    reltio_id: Optional[str] = None,
    timeout: float = 30.0,
):
    """Run a filtered search against ``TENANT_URL`` and return the JSON body.

    The filter always restricts results to ``ProductGroup`` entities whose
    ``GroupType`` is ``Brand``; ``reltio_id`` and ``product_name`` each add
    an optional ``contains`` clause.

    Args:
        product_name: Optional text matched against ``attributes.Name``.
        reltio_id: Optional text matched against the entity ``uri``.
        timeout: Seconds to wait for the search endpoint. ``requests``
            applies no timeout by default.

    Returns:
        The decoded JSON response on success. On failure nothing is raised;
        a one-element list of error text is returned instead: either the
        list produced by ``get_access_token`` or ``["API error: ..."]``.
    """
    # NOTE(review): caller-supplied values are interpolated into the filter
    # unescaped, so a quote in either value changes the expression. Confirm
    # the API's escaping rules and sanitize before exposing to untrusted input.
    request_param = "equals(type,'configuration/entityTypes/ProductGroup') and equals(attributes.GroupType,'Brand')"
    if reltio_id:
        request_param += f" and contains (uri,'*{reltio_id}*')"
    if product_name:
        request_param += f" and contains(attributes.Name,'*{product_name}*')"

    try:
        access_token = get_access_token()
        # get_access_token reports failures as a list and may return None;
        # neither must end up formatted into the Bearer header.
        if isinstance(access_token, list):
            return access_token
        if not access_token:
            return ["API error: authentication response contained no access token"]

        response = requests.post(
            TENANT_URL,
            headers={
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json",
            },
            params={"filter": request_param},
            verify=True,  # TLS certificate verification stays enabled
            timeout=timeout,
        )
        response.raise_for_status()  # Turn 4xx/5xx into RequestException
        return response.json()
    except requests.RequestException as e:
        return [f"API error: {e}"]
async def search_products(
    self, product_name: Optional[str] = None, reltio_id: Optional[str] = None
) -> List[ReltioSearchType]:
    """Search products and return them as ``ReltioSearchType`` objects.

    Calls ``api_call`` with the given filters, maps the response through
    ``map_json`` and copies every mapped record into a ``ReltioSearchType``.

    Args:
        product_name: Optional product-name filter forwarded to ``api_call``.
        reltio_id: Optional id filter forwarded to ``api_call``.

    Returns:
        One ``ReltioSearchType`` per mapped record. If anything raises, the
        exception is swallowed and ``["Search error: ..."]`` is returned.
    """
    import asyncio  # Local import keeps this block self-contained.

    try:
        # api_call performs blocking HTTP requests; run it in the default
        # executor so the event loop is not stalled while it waits.
        loop = asyncio.get_running_loop()
        response_data = await loop.run_in_executor(
            None, api_call, product_name, reltio_id
        )
        mapped_fields = map_json(response_data)
        return [
            ReltioSearchType(
                display_name=field.display_name,
                therapeutic_area=field.therapeutic_area,
                group_status=field.group_status,
                pgjn_flag=field.pgjn_flag,
                uri=field.uri,
                display_name_product=field.display_name_product,
                group_type=field.group_type,
            )
            for field in mapped_fields
        ]
    except Exception as e:
        # NOTE(review): this returns strings although the annotation promises
        # ReltioSearchType items; kept as-is for backward compatibility.
        return [f"Search error: {e}"]
# Paste-site page residue (not part of the program): "Editor is loading..." / "Leave a Comment"