Untitled
unknown
plain_text
a year ago
1.3 kB
6
Indexable
import json
from typing import List

import boto3
from pydantic import BaseModel


class BusinessUnitSearchResultItem(BaseModel):
    """One hit from a business-unit search."""

    # First value of the hit's ``lookup_value`` field, or '' when it is missing.
    lookup_value: str
    # Values of the hit's ``linked_records`` field, or an empty list when missing.
    linked_records: List[str]


class BusinessUnitSearchResults(BaseModel):
    """The hits produced by a single search request, in response order."""

    features: List[BusinessUnitSearchResultItem]


class BusinessUnitSearchParameters(BaseModel):
    """Inputs accepted by ``BusinessUnitCloudSearchClient.search``."""

    # Search text; it is sent with the ``simple`` query parser.
    query: str
    # Field names sent as the ``fields`` entry of the query options.
    # (pydantic copies field defaults per instance, so the list literal is
    # not shared between models.)
    fields: List[str] = ["lookup_value"]
    # Sort expression forwarded unchanged to the search call.
    sort: str = '_score desc'
    # Maximum number of hits requested.
    size: int = 10


class BusinessUnitCloudSearchClient:
    """Thin wrapper around a boto3 ``cloudsearchdomain`` client."""

    def __init__(self, domain_endpoint: str):
        """Create the underlying boto3 client.

        Args:
            domain_endpoint: Endpoint URL passed to boto3 as ``endpoint_url``.
        """
        self.client = boto3.client('cloudsearchdomain', endpoint_url=domain_endpoint)

    def search(self, params: BusinessUnitSearchParameters) -> BusinessUnitSearchResults:
        """Run a search and convert the hits into result models.

        Args:
            params: Query text, searched fields, sort expression and page size.

        Returns:
            A ``BusinessUnitSearchResults`` whose ``features`` holds one item
            per hit, in the order the service returned them.
        """
        response = self.client.search(
            query=params.query,
            queryParser='simple',
            sort=params.sort,
            size=params.size,
            # boto3 names this parameter ``queryOptions``; the previous
            # ``q_options`` keyword is rejected by botocore's parameter
            # validation before any request is sent.
            queryOptions=json.dumps({"fields": params.fields}),
        )

        hits = response.get('hits', {}).get('hit', [])
        return BusinessUnitSearchResults(
            features=[self._to_result_item(hit) for hit in hits]
        )

    @staticmethod
    def _to_result_item(hit: dict) -> BusinessUnitSearchResultItem:
        """Build a result item from one raw hit dictionary."""
        # A hit may carry no ``fields`` entry, and a field's value list may
        # be empty; fall back to the same defaults used for a missing field
        # instead of raising KeyError / IndexError.
        hit_fields = hit.get('fields') or {}
        lookup_values = hit_fields.get('lookup_value') or ['']
        return BusinessUnitSearchResultItem(
            lookup_value=lookup_values[0],
            linked_records=hit_fields.get('linked_records') or [],
        )
Editor is loading...
Leave a Comment