Untitled
unknown
plain_text
a year ago
6.6 kB
3
Indexable
import argparse from datetime import datetime from opensearchpy import OpenSearch from opensearchpy.client.snapshot import SnapshotClient from opensearchpy.client.indices import IndicesClient import re import json # Client initialization host = 'default.elk.inf.use1.cwdevsandbox.cwnet.io' port = 9200 auth = ('admin', '/hk3UlwmalS72t0dK+B6G+8Q4ZEnSNi9+mENgtWBBeo=') client = OpenSearch( hosts=[{'host': host, 'port': port}], http_compress=True, http_auth=auth, use_ssl=True, verify_certs=False, ssl_assert_hostname=False, ssl_show_warn=False, ) if client.ping(): print("Client created successfully") snapshot_client = SnapshotClient(client) if snapshot_client: print("Snapshot client created successfully") index_client = IndicesClient(client) if index_client: print("Index client created successfully") # Helper Functions def extract_date_from_snapshot(string): snapshot_timestamp = string.split('-')[1:] snapshot_timestamp = "".join(snapshot_timestamp) dt_object = datetime.strptime(snapshot_timestamp, '%Y%m%d%H%M%S') return dt_object def get_repositories(): response = client.cat.repositories() temp = response.split("\n") repository_names = [] for string in temp: repository_names.append(string.split(" ")[0]) return repository_names[:-1] def take_snapshot(repository_name="os-snapshot-repo"): snapshot_settings = { "indices": ["*", "-restored_*"], } today = str(datetime.now()) year, month, day, hour, minutes, seconds = today[0:4], today[5:7], today[8:10], today[11:13], today[14:16], today[17:19] client.snapshot.create( repository=repository_name, snapshot=f"snapshot-{year}-{month}-{day}-{hour}-{minutes}-{seconds}", body=snapshot_settings) def get_snapshots(repository_name): snapshots = client.snapshot.get(repository=repository_name, snapshot='_all')["snapshots"] snapshot_names = [i["snapshot"] for i in snapshots] return snapshot_names def get_indices_from_snapshot(repository_name, snapshot_name): return snapshot_client.get(repository=repository_name, snapshot=snapshot_name)["snapshots"][0]["indices"] def milliseconds_to_datetime(millis_str): millis_int = int(millis_str) seconds = millis_int / 1000 dt_object = datetime.fromtimestamp(seconds) return dt_object def extract_creation_date_from_restored_index(index_name): response = index_client.get(index=f'restored_{index_name}') miliseconds = response[f'restored_{index_name}']["settings"]["index"]["creation_date"] return milliseconds_to_datetime(miliseconds) def restore_snapshot(repository_name, snapshot_name, indices, searchable=False): client.snapshot.restore(repository=repository_name, snapshot=snapshot_name, body={ "indices": indices, 'storage_type': 'remote_snapshot' if searchable else 'local', 'rename_pattern': "(.+)" if searchable else "", "rename_replacement": "restored_$1" if searchable else "" }) def convert_year_month_day_to_datetime(year, month, day): dt_object = datetime(year, month, day) return dt_object def formatter(obj): return json.dumps(obj, indent=4) def parse_date(year, month, day): return datetime(int(year), int(month), int(day)) parser = argparse.ArgumentParser() parser.add_argument('index_name', type=str, help='Index name') parser.add_argument('start_year', type=int, help='Start year') parser.add_argument('start_month', type=int, help='Start month') parser.add_argument('start_day', type=int, help='Start day') parser.add_argument('end_year', type=int, help='End year') parser.add_argument('end_month', type=int, help='End month') parser.add_argument('end_day', type=int, help='End day') args = parser.parse_args() def user_input(): index_name = args.index_name start_date = parse_date(args.start_year, args.start_month, args.start_day) end_date = parse_date(args.end_year, args.end_month, args.end_day) return index_name, start_date, end_date def delete_restored_indices(indices): print("Deleting indices:", indices) try: if isinstance(indices, str): client.indices.delete(index=f'restored_{indices}', params=None, headers=None) else: for index in indices: client.indices.delete(index=f'restored_{index}', params=None, headers=None) print("Indices Deleted") except Exception as e: print(f"An error occurred while deleting indices: {e}") def filter_snapshots(snapshots, start_date, end_date): filtered_snapshots = [] for snapshot in snapshots: snapshot_time = extract_date_from_snapshot(snapshot) if start_date <= snapshot_time <= end_date: filtered_snapshots.append(snapshot) return filtered_snapshots def extract_indices_from_time_frame(repository_name, micro_service_pattern, start_date, end_date): res = [] snapshots = get_snapshots(repository_name) filtered_snapshots = filter_snapshots(snapshots, start_date, end_date) for snapshot in filtered_snapshots: indices = get_indices_from_snapshot(repository_name, snapshot) res.extend(indices) res = list(set(res)) filtered = [item for item in res if re.match(micro_service_pattern, item)] filtered.sort() return filtered, filtered_snapshots repository = get_repositories()[0] snapshots = get_snapshots(repository) index_name, start_date, end_date = user_input() extracted_indices, filtered_snapshots = extract_indices_from_time_frame( repository, index_name, start_date, end_date) extracted_indices = extracted_indices[::-1] filtered_snapshots = filtered_snapshots[::-1] temp = extracted_indices.copy() for snapshot in filtered_snapshots: copy_of_extracted_indices = extracted_indices.copy() # Make a copy to work with for index in copy_of_extracted_indices: try: restore_snapshot(repository, snapshot, index, True) copy_of_extracted_indices[copy_of_extracted_indices.index(index)] = None continue except Exception as e: while copy_of_extracted_indices and not copy_of_extracted_indices[0]: # Check if list is not empty copy_of_extracted_indices.pop(0) break # Update extracted_indices based on changes in copy_of_extracted_indices extracted_indices[:] = copy_of_extracted_indices if __name__ == "__main__": index_name, start_date, end_date = user_input() print(f"Index Name: {index_name}") print(f"Start Date: {start_date}") print(f"End Date: {end_date}")
Editor is loading...
Leave a Comment