# Untitled
#
# avatar
# unknown
# plain_text
# a year ago
# 6.6 kB
# 3
# Indexable
import argparse
from datetime import datetime
from opensearchpy import OpenSearch
from opensearchpy.client.snapshot import SnapshotClient
from opensearchpy.client.indices import IndicesClient
import re
import json

# Client initialization
# NOTE(review): the endpoint and the admin credentials below are hard-coded in
# source control; consider loading them from the environment or a secrets
# store and rotating this password.
host = 'default.elk.inf.use1.cwdevsandbox.cwnet.io'
port = 9200
auth = ('admin', '/hk3UlwmalS72t0dK+B6G+8Q4ZEnSNi9+mENgtWBBeo=')

# SSL connection with basic auth. Certificate verification and hostname
# assertion are switched off and the SSL warning is suppressed.
# NOTE(review): presumably acceptable only for a sandbox cluster -- confirm.
client = OpenSearch(
    hosts=[{'host': host, 'port': port}],
    http_compress=True,
    http_auth=auth,
    use_ssl=True,
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False,
)

# ping() only gates a confirmation message; a falsy result prints nothing and
# the script carries on regardless.
if client.ping():
    print("Client created successfully")

# Snapshot-API wrapper built on the shared client.
snapshot_client = SnapshotClient(client)
# NOTE(review): this checks the truthiness of the wrapper object, not cluster
# connectivity -- presumably always true; confirm.
if snapshot_client:
    print("Snapshot client created successfully")

# Indices-API wrapper built on the shared client.
index_client = IndicesClient(client)
# NOTE(review): same as above -- object truthiness, not a connectivity check.
if index_client:
    print("Index client created successfully")

# Helper Functions
def extract_date_from_snapshot(string):
    """Parse the timestamp embedded in a snapshot name.

    Everything before the first ``-`` is discarded; the remaining
    dash-separated pieces are concatenated and read with the format
    ``%Y%m%d%H%M%S`` (for example ``snapshot-2024-01-02-03-04-05``).

    Returns:
        A naive ``datetime`` built from the name.

    Raises:
        ValueError: propagated from ``strptime`` when the remainder of the
            name does not match the expected format.
    """
    _prefix, *stamp_parts = string.split('-')
    return datetime.strptime("".join(stamp_parts), '%Y%m%d%H%M%S')

def get_repositories():
    """Return the names of the registered snapshot repositories.

    The cat-repositories response is treated as plain text with one
    repository per line, the name being the first whitespace-separated
    column.

    Blank lines are skipped explicitly instead of unconditionally dropping
    the last entry, so a response without a trailing newline no longer loses
    its final repository and stray empty lines never yield empty names.

    Returns:
        A list of repository names (empty when the response has no rows).
    """
    response = client.cat.repositories()
    return [line.split()[0] for line in response.splitlines() if line.strip()]

def take_snapshot(repository_name="os-snapshot-repo"):
    """Create a snapshot named after the current local time.

    The snapshot covers every index except those matching ``restored_*`` and
    is named ``snapshot-YYYY-MM-DD-HH-MM-SS``.

    Args:
        repository_name: Repository in which the snapshot is created.
    """
    snapshot_settings = {
        "indices": ["*", "-restored_*"],
    }
    # strftime yields the same zero-padded fields the previous code obtained
    # by slicing str(datetime.now()) at fixed character offsets, without
    # depending on that string layout.
    snapshot_name = datetime.now().strftime("snapshot-%Y-%m-%d-%H-%M-%S")
    client.snapshot.create(
        repository=repository_name, snapshot=snapshot_name, body=snapshot_settings)

def get_snapshots(repository_name):
    """Return the names of all snapshots stored in ``repository_name``.

    Queries the repository with the ``_all`` selector and collects the
    ``snapshot`` field of each entry, in the order the response lists them.
    """
    response = client.snapshot.get(repository=repository_name, snapshot='_all')
    names = []
    for entry in response["snapshots"]:
        names.append(entry["snapshot"])
    return names

def get_indices_from_snapshot(repository_name, snapshot_name):
    """Return the ``indices`` entry of one snapshot.

    Only the first element of the ``snapshots`` array in the response is
    inspected.
    """
    response = snapshot_client.get(repository=repository_name, snapshot=snapshot_name)
    first_snapshot = response["snapshots"][0]
    return first_snapshot["indices"]

def milliseconds_to_datetime(millis_str):
    """Convert an epoch timestamp in milliseconds to a naive local datetime.

    Args:
        millis_str: The millisecond count; anything ``int()`` accepts.

    Returns:
        ``datetime.fromtimestamp`` of the value divided by 1000.
    """
    return datetime.fromtimestamp(int(millis_str) / 1000)

def extract_creation_date_from_restored_index(index_name):
    """Return the creation time of the ``restored_<index_name>`` index.

    Reads ``settings.index.creation_date`` from the index metadata and
    converts it with ``milliseconds_to_datetime``.
    """
    restored_name = f'restored_{index_name}'
    index_info = index_client.get(index=restored_name)
    created_ms = index_info[restored_name]["settings"]["index"]["creation_date"]
    return milliseconds_to_datetime(created_ms)

def restore_snapshot(repository_name, snapshot_name, indices, searchable=False):
    """Issue a restore request for ``indices`` from one snapshot.

    Args:
        repository_name: Repository holding the snapshot.
        snapshot_name: Snapshot to restore from.
        indices: Value sent as the ``indices`` field of the request body.
        searchable: When truthy, restore with storage type
            ``remote_snapshot`` and rename every index to
            ``restored_<name>``; otherwise use storage type ``local`` with
            empty rename settings.
    """
    if searchable:
        storage_type, pattern, replacement = 'remote_snapshot', "(.+)", "restored_$1"
    else:
        storage_type, pattern, replacement = 'local', "", ""

    restore_body = {
        "indices": indices,
        'storage_type': storage_type,
        'rename_pattern': pattern,
        "rename_replacement": replacement,
    }
    client.snapshot.restore(
        repository=repository_name, snapshot=snapshot_name, body=restore_body)

def convert_year_month_day_to_datetime(year, month, day):
    """Build a naive ``datetime`` at midnight of the given calendar date."""
    return datetime(year=year, month=month, day=day)

def formatter(obj):
    """Serialize ``obj`` to a JSON string indented by four spaces."""
    pretty = json.dumps(obj, indent=4)
    return pretty

def parse_date(year, month, day):
    """Coerce the three components with ``int()`` and return the datetime.

    The result is a naive ``datetime`` at midnight of that date.
    """
    year_num, month_num, day_num = (int(part) for part in (year, month, day))
    return datetime(year_num, month_num, day_num)

# Command line: one index-name string followed by six integer positionals
# (start year/month/day, then end year/month/day), registered in that order.
parser = argparse.ArgumentParser()
parser.add_argument('index_name', type=str, help='Index name')

_DATE_ARGUMENTS = (
    ('start_year', 'Start year'),
    ('start_month', 'Start month'),
    ('start_day', 'Start day'),
    ('end_year', 'End year'),
    ('end_month', 'End month'),
    ('end_day', 'End day'),
)
for _arg_name, _arg_help in _DATE_ARGUMENTS:
    parser.add_argument(_arg_name, type=int, help=_arg_help)

# Parsed at module level, so the command line is read as soon as this file is
# loaded.
args = parser.parse_args()

def user_input():
    """Return ``(index_name, start_date, end_date)`` from the parsed arguments.

    Both dates are built with ``parse_date`` and therefore sit at midnight.
    NOTE(review): because ``end_date`` is 00:00:00 of the end day, an
    inclusive comparison against it excludes anything later on that same
    day -- confirm this is intended.
    """
    start = parse_date(args.start_year, args.start_month, args.start_day)
    end = parse_date(args.end_year, args.end_month, args.end_day)
    return args.index_name, start, end

def delete_restored_indices(indices):
    """Delete the ``restored_<name>`` index for one name or for each name.

    Args:
        indices: A single index name (``str``) or an iterable of names.

    Any exception stops the remaining deletions and is reported on stdout
    rather than raised.
    """
    print("Deleting indices:", indices)
    # A lone string is handled as a one-element batch.
    targets = [indices] if isinstance(indices, str) else indices
    try:
        for name in targets:
            client.indices.delete(index=f'restored_{name}', params=None, headers=None)
        print("Indices Deleted")
    except Exception as e:
        print(f"An error occurred while deleting indices: {e}")

def filter_snapshots(snapshots, start_date, end_date):
    """Keep the snapshots whose embedded timestamp is within the range.

    Both bounds are inclusive; the input order is preserved. The timestamp
    is obtained with ``extract_date_from_snapshot``, so a name that helper
    cannot parse raises.
    """
    return [
        name
        for name in snapshots
        if start_date <= extract_date_from_snapshot(name) <= end_date
    ]

def extract_indices_from_time_frame(repository_name, micro_service_pattern, start_date, end_date):
    """Collect matching index names from the snapshots in a date range.

    Args:
        repository_name: Repository whose snapshots are inspected.
        micro_service_pattern: Regular expression applied with ``re.match``
            (anchored at the start of each index name).
        start_date: Inclusive lower bound for the snapshot timestamp.
        end_date: Inclusive upper bound for the snapshot timestamp.

    Returns:
        A tuple ``(indices, snapshots)``: the de-duplicated matching index
        names sorted ascending, and the snapshot names that fell inside the
        range.
    """
    snapshots_in_range = filter_snapshots(
        get_snapshots(repository_name), start_date, end_date)

    unique_indices = set()
    for snapshot_name in snapshots_in_range:
        unique_indices.update(get_indices_from_snapshot(repository_name, snapshot_name))

    matching = sorted(
        name for name in unique_indices if re.match(micro_service_pattern, name))
    return matching, snapshots_in_range
        
# Restore driver. These statements run at module level, i.e. whenever the file
# is loaded, not only when it is executed as a script.

# Work against the first repository reported by the cluster.
repository = get_repositories()[0]
# NOTE(review): `snapshots` is not read anywhere below; the assignment is kept
# so the module-level name and the request it issues stay unchanged.
snapshots = get_snapshots(repository)

index_name, start_date, end_date = user_input()

extracted_indices, filtered_snapshots = extract_indices_from_time_frame(
    repository, index_name, start_date, end_date)
# Indices come back sorted ascending and snapshots in listing order; both are
# processed in reverse (presumably newest first -- confirm).
extracted_indices = extracted_indices[::-1]
filtered_snapshots = filtered_snapshots[::-1]

# Walk the snapshots, restoring indices in order. The first index a snapshot
# cannot restore, and everything after it, is carried over to the next
# snapshot. Previously a fully successful pass left the list filled with None
# placeholders, so the next snapshot issued a restore with indices=None; the
# pending list is now emptied instead and the loop stops.
for snapshot in filtered_snapshots:
    if not extracted_indices:
        break  # everything has been restored

    pending = []
    for position, index in enumerate(extracted_indices):
        try:
            restore_snapshot(repository, snapshot, index, True)
        except Exception as e:
            # Report the failure instead of dropping it silently, then retry
            # this index and the remaining ones against the next snapshot.
            print(f"Could not restore {index} from {snapshot}: {e}")
            pending = extracted_indices[position:]
            break

    # Update extracted_indices in place with whatever is still outstanding.
    extracted_indices[:] = pending

if __name__ == "__main__":
    # Re-read the arguments and echo them, one "Label: value" line each.
    index_name, start_date, end_date = user_input()
    for _label, _value in (
        ("Index Name", index_name),
        ("Start Date", start_date),
        ("End Date", end_date),
    ):
        print(f"{_label}: {_value}")
# Editor is loading...
# Leave a Comment