Untitled

 avatar
user_3839718
plain_text
8 months ago
1.5 kB
2
Indexable
Never
from elasticsearch import Elasticsearch
from loguru import logger
from elasticsearch.helpers import bulk, scan
class ES:
    """Small convenience wrapper around an ``Elasticsearch`` client.

    The connection settings are kept on the instance and a client is created
    eagerly in ``__init__``. Every other method forwards to that client (or to
    the ``scan`` / ``bulk`` helpers) using ``self._es``.
    """

    def __init__(self, container_es_host=None, container_es_user=None, container_es_password=None):
        """Store the connection settings and connect immediately.

        Args:
            container_es_host: Host entry handed to the client's ``hosts`` list.
            container_es_user: User name for HTTP authentication.
            container_es_password: Password for HTTP authentication.
        """
        self.CONTAINER_ES_HOST = container_es_host
        self.CONTAINER_ES_USER = container_es_user
        self.CONTAINER_ES_PASSWORD = container_es_password
        # May be None when the cluster did not answer the ping in connect_es().
        self._es = self.connect_es()

    def connect_es(self):
        """Create a client from the stored settings and ping the cluster.

        Settings that were left as ``None`` are not forwarded, so the client
        falls back to its own defaults instead of receiving ``hosts=[None]``
        or an all-``None`` credential pair.

        Returns:
            The connected client when ``ping()`` succeeds, otherwise ``None``
            (a warning is logged in that case).
        """
        client_kwargs = {}
        if self.CONTAINER_ES_HOST is not None:
            client_kwargs["hosts"] = [self.CONTAINER_ES_HOST]
        # Keep forwarding the tuple as soon as either value is set, so partially
        # configured credentials behave exactly as before.
        if self.CONTAINER_ES_USER is not None or self.CONTAINER_ES_PASSWORD is not None:
            client_kwargs["http_auth"] = (self.CONTAINER_ES_USER, self.CONTAINER_ES_PASSWORD)

        client = Elasticsearch(**client_kwargs)
        if client.ping():
            logger.info("Connected to ES")
            return client

        logger.warning("Could not connect to ES")
        return None

    def dump(self, index_name: str) -> list:
        """Return every hit of ``index_name`` using a ``match_all`` scan.

        Args:
            index_name: Index to read.

        Returns:
            A list with all hits yielded by the scan helper.
        """
        return self.search(index_name, {"query": {"match_all": {}}})

    def search(self, index_name: str, query) -> list:
        """Run ``query`` against ``index_name`` and collect all hits.

        Args:
            index_name: Index to search.
            query: Query body passed unchanged to the ``scan`` helper.

        Returns:
            A list with all hits yielded by the scan helper.
        """
        # scan() is a generator; materialise it so callers get a list as before.
        return list(scan(self._es, query=query, index=index_name))

    def update(self, index_name, doc_id, doc):
        """Update the document ``doc_id`` in ``index_name``.

        Args:
            index_name: Index holding the document.
            doc_id: Identifier of the document to update.
            doc: Request body passed unchanged to the client's ``update`` call.

        Returns:
            Whatever the client's ``update`` call returns.
        """
        # No doc_type: mapping types are deprecated, and save() does not send
        # one either, so both write paths now address documents the same way.
        return self._es.update(index=index_name, id=doc_id, body=doc)

    def save(self, index_name, document_id, doc):
        """Index ``doc`` under ``document_id`` in ``index_name``.

        Args:
            index_name: Target index.
            document_id: Identifier to store the document under.
            doc: Document body passed unchanged to the client's ``index`` call.

        Returns:
            Whatever the client's ``index`` call returns.
        """
        return self._es.index(index=index_name, id=document_id, body=doc)

    def save_bulk(self, data):
        """Send ``data`` through the ``bulk`` helper.

        Args:
            data: Iterable of bulk actions, passed unchanged to ``bulk``.

        Returns:
            Whatever the ``bulk`` helper returns.
        """
        return bulk(self._es, data)
Leave a Comment