Untitled
unknown
plain_text
5 months ago
5.2 kB
2
Indexable
from typing import List, Optional import requests from fastapi import HTTPException from app.config.settings import get_settings from app.graphql.types import ReltioSearchType from app.models import ReltioSearch _settings = get_settings() AUTH_URL = _settings.auth_url TENANT_URL = _settings.tenant_url USERNAME = _settings.username PASSWORD = _settings.password AUTHORIZATION_CODE = _settings.authorization_code # Function to get the access token def get_access_token(): try: response = requests.post( AUTH_URL, data={"grant_type": "password", "username": USERNAME, "password": PASSWORD}, headers={ "Content-Type": "application/x-www-form-urlencoded", "Authorization": AUTHORIZATION_CODE, }, verify=True, # Disable SSL verification for testing ) response.raise_for_status() # Check if the request was successful return response.json().get("access_token") except requests.RequestException as e: return [f"Authentication error: {e}"] # Function to extract the first string from an array of dictionaries def extract_string_from_dict(arr): if arr and isinstance(arr, list) and len(arr) > 0: item = arr[0] if isinstance(item, dict): return item.get("value") # Adjust based on the actual key in the dictionary return None # Function to map JSON response to ReltioSearch model def map_json(response_data) -> List[ReltioSearch]: mapped_fields = [] if response_data and "objects" in response_data: for obj in response_data["objects"]: attributes = obj.get("attributes", {}) # Create ReltioSearch instance and map values product_field = ReltioSearch( display_name=extract_string_from_dict(attributes.get("display_name", [])), display_name_product=extract_string_from_dict( attributes.get("display_name", []) ), therapeutic_area=extract_string_from_dict( attributes.get("therapeutic_area", []) ), group_status=extract_string_from_dict(attributes.get("group_status", [])), pgjn_flag=extract_string_from_dict(attributes.get("pgjn_flag", [])), group_type=extract_string_from_dict(attributes.get("group_type", [])), uri=obj.get("uri", None), ) # If `uri` exists, trim the first 9 characters if product_field.uri: product_field.uri = product_field.uri[9:] # Only append if there are non-null values if any( value is not None for value in [ product_field.display_name, product_field.therapeutic_area, product_field.group_status, product_field.pgjn_flag, product_field.uri, product_field.display_name_product, product_field.group_type, ] ): mapped_fields.append(product_field) return mapped_fields # Function to make the API call def api_call(product_name: str = None, reltio_id: str = None): request_param = "equals(type,'configuration/entityTypes/ProductGroup') and equals(attributes.GroupType,'Brand')" if reltio_id: request_param += f" and contains (uri,'*{reltio_id}*')" if product_name: request_param += f" and contains(attributes.Name,'*{product_name}*')" try: access_token = get_access_token() response = requests.post( TENANT_URL, json=None, # Adjust this based on your API's requirements headers={ "Authorization": f"Bearer {access_token}", "Content-Type": "application/json", }, params={"filter": request_param}, verify=True, # Disable SSL verification for testing ) response.raise_for_status() # Check if the request was successful return response.json() except requests.RequestException as e: return [f"API error: {e}"] async def search_products( self, product_name: Optional[str] = None, reltio_id: Optional[str] = None ) -> List[ReltioSearchType]: try: response_data = api_call(product_name, reltio_id) mapped_fields = map_json(response_data) # Return a list of ReltioSearchType while excluding null values return [ ReltioSearchType( display_name=field.display_name, therapeutic_area=field.therapeutic_area, group_status=field.group_status, pgjn_flag=field.pgjn_flag, uri=field.uri, display_name_product=field.display_name_product, group_type=field.group_type, ) for field in mapped_fields ] except Exception as e: return [f"Search error: {e}"]
Editor is loading...
Leave a Comment