Untitled
python
a month ago
4.5 kB
8
Indexable
Never
"""Incrementally copy one company's documents into a destination index.

The script repeatedly submits asynchronous Elasticsearch ``_reindex`` tasks
that copy documents whose ``lastUpdatedAt`` falls inside the window since the
previous run, waits for each task to finish, and persists its progress in
``index_merge/stats.json`` so that a restart resumes where it left off.
"""
import json
import time
from datetime import datetime, timezone

import requests

from index_merge.const import Constants

# Progress shared between the functions below and persisted by save_stats().
last_task_id = None      # id of the reindex task that may still be running
last_time_stamp = None   # upper bound of the last successfully submitted window

# Pause between two task-status polls so the cluster is not queried in a
# tight loop while a reindex task is running.
_POLL_INTERVAL_SECONDS = 5


def reindex_into_destination():
    """Submit an asynchronous reindex task for the next time window.

    The window is ``(last_time_stamp, now]`` on ``lastUpdatedAt`` (open-ended
    on the lower side when there is no previous run), restricted to documents
    of ``Constants.SOURCE_COMPANY_ID``. On success the globals
    ``last_task_id`` and ``last_time_stamp`` are updated; on failure they are
    left untouched so the same window is retried on the next call.
    """
    global last_task_id, last_time_stamp
    # The format string hard-codes a "+0000" offset, so the clock value must
    # really be UTC; a naive local datetime.now() would be mislabelled.
    # NOTE(review): a last_time_stamp persisted by an older run on a non-UTC
    # host was written in local time - confirm before relying on it.
    current_timestamp = datetime.now(timezone.utc).strftime(
        "%Y-%m-%dT%H:%M:%S.000+0000"
    )
    last_updated_at_query = {"lte": current_timestamp}
    if last_time_stamp is not None:
        last_updated_at_query["gte"] = last_time_stamp

    reindex_query = {
        "bool": {
            "must": [
                {"match": {"companyId": Constants.SOURCE_COMPANY_ID}},
                {"range": {"lastUpdatedAt": last_updated_at_query}},
            ]
        }
    }
    reindex_payload = {
        "source": {"index": "process1", "query": reindex_query},
        "dest": {"index": Constants.DESTINATION_INDEX_NAME},
    }
    # wait_for_completion=false makes Elasticsearch return a task id
    # immediately instead of blocking until the copy is done.
    reindex_url = Constants.ES_CLUSTER_HOST_URL + '/_reindex?wait_for_completion=false'
    response = requests.post(
        reindex_url,
        data=json.dumps(reindex_payload),
        auth=(Constants.ES_CLUSTER_USERNAME, Constants.ES_CLUSTER_PASSWORD),
        headers={"Content-Type": "application/json"},
    )
    if response.status_code == 200:
        last_task_id = response.json()["task"]
        last_time_stamp = current_timestamp
        print("task created with id: {}".format(last_task_id))
    else:
        # Previously a failure was silent; surface it so the operator can see
        # why no task was created.
        print("Reindex request failed with status {}: {}".format(
            response.status_code, response.text))


def poll_task_for_completion(task_id):
    """Fetch the status of a task and report whether it has completed.

    Args:
        task_id: Task identifier as returned by the ``_reindex`` call.

    Returns:
        The ``completed`` flag from the tasks API response (``False`` if the
        flag is absent).

    Raises:
        requests.HTTPError: If the tasks API answers with an error status
            (for example when the task id is unknown).
    """
    poll_task_url = Constants.ES_CLUSTER_HOST_URL + '/_tasks/{}'.format(task_id)
    response = requests.get(
        url=poll_task_url,
        auth=(Constants.ES_CLUSTER_USERNAME, Constants.ES_CLUSTER_PASSWORD),
    )
    # Fail with a meaningful HTTP error rather than a KeyError on the body.
    response.raise_for_status()
    poll_task_data = response.json()
    completed = poll_task_data.get("completed", False)
    # "response" is only present once the task has finished; while it is
    # still running the counters live under task.status instead.
    if "response" in poll_task_data:
        progress = poll_task_data["response"]
    else:
        progress = poll_task_data.get("task", {}).get("status", {})
    print("Poll Task Status === Completed: {}, Total: {}, Updated: {}, Created: {}".format(
        completed,
        progress.get("total"),
        progress.get("updated"),
        progress.get("created"),
    ))
    return completed


def check_and_create_destination_index():
    """Create the destination index from the bundled mapping if it is missing.

    The index is considered missing when a GET on it does not return 200; in
    that case it is created with the body of
    ``index_merge/index_mapping.json``.
    """
    index_url = Constants.ES_CLUSTER_HOST_URL + '/{}'.format(Constants.DESTINATION_INDEX_NAME)
    credentials = (Constants.ES_CLUSTER_USERNAME, Constants.ES_CLUSTER_PASSWORD)
    response = requests.get(index_url, auth=credentials)
    if response.status_code == 200:
        return

    print("Destination index {} not found. going to create".format(
        Constants.DESTINATION_INDEX_NAME))
    with open('index_merge/index_mapping.json', 'r') as f:
        mapping_data = json.load(f)
    response = requests.put(
        index_url,
        auth=credentials,
        data=json.dumps(mapping_data),
        headers={"Content-Type": "application/json"},
    )
    if response.status_code == 200:
        print("Index Created Successfully")
    else:
        print("Index creation failed with status {}: {}".format(
            response.status_code, response.text))


def load_stats():
    """Restore ``last_task_id`` and ``last_time_stamp`` from the stats file.

    A missing, unreadable or malformed ``index_merge/stats.json`` is reported
    and otherwise ignored, leaving both globals at their current values so a
    first run starts from scratch.
    """
    global last_task_id, last_time_stamp
    try:
        with open('index_merge/stats.json', 'r') as f:
            stats_json = json.load(f)
        # Read both values before assigning so a partial file cannot leave
        # the globals half-updated.
        loaded_task_id = stats_json["last_task_id"]
        loaded_time_stamp = stats_json["last_time_stamp"]
    except (OSError, ValueError, KeyError, TypeError) as e:
        print("Error while loading stats", e)
        return
    last_task_id = loaded_task_id
    last_time_stamp = loaded_time_stamp
    print(last_task_id, last_time_stamp)


def save_stats():
    """Persist ``last_task_id`` and ``last_time_stamp`` to the stats file."""
    with open('index_merge/stats.json', 'w') as f:
        json.dump({"last_task_id": last_task_id, "last_time_stamp": last_time_stamp}, f)


def wait_for_pending_task():
    """Block until the task in ``last_task_id`` completes, then clear it.

    Returns immediately when there is no pending task.
    """
    global last_task_id
    if last_task_id is None:
        return
    while not poll_task_for_completion(last_task_id):
        time.sleep(_POLL_INTERVAL_SECONDS)
    last_task_id = None


def start_activity():
    """Run the reindex loop forever.

    Loads persisted progress, makes sure the destination index exists, then
    alternates between waiting for the pending task and submitting the next
    window, saving progress and sleeping 10 seconds after each step.
    """
    load_stats()
    check_and_create_destination_index()
    while True:
        wait_for_pending_task()
        save_stats()
        print("Sleep for 10 seconds")
        time.sleep(10)
        reindex_into_destination()
        save_stats()
        print("Sleep for 10 seconds")
        time.sleep(10)