Untitled
unknown
python
3 years ago
4.5 kB
16
Indexable
import json
import time
import requests
from index_merge.const import Constants
from datetime import datetime
# Mutable run state shared by the functions below (via ``global``) and
# persisted to the stats file between runs:
#   last_task_id    - id of the most recently started _reindex task, or None
#                     when no task is outstanding.
#   last_time_stamp - upper ("lte") bound of the last successfully submitted
#                     lastUpdatedAt window, or None before the first run.
last_task_id, last_time_stamp = None, None
def reindex_into_destination(source_index="process1"):
    """Start an asynchronous ``_reindex`` of recently updated documents.

    Copies documents whose ``companyId`` matches ``Constants.SOURCE_COMPANY_ID``
    and whose ``lastUpdatedAt`` lies between the previous run's timestamp (if
    any) and now, from *source_index* into ``Constants.DESTINATION_INDEX_NAME``.
    The request uses ``wait_for_completion=false``, so the cluster answers with
    a task id instead of waiting for the copy to finish.

    On a 200 response the module-level ``last_task_id`` and ``last_time_stamp``
    are updated so the next run continues from this run's upper bound. On any
    other status both are left unchanged (the same window is retried next time)
    and the error response is printed.

    :param source_index: index to copy from; defaults to ``"process1"``, the
        value that used to be hard-coded here.
    """
    global last_task_id, last_time_stamp
    from datetime import timezone

    # The "+0000" suffix declares UTC, so the clock must be read in UTC as well;
    # a naive datetime.now() is host-local time mislabelled as UTC.
    # NOTE(review): assumes lastUpdatedAt values are stored in UTC - confirm.
    current_timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000+0000")

    last_updated_at_query = {"lte": current_timestamp}
    if last_time_stamp is not None:
        last_updated_at_query["gte"] = last_time_stamp

    reindex_payload = {
        "source": {
            "index": source_index,
            "query": {
                "bool": {
                    "must": [
                        {"match": {"companyId": Constants.SOURCE_COMPANY_ID}},
                        {"range": {"lastUpdatedAt": last_updated_at_query}},
                    ]
                }
            },
        },
        "dest": {"index": Constants.DESTINATION_INDEX_NAME},
    }
    reindex_url = Constants.ES_CLUSTER_HOST_URL + '/_reindex?wait_for_completion=false'
    # json= serialises the payload and sets Content-Type: application/json.
    response = requests.post(
        reindex_url,
        json=reindex_payload,
        auth=(Constants.ES_CLUSTER_USERNAME, Constants.ES_CLUSTER_PASSWORD),
    )
    if response.status_code != 200:
        # Keep the previous timestamp so this window is retried next run.
        print("Reindex request failed with status {}: {}".format(response.status_code, response.text))
        return
    last_task_id = response.json()["task"]
    last_time_stamp = current_timestamp
    print("task created with id: {}".format(last_task_id))
def poll_task_for_completion(task_id):
    """Fetch the state of an Elasticsearch task once and print its progress.

    :param task_id: task id as returned by ``_reindex?wait_for_completion=false``.
    :returns: the task's ``completed`` flag from the tasks API response.
    :raises requests.HTTPError: if the tasks API answers with a 4xx/5xx status
        (e.g. 404 for an unknown task id).
    """
    poll_task_url = Constants.ES_CLUSTER_HOST_URL + '/_tasks/{}'.format(task_id)
    response = requests.get(url=poll_task_url, auth=(Constants.ES_CLUSTER_USERNAME, Constants.ES_CLUSTER_PASSWORD))
    # An error body has no "completed" key; fail with the HTTP error instead of
    # a confusing KeyError further down.
    response.raise_for_status()
    poll_task_data = response.json()

    # Per the Elasticsearch tasks API, "response" is only present once the task
    # has completed; while it is still running the counters are reported under
    # task.status. Indexing "response" directly raised KeyError on every poll of
    # a running task.
    counters = poll_task_data.get("response") or poll_task_data.get("task", {}).get("status", {})
    print("Poll Task Status === Completed: {}, Total: {}, Updated: {}, Created: {}".format(
        poll_task_data["completed"],
        counters.get("total"),
        counters.get("updated"),
        counters.get("created"),
    ))
    if poll_task_data.get("error"):
        # A task that finished with an error reports it here rather than in "response".
        print("Task {} failed: {}".format(task_id, poll_task_data["error"]))
    return poll_task_data["completed"]
def check_and_create_destination_index():
    """Ensure ``Constants.DESTINATION_INDEX_NAME`` exists, creating it if missing.

    The index is created with the JSON body read from
    ``index_merge/index_mapping.json`` (relative to the working directory).

    :raises requests.HTTPError: if the existence check fails with an error
        status other than 404, or if creating the index fails with an error
        status.
    """
    auth = (Constants.ES_CLUSTER_USERNAME, Constants.ES_CLUSTER_PASSWORD)
    index_url = Constants.ES_CLUSTER_HOST_URL + '/{}'.format(Constants.DESTINATION_INDEX_NAME)
    response = requests.get(index_url, auth=auth)
    if response.status_code != 404:
        # 200 means the index exists. Any other error (401, 5xx, ...) says
        # nothing about whether the index exists, so surface it instead of
        # blindly trying to create the index.
        response.raise_for_status()
        return

    print("Destination index {} not found. going to create".format(Constants.DESTINATION_INDEX_NAME))
    with open('index_merge/index_mapping.json', 'r', encoding='utf-8') as f:
        mapping_data = json.load(f)
    # json= serialises the body and sets Content-Type: application/json.
    response = requests.put(index_url, auth=auth, json=mapping_data)
    if response.status_code == 200:
        print("Index Created Successfully")
    else:
        # Proceeding without the mapping would presumably let the reindex
        # auto-create the index with a dynamic mapping - fail loudly instead.
        print("Index creation failed with status {}: {}".format(response.status_code, response.text))
        response.raise_for_status()
def load_stats(path='index_merge/stats.json'):
    """Restore ``last_task_id`` and ``last_time_stamp`` from the stats file.

    A missing file is treated as a first run: a note is printed and the state
    is left untouched. An unreadable or malformed file (bad JSON, missing keys)
    is reported and the state is also left untouched. Both values are updated
    together or not at all.

    :param path: stats file written by ``save_stats``; defaults to the
        previously hard-coded ``index_merge/stats.json``.
    """
    global last_task_id, last_time_stamp
    try:
        # open() is inside the try: on a first run there is no stats file yet.
        with open(path, 'r', encoding='utf-8') as f:
            stats_json = json.load(f)
        task_id = stats_json["last_task_id"]
        time_stamp = stats_json["last_time_stamp"]
    except FileNotFoundError:
        print("No stats file at {}; starting without previous state".format(path))
        return
    except (OSError, ValueError, KeyError, TypeError) as e:
        # ValueError covers json.JSONDecodeError; TypeError a non-object JSON root.
        print("Error while loading stats", e)
        return
    last_task_id, last_time_stamp = task_id, time_stamp
    print(last_task_id, last_time_stamp)
def save_stats(path='index_merge/stats.json'):
    """Persist ``last_task_id`` and ``last_time_stamp`` so a restart can resume.

    The JSON is first written to ``<path>.tmp`` and then moved over *path* with
    ``os.replace``. An interruption mid-write therefore leaves the previous
    stats file intact instead of a truncated one.

    :param path: destination file; defaults to the previously hard-coded
        ``index_merge/stats.json``.
    """
    import os

    tmp_path = path + '.tmp'
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump({"last_task_id": last_task_id, "last_time_stamp": last_time_stamp}, f)
    os.replace(tmp_path, path)
def wait_for_pending_task(poll_interval=5):
    """Block until the task in ``last_task_id`` (if any) has completed.

    Polls ``poll_task_for_completion`` until it reports completion, sleeping
    *poll_interval* seconds between polls, then clears ``last_task_id``. Returns
    immediately when there is no pending task.

    :param poll_interval: seconds to wait between polls. The previous loop
        polled back-to-back, hammering the cluster's tasks API.
    """
    global last_task_id
    if last_task_id is None:
        return
    while not poll_task_for_completion(last_task_id):
        time.sleep(poll_interval)
    last_task_id = None
def start_activity():
    """Run the incremental reindex loop forever.

    Restores saved state and makes sure the destination index exists. It then
    alternates between waiting for the outstanding reindex task and starting
    the next one. After each step it saves state and sleeps 10 seconds.
    """

    def checkpoint():
        # Persist progress, then pause before the next step.
        save_stats()
        print("Sleep for 10 seconds")
        time.sleep(10)

    load_stats()
    check_and_create_destination_index()
    while True:
        for step in (wait_for_pending_task, reindex_into_destination):
            step()
            checkpoint()
Editor is loading...