Untitled
from influxdb import InfluxDBClient
import plotly.graph_objects as go


class InfluxDBHandler:
    """Query an InfluxDB database, detect production runs and plot them.

    A "production run" is a stretch of consecutive points during which a
    chosen field stays above a threshold.
    """

    def __init__(self, host='localhost', port=8086, username='', password='',
                 database=''):
        """Create the underlying ``InfluxDBClient`` from the given settings."""
        self.client = InfluxDBClient(host, port, username, password, database)

    def query_data(self, query):
        """Run *query* through the client and return its result unchanged."""
        return self.client.query(query)

    def export_time_series(self, measurement, start_time, end_time):
        """Return all points of *measurement* between two timestamps.

        Args:
            measurement: Name of the measurement to select from.
            start_time: Inclusive lower time bound, inserted into the query
                inside single quotes.
            end_time: Inclusive upper time bound, inserted the same way.

        Returns:
            A list of the points yielded by the result's ``get_points()``.
        """
        # NOTE(review): the arguments are interpolated straight into the
        # query text. Only pass trusted values here.
        query = f"SELECT * FROM {measurement} WHERE time >= '{start_time}' AND time <= '{end_time}'"
        return list(self.query_data(query).get_points())

    def identify_production_runs(self, data, field, threshold):
        """Find the runs during which *field* stays above *threshold*.

        A run starts at the first point whose value is strictly greater than
        the threshold and ends at the first following point whose value is
        less than or equal to it. A run still open when the data ends is
        closed at the timestamp of the last point.

        Points whose value for *field* is ``None`` carry no information and
        are skipped; they neither start nor end a run.

        Args:
            data: Iterable of point mappings, each with a ``'time'`` key and
                a *field* key.
            field: Key of the value compared against the threshold.
            threshold: Level that separates "running" from "not running".

        Returns:
            A list of ``(start_time, end_time)`` tuples in input order.

        Raises:
            KeyError: If a point lacks the ``'time'`` or *field* key.
        """
        runs = []
        in_run = False
        run_start = None
        last_time = None

        for point in data:
            # Remember the latest timestamp so an open run can be closed
            # without indexing into ``data`` (which may be a plain iterator).
            last_time = point['time']
            value = point[field]
            if value is None:
                # Comparing None with a number raises TypeError on Python 3.
                continue

            if not in_run and value > threshold:
                run_start = point['time']
                in_run = True
            elif in_run and value <= threshold:
                runs.append((run_start, point['time']))
                in_run = False

        if in_run:
            # The last run lasts until the end of the data.
            runs.append((run_start, last_time))

        return runs

    def export_runs(self, measurement, runs):
        """Export the time series of *measurement* for every run.

        Args:
            measurement: Name of the measurement to select from.
            runs: Iterable of ``(start_time, end_time)`` pairs.

        Returns:
            A list with one ``export_time_series`` result per run, in order.
        """
        return [
            self.export_time_series(measurement, start_time, end_time)
            for start_time, end_time in runs
        ]

    def plot_multiple_tags(self, data, fields):
        """Plot several fields of a time series as lines in one figure.

        Every trace shares the same x axis (the ``'time'`` of each point).
        When a point has no entry for a field, ``None`` is used for that
        sample so that x and y always have the same length and values stay
        attached to their own timestamp.

        Args:
            data: Iterable of point mappings, each with a ``'time'`` key.
            fields: Names of the fields to draw, one trace per name.
        """
        # Materialise once: the points are walked once per field below.
        points = list(data)
        times = [point['time'] for point in points]

        fig = go.Figure()
        for field in fields:
            values = [point.get(field) for point in points]
            fig.add_trace(
                go.Scatter(x=times, y=values, mode='lines', name=field)
            )

        fig.update_layout(
            title='Multiple Tags Time Series Plot',
            xaxis_title='Time',
            yaxis_title='Value',
        )
        fig.show()


# Example usage. Guarded so that importing this module does not connect to
# a database or open a plot.
if __name__ == "__main__":
    db_handler = InfluxDBHandler(
        host='your_host',
        port=8086,
        username='your_username',
        password='your_password',
        database='your_database',
    )

    measurement = 'your_measurement'
    start_time = '2023-01-01T00:00:00Z'
    end_time = '2023-01-02T00:00:00Z'
    threshold_field = 'your_threshold_field'
    threshold_value = 50  # Define your threshold value here
    fields_to_plot = ['field1', 'field2', 'field3']  # Replace these with your actual field names

    # Export data
    exported_data = db_handler.export_time_series(measurement, start_time, end_time)

    # Identify production runs based on a threshold
    runs = db_handler.identify_production_runs(
        exported_data, threshold_field, threshold_value
    )

    # Export runs data (optional step depending on your needs)
    run_data = db_handler.export_runs(measurement, runs)

    # Plotting the data of the first run for demonstration
    if run_data:
        db_handler.plot_multiple_tags(run_data[0], fields_to_plot)
Leave a Comment