# coding: utf-8
import pandas as pd
import numpy as np
import collections
import sys
import json, ast
from datetime import datetime, timedelta
import json, zlib
from base64 import b64encode, b64decode
sys.path.append('/apps/chickenboy-site/scripts/data_processing/db_connection')
from python_db import DbConnection
def is_base64(s):
    """Return True when *s* is canonical base64 text.

    Line breaks and the whitespace around each line are ignored, so
    line-wrapped base64 is accepted. The check decodes the text and
    re-encodes it; only an exact round trip counts as base64.

    :param s: candidate text, as ``str`` or ASCII ``bytes``.
    :return: ``True`` if *s* round-trips through base64, else ``False``.
             Values that are neither ``str`` nor ``bytes`` yield ``False``.
    """
    if isinstance(s, bytes):
        try:
            s = s.decode('ascii')
        except UnicodeDecodeError:
            return False
    elif not isinstance(s, str):
        return False

    compact = ''.join(line.strip() for line in s.split("\n"))
    try:
        # b64encode returns bytes; decode it so the comparison below is
        # text against text (bytes == str is always False on Python 3).
        round_trip = b64encode(b64decode(compact)).decode('ascii')
    except (TypeError, ValueError):
        # binascii.Error (bad padding) and the non-ASCII rejection are both
        # ValueError subclasses; TypeError is kept for older interpreters.
        return False
    return round_trip == compact
def decompress_json(compressed_b64):
    """Decode a base64 string holding zlib-compressed UTF-8 JSON.

    :param compressed_b64: base64 text (``str`` or ``bytes``).
    :return: the deserialized JSON value.
    """
    compressed = b64decode(compressed_b64)
    json_text = zlib.decompress(compressed).decode('utf-8')
    return json.loads(json_text)
class Interpolation(object):
    """Daily target curves for one round, and their distance to measured data.

    Target values are read from the ``round`` table, placed on a grid with
    one row per day between the round's ``from`` and ``to`` dates, and the
    gaps are filled with ``DataFrame.interpolate()``.
    """

    # Column order of the curve table. Field N (1-based) of a target row
    # holds the value for the N-th name in this tuple.
    _CURVE_FIELDS = (
        'day', 'temperature', 'humidity', 'co2', 'sound_level', 'free_space',
        'airspeed', 'ammonia', 'light_intensity', 'weight',
    )

    def __init__(self, round_id):
        """Open a database connection and cursor for the given round.

        :param round_id: value matched against ``round.id``.
        """
        self.round_id = round_id
        self.cnx = DbConnection().connect_to_db()
        self.cursor = self.cnx.cursor()
        # numpy float formatters, keyed by the observable's precision
        # (number of decimals) as stored in the ``observable`` table.
        self.precision = {
            0: {'float': '{: 0.0f}'.format},
            1: {'float': '{: 0.1f}'.format}
        }
        # Not referenced inside this class; kept because it is a public
        # attribute other code may read.
        self.replace_precision = {
            0: ' 0',
            1: ' 0.0'
        }

    def close(self):
        """Release the cursor and the database connection."""
        self.cursor.close()
        self.cnx.close()

    # NOTE(review): a stray comment here used to read "Make sure at least
    # 50 days"; nothing in this method enforces a minimum round length.
    def interpolate_for_pc(self):
        """Return the interpolated target curves of the round as JSON.

        :return: JSON text (``orient="index"``) keyed by day number; every
                 entry holds the nine observables plus ``time``, the
                 calendar date of that day as a string.
        :raises ValueError: if no round with ``self.round_id`` exists.
        """
        details = self.get_pc_details()
        if details is None:
            raise ValueError('No round found with id {!r}'.format(self.round_id))
        start, end, targets = details
        dates = self.between_dates(start, end)

        # The driver may hand the column back as bytes or as text.
        if isinstance(targets, (bytes, bytearray)):
            targets = targets.decode('utf-8')
        # Subscript two is normalised to a plain "2" before parsing
        # (presumably for "CO₂" labels - confirm against the stored data).
        targets = json.loads(str(targets).replace('₂', '2'))

        # The first two rows are skipped (presumably header rows - confirm).
        curves = []
        for row in targets[2:]:
            curve = {
                name: row[position]['value']
                for position, name in enumerate(self._CURVE_FIELDS, start=1)
            }
            curve['day'] = int(curve['day'])
            curves.append(curve)

        days = len(dates)
        # Sort by day because the user can enter the rows in any order.
        df = pd.DataFrame(curves).sort_values(by=['day'])
        df = df[df['day'] <= days]
        df = df.set_index('day')

        # Empty grid with one row per day of the round.
        target = pd.DataFrame(
            np.full([days, len(self._CURVE_FIELDS)], np.nan),
            columns=list(self._CURVE_FIELDS),
        )
        target['day'] = np.arange(days)
        target = target.set_index('day')

        # Keep only the user rows whose day exists on the grid.
        df = df[df.index.isin(target.index)]
        target.loc[df.index] = df
        # Values may arrive as text; anything non-numeric becomes NaN and is
        # then filled by the interpolation.
        for col in target:
            target[col] = pd.to_numeric(target[col], errors='coerce')

        interpolate_target = target.interpolate()
        interpolate_target['time'] = dates
        return interpolate_target.to_json(orient="index")

    def interpolate_for_graphs(self, observable, data, dates):
        """Compare measured values with the interpolated target curve.

        :param observable: curve column to compare; also looked up as
                           ``observable.code_name`` for its precision.
        :param data: measured values, either a JSON array string or a
                     base64 string of zlib-compressed JSON.
        :param dates: JSON array of strings; the first whitespace-separated
                      token of each entry is matched against the curve's
                      ``time`` column.
        :return: ``(sub_target, diff)`` where ``sub_target`` is the list of
                 target values for the given dates and ``diff`` is the
                 formatted array text of ``target - data``.
        """
        # Local import so this block does not depend on the file header.
        from io import StringIO

        if is_base64(data):
            parse_data = decompress_json(data)
        else:
            parse_data = json.loads(data)
        parse_dates = json.loads(dates)

        # read_json expects a path or file-like object; passing a literal
        # JSON string is deprecated in recent pandas releases.
        target = pd.read_json(StringIO(self.interpolate_for_pc()), orient="index")

        formatted_dates = [date.split()[0] for date in parse_dates]
        sub_target = [
            float(target[target['time'] == date].iloc[0][observable])
            for date in formatted_dates
        ]

        precision = self.get_observable_precision(observable)
        np.set_printoptions(threshold=sys.maxsize, precision=2,
                            formatter=self.precision[precision])
        diff = np.subtract(sub_target, parse_data)
        diff_text = np.array2string(diff, separator=', ', max_line_width=100000)
        return sub_target, diff_text.replace('\n', '').replace('\'', '"')

    def get_curves(self):
        """Fetch the raw ``targets`` column of the round.

        :return: the row returned by ``fetchone()`` (``None`` if no match).
        """
        # Driver-side parameter instead of string formatting, so the id can
        # never be interpreted as SQL.
        query = "Select targets from round where id = %s"
        self.cursor.execute(query, (self.round_id,))
        return self.cursor.fetchone()

    def get_pc_details(self):
        """Fetch ``from``, ``to`` and ``targets`` of the round.

        :return: the row returned by ``fetchone()`` (``None`` if no match).
        """
        query = "Select `from`, `to`, targets from round where id = %s"
        self.cursor.execute(query, (self.round_id,))
        return self.cursor.fetchone()

    def between_dates(self, start, end):
        """List every day from *start* to *end*, both included.

        :param start: first day; must support ``+ timedelta``.
        :param end: last day; ``end - start`` must expose ``.days``.
        :return: list of ``str(day)`` values, empty when *end* < *start*.
        """
        span = (end - start).days
        return [str(start + timedelta(days=offset)) for offset in range(span + 1)]

    def get_observable_precision(self, observable):
        """Look up the display precision of an observable.

        :param observable: value matched against ``observable.code_name``.
        :return: the ``observable_precision`` column of the matching row.
        :raises ValueError: if no such observable exists.
        """
        query = "SELECT observable_precision FROM observable WHERE code_name = %s"
        self.cursor.execute(query, (observable,))
        row = self.cursor.fetchone()
        if row is None:
            raise ValueError('Unknown observable {!r}'.format(observable))
        return row[0]
if __name__ == '__main__':
    # Usage: <script> <round_id> [<observable> <data> <dates>]
    # With only a round id the interpolated target curves are printed as
    # JSON; with all arguments the target values and the target-minus-data
    # differences are printed on two lines.
    interpolate = Interpolation(sys.argv[1])
    arg_names = ['file_name', 'round_id', 'observable', 'data', 'dates']
    # Missing trailing arguments become None.
    args = dict(zip(arg_names, sys.argv))
    Arg_list = collections.namedtuple('Arg_list', arg_names)
    args = Arg_list(*(args.get(arg, None) for arg in arg_names))
    if args.observable is None:
        print(interpolate.interpolate_for_pc())
    else:
        try:
            target, diff = interpolate.interpolate_for_graphs(args.observable, args.data, args.dates)
            print(target)
            print(diff)
        except Exception as err:
            # Best effort: stdout always carries exactly two lines, blank on
            # failure. The cause goes to stderr so it is not lost.
            sys.stderr.write('interpolation failed: {!r}\n'.format(err))
            print()
            print()