Untitled

 avatar
unknown
plain_text
10 months ago
12 kB
7
Indexable
import abc
from typing import Tuple
import matplotlib.pyplot as plt
import pandas as pd
from matplotlib.colors import ListedColormap
from typing import Tuple
import numpy as np
import os

plt.close()
%matplotlib widget


import json
import pandas as pd
import enum

class HKID(enum.Enum):
    """Identifiers of the record types handled in this file.

    Every member's value is the same string as its name, so a member can be
    looked up from the raw identifier string with ``HKID("<identifier>")``.
    """

    def _generate_next_value_(name, start, count, last_values):
        # Makes ``enum.auto()`` produce the member's own name as its value.
        # Must be defined before the first member.
        return name

    HKQuantityTypeIdentifierStepCount = enum.auto()
    HKQuantityTypeIdentifierActiveEnergyBurned = enum.auto()
    HKQuantityTypeIdentifierDistanceWalkingRunning = enum.auto()
    HKQuantityTypeIdentifierDistanceCycling = enum.auto()
    HKQuantityTypeIdentifierAppleStandTime = enum.auto()
    HKQuantityTypeIdentifierHeartRate = enum.auto()
    HKCategoryTypeIdentifierSleepAnalysis = enum.auto()
    HKWorkoutTypeIdentifier = enum.auto()
    MotionCollector = enum.auto()

class CalculatRobustStatsForHealthkit:
    """Summary statistics of the ``value`` column, grouped by ``recordId``."""

    def calculate(self, df, filepath=None):
        """Compute per-``recordId`` statistics and optionally save them.

        Args:
            df: DataFrame with a ``recordId`` column and a ``value`` column.
            filepath: Output path without extension. When given, the result
                is written as JSON to ``filepath + ".json"``.

        Returns:
            A dict with the keys ``sums``, ``means``, ``medians``, ``q25s``,
            ``q75s`` and ``iqrs``; each maps ``recordId`` to the statistic.
        """
        values_by_record = df.groupby("recordId").value

        total = values_by_record.sum(numeric_only=True)
        average = values_by_record.mean(numeric_only=True)
        middle = values_by_record.median(numeric_only=True)
        lower_quartile = self._calculate_quantiles(df, 0.25)
        upper_quartile = self._calculate_quantiles(df, 0.75)

        statistics = {
            "sums": total,
            "means": average,
            "medians": middle,
            "q25s": lower_quartile,
            "q75s": upper_quartile,
            # Interquartile range: distance between the two quartiles.
            "iqrs": upper_quartile - lower_quartile,
        }
        result = {label: series.to_dict() for label, series in statistics.items()}

        if filepath is None:
            return result

        with open(filepath + ".json", "w") as out_file:
            json.dump(result, out_file)
        return result

    def _calculate_quantiles(self, df, q):
        """Return the ``q`` quantile of ``value`` for every ``recordId``."""
        grouped_values = df.groupby("recordId").value
        return grouped_values.quantile(q, numeric_only=True)

class CalculateHourlyMeanForHealthkit:
    """Hourly mean of the ``value`` column over the first calendar day."""

    def calculate(self, df, filepath=None):
        """Average ``value`` per hour and align it to a 24-hour grid.

        The grid starts at midnight of the earliest timestamp in the index
        and covers 24 consecutive hours; hourly bins after that window are
        dropped and hours without data are filled with a missing value.

        Args:
            df: DataFrame with a ``DatetimeIndex`` and a ``value`` column.
            filepath: Output path without extension. When given, the result
                is written to ``filepath + ".parquet"``.

        Returns:
            A Series of 24 hourly means indexed by the start of each hour.

        Raises:
            ValueError: If ``df.index`` is not a ``DatetimeIndex``.
        """
        if not isinstance(df.index, pd.DatetimeIndex):
            raise ValueError("DataFrame must have a DateTime index.")

        # An offset object is used instead of the "H" string alias: the
        # uppercase alias is deprecated in recent pandas releases, while the
        # offset object is accepted by old and new versions alike.
        one_hour = pd.offsets.Hour()

        hourly_mean = df.resample(one_hour).value.mean()
        day_hours = pd.date_range(
            start=df.index.min().normalize(), periods=24, freq=one_hour
        )
        hourly_mean = hourly_mean.reindex(day_hours, fill_value=pd.NA)

        if filepath is not None:
            hourly_mean.to_frame().to_parquet(filepath + ".parquet")

        return hourly_mean


class CalculateSleepDuration:
    """Per-device totals of ``value`` for in-bed and asleep sleep records."""

    def calculate(self, df, filepath=None):
        """Sum ``value`` per ``device`` for the two sleep categories.

        Args:
            df: DataFrame with ``category value``, ``device`` and ``value``
                columns. The result keys name the totals as seconds, so
                ``value`` is presumably a duration in seconds -- confirm
                against the caller.
            filepath: Output path without extension. When given, the result
                is written as JSON to ``filepath + ".json"``.

        Returns:
            A dict with the keys ``seconds_in_bed`` and
            ``total_seconds_asleep``; each maps ``device`` to the summed
            ``value`` of the matching rows.
        """
        category = df["category value"]
        in_bed_rows = df[category == "HKCategoryValueSleepAnalysisInBed"]
        asleep_rows = df[category == "HKCategoryValueSleepAnalysisAsleep"]

        result = {
            "seconds_in_bed": in_bed_rows.groupby("device").value.sum().to_dict(),
            "total_seconds_asleep": asleep_rows.groupby("device").value.sum().to_dict(),
        }

        if filepath is None:
            return result

        with open(filepath + ".json", "w") as out_file:
            json.dump(result, out_file)
        return result
        


class CalculateWorkoutSeconds:
    """Total duration in seconds per workout type."""

    def calculate(self, df, filepath=None):
        """Sum workout durations for every known workout category.

        Args:
            df: DataFrame with ``workoutType``, ``startTime`` and ``endTime``
                columns; the two time columns must support subtraction that
                yields timedeltas.
            filepath: Output path without extension. When given, the result
                is written to ``filepath + ".parquet"``.

        Returns:
            A DataFrame with the columns ``workoutType`` and ``delta``
            (seconds), one row per entry of
            ``HkWorkout2Plot.WORKOUT_CATEGORIES``; categories absent from
            ``df`` get a ``delta`` of 0.
        """
        elapsed_seconds = (df["endTime"] - df["startTime"]).dt.total_seconds()
        durations = df.assign(delta=elapsed_seconds)[["workoutType", "delta"]]

        # Total seconds observed for each workout type present in the data.
        observed = durations.groupby("workoutType", as_index=False).sum()

        # Left-join onto the full category list so every category appears,
        # then turn the gaps left by unobserved categories into zeros.
        categories = pd.DataFrame(HkWorkout2Plot.WORKOUT_CATEGORIES, columns=["workoutType"])
        final_df = categories.merge(observed, on="workoutType", how="left")
        final_df["delta"] = final_df["delta"].fillna(0)
        final_df = final_df[["workoutType", "delta"]]

        if filepath is not None:
            final_df.to_parquet(filepath + ".parquet")

        return final_df


class CalculateMotionDistribution:
    """Total duration in seconds per motion activity."""

    def calculate(self, df, filepath=None):
        """Sum the time spent in every known activity.

        Args:
            df: DataFrame with ``activity``, ``startTime`` and ``endTime``
                columns; the two time columns must support subtraction that
                yields timedeltas.
            filepath: Output path without extension. When given, the result
                is written as JSON to ``filepath + ".json"``.

        Returns:
            A dict mapping each entry of ``Motion2Plot.ACTIVITIES`` (in that
            order) to its total seconds; activities absent from ``df`` map
            to 0.
        """
        elapsed_seconds = (df["endTime"] - df["startTime"]).dt.total_seconds()
        timed = df.assign(delta=elapsed_seconds)

        # Seconds observed for each activity present in the data.
        observed = timed.groupby("activity", as_index=False)["delta"].sum()

        # Left-join onto the full activity list so every activity appears,
        # then turn the gaps left by unobserved activities into zeros.
        known = pd.DataFrame(Motion2Plot.ACTIVITIES, columns=["activity"])
        merged = known.merge(observed, on="activity", how="left")
        merged["delta"] = merged["delta"].fillna(0)

        # Put the rows in the order given by ACTIVITIES, keyed by activity.
        ordered = merged[["activity", "delta"]].set_index("activity").reindex(Motion2Plot.ACTIVITIES)
        result = ordered["delta"].to_dict()

        if filepath is not None:
            with open(filepath + ".json", "w") as out_file:
                json.dump(result, out_file)

        return result


class HealthKitCalculatorFactory:
    """Chooses the calculator class that matches an ``HKID`` member."""

    @staticmethod
    def get_calculator(hk_id):
        """Return a new calculator instance for ``hk_id``.

        Args:
            hk_id: The ``HKID`` member identifying the record type.

        Returns:
            A fresh instance of the calculator class registered for
            ``hk_id``.

        Raises:
            ValueError: If ``hk_id`` matches none of the supported members.
        """
        # Ordered routing table: (identifiers served, calculator class).
        routes = (
            (
                (
                    HKID.HKQuantityTypeIdentifierStepCount,
                    HKID.HKQuantityTypeIdentifierActiveEnergyBurned,
                    HKID.HKQuantityTypeIdentifierDistanceWalkingRunning,
                    HKID.HKQuantityTypeIdentifierDistanceCycling,
                    HKID.HKQuantityTypeIdentifierAppleStandTime,
                ),
                CalculatRobustStatsForHealthkit,
            ),
            ((HKID.HKQuantityTypeIdentifierHeartRate,), CalculateHourlyMeanForHealthkit),
            ((HKID.HKCategoryTypeIdentifierSleepAnalysis,), CalculateSleepDuration),
            ((HKID.HKWorkoutTypeIdentifier,), CalculateWorkoutSeconds),
            ((HKID.MotionCollector,), CalculateMotionDistribution),
        )

        for identifiers, calculator_cls in routes:
            for identifier in identifiers:
                if hk_id == identifier:
                    return calculator_cls()

        raise ValueError(f"Unsupported HKID: {hk_id}")


# Example usage
#c = CalculatRobustStatsForHealthkit()
#c.calculate(sub_2, filepath="/home/users/schuetzn/test/1")

#ch = CalculateHourlyMeanForHealthkit()
#ch.calculate(sub_1, filepath="/home/users/schuetzn/test/2")

#cs = CalculateSleepDuration()
#cs.calculate(sub_sleep, filepath="/home/users/schuetzn/test/3")

#wo = CalculateWorkoutSeconds()
#wo.calculate(sub_workout, filepath="/home/users/schuetzn/test/4")

#wm = CalculateMotionDistribution()
#wm.calculate(sub_motion, filepath="/home/users/schuetzn/test/5")


class TsPreprocessors(abc.ABC):
    """Abstract interface for a single DataFrame preprocessing step."""

    @abc.abstractmethod
    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return the preprocessed form of ``df``.

        Concrete subclasses must override this method; the base
        implementation does nothing and returns ``None``.
        """
        return None


class Identity(TsPreprocessors):
    """Preprocessing step that leaves the data untouched."""

    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return ``df`` itself, without copying or modifying it."""
        return df


class HkDedupAppVersion(TsPreprocessors):
    """Keeps only the rows recorded by a single ``appVersion``.

    The only supported strategy is ``"dominant"``, which keeps the rows of
    the most frequent ``appVersion`` value.
    """

    def __init__(self, strategy: str = "dominant"):
        """Store the de-duplication strategy.

        Args:
            strategy: Name of the strategy; validated when ``preprocess`` is
                called, not here.
        """
        self._strategy = strategy

    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply the configured strategy to ``df``.

        Args:
            df: DataFrame with an ``appVersion`` column.

        Returns:
            The rows of ``df`` selected by the strategy.

        Raises:
            ValueError: If the configured strategy is not ``"dominant"``.
        """
        if self._strategy == "dominant":
            return self._dedup_dominant_app_version(df)
        else:
            raise ValueError(f"Unknown strategy '{self._strategy}'")

    def _dedup_dominant_app_version(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return the rows whose ``appVersion`` is the most frequent one.

        When there is no version to pick (``df`` has no rows, or every
        ``appVersion`` is null), ``df`` is returned unchanged.
        """
        version_counts = df.appVersion.value_counts()

        # value_counts() is empty for an empty frame and also when every
        # appVersion is null (nulls are not counted), so index[0] would raise
        # IndexError. With no dominant version there is nothing to filter on.
        if version_counts.empty:
            return df

        # value_counts() sorts by descending frequency: first label wins.
        dominant_app_version = version_counts.index[0]
        return df[df.appVersion == dominant_app_version]


class BaseTs2Plot(abc.ABC):
    """Base class that preprocesses a DataFrame, saves targets and plots it.

    Calling an instance runs ``_main``: the preprocessors are applied in
    order, the calculator chosen by ``_FACTORY`` for
    ``HK_QUANTITY_TYPE_IDENTIFIER`` computes and saves its result, and
    ``extract`` draws onto the given axes.

    Subclasses must set ``HK_QUANTITY_TYPE_IDENTIFIER``, ``_YLIM`` and
    ``_COLORMAP``; instantiation fails otherwise.
    """

    # Record type served by the subclass (its ``.value`` names the output file).
    HK_QUANTITY_TYPE_IDENTIFIER = None

    # Provider of ``get_calculator`` used to compute the saved targets.
    _FACTORY = HealthKitCalculatorFactory

    # Plot configuration that concrete subclasses must provide.
    _YLIM = None
    _COLORMAP = None

    # Axis labels and title applied by ``_add_labels``.
    _XLABEL = ""
    _YLABEL = ""
    _TITLE = ""

    def __init__(self, preprocessors: Tuple = tuple(), alpha=0.8, color=True, labels=False, legend=True, root_dir="."):
        """Store the plot options and verify the subclass configuration.

        Args:
            preprocessors: Objects with a ``preprocess(df)`` method, applied
                in order before plotting.
            alpha: Stored as ``self.alpha`` for use by subclasses.
            color: Stored as ``self.color`` for use by subclasses.
            labels: Stored as ``self._labels`` for use by subclasses.
            legend: Stored as ``self._legend`` for use by subclasses.
            root_dir: Directory under which the calculator output is saved.

        Raises:
            NotImplementedError: If ``HK_QUANTITY_TYPE_IDENTIFIER``,
                ``_YLIM`` or ``_COLORMAP`` is still ``None``.
        """
        self._preprocessors = preprocessors
        self.alpha = alpha
        self.color = color
        self._labels = labels
        self._legend = legend
        self._root_dir = root_dir

        # Checked in this order so the first missing attribute is reported.
        for required in ("HK_QUANTITY_TYPE_IDENTIFIER", "_YLIM", "_COLORMAP"):
            if getattr(self, required) is None:
                raise NotImplementedError(f"{required} must be defined in any subclass")

    def _main(self, df: pd.DataFrame, ax: plt.axes) -> None:
        """Preprocess ``df``, save the calculator result and plot onto ``ax``.

        Does nothing and returns ``None`` when ``df`` is empty; otherwise
        returns whatever ``extract`` returns.
        """
        if df.empty:
            return None

        # Work on a copy so the preprocessors never touch the caller's frame.
        processed = df.copy()
        for step in self._preprocessors:
            processed = step.preprocess(processed)

        target_path = os.path.join(self._root_dir, self.HK_QUANTITY_TYPE_IDENTIFIER.value)
        self._calculate_and_save_reconstruction_targets(processed, target_path)

        return self.extract(processed, ax)

    @abc.abstractmethod
    def extract(self, df: pd.DataFrame, ax: plt.axes) -> None:
        """Draw the preprocessed ``df`` onto ``ax``; subclasses must override."""
        return None

    def _calculate_and_save_reconstruction_targets(self, df, filepath):
        """Run the calculator for this record type and let it save to ``filepath``."""
        self._FACTORY.get_calculator(self.HK_QUANTITY_TYPE_IDENTIFIER).calculate(df=df, filepath=filepath)

    def _add_labels(self, ax: plt.axes):
        """Apply the class-level title and axis labels to ``ax``."""
        ax.set_title(self._TITLE)
        ax.set_ylabel(self._YLABEL)
        ax.set_xlabel(self._XLABEL)

    # Instances are callable; the call is bound to this class's ``_main``.
    __call__ = _main


class HkBarChart(BaseTs2Plot):
    """Bar chart of per-record rates over the time of day.

    Each row becomes one horizontal-span bar: it starts at the row's
    ``startTime`` expressed as seconds since midnight, is as wide as the
    record's duration in seconds, and is as high as ``value / duration``
    clipped to ``_YLIM``.
    """

    HK_QUANTITY_TYPE_IDENTIFIER = None

    _YLIM = None
    _COLORMAP = None

    _XLABEL = ""
    _YLABEL = ""
    _TITLE = ""

    def extract(self, df: pd.DataFrame, ax: plt.axes) -> None:
        """Plot the rows of the five most frequent ``recordId`` values.

        Args:
            df: DataFrame with ``recordId``, ``startTime``, ``endTime`` and
                ``value`` columns.
            ax: Matplotlib axes to draw on.

        Rows whose duration is zero are skipped. A ``recordId`` for which no
        bar was drawn gets no legend entry.
        """
        # The five most frequent recordIds, shown in alphabetical order.
        devices = sorted(df.recordId.value_counts().head(5).index)

        handles = []
        legend_labels = []
        for color_index, device in enumerate(devices):
            sub = df[df.recordId == device]

            bar_style = {"alpha": self.alpha}
            if self.color:
                bar_style["facecolors"] = self._COLORMAP(color_index)

            # Tracked per device: reusing a patch from a previous device (or
            # reading an unset one) would put handles and labels out of step.
            last_patch = None
            for start_time, end_time, value in zip(sub["startTime"], sub["endTime"], sub["value"]):
                duration = (end_time - start_time).total_seconds()
                if duration == 0:
                    # A zero-length record has no defined rate.
                    continue

                # Start of the record as seconds since midnight.
                start_second = start_time.hour*60*60 + start_time.minute*60 + start_time.second

                height = np.clip(value / duration, *self._YLIM)
                last_patch = ax.broken_barh([(start_second, duration)], (0, height), **bar_style)

            if last_patch is not None:
                # One representative patch per device is enough for the legend.
                handles.append(last_patch)
                legend_labels.append(device)

        ax.set_ylim(*self._YLIM)
        ax.set_xlim(0, 24*60*60)
        if self._legend and handles:
            ax.legend(handles, legend_labels, loc='upper right', title="Devices")
        if self._labels:
            self._add_labels(ax)


class HkSteps2Plot(HkBarChart):
    """Bar-chart configuration for step-count records."""

    HK_QUANTITY_TYPE_IDENTIFIER = HKID.HKQuantityTypeIdentifierStepCount

    # Plotted range of the y axis: 0 to 5 steps per second.
    _YLIM = (0, 5)

    # Five shades of blue, indexed by device position.
    _COLORMAP = ListedColormap(
        [
            '#1A237E',
            '#304FFE',
            '#3949AB',
            '#7986CB',
            '#8C9EFF',
        ]
    )

    _XLABEL = "Time (seconds from midnight)"
    _YLABEL = "Steps per second (steps/s)"
    _TITLE = "Step Count"
Editor is loading...
Leave a Comment