file1

mail@pastecode.io avatar
unknown
python
2 years ago
11 kB
7
Indexable
# TODO(alaurens): dataclasses are super useful and you should use them
import dataclasses
# TODO(alaurens): don't do this just import datetime and then use datetime.date
from datetime import date, datetime
import pandas as pd
import pymongo


@dataclasses.dataclass # (frozen=True) this makes it immutable
class HeartRateDataPoint:
  """Structure to hold a heart rate data point.
  
  Attributes:
    date: Date of the measurment.
    time: Time of the measurment.
    measurement: Heart rate value measured.
  """
  date: datetime.date
  time: datetime.time
  measurement: int
  
  
hr = HeartRateDataPoint()



  def transform_for_panda(self):
    return {
        'date': self.date,
        'time': self.time,
        'measurement': self.measurement
    }

class MongoClientDataframes:

  # TODO(alaurens): Make sure to type your functions
  def __init__(self, connection: str, database, collection):
    # Connect to the MongoDB database and collection specified by the arguments
    try:
      self._mongo_client = pymongo.MongoClient(connection_string)
      self._db = self.mongo_client[database]
      self._collection = self.db[collection]
    except Exception as e:
      # TODO(alaurens): again here why do you need to do this? The comment does not explain why
      # If there is an error, set the connection variables to None and raise an exception
      self.mongo_client = None
      self.db = None
      self.collection = None
      raise Exception(e)

  def dataframe_heart_rate(self, start_date: datetime.date = None, end_date=None):
    """ksdjfnklsjn"""
    # If start_date and end_date are not specified, set them to today's date
    # TODO(alaurens): why don't you do similar things in the other file? Here you 
    # are using the datetime library which is great
    start_date = start_date or datetime.now().date()
    end_date = end_date or datetime.now().date()

    # Convert the start and end dates to datetime objects
    start_datetime = datetime.combine(start_date, datetime.min.time())
    end_datetime = datetime.combine(end_date, datetime.max.time())

    # Format the start and end dates as strings in "YYYY-MM-DD" format
    date_format = '%Y-%m-%d'
    start_date_string = start_datetime.strftime(date_format)
    end_date_string = end_datetime.strftime(date_format)

    # TODO(alaurens): Here you really only need the first comment and even
    # that one might be too much
    # Query the MongoDB collection for heart rate data between the start and end dates
    query = {
        'type': 'heart',  # Select documents with "type" equal to "heart"
        'date': {  # Select documents where "date" is between the start and end dates
            '$gte': start_date_string,  # Greater than or equal to start date
            '$lte': end_date_string,  # Less than or equal to end date
        },
    }
    results = self.collection.find(query)

    # TODO(alaurens): You need a schema that allows you to pass you
    # data around in your code more easily
    # Extract heart rate data from the MongoDB documents and store it as a list of dictionaries
    data = [
        HeartRateDataPoint(
            date=result['date'],  # Date of the document
            time=datetime.time(item['time']),  # Time of the heart rate measurement
            measurment=int(item['value']),  # Heart rate value
        )
        for result in results
        for item in result[
            'heartIntraday'
        ]  # Loop through the heart rate measurements for each document
    ]

    # Create a pandas dataframe from the list of dictionaries
    df = pd.DataFrame(data)

    # Save the dataframe to a CSV file with a descriptive file name
    # filename = f"heart_rate_data_{start_date_string}_{end_date_string}.csv"
    # df.to_csv(filename, index=False)

    # Return the pandas dataframe
    return df

  def my_cool_function(self, heart_rate_measurement: HeartRateDataPoint):
    dictionnary = heart_rate_measurement.transform_for_panda()
    
  # TODO(alaurens): I'm a bit confused as to how this function is different from the previous one
  def dataframe_heart_summary(self, start_date=None, end_date=None):
    # If start_date and end_date are not specified, set them to today's date
    # TODO(alaurens): Here you repeat this code maybe you could simply make a function out of this?
    start_date = start_date or datetime.now().date()
    end_date = end_date or datetime.now().date()

    # Convert the start and end dates to datetime objects
    start_datetime = datetime.combine(start_date, datetime.min.time())
    end_datetime = datetime.combine(end_date, datetime.max.time())

    # Format the start and end dates as strings in "YYYY-MM-DD" format
    date_format = '%Y-%m-%d'
    start_date_string = start_datetime.strftime(date_format)
    end_date_string = end_datetime.strftime(date_format)

    # Query the MongoDB collection for heart rate data between the start and end dates
    query = {
        'type': 'heart',  # Select documents with "type" equal to "heart"
        'date': {  # Select documents where "date" is between the start and end dates
            '$gte': start_date_string,  # Greater than or equal to start date
            '$lte': end_date_string,  # Less than or equal to end date
        },
    }
    results = self.collection.find(query)

    # Extract heart rate data from the MongoDB documents and store it as a list of dictionaries
    data = [
        {
            'date': result['date'],  # Date of the document
            'caloriesOut': item[
                'caloriesOut'
            ],  # Number calories burned with the specified heart rate zone
            'max': item['max'],  # Maximum range for the heart rate zone
            'min': item['min'],  # Minimum range for the heart rate zone
            'minutes': item[
                'minutes'
            ],  # Number minutes withing the specified heart rate zone
            'name': item['name'],  # Name of the heart rate zone
        }
        for result in results
        for item in result[
            'heartRateZones'
        ]  # Loop through the heart rate measurements for each document
    ]

    # Create a pandas dataframe from the list of dictionaries
    df = pd.DataFrame(data)

    # Save the dataframe to a CSV file with a descriptive file name
    # filename = f"heart_rate_summary_{start_date_string}_{end_date_string}.csv"
    # df.to_csv(filename, index=False)

    # Return the pandas dataframe
    return df

  def dataframe_heart_resting_heart_rate(self, start_date=None, end_date=None):
    # If start_date and end_date are not specified, set them to today's date
    start_date = start_date or datetime.now().date()
    end_date = end_date or datetime.now().date()

    # Convert the start and end dates to datetime objects
    start_datetime = datetime.combine(start_date, datetime.min.time())
    end_datetime = datetime.combine(end_date, datetime.max.time())

    # Format the start and end dates as strings in "YYYY-MM-DD" format
    date_format = '%Y-%m-%d'
    start_date_string = start_datetime.strftime(date_format)
    end_date_string = end_datetime.strftime(date_format)

    # Query the MongoDB collection for heart rate data between the start and end dates
    query = {
        'type': 'heart',  # Select documents with "type" equal to "heart"
        'date': {  # Select documents where "date" is between the start and end dates
            '$gte': start_date_string,  # Greater than or equal to start date
            '$lte': end_date_string,  # Less than or equal to end date
        },
    }
    results = self.collection.find(query)

    # Extract heart rate data from the MongoDB documents and store it as a list of dictionaries
    data = []
    for result in results:
      if 'restingHeartrate' in result:
        data.append({
            'date': result['date'],
            'restingHeartRate': result[
                'restingHeartrate'
            ],  # Resting heart rate value for the day (daily)
        })

    # Create a pandas dataframe from the list of dictionaries
    df = pd.DataFrame(data)

    # Save the dataframe to a CSV file with a descriptive file name
    # filename = f"heart_resting_heart_rate_{start_date_string}_{end_date_string}.csv"
    # df.to_csv(filename, index=False)

    # Return the pandas dataframe
    return df

  def dataframe_hrv(self, start_date=None, end_date=None):
    # If start_date and end_date are not specified, set them to today's date
    start_date = start_date or datetime.now().date()
    end_date = end_date or datetime.now().date()

    # Convert the start and end dates to datetime objects
    start_datetime = datetime.combine(start_date, datetime.min.time())
    end_datetime = datetime.combine(end_date, datetime.max.time())

    # Format the start and end dates as strings in "YYYY-MM-DD" format
    date_format = '%Y-%m-%d'
    start_date_string = start_datetime.strftime(date_format)
    end_date_string = end_datetime.strftime(date_format)

    # Query the MongoDB collection for heart rate data between the start and end dates
    query = {
        'type': 'hrv',  # Select documents with "type" equal to "heart"
        'date': {  # Select documents where "date" is between the start and end dates
            '$gte': start_date_string,  # Greater than or equal to start date
            '$lte': end_date_string,  # Less than or equal to end date
        },
    }
    results = self.collection.find(query)

    # Extract heart rate data from the MongoDB documents and store it as a list of dictionaries
    data = []
    for result in results:
      data.append({
          'date': result['date'],
          'daily_rmssd': result[
              'dailyRmssd'
          ],  # The Root Mean Square of Successive Differences (RMSSD) between heart beats. It measures short-term variability in the user’s daily heart rate in milliseconds (ms).
          'deep_rmssd': result[
              'deepRmssd'
          ],  # The Root Mean Square of Successive Differences (RMSSD) between heart beats. It measures short-term variability in the user’s heart rate while in deep sleep, in milliseconds (ms).
      })

    # Create a pandas dataframe from the list of dictionaries
    df = pd.DataFrame(data)

    # Save the dataframe to a CSV file with a descriptive file name
    # filename = f"heart_hrv_{start_date_string}_{end_date_string}.csv"
    # df.to_csv(filename, index=False)

    # Return the pandas dataframe
    return df


# EXAMPLE CODE
# client = MongoClientDataframes(
#     connection_string = "mongodb://localhost:27017/",
#     database="local",
#     collection="fitbit",
# )
# startTime = date(year = 2023, month = 4, day = 20)
# endTime =  date(year = 2023, month = 4, day = 20)
# client.dataframe_heart_rate(start_date=startTime)
# client.dataframe_heart_summary(start_date=startTime)
# client.dataframe_heart_resting_heart_rate(start_date=startTime)
# client.dataframe_hrv(start_date=startTime)