file1
unknown
python
2 years ago
11 kB
7
Indexable
Never
"""Load heart-related documents from a MongoDB collection into pandas DataFrames."""

import dataclasses
import datetime
from typing import Any, Dict, Optional

import pandas as pd
import pymongo

# Format of the 'date' strings the range queries compare against.
_DATE_FORMAT = '%Y-%m-%d'


@dataclasses.dataclass
class HeartRateDataPoint:
    """Structure to hold a heart rate data point.

    Attributes:
        date: Date of the measurement.
        time: Time of the measurement.
        measurement: Heart rate value measured.
    """

    date: datetime.date
    time: datetime.time
    measurement: int

    def transform_for_panda(self) -> Dict[str, Any]:
        """Return the data point as a plain dict usable as a DataFrame row."""
        return {
            'date': self.date,
            'time': self.time,
            'measurement': self.measurement,
        }


class MongoClientDataframes:
    """Query one MongoDB collection and expose the results as DataFrames."""

    def __init__(self, connection: str, database: str, collection: str) -> None:
        """Open the client and select the database and collection.

        Args:
            connection: MongoDB connection string.
            database: Name of the database to use.
            collection: Name of the collection inside ``database``.

        Raises:
            Whatever ``pymongo`` raises for an invalid connection string or
            name; the exception is propagated unchanged so its type and
            traceback are preserved.
        """
        self.mongo_client = pymongo.MongoClient(connection)
        self.db = self.mongo_client[database]
        self.collection = self.db[collection]

    def _find_by_type_and_dates(
        self,
        document_type: str,
        start_date: Optional[datetime.date],
        end_date: Optional[datetime.date],
    ):
        """Find documents of one type whose 'date' lies in an inclusive range.

        Args:
            document_type: Value the document's 'type' field must equal.
            start_date: First day of the range; today when falsy.
            end_date: Last day of the range; today when falsy.

        Returns:
            The result of ``self.collection.find`` for the built query.
        """
        today = datetime.date.today()
        start_date = start_date or today
        end_date = end_date or today
        query = {
            'type': document_type,
            'date': {
                '$gte': start_date.strftime(_DATE_FORMAT),
                '$lte': end_date.strftime(_DATE_FORMAT),
            },
        }
        return self.collection.find(query)

    def dataframe_heart_rate(
        self,
        start_date: Optional[datetime.date] = None,
        end_date: Optional[datetime.date] = None,
    ) -> pd.DataFrame:
        """Return one row per intraday heart rate measurement.

        Each matching 'heart' document contributes one row for every item of
        its 'heartIntraday' list.

        NOTE(review): assumes the document 'date' is a 'YYYY-MM-DD' string
        (the query compares against such strings) and that each item's 'time'
        is an ISO 'HH:MM[:SS]' string -- confirm against the stored data.

        Args:
            start_date: First day of the range; today when omitted.
            end_date: Last day of the range; today when omitted.

        Returns:
            DataFrame with the columns 'date', 'time' and 'measurement';
            empty (no columns) when nothing matches.
        """
        results = self._find_by_type_and_dates('heart', start_date, end_date)
        points = [
            HeartRateDataPoint(
                date=datetime.date.fromisoformat(result['date']),
                time=datetime.time.fromisoformat(item['time']),
                measurement=int(item['value']),
            )
            for result in results
            for item in result['heartIntraday']
        ]
        return pd.DataFrame([point.transform_for_panda() for point in points])

    def my_cool_function(
        self, heart_rate_measurement: HeartRateDataPoint
    ) -> Dict[str, Any]:
        """Return the given data point converted to a DataFrame-row dict."""
        return heart_rate_measurement.transform_for_panda()

    def dataframe_heart_summary(
        self,
        start_date: Optional[datetime.date] = None,
        end_date: Optional[datetime.date] = None,
    ) -> pd.DataFrame:
        """Return one row per heart rate zone per matching 'heart' document.

        Unlike ``dataframe_heart_rate`` this reads each document's
        'heartRateZones' list rather than its intraday measurements.

        Args:
            start_date: First day of the range; today when omitted.
            end_date: Last day of the range; today when omitted.

        Returns:
            DataFrame with the columns 'date', 'caloriesOut', 'max', 'min',
            'minutes' and 'name'; empty (no columns) when nothing matches.
        """
        results = self._find_by_type_and_dates('heart', start_date, end_date)
        data = [
            {
                'date': result['date'],
                'caloriesOut': item['caloriesOut'],
                'max': item['max'],
                'min': item['min'],
                'minutes': item['minutes'],
                'name': item['name'],
            }
            for result in results
            for item in result['heartRateZones']
        ]
        return pd.DataFrame(data)

    def dataframe_heart_resting_heart_rate(
        self,
        start_date: Optional[datetime.date] = None,
        end_date: Optional[datetime.date] = None,
    ) -> pd.DataFrame:
        """Return the resting heart rate of each matching 'heart' document.

        Documents without a 'restingHeartrate' key are skipped.

        Args:
            start_date: First day of the range; today when omitted.
            end_date: Last day of the range; today when omitted.

        Returns:
            DataFrame with the columns 'date' and 'restingHeartRate'; empty
            (no columns) when nothing matches.
        """
        results = self._find_by_type_and_dates('heart', start_date, end_date)
        data = [
            {
                'date': result['date'],
                # The stored key is spelled 'restingHeartrate'; the column
                # name uses the 'restingHeartRate' spelling.
                'restingHeartRate': result['restingHeartrate'],
            }
            for result in results
            if 'restingHeartrate' in result
        ]
        return pd.DataFrame(data)

    def dataframe_hrv(
        self,
        start_date: Optional[datetime.date] = None,
        end_date: Optional[datetime.date] = None,
    ) -> pd.DataFrame:
        """Return the RMSSD values of each matching 'hrv' document.

        RMSSD is the Root Mean Square of Successive Differences between heart
        beats, in milliseconds: 'dailyRmssd' for the day and 'deepRmssd' for
        deep sleep.

        Args:
            start_date: First day of the range; today when omitted.
            end_date: Last day of the range; today when omitted.

        Returns:
            DataFrame with the columns 'date', 'daily_rmssd' and
            'deep_rmssd'; empty (no columns) when nothing matches.
        """
        results = self._find_by_type_and_dates('hrv', start_date, end_date)
        data = [
            {
                'date': result['date'],
                'daily_rmssd': result['dailyRmssd'],
                'deep_rmssd': result['deepRmssd'],
            }
            for result in results
        ]
        return pd.DataFrame(data)


# EXAMPLE CODE
# client = MongoClientDataframes(
#     connection="mongodb://localhost:27017/",
#     database="local",
#     collection="fitbit",
# )
# start_time = datetime.date(year=2023, month=4, day=20)
# client.dataframe_heart_rate(start_date=start_time)
# client.dataframe_heart_summary(start_date=start_time)
# client.dataframe_heart_resting_heart_rate(start_date=start_time)
# client.dataframe_hrv(start_date=start_time)