Untitled

mail@pastecode.io avatarunknown
plain_text
2 months ago
3.9 kB
0
Indexable
Never
from kubernetes import client, config
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import logging
from logging import Logger
import os


def get_master_route_tables_statuses(logger: Logger, group: str, version: str, namespace: str, plural: str):
    route_tables_statuses = []
    try:
        api_client = config.load_kube_config(os.path.join(os.environ["HOME"], ".kube/config"))
    except Exception as e:
        print("Error loading out of cluster k8s config: {0}".format(e))
        return
    custom_api = client.CustomObjectsApi(api_client)
    try:
        route_tables = custom_api.list_namespaced_custom_object(
            group=group,
            version=version,
            namespace=namespace,
            plural=plural
        )
    except client.ApiException as err:
        logger.error(f"Could not list route table objects: {err.status} {err.reason}")

    for route_table in route_tables["items"]:
        route_table_status = {
            "namespace": route_table["metadata"]["namespace"],
            "route_table_name": route_table["metadata"]["name"],
            "state": route_table["status"]["global"]["state"],
        }
        route_tables_statuses.append(route_table_status)
    return route_tables_statuses


class InfluxClient:
    # __conf = {
    #     "org": "",
    #     "bucket": "",
    #     "token": "",
    #     "url": "http://localhost:8086",
    #     "client": InfluxDBClient(url=url, token=token)
    # }
    # __setters = ["org", "bucket", "token"]
    #
    # @staticmethod
    # def config(name):
    #     return InfluxClient.__conf[name]
    #
    # @staticmethod
    # def set(name, value):
    #     if name in InfluxClient.__setters:
    #         InfluxClient.__conf[name] = value
    #     else:
    #         raise NameError("Name not accepted in set() method")


    def __init__(self,
                 token,
                 org,
                 bucket):
        self.__org = org
        self.__bucket = bucket
        self.__client = InfluxDBClient(url="http://localhost:8086", token=token)

    def write_data(self, data, write_option=SYNCHRONOUS):
        write_api = self.__client.write_api(write_option)
        write_api.write(self.__bucket, self.__org, data, write_precision='m')


# def save_status_to_influxdb(statuses):
#     influxdb_url = "http://localhost:8086"
#     influxdb_token = "GFwfSh-lfTXbtfdtz7rKqXTyuVK-PlOW-WA1mh8tg5boH8VBMkqgZQLyFIXLZ0xi3i64Z0pPyBDAmp5TBxuWsg=="
#     influxdb_org = "TestOrg"
#     influxdb_bucket = "TestBucket"
#
#     influxdb_client = InfluxDBClient(url=influxdb_url, token=influxdb_token)
#
#     write_api = influxdb_client.write_api(write_options=SYNCHRONOUS)
#     for status in statuses:
#         point = Point("master_route_table_status") \
#             .tag("namespace", status["namespace"]) \
#             .tag("master_route_table_name", status["route_table_name"]) \
#             .field("state", status["state"])
#         print(point)
#         write_api.write(bucket=influxdb_bucket, org=influxdb_org, record=point)


if __name__ == "__main__":
    logger = logging.getLogger("uvicorn")
    route_table_statuses = get_master_route_tables_statuses(logger, "networking.gloo.solo.io", "v2", "product-shop-master-route-table", "routetables")
    print(route_table_statuses)
    influxdb_token = "GFwfSh-lfTXbtfdtz7rKqXTyuVK-PlOW-WA1mh8tg5boH8VBMkqgZQLyFIXLZ0xi3i64Z0pPyBDAmp5TBxuWsg=="
    influxdb_org = "TestOrg"
    influxdb_bucket = "TestBucket"
    IC = InfluxClient(influxdb_token, influxdb_org, influxdb_bucket)
    IC.write_data(
        [
            Point("master_route_table_status")
            .tag("route_table_name", route_table_statuses[""])
        ],
    )
    # save_status_to_influxdb(route_table_statuses)
    print("Route Table Statuses saved to InfluxDB")