Untitled
unknown
plain_text
a year ago
6.6 kB
12
Indexable
import argparse
from datetime import datetime
from opensearchpy import OpenSearch
from opensearchpy.client.snapshot import SnapshotClient
from opensearchpy.client.indices import IndicesClient
import re
import json
# Client initialization
host = 'default.elk.inf.use1.cwdevsandbox.cwnet.io'
port = 9200
auth = ('admin', '/hk3UlwmalS72t0dK+B6G+8Q4ZEnSNi9+mENgtWBBeo=')
client = OpenSearch(
hosts=[{'host': host, 'port': port}],
http_compress=True,
http_auth=auth,
use_ssl=True,
verify_certs=False,
ssl_assert_hostname=False,
ssl_show_warn=False,
)
if client.ping():
print("Client created successfully")
snapshot_client = SnapshotClient(client)
if snapshot_client:
print("Snapshot client created successfully")
index_client = IndicesClient(client)
if index_client:
print("Index client created successfully")
# Helper Functions
def extract_date_from_snapshot(string):
snapshot_timestamp = string.split('-')[1:]
snapshot_timestamp = "".join(snapshot_timestamp)
dt_object = datetime.strptime(snapshot_timestamp, '%Y%m%d%H%M%S')
return dt_object
def get_repositories():
response = client.cat.repositories()
temp = response.split("\n")
repository_names = [string.split(" ")[0] for string in temp if string]
return repository_names
def take_snapshot(repository_name="os-snapshot-repo"):
snapshot_settings = {
"indices": ["*", "-*_restored"],
}
today = str(datetime.now())
year, month, day, hour, minutes, seconds = today[0:4], today[5:7], today[8:10], today[11:13], today[14:16], today[17:19]
client.snapshot.create(
repository=repository_name, snapshot=f"snapshot-{year}-{month}-{day}-{hour}-{minutes}-{seconds}", body=snapshot_settings)
def get_snapshots(repository_name):
snapshots = client.snapshot.get(repository=repository_name, snapshot='_all')["snapshots"]
snapshot_names = [i["snapshot"] for i in snapshots]
return snapshot_names
def get_indices_from_snapshot(repository_name, snapshot_name):
return snapshot_client.get(repository=repository_name, snapshot=snapshot_name)["snapshots"][0]["indices"]
def milliseconds_to_datetime(millis_str):
millis_int = int(millis_str)
seconds = millis_int / 1000
dt_object = datetime.fromtimestamp(seconds)
return dt_object
def extract_creation_date_from_restored_index(index_name):
response = index_client.get(index=f'{index_name}_restored')
miliseconds = response[f'{index_name}_restored']["settings"]["index"]["creation_date"]
return milliseconds_to_datetime(miliseconds)
def restore_snapshot(repository_name, snapshot_name, indices, searchable=False):
client.snapshot.restore(repository=repository_name, snapshot=snapshot_name, body={
"indices": indices,
'storage_type': 'remote_snapshot' if searchable else 'local',
'rename_pattern': "(.+)" if searchable else "",
"rename_replacement": "$1_restored" if searchable else ""
})
def convert_year_month_day_to_datetime(year, month, day):
dt_object = datetime(year, month, day)
return dt_object
def formatter(obj):
return json.dumps(obj, indent=4)
def parse_date(date_str):
return datetime.strptime(date_str, '%Y-%m-%d')
parser = argparse.ArgumentParser(description="Process some dates.")
parser.add_argument('--index_name', type=str, required=True, help='Index name')
parser.add_argument('--repository_name', type=str, required=True, help='Repository name')
parser.add_argument('--start_date', type=str, required=True, help='Start date in YYYY-MM-DD format')
parser.add_argument('--end_date', type=str, required=True, help='End date in YYYY-MM-DD format')
args = parser.parse_args()
def user_input():
index_name = args.index_name
repository_name = args.repository_name
start_date = parse_date(args.start_date)
end_date = parse_date(args.end_date)
return index_name, repository_name, start_date, end_date
def delete_restored_indices(indices):
print("Deleting indices:", indices)
try:
if isinstance(indices, str):
client.indices.delete(index=f'{indices}_restored', params=None, headers=None)
else:
for index in indices:
client.indices.delete(index=f'{index}_restored', params=None, headers=None)
print("Indices Deleted")
except Exception as e:
print(f"An error occurred while deleting indices: {e}")
def filter_snapshots(snapshots, start_date, end_date):
filtered_snapshots = []
for snapshot in snapshots:
snapshot_time = extract_date_from_snapshot(snapshot)
if start_date <= snapshot_time <= end_date:
filtered_snapshots.append(snapshot)
return filtered_snapshots
def extract_indices_from_time_frame(repository_name, micro_service_pattern, start_date, end_date):
res = []
snapshots = get_snapshots(repository_name)
filtered_snapshots = filter_snapshots(snapshots, start_date, end_date)
for snapshot in filtered_snapshots:
indices = get_indices_from_snapshot(repository_name, snapshot)
res.extend(indices)
res = list(set(res))
filtered = [item for item in res if re.match(micro_service_pattern, item)]
filtered.sort()
return filtered, filtered_snapshots
repository = args.repository_name
snapshots = get_snapshots(repository)
index_name, repository_name, start_date, end_date = user_input()
extracted_indices, filtered_snapshots = extract_indices_from_time_frame(
repository, index_name, start_date, end_date)
extracted_indices = extracted_indices[::-1]
filtered_snapshots = filtered_snapshots[::-1]
temp = extracted_indices.copy()
for snapshot in filtered_snapshots:
copy_of_extracted_indices = extracted_indices.copy() # Make a copy to work with
for index in copy_of_extracted_indices:
try:
restore_snapshot(repository, snapshot, index, True)
copy_of_extracted_indices[copy_of_extracted_indices.index(index)] = None
continue
except Exception as e:
while copy_of_extracted_indices and not copy_of_extracted_indices[0]: # Check if list is not empty
copy_of_extracted_indices.pop(0)
break
# Update extracted_indices based on changes in copy_of_extracted_indices
extracted_indices[:] = copy_of_extracted_indices
if __name__ == "__main__":
index_name, repository_name, start_date, end_date = user_input()
print(f"Index Name: {index_name}")
print(f"Repository Name: {repository_name}")
print(f"Start Date: {start_date}")
print(f"End Date: {end_date}")
Editor is loading...
Leave a Comment