Untitled

mail@pastecode.io avatar
unknown
plain_text
a month ago
6.7 kB
2
Indexable
Never
import { InfluxDB, FluxResultObserver } from '@influxdata/influxdb-client';
import moment from 'moment';

// Define PowerDevice interface
export interface PowerDevice {
  id: string;
  phase: number;
}

// Define RangeTime type

export type RangeTime = '-1d' | '-30d' | '-1y';


export interface kWhData {
  xAxis: string[];
  series: {
    valuesOne: number[]; 
    valuesTwo: number[]; 
    valuesThree: number[];
  };
}


export interface VoltageData {
  xAxis: string[];
  series: {
    valuesOne: number[];
    valuesTwo: number[];
    valuesThree: number[];
  };
}


export interface CurrentData {
  xAxis: string[];
  series: {
    valuesOne: number[];
    valuesTwo: number[];
    valuesThree: number[];
  };
}

export interface PowerFactorData {
  xAxis: string[];
  series: {
    valuesOne: number[];
    valuesTwo: number[];
    valuesThree: number[];
  };
}


// Define InfluxDB connection details
const url = process.env.NEXT_PUBLIC_INFLUX_URL || 'http:8086';
const token = process.env.NEXT_PUBLIC_INFLUX_API_TOKEN || 'MDO-g8PH3CosbXk6JN9Y4EgsAmuGb89uGsNyx40tL3vsehIisfz0ACMHlgbXTCb3-SPz8x8Ho-XYuojeGaajGw==';
const org = 'polygon';

// Function to get start date based on RangeTime
const getStartDate = (start: RangeTime): string => {
  let startDate = '-1d';

  switch (start) {
    case '-1d':
      startDate = moment().startOf('day').toISOString();
      break;
    case '-30d':
      startDate = moment().startOf('month').toISOString();
      break;
    case '-1y':
      startDate = moment().startOf('year').toISOString();
      break;
    default:
      break;
  }

  return startDate;
};

// Function to construct InfluxDB Flux query
const getQuery = (item: string, start: RangeTime): string => {
  const startDate = getStartDate(start);

  const query = `
    from(bucket: "polygon_power_device")
      |> range(start: ${startDate})
      |> filter(fn: (r) => r["_measurement"] == "mqtt_consumer")
      |> filter(fn: (r) => r["_field"] == "${item}")
      |> filter(fn: (r) => r["topic"] == "all_data")
      |> aggregateWindow(every: ${start === '-1d' ? '30m' : start === '-30d' ? '1d' : '30d'}, fn: mean, createEmpty: false)
      |> yield(name: "mean")
  `;

  return query;
};


export const querykWh = async (device: PowerDevice, time: RangeTime): Promise<kWhData> => {
  const queryApi = new InfluxDB({ url, token }).getQueryApi(org);

  const fieldOne = `all_data_${device.id}_${device.phase === 2 ? '' : 'p1_'}kwh`;
  const fieldTwo = `all_data_${device.id}_${device.phase === 2 ? '' : 'p2_'}kwh`;
  const fieldThree = `all_data_${device.id}_${device.phase === 2 ? '' : 'p3_'}kwh`;

  const valuesOne: number[] = [];
  const valuesTwo: number[] = [];
  const valuesThree: number[] = [];
  const xAxis: string[] = [];

  try {
    // Fetch data for fieldOne
    await fetchAndProcessData(queryApi, getQuery(fieldOne, time), valuesOne, xAxis);
    // Fetch data for fieldTwo
    await fetchAndProcessData(queryApi, getQuery(fieldTwo, time), valuesTwo);
    // Fetch data for fieldThree
    await fetchAndProcessData(queryApi, getQuery(fieldThree, time), valuesThree);

    return { xAxis, series: { valuesOne, valuesTwo, valuesThree } };
  } catch (error) {
    throw error;
  }
};



// Export queryVoltage function to fetch voltage data
export const queryVoltage = async (device: PowerDevice, time: RangeTime): Promise<VoltageData> => {
  const queryApi = new InfluxDB({ url, token }).getQueryApi(org);

  const fieldOne = `all_data_${device.id}_${device.phase === 2 ? '' : 'p1_'}v`;
  const fieldTwo = `all_data_${device.id}_${device.phase === 2 ? '' : 'p2_'}v`;
  const fieldThree = `all_data_${device.id}_${device.phase === 2 ? '' : 'p3_'}v`;

  const valuesOne: number[] = [];
  const valuesTwo: number[] = [];
  const valuesThree: number[] = [];
  const xAxis: string[] = [];

  try {
    // Fetch data for fieldOne
    await fetchAndProcessData(queryApi, getQuery(fieldOne, time), valuesOne, xAxis);
    // Fetch data for fieldTwo
    await fetchAndProcessData(queryApi, getQuery(fieldTwo, time), valuesTwo);
    // Fetch data for fieldThree
    await fetchAndProcessData(queryApi, getQuery(fieldThree, time), valuesThree);

    return { xAxis, series: { valuesOne, valuesTwo, valuesThree } };
  } catch (error) {
    throw error;
  }
};



export const queryCurrent = async (device: PowerDevice, time: RangeTime): Promise<CurrentData> => {
  const queryApi = new InfluxDB({ url, token }).getQueryApi(org);

  const fieldOne = `all_data_${device.id}_${device.phase === 2 ? '' : 'p1_'}i`;
  const fieldTwo = `all_data_${device.id}_${device.phase === 2 ? '' : 'p2_'}i`;
  const fieldThree = `all_data_${device.id}_${device.phase === 2 ? '' : 'p3_'}i`;

  const valuesOne: number[] = [];
  const valuesTwo: number[] = [];
  const valuesThree: number[] = [];
  const xAxis: string[] = [];

  try {
    // Fetch data for fieldOne
    await fetchAndProcessData(queryApi, getQuery(fieldOne, time), valuesOne, xAxis);
    // Fetch data for fieldTwo
    await fetchAndProcessData(queryApi, getQuery(fieldTwo, time), valuesTwo);
    // Fetch data for fieldThree
    await fetchAndProcessData(queryApi, getQuery(fieldThree, time), valuesThree);

    return { xAxis, series: { valuesOne, valuesTwo, valuesThree } };
  } catch (error) {
    throw error;
  }
};



export const queryPF = async (device: PowerDevice, time: RangeTime): Promise<PowerFactorData> => {
  const queryApi = new InfluxDB({ url, token }).getQueryApi(org);

  const fieldOne = `all_data_${device.id}_${device.phase === 2 ? '' : 'p1_'}pf`;
  const fieldTwo = `all_data_${device.id}_${device.phase === 2 ? '' : 'p2_'}pf`;
  const fieldThree = `all_data_${device.id}_${device.phase === 2 ? '' : 'p3_'}pf`;

  const valuesOne: number[] = [];
  const valuesTwo: number[] = [];
  const valuesThree: number[] = [];
  const xAxis: string[] = [];

  try {
    // Fetch data for fieldOne
    await fetchAndProcessData(queryApi, getQuery(fieldOne, time), valuesOne, xAxis);
    // Fetch data for fieldTwo
    await fetchAndProcessData(queryApi, getQuery(fieldTwo, time), valuesTwo);
    // Fetch data for fieldThree
    await fetchAndProcessData(queryApi, getQuery(fieldThree, time), valuesThree);

    return { xAxis, series: { valuesOne, valuesTwo, valuesThree } };
  } catch (error) {

    throw error;
  }
};



// Function to fetch and process data rows from InfluxDB
const fetchAndProcessData = async (
  queryApi: ReturnType<InfluxDB['getQueryApi']>,
  query: string,
  values: number[],
  xAxis?: string[]
): Promise<void> => {
  for await (const { values: rowValues, tableMeta } of queryApi.iterateRows(query)) {
    const o = tableMeta.toObject(rowValues);
    xAxis?.push(o._time);
    values.push(parseFloat(o._value));
  }
};
Leave a Comment