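"""Derive composite observables from raw Elasticsearch round data.

Pipeline: fetch raw rows from the `round_data` index (cached locally as
dev_data.csv), compute derived observables per robot per minute in parallel,
dump them to all_results.json, then bulk-index them back into ES.
"""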
import os
import time
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime

import pandas as pd

from es import ES
from science_calc import compute_specific

DP_CONTAINER_ES_USER = "elastic"
DP_CONTAINER_ES_PASSWORD = "F4R0M4T1ks!Scout_"
INDEX = "round_data"
HOST = "3.79.200.14:9200"
QUERY = {"query": {"match_all": {}}}
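# For reference only: `es.ES` is a project-local wrapper that is not part of
# this paste. A minimal sketch of the interface it is assumed to expose, built
# on the official `elasticsearch` client (names and signatures here are
# assumptions, not the actual module):
#
#   from elasticsearch import Elasticsearch
#   from elasticsearch.helpers import bulk, scan
#
#   class ES:
#       def __init__(self, container_es_host, container_es_user, container_es_password):
#           self.client = Elasticsearch(f"http://{container_es_host}",
#                                       basic_auth=(container_es_user, container_es_password))
#
#       def search(self, index_name, query):
#           # scan() pages through every hit; a plain search() caps out at 10,000
#           return scan(self.client, index=index_name, query=query)
#
#       def save_bulk(self, data):
#           return bulk(self.client, data)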
# Fetch the raw data from ES once and cache it locally as dev_data.csv;
# subsequent runs reuse the cached file.
if not os.path.exists("dev_data.csv"):
    es = ES(container_es_host=HOST, container_es_user=DP_CONTAINER_ES_USER,
            container_es_password=DP_CONTAINER_ES_PASSWORD)
    print("Going to fetch data from ES")
    res = es.search(index_name=INDEX, query=QUERY)
    data = [x["_source"] for x in res]
    df = pd.DataFrame(data)
    df.to_csv("dev_data.csv", index=False)
# Pandas display options for debugging: print full frames without truncation.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_colwidth', None)
pd.set_option('display.width', None)
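# `compute_specific` comes from the project-local science_calc module, also
# not included in this paste. It is assumed to take an observable name plus a
# DataFrame holding the required input columns and return a scalar. As a
# purely hypothetical illustration, the "humidex" case could be computed with
# the standard Environment Canada formula (Magnus dew-point approximation):
#
#   import numpy as np
#
#   def humidex(frame):
#       t = frame["temperature"].mean()   # air temperature, deg C
#       rh = frame["humidity"].mean()     # relative humidity, percent
#       gamma = np.log(rh / 100.0) + (17.27 * t) / (237.7 + t)
#       t_dew = (237.7 * gamma) / (17.27 - gamma)  # Magnus dew point, deg C
#       e = 6.11 * np.exp(5417.7530 * (1 / 273.16 - 1 / (273.15 + t_dew)))
#       return t + 0.5555 * (e - 10.0)    # excess vapour pressure over 10 hPa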
def process_item(item):
    """Derive the composite observables for one (minute, robot_id) group.

    `item` is a ((timestamp, robot_id), DataFrame) pair as produced by the
    groupby in main().
    """
    observable_names = ["effective_temperature", "humidex", "digestion_index", "heat_stress_index"]
    results = []
    # Pivot the long-format rows into one column per raw observable.
    observable_pivot = item[1].pivot(index="timestamp", columns="observable_name", values="value")
    for o in observable_names:
        try:
            # Each derived observable needs a different set of input columns.
            match o:
                case "effective_temperature":
                    columns = ["temperature", "humidity", "airspeed"]
                case "humidex":
                    columns = ["temperature", "humidity"]
                case "digestion_index":
                    columns = ["bad_droppings", "good_droppings"]
                case "heat_stress_index":
                    columns = ["temperature", "humidity"]
                case _:
                    columns = None
            result = compute_specific(o, observable_pivot[columns]) if columns else None
            # Skip missing or zero results so placeholder values are not indexed.
            if (result is not None) and (result != 0):
                round_id = item[1]["round_id"].values[0]
                robot_id = item[1]["robot_id"].values[0]
                timestamp = item[0][0]  # minute bucket from the groupby key
                try:
                    # Floor to the whole minute and convert to epoch seconds.
                    dt_zero_seconds = timestamp.replace(second=0, microsecond=0)
                    timestamp = int(dt_zero_seconds.timestamp())
                    document_id = f"{robot_id}-{o}-{timestamp}"
                    iso_format_timestamp = datetime.utcfromtimestamp(timestamp).isoformat() + "Z"
                    source = {
                        "value": result,
                        "timestamp": iso_format_timestamp,
                        "round_id": round_id,
                        "day_of_production": item[1]["day_of_production"].values[0],
                        "observable_name": o,
                        "x": item[1]["x"].values[0],
                        "y": item[1]["y"].values[0],
                        "z": item[1]["z"].values[0],
                        "round_number": item[1]["round_number"].values[0],
                        "robot_id": robot_id,
                        "created_date": int(time.time() * 1000),
                        "type_of_data": item[1]["type_of_data"].values[0]
                    }
                    results.append({"_source": source,
                                    "_id": document_id,
                                    "_index": "round_data",
                                    "_type": "_doc"})  # legacy ES 6.x field, kept as in the original
                except ValueError:
                    # Bad timestamp for this observable; skip it.
                    continue
        except KeyError:
            # A required input column is missing for this group; skip it.
            continue
    return results
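# Example of the shape process_item receives: the groupby in main() yields
# ((Timestamp, robot_id), DataFrame) pairs, e.g. (values illustrative):
#
#   key = (pd.Timestamp("2023-05-01 12:34:00"), "vubiekq")
#   group = pd.DataFrame({
#       "timestamp": [...], "observable_name": ["temperature", "humidity"],
#       "value": [...], "round_id": [...], "robot_id": ["vubiekq"] * 2, ...
#   })
#   docs = process_item((key, group))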
# Main multiprocessing logic
def main():
    robot_ids = ["vubiekq", "qna2fij", "o2snqde", "ipwxkzq", "ajgu3ge", "s64h58y", "njiklye"]
    df = pd.read_csv("dev_data.csv")
    df = df[df["robot_id"].isin(robot_ids)]
    df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    # Keep only the latest reading per (timestamp, robot, observable).
    df = df.drop_duplicates(subset=["timestamp", "robot_id", "observable_name"], keep="last")
    # Bucket the readings into one-minute windows per robot.
    grouped_df = df.groupby([pd.Grouper(key="timestamp", freq="1min"), "robot_id"])
    with ProcessPoolExecutor() as executor:
        # Map the data processing function across the grouped data
        all_results = list(executor.map(process_item, grouped_df))
    # Flatten the per-group lists into one list of documents
    all_results = [item for sublist in all_results for item in sublist]
    print(len(all_results))
    # Save the documents to JSON for the bulk-insert step below
    df_results = pd.DataFrame(all_results)
    df_results.to_json("all_results.json", orient="records")
if __name__ == "__main__":
    main()

    # Bulk-insert the derived documents into ES. This stays under the
    # __main__ guard so it is not re-run when worker processes spawned by
    # ProcessPoolExecutor import this module.
    es = ES(container_es_host=HOST, container_es_user=DP_CONTAINER_ES_USER,
            container_es_password=DP_CONTAINER_ES_PASSWORD)
    df_results = pd.read_json("all_results.json")
    records = df_results.to_dict(orient="records")
    print(len(records))
    res = es.save_bulk(data=records)
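# Expected flow when run as a script: the first run fetches and caches
# dev_data.csv, main() writes all_results.json, and the bulk step above
# pushes the derived documents into the round_data index.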