Untitled
unknown
plain_text
2 years ago
3.7 kB
8
Indexable
from influxdb import InfluxDBClient
import plotly.graph_objects as go
class InfluxDBHandler:
def __init__(self, host='localhost', port=8086, username='', password='', database=''):
self.client = InfluxDBClient(host, port, username, password, database)
def query_data(self, query):
"""
Query data from InfluxDB.
"""
return self.client.query(query)
def export_time_series(self, measurement, start_time, end_time):
"""
Export time series data within a specified time range.
"""
query = f"SELECT * FROM {measurement} WHERE time >= '{start_time}' AND time <= '{end_time}'"
return list(self.query_data(query).get_points())
def identify_production_runs(self, data, field, threshold):
"""
Identify production runs based on threshold crossings and return their start and end times.
"""
runs = []
run_started = False
start_time = None
for point in data:
if not run_started and point[field] > threshold:
start_time = point['time'] # Mark the start of a new run
run_started = True
elif run_started and point[field] <= threshold:
end_time = point['time']
runs.append((start_time, end_time)) # End of a run
run_started = False
if run_started: # Handle case where last run goes till the end of data
runs.append((start_time, data[-1]['time']))
return runs
def export_runs(self, measurement, runs):
"""
Export time series data for each identified production run.
"""
run_data = []
for start_time, end_time in runs:
run_data.append(self.export_time_series(measurement, start_time, end_time))
return run_data
def plot_multiple_tags(self, data, fields):
"""
Plot multiple tags from time series data in one Plotly line plot.
"""
fig = go.Figure()
# Convert data to a dictionary of lists for easy access
data_dict = {field: [] for field in fields}
data_dict['time'] = []
for point in data:
for field in fields:
if field in point:
data_dict[field].append(point[field])
data_dict['time'].append(point['time'])
# Add a line for each field
for field in fields:
fig.add_trace(go.Scatter(x=data_dict['time'], y=data_dict[field], mode='lines', name=field))
fig.update_layout(title='Multiple Tags Time Series Plot', xaxis_title='Time', yaxis_title='Value')
fig.show()
# Example usage
db_handler = InfluxDBHandler(host='your_host', port=8086, username='your_username', password='your_password', database='your_database')
measurement = 'your_measurement'
start_time = '2023-01-01T00:00:00Z'
end_time = '2023-01-02T00:00:00Z'
threshold_field = 'your_threshold_field'
threshold_value = 50 # Define your threshold value here
fields_to_plot = ['field1', 'field2', 'field3'] # Replace these with your actual field names
# Export data
exported_data = db_handler.export_time_series(measurement, start_time, end_time)
# Identify production runs based on a threshold
runs = db_handler.identify_production_runs(exported_data, threshold_field, threshold_value)
# Export runs data (optional step depending on your needs)
run_data = db_handler.export_runs(measurement, runs)
# Plotting the data of the first run for demonstration
if run_data:
db_handler.plot_multiple_tags(run_data[0], fields_to_plot)
Editor is loading...
Leave a Comment