Untitled

mail@pastecode.io avatar
unknown
plain_text
a month ago
12 kB
3
Indexable
Never
import abc
from typing import Tuple
import matplotlib.pyplot as plt
import pandas as pd
from matplotlib.colors import ListedColormap
from typing import Tuple
import numpy as np
import os

plt.close()
%matplotlib widget


import json
import pandas as pd
import enum

@enum.unique
class HKID(enum.Enum):
    """Identifiers of the record types this module can process.

    Each member's value is the identifier string itself, so a raw string can
    be turned into a member with ``HKID("<identifier>")``.

    ``@enum.unique`` makes an accidentally duplicated value fail when the
    class is created instead of silently turning the second member into an
    alias of the first.
    """

    # Quantity-type identifiers.
    HKQuantityTypeIdentifierStepCount = "HKQuantityTypeIdentifierStepCount"
    HKQuantityTypeIdentifierActiveEnergyBurned = "HKQuantityTypeIdentifierActiveEnergyBurned"
    HKQuantityTypeIdentifierDistanceWalkingRunning = "HKQuantityTypeIdentifierDistanceWalkingRunning"
    HKQuantityTypeIdentifierDistanceCycling = "HKQuantityTypeIdentifierDistanceCycling"
    HKQuantityTypeIdentifierAppleStandTime = "HKQuantityTypeIdentifierAppleStandTime"
    HKQuantityTypeIdentifierHeartRate = "HKQuantityTypeIdentifierHeartRate"

    # Category-type and workout identifiers.
    HKCategoryTypeIdentifierSleepAnalysis = "HKCategoryTypeIdentifierSleepAnalysis"
    HKWorkoutTypeIdentifier = "HKWorkoutTypeIdentifier"

    # The only identifier that does not follow the "HK..." naming scheme.
    MotionCollector = "MotionCollector"

class CalculatRobustStatsForHealthkit:
    """Summary statistics of the ``value`` column, computed per ``recordId``."""

    def calculate(self, df, filepath=None):
        """Compute sum, mean, median, quartiles and IQR for every ``recordId``.

        Args:
            df: DataFrame with a ``recordId`` column to group by and a
                ``value`` column to aggregate.
            filepath: Optional path *without* extension. When given, the
                result is also written as JSON to ``filepath + ".json"``.

        Returns:
            A dict with the keys ``sums``, ``means``, ``medians``, ``q25s``,
            ``q75s`` and ``iqrs``; each maps ``recordId`` to the statistic.
        """
        values_by_record = df.groupby("recordId")["value"]
        lower_quartile = self._calculate_quantiles(df, 0.25)
        upper_quartile = self._calculate_quantiles(df, 0.75)

        result = {
            "sums": values_by_record.sum(numeric_only=True).to_dict(),
            "means": values_by_record.mean(numeric_only=True).to_dict(),
            "medians": values_by_record.median(numeric_only=True).to_dict(),
            "q25s": lower_quartile.to_dict(),
            "q75s": upper_quartile.to_dict(),
            # Interquartile range: spread of the middle half of the values.
            "iqrs": (upper_quartile - lower_quartile).to_dict(),
        }

        if filepath is None:
            return result

        with open(filepath + ".json", "w") as f:
            json.dump(result, f)
        return result

    def _calculate_quantiles(self, df, q):
        """Return the ``q``-quantile of ``value`` for every ``recordId``."""
        values_by_record = df.groupby("recordId")["value"]
        return values_by_record.quantile(q, numeric_only=True)

class CalculateHourlyMeanForHealthkit:
    """Hourly mean of the ``value`` column over the first calendar day."""

    def calculate(self, df, filepath=None):
        """Average ``value`` per hour and align the result to a 24-hour grid.

        The grid starts at midnight of the earliest timestamp in the index
        and covers exactly 24 hours; hours without data are missing values,
        and samples after those 24 hours are not part of the result.

        Args:
            df: DataFrame with a ``DatetimeIndex`` and a ``value`` column.
            filepath: Optional path *without* extension. When given, the
                series is also written to ``filepath + ".parquet"``.

        Returns:
            A Series of 24 hourly means indexed by the hour start.

        Raises:
            ValueError: If ``df.index`` is not a ``DatetimeIndex``.
        """
        if not isinstance(df.index, pd.DatetimeIndex):
            raise ValueError("DataFrame must have a DateTime index.")

        # Lowercase "h" is the non-deprecated hourly alias: uppercase "H"
        # emits a FutureWarning since pandas 2.2, while "h" is accepted by
        # older releases as well.
        hourly_mean = df.resample("h")["value"].mean()
        first_day_hours = pd.date_range(
            start=df.index.min().normalize(), periods=24, freq="h"
        )
        hourly_mean = hourly_mean.reindex(first_day_hours, fill_value=pd.NA)

        if filepath is not None:
            hourly_mean.to_frame().to_parquet(filepath + ".parquet")

        return hourly_mean


class CalculateSleepDuration:
    """Per-device totals of the "in bed" and "asleep" sleep-analysis records."""

    def calculate(self, df, filepath=None):
        """Sum ``value`` per ``device`` for the in-bed and asleep categories.

        Args:
            df: DataFrame with the columns ``category value``, ``device`` and
                ``value`` (presumably a duration in seconds, judging by the
                result keys; confirm against the data source).
            filepath: Optional path *without* extension. When given, the
                result is also written as JSON to ``filepath + ".json"``.

        Returns:
            A dict with the keys ``seconds_in_bed`` and
            ``total_seconds_asleep``, each mapping ``device`` to its total.
        """

        def total_per_device(category):
            # Keep only the rows of one sleep category, then total per device.
            matching_rows = df.loc[df["category value"] == category]
            return matching_rows.groupby("device")["value"].sum().to_dict()

        result = {
            "seconds_in_bed": total_per_device("HKCategoryValueSleepAnalysisInBed"),
            "total_seconds_asleep": total_per_device("HKCategoryValueSleepAnalysisAsleep"),
        }

        if filepath is None:
            return result

        with open(filepath + ".json", "w") as f:
            json.dump(result, f)
        return result
        


class CalculateWorkoutSeconds:
    """Total duration in seconds per workout type."""

    def calculate(self, df, filepath=None):
        """Sum ``endTime - startTime`` per ``workoutType``.

        Every entry of ``HkWorkout2Plot.WORKOUT_CATEGORIES`` appears in the
        result; categories without any record get a total of 0, and workout
        types that are not in that list are dropped by the left join.

        Args:
            df: DataFrame with datetime columns ``startTime`` and ``endTime``
                and a ``workoutType`` column. It is not modified.
            filepath: Optional path *without* extension. When given, the
                result is also written to ``filepath + ".parquet"``.

        Returns:
            A DataFrame with the columns ``workoutType`` and ``delta``
            (seconds), one row per known workout category.
        """
        durations = (df["endTime"] - df["startTime"]).dt.total_seconds()
        timed = df.assign(delta=durations)
        seconds_per_type = timed.groupby("workoutType", as_index=False)["delta"].sum()

        # Left-join onto the full category list so absent categories survive
        # as rows; their missing totals then become 0.
        known_types = pd.DataFrame(HkWorkout2Plot.WORKOUT_CATEGORIES, columns=["workoutType"])
        final_df = known_types.merge(seconds_per_type, on="workoutType", how="left")
        final_df["delta"] = final_df["delta"].fillna(0)

        if filepath is not None:
            final_df.to_parquet(filepath + ".parquet")

        return final_df


class CalculateMotionDistribution:
    """Total duration in seconds per motion activity."""

    def calculate(self, df, filepath=None):
        """Sum ``endTime - startTime`` per ``activity``.

        The result has one entry for every item of ``Motion2Plot.ACTIVITIES``,
        in that order; activities without any record get 0 and activities
        that are not in that list are dropped by the left join.

        Args:
            df: DataFrame with datetime columns ``startTime`` and ``endTime``
                and an ``activity`` column. It is not modified.
            filepath: Optional path *without* extension. When given, the
                result is also written as JSON to ``filepath + ".json"``.

        Returns:
            A dict mapping each activity to its total number of seconds.
        """
        durations = (df["endTime"] - df["startTime"]).dt.total_seconds()
        timed = df.assign(delta=durations)
        seconds_per_activity = timed.groupby("activity", as_index=False)["delta"].sum()

        # Left-join onto the full activity list so unseen activities are kept
        # and can be filled with 0.
        known_activities = pd.DataFrame(Motion2Plot.ACTIVITIES, columns=["activity"])
        merged = known_activities.merge(seconds_per_activity, on="activity", how="left")
        merged["delta"] = merged["delta"].fillna(0)

        # Force the row order to follow ACTIVITIES before building the dict.
        ordered = merged.set_index("activity").reindex(Motion2Plot.ACTIVITIES)
        result = ordered["delta"].to_dict()

        if filepath is not None:
            with open(filepath + ".json", "w") as f:
                json.dump(result, f)

        return result


class HealthKitCalculatorFactory:
    """Maps an ``HKID`` member to the calculator that summarises that data."""

    @staticmethod
    def get_calculator(hk_id):
        """Return a new calculator instance for ``hk_id``.

        Args:
            hk_id: The ``HKID`` member identifying the record type.

        Returns:
            A freshly constructed calculator exposing
            ``calculate(df, filepath=None)``.

        Raises:
            ValueError: If ``hk_id`` is not one of the supported members.
        """
        calculator_types = {
            HKID.HKQuantityTypeIdentifierStepCount: CalculatRobustStatsForHealthkit,
            HKID.HKQuantityTypeIdentifierActiveEnergyBurned: CalculatRobustStatsForHealthkit,
            HKID.HKQuantityTypeIdentifierDistanceWalkingRunning: CalculatRobustStatsForHealthkit,
            HKID.HKQuantityTypeIdentifierDistanceCycling: CalculatRobustStatsForHealthkit,
            HKID.HKQuantityTypeIdentifierAppleStandTime: CalculatRobustStatsForHealthkit,
            HKID.HKQuantityTypeIdentifierHeartRate: CalculateHourlyMeanForHealthkit,
            HKID.HKCategoryTypeIdentifierSleepAnalysis: CalculateSleepDuration,
            HKID.HKWorkoutTypeIdentifier: CalculateWorkoutSeconds,
            HKID.MotionCollector: CalculateMotionDistribution,
        }

        try:
            calculator_type = calculator_types[hk_id]
        except (KeyError, TypeError):
            # TypeError covers unhashable arguments, which must be reported
            # as unsupported just like any other unknown value.
            raise ValueError(f"Unsupported HKID: {hk_id}") from None

        return calculator_type()


# Example usage
#c = CalculatRobustStatsForHealthkit()
#c.calculate(sub_2, filepath="/home/users/schuetzn/test/1")

#ch = CalculateHourlyMeanForHealthkit()
#ch.calculate(sub_1, filepath="/home/users/schuetzn/test/2")

#cs = CalculateSleepDuration()
#cs.calculate(sub_sleep, filepath="/home/users/schuetzn/test/3")

#wo = CalculateWorkoutSeconds()
#wo.calculate(sub_workout, filepath="/home/users/schuetzn/test/4")

#wm = CalculateMotionDistribution()
#wm.calculate(sub_motion, filepath="/home/users/schuetzn/test/5")


class TsPreprocessors(abc.ABC):
    """Interface for one DataFrame-in, DataFrame-out preprocessing step."""

    @abc.abstractmethod
    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return the preprocessed version of ``df``.

        Concrete subclasses must override this method; the base
        implementation does nothing and returns ``None``.
        """


class Identity(TsPreprocessors):
    """No-op preprocessing step that hands the frame back untouched."""

    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return ``df`` itself (the same object, not a copy)."""
        return df


class HkDedupAppVersion(TsPreprocessors):
    """Keep only the rows recorded under a single ``appVersion``.

    The only supported strategy is ``"dominant"``: keep the rows whose
    ``appVersion`` is the most frequent one in the frame.
    """

    def __init__(self, strategy: str = "dominant"):
        """Store the strategy name; it is validated when ``preprocess`` runs."""
        self._strategy = strategy

    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply the configured de-duplication strategy to ``df``.

        Args:
            df: DataFrame with an ``appVersion`` column.

        Returns:
            The rows of ``df`` that belong to the selected app version.

        Raises:
            ValueError: If the configured strategy is unknown.
        """
        if self._strategy == "dominant":
            return self._dedup_dominant_app_version(df)
        raise ValueError(f"Unknown strategy '{self._strategy}'")

    def _dedup_dominant_app_version(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return the rows whose ``appVersion`` is the most frequent one."""
        version_counts = df["appVersion"].value_counts()
        if version_counts.empty:
            # value_counts() skips nulls, so this is an empty frame or a
            # column without a single recorded version. There is nothing to
            # choose between; return the input instead of failing on index[0].
            return df

        # value_counts() sorts by descending frequency: first label wins.
        dominant_app_version = version_counts.index[0]
        return df[df["appVersion"] == dominant_app_version]


class BaseTs2Plot(abc.ABC):
    """Base class for callables that draw one record type onto an axes.

    Calling an instance (``__call__`` is an alias of ``_main``) runs the
    configured preprocessors on a copy of the frame, computes and saves the
    reconstruction targets through the calculator supplied by ``_FACTORY``,
    and finally delegates the drawing to ``extract``.

    Concrete subclasses must define ``HK_QUANTITY_TYPE_IDENTIFIER``,
    ``_YLIM`` and ``_COLORMAP``; the constructor refuses to build an instance
    while any of them is still ``None``.
    """

    # HKID member naming the record type; also used as the output file stem.
    HK_QUANTITY_TYPE_IDENTIFIER = None

    # Factory that provides the calculator for HK_QUANTITY_TYPE_IDENTIFIER.
    _FACTORY = HealthKitCalculatorFactory

    # Required plot configuration, set by subclasses.
    _YLIM = None
    _COLORMAP = None

    # Optional axis texts applied by _add_labels().
    _XLABEL = ""
    _YLABEL = ""
    _TITLE = ""

    def __init__(self, preprocessors: Tuple = tuple(), alpha=0.8, color=True, labels=False, legend=True, root_dir="."):
        """Store the plot options and check the subclass configuration.

        Args:
            preprocessors: Objects with a ``preprocess(df)`` method, applied
                in order before anything else happens.
            alpha: Stored as ``self.alpha`` for subclasses to use.
            color: Stored as ``self.color`` for subclasses to use.
            labels: Stored as ``self._labels`` for subclasses to use.
            legend: Stored as ``self._legend`` for subclasses to use.
            root_dir: Directory that receives the saved reconstruction
                targets.

        Raises:
            NotImplementedError: If a required class attribute is ``None``.
        """
        self._preprocessors = preprocessors
        self.alpha = alpha
        self.color = color
        self._labels = labels
        self._legend = legend
        self._root_dir = root_dir

        for required in ("HK_QUANTITY_TYPE_IDENTIFIER", "_YLIM", "_COLORMAP"):
            if getattr(self, required) is None:
                raise NotImplementedError(f"{required} must be defined in any subclass")

    def _main(self, df: pd.DataFrame, ax: plt.Axes) -> None:
        """Preprocess ``df``, save its reconstruction targets and draw it.

        An empty ``df`` is skipped entirely: nothing is saved or drawn.
        """
        if df.empty:
            return

        # Work on a copy so preprocessors cannot alter the caller's frame.
        df = df.copy()
        for preprocessor in self._preprocessors:
            df = preprocessor.preprocess(df)

        target_path = os.path.join(self._root_dir, self.HK_QUANTITY_TYPE_IDENTIFIER.value)
        self._calculate_and_save_reconstruction_targets(df, target_path)

        return self.extract(df, ax)

    @abc.abstractmethod
    def extract(self, df: pd.DataFrame, ax: plt.Axes) -> None:
        """Draw the preprocessed ``df`` onto ``ax``; implemented by subclasses."""

    def _calculate_and_save_reconstruction_targets(self, df, filepath):
        """Run the matching calculator on ``df`` and let it save to ``filepath``.

        ``filepath`` carries no extension; the calculator appends its own.
        """
        # The calculators open the target file directly, so a root directory
        # that does not exist yet would otherwise fail with FileNotFoundError.
        parent_dir = os.path.dirname(filepath)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        calculator = self._FACTORY.get_calculator(self.HK_QUANTITY_TYPE_IDENTIFIER)
        calculator.calculate(df=df, filepath=filepath)

    def _add_labels(self, ax: plt.Axes):
        """Apply the class-level title and axis labels to ``ax``."""
        ax.set_title(self._TITLE)
        ax.set_ylabel(self._YLABEL)
        ax.set_xlabel(self._XLABEL)

    # Instances are used as plain callables: plotter(df, ax).
    __call__ = _main


class HkBarChart(BaseTs2Plot):
    """Draws every record as a horizontal bar over the time of day.

    A bar starts at the record's start time (seconds since midnight), is as
    wide as the record's duration and as high as ``value / duration``,
    clipped to ``_YLIM``. Only the most frequent ``recordId`` values are
    drawn, one colour each. Subclasses provide the actual configuration.
    """

    HK_QUANTITY_TYPE_IDENTIFIER = None

    _YLIM = None
    _COLORMAP = None

    _XLABEL = ""
    _YLABEL = ""
    _TITLE = ""

    # How many of the most frequent recordIds are drawn.
    _MAX_DEVICES = 5

    def extract(self, df: pd.DataFrame, ax: plt.Axes) -> None:
        """Draw the bars for the most frequent ``recordId`` values onto ``ax``.

        Args:
            df: DataFrame with the columns ``recordId``, ``startTime``,
                ``endTime`` and ``value``.
            ax: Matplotlib axes that receives the bars, limits, legend and
                (optionally) the labels.
        """
        # Most frequent ids, sorted so that colours and the legend order do
        # not depend on the counts.
        most_frequent = df["recordId"].value_counts().head(self._MAX_DEVICES).index
        devices = sorted(most_frequent)
        y_min, y_max = self._YLIM

        handles = []
        legend_labels = []
        for position, device in enumerate(devices):
            records = df[df["recordId"] == device]

            bar_style = {"alpha": self.alpha}
            if self.color:
                bar_style["facecolors"] = self._COLORMAP(position)

            # Tracked per device: a device whose records all have zero
            # duration draws nothing and must not reuse (or fail on) the
            # handle of a previously drawn device.
            last_patch = None
            for start_time, end_time, value in zip(
                records["startTime"], records["endTime"], records["value"]
            ):
                duration = (end_time - start_time).total_seconds()
                if duration == 0:
                    # A rate cannot be computed for an instantaneous record.
                    continue

                # Start of the record as seconds since midnight.
                start_second = start_time.hour * 60 * 60 + start_time.minute * 60 + start_time.second
                height = np.clip(value / duration, y_min, y_max)
                last_patch = ax.broken_barh([(start_second, duration)], (0, height), **bar_style)

            if last_patch is not None:
                # The last bar of a device represents it in the legend.
                handles.append(last_patch)
                legend_labels.append(device)

        ax.set_ylim(y_min, y_max)
        ax.set_xlim(0, 24 * 60 * 60)
        if self._legend:
            ax.legend(handles, legend_labels, loc='upper right', title="Devices")
        if self._labels:
            self._add_labels(ax)


class HkSteps2Plot(HkBarChart):
    """Bar-chart configuration for step-count records.

    Only class-level settings are defined here; the drawing logic is
    inherited from ``HkBarChart``.
    """

    # Record type handled by this plot.
    HK_QUANTITY_TYPE_IDENTIFIER = HKID.HKQuantityTypeIdentifierStepCount

    _YLIM = (0, 5)  # 0-5 Steps per second

    # Blue shades for devices
    _COLORMAP = ListedColormap(['#1A237E', '#304FFE','#3949AB', '#7986CB', '#8C9EFF'])

    # Axis texts, applied only when labels are enabled on the instance.
    _XLABEL = "Time (seconds from midnight)"
    _YLABEL = "Steps per second (steps/s)"
    _TITLE = "Step Count"
Leave a Comment