Untitled
unknown
plain_text
2 years ago
3.9 kB
7
Indexable
from kubernetes import client, config
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import logging
from logging import Logger
import os
def get_master_route_tables_statuses(logger: Logger, group: str, version: str, namespace: str, plural: str):
route_tables_statuses = []
try:
api_client = config.load_kube_config(os.path.join(os.environ["HOME"], ".kube/config"))
except Exception as e:
print("Error loading out of cluster k8s config: {0}".format(e))
return
custom_api = client.CustomObjectsApi(api_client)
try:
route_tables = custom_api.list_namespaced_custom_object(
group=group,
version=version,
namespace=namespace,
plural=plural
)
except client.ApiException as err:
logger.error(f"Could not list route table objects: {err.status} {err.reason}")
for route_table in route_tables["items"]:
route_table_status = {
"namespace": route_table["metadata"]["namespace"],
"route_table_name": route_table["metadata"]["name"],
"state": route_table["status"]["global"]["state"],
}
route_tables_statuses.append(route_table_status)
return route_tables_statuses
class InfluxClient:
# __conf = {
# "org": "",
# "bucket": "",
# "token": "",
# "url": "http://localhost:8086",
# "client": InfluxDBClient(url=url, token=token)
# }
# __setters = ["org", "bucket", "token"]
#
# @staticmethod
# def config(name):
# return InfluxClient.__conf[name]
#
# @staticmethod
# def set(name, value):
# if name in InfluxClient.__setters:
# InfluxClient.__conf[name] = value
# else:
# raise NameError("Name not accepted in set() method")
def __init__(self,
token,
org,
bucket):
self.__org = org
self.__bucket = bucket
self.__client = InfluxDBClient(url="http://localhost:8086", token=token)
def write_data(self, data, write_option=SYNCHRONOUS):
write_api = self.__client.write_api(write_option)
write_api.write(self.__bucket, self.__org, data, write_precision='m')
# def save_status_to_influxdb(statuses):
# influxdb_url = "http://localhost:8086"
# influxdb_token = "GFwfSh-lfTXbtfdtz7rKqXTyuVK-PlOW-WA1mh8tg5boH8VBMkqgZQLyFIXLZ0xi3i64Z0pPyBDAmp5TBxuWsg=="
# influxdb_org = "TestOrg"
# influxdb_bucket = "TestBucket"
#
# influxdb_client = InfluxDBClient(url=influxdb_url, token=influxdb_token)
#
# write_api = influxdb_client.write_api(write_options=SYNCHRONOUS)
# for status in statuses:
# point = Point("master_route_table_status") \
# .tag("namespace", status["namespace"]) \
# .tag("master_route_table_name", status["route_table_name"]) \
# .field("state", status["state"])
# print(point)
# write_api.write(bucket=influxdb_bucket, org=influxdb_org, record=point)
if __name__ == "__main__":
logger = logging.getLogger("uvicorn")
route_table_statuses = get_master_route_tables_statuses(logger, "networking.gloo.solo.io", "v2", "product-shop-master-route-table", "routetables")
print(route_table_statuses)
influxdb_token = "GFwfSh-lfTXbtfdtz7rKqXTyuVK-PlOW-WA1mh8tg5boH8VBMkqgZQLyFIXLZ0xi3i64Z0pPyBDAmp5TBxuWsg=="
influxdb_org = "TestOrg"
influxdb_bucket = "TestBucket"
IC = InfluxClient(influxdb_token, influxdb_org, influxdb_bucket)
IC.write_data(
[
Point("master_route_table_status")
.tag("route_table_name", route_table_statuses[""])
],
)
# save_status_to_influxdb(route_table_statuses)
print("Route Table Statuses saved to InfluxDB")
Editor is loading...