# Untitled
# unknown
# plain_text
# a year ago
# 9.5 kB
# 18
# Indexable
import os
from math import inf
from opensearchpy import OpenSearch
import json
import re
from datetime import datetime
from opensearchpy.client.snapshot import SnapshotClient
from opensearchpy.client.indices import IndicesClient
# Client initialization
# Connection settings can be overridden through environment variables. The
# literals below are fallbacks that keep the script's previous behavior when
# no override is set.
host = os.environ.get('OPENSEARCH_HOST', 'default.elk.inf.use1.cwdevsandbox.cwnet.io')
port = int(os.environ.get('OPENSEARCH_PORT', '9200'))
# NOTE(review): the fallback password is committed to source control. It should
# be rotated and the fallback removed once OPENSEARCH_PASSWORD is provisioned.
auth = (
    os.environ.get('OPENSEARCH_USER', 'admin'),
    os.environ.get('OPENSEARCH_PASSWORD', '/hk3UlwmalS72t0dK+B6G+8Q4ZEnSNi9+mENgtWBBeo='),
)
# NOTE(review): certificate and hostname verification are disabled and the
# resulting warnings are silenced; confirm this is intended for this cluster.
client = OpenSearch(
    hosts=[{'host': host, 'port': port}],
    http_compress=True,
    http_auth=auth,
    use_ssl=True,
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False,
)
if client.ping():
    print("Client created successfully")

# Namespaced helper clients that share the connection created above.
snapshot_client = SnapshotClient(client)
if snapshot_client:
    print("Snapshot client created successfully")
index_client = IndicesClient(client)
if index_client:
    print("Index client created successfully")
# Helper Functions
def extract_date_from_string(string):
    """Return the first ``YYYY.MM`` date embedded in *string*.

    Candidates must be a four-digit year, a dot, and a one- or two-digit
    month, not surrounded by further digits. Fragments such as a ``1.2``
    version number are therefore ignored instead of crashing the parser.

    Args:
        string: Text to scan, e.g. an index name like ``logs-2024.07``.

    Returns:
        A ``datetime`` set to the first day of the matched month, or ``None``
        when no parsable ``YYYY.MM`` fragment is present.
    """
    for match in re.finditer(r'(?<!\d)\d{4}\.\d{1,2}(?!\d)', string):
        try:
            return datetime.strptime(match.group(0), '%Y.%m')
        except ValueError:
            # Shape matched but the value is not a real date (e.g. month 13);
            # keep looking for a later, valid candidate.
            continue
    return None
def extract_date_from_snapshot(string):
    """Parse the timestamp encoded in a snapshot name.

    Everything after the first ``-``-separated field is concatenated and
    parsed as ``%Y%m%d%H%M%S``, so ``snapshot-2024-07-01-12-30-45`` yields
    ``datetime(2024, 7, 1, 12, 30, 45)``.

    Raises:
        ValueError: If the remaining fields do not form such a timestamp.
    """
    _prefix, *timestamp_fields = string.split('-')
    compact_timestamp = "".join(timestamp_fields)
    return datetime.strptime(compact_timestamp, '%Y%m%d%H%M%S')
def create_repository(repository_name,
                      bucket='pf-elkcluster-default-essnapshotss3bucket-zrop9rmz1gbk'):
    """Register an S3-backed snapshot repository.

    Args:
        repository_name: Name under which the repository is registered.
        bucket: S3 bucket that stores the snapshots. Defaults to the bucket
            this script was originally hard-wired to.
    """
    snapshot_client.create_repository(
        repository=repository_name,
        body={
            'type': 's3',
            'settings': {
                'bucket': bucket,
            },
        },
    )
def get_repositories():
    """Return the names of all registered snapshot repositories.

    The names are taken from the first whitespace-separated column of each
    non-blank line of the CAT repositories response.

    Returns:
        A list of repository names; empty when the response has no rows.
    """
    response = client.cat.repositories()
    # splitlines() plus the blank-line filter avoids relying on the response
    # ending with a newline (the old code dropped the last element blindly).
    return [line.split()[0] for line in response.splitlines() if line.strip()]
def take_snapshot(repository_name="os-snapshot-repo"):
    """Create a snapshot named after the current local time.

    The snapshot is named ``snapshot-YYYY-MM-DD-HH-MM-SS`` and covers every
    index except those matching ``restored_*``.

    Args:
        repository_name: Repository that receives the snapshot.
    """
    snapshot_name = datetime.now().strftime("snapshot-%Y-%m-%d-%H-%M-%S")
    client.snapshot.create(
        repository=repository_name,
        snapshot=snapshot_name,
        body={"indices": ["*", "-restored_*"]},
    )
def get_snapshots(repository_name):
    """Return the names of all snapshots stored in *repository_name*."""
    listing = client.snapshot.get(repository=repository_name, snapshot='_all')
    return [entry["snapshot"] for entry in listing["snapshots"]]
def get_indices_from_snapshot(repository_name, snapshot_name):
    """Return the ``indices`` entry of the first snapshot matching *snapshot_name*."""
    details = snapshot_client.get(repository=repository_name, snapshot=snapshot_name)
    first_entry = details["snapshots"][0]
    return first_entry["indices"]
def milliseconds_to_datetime(millis_str):
    """Convert an epoch timestamp in milliseconds to a ``datetime``.

    Args:
        millis_str: Milliseconds since the epoch; anything ``int()`` accepts.

    Returns:
        The naive ``datetime`` produced by ``datetime.fromtimestamp``.
    """
    return datetime.fromtimestamp(int(millis_str) / 1000)
def extract_creation_date_from_index(index_name):
    """Return the creation time of *index_name* as a ``datetime``.

    Reads ``settings.index.creation_date`` from the index metadata and
    converts it with ``milliseconds_to_datetime``.
    """
    index_settings = index_client.get(index=index_name)[index_name]["settings"]["index"]
    return milliseconds_to_datetime(index_settings["creation_date"])
def extract_creation_date_from_restored_index(index_name):
    """Return the creation time of the ``restored_<index_name>`` index.

    Args:
        index_name: Original index name, without the ``restored_`` prefix.
    """
    # Same metadata lookup as for a regular index, applied to the prefixed name.
    return extract_creation_date_from_index(f'restored_{index_name}')
def extract_date_of_deleted_index(repository_name, snapshot_name, index):
    """Return the creation date of *index* as recorded in a snapshot.

    The index is temporarily restored as ``restored_<index>``, its creation
    date is read, and the temporary index is deleted again.

    Args:
        repository_name: Repository holding the snapshot.
        snapshot_name: Snapshot to restore from.
        index: Name of the index inside the snapshot.

    Returns:
        The creation date of the restored index as a ``datetime``.
    """
    restore_snapshot(repository_name, snapshot_name, index, searchable=True)
    try:
        return extract_creation_date_from_restored_index(index)
    finally:
        # Always drop the temporary index, even when the lookup fails, so a
        # failed call does not leave a stray restored_* index behind.
        delete_index(f'restored_{index}')
def restore_snapshot(repository_name, snapshot_name, indices, searchable=False):
    """Restore *indices* from a snapshot.

    Args:
        repository_name: Repository holding the snapshot.
        snapshot_name: Snapshot to restore from.
        indices: Index name(s) passed through as the ``indices`` field.
        searchable: When true, restore with ``remote_snapshot`` storage and
            rename every index to ``restored_<name>``. Otherwise restore with
            ``local`` storage and empty rename settings.
    """
    if searchable:
        storage_type, pattern, replacement = 'remote_snapshot', "(.+)", "restored_$1"
    else:
        storage_type, pattern, replacement = 'local', "", ""

    restore_body = {
        "indices": indices,
        'storage_type': storage_type,
        'rename_pattern': pattern,
        "rename_replacement": replacement,
    }
    client.snapshot.restore(
        repository=repository_name,
        snapshot=snapshot_name,
        body=restore_body,
    )
def delete_index(index_name):
    """Delete *index_name*, reporting instead of raising on failure.

    Returns:
        The delete response, or ``None`` if the deletion raised.
    """
    try:
        return index_client.delete(index=index_name)
    except Exception as error:
        # Best-effort cleanup: report the problem and carry on.
        print(f"Failed to delete index '{index_name}'. Error: {str(error)}")
    return None
def convert_year_month_day_to_datetime(year, month, day):
    """Build a midnight ``datetime`` from calendar components."""
    return datetime(year=year, month=month, day=day)
def diff_in_days(start_date, end_date):
    """Return the whole-day component of ``end_date - start_date``."""
    elapsed = end_date - start_date
    return elapsed.days
def diff_in_weeks(start_date, end_date):
    """Return the number of whole weeks between two dates (floor division)."""
    whole_weeks, _leftover_days = divmod((end_date - start_date).days, 7)
    return whole_weeks
def formatter(obj):
    """Serialize *obj* to a JSON string indented by four spaces."""
    rendered = json.dumps(obj, indent=4)
    return rendered
def _prompt_for_date(prompt):
    """Ask for a ``YYYY MM DD`` date and return it as a ``datetime``."""
    # split() without an argument tolerates leading, trailing and repeated
    # whitespace, which split(" ") turned into empty fields.
    fields = input(prompt).split()
    return convert_year_month_day_to_datetime(
        int(fields[0]), int(fields[1]), int(fields[2]))


def user_input():
    """Interactively read an index name and a start/end date.

    Returns:
        A tuple ``(index_name, start_date, end_date)`` where both dates are
        ``datetime`` objects.

    Raises:
        IndexError: If a date has fewer than three fields.
        ValueError: If a field is not an integer or the date is invalid.
    """
    index_name = input("Enter the index name: ")
    start_date = _prompt_for_date("Enter the start date (YYYY MM DD): ")
    end_date = _prompt_for_date("Enter the end date (YYYY MM DD): ")
    return index_name, start_date, end_date
def delete_indices(indices):
    """Delete one index (given as a string) or each index of an iterable.

    Any exception is reported and stops the remaining deletions.
    """
    print("Deleting indices:", indices)
    try:
        # A bare string is a single index name, not a sequence of names.
        targets = [indices] if isinstance(indices, str) else indices
        for target in targets:
            client.indices.delete(index=target, params=None, headers=None)
    except Exception as error:
        print(f"An error occurred while deleting indices: {error}")
def delete_restored_indices(indices):
    """Delete the ``restored_<name>`` counterpart of one or more indices.

    Args:
        indices: A single index name or an iterable of names, each given
            without the ``restored_`` prefix.

    Any exception is reported and stops the remaining deletions.
    """
    print("Deleting indices:", indices)
    try:
        # A bare string is a single index name, not a sequence of names.
        names = [indices] if isinstance(indices, str) else indices
        for name in names:
            client.indices.delete(index=f'restored_{name}', params=None, headers=None)
        print("Indices Deleted")
    except Exception as error:
        print(f"An error occurred while deleting indices: {error}")
def get_documents_from_index(index_name, size=inf):
    """Fetch documents from *index_name* with a ``match_all`` query.

    Args:
        index_name: Index to search.
        size: Maximum number of hits to return. The default (``inf``) means
            "all documents"; because infinity cannot be sent as a JSON
            ``size``, that case is served by paging through the scroll API.

    Returns:
        The search response dict. When all documents are requested, the
        ``hits.hits`` list of the first page is extended with the hits of
        every following page.
    """
    match_all = {"match_all": {}}
    if size != inf:
        return client.search(
            body={"size": size, "query": match_all},
            index=index_name,
        )

    page_size = 1000
    keep_alive = "2m"
    response = client.search(
        body={"size": page_size, "query": match_all},
        index=index_name,
        scroll=keep_alive,
    )
    scroll_id = response.get("_scroll_id")
    collected = response["hits"]["hits"]
    page_hits = list(collected)
    try:
        # An empty page marks the end of the scroll.
        while page_hits and scroll_id:
            page = client.scroll(scroll_id=scroll_id, scroll=keep_alive)
            scroll_id = page.get("_scroll_id", scroll_id)
            page_hits = page["hits"]["hits"]
            collected.extend(page_hits)
    finally:
        # Release the server-side scroll context as soon as we are done.
        if scroll_id:
            client.clear_scroll(scroll_id=scroll_id)
    return response
def filter_snapshots(snapshots, start_date, end_date):
    """Keep the snapshots whose encoded timestamp lies in a date range.

    Args:
        snapshots: Iterable of snapshot names.
        start_date: Inclusive lower bound (``datetime``).
        end_date: Inclusive upper bound (``datetime``).

    Returns:
        The matching snapshot names, in their original order. Names whose
        timestamp cannot be parsed are reported and skipped.
    """
    filtered_snapshots = []
    for snapshot in snapshots:
        try:
            snapshot_time = extract_date_from_snapshot(snapshot)
        except ValueError:
            # A snapshot that does not follow the timestamped naming scheme
            # must not abort the whole scan.
            print(f"Skipping snapshot with unrecognized name: {snapshot}")
            continue
        if start_date <= snapshot_time <= end_date:
            filtered_snapshots.append(snapshot)
    return filtered_snapshots
def extract_indices_from_time_frame(repository_name, micro_service_pattern, start_date, end_date):
    """Collect the indices found in snapshots taken within a date range.

    Args:
        repository_name: Repository to inspect.
        micro_service_pattern: Regex matched against the start of each index
            name (``re.match``).
        start_date: Inclusive lower bound for the snapshot timestamp.
        end_date: Inclusive upper bound for the snapshot timestamp.

    Returns:
        A tuple ``(indices, snapshots)``: the sorted, de-duplicated index
        names that match the pattern, and the snapshot names in the range.
    """
    in_range = filter_snapshots(get_snapshots(repository_name), start_date, end_date)

    seen = set()
    for snapshot_name in in_range:
        seen.update(get_indices_from_snapshot(repository_name, snapshot_name))

    matching = sorted(name for name in seen if re.match(micro_service_pattern, name))
    return matching, in_range
# Restore and check indices in reverse order within a snapshot
def restore_and_check_indices_in_reverse(repository, snapshot, indices, start_date):
    """Walk *indices* from last to first, restoring each to read its creation date.

    Each index is restored as a searchable ``restored_<name>`` copy. The walk
    stops at the first index created after *start_date*; that copy is left in
    place. Copies of indices that do not qualify are deleted again.

    Returns:
        ``indices`` sliced from the stopping position to the end, or an empty
        list when no index was created after *start_date*.
    """
    # NOTE(review): stopping at the first *newer* index, scanning from the end,
    # usually yields only the tail of the list; confirm this is the intent.
    for position in reversed(range(len(indices))):
        candidate = indices[position]
        restore_snapshot(repository, snapshot, candidate, True)
        if extract_creation_date_from_restored_index(candidate) > start_date:
            return indices[position:]
        delete_restored_indices(candidate)
    return []
def log_documents_to_file(log_filename, snapshot_name, index_name, documents, max_size=10*1024*1024):
    """Append documents to a JSON log file, rolling over when it grows too large.

    The entry is written as ``{snapshot_name: {index_name: documents}}``.
    If the target file exists and is larger than *max_size* bytes, the entry
    goes to the first of ``<stem>_1<ext>``, ``<stem>_2<ext>``, ... that is
    missing or not over the limit. The rollover name is derived from
    *log_filename*, so it stays in the same directory and keeps the caller's
    base name (``logs.json`` rolls over to ``logs_1.json``).

    Args:
        log_filename: Path of the primary log file.
        snapshot_name: Snapshot the documents came from.
        index_name: Index the documents came from.
        documents: JSON-serializable payload.
        max_size: Size threshold in bytes that triggers a rollover.
    """
    stem, extension = os.path.splitext(log_filename)
    target = log_filename
    rollover = 0
    # Check and rollover the file if it exceeds max_size
    while os.path.exists(target) and os.path.getsize(target) > max_size:
        rollover += 1
        target = f'{stem}_{rollover}{extension}'

    log_entry = {snapshot_name: {index_name: documents}}
    with open(target, 'a') as log_file:
        log_file.write(json.dumps(log_entry, indent=4) + "\n")
# Example dates for testing
start_date = datetime(2024, 1, 1)
end_date = datetime(2024, 7, 1)

# Work against the first repository reported by the cluster.
repository = get_repositories()[0]
print(repository)
snapshots = get_snapshots(repository)
print(snapshots)

# Example pattern for testing
micro_service_pattern = r'agent-microservice-\d+'
extracted_indices, filtered_snapshots = extract_indices_from_time_frame(
    repository, micro_service_pattern, start_date, end_date)

if not filtered_snapshots:
    print("No snapshots found in the specified date range.")
else:
    # Only the first snapshot inside the date range is processed.
    my_snapshot = filtered_snapshots[0]
    remaining_indices = restore_and_check_indices_in_reverse(
        repository, my_snapshot, extracted_indices, start_date)
    print(f"Remaining indices: {remaining_indices}")
    for index in remaining_indices:
        # NOTE(review): documents are read from the index under its original
        # name, while cleanup targets restored_<index>; confirm this is intended.
        documents = get_documents_from_index(index)
        log_documents_to_file('logs.json', my_snapshot, index, documents)
        delete_restored_indices(index)
# Editor is loading...
# Leave a Comment