# NOTE: paste-site page header, kept as a comment so the file parses as Python:
# Untitled / avatar / unknown / plain_text / 5 months ago / 5.2 kB / 2 / Indexable
from typing import List, Optional

import requests
from fastapi import HTTPException

from app.config.settings import get_settings
from app.graphql.types import ReltioSearchType
from app.models import ReltioSearch

# Load the application settings once, at import time. The constants below are
# plain copies of the values read at that moment.
_settings = get_settings()

# URL that get_access_token() posts the token request to.
AUTH_URL = _settings.auth_url
# URL that api_call() posts the search request to.
TENANT_URL = _settings.tenant_url
# Credentials sent as form fields in the password-grant token request.
USERNAME = _settings.username
PASSWORD = _settings.password
# Sent verbatim as the `Authorization` header of the token request.
AUTHORIZATION_CODE = _settings.authorization_code


def get_access_token():
    """Request an access token from the authentication endpoint.

    Posts a password-grant form request to ``AUTH_URL`` with ``USERNAME`` and
    ``PASSWORD``, sending ``AUTHORIZATION_CODE`` as the ``Authorization``
    header.

    Returns:
        The ``access_token`` value from the JSON response.

    Raises:
        requests.RequestException: If the request fails, times out, returns an
            error status, or the response carries no ``access_token``. The
            message is prefixed with ``"Authentication error:"``.
    """
    try:
        response = requests.post(
            AUTH_URL,
            data={"grant_type": "password", "username": USERNAME, "password": PASSWORD},
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "Authorization": AUTHORIZATION_CODE,
            },
            verify=True,  # TLS certificate verification stays enabled
            # Without a timeout, requests waits indefinitely on a stalled server.
            timeout=30,
        )
        response.raise_for_status()  # Turn 4xx/5xx responses into exceptions
        access_token = response.json().get("access_token")
    except requests.RequestException as e:
        # Raise instead of returning an error list: a returned list would be
        # formatted into the Bearer header by the caller as if it were a token.
        raise requests.RequestException(f"Authentication error: {e}") from e

    if not access_token:
        raise requests.RequestException(
            "Authentication error: response did not contain an access_token"
        )
    return access_token


def extract_string_from_dict(arr):
    """Return the ``"value"`` entry of the first element of ``arr``.

    The result is ``None`` unless ``arr`` is a non-empty list whose first
    element is a dict; a dict without a ``"value"`` key also gives ``None``.
    Only the first element is ever inspected.
    """
    if not arr or not isinstance(arr, list):
        return None

    head = arr[0]
    return head.get("value") if isinstance(head, dict) else None


def map_json(response_data) -> List[ReltioSearch]:
    """Map a search response onto a list of ``ReltioSearch`` instances.

    Each entry of ``response_data["objects"]`` becomes one ``ReltioSearch``.
    Attribute values are read with ``extract_string_from_dict`` from the
    entry's ``"attributes"`` mapping, and ``uri`` is taken from the entry
    itself with its first 9 characters removed.

    Args:
        response_data: Decoded JSON response. Anything that is not a dict
            (``None``, or a list of error strings, for example) yields an
            empty result instead of raising.

    Returns:
        One ``ReltioSearch`` per usable entry. Entries whose mapped fields are
        all ``None`` are left out.
    """
    if not isinstance(response_data, dict):
        return []

    mapped_fields = []

    # `or []` also covers an explicit `"objects": null` in the payload.
    for obj in response_data.get("objects") or []:
        if not isinstance(obj, dict):
            continue

        # `or {}` also covers an explicit `"attributes": null`.
        attributes = obj.get("attributes") or {}
        if not isinstance(attributes, dict):
            attributes = {}

        # Create ReltioSearch instance and map values
        product_field = ReltioSearch(
            display_name=extract_string_from_dict(attributes.get("display_name", [])),
            # NOTE(review): reads the same "display_name" key as the field
            # above - confirm whether a different attribute was intended.
            display_name_product=extract_string_from_dict(
                attributes.get("display_name", [])
            ),
            therapeutic_area=extract_string_from_dict(
                attributes.get("therapeutic_area", [])
            ),
            group_status=extract_string_from_dict(attributes.get("group_status", [])),
            pgjn_flag=extract_string_from_dict(attributes.get("pgjn_flag", [])),
            group_type=extract_string_from_dict(attributes.get("group_type", [])),
            uri=obj.get("uri", None),
        )

        # If `uri` exists, trim the first 9 characters.
        # NOTE(review): presumably strips an "entities/" prefix (9 chars);
        # the slice is unconditional, so confirm every uri has that prefix.
        if product_field.uri:
            product_field.uri = product_field.uri[9:]

        # Only append if there are non-null values
        candidate_values = (
            product_field.display_name,
            product_field.therapeutic_area,
            product_field.group_status,
            product_field.pgjn_flag,
            product_field.uri,
            product_field.display_name_product,
            product_field.group_type,
        )
        if any(value is not None for value in candidate_values):
            mapped_fields.append(product_field)

    return mapped_fields


def api_call(product_name: Optional[str] = None, reltio_id: Optional[str] = None):
    """Run the product-group search against ``TENANT_URL``.

    The filter always restricts results to ``ProductGroup`` entities whose
    ``GroupType`` is ``Brand``. ``reltio_id`` and ``product_name`` each add a
    wildcard ``contains`` clause when given.

    Args:
        product_name: Optional text matched against ``attributes.Name``.
        reltio_id: Optional text matched against the entity ``uri``.

    Returns:
        The decoded JSON response on success. On failure, a one-element list
        holding an ``"API error: ..."`` string.
    """
    request_param = "equals(type,'configuration/entityTypes/ProductGroup') and equals(attributes.GroupType,'Brand')"

    # NOTE(review): both values are interpolated into the filter unescaped; a
    # value containing a single quote will break or alter the expression.
    # Confirm the API's escaping rules and apply them here.
    if reltio_id:
        request_param += f" and contains (uri,'*{reltio_id}*')"

    if product_name:
        request_param += f" and contains(attributes.Name,'*{product_name}*')"

    try:
        access_token = get_access_token()

        # Do not send a request with an unusable token (None, or an error
        # payload instead of a string); report the failure directly.
        if not isinstance(access_token, str) or not access_token:
            return [f"API error: could not obtain an access token: {access_token}"]

        response = requests.post(
            TENANT_URL,
            json=None,  # No request body; the search is driven by the filter param
            headers={
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json",
            },
            params={"filter": request_param},
            verify=True,  # TLS certificate verification stays enabled
            # Without a timeout, requests waits indefinitely on a stalled server.
            timeout=30,
        )
        response.raise_for_status()  # Turn 4xx/5xx responses into exceptions
        return response.json()

    except requests.RequestException as e:
        return [f"API error: {e}"]


async def search_products(
    self, product_name: Optional[str] = None, reltio_id: Optional[str] = None
) -> List[ReltioSearchType]:
    """Search product groups and return them as ``ReltioSearchType`` objects.

    Calls ``api_call`` with the given filters, maps the response with
    ``map_json`` and copies each mapped record into a ``ReltioSearchType``.

    Args:
        self: Not used by this function.
        product_name: Optional product-name text to search for.
        reltio_id: Optional id text to search for.

    Returns:
        One ``ReltioSearchType`` per mapped record. If any step raises, a
        one-element list holding a ``"Search error: ..."`` string is returned.
    """
    import asyncio
    import logging

    try:
        # api_call() performs blocking HTTP requests; run it in the default
        # executor so the event loop stays free while waiting on the network.
        loop = asyncio.get_running_loop()
        response_data = await loop.run_in_executor(
            None, api_call, product_name, reltio_id
        )
        mapped_fields = map_json(response_data)

        # Copy every mapped record into the GraphQL type
        return [
            ReltioSearchType(
                display_name=field.display_name,
                therapeutic_area=field.therapeutic_area,
                group_status=field.group_status,
                pgjn_flag=field.pgjn_flag,
                uri=field.uri,
                display_name_product=field.display_name_product,
                group_type=field.group_type,
            )
            for field in mapped_fields
        ]
    except Exception as e:
        # Record the traceback so the failure is not visible only as a string.
        logging.getLogger(__name__).exception("Product search failed")
        # NOTE(review): this returns strings where the annotation promises
        # ReltioSearchType objects; kept for backward compatibility - consider
        # raising an error instead.
        return [f"Search error: {e}"]
# NOTE: paste-site page footer, kept as a comment so the file parses as Python:
# Editor is loading... / Leave a Comment