# Paste-site metadata (scrape residue, not part of the program):
# Untitled
# user_3839718
# plain_text
# 2 years ago
# 1.5 kB
# 11
# Indexable
from elasticsearch import Elasticsearch
from loguru import logger
from elasticsearch.helpers import bulk, scan
class ES:
    """Small convenience wrapper around an ``Elasticsearch`` client.

    The connection is attempted in ``__init__``. When the cluster does not
    answer the ping, a warning is logged and ``self._es`` is left as ``None``
    (no exception is raised), so later calls on this object will fail with
    ``AttributeError`` until a working client is assigned.
    """

    def __init__(self, container_es_host=None, container_es_user=None, container_es_password=None):
        """Store the connection settings and try to connect immediately.

        Args:
            container_es_host: Host entry passed to ``Elasticsearch(hosts=[...])``.
            container_es_user: User name for HTTP basic auth.
            container_es_password: Password for HTTP basic auth.
        """
        self.CONTAINER_ES_HOST = container_es_host
        self.CONTAINER_ES_USER = container_es_user
        self.CONTAINER_ES_PASSWORD = container_es_password
        self._es = self.connect_es()

    def connect_es(self):
        """Build a client from the stored settings and ping the cluster.

        Returns:
            The ``Elasticsearch`` client when the ping succeeds, otherwise
            ``None`` (a warning is logged in that case).
        """
        client = Elasticsearch(
            hosts=[self.CONTAINER_ES_HOST],
            http_auth=(self.CONTAINER_ES_USER, self.CONTAINER_ES_PASSWORD),
        )
        if client.ping():
            logger.info("Connected to ES")
            return client
        logger.warning("Could not connect to ES")
        # Best-effort: callers get None instead of an exception.
        return None

    def dump(self, index_name):
        """Return every hit of ``index_name`` as a list (``match_all`` scan)."""
        return self.search(index_name, {"query": {"match_all": {}}})

    def search(self, index_name, query):
        """Run ``query`` against ``index_name`` via the ``scan`` helper.

        Args:
            index_name: Index to scan.
            query: Query body handed to ``scan`` unchanged.

        Returns:
            A list holding every hit yielded by ``scan``; the whole result
            set is materialised in memory.
        """
        return list(scan(self._es, query=query, index=index_name))

    def update(self, index_name, doc_id, doc):
        """Update document ``doc_id`` in ``index_name`` with body ``doc``.

        Returns:
            Whatever the client's ``update`` call returns.
        """
        # NOTE(review): ``doc_type`` is passed here but not in ``save``; newer
        # client versions deprecate/remove it - confirm the client version in use.
        return self._es.update(index=index_name, doc_type="_doc", id=doc_id, body=doc)

    def save(self, index_name, document_id, doc):
        """Index ``doc`` under ``document_id`` in ``index_name``.

        Returns:
            Whatever the client's ``index`` call returns.
        """
        return self._es.index(index=index_name, id=document_id, body=doc)

    def save_bulk(self, data):
        """Send ``data`` through the ``bulk`` helper and return its result."""
        return bulk(self._es, data)
# Leave a Comment