Untitled

mail@pastecode.io avatarunknown
python
a month ago
4.5 kB
8
Indexable
Never
import json
import time

import requests

from index_merge.const import Constants
from datetime import datetime

last_task_id = None
last_time_stamp = None


def reindex_into_destination():
    global last_task_id, last_time_stamp
    current_timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000+0000")
    last_updated_at_query = {
        "lte": current_timestamp
    }
    if last_time_stamp is not None:
        last_updated_at_query["gte"] = last_time_stamp

    reindex_query = {
        "bool": {
            "must": [
                {
                    "match": {
                        "companyId": Constants.SOURCE_COMPANY_ID
                    }
                },
                {
                    "range": {
                        "lastUpdatedAt": last_updated_at_query
                    }
                }
            ]
        }
    }

    reindex_payload = {
        "source": {
            "index": "process1",
            "query": reindex_query
        },
        "dest": {
            "index": Constants.DESTINATION_INDEX_NAME
        }
    }

    reindex_url = Constants.ES_CLUSTER_HOST_URL + '/_reindex?wait_for_completion=false'
    response = requests.post(reindex_url, data=json.dumps(reindex_payload),
                             auth=(Constants.ES_CLUSTER_USERNAME, Constants.ES_CLUSTER_PASSWORD), headers={
            "Content-Type": "application/json"
        })
    if response.status_code == 200:
        last_task_id = response.json()["task"]
        last_time_stamp = current_timestamp
        print("task created with id: {}".format(last_task_id))


def poll_task_for_completion(task_id):
    poll_task_url = Constants.ES_CLUSTER_HOST_URL + '/_tasks/{}'.format(task_id)
    response = requests.get(url=poll_task_url, auth=(Constants.ES_CLUSTER_USERNAME, Constants.ES_CLUSTER_PASSWORD))
    poll_task_data = response.json()
    print("Poll Task Status === Completed: {}, Total: {}, Updated: {}, Created: {}".format(poll_task_data["completed"],
                                                                                           poll_task_data["response"]["total"],
                                                                                           poll_task_data["response"]["updated"],
                                                                                           poll_task_data["response"]["created"]))
    return poll_task_data["completed"]


def check_and_create_destination_index():
    index_url = Constants.ES_CLUSTER_HOST_URL + '/{}'.format(Constants.DESTINATION_INDEX_NAME)
    response = requests.get(index_url, auth=(Constants.ES_CLUSTER_USERNAME, Constants.ES_CLUSTER_PASSWORD))
    if response.status_code != 200:
        print("Destination index {} not found. going to create".format(Constants.DESTINATION_INDEX_NAME))
        with (open('index_merge/index_mapping.json', 'r')) as f:
            mapping_data = json.load(f)
            response = requests.put(index_url, auth=(Constants.ES_CLUSTER_USERNAME, Constants.ES_CLUSTER_PASSWORD),
                                    data=json.dumps(mapping_data), headers={
                    "Content-Type": "application/json"
                })
            if response.status_code == 200:
                print("Index Created Successfully")


def load_stats():
    global last_task_id, last_time_stamp
    with open('index_merge/stats.json', 'r') as f:
        try:
            stats_json = json.load(f)
            last_task_id = stats_json["last_task_id"]
            last_time_stamp = stats_json["last_time_stamp"]
            print(last_task_id, last_time_stamp)
        except Exception as e:
            print("Error while loading stats", e)
        finally:
            f.close()


def save_stats():
    global last_task_id, last_time_stamp
    with open('index_merge/stats.json', 'w+') as f:
        json.dump({"last_task_id": last_task_id, "last_time_stamp": last_time_stamp}, f)
        f.close()


def wait_for_pending_task():
    global last_task_id

    if last_task_id is not None:
        while True:
            is_completed = poll_task_for_completion(last_task_id)
            if is_completed:
                last_task_id = None
                break


def start_activity():
    load_stats()
    check_and_create_destination_index()

    while True:
        wait_for_pending_task()
        save_stats()
        print("Sleep for 10 seconds")
        time.sleep(10)
        reindex_into_destination()
        save_stats()
        print("Sleep for 10 seconds")
        time.sleep(10)