Untitled
unknown
plain_text
a year ago
1.9 kB
11
Indexable
"""Typed wrapper around an Amazon CloudSearch domain's search endpoint."""

import json
from typing import List, Mapping, Sequence

import boto3
from pydantic import BaseModel


class CustomerSearchResultItem(BaseModel):
    """One hit of a customer search, with every field flattened to a string."""

    id: str
    title: str
    content: str
    score: str
    customer_type: str
    customer_key: str
    customer_name: str
    child_details: str
    customer_id: str


class CustomerSearchResults(BaseModel):
    """The hits returned by a search plus the total match count."""

    hits: List[CustomerSearchResultItem]
    found: int


class CustomerSearchParameters(BaseModel):
    """Inputs for a customer search; only ``query`` is required."""

    query: str
    # Index fields the query is matched against.
    fields: List[str] = ["customer_name"]
    sort: str = '_score desc'
    size: int = 10


def _first_field_value(fields: Mapping[str, Sequence[str]], name: str) -> str:
    """Return the first value stored under ``name``, or ``''`` if there is none.

    The search response maps each field name to a list of values; a missing
    key and an empty list are both treated as "no value" instead of raising.
    """
    values = fields.get(name)
    return values[0] if values else ''


class CustomerCloudSearchClient:
    """Runs customer searches against a CloudSearch domain endpoint."""

    def __init__(self, domain_endpoint: str):
        """Create the underlying boto3 ``cloudsearchdomain`` client.

        Args:
            domain_endpoint: URL of the domain's search endpoint, passed to
                boto3 as ``endpoint_url``.
        """
        self.client = boto3.client('cloudsearchdomain', endpoint_url=domain_endpoint)

    def search(self, params: CustomerSearchParameters) -> CustomerSearchResults:
        """Execute a simple-parser search and map the response to models.

        Args:
            params: Query text, searched fields, sort expression and page size.

        Returns:
            The converted hits together with the ``found`` count reported by
            the service.
        """
        response = self.client.search(
            query=params.query,
            queryParser='simple',
            sort=params.sort,
            size=params.size,
            # boto3 names this parameter ``queryOptions``; the previous
            # ``q_options`` keyword is rejected by botocore's parameter
            # validation before the request is ever sent.
            queryOptions=json.dumps({"fields": params.fields}),
        )

        hits_section = response['hits']
        search_results = [
            self._to_result_item(hit) for hit in hits_section.get('hit', [])
        ]
        return CustomerSearchResults(
            hits=search_results,
            found=hits_section['found'],
        )

    @staticmethod
    def _to_result_item(hit: Mapping) -> CustomerSearchResultItem:
        """Convert one raw response hit into a ``CustomerSearchResultItem``."""
        # A hit without a ``fields`` map is treated as having no field values.
        fields = hit.get('fields', {})
        return CustomerSearchResultItem(
            id=hit['id'],
            title=_first_field_value(fields, 'title'),
            content=_first_field_value(fields, 'content'),
            score=_first_field_value(fields, '_score'),
            customer_type=_first_field_value(fields, 'customer_type'),
            customer_key=_first_field_value(fields, 'customer_key'),
            customer_name=_first_field_value(fields, 'customer_name'),
            child_details=_first_field_value(fields, 'child_details'),
            customer_id=_first_field_value(fields, 'customer_id'),
        )
Editor is loading...
Leave a Comment