import os
import time
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime

import pandas as pd

from es import ES
from science_calc import compute_specific

DP_CONTAINER_ES_USER = "elastic"
DP_CONTAINER_ES_PASSWORD = "F4R0M4T1ks!Scout_"
INDEX = "round_data"
host = "3.79.200.14:9200"
query = {"query": {"match_all": {}}}

# Fetch the raw data from ES only if the local CSV cache does not exist yet.
if not os.path.exists("dev_data.csv"):
    es = ES(container_es_host=host,
            container_es_user=DP_CONTAINER_ES_USER,
            container_es_password=DP_CONTAINER_ES_PASSWORD)
    print("Going to fetch data from ES")
    res = es.search(index_name=INDEX, query=query)
    data = [x["_source"] for x in res]
    df = pd.DataFrame(data)
    df.to_csv("dev_data.csv", index=False)

# Show all columns and rows when printing DataFrames.
pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)
pd.set_option("display.max_colwidth", None)
pd.set_option("display.width", None)

# Raw input columns required by each derived observable.
OBSERVABLE_COLUMNS = {
    "effective_temperature": ["temperature", "humidity", "airspeed"],
    "humidex": ["temperature", "humidity"],
    "digestion_index": ["bad_droppings", "good_droppings"],
    "heat_stress_index": ["temperature", "humidity"],
}


def process_item(item):
    """Compute derived observables for one (timestamp, robot_id) group.

    `item` is a (key, group) tuple as yielded by the groupby in main():
    key is (timestamp, robot_id) and group holds the matching rows.
    """
    (timestamp, _), group = item
    results = []
    # Long -> wide: one column per raw observable, indexed by timestamp.
    observable_pivot = group.pivot(index="timestamp",
                                   columns="observable_name",
                                   values="value")
    for o, columns in OBSERVABLE_COLUMNS.items():
        try:
            result = compute_specific(o, observable_pivot[columns])
            # Skip empty results; 0 is treated as "nothing to store".
            if result is None or result == 0:
                continue
            round_id = group["round_id"].values[0]
            robot_id = group["robot_id"].values[0]
            try:
                # Truncate to the start of the minute so every row in the
                # same one-minute bucket maps to the same document id.
                dt_zero_seconds = timestamp.replace(second=0, microsecond=0)
                epoch = int(dt_zero_seconds.timestamp())
                document_id = f"{robot_id}-{o}-{epoch}"
                iso_format_timestamp = datetime.utcfromtimestamp(epoch).isoformat() + "Z"
                source = {
                    "value": result,
                    "timestamp": iso_format_timestamp,
                    "round_id": round_id,
                    "day_of_production": group["day_of_production"].values[0],
                    "observable_name": o,
                    "x": group["x"].values[0],
                    "y": group["y"].values[0],
                    "z": group["z"].values[0],
                    "round_number": group["round_number"].values[0],
                    "robot_id": robot_id,
                    "created_date": int(time.time() * 1000),
                    "type_of_data": group["type_of_data"].values[0],
                }
                results.append({"_source": source,
                                "_id": document_id,
                                "_index": "round_data",
                                "_type": "_doc"})
            except ValueError:
                continue
        except KeyError:
            # The group is missing one of the required raw observables.
            continue
    return results


# Main multiprocessing logic
def main():
    # Only these robots are processed.
    cd = ["vubiekq", "qna2fij", "o2snqde", "ipwxkzq", "ajgu3ge", "s64h58y", "njiklye"]
    df = pd.read_csv("dev_data.csv")
    df = df[df["robot_id"].isin(cd)]
    df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    df = df.drop_duplicates(subset=["timestamp", "robot_id", "observable_name"], keep="last")
    # One group per robot per one-minute bucket.
    grouped_df = df.groupby([pd.Grouper(key="timestamp", freq="1min"), "robot_id"])

    with ProcessPoolExecutor() as executor:
        # Map the data processing function across the grouped data.
        all_results = list(executor.map(process_item, grouped_df))

    # Flatten the list of per-group lists.
    all_results = [item for sublist in all_results for item in sublist]
    print(len(all_results))

    # Save to JSON so the bulk insert below can be rerun without recomputing.
    df_results = pd.DataFrame(all_results)
    df_results.to_json("all_results.json", orient="records")


if __name__ == "__main__":
    main()

    # Insert the bulk data into ES. This stays inside the __main__ guard so
    # it is not re-executed by worker processes spawned by ProcessPoolExecutor.
    es = ES(container_es_host=host,
            container_es_user=DP_CONTAINER_ES_USER,
            container_es_password=DP_CONTAINER_ES_PASSWORD)
    df_results = pd.read_json("all_results.json")
    df_results = df_results.to_dict(orient="records")
    print(len(df_results))
    res = es.save_bulk(data=df_results)
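The script imports a local es module whose ES wrapper is not included in the paste. A minimal sketch of what such a wrapper could look like, assuming the official elasticsearch Python client (8.x) and its helpers.bulk API; the class name and method signatures below are inferred from how the script calls them, and the real wrapper may differ:

# Hypothetical stand-in for the es.ES wrapper imported above.
# Assumes elasticsearch-py 8.x; names and defaults are guesses.
from elasticsearch import Elasticsearch, helpers


class ES:
    def __init__(self, container_es_host, container_es_user, container_es_password):
        self.client = Elasticsearch(
            f"http://{container_es_host}",
            basic_auth=(container_es_user, container_es_password),
        )

    def search(self, index_name, query, size=10000):
        # Returns the raw hit list; a production version would paginate
        # (search_after / scroll) instead of relying on a single page.
        res = self.client.search(index=index_name, body=query, size=size)
        return res["hits"]["hits"]

    def save_bulk(self, data):
        # `data` is a list of actions shaped like
        # {"_index": ..., "_id": ..., "_source": {...}}, which is the
        # action format helpers.bulk expects.
        return helpers.bulk(self.client, data)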