Untitled
unknown
plain_text
a year ago
1.2 kB
13
Indexable
import boto3
from pydantic import BaseModel
from typing import List
import json
class BrandSearchResultItem(BaseModel):
    """A single brand entry in a brand search result."""

    # Brand key; the search client fills this from the hit's 'brand' field.
    key: str
    # Brand name; the search client fills this from the hit's 'name' field.
    name: str
class BrandSearchResults(BaseModel):
    """Container for the items produced by one brand search."""

    # One item per search hit, in the order the hits were returned.
    features: List[BrandSearchResultItem]
class BrandSearchParameters(BaseModel):
    """Input parameters for a brand search request."""

    # Search string sent as the request's query (required, no default).
    query: str
    # Index fields listed under "fields" in the request's query options.
    fields: List[str] = ["brand"]
    # Sort expression forwarded with the request.
    sort: str = "_score desc"
    # Number of hits requested.
    size: int = 10
class BrandCloudSearchClient:
    """Wrapper around a boto3 ``cloudsearchdomain`` client for brand search."""

    def __init__(self, domain_endpoint: str):
        """Create the boto3 client.

        Args:
            domain_endpoint: Endpoint URL passed to boto3 as ``endpoint_url``.
        """
        self.client = boto3.client('cloudsearchdomain', endpoint_url=domain_endpoint)

    @staticmethod
    def _first_value(fields: dict, name: str) -> str:
        """Return the first value of field *name*, or '' if absent or empty."""
        # Field values are indexed as lists here; ``or ['']`` also covers an
        # empty list, which would otherwise raise IndexError.
        values = fields.get(name) or ['']
        return values[0]

    def search(self, params: BrandSearchParameters) -> BrandSearchResults:
        """Run a search using the 'simple' query parser and collect the hits.

        Args:
            params: Query string, searched fields, sort expression and size.

        Returns:
            A ``BrandSearchResults`` with one item per returned hit; each
            item's ``key`` comes from the hit's 'brand' field and ``name``
            from its 'name' field ('' when the field is missing).
        """
        response = self.client.search(
            query=params.query,
            queryParser='simple',
            sort=params.sort,
            size=params.size,
            # boto3 names this parameter ``queryOptions``; an unknown keyword
            # such as ``q_options`` is rejected by parameter validation.
            queryOptions=json.dumps({"fields": params.fields}),
        )

        # Tolerate a response without a hit list instead of raising KeyError.
        hits = response.get('hits', {}).get('hit', [])

        items = [
            BrandSearchResultItem(
                key=self._first_value(hit.get('fields') or {}, 'brand'),
                name=self._first_value(hit.get('fields') or {}, 'name'),
            )
            for hit in hits
        ]
        return BrandSearchResults(features=items)
Editor is loading...
Leave a Comment