Untitled
unknown
plain_text
a year ago
7.8 kB
10
Indexable
import os
from math import inf
from opensearchpy import OpenSearch
import json
import re
from datetime import datetime
from opensearchpy.client.snapshot import SnapshotClient
from opensearchpy.client.indices import IndicesClient
# import sys
import argparse
# class DateAction(argparse.Action):
# def __call__(self, parser, namespace, values, option_string=None):
# year, month, day = values.split()
# date = datetime(int(year), int(month), int(day))
# setattr(namespace, self.dest, date)
# Client initialization
host = 'default.elk.inf.use1.cwdevsandbox.cwnet.io'
port = 9200
auth = ('admin', '/hk3UlwmalS72t0dK+B6G+8Q4ZEnSNi9+mENgtWBBeo=')
client = OpenSearch(
hosts=[{'host': host, 'port': port}],
http_compress=True,
http_auth=auth,
use_ssl=True,
verify_certs=False,
ssl_assert_hostname=False,
ssl_show_warn=False,
)
if client.ping():
print("Client created successfully")
snapshot_client = SnapshotClient(client)
if snapshot_client:
print("Snapshot client created successfully")
index_client = IndicesClient(client)
if index_client:
print("Index client created successfully")
# Helper Functions
def extract_date_from_snapshot(string):
snapshot_timestamp = string.split('-')[1:]
snapshot_timestamp = "".join(snapshot_timestamp)
dt_object = datetime.strptime(snapshot_timestamp, '%Y%m%d%H%M%S')
return dt_object
def get_repositories():
response = client.cat.repositories()
temp = response.split("\n")
repository_names = []
for string in temp:
repository_names.append(string.split(" ")[0])
return repository_names[:-1]
def take_snapshot(repository_name="os-snapshot-repo"):
snapshot_settings = {
"indices": ["*", "-restored_*"],
}
today = str(datetime.now())
year, month, day, hour, minutes, seconds = today[0:4], today[5:7], today[8:10], today[11:13], today[14:16], today[17:19]
client.snapshot.create(
repository=repository_name, snapshot=f"snapshot-{year}-{month}-{day}-{hour}-{minutes}-{seconds}", body=snapshot_settings)
def get_snapshots(repository_name):
snapshots = client.snapshot.get(repository=repository_name, snapshot='_all')["snapshots"]
snapshot_names = [i["snapshot"] for i in snapshots]
return snapshot_names
def get_indices_from_snapshot(repository_name, snapshot_name):
return snapshot_client.get(repository=repository_name, snapshot=snapshot_name)["snapshots"][0]["indices"]
def milliseconds_to_datetime(millis_str):
millis_int = int(millis_str)
seconds = millis_int / 1000
dt_object = datetime.fromtimestamp(seconds)
return dt_object
def extract_creation_date_from_restored_index(index_name):
response = index_client.get(index=f'restored_{index_name}')
miliseconds = response[f'restored_{index_name}']["settings"]["index"]["creation_date"]
return milliseconds_to_datetime(miliseconds)
def restore_snapshot(repository_name, snapshot_name, indices, searchable=False):
client.snapshot.restore(repository=repository_name, snapshot=snapshot_name, body={
"indices": indices,
'storage_type': 'remote_snapshot' if searchable else 'local',
'rename_pattern': "(.+)" if searchable else "",
"rename_replacement": "restored_$1" if searchable else ""
})
def convert_year_month_day_to_datetime(year, month, day):
dt_object = datetime(year, month, day)
return dt_object
def formatter(obj):
return json.dumps(obj, indent=4)
#parser
class DateAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
year, month, day = values.split()
date = datetime(int(year), int(month), int(day))
setattr(namespace, self.dest, date)
parser = argparse.ArgumentParser()
parser.add_argument('index_name', type=str, help='Index name')
parser.add_argument('--start_date', type=DateAction, dest='start_date', help='Start date in YYYY MM DD format')
parser.add_argument('--end_date', type=DateAction, dest='end_date', help='End date in YYYY MM DD format')
args = parser.parse_args()
print(args)
def user_input():
index_name = args.index_name
start_date_str = args.start_date
start_date = convert_year_month_day_to_datetime(
int(start_date_str.split(" ")[0]), int(start_date_str.split(" ")[1]), int(start_date_str.split(" ")[2]))
end_date_str = args.end_date
end_date = convert_year_month_day_to_datetime(
int(end_date_str.split(" ")[0]), int(end_date_str.split(" ")[1]), int(end_date_str.split(" ")[2]))
return index_name, start_date, end_date
def delete_restored_indices(indices):
print("Deleting indices:", indices)
try:
if isinstance(indices, str):
client.indices.delete(index=f'restored_{indices}', params=None, headers=None)
else:
for index in indices:
client.indices.delete(index=f'restored_{index}', params=None, headers=None)
print("Indices Deleted")
except Exception as e:
print(f"An error occurred while deleting indices: {e}")
def filter_snapshots(snapshots, start_date, end_date):
filtered_snapshots = []
for snapshot in snapshots:
snapshot_time = extract_date_from_snapshot(snapshot)
if start_date <= snapshot_time <= end_date:
filtered_snapshots.append(snapshot)
return filtered_snapshots
def extract_indices_from_time_frame(repository_name, micro_service_pattern, start_date, end_date):
res = []
snapshots = get_snapshots(repository_name)
filtered_snapshots = filter_snapshots(snapshots, start_date, end_date)
for snapshot in filtered_snapshots:
indices = get_indices_from_snapshot(repository_name, snapshot)
res.extend(indices)
res = list(set(res))
filtered = [item for item in res if re.match(micro_service_pattern, item)]
filtered.sort()
return filtered, filtered_snapshots
repository = get_repositories()[0]
snapshots = get_snapshots(repository)
index_name = args.index_name
start_date = args.start_date
end_date = args.end_date
extracted_indices, filtered_snapshots = extract_indices_from_time_frame(
repository, index_name, start_date, end_date)
extracted_indices = extracted_indices[::-1]
filtered_snapshots = filtered_snapshots[::-1]
temp = extracted_indices.copy()
for snapshot in filtered_snapshots:
copy_of_extracted_indices = extracted_indices.copy() # Make a copy to work with
for index in copy_of_extracted_indices:
try:
restore_snapshot(repository, snapshot, index, True)
copy_of_extracted_indices[copy_of_extracted_indices.index(index)] = None
continue
except Exception as e:
while copy_of_extracted_indices and not copy_of_extracted_indices[0]: # Check if list is not empty
copy_of_extracted_indices.pop(0)
break
# Update extracted_indices based on changes in copy_of_extracted_indices
extracted_indices[:] = copy_of_extracted_indices
error
python3 main.py kc-hamza --start-date 2024 07 01 --end-date 2024 07 05
Client created successfully
Snapshot client created successfully
Index client created successfully
usage: main.py [-h] [--start_date START_DATE] [--end_date END_DATE] index_name
main.py: error: unrecognized arguments: --start-date 2024 07 01 --end-date 2024 07 05
fix this whys theres this error the example input format is
python3 main.py kc-hamza --start-date 2024 07 01 --end-date 2024 07 05Editor is loading...
Leave a Comment