# TODO(alaurens): dataclasses are super useful and you should use them
import dataclasses
import datetime as dt
# TODO(alaurens): don't do this just import datetime and then use datetime.date
from datetime import date, datetime
from typing import Optional

import pandas as pd
import pymongo
@dataclasses.dataclass  # Pass frozen=True here to make instances immutable.
class HeartRateDataPoint:
    """Structure to hold a single heart rate data point.

    Attributes:
        date: Date of the measurement.
        time: Time of the measurement.
        measurement: Heart rate value measured.
    """

    # The types are spelled through the ``dt`` module alias on purpose: with
    # ``from datetime import datetime`` in scope, ``datetime.date`` and
    # ``datetime.time`` are *methods* of the datetime class, not the types.
    date: dt.date
    time: dt.time
    measurement: int

    def transform_for_panda(self) -> dict:
        """Return the data point as a plain dictionary.

        Returns:
            A dictionary with the keys ``'date'``, ``'time'`` and
            ``'measurement'`` (in that order) mapped to the field values.
        """
        return dataclasses.asdict(self)
class MongoClientDataframes:
    """Query one MongoDB collection and return the results as pandas DataFrames.

    Every ``dataframe_*`` method selects documents by their ``type`` field and
    by an inclusive range on their ``date`` field, then flattens the matching
    documents into rows of a DataFrame.

    Attributes:
        mongo_client: The ``pymongo.MongoClient`` built from the connection string.
        db: The database selected on ``mongo_client``.
        collection: The collection selected on ``db``; all queries run against it.
    """

    # The query compares dates as strings, so they are formatted year-first:
    # that way lexicographic order matches chronological order.
    # NOTE(review): assumes the stored ``date`` values are strings in this
    # same format -- confirm against the code that writes the collection.
    _DATE_FORMAT = '%Y-%m-%d'

    def __init__(self, connection: str, database: str, collection: str):
        """Open the client and select the database and collection.

        Args:
            connection: MongoDB connection string passed to ``pymongo.MongoClient``.
            database: Name of the database to select.
            collection: Name of the collection to select inside ``database``.

        Raises:
            Exception: Whatever ``pymongo`` raises is propagated unchanged.
        """
        # No try/except here: when __init__ raises, the caller never receives
        # the instance, so resetting attributes to None had no visible effect,
        # and re-wrapping the error in a bare Exception only discarded its
        # type and traceback.
        self.mongo_client = pymongo.MongoClient(connection)
        self.db = self.mongo_client[database]
        self.collection = self.db[collection]

    @classmethod
    def _build_query(
        cls,
        document_type: str,
        start_date: Optional[dt.date],
        end_date: Optional[dt.date],
    ) -> dict:
        """Build the filter for one document type and an inclusive date range."""
        today = dt.date.today()
        first_day = start_date or today
        last_day = end_date or today
        return {
            'type': document_type,
            'date': {
                '$gte': first_day.strftime(cls._DATE_FORMAT),
                '$lte': last_day.strftime(cls._DATE_FORMAT),
            },
        }

    def _find_documents(
        self,
        document_type: str,
        start_date: Optional[dt.date],
        end_date: Optional[dt.date],
    ):
        """Run the type/date-range query and return the resulting iterable."""
        query = self._build_query(document_type, start_date, end_date)
        return self.collection.find(query)

    def dataframe_heart_rate(
        self,
        start_date: Optional[dt.date] = None,
        end_date: Optional[dt.date] = None,
    ) -> pd.DataFrame:
        """Return the intraday heart rate measurements, one row per measurement.

        Args:
            start_date: First day to include; defaults to today.
            end_date: Last day to include; defaults to today.

        Returns:
            A DataFrame with the columns ``date``, ``time`` and ``measurement``.
            The columns are present even when no document matches.

        Raises:
            KeyError: If a matching document lacks ``date`` or ``heartIntraday``,
                or a measurement lacks ``time`` or ``value``.
            ValueError: If a date, time or value cannot be parsed.
        """
        points = []
        for document in self._find_documents('heart', start_date, end_date):
            # NOTE(review): assumes ``date`` is an ISO "YYYY-MM-DD" string and
            # each ``time`` an ISO "HH:MM:SS" string -- confirm against the
            # stored documents.
            day = dt.date.fromisoformat(document['date'])
            points.extend(
                HeartRateDataPoint(
                    date=day,
                    time=dt.time.fromisoformat(sample['time']),
                    measurement=int(sample['value']),
                )
                for sample in document['heartIntraday']
            )
        return pd.DataFrame(
            [point.transform_for_panda() for point in points],
            columns=['date', 'time', 'measurement'],
        )

    def my_cool_function(self, heart_rate_measurement: 'HeartRateDataPoint') -> dict:
        """Return the dictionary form of a single heart rate data point.

        Args:
            heart_rate_measurement: The data point to convert.

        Returns:
            The result of ``heart_rate_measurement.transform_for_panda()``.
        """
        return heart_rate_measurement.transform_for_panda()

    def dataframe_heart_summary(
        self,
        start_date: Optional[dt.date] = None,
        end_date: Optional[dt.date] = None,
    ) -> pd.DataFrame:
        """Return the heart rate zones, one row per zone per document.

        Args:
            start_date: First day to include; defaults to today.
            end_date: Last day to include; defaults to today.

        Returns:
            A DataFrame with the columns ``date``, ``caloriesOut``, ``max``,
            ``min``, ``minutes`` and ``name``, copied as stored. The columns
            are present even when no document matches.

        Raises:
            KeyError: If a matching document lacks ``date`` or
                ``heartRateZones``, or a zone lacks one of the copied fields.
        """
        zone_fields = ('caloriesOut', 'max', 'min', 'minutes', 'name')
        rows = [
            {'date': document['date'], **{field: zone[field] for field in zone_fields}}
            for document in self._find_documents('heart', start_date, end_date)
            for zone in document['heartRateZones']
        ]
        return pd.DataFrame(rows, columns=['date', *zone_fields])

    def dataframe_heart_resting_heart_rate(
        self,
        start_date: Optional[dt.date] = None,
        end_date: Optional[dt.date] = None,
    ) -> pd.DataFrame:
        """Return the resting heart rate, one row per document that has one.

        Documents without a ``restingHeartrate`` field are skipped.

        Args:
            start_date: First day to include; defaults to today.
            end_date: Last day to include; defaults to today.

        Returns:
            A DataFrame with the columns ``date`` and ``restingHeartRate``.
            The columns are present even when no document matches.

        Raises:
            KeyError: If a document with a resting heart rate lacks ``date``.
        """
        rows = [
            {
                'date': document['date'],
                'restingHeartRate': document['restingHeartrate'],
            }
            for document in self._find_documents('heart', start_date, end_date)
            if 'restingHeartrate' in document
        ]
        return pd.DataFrame(rows, columns=['date', 'restingHeartRate'])

    def dataframe_hrv(
        self,
        start_date: Optional[dt.date] = None,
        end_date: Optional[dt.date] = None,
    ) -> pd.DataFrame:
        """Return the heart rate variability values, one row per document.

        Queries documents whose ``type`` is ``'hrv'``.

        Args:
            start_date: First day to include; defaults to today.
            end_date: Last day to include; defaults to today.

        Returns:
            A DataFrame with the columns ``date``, ``daily_rmssd`` (copied from
            ``dailyRmssd``) and ``deep_rmssd`` (copied from ``deepRmssd``).
            The columns are present even when no document matches.

        Raises:
            KeyError: If a matching document lacks ``date``, ``dailyRmssd`` or
                ``deepRmssd``.
        """
        rows = [
            {
                'date': document['date'],
                # RMSSD: Root Mean Square of Successive Differences between
                # heart beats.
                'daily_rmssd': document['dailyRmssd'],
                'deep_rmssd': document['deepRmssd'],
            }
            for document in self._find_documents('hrv', start_date, end_date)
        ]
        return pd.DataFrame(rows, columns=['date', 'daily_rmssd', 'deep_rmssd'])
# EXAMPLE CODE
# client = MongoClientDataframes(
# connection="mongodb://localhost:27017/",
# database="local",
# collection="fitbit",
# )
# startTime = date(year = 2023, month = 4, day = 20)
# endTime = date(year = 2023, month = 4, day = 20)
# client.dataframe_heart_rate(start_date=startTime)
# client.dataframe_heart_summary(start_date=startTime)
# client.dataframe_heart_resting_heart_rate(start_date=startTime)
# client.dataframe_hrv(start_date=startTime)