# Untitled - pastecode.io paste (author: unknown, language: python, size: 4.5 kB, posted a year ago)
# coding: utf-8
import pandas as pd
import numpy as np
import sys
from datetime import timedelta
import json, zlib
from base64 import b64encode, b64decode
import binascii
from python_db import DbConnection
sys.path.append('/apps/chickenboy-site/scripts/data_processing/db_connection')


 
def is_base64(s):
    """Return True if *s* is canonical base64 text, ignoring line breaks.

    The check is a round trip: every line is stripped and the pieces are
    joined, the result is decoded and re-encoded, and the re-encoded text
    must equal the joined input.

    Args:
        s: ``str`` or ``bytes`` to test. ``bytes`` are interpreted as ASCII.

    Returns:
        bool: True when the round trip reproduces the input, False when
        decoding fails or the round trip differs.
    """
    if isinstance(s, bytes):
        try:
            s = s.decode('ascii')
        except UnicodeDecodeError:
            # Base64 text is pure ASCII, so anything else cannot qualify.
            return False

    normalized = ''.join(line.strip() for line in s.split("\n"))
    try:
        # b64encode returns bytes on Python 3; decode it so the comparison
        # below is str-to-str (comparing bytes to str is always False).
        round_trip = b64encode(b64decode(normalized)).decode('ascii')
    except (TypeError, ValueError, binascii.Error):
        # ValueError covers non-ASCII str input; binascii.Error covers bad
        # padding / invalid base64.
        return False
    return round_trip == normalized

def decompress_json(compressed_b64):
    """Decode a base64 payload, zlib-inflate it and parse the UTF-8 JSON.

    Args:
        compressed_b64: base64 text (``str`` or ``bytes``) wrapping
            zlib-compressed, UTF-8 encoded JSON.

    Returns:
        The Python object produced by ``json.loads``.
    """
    compressed = b64decode(compressed_b64)
    raw_json = zlib.decompress(compressed)
    text = raw_json.decode('utf-8')
    return json.loads(text)

class Interpolation(object):
    """Interpolate the target curves stored for one round.

    The row of table ``round`` whose ``id`` equals ``round_id`` supplies a
    ``from`` date, a ``to`` date and a JSON ``targets`` table of set points.
    The set points are expanded to one row per day and the gaps between
    them are filled by interpolation.

    The instance opens a database connection in ``__init__``; call
    ``close()`` (or use the instance as a context manager) to release it.

    NOTE(review): the queries use the ``%s`` parameter placeholder, as MySQL
    drivers do (the backtick-quoted column names suggest MySQL) - confirm
    against the driver behind ``DbConnection``.
    """

    # Value columns of the interpolated table, in output order. In a targets
    # row, cell 1 holds the day and cells 2..10 hold these observables.
    OBSERVABLE_COLUMNS = (
        'temperature', 'humidity', 'co2', 'sound_level', 'free_space',
        'airspeed', 'ammonia', 'light_intensity', 'weight',
    )

    def __init__(self, round_id):
        """Remember *round_id* and open a connection plus cursor."""
        self.round_id = round_id
        self.cnx = DbConnection().connect_to_db()
        self.cursor = self.cnx.cursor()
        # Formatter tables keyed by number of decimals (0 or 1). They are not
        # used inside this class; presumably callers read them - verify.
        self.precision = {
            0: {'float': '{: 0.0f}'.format},
            1: {'float': '{: 0.1f}'.format}
        }
        self.replace_precision = {
            0: ' 0',
            1: ' 0.0'
        }

    def close(self):
        """Close the cursor and the connection opened by ``__init__``."""
        self.cursor.close()
        self.cnx.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    @staticmethod
    def _load_targets(targets):
        """Parse the stored targets (bytes or text) into Python objects."""
        if isinstance(targets, (bytes, bytearray)):
            text = targets.decode('utf-8')
        else:
            text = str(targets)
        # The subscript-two character is replaced by a plain "2" before
        # parsing, exactly as the stored text requires.
        return json.loads(text.replace('₂', '2'))

    @classmethod
    def _curve_point(cls, row):
        """Turn one targets row into a ``{'day': ..., observable: ...}`` dict."""
        point = {"day": int(row[1]['value'])}
        for position, name in enumerate(cls.OBSERVABLE_COLUMNS, 2):
            point[name] = row[position]['value']
        return point

    # NOTE(review): the original comment here read "Make sure at least 50
    # days"; nothing in this method enforces a minimum length.
    def interpolate_for_pc(self):
        """Return the day-by-day interpolated targets of the round as JSON.

        Returns:
            str: JSON in pandas ``orient="index"`` form. There is one entry
            per day from ``from`` to ``to`` inclusive, keyed by day number
            starting at 0. Each entry holds the columns of
            ``OBSERVABLE_COLUMNS`` plus ``time`` (``str`` of that day's
            date). Values that cannot be parsed as numbers are treated as
            missing before interpolation.

        Raises:
            ValueError: if the targets are not valid JSON (and, from pandas,
                if two set points share the same day).
        """
        start, end, targets = self.get_pc_details()
        dates = self.between_dates(start, end)

        # The first two rows of the table are skipped (presumably headers).
        rows = self._load_targets(targets)
        curves = [self._curve_point(row) for row in rows[2:]]

        # Passing the column list keeps the frame well-formed even when no
        # set points exist, so an empty table yields all-null output.
        columns = ['day'] + list(self.OBSERVABLE_COLUMNS)
        set_points = pd.DataFrame(curves, columns=columns).set_index('day')

        # reindex aligns by day label: the order in which the user entered
        # the set points is irrelevant, days without a set point become NaN
        # and set points outside 0..len(dates)-1 are dropped.
        target = set_points.reindex(np.arange(len(dates)))
        for col in target:
            target[col] = pd.to_numeric(target[col], errors='coerce')

        interpolated = target.interpolate()
        interpolated['time'] = dates
        return interpolated.to_json(orient="index")

    def interpolate_for_graphs(self, observable, data):
        """Return the interpolated target of *observable* for each data date.

        Args:
            observable: name of a column produced by ``interpolate_for_pc``.
            data: JSON text with a ``"date"`` list, either plain or wrapped
                as base64 + zlib (detected with ``is_base64``). Only the
                part of each date before the first whitespace is used.

        Returns:
            list[float]: one target value per entry of ``data["date"]``.

        Raises:
            KeyError: if *observable* is not a column of the target table.
            IndexError: if a date has no row in the target table.
        """
        from io import StringIO

        if is_base64(data):
            parse_data = decompress_json(data)
        else:
            parse_data = json.loads(data)

        # Wrap the JSON text in a file-like object: handing a literal JSON
        # string to read_json is deprecated in recent pandas.
        target = pd.read_json(StringIO(self.interpolate_for_pc()), orient="index")

        # Index the table once by date (first row wins) instead of scanning
        # the whole frame again for every data point.
        first_value_by_day = {}
        for day, value in zip(target['time'], target[observable]):
            first_value_by_day.setdefault(day, value)

        sub_target = []
        for stamp in parse_data["date"]:
            day = stamp.split()[0]
            if day not in first_value_by_day:
                raise IndexError("no target row for date {!r}".format(day))
            sub_target.append(float(first_value_by_day[day]))
        return sub_target

    def get_curves(self):
        """Fetch the ``targets`` column of the round.

        Returns:
            The row returned by ``cursor.fetchone()`` for the query (a
            one-column row, or whatever the cursor returns when no row
            matches).
        """
        query = "Select targets from round where id = %s"
        self.cursor.execute(query, (self.round_id,))
        return self.cursor.fetchone()

    def get_pc_details(self):
        """Fetch ``from``, ``to`` and ``targets`` of the round.

        Returns:
            The row returned by ``cursor.fetchone()`` for the query.
        """
        query = "Select `from`, `to`, targets from round where id = %s"
        self.cursor.execute(query, (self.round_id,))
        return self.cursor.fetchone()

    def between_dates(self, start, end):
        """List ``str()`` of every day from *start* to *end*, inclusive.

        Args:
            start: first day; must support ``end - start`` and adding a
                ``timedelta``.
            end: last day.

        Returns:
            list[str]: empty when *end* is more than a day before *start*.
        """
        span = (end - start).days + 1
        return [str(start + timedelta(days=offset)) for offset in range(span)]

    def get_observable_precision(self, observable):
        """Return ``observable_precision`` of the observable named by code name.

        Args:
            observable: value matched against ``observable.code_name``.

        Returns:
            The first column of the fetched row.
        """
        query = "SELECT observable_precision FROM observable WHERE code_name = %s"
        self.cursor.execute(query, (observable,))
        precision = self.cursor.fetchone()
        return precision[0]