# Paste-site header (not part of the program): mail@pastecode.io avatar, 23 days ago, 3.7 kB
from influxdb import InfluxDBClient
import plotly.graph_objects as go

class InfluxDBHandler:
    """Helper around ``InfluxDBClient`` for exporting, segmenting and plotting time series.

    Points are handled as dict-like mappings that carry a ``'time'`` key plus
    one key per field, which is how the methods below index them.
    """

    def __init__(self, host='localhost', port=8086, username='', password='', database=''):
        """Create the underlying client.

        The arguments are forwarded positionally to ``InfluxDBClient`` in the
        order host, port, username, password, database.
        """
        self.client = InfluxDBClient(host, port, username, password, database)

    def query_data(self, query):
        """Run *query* through the client and return the client's result object."""
        return self.client.query(query)

    def export_time_series(self, measurement, start_time, end_time):
        """Export the points of *measurement* within a time range.

        Args:
            measurement: Measurement name, inserted verbatim into the query.
            start_time: Lower time bound (inclusive), inserted as a quoted literal.
            end_time: Upper time bound (inclusive), inserted as a quoted literal.

        Returns:
            A list built from the result's ``get_points()``.
        """
        # NOTE(review): the query is assembled by string interpolation, so only
        # trusted measurement names and timestamps should be passed in.
        query = f"SELECT * FROM {measurement} WHERE time >= '{start_time}' AND time <= '{end_time}'"
        return list(self.query_data(query).get_points())

    def identify_production_runs(self, data, field, threshold):
        """Find production runs from threshold crossings of *field*.

        A run starts at the first point whose value is strictly above
        *threshold* and ends at the first later point whose value is at or
        below it. A run still open when the data ends is closed at the
        timestamp of the last point.

        Args:
            data: Iterable of points (any iterable, not only a list).
            field: Key of the value compared against the threshold.
            threshold: Value a point must exceed to count as "in a run".

        Returns:
            A list of ``(start_time, end_time)`` tuples, empty for empty data.

        Raises:
            KeyError: If a point has no *field* or ``'time'`` key.
        """
        runs = []
        run_started = False
        start_time = None
        last_time = None
        for point in data:
            last_time = point['time']
            value = point[field]
            if value is None:
                # A missing reading cannot be compared with the threshold;
                # it neither opens nor closes a run.
                continue
            if not run_started:
                if value > threshold:
                    start_time = last_time  # Mark the start of a new run
                    run_started = True
            elif value <= threshold:
                runs.append((start_time, last_time))  # End of a run
                run_started = False
        if run_started:  # The last run extends to the end of the data
            runs.append((start_time, last_time))
        return runs

    def export_runs(self, measurement, runs):
        """Export the time series of *measurement* for each run.

        Args:
            measurement: Measurement name passed on to ``export_time_series``.
            runs: Iterable of ``(start_time, end_time)`` pairs.

        Returns:
            A list with one ``export_time_series`` result per run, in order.
        """
        return [
            self.export_time_series(measurement, start_time, end_time)
            for start_time, end_time in runs
        ]

    def plot_multiple_tags(self, data, fields, show=True):
        """Plot several fields of a time series as lines in one Plotly figure.

        Args:
            data: Iterable of points, each with a ``'time'`` key.
            fields: Names of the fields to draw, one trace per field.
            show: When true (the default) the figure is displayed via
                ``fig.show()``; pass ``False`` to only build it.

        Returns:
            The constructed ``plotly.graph_objects.Figure``.
        """
        times, columns = self._collect_series(data, fields)

        fig = go.Figure()
        # Add a line for each field; every trace shares the same time axis.
        for field in fields:
            fig.add_trace(go.Scatter(x=times, y=columns[field], mode='lines', name=field))

        fig.update_layout(title='Multiple Tags Time Series Plot', xaxis_title='Time', yaxis_title='Value')
        if show:
            fig.show()
        return fig

    @staticmethod
    def _collect_series(data, fields):
        """Pivot a sequence of points into a time list and per-field value lists.

        Every field list gets exactly one entry per point (``None`` when the
        point lacks that field) so each list stays aligned with the time list.
        """
        times = []
        columns = {field: [] for field in fields}
        for point in data:
            times.append(point['time'])
            for field in fields:
                columns[field].append(point.get(field))
        return times, columns

# Example usage: export a time window, split it into production runs, plot the first run.
db_handler = InfluxDBHandler(
    host='your_host',
    port=8086,
    username='your_username',
    password='your_password',
    database='your_database',
)
measurement = 'your_measurement'
start_time = '2023-01-01T00:00:00Z'
end_time = '2023-01-02T00:00:00Z'
threshold_field = 'your_threshold_field'
threshold_value = 50  # Placeholder; set to the level that separates "running" from "idle"
fields_to_plot = ['field1', 'field2', 'field3']  # Placeholders for the real field names

# Step 1: pull every point of the measurement inside the window.
exported_data = db_handler.export_time_series(
    measurement=measurement,
    start_time=start_time,
    end_time=end_time,
)
# Step 2: derive (start, end) pairs from threshold crossings of the chosen field.
runs = db_handler.identify_production_runs(
    data=exported_data,
    field=threshold_field,
    threshold=threshold_value,
)
# Step 3 (optional, depending on your needs): fetch the data of each run separately.
run_data = db_handler.export_runs(measurement=measurement, runs=runs)
# Step 4: for demonstration, plot only the first run, if any run was found.
if run_data:
    db_handler.plot_multiple_tags(data=run_data[0], fields=fields_to_plot)
# Paste-site footer (not part of the program): Leave a Comment