Untitled
unknown
plain_text
a year ago
1.9 kB
16
Indexable
import boto3
from pydantic import BaseModel
from typing import List
import json
class CustomerSearchResultItem(BaseModel):
id: str
title: str
content: str
score: str
customer_type: str
customer_key: str
customer_name: str
child_details: str
customer_id: str
class CustomerSearchResults(BaseModel):
hits: List[CustomerSearchResultItem]
found: int
class CustomerSearchParameters(BaseModel):
query: str
fields: List[str] = ["customer_name"]
sort: str = '_score desc'
size: int = 10
class CustomerCloudSearchClient:
def __init__(self, domain_endpoint: str):
self.client = boto3.client('cloudsearchdomain', endpoint_url=domain_endpoint)
def search(self, params: CustomerSearchParameters) -> CustomerSearchResults:
response = self.client.search(
query=params.query,
queryParser='simple',
sort=params.sort,
size=params.size,
q_options=json.dumps({"fields": params.fields})
)
search_results = []
for hit in response['hits']['hit']:
search_results.append(CustomerSearchResultItem(
id=hit['id'],
title=hit['fields'].get('title', [''])[0],
content=hit['fields'].get('content', [''])[0],
score=hit['fields'].get('_score', [''])[0],
customer_type=hit['fields'].get('customer_type', [''])[0],
customer_key=hit['fields'].get('customer_key', [''])[0],
customer_name=hit['fields'].get('customer_name', [''])[0],
child_details=hit['fields'].get('child_details', [''])[0],
customer_id=hit['fields'].get('customer_id', [''])[0]
))
return CustomerSearchResults(
hits=search_results,
found=response['hits']['found']
)
Editor is loading...
Leave a Comment