# Pasted from pastecode.io: "Untitled", author unknown, language python, 5.8 kB, posted 2 years ago.
# coding: utf-8
import ast
import collections
import io
import json
import sys
import traceback
import zlib
from base64 import b64encode, b64decode
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
sys.path.append('/apps/chickenboy-site/scripts/data_processing/db_connection')
from python_db import DbConnection


def is_base64(s):
    """Return True when *s* survives a base64 decode/encode round trip.

    Line breaks and the whitespace around each line are removed before the
    check. *s* may be text or bytes; anything else, or any value that cannot
    be decoded as base64, yields False.
    """
    try:
        # b64encode() always returns bytes, so compare bytes with bytes;
        # comparing against a str is never equal on Python 3.
        raw = s if isinstance(s, bytes) else s.encode('ascii')
        raw = b''.join(line.strip() for line in raw.split(b'\n'))
        return b64encode(b64decode(raw)).strip() == raw
    except (TypeError, ValueError, AttributeError):
        # ValueError covers binascii.Error (bad padding) and
        # UnicodeEncodeError (non-ASCII text); AttributeError covers values
        # that are neither text nor bytes.
        return False

def decompress_json(compressed_b64):
    """Decode a base64 payload holding zlib-compressed UTF-8 JSON.

    Returns the Python object parsed from the JSON text.
    """
    compressed = b64decode(compressed_b64)
    json_text = zlib.decompress(compressed).decode('utf-8')
    return json.loads(json_text)

class Interpolation(object):
    """Interpolate the target curves of one round to one value per day.

    The round is read from the ``round`` table (columns ``from``, ``to`` and
    ``targets``) through the cursor opened in ``__init__``.
    """

    # Observable columns, in the order their cells follow the day cell
    # (index 1) inside each target row: row[2] is temperature, row[3]
    # humidity, and so on up to row[10] for weight.
    _OBSERVABLES = (
        'temperature', 'humidity', 'co2', 'sound_level', 'free_space',
        'airspeed', 'ammonia', 'light_intensity', 'weight',
    )

    def __init__(self, round_id):
        """Open a database connection and remember which round to process."""
        self.round_id = round_id
        self.cnx = DbConnection().connect_to_db()
        self.cursor = self.cnx.cursor()
        # numpy ``formatter`` mappings, keyed by the observable's precision.
        self.precision = {
            0: {'float': '{: 0.0f}'.format},
            1: {'float': '{: 0.1f}'.format}
        }
        self.replace_precision = {
            0: ' 0',
            1: ' 0.0'
        }

    # NOTE(review): the original carried the remark "Make sure at least 50
    # days" here; nothing in this method enforces a minimum number of days.
    def interpolate_for_pc(self):
        """Return the round's targets, interpolated per day, as JSON text.

        The result is ``DataFrame.to_json(orient="index")`` of a frame with
        one row per day between the round's ``from`` and ``to`` (inclusive),
        one column per observable plus a ``time`` column holding the date
        string of that day. Days before the first defined target stay null.

        Raises:
            LookupError: if no round row exists for ``self.round_id``.
        """
        details = self.get_pc_details()
        if details is None:
            raise LookupError('no round found for id {!r}'.format(self.round_id))
        start, end, targets = details
        dates = self.between_dates(start, end)

        # The column may come back as bytes or text depending on the driver.
        if isinstance(targets, (bytes, bytearray)):
            targets = targets.decode('utf-8')
        targets = json.loads(str(targets).replace('₂', '2'))

        # The first two rows are skipped; every later row carries the day in
        # cell 1 and the observables in cells 2..10.
        curves = []
        for row in targets[2:]:
            curve = {'day': int(row[1]['value'])}
            for position, name in enumerate(self._OBSERVABLES, start=2):
                curve[name] = row[position]['value']
            curves.append(curve)

        columns = list(self._OBSERVABLES)
        known = pd.DataFrame(curves, columns=['day'] + columns)
        # Coerce before placing the values so that text cells never have to
        # be written into a float frame; unparsable cells become NaN.
        for col in columns:
            known[col] = pd.to_numeric(known[col], errors='coerce')
        known = known.set_index('day')

        # One row per day of the round. Label-based reindexing makes the
        # order in which the user entered the days irrelevant and drops days
        # outside the round.
        all_days = pd.Index(np.arange(len(dates)), name='day')
        target = known.reindex(all_days).astype(float)

        interpolate_target = target.interpolate()
        interpolate_target['time'] = dates
        return interpolate_target.to_json(orient="index")

    def interpolate_for_graphs(self, observable, data, dates):
        """Compare measured values of one observable against its targets.

        Args:
            observable: column name of the observable, e.g. ``'temperature'``.
            data: JSON array of measured values, either as plain JSON text or
                as base64-encoded, zlib-compressed JSON.
            dates: JSON array of date strings; only the part before the first
                whitespace of each entry is used.

        Returns:
            A tuple ``(sub_target, diff)`` where ``sub_target`` is the list of
            target values for the given dates and ``diff`` is the string form
            of ``sub_target - data``, formatted with the observable's
            precision.
        """
        if is_base64(data):
            parse_data = decompress_json(data)
        else:
            parse_data = json.loads(data)

        parse_dates = json.loads(dates)
        # read_json() no longer accepts literal JSON text directly, so hand
        # it a file-like object.
        target = pd.read_json(io.StringIO(self.interpolate_for_pc()), orient="index")
        formatted_dates = [date.split()[0] for date in parse_dates]
        sub_target = [float(target[target['time'] == date].iloc[0][observable]) for date in formatted_dates]
        precision = self.get_observable_precision(observable)
        diff = np.subtract(sub_target, parse_data)
        # Pass the print options per call instead of changing numpy's
        # process-wide settings.
        formatted = np.array2string(
            diff,
            separator=', ',
            max_line_width=100000,
            threshold=sys.maxsize,
            precision=2,
            formatter=self.precision[precision],
        )
        return sub_target, formatted.replace('\n', '').replace('\'', '"')

    # NOTE(review): the queries below use the ``%s`` placeholder style of the
    # common MySQL drivers (the backtick quoting suggests MySQL) - confirm
    # against the driver returned by DbConnection.

    def get_curves(self):
        """Return the row holding the ``targets`` column of this round."""
        self.cursor.execute("SELECT targets FROM round WHERE id = %s", (self.round_id,))
        return self.cursor.fetchone()

    def get_pc_details(self):
        """Return the ``(from, to, targets)`` row of this round, or None."""
        self.cursor.execute(
            "SELECT `from`, `to`, targets FROM round WHERE id = %s",
            (self.round_id,),
        )
        return self.cursor.fetchone()

    def between_dates(self, start, end):
        """Return ``str()`` of every day from *start* to *end*, inclusive.

        Both arguments must support subtraction yielding a ``timedelta``.
        The list is empty when *end* lies before *start*.
        """
        span = (end - start).days
        return [str(start + timedelta(days=offset)) for offset in range(span + 1)]

    def get_observable_precision(self, observable):
        """Return the ``observable_precision`` stored for *observable*."""
        self.cursor.execute(
            "SELECT observable_precision FROM observable WHERE code_name = %s",
            (observable,),
        )
        precision = self.cursor.fetchone()
        return precision[0]


if __name__ == '__main__':
    # Usage: <script> round_id [observable data dates]
    # With only a round id the interpolated targets are printed as JSON;
    # with an observable, its target values and the difference to the
    # measured data are printed on two lines.
    arg_names = ['file_name', 'round_id', 'observable', 'data', 'dates']
    Arg_list = collections.namedtuple('Arg_list', arg_names)
    # Extra arguments are ignored, missing trailing ones become None.
    given = list(sys.argv[:len(arg_names)])
    given += [None] * (len(arg_names) - len(given))
    args = Arg_list(*given)

    if args.round_id is None:
        sys.exit('usage: {} round_id [observable data dates]'.format(args.file_name))

    interpolate = Interpolation(args.round_id)
    if args.observable is None:
        print(interpolate.interpolate_for_pc())
    else:
        try:
            target, diff = interpolate.interpolate_for_graphs(args.observable, args.data, args.dates)
        except Exception:
            # Keep the two-line stdout shape readers of this script expect,
            # but leave a trace of the failure on stderr.
            traceback.print_exc()
            print()
            print()
        else:
            print(target)
            print(diff)