Untitled
unknown
plain_text
a year ago
9.5 kB
6
Indexable
"""Utilities for taking, listing and restoring OpenSearch snapshots.

The module creates an OpenSearch client at import time, defines helper
functions around the snapshot and index APIs, and finally runs a small
example that restores indices from the first snapshot found in a date
range and appends their documents to a JSON log file.
"""

import json
import os
import re
from datetime import datetime
from math import inf

from opensearchpy import OpenSearch
from opensearchpy.client.indices import IndicesClient
from opensearchpy.client.snapshot import SnapshotClient

# Client initialization
# NOTE(review): the fallback password below is committed in source. It should
# be rotated and supplied only via the environment; the literal fallbacks are
# kept so the script keeps working exactly as before when no variables are set.
host = os.environ.get(
    'OPENSEARCH_HOST', 'default.elk.inf.use1.cwdevsandbox.cwnet.io')
port = int(os.environ.get('OPENSEARCH_PORT', '9200'))
auth = (
    os.environ.get('OPENSEARCH_USER', 'admin'),
    os.environ.get(
        'OPENSEARCH_PASSWORD',
        '/hk3UlwmalS72t0dK+B6G+8Q4ZEnSNi9+mENgtWBBeo='),
)

# NOTE(review): certificate and hostname verification are disabled here.
client = OpenSearch(
    hosts=[{'host': host, 'port': port}],
    http_compress=True,
    http_auth=auth,
    use_ssl=True,
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False,
)
if client.ping():
    print("Client created successfully")

snapshot_client = SnapshotClient(client)
if snapshot_client:
    print("Snapshot client created successfully")

index_client = IndicesClient(client)
if index_client:
    print("Index client created successfully")

# Largest "size" sent for a single search when the caller asks for "all"
# documents. 10000 is the default value of index.max_result_window.
MAX_RESULT_WINDOW = 10000


# Helper Functions
def extract_date_from_string(string):
    """Return the first ``<digits>.<digits>`` token of *string* as a datetime.

    The token is parsed with the format ``%Y.%m``. Returns ``None`` when no
    such token is present; ``datetime.strptime`` raises ``ValueError`` when a
    token is found but is not a valid year/month.
    """
    match = re.search(r'\d+\.\d+', string)
    if match is None:
        return None
    return datetime.strptime(match.group(0), '%Y.%m')


def extract_date_from_snapshot(string):
    """Parse the timestamp embedded in a snapshot name.

    Everything after the first ``-`` separated field is concatenated and
    parsed as ``%Y%m%d%H%M%S``, which matches the names produced by
    ``take_snapshot``. Raises ``ValueError`` for names that do not fit.
    """
    timestamp = "".join(string.split('-')[1:])
    return datetime.strptime(timestamp, '%Y%m%d%H%M%S')


def create_repository(
        repository_name,
        bucket='pf-elkcluster-default-essnapshotss3bucket-zrop9rmz1gbk'):
    """Register an S3 snapshot repository called *repository_name*.

    *bucket* is the S3 bucket name placed in the repository settings; it
    defaults to the bucket this script has always used.
    """
    snapshot_client.create_repository(
        repository=repository_name,
        body={
            'type': 's3',
            'settings': {
                'bucket': bucket,
            },
        },
    )


def get_repositories():
    """Return the repository names listed by the ``_cat/repositories`` API.

    The text response is split on newlines and the first space-separated
    field of each line is taken as the name. The final element is dropped
    because the response ends with a newline, leaving an empty last entry.
    """
    lines = client.cat.repositories().split("\n")
    names = [line.split(" ")[0] for line in lines]
    return names[:-1]


def take_snapshot(repository_name="os-snapshot-repo"):
    """Create a snapshot named ``snapshot-YYYY-MM-DD-HH-MM-SS``.

    The name uses the current local time. The snapshot request lists the
    index patterns ``*`` and ``-restored_*``.
    """
    snapshot_settings = {
        "indices": ["*", "-restored_*"],
    }
    # Same text the original built by slicing str(datetime.now()).
    snapshot_name = datetime.now().strftime('snapshot-%Y-%m-%d-%H-%M-%S')
    client.snapshot.create(
        repository=repository_name,
        snapshot=snapshot_name,
        body=snapshot_settings)


def get_snapshots(repository_name):
    """Return the names of all snapshots in *repository_name*."""
    response = client.snapshot.get(
        repository=repository_name, snapshot='_all')
    return [entry["snapshot"] for entry in response["snapshots"]]


def get_indices_from_snapshot(repository_name, snapshot_name):
    """Return the ``indices`` entry of the given snapshot."""
    response = snapshot_client.get(
        repository=repository_name, snapshot=snapshot_name)
    return response["snapshots"][0]["indices"]


def milliseconds_to_datetime(millis_str):
    """Convert epoch milliseconds (string or int) to a local datetime."""
    return datetime.fromtimestamp(int(millis_str) / 1000)


def extract_creation_date_from_index(index_name):
    """Return the creation date of *index_name* as a datetime."""
    response = index_client.get(index=index_name)
    millis = response[index_name]["settings"]["index"]["creation_date"]
    return milliseconds_to_datetime(millis)


def extract_creation_date_from_restored_index(index_name):
    """Return the creation date of the index ``restored_<index_name>``."""
    restored_name = f'restored_{index_name}'
    response = index_client.get(index=restored_name)
    millis = response[restored_name]["settings"]["index"]["creation_date"]
    return milliseconds_to_datetime(millis)


def extract_date_of_deleted_index(repository_name, snapshot_name, index):
    """Return the creation date recorded for *index* inside a snapshot.

    The index is restored under the ``restored_`` prefix, its creation date
    is read, and the restored copy is deleted again.
    """
    restore_snapshot(repository_name, snapshot_name, index, searchable=True)
    creation_date = extract_creation_date_from_restored_index(index)
    delete_index(f'restored_{index}')
    return creation_date


def restore_snapshot(repository_name, snapshot_name, indices,
                     searchable=False):
    """Restore *indices* from a snapshot.

    With ``searchable=True`` the request uses storage type
    ``remote_snapshot`` and renames every index to ``restored_<name>``.
    Otherwise it uses storage type ``local`` and sends empty rename fields.
    """
    client.snapshot.restore(
        repository=repository_name,
        snapshot=snapshot_name,
        body={
            "indices": indices,
            'storage_type': 'remote_snapshot' if searchable else 'local',
            'rename_pattern': "(.+)" if searchable else "",
            "rename_replacement": "restored_$1" if searchable else "",
        })


def delete_index(index_name):
    """Delete *index_name* and return the API response.

    Best effort: any failure is printed and ``None`` is returned.
    """
    try:
        return index_client.delete(index=index_name)
    except Exception as e:
        print(f"Failed to delete index '{index_name}'. Error: {str(e)}")
        return None


def convert_year_month_day_to_datetime(year, month, day):
    """Return a datetime at midnight of the given calendar day."""
    return datetime(year, month, day)


def diff_in_days(start_date, end_date):
    """Return the whole number of days from *start_date* to *end_date*."""
    return (end_date - start_date).days


def diff_in_weeks(start_date, end_date):
    """Return the number of full weeks from *start_date* to *end_date*."""
    return (end_date - start_date).days // 7


def formatter(obj):
    """Return *obj* serialised as JSON with a 4-space indent."""
    return json.dumps(obj, indent=4)


def _parse_ymd(text):
    """Parse a whitespace-separated ``YYYY MM DD`` string into a datetime."""
    # split() without an argument tolerates repeated or surrounding spaces,
    # which the previous split(" ") turned into a ValueError.
    year, month, day = (int(part) for part in text.split()[:3])
    return convert_year_month_day_to_datetime(year, month, day)


def user_input():
    """Prompt for an index name and a start/end date.

    Returns ``(index_name, start_date, end_date)`` with both dates as
    datetimes. Raises ``ValueError`` if a date is not three integers.
    """
    index_name = input("Enter the index name: ")
    start_date = _parse_ymd(input("Enter the start date (YYYY MM DD): "))
    end_date = _parse_ymd(input("Enter the end date (YYYY MM DD): "))
    return index_name, start_date, end_date


def delete_indices(indices):
    """Delete one index (a string) or several (an iterable of names).

    Best effort: the first failure is printed and stops further deletions.
    """
    print("Deleting indices:", indices)
    names = [indices] if isinstance(indices, str) else indices
    try:
        for name in names:
            client.indices.delete(index=name, params=None, headers=None)
    except Exception as e:
        print(f"An error occurred while deleting indices: {e}")


def delete_restored_indices(indices):
    """Delete the ``restored_<name>`` copy of one or several indices.

    Best effort: the first failure is printed and stops further deletions.
    """
    print("Deleting indices:", indices)
    names = [indices] if isinstance(indices, str) else indices
    try:
        for name in names:
            client.indices.delete(
                index=f'restored_{name}', params=None, headers=None)
        print("Indices Deleted")
    except Exception as e:
        print(f"An error occurred while deleting indices: {e}")


def get_documents_from_index(index_name, size=inf):
    """Run a ``match_all`` search on *index_name* and return the response.

    *size* is the maximum number of hits requested. The default ``inf``
    means "as many as a single search allows": infinity is not a valid JSON
    number, so it is replaced by ``MAX_RESULT_WINDOW`` before the request is
    sent. Fetching more documents than that needs scroll or search_after.
    """
    if size == inf:
        size = MAX_RESULT_WINDOW
    return client.search({
        "size": size,
        "query": {
            "match_all": {}
        }
    }, index_name)


def filter_snapshots(snapshots, start_date, end_date):
    """Return the snapshot names whose timestamp lies in the closed range
    ``[start_date, end_date]``, preserving input order."""
    return [
        snapshot for snapshot in snapshots
        if start_date <= extract_date_from_snapshot(snapshot) <= end_date
    ]


def extract_indices_from_time_frame(repository_name, micro_service_pattern,
                                    start_date, end_date):
    """Collect index names from the snapshots taken within a time frame.

    Returns ``(indices, snapshots)``: the sorted, de-duplicated index names
    that match *micro_service_pattern* at their start (``re.match``), and
    the names of the snapshots that fell inside the date range.
    """
    snapshots = get_snapshots(repository_name)
    filtered_snapshots = filter_snapshots(snapshots, start_date, end_date)

    found = set()
    for snapshot in filtered_snapshots:
        found.update(get_indices_from_snapshot(repository_name, snapshot))

    filtered = sorted(
        item for item in found if re.match(micro_service_pattern, item))
    return filtered, filtered_snapshots


# Restore and check indices in reverse order within a snapshot
def restore_and_check_indices_in_reverse(repository, snapshot, indices,
                                         start_date):
    """Walk *indices* from last to first, restoring each one as searchable.

    The walk stops at the first index whose restored creation date is later
    than *start_date* and returns ``indices`` from that position onwards;
    that index's restored copy is left in place. Every index visited before
    the stop has its restored copy deleted. Returns an empty list when no
    index satisfies the condition.
    """
    for position in reversed(range(len(indices))):
        name = indices[position]
        restore_snapshot(repository, snapshot, name, True)
        creation_date = extract_creation_date_from_restored_index(name)
        if creation_date > start_date:
            return indices[position:]
        delete_restored_indices(name)
    return []


def log_documents_to_file(log_filename, snapshot_name, index_name, documents,
                          max_size=10*1024*1024):
    """Append *documents* to a log file as indented JSON.

    The entry has the shape ``{snapshot_name: {index_name: documents}}``.
    While the target file exists and is larger than *max_size* bytes the
    target switches to ``logs_1.json``, ``logs_2.json``, ... in turn.
    """
    log_file_index = 0
    # Check and rollover the file if it exceeds max_size
    while (os.path.exists(log_filename)
           and os.path.getsize(log_filename) > max_size):
        log_file_index += 1
        log_filename = f'logs_{log_file_index}.json'

    log_entry = {
        snapshot_name: {
            index_name: documents
        }
    }
    with open(log_filename, 'a') as log_file:
        log_file.write(json.dumps(log_entry, indent=4) + "\n")


# Example dates for testing
start_date = convert_year_month_day_to_datetime(2024, 1, 1)
end_date = convert_year_month_day_to_datetime(2024, 7, 1)

repository = get_repositories()[0]
print(repository)

snapshots = get_snapshots(repository)
print(snapshots)

# Example pattern for testing
micro_service_pattern = r'agent-microservice-\d+'

extracted_indices, filtered_snapshots = extract_indices_from_time_frame(
    repository, micro_service_pattern, start_date, end_date)

if filtered_snapshots:
    my_snapshot = filtered_snapshots[0]
    remaining_indices = restore_and_check_indices_in_reverse(
        repository, my_snapshot, extracted_indices, start_date)
    print(f"Remaining indices: {remaining_indices}")
    for index in remaining_indices:
        # NOTE(review): this searches the index under its original name,
        # while the restore step creates "restored_<name>" -- confirm which
        # one is meant to be read here.
        documents = get_documents_from_index(index)
        log_documents_to_file('logs.json', my_snapshot, index, documents)
        delete_restored_indices(index)
else:
    print("No snapshots found in the specified date range.")
Editor is loading...
Leave a Comment