# Paste-site metadata (not code): Untitled · unknown · plain_text · a year ago · 10 kB · 18 · Indexable
# Imports
import json
import os
import random
import re
from datetime import datetime
from math import inf

from opensearchpy import OpenSearch
from opensearchpy.client.indices import IndicesClient
from opensearchpy.client.snapshot import SnapshotClient
# Client initialization
# Connection details are read from the environment so that credentials are not
# stored in the script. Host, port and user fall back to the values this script
# has always used; the password has no fallback on purpose.
# NOTE(review): the password that used to be embedded here should be treated as
# leaked and rotated.
host = os.environ.get('OPENSEARCH_HOST', 'default.elk.inf.use1.cwdevsandbox.cwnet.io')
port = int(os.environ.get('OPENSEARCH_PORT', '9200'))
try:
    auth = (os.environ.get('OPENSEARCH_USER', 'admin'), os.environ['OPENSEARCH_PASSWORD'])
except KeyError:
    raise SystemExit("Set the OPENSEARCH_PASSWORD environment variable before running this script.") from None
client = OpenSearch(
    hosts=[{'host': host, 'port': port}],
    http_compress=True,
    http_auth=auth,
    use_ssl=True,
    # NOTE(review): TLS is on but certificate and hostname verification are
    # disabled; confirm this is acceptable outside the dev sandbox.
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False,
)
if client.ping():
    print("Client created successfully")
snapshot_client = SnapshotClient(client)
if snapshot_client:
    print("Snapshot client created successfully")
index_client = IndicesClient(client)
if index_client:
    print("Index client created successfully")
# Helper Functions
def extract_date_from_string(string):
    """Return the first ``YYYY.MM`` date embedded in *string*, or ``None``.

    The match must be exactly four digits, a dot, and one or two digits, not
    adjacent to further digits, so fragments such as ``v1.25`` are ignored
    instead of making ``strptime`` raise.

    Args:
        string: Text to search, e.g. an index name like ``logs-2024.07``.

    Returns:
        A ``datetime`` for the first day of the matched month, or ``None``
        when no valid ``YYYY.MM`` date is present.
    """
    match = re.search(r'(?<!\d)\d{4}\.\d{1,2}(?!\d)', string)
    if match is None:
        return None
    try:
        return datetime.strptime(match.group(0), '%Y.%m')
    except ValueError:
        # Shaped like a date but not one (e.g. month 13 or year 0000).
        return None
def extract_date_from_snapshot(string):
    """Parse the timestamp encoded in a snapshot name.

    Everything after the first ``-`` is taken as the timestamp; its remaining
    dashes are removed and the digits are parsed as ``%Y%m%d%H%M%S``
    (e.g. ``snapshot-2024-07-04-12-48-26``).

    Raises:
        ValueError: If the remainder is not a ``%Y%m%d%H%M%S`` timestamp.
    """
    _, _, timestamp_part = string.partition('-')
    digits = timestamp_part.replace('-', '')
    return datetime.strptime(digits, '%Y%m%d%H%M%S')
def create_repository(repository_name, bucket='pf-elkcluster-default-essnapshotss3bucket-zrop9rmz1gbk'):
    """Register an S3-backed snapshot repository.

    Args:
        repository_name: Name under which the repository is registered.
        bucket: S3 bucket that stores the snapshots. Defaults to the bucket
            this script has always used, so existing callers are unaffected.
    """
    snapshot_client.create_repository(
        repository=repository_name,
        body={
            'type': 's3',
            'settings': {
                'bucket': bucket,
            },
        },
    )
def get_repositories():
    """Return the names of the registered snapshot repositories.

    The names are read from the ``_cat/repositories`` text response, taking the
    first whitespace-separated column of each non-blank line. Blank lines are
    skipped explicitly rather than assuming the response ends with exactly one
    trailing newline.

    Returns:
        A list of repository names; empty when the response has no rows.
    """
    response = client.cat.repositories()
    return [line.split()[0] for line in response.splitlines() if line.strip()]
def take_snapshot(repository_name="os-snapshot-repo"):
    """Create a snapshot named after the current local time.

    The snapshot is called ``snapshot-YYYY-MM-DD-HH-MM-SS`` and is requested
    with the index patterns ``*`` and ``-restored_*``.

    Args:
        repository_name: Repository that receives the snapshot.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
    client.snapshot.create(
        repository=repository_name,
        snapshot=f"snapshot-{timestamp}",
        body={"indices": ["*", "-restored_*"]},
    )
def get_snapshots(repository_name):
    """Return the name of every snapshot stored in *repository_name*."""
    response = client.snapshot.get(repository=repository_name, snapshot='_all')
    return [entry["snapshot"] for entry in response["snapshots"]]
def get_indices_from_snapshot(repository_name, snapshot_name):
    """Return the ``indices`` entry of the first snapshot matching *snapshot_name*."""
    response = snapshot_client.get(repository=repository_name, snapshot=snapshot_name)
    first_snapshot = response["snapshots"][0]
    return first_snapshot["indices"]
def milliseconds_to_datetime(millis_str):
    """Convert epoch milliseconds (string or int) to a naive local ``datetime``."""
    return datetime.fromtimestamp(int(millis_str) / 1000)
def extract_creation_date_from_index(index_name):
    """Return the creation time of *index_name* as a ``datetime``.

    Reads ``settings.index.creation_date`` from the index metadata and
    converts it with ``milliseconds_to_datetime``.
    """
    index_settings = index_client.get(index=index_name)[index_name]["settings"]["index"]
    return milliseconds_to_datetime(index_settings["creation_date"])
def extract_creation_date_from_restored_index(index_name):
    """Return the creation time of the ``restored_<index_name>`` index.

    *index_name* is the original name; the ``restored_`` prefix is added here.
    """
    restored_name = f'restored_{index_name}'
    index_settings = index_client.get(index=restored_name)[restored_name]["settings"]["index"]
    return milliseconds_to_datetime(index_settings["creation_date"])
def extract_date_of_deleted_index(repository_name, snapshot_name, index):
    """Return the creation date of an index that now exists only in a snapshot.

    The index is restored as a searchable ``restored_<index>`` copy, its
    creation date is read, and the temporary copy is deleted again.

    Args:
        repository_name: Repository holding the snapshot.
        snapshot_name: Snapshot that contains *index*.
        index: Original (un-prefixed) index name.

    Returns:
        The creation date reported for the restored copy.
    """
    restore_snapshot(repository_name, snapshot_name, index, searchable=True)
    try:
        return extract_creation_date_from_restored_index(index)
    finally:
        # Always remove the temporary copy, even if reading its metadata failed.
        delete_index(f'restored_{index}')
def restore_snapshot(repository_name, snapshot_name, indices, searchable=False):
    """Restore *indices* from a snapshot.

    Args:
        repository_name: Repository holding the snapshot.
        snapshot_name: Snapshot to restore from.
        indices: Index name(s) forwarded as the ``indices`` field of the request.
        searchable: When truthy, restore with storage type ``remote_snapshot``
            and rename every index to ``restored_<name>``. Otherwise restore
            with storage type ``local`` and empty rename fields.
    """
    if searchable:
        storage_type, pattern, replacement = 'remote_snapshot', "(.+)", "restored_$1"
    else:
        storage_type, pattern, replacement = 'local', "", ""
    # "ignore_unavailable" and "include_global_state" are intentionally not sent.
    restore_body = {
        "indices": indices,
        'storage_type': storage_type,
        'rename_pattern': pattern,
        "rename_replacement": replacement,
    }
    client.snapshot.restore(
        repository=repository_name,
        snapshot=snapshot_name,
        body=restore_body,
    )
def delete_index(index_name):
    """Delete *index_name*, reporting rather than raising on failure.

    Returns:
        The client's response on success, or ``None`` if the deletion raised
        (the error is printed).
    """
    try:
        return index_client.delete(index=index_name)
    except Exception as e:
        print(f"Failed to delete index '{index_name}'. Error: {str(e)}")
    return None
def convert_year_month_day_to_datetime(year, month, day):
    """Build a midnight ``datetime`` from integer year, month and day."""
    return datetime(year=year, month=month, day=day)
def diff_in_days(start_date, end_date):
    """Return the whole-day component of ``end_date - start_date``."""
    elapsed = end_date - start_date
    return elapsed.days
def diff_in_weeks(start_date, end_date):
    """Return the number of whole weeks between the dates (floor division)."""
    elapsed_days = (end_date - start_date).days
    whole_weeks, _ = divmod(elapsed_days, 7)
    return whole_weeks
def formatter(obj):
    """Serialize *obj* to JSON text indented by four spaces."""
    pretty = json.dumps(obj, indent=4)
    return pretty
def _prompt_for_date(prompt):
    """Ask for a ``YYYY MM DD`` date until a valid one is entered; return it as a datetime."""
    while True:
        # split() without an argument tolerates repeated or surrounding whitespace.
        parts = input(prompt).split()
        try:
            year, month, day = (int(part) for part in parts)
            return convert_year_month_day_to_datetime(year, month, day)
        except ValueError:
            # Wrong number of fields, non-numeric text, or an impossible date.
            print("Invalid date entered. Please use the format YYYY MM DD.")


def user_input():
    """Interactively collect the index name and the date range to inspect.

    Returns:
        A ``(index_name, start_date, end_date)`` tuple; the index name is the
        raw text entered and both dates are ``datetime`` objects at midnight.
    """
    # region = input("Enter the region (NA, EU, AU): ")
    # valid_regions = ['NA', 'EU', 'AU']
    # while region not in valid_regions:
    #     print("Invalid region entered. Please enter a valid region.")
    #     region = input("Enter the region (NA, EU, AU): ")
    index_name = input("Enter the index name: ")
    start_date = _prompt_for_date("Enter the start date (YYYY MM DD): ")
    end_date = _prompt_for_date("Enter the end date (YYYY MM DD): ")
    return index_name, start_date, end_date
def delete_indices(indices):
    """Delete one index or several indices, printing any error instead of raising.

    Args:
        indices: A single index name (``str``) or an iterable of index names.
    """
    print("Deleting indices:", indices)
    try:
        # isinstance also accepts str subclasses, unlike comparing type objects.
        names = [indices] if isinstance(indices, str) else indices
        for index in names:
            client.indices.delete(index=index, params=None, headers=None)
    except Exception as e:
        print(f"An error occurred while deleting indices: {e}")
def delete_restored_indices(indices):
    """Delete the ``restored_<name>`` copy of one index or of several indices.

    Errors are printed instead of raised.

    Args:
        indices: A single original index name (``str``) or an iterable of
            original index names; the ``restored_`` prefix is added here.
    """
    print("Deleting indices:", indices)
    try:
        # isinstance also accepts str subclasses, unlike comparing type objects.
        names = [indices] if isinstance(indices, str) else indices
        for index in names:
            client.indices.delete(index=f'restored_{index}', params=None, headers=None)
        print("Indices Deleted")
    except Exception as e:
        print(f"An error occurred while deleting indices: {e}")
def get_documents_from_index(index_name, size=inf):
    """Run a ``match_all`` search against *index_name* and return the raw response.

    Args:
        index_name: Index to search.
        size: Maximum number of hits to request. The default ``inf`` means
            "as many as one request allows": infinity has no valid JSON
            representation, so it is sent as 10 000, the stock value of
            ``index.max_result_window``.

    Returns:
        The search response returned by the client.
    """
    if size == inf:
        size = 10_000
    # Keyword arguments keep the call independent of the client's positional order.
    return client.search(
        index=index_name,
        body={
            "size": size,
            "query": {
                "match_all": {}
            },
        },
    )
def filter_snapshots(snapshots, start_date, end_date):
    """Keep the snapshot names whose encoded timestamp is within the range.

    Both bounds are inclusive; the input order is preserved.
    """
    return [
        name
        for name in snapshots
        if start_date <= extract_date_from_snapshot(name) <= end_date
    ]
def extract_indices_from_time_frame(repository_name, micro_service, start_date, end_date):
    """Collect the indices of one service across the snapshots in a date range.

    Args:
        repository_name: Repository whose snapshots are inspected.
        micro_service: Substring an index name must contain to be kept.
        start_date: Inclusive lower bound on the snapshot timestamp.
        end_date: Inclusive upper bound on the snapshot timestamp.

    Returns:
        A tuple ``(indices, snapshots)``: the sorted, de-duplicated matching
        index names and the snapshot names that fell inside the range.
    """
    filtered_snapshots = filter_snapshots(
        get_snapshots(repository_name), start_date, end_date)
    unique_indices = set()
    for snapshot in filtered_snapshots:
        unique_indices.update(get_indices_from_snapshot(repository_name, snapshot))
    matching = sorted(name for name in unique_indices if micro_service in name)
    return matching, filtered_snapshots
# Code
## User Input
# start_date and end_date are needed below, so the prompt has to run.
# NOTE(review): index_name is collected but the filter below is still the
# hard-coded 'security-auditlog'; confirm which one is intended.
index_name, start_date, end_date = user_input()
## Restore Index
repositories = get_repositories()
if not repositories:
    raise SystemExit("No snapshot repositories are registered.")
repository = repositories[0]
print(repository)
snapshots = get_snapshots(repository)
print(snapshots)
# print(formatter(snapshots))
extracted_indices, filtered_snapshots = extract_indices_from_time_frame(
    repository, 'security-auditlog', start_date, end_date)
if not filtered_snapshots:
    raise SystemExit("No snapshots fall inside the requested date range.")
my_snapshot = filtered_snapshots[0]
# Restore each candidate from the first matching snapshot until one created
# after start_date is found; earlier ones have their restored copy removed.
for position, index in enumerate(extracted_indices):
    restore_snapshot(repository, my_snapshot, index, True)
    creation_date = extract_creation_date_from_restored_index(index)
    if creation_date > start_date:
        # Keep this index (its restored copy stays) and everything after it.
        extracted_indices = extracted_indices[position:]
        break
    delete_restored_indices(index)
# Ad-hoc checks: restore a single index as a searchable copy, then query two
# restored indices.
restore_snapshot(
    "os-snapshot-repo",
    "snapshot-2024-07-04-12-48-26",
    "kc-hamza-008",
    searchable=True,
)
# print(extract_creation_date_from_index("automation-engine-service-000002-restored"))
# The 'took' value is read but not stored (leftover from interactive use).
get_documents_from_index("automation-engine-service-000002-restored", size=50000)['took']
get_documents_from_index("restored_kc-hamza-001", size=500)
# Equivalent raw client call, kept for reference:
# client.snapshot.restore(repository="os-snapshot-repo", snapshot="snapshot-2024-07-04-12-06-18", body={
#     "indices": "kc-hamza-008",
#     'storage_type': 'remote_snapshot',
#     'rename_pattern': "(.+)",
#     "rename_replacement": "restored_$1"
#     # "ignore_unavailable": False,
#     # "include_global_state": False
# })
# Copy the documents of the restored index into a regular index, then apply
# the source index's settings to the destination.
source_index = "automation-engine-service-000002-restored"
destination_index = "kc-hamza-007"
reindex_request_body = {
    "source": {
        "index": source_index,
        # "query" : {'term': {'field_name': 'value'}}
    },
    "dest": {
        "index": destination_index
    }
}
# The body is passed by keyword so the call does not depend on positional order.
client.reindex(body=reindex_request_body)
settings_response = client.indices.get_settings(index=source_index)
source_settings = settings_response[source_index]
# NOTE(review): the fetched settings are forwarded unchanged. They presumably
# include server-managed entries (creation date, uuid, ...) that the update
# settings API may reject; confirm, and filter to updatable settings if needed.
client.indices.put_settings(index=destination_index, body=source_settings)
# NOTE(review): this redefines take_snapshot, which is already defined with the
# same behavior earlier in this file; one of the two definitions can be dropped.
def take_snapshot(repository_name="os-snapshot-repo"):
    """Snapshot the index patterns ``*`` and ``-restored_*`` into *repository_name*.

    The snapshot name is ``snapshot-YYYY-MM-DD-HH-MM-SS`` built from the
    current local time.
    """
    now = datetime.now()
    snapshot_name = f"snapshot-{now:%Y-%m-%d-%H-%M-%S}"
    snapshot_settings = {"indices": ["*", "-restored_*"]}
    client.snapshot.create(
        repository=repository_name,
        snapshot=snapshot_name,
        body=snapshot_settings,
    )
# Take a snapshot into the default repository ("os-snapshot-repo") as the final step.
take_snapshot()
# Paste-site footer (not code): "Editor is loading..." / "Leave a Comment"