VT analytics
unknown
python
4 years ago
7.4 kB
6
Indexable
""" Paper trade analytics data script What this script does: 1. Finds active monthly users of paper trade (user who creates at least MIN_ACTIVE_USER_ORDER_COUNT_THRESHOLD number of virtual trade orders is deemed 'active') 2. Aggregates number of users who have been continually active over 1, 2, 3, ..., n months """ # pylint: disable=logging-fstring-interpolation from dataclasses import dataclass from datetime import date, timedelta from typing import List, Any, Dict import csv import logging # import pprint from db_analytics.util import connect_to_db DATA_OUTPUT_DIR = "output_data" # Min no. of orders placed in a month for a user # to qualify as an active papertrade user MIN_ACTIVE_USER_ORDER_COUNT_THRESHOLD = 5 ORDERS_PER_USER_IN_DATE_RANGE_QUERY = """ select paper_trade_portfolio.paper_portfolio_user_id, count(paper_trade_order.id) from paper_trade_order join paper_trade_group on paper_trade_group.id = paper_trade_order.paper_trade_group_id join paper_trade_portfolio on paper_trade_portfolio.id = paper_trade_group.paper_trade_portfolio_id where paper_trade_order.created_at between %s and %s group by paper_trade_portfolio.paper_portfolio_user_id having count(paper_trade_order.id) > %s; """ @dataclass class CountPerUser: """ User id vs num papertrade orders """ user_id: int order_count: int @dataclass(frozen=True) class TimeRange: """ Time range with start and end dates """ start_date: date end_date: date @dataclass(frozen=False) class PerUserDataAcrossMonths: """ Usage pattern of a specific user aggregated over multiple months """ user_id: int = -1 num_months_active: int = 0 total_order_count: int = 0 @dataclass class AggregateUsage: """ Active users who kept repeatedly using VT over a specified number of months """ num_months: int active_users: int def construct_monthly_ranges(input_start: date, input_end: date) -> List[TimeRange]: """ Generates monthly date ranges within given start and end dates """ month_ranges: List[TimeRange] = [] end_date = input_start while end_date < input_end: start_date = end_date end_date = date(2021, start_date.month + 1, 1) month_ranges.append(TimeRange(start_date, end_date - timedelta(days=1))) return month_ranges def fetch_order_count_per_user_in_date_range( conn: Any, time_range: TimeRange, min_count: int ) -> List[CountPerUser]: """ Retrieves count of orders per user within specified date range """ start_date_str = time_range.start_date.strftime("%Y-%m-%d 00:00:00") end_date_str = time_range.end_date.strftime("%Y-%m-%d 00:00:00") logging.info( f"Fetching order count per user from {start_date_str} to {end_date_str}" ) result: List[CountPerUser] = [] with conn.cursor() as curs: curs.execute( ORDERS_PER_USER_IN_DATE_RANGE_QUERY, (start_date_str, end_date_str, min_count), ) fetched_data = curs.fetchall() for entry in fetched_data: user_id, order_count = entry result.append(CountPerUser(user_id, order_count)) return result def aggregate_monthly_per_user_data( input_data: Dict[TimeRange, List[CountPerUser]] ) -> List[AggregateUsage]: """ Aggregate per user monthly data across months Computes how many users used paper trade over 1 month, 2 months, ... etc. """ # Aggregate per user data across months to find # number of months a specific user was active, and total # number of orders placed by user across months per_user_data_across_months: Dict[int, PerUserDataAcrossMonths] = {} month_count = 0 for _, month_data in input_data.items(): for entry in month_data: user_entry = per_user_data_across_months.get( entry.user_id, PerUserDataAcrossMonths(user_id=entry.user_id) ) user_entry.total_order_count += entry.order_count user_entry.num_months_active += 1 per_user_data_across_months[user_entry.user_id] = user_entry month_count += 1 per_user_usage_across_months = list(per_user_data_across_months.values()) write_per_user_aggregate_csv(per_user_usage_across_months) # Count number of users active across n, n-1, n-2... months result: List[AggregateUsage] = [] while month_count >= 1: active_users = [ x for x in per_user_usage_across_months if x.num_months_active == month_count ] result.append(AggregateUsage(month_count, len(active_users))) month_count -= 1 return result def write_monthly_data_csv( time_range: TimeRange, input_data: List[CountPerUser] ) -> None: """ Outputs downloaded monthly data as CSV """ start_date_str = time_range.start_date.strftime("%Y-%m-%d") end_date_str = time_range.end_date.strftime("%Y-%m-%d") output_file = ( f"{DATA_OUTPUT_DIR}/per_month_data_{start_date_str}_{end_date_str}.csv" ) with open(output_file, "w") as csv_file: writer = csv.writer(csv_file) writer.writerow(["User ID", "Num orders"]) for entry in input_data: writer.writerow([entry.user_id, entry.order_count]) def write_per_user_aggregate_csv(input_data: List[PerUserDataAcrossMonths]) -> None: """ Writes aggregate usage pattern on a per user basis, i.e. user id vs number of months that this user has been using paper trade """ output_file = f"{DATA_OUTPUT_DIR}/per_user_monthly_usage.csv" with open(output_file, "w") as csv_file: writer = csv.writer(csv_file) writer.writerow(["User ID", "Num months active", "Total order count"]) for entry in input_data: writer.writerow( [entry.user_id, entry.num_months_active, entry.total_order_count] ) def write_aggregate_results(input_data: List[AggregateUsage]) -> None: """ Writes aggregate results to CSV i.e. num users active over a specified number of months """ output_file = f"{DATA_OUTPUT_DIR}/monthly_usage_results.csv" with open(output_file, "w") as csv_file: writer = csv.writer(csv_file) writer.writerow(["Num Months", "Num active users"]) for entry in input_data: writer.writerow([entry.num_months, entry.active_users]) DATA_SAMPLING_START = date(2021, 1, 1) DATA_SAMPLING_END = date(2021, 5, 23) def run_paper_trade_analytics() -> None: """ Paper trade analytics main """ date_ranges = construct_monthly_ranges(DATA_SAMPLING_START, DATA_SAMPLING_END) conn = connect_to_db() results_over_range: Dict[TimeRange, List[CountPerUser]] = {} for time_range in date_ranges: result = fetch_order_count_per_user_in_date_range( conn, time_range, MIN_ACTIVE_USER_ORDER_COUNT_THRESHOLD ) write_monthly_data_csv(time_range, result) results_over_range[time_range] = result aggregate_result = aggregate_monthly_per_user_data(results_over_range) write_aggregate_results(aggregate_result) print(f"Output data written as CSV to {DATA_OUTPUT_DIR} directory") if __name__ == "__main__": run_paper_trade_analytics()
Editor is loading...