Untitled
unknown
plain_text
a year ago
1.3 kB
12
Indexable
import boto3
from pydantic import BaseModel
from typing import List
import json
class BusinessUnitSearchResultItem(BaseModel):
lookup_value: str
linked_records: List[str]
class BusinessUnitSearchResults(BaseModel):
features: List[BusinessUnitSearchResultItem]
class BusinessUnitSearchParameters(BaseModel):
query: str
fields: List[str] = ["lookup_value"]
sort: str = '_score desc'
size: int = 10
class BusinessUnitCloudSearchClient:
def __init__(self, domain_endpoint: str):
self.client = boto3.client('cloudsearchdomain', endpoint_url=domain_endpoint)
def search(self, params: BusinessUnitSearchParameters) -> BusinessUnitSearchResults:
response = self.client.search(
query=params.query,
queryParser='simple',
sort=params.sort,
size=params.size,
q_options=json.dumps({"fields": params.fields})
)
search_results = []
for hit in response['hits']['hit']:
search_results.append(BusinessUnitSearchResultItem(
lookup_value=hit['fields'].get('lookup_value', [''])[0],
linked_records=hit['fields'].get('linked_records', [])
))
return BusinessUnitSearchResults(
features=search_results
)
Editor is loading...
Leave a Comment