Untitled
unknown
plain_text
a year ago
10 kB
6
Indexable
# Imports
from opensearchpy import OpenSearch
import json
import os
from math import inf
import random
import re
from datetime import datetime
from opensearchpy.client.snapshot import SnapshotClient
from opensearchpy.client.indices import IndicesClient

# Client initialization
# SECURITY: the literal fallbacks below (notably the password) are committed to
# source. Supply OPENSEARCH_HOST / OPENSEARCH_PORT / OPENSEARCH_USER /
# OPENSEARCH_PASSWORD through the environment, then rotate the password and
# delete the fallbacks.
host = os.environ.get('OPENSEARCH_HOST', 'default.elk.inf.use1.cwdevsandbox.cwnet.io')
port = int(os.environ.get('OPENSEARCH_PORT', 9200))
auth = (
    os.environ.get('OPENSEARCH_USER', 'admin'),
    os.environ.get('OPENSEARCH_PASSWORD', '/hk3UlwmalS72t0dK+B6G+8Q4ZEnSNi9+mENgtWBBeo='),
)

# NOTE(review): TLS certificate and hostname verification are disabled here;
# confirm this is acceptable for the target cluster.
client = OpenSearch(
    hosts=[{'host': host, 'port': port}],
    http_compress=True,
    http_auth=auth,
    use_ssl=True,
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False,
)
if client.ping():
    print("Client created successfully")

snapshot_client = SnapshotClient(client)
if snapshot_client:
    print("Snapshot client created successfully")

index_client = IndicesClient(client)
if index_client:
    print("Index client created successfully")

# Result-window size used when a caller asks for "everything" (size=inf).
# NOTE(review): 10000 is the documented default of index.max_result_window;
# confirm it has not been changed on the target indices.
DEFAULT_MAX_RESULT_WINDOW = 10000


# Helper Functions
def extract_date_from_string(string):
    """Return the first ``<digits>.<digits>`` token of *string* as a datetime.

    The token is parsed as ``%Y.%m``. Returns ``None`` when the string holds
    no such token or when the token is not a valid year/month pair.
    """
    match = re.search(r'\d+\.\d+', string)
    if not match:
        return None
    try:
        return datetime.strptime(match.group(0), '%Y.%m')
    except ValueError:
        # e.g. "1.2" or "2024.13": matches the pattern but is not a date.
        return None


def extract_date_from_snapshot(string):
    """Parse the timestamp embedded in a snapshot name.

    Everything after the first ``-`` is concatenated (dashes dropped) and
    parsed as ``%Y%m%d%H%M%S``, which matches the names produced by
    ``take_snapshot``.

    Raises:
        ValueError: if the name does not carry such a timestamp.
    """
    timestamp_digits = "".join(string.split('-')[1:])
    return datetime.strptime(timestamp_digits, '%Y%m%d%H%M%S')


def create_repository(repository_name):
    """Register an S3-backed snapshot repository named *repository_name*."""
    snapshot_client.create_repository(
        repository=repository_name,
        body={
            'type': 's3',
            'settings': {
                'bucket': 'pf-elkcluster-default-essnapshotss3bucket-zrop9rmz1gbk',
            }
        }
    )


def get_repositories():
    """Return the names of the registered snapshot repositories.

    The name is taken as the first whitespace-separated column of each
    non-blank line of the ``_cat/repositories`` response.
    """
    response = client.cat.repositories()
    return [line.split()[0] for line in response.splitlines() if line.strip()]


def take_snapshot(repository_name="os-snapshot-repo"):
    """Snapshot every index except ``restored_*`` into *repository_name*.

    The snapshot is named ``snapshot-YYYY-MM-DD-HH-MM-SS`` from the current
    local time, the format ``extract_date_from_snapshot`` parses back.
    """
    snapshot_settings = {
        "indices": ["*", "-restored_*"],
    }
    snapshot_name = datetime.now().strftime("snapshot-%Y-%m-%d-%H-%M-%S")
    client.snapshot.create(
        repository=repository_name,
        snapshot=snapshot_name,
        body=snapshot_settings)


def get_snapshots(repository_name):
    """Return the names of all snapshots stored in *repository_name*."""
    snapshots = client.snapshot.get(
        repository=repository_name, snapshot='_all')["snapshots"]
    return [entry["snapshot"] for entry in snapshots]


def get_indices_from_snapshot(repository_name, snapshot_name):
    """Return the ``indices`` entry of the given snapshot's metadata."""
    response = snapshot_client.get(
        repository=repository_name, snapshot=snapshot_name)
    return response["snapshots"][0]["indices"]


def milliseconds_to_datetime(millis_str):
    """Convert epoch milliseconds (str or int) to a naive local datetime."""
    return datetime.fromtimestamp(int(millis_str) / 1000)


def extract_creation_date_from_index(index_name):
    """Return the ``creation_date`` setting of *index_name* as a datetime."""
    response = index_client.get(index=index_name)
    miliseconds = response[index_name]["settings"]["index"]["creation_date"]
    return milliseconds_to_datetime(miliseconds)


def extract_creation_date_from_restored_index(index_name):
    """Return the creation date of the ``restored_<index_name>`` index."""
    return extract_creation_date_from_index(f'restored_{index_name}')


def extract_date_of_deleted_index(repository_name, snapshot_name, index):
    """Return the creation date of an index that now exists only in a snapshot.

    The index is temporarily restored as ``restored_<index>``, its creation
    date is read, and the temporary index is deleted again even if the read
    fails.
    """
    restore_snapshot(repository_name, snapshot_name, index, searchable=True)
    try:
        return extract_creation_date_from_restored_index(index)
    finally:
        delete_index(f'restored_{index}')


def restore_snapshot(repository_name, snapshot_name, indices, searchable=False):
    """Restore *indices* from a snapshot.

    Args:
        repository_name: repository holding the snapshot.
        snapshot_name: snapshot to restore from.
        indices: index name(s) / pattern passed through to the restore API.
        searchable: when true, restore as a ``remote_snapshot`` index renamed
            to ``restored_<name>``; otherwise restore locally under the
            original name.
    """
    body = {
        "indices": indices,
        'storage_type': 'remote_snapshot' if searchable else 'local',
        # "ignore_unavailable": False,
        # "include_global_state": False
    }
    if searchable:
        # Only send rename rules when a rename is wanted; a local restore
        # keeps the original index names.
        body['rename_pattern'] = "(.+)"
        body["rename_replacement"] = "restored_$1"
    client.snapshot.restore(
        repository=repository_name, snapshot=snapshot_name, body=body)


def delete_index(index_name):
    """Delete *index_name*; return the API response, or ``None`` on failure.

    Best-effort: any error is printed rather than raised.
    """
    try:
        return index_client.delete(index=index_name)
    except Exception as e:
        print(f"Failed to delete index '{index_name}'. Error: {str(e)}")
        return None


def convert_year_month_day_to_datetime(year, month, day):
    """Build a datetime at midnight of the given calendar date."""
    return datetime(year, month, day)


def diff_in_days(start_date, end_date):
    """Return the whole number of days from *start_date* to *end_date*."""
    return (end_date - start_date).days


def diff_in_weeks(start_date, end_date):
    """Return the number of whole weeks (floor division of the day count)."""
    return (end_date - start_date).days // 7


def formatter(obj):
    """Return *obj* as JSON text indented by four spaces."""
    return json.dumps(obj, indent=4)


def _prompt_for_date(prompt):
    """Read a ``YYYY MM DD`` line from stdin and return it as a datetime."""
    year, month, day = (int(part) for part in input(prompt).split()[:3])
    return convert_year_month_day_to_datetime(year, month, day)


def user_input():
    """Prompt for an index name and a start/end date.

    Returns:
        ``(index_name, start_date, end_date)`` with both dates as datetimes.
    """
    # region = input("Enter the region (NA, EU, AU): ")
    # valid_regions = ['NA', 'EU', 'AU']
    # while region not in valid_regions:
    #     print("Invalid region entered. Please enter a valid region.")
    #     region = input("Enter the region (NA, EU, AU): ")
    index_name = input("Enter the index name: ")
    start_date = _prompt_for_date("Enter the start date (YYYY MM DD): ")
    end_date = _prompt_for_date("Enter the end date (YYYY MM DD): ")
    return index_name, start_date, end_date


def _as_index_list(indices):
    """Wrap a single index name in a list; pass other iterables through."""
    return [indices] if isinstance(indices, str) else list(indices)


def delete_indices(indices):
    """Delete one index (str) or several (iterable of str).

    Best-effort: the first error is printed and stops the remaining deletions.
    """
    print("Deleting indices:", indices)
    try:
        for index in _as_index_list(indices):
            client.indices.delete(index=index)
    except Exception as e:
        print(f"An error occurred while deleting indices: {e}")


def delete_restored_indices(indices):
    """Delete the ``restored_<name>`` counterpart of each given index name.

    Accepts one name (str) or an iterable of names. Best-effort: the first
    error is printed and stops the remaining deletions.
    """
    print("Deleting indices:", indices)
    try:
        for index in _as_index_list(indices):
            client.indices.delete(index=f'restored_{index}')
        print("Indices Deleted")
    except Exception as e:
        print(f"An error occurred while deleting indices: {e}")


def get_documents_from_index(index_name, size=inf):
    """Run a ``match_all`` search on *index_name* and return the raw response.

    Args:
        index_name: index to search.
        size: maximum number of hits. The default ``inf`` means "as many as
            one request allows" and is mapped to DEFAULT_MAX_RESULT_WINDOW,
            because infinity has no valid JSON encoding.
            NOTE(review): an explicit size above the index's
            ``max_result_window`` is presumably still rejected by the server.
    """
    if size == inf:
        size = DEFAULT_MAX_RESULT_WINDOW
    return client.search(
        body={
            "size": size,
            "query": {
                "match_all": {}
            }
        },
        index=index_name,
    )


def filter_snapshots(snapshots, start_date, end_date):
    """Return the snapshot names whose timestamp is in [start_date, end_date].

    Names that do not carry a parseable timestamp (for example snapshots not
    created by ``take_snapshot``) are reported and skipped.
    """
    filtered_snapshots = []
    for snapshot in snapshots:
        try:
            snapshot_time = extract_date_from_snapshot(snapshot)
        except ValueError:
            print(f"Skipping snapshot with unrecognised name: {snapshot}")
            continue
        if start_date <= snapshot_time <= end_date:
            filtered_snapshots.append(snapshot)
    return filtered_snapshots


def extract_indices_from_time_frame(repository_name, micro_service, start_date, end_date):
    """Collect the indices of one service from the snapshots in a time window.

    Returns:
        ``(indices, snapshots)``: the sorted, de-duplicated index names that
        contain *micro_service* as a substring, and the snapshot names that
        fell inside the window.
    """
    filtered_snapshots = filter_snapshots(
        get_snapshots(repository_name), start_date, end_date)
    index_names = set()
    for snapshot in filtered_snapshots:
        index_names.update(get_indices_from_snapshot(repository_name, snapshot))
    filtered = sorted(name for name in index_names if micro_service in name)
    return filtered, filtered_snapshots


# Code
## User Input
# start_date / end_date are required below, so the prompt must run. The
# entered index name is not used by the rest of the script.
_, start_date, end_date = user_input()

## Restore Index
repositories = get_repositories()
if not repositories:
    raise SystemExit("No snapshot repositories are registered on the cluster.")
repository = repositories[0]
print(repository)

snapshots = get_snapshots(repository)
print(snapshots)
# print(formatter(snapshots))

extracted_indices, filtered_snapshots = extract_indices_from_time_frame(
    repository, 'security-auditlog', start_date, end_date)
if not filtered_snapshots:
    raise SystemExit("No snapshots found in the requested time frame.")

# NOTE(review): extracted_indices is the union over every snapshot in the
# window, but only the first snapshot is restored from; confirm it contains
# all of them.
my_snapshot = filtered_snapshots[0]

# Walk the sorted indices until the first one created after start_date, and
# drop the temporary restored copy of every earlier index.
for position, index in enumerate(extracted_indices):
    restore_snapshot(repository, my_snapshot, index, True)
    creation_date = extract_creation_date_from_restored_index(index)
    if creation_date > start_date:
        extracted_indices = extracted_indices[position:]
        break
    delete_restored_indices(index)

restore_snapshot("os-snapshot-repo", "snapshot-2024-07-04-12-48-26", "kc-hamza-008", True)
# print(extract_creation_date_from_index("automation-engine-service-000002-restored"))

get_documents_from_index("automation-engine-service-000002-restored", 50000)['took']
get_documents_from_index("restored_kc-hamza-001", 500)

# client.snapshot.restore(repository="os-snapshot-repo", snapshot="snapshot-2024-07-04-12-06-18", body={
#     "indices": "kc-hamza-008",
#     'storage_type': 'remote_snapshot',
#     'rename_pattern': "(.+)",
#     "rename_replacement": "restored_$1"
#     # "ignore_unavailable": False,
#     # "include_global_state": False
# })

reindex_request_body = {
    "source": {
        "index": "automation-engine-service-000002-restored",
        # "query" : {'term': {'field_name': 'value'}}
    },
    "dest": {
        "index": "kc-hamza-007"
    }
}
client.reindex(reindex_request_body)

settings_response = client.indices.get_settings(index="automation-engine-service-000002-restored")
source_settings = settings_response["automation-engine-service-000002-restored"]
# NOTE(review): the fetched settings presumably include values that cannot be
# changed on an existing index (uuid, creation_date, shard count); confirm
# this call succeeds or filter them out first.
client.indices.put_settings(index="kc-hamza-007", body=source_settings)

take_snapshot()
Editor is loading...
Leave a Comment