Untitled

 avatar
unknown
plain_text
a year ago
9.5 kB
6
Indexable
import os
from math import inf
from opensearchpy import OpenSearch
import json
import re
from datetime import datetime
from opensearchpy.client.snapshot import SnapshotClient
from opensearchpy.client.indices import IndicesClient

# Client initialization
#
# Connection settings may be overridden through environment variables; the
# literals below are kept only as fallbacks so existing invocations keep
# working unchanged.
# NOTE(review): a credential is committed in source here. It should be rotated
# and supplied exclusively through OPENSEARCH_PASSWORD.
host = os.environ.get('OPENSEARCH_HOST', 'default.elk.inf.use1.cwdevsandbox.cwnet.io')
port = int(os.environ.get('OPENSEARCH_PORT', '9200'))
auth = (
    os.environ.get('OPENSEARCH_USER', 'admin'),
    os.environ.get('OPENSEARCH_PASSWORD', '/hk3UlwmalS72t0dK+B6G+8Q4ZEnSNi9+mENgtWBBeo='),
)

# NOTE(review): TLS is enabled but certificate and hostname verification are
# switched off (verify_certs=False, ssl_assert_hostname=False) and the
# resulting warnings are suppressed.
client = OpenSearch(
    hosts=[{'host': host, 'port': port}],
    http_compress=True,
    http_auth=auth,
    use_ssl=True,
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False,
)

if client.ping():
    print("Client created successfully")
else:
    # Previously a failed ping was silent; surface it so later API errors
    # are easier to diagnose.
    print("Warning: OpenSearch cluster did not respond to ping")

# NOTE(review): the truthiness checks below only confirm that the wrapper
# objects were constructed; presumably they do not verify connectivity.
snapshot_client = SnapshotClient(client)
if snapshot_client:
    print("Snapshot client created successfully")

index_client = IndicesClient(client)
if index_client:
    print("Index client created successfully")

# Helper Functions

def extract_date_from_string(string):
    """Parse the first ``<digits>.<digits>`` fragment of *string* as ``%Y.%m``.

    Returns the resulting ``datetime``, or ``None`` when the string contains
    no such fragment.  A fragment that is not a valid year/month pair makes
    ``datetime.strptime`` raise ``ValueError``.
    """
    found = re.search(r'\d+\.\d+', string)
    if found is None:
        return None
    return datetime.strptime(found.group(0), '%Y.%m')

def extract_date_from_snapshot(string):
    """Return the timestamp encoded in a snapshot name.

    Everything before the first ``-`` is discarded; the remaining
    dash-separated pieces are concatenated and parsed with the format
    ``%Y%m%d%H%M%S``.  Raises ``ValueError`` if the remainder does not match
    that format.
    """
    _prefix, *stamp_parts = string.split('-')
    return datetime.strptime("".join(stamp_parts), '%Y%m%d%H%M%S')

def create_repository(repository_name, bucket='pf-elkcluster-default-essnapshotss3bucket-zrop9rmz1gbk'):
    """Register an S3-backed snapshot repository.

    Args:
        repository_name: Name under which the repository is registered.
        bucket: S3 bucket that backs the repository.  Defaults to the bucket
            that was previously hard-coded, so existing callers are unaffected.
    """
    repository_definition = {
        'type': 's3',
        'settings': {
            'bucket': bucket,
        },
    }
    snapshot_client.create_repository(
        repository=repository_name,
        body=repository_definition,
    )

def get_repositories():
    """Return the names of the snapshot repositories listed by the cat API.

    The response is treated as line-oriented text whose first
    whitespace-delimited token on each line is the repository name.  Blank
    lines are skipped explicitly, so the result no longer depends on the
    response ending with exactly one trailing newline (the previous code
    always dropped the last entry).
    """
    response = client.cat.repositories()
    return [line.split()[0] for line in response.splitlines() if line.strip()]

def take_snapshot(repository_name="os-snapshot-repo"):
    """Create a snapshot named ``snapshot-YYYY-MM-DD-HH-MM-SS`` from the current local time.

    The request body asks for the index patterns ``*`` and ``-restored_*``
    (presumably: everything except indices prefixed ``restored_``).
    """
    snapshot_name = datetime.now().strftime("snapshot-%Y-%m-%d-%H-%M-%S")
    request_body = {"indices": ["*", "-restored_*"]}
    client.snapshot.create(
        repository=repository_name,
        snapshot=snapshot_name,
        body=request_body,
    )

def get_snapshots(repository_name):
    """Return the names of all snapshots stored in *repository_name*."""
    response = client.snapshot.get(repository=repository_name, snapshot='_all')
    return [entry["snapshot"] for entry in response["snapshots"]]

def get_indices_from_snapshot(repository_name, snapshot_name):
    """Return the ``indices`` entry of the first snapshot matching *snapshot_name*."""
    response = snapshot_client.get(repository=repository_name, snapshot=snapshot_name)
    first_snapshot = response["snapshots"][0]
    return first_snapshot["indices"]

def milliseconds_to_datetime(millis_str):
    """Convert an epoch timestamp in milliseconds to a naive local ``datetime``.

    *millis_str* may be anything ``int()`` accepts (the callers in this file
    pass the string found in the index settings).
    """
    return datetime.fromtimestamp(int(millis_str) / 1000)

def extract_creation_date_from_index(index_name):
    """Return the creation time of *index_name* as a ``datetime``.

    Reads ``settings.index.creation_date`` from the index metadata and
    converts it with ``milliseconds_to_datetime``.
    """
    index_info = index_client.get(index=index_name)[index_name]
    creation_millis = index_info["settings"]["index"]["creation_date"]
    return milliseconds_to_datetime(creation_millis)

def extract_creation_date_from_restored_index(index_name):
    """Return the creation time recorded on ``restored_<index_name>``.

    *index_name* is the original (unprefixed) name; the ``restored_`` prefix
    is added here before the metadata lookup.
    """
    restored_name = f'restored_{index_name}'
    index_settings = index_client.get(index=restored_name)[restored_name]["settings"]
    return milliseconds_to_datetime(index_settings["index"]["creation_date"])

def extract_date_of_deleted_index(repository_name, snapshot_name, index):
    """Return the creation date of *index* by temporarily restoring it.

    The index is restored from the snapshot as ``restored_<index>``, its
    creation date is read, and the temporary index is deleted again.

    The delete now runs in a ``finally`` clause so the temporary index is
    cleaned up even when reading the creation date raises; the exception
    still propagates to the caller.
    """
    restore_snapshot(repository_name, snapshot_name, index, searchable=True)
    try:
        return extract_creation_date_from_restored_index(index)
    finally:
        delete_index(f'restored_{index}')

def restore_snapshot(repository_name, snapshot_name, indices, searchable=False):
    """Issue a snapshot restore request for *indices*.

    When *searchable* is truthy the request uses storage type
    ``remote_snapshot`` and renames every restored index to
    ``restored_<original name>``.  Otherwise storage type ``local`` is used
    and both rename fields are sent as empty strings.

    NOTE(review): no ``wait_for_completion`` argument is passed, so the call
    presumably returns before the restore has finished. Confirm that callers
    which query the restored index right away are safe.
    """
    if searchable:
        storage_type, pattern, replacement = 'remote_snapshot', "(.+)", "restored_$1"
    else:
        storage_type, pattern, replacement = 'local', "", ""

    restore_body = {
        "indices": indices,
        'storage_type': storage_type,
        'rename_pattern': pattern,
        "rename_replacement": replacement,
    }
    client.snapshot.restore(
        repository=repository_name,
        snapshot=snapshot_name,
        body=restore_body,
    )

def delete_index(index_name):
    """Delete *index_name*, best effort.

    Returns the client's response on success.  Any exception is reported on
    stdout and ``None`` is returned instead of propagating the error.
    """
    try:
        outcome = index_client.delete(index=index_name)
    except Exception as err:
        print(f"Failed to delete index '{index_name}'. Error: {err}")
        outcome = None
    return outcome

def convert_year_month_day_to_datetime(year, month, day):
    """Build a ``datetime`` at midnight of the given calendar date."""
    return datetime(year=year, month=month, day=day)

def diff_in_days(start_date, end_date):
    """Return the ``days`` component of ``end_date - start_date``."""
    elapsed = end_date - start_date
    return elapsed.days

def diff_in_weeks(start_date, end_date):
    """Return the number of whole weeks between the two dates.

    The ``days`` component of the difference is floor-divided by 7, so
    negative spans round toward negative infinity.
    """
    elapsed = end_date - start_date
    whole_weeks, _leftover_days = divmod(elapsed.days, 7)
    return whole_weeks

def formatter(obj):
    """Serialize *obj* to a JSON string indented by four spaces."""
    pretty = json.dumps(obj, indent=4)
    return pretty

def _read_date(prompt):
    """Prompt for a ``YYYY MM DD`` date and return it as a ``datetime``."""
    # split() without an argument tolerates repeated or surrounding
    # whitespace, which split(" ") turned into empty fields and a ValueError.
    fields = input(prompt).split()
    return convert_year_month_day_to_datetime(
        int(fields[0]), int(fields[1]), int(fields[2]))


def user_input():
    """Interactively read an index name and a start/end date.

    Returns:
        A tuple ``(index_name, start_date, end_date)`` where both dates are
        ``datetime`` objects.

    Raises:
        IndexError: if fewer than three date fields are entered.
        ValueError: if a field is not an integer or the date is invalid.
    """
    index_name = input("Enter the index name: ")
    start_date = _read_date("Enter the start date (YYYY MM DD): ")
    end_date = _read_date("Enter the end date (YYYY MM DD): ")
    return index_name, start_date, end_date

def delete_indices(indices):
    """Delete one index (given as a string) or each index of an iterable.

    Errors are printed rather than raised; the first failure stops the
    remaining deletions because the whole loop sits inside one ``try``.
    """
    print("Deleting indices:", indices)
    targets = [indices] if isinstance(indices, str) else indices
    try:
        for name in targets:
            client.indices.delete(index=name, params=None, headers=None)
    except Exception as err:
        print(f"An error occurred while deleting indices: {err}")

def delete_restored_indices(indices):
    """Delete the ``restored_``-prefixed counterpart of each given index.

    *indices* holds original (unprefixed) names, either a single string or an
    iterable of strings.  Errors are printed rather than raised, and the
    first failure stops the remaining deletions.
    """
    print("Deleting indices:", indices)
    originals = [indices] if isinstance(indices, str) else indices
    try:
        for original in originals:
            client.indices.delete(index=f'restored_{original}', params=None, headers=None)
        print("Indices Deleted")
    except Exception as err:
        print(f"An error occurred while deleting indices: {err}")

def get_documents_from_index(index_name, size=inf):
    """Run a ``match_all`` search against *index_name* and return the response.

    Args:
        index_name: Index (or index expression) to search.
        size: Maximum number of hits to request.  The default ``inf`` means
            "as many as one request allows".

    ``math.inf`` cannot be sent as-is: it serializes to ``Infinity``, which is
    not valid JSON.  It is therefore mapped to 10 000, OpenSearch's default
    ``index.max_result_window``.
    NOTE(review): indices holding more documents than that need scroll or
    ``search_after`` pagination to be read completely.
    """
    default_result_window = 10_000
    effective_size = default_result_window if size == inf else int(size)
    query_body = {
        "size": effective_size,
        "query": {
            "match_all": {}
        },
    }
    # Keyword arguments keep the call valid on client versions where
    # ``search`` takes keyword-only parameters.
    return client.search(body=query_body, index=index_name)

def filter_snapshots(snapshots, start_date, end_date):
    """Keep the snapshot names whose embedded timestamp is in ``[start_date, end_date]``.

    Both bounds are inclusive.  Timestamps are parsed with
    ``extract_date_from_snapshot``, so a name it cannot parse raises.
    """
    return [
        name
        for name in snapshots
        if start_date <= extract_date_from_snapshot(name) <= end_date
    ]

def extract_indices_from_time_frame(repository_name, micro_service_pattern, start_date, end_date):
    """Collect matching index names from the snapshots taken inside a time window.

    Returns:
        ``(indices, snapshots)``.  ``snapshots`` are the snapshot names whose
        timestamp lies in ``[start_date, end_date]``.  ``indices`` is the
        sorted, de-duplicated list of index names found in those snapshots
        that ``re.match`` against *micro_service_pattern*.
    """
    in_window = filter_snapshots(get_snapshots(repository_name), start_date, end_date)

    unique_indices = set()
    for snapshot_name in in_window:
        unique_indices.update(get_indices_from_snapshot(repository_name, snapshot_name))

    matching = sorted(
        name for name in unique_indices if re.match(micro_service_pattern, name)
    )
    return matching, in_window

# Restore and check indices in reverse order within a snapshot
def restore_and_check_indices_in_reverse(repository, snapshot, indices, start_date):
    """Walk *indices* from last to first, restoring each one to inspect its creation date.

    Each index is restored from *snapshot* as ``restored_<name>``.  The walk
    stops at the first index whose creation date is later than *start_date*
    and returns the slice of *indices* from that position to the end; that
    index's restored copy is left in place.  Indices visited before the stop
    have their restored copy deleted.  Returns ``[]`` if no index qualifies.

    NOTE(review): because the walk starts at the end and stops on the first
    creation date *after* start_date, the result usually holds only the last
    index. Confirm whether the intended stop condition is ``<=``.
    NOTE(review): the creation date is read right after the restore request;
    confirm the restore has completed by then.
    """
    for position in reversed(range(len(indices))):
        candidate = indices[position]
        restore_snapshot(repository, snapshot, candidate, True)
        if extract_creation_date_from_restored_index(candidate) > start_date:
            return indices[position:]
        delete_restored_indices(candidate)
    return []

def log_documents_to_file(log_filename, snapshot_name, index_name, documents, max_size=10*1024*1024):
    """Append *documents* to a size-limited log file as pretty-printed JSON.

    Each call appends one JSON object of the form
    ``{snapshot_name: {index_name: documents}}`` followed by a newline, so
    the file is a sequence of JSON objects rather than one JSON document.

    Args:
        log_filename: Preferred log file path.
        snapshot_name: Outer key of the written entry.
        index_name: Inner key of the written entry.
        documents: JSON-serializable payload.
        max_size: Size in bytes above which a file is considered full.

    When the target file already exceeds *max_size*, the next candidate
    ``<stem>_<n><ext>`` is tried (n = 1, 2, ...).  The rollover name is now
    derived from *log_filename*, so its directory, stem and extension are
    kept; previously it was always ``logs_<n>.json`` in the working
    directory.  For the default ``'logs.json'`` the names are unchanged.
    """
    stem, extension = os.path.splitext(log_filename)
    target = log_filename
    rollover_index = 0

    # Skip over files that are already full.
    while os.path.exists(target) and os.path.getsize(target) > max_size:
        rollover_index += 1
        target = f'{stem}_{rollover_index}{extension}'

    log_entry = {
        snapshot_name: {
            index_name: documents
        }
    }

    with open(target, 'a', encoding='utf-8') as log_file:
        log_file.write(json.dumps(log_entry, indent=4) + "\n")

# Driver script: runs at import time against the cluster configured above.
# It picks the first repository, finds snapshots inside a fixed date window,
# restores matching indices from the first such snapshot, dumps their
# documents to logs.json and removes the restored copies.

# Example dates for testing
start_date = convert_year_month_day_to_datetime(2024, 1, 1)
end_date = convert_year_month_day_to_datetime(2024, 7, 1)

# Uses the first repository returned; raises IndexError if none is registered.
repository = get_repositories()[0]
print(repository)
snapshots = get_snapshots(repository)
print(snapshots)

# Example pattern for testing
micro_service_pattern = r'agent-microservice-\d+'

# extracted_indices is the union over every snapshot in the window.
extracted_indices, filtered_snapshots = extract_indices_from_time_frame(
    repository, micro_service_pattern, start_date, end_date)

if filtered_snapshots:
    # Only the first snapshot in the window is used for restoring.
    # NOTE(review): extracted_indices may contain names that are not part of
    # this particular snapshot — confirm this is intended.
    my_snapshot = filtered_snapshots[0]
    remaining_indices = restore_and_check_indices_in_reverse(repository, my_snapshot, extracted_indices, start_date)
    print(f"Remaining indices: {remaining_indices}")

    for index in remaining_indices:
        # NOTE(review): the search targets the original index name, while the
        # cleanup below deletes 'restored_<index>' — confirm whether the
        # restored copy was meant to be queried here.
        documents = get_documents_from_index(index)
        log_documents_to_file('logs.json', my_snapshot, index, documents)
        delete_restored_indices(index)
else:
    print("No snapshots found in the specified date range.")
Editor is loading...
Leave a Comment