Untitled
unknown
python
a year ago
5.8 kB
2
Indexable
Never
# coding: utf-8
"""Interpolate a round's target curves and compare them with measured data.

Command-line usage::

    <script> <round_id>
        Print the day-by-day interpolated targets of the round as JSON.

    <script> <round_id> <observable> <data> <dates>
        Print the target values of ``observable`` for ``dates`` and, on the
        next line, the element-wise difference ``target - data``.  On any
        failure two empty lines are printed instead (details go to stderr).
"""
import ast
import collections
import io
import json
import sys
import traceback
import zlib
from base64 import b64decode, b64encode
from datetime import datetime, timedelta

import numpy as np
import pandas as pd

sys.path.append('/apps/chickenboy-site/scripts/data_processing/db_connection')
from python_db import DbConnection  # noqa: E402  (needs the sys.path entry above)

# Observables stored in every target row, in storage order.  Inside a row,
# position 1 holds the day number and positions 2..10 hold these fields.
_CURVE_FIELDS = (
    'temperature',
    'humidity',
    'co2',
    'sound_level',
    'free_space',
    'airspeed',
    'ammonia',
    'light_intensity',
    'weight',
)
_DAY_POSITION = 1
_FIRST_FIELD_POSITION = 2
# Target rows 0 and 1 are never read as curves (presumably header rows --
# TODO confirm against the code that writes `round.targets`).
_FIRST_CURVE_ROW = 2


def is_base64(s):
    """Return True when *s* is canonical base64 text, ignoring line breaks.

    The text is decoded and re-encoded; it counts as base64 only when the
    round trip reproduces it exactly.  Anything that cannot be decoded
    (including non-string input) yields False instead of raising.
    """
    if isinstance(s, (bytes, bytearray)):
        try:
            s = bytes(s).decode('ascii')
        except UnicodeDecodeError:
            return False
    if not isinstance(s, str):
        return False
    s = ''.join(part.strip() for part in s.split("\n"))
    try:
        # b64encode returns bytes on Python 3; decode so the comparison
        # below is str-to-str (bytes == str is always False).
        round_trip = b64encode(b64decode(s)).decode('ascii')
    except (TypeError, ValueError):
        # binascii.Error (bad padding) is a ValueError subclass; non-ASCII
        # text raises ValueError as well.
        return False
    return round_trip == s


def decompress_json(compressed_b64):
    """Decode base64, zlib-decompress, and parse the result as UTF-8 JSON."""
    raw = zlib.decompress(b64decode(compressed_b64))
    return json.loads(raw.decode('utf-8'))


class Interpolation(object):
    """Interpolated target curves for one row of the ``round`` table.

    A database connection is opened on construction via ``DbConnection``.

    NOTE(review): queries use ``%s`` placeholders, the parameter style of the
    common MySQL drivers (the backtick-quoted column names suggest MySQL).
    Confirm against what ``DbConnection.connect_to_db()`` returns.
    """

    def __init__(self, round_id):
        self.round_id = round_id
        self.cnx = DbConnection().connect_to_db()
        self.cursor = self.cnx.cursor()
        # numpy print formatters keyed by the observable's decimal precision.
        self.precision = {
            0: {'float': '{: 0.0f}'.format},
            1: {'float': '{: 0.1f}'.format},
        }
        self.replace_precision = {
            0: ' 0',
            1: ' 0.0',
        }

    def interpolate_for_pc(self):
        """Return the round's targets for every day of the round as JSON.

        The stored target points are placed on a 0..N-1 day grid (N = number
        of dates between the round's ``from`` and ``to``, inclusive), gaps are
        filled with pandas' default interpolation, and a ``time`` column with
        the calendar date of each day is added.

        Returns:
            str: ``DataFrame.to_json(orient="index")`` of the result.

        Raises:
            ValueError: if the round does not exist or its targets are not
                valid JSON.

        Original author's note, kept verbatim: "Make sure at least 50 days".
        """
        details = self.get_pc_details()
        if details is None:
            raise ValueError(
                "no round found with id {!r}".format(self.round_id))
        start, end, targets = details
        dates = self.between_dates(start, end)

        # The driver may hand the column back as bytes or as text.
        if isinstance(targets, (bytes, bytearray)):
            targets = bytes(targets).decode('utf-8')
        # Subscript two is normalised to a plain digit before parsing
        # (presumably for "CO2" labels -- verify against stored data).
        targets = json.loads(str(targets).replace('₂', '2'))

        curves = []
        for row in targets[_FIRST_CURVE_ROW:]:
            curve = {"day": int(row[_DAY_POSITION]['value'])}
            for position, field in enumerate(_CURVE_FIELDS,
                                             start=_FIRST_FIELD_POSITION):
                curve[field] = row[position]['value']
            curves.append(curve)

        days = len(dates)
        columns = ['day'] + list(_CURVE_FIELDS)

        # Passing `columns` keeps the frame well-formed even when there are
        # no curve rows at all.  Sort by day: users may enter points in any
        # order.
        df = pd.DataFrame(curves, columns=columns).sort_values(by=['day'])
        df = df[df['day'] <= days]
        df.set_index('day', inplace=True)

        target = pd.DataFrame(np.full([days, len(columns)], np.nan),
                              columns=columns)
        target['day'] = np.arange(days)
        target.set_index('day', inplace=True)

        # Keep only points that fall on the grid, and coerce them to numbers
        # (unparseable values become NaN) before writing them into the float
        # grid so no column is silently upcast to object.
        df = df[df.index.isin(target.index)]
        df = df.apply(pd.to_numeric, errors='coerce')
        if not df.empty:
            target.loc[df.index] = df

        interpolated = target.interpolate()
        interpolated['time'] = dates
        return interpolated.to_json(orient="index")

    def interpolate_for_graphs(self, observable, data, dates):
        """Compare measured *data* with the interpolated target curve.

        Args:
            observable: column name of the observable (e.g. ``'humidity'``);
                also looked up as ``observable.code_name`` for its precision.
            data: measured values, either a JSON array or a base64-encoded
                zlib-compressed JSON array.
            dates: JSON array of date strings; only the part before the first
                whitespace of each entry is used.

        Returns:
            tuple: ``(sub_target, diff_text)`` where ``sub_target`` is the
            list of target values for the requested dates and ``diff_text``
            is ``target - data`` rendered by numpy as a single-line,
            comma-separated array string.
        """
        if is_base64(data):
            parse_data = decompress_json(data)
        else:
            parse_data = json.loads(data)
        parse_dates = json.loads(dates)

        # Wrap in StringIO: passing a literal JSON string to read_json is
        # deprecated in recent pandas.
        target = pd.read_json(io.StringIO(self.interpolate_for_pc()),
                              orient="index")

        formatted_dates = [stamp.split()[0] for stamp in parse_dates]
        sub_target = [
            float(target[target['time'] == day].iloc[0][observable])
            for day in formatted_dates
        ]

        precision = self.get_observable_precision(observable)
        # Process-wide numpy print settings: never truncate, and format
        # floats according to the observable's precision.
        np.set_printoptions(threshold=sys.maxsize, precision=2,
                            formatter=self.precision[precision])
        diff = np.subtract(sub_target, parse_data)
        diff_text = np.array2string(diff, separator=', ',
                                    max_line_width=100000)
        return sub_target, diff_text.replace('\n', '').replace('\'', '"')

    def get_curves(self):
        """Return the DB row ``(targets,)`` for this round, or None."""
        query = "Select targets from round where id = %s"
        self.cursor.execute(query, (self.round_id,))
        return self.cursor.fetchone()

    def get_pc_details(self):
        """Return the DB row ``(from, to, targets)`` for this round, or None."""
        query = "Select `from`, `to`, targets from round where id = %s"
        self.cursor.execute(query, (self.round_id,))
        return self.cursor.fetchone()

    def between_dates(self, start, end):
        """Return ``str()`` of every day from *start* to *end*, inclusive.

        *start* and *end* must support subtraction yielding a ``timedelta``
        (e.g. ``datetime.date`` objects).  An *end* before *start* yields an
        empty list.
        """
        span = end - start
        return [str(start + timedelta(days=offset))
                for offset in range(span.days + 1)]

    def get_observable_precision(self, observable):
        """Return ``observable_precision`` for the given ``code_name``.

        Raises:
            ValueError: if no observable with that code name exists.
        """
        query = ("SELECT observable_precision FROM observable "
                 "WHERE code_name = %s")
        self.cursor.execute(query, (observable,))
        row = self.cursor.fetchone()
        if row is None:
            raise ValueError("unknown observable {!r}".format(observable))
        return row[0]


def main(argv=None):
    """Command-line entry point; see the module docstring for usage."""
    argv = sys.argv if argv is None else argv
    arg_names = ['file_name', 'round_id', 'observable', 'data', 'dates']
    if len(argv) < 2:
        sys.exit("usage: {} round_id [observable data dates]".format(
            argv[0] if argv else 'interpolation'))

    # Missing trailing arguments become None; extra ones are ignored.
    Arg_list = collections.namedtuple('Arg_list', arg_names)
    padded = list(argv[:len(arg_names)])
    padded += [None] * (len(arg_names) - len(padded))
    args = Arg_list(*padded)

    interpolate = Interpolation(args.round_id)

    if args.observable is None:
        print(interpolate.interpolate_for_pc())
        return

    try:
        target, diff = interpolate.interpolate_for_graphs(
            args.observable, args.data, args.dates)
    except Exception:
        # The caller reads exactly two lines from stdout, so keep emitting
        # two blank lines on failure -- but leave a trace on stderr instead
        # of hiding the cause entirely.
        traceback.print_exc()
        print()
        print()
    else:
        print(target)
        print(diff)


if __name__ == '__main__':
    main()