Untitled
plain_text
2 months ago
3.9 kB
0
Indexable
Never
import logging
import os
from logging import Logger

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from kubernetes import client, config


def get_master_route_tables_statuses(
    logger: Logger, group: str, version: str, namespace: str, plural: str
) -> list:
    """Return the status of every custom object of the given kind in a namespace.

    The kube config is read from ``~/.kube/config`` (out-of-cluster access).

    Args:
        logger: Logger used to report configuration and API errors.
        group: API group of the custom resource.
        version: API version of the custom resource.
        namespace: Namespace to list the objects from.
        plural: Plural name of the custom resource.

    Returns:
        A list of dicts with the keys ``namespace``, ``route_table_name`` and
        ``state``. ``state`` is ``None`` when an object carries no
        ``status.global.state``. The list is empty when the kube config
        cannot be loaded or the API call fails; the error is logged.
    """
    try:
        # load_kube_config() configures the default client and returns None,
        # so there is no client object to keep from this call.
        config.load_kube_config(os.path.join(os.path.expanduser("~"), ".kube", "config"))
    except Exception as err:  # boundary: any config problem means "no data"
        logger.error("Error loading out of cluster k8s config: %s", err)
        return []

    custom_api = client.CustomObjectsApi()
    try:
        route_tables = custom_api.list_namespaced_custom_object(
            group=group, version=version, namespace=namespace, plural=plural
        )
    except client.ApiException as err:
        logger.error(
            "Could not list route table objects: %s %s", err.status, err.reason
        )
        # Without this return the loop below would hit an unbound variable.
        return []

    route_tables_statuses = []
    for route_table in route_tables.get("items", []):
        metadata = route_table["metadata"]
        # The status section is not guaranteed to be present on every object,
        # so walk it defensively instead of raising KeyError.
        global_status = (route_table.get("status") or {}).get("global") or {}
        route_tables_statuses.append(
            {
                "namespace": metadata["namespace"],
                "route_table_name": metadata["name"],
                "state": global_status.get("state"),
            }
        )
    return route_tables_statuses


class InfluxClient:
    """Small wrapper around ``InfluxDBClient`` bound to one org and bucket."""

    def __init__(self, token, org, bucket, url="http://localhost:8086"):
        """Create the underlying client.

        Args:
            token: InfluxDB API token.
            org: Organisation that owns the bucket.
            bucket: Bucket the data is written to.
            url: Base URL of the InfluxDB server.
        """
        self.__org = org
        self.__bucket = bucket
        self.__client = InfluxDBClient(url=url, token=token)

    def write_data(
        self, data, write_option=SYNCHRONOUS, write_precision=WritePrecision.NS
    ):
        """Write ``data`` to the configured bucket.

        Args:
            data: Record(s) accepted by ``WriteApi.write`` (e.g. ``Point``
                objects or a list of them).
            write_option: Write options passed to ``InfluxDBClient.write_api``.
            write_precision: Timestamp precision of the records. The previous
                hard-coded value ``'m'`` is not a member of ``WritePrecision``.
        """
        # The context manager closes the write API when done, which also
        # flushes pending data if a batching write option is used.
        with self.__client.write_api(write_options=write_option) as write_api:
            write_api.write(
                self.__bucket, self.__org, data, write_precision=write_precision
            )

    def close(self):
        """Release the resources held by the underlying client."""
        self.__client.close()


def _status_to_point(status):
    """Convert one status dict into an InfluxDB point."""
    return (
        Point("master_route_table_status")
        .tag("namespace", status["namespace"])
        .tag("route_table_name", status["route_table_name"])
        .field("state", status["state"])
    )


def main():
    """Fetch the route table statuses and store them in InfluxDB."""
    logger = logging.getLogger("uvicorn")
    route_table_statuses = get_master_route_tables_statuses(
        logger,
        "networking.gloo.solo.io",
        "v2",
        "product-shop-master-route-table",
        "routetables",
    )
    print(route_table_statuses)

    # NOTE: the API token used to be hard-coded in this file. Secrets must not
    # live in source; the previously committed token should be rotated.
    influxdb_token = os.environ.get("INFLUXDB_TOKEN")
    if not influxdb_token:
        logger.error("INFLUXDB_TOKEN is not set; nothing was written to InfluxDB")
        return
    influxdb_org = os.environ.get("INFLUXDB_ORG", "TestOrg")
    influxdb_bucket = os.environ.get("INFLUXDB_BUCKET", "TestBucket")
    influxdb_url = os.environ.get("INFLUXDB_URL", "http://localhost:8086")

    # A point needs at least one field, so statuses without a state are skipped.
    points = [
        _status_to_point(status)
        for status in route_table_statuses
        if status["state"] is not None
    ]
    if not points:
        logger.warning("No route table statuses to save")
        return

    influx_client = InfluxClient(
        influxdb_token, influxdb_org, influxdb_bucket, url=influxdb_url
    )
    try:
        influx_client.write_data(points)
    finally:
        influx_client.close()
    print("Route Table Statuses saved to InfluxDB")


if __name__ == "__main__":
    main()