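"""Recompute derived observables from cached Elasticsearch round data.

Fetches raw readings from the round_data index (cached locally as
dev_data.csv), buckets them into 1-minute windows per robot, computes
derived observables (effective temperature, humidex, digestion index,
heat stress index) in a process pool, and bulk-indexes the results back.
"""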
import os
import time
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime, timezone

import pandas as pd

from es import ES
from science_calc import compute_specific


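# Elasticsearch connection settings and the match_all query used to pull
# every document from the source index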
DP_CONTAINER_ES_USER = "elastic"
DP_CONTAINER_ES_PASSWORD = "F4R0M4T1ks!Scout_"
INDEX = "round_data"
host = "3.79.200.14:9200"
query = {"query": {"match_all": {}}}



# Fetch from Elasticsearch and cache locally, but only if dev_data.csv is missing
if not os.path.exists("dev_data.csv"):
    es = ES(container_es_host=host, container_es_user=DP_CONTAINER_ES_USER,
            container_es_password=DP_CONTAINER_ES_PASSWORD)
    print("Going to fetch data from ES")
    res = es.search(index_name=INDEX, query=query)

    # Keep only the document bodies and cache them as CSV
    data = [hit["_source"] for hit in res]
    df = pd.DataFrame(data)
    df.to_csv("dev_data.csv", index=False)

# Pandas display options: show all columns and rows at full width
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_colwidth', None)
pd.set_option('display.width', None)

def process_item(item):
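    """Build Elasticsearch bulk actions for one (1-minute window, robot) group.

    item is a (key, group) tuple from the groupby in main(): key is the
    (minute bucket, robot_id) pair, group the matching rows as a DataFrame.
    """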
    observable_names = ["effective_temperature", "humidex", "digestion_index", "heat_stress_index"]
    results = []

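    # Pivot long-format readings into one column per raw observable so that
    # compute_specific can consume them as feature columns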
    observable_pivot = item[1].pivot(index="timestamp", columns="observable_name", values="value")
    for o in observable_names:
        try:
            # Pick the raw input columns each derived observable needs
            match o:
                case "effective_temperature":
                    columns = ["temperature", "humidity", "airspeed"]
                case "humidex" | "heat_stress_index":
                    columns = ["temperature", "humidity"]
                case "digestion_index":
                    columns = ["bad_droppings", "good_droppings"]
                case _:
                    columns = None

            result = compute_specific(o, observable_pivot[columns]) if columns is not None else None

            # Index only meaningful results (skip missing values and zeros)
            if (result is not None) and (result != 0):
                round_id = item[1]["round_id"].values[0]
                robot_id = item[1]["robot_id"].values[0]
                timestamp = item[0][0]  # group key is (minute bucket, robot_id); take the bucket

                try:
                    dt_zero_seconds = timestamp.replace(second=0, microsecond=0)
                    timestamp = int(dt_zero_seconds.timestamp())
                    document_id = f"{robot_id}-{o}-{timestamp}"
                    # datetime.utcfromtimestamp is deprecated; build a timezone-aware UTC timestamp instead
                    iso_format_timestamp = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat().replace("+00:00", "Z")

                    source = {
                        "value": result,
                        "timestamp": iso_format_timestamp,
                        "round_id": round_id,
                        "day_of_production": item[1]["day_of_production"].values[0],
                        "observable_name": o,
                        "x": item[1]["x"].values[0],
                        "y": item[1]["y"].values[0],
                        "z": item[1]["z"].values[0],
                        "round_number": item[1]["round_number"].values[0],
                        "robot_id": robot_id,
                        "created_date": int(time.time() * 1000),
                        "type_of_data": item[1]["type_of_data"].values[0]
                    }
                    results.append({"_source": source,
                                    "_id": document_id,
                                    "_index": "round_data",
                                    "_type": "_doc"})
                except ValueError:
                    # Skip groups whose timestamp cannot be converted
                    continue
        except KeyError:
            # Skip observables whose input columns are missing from this group
            continue

    return results


# Main multiprocessing logic
def main():
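    """Load the cached data, compute derived observables in parallel, and write bulk actions to all_results.json."""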
    # Robots whose data should be processed
    robot_ids = ["vubiekq", "qna2fij", "o2snqde", "ipwxkzq", "ajgu3ge", "s64h58y", "njiklye"]

    df = pd.read_csv("dev_data.csv")
    df = df[df["robot_id"].isin(robot_ids)]
    df["timestamp"] = pd.to_datetime(df["timestamp"], errors='coerce')
    df = df.drop_duplicates(subset=["timestamp", "robot_id", "observable_name"], keep="last")

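    # Bucket readings into 1-minute windows per robot; each (window, robot)
    # group becomes one work item for the process pool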
    grouped_df = df.groupby([pd.Grouper(key='timestamp', freq='1min'), 'robot_id'])
    with ProcessPoolExecutor() as executor:
        # Map the data processing function across the grouped data
        all_results = list(executor.map(process_item, grouped_df))
        # Flatten the list of lists
        all_results = [item for sublist in all_results for item in sublist]

    print(f"Prepared {len(all_results)} documents")
    # Persist the bulk actions as JSON so the indexing step can rerun without recomputing
    df_results = pd.DataFrame(all_results)
    df_results.to_json("all_results.json", orient="records")

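
# The __main__ guard is essential with ProcessPoolExecutor: worker processes
# re-import this module, and the guard keeps them from re-running main().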
if __name__ == "__main__":
    main()
    # Reload the prepared actions and bulk-index them into Elasticsearch
    es = ES(container_es_host=host, container_es_user=DP_CONTAINER_ES_USER,
            container_es_password=DP_CONTAINER_ES_PASSWORD)
    df_results = pd.read_json("all_results.json")
    bulk_actions = df_results.to_dict(orient="records")
    print(f"Indexing {len(bulk_actions)} documents")
    res = es.save_bulk(data=bulk_actions)