# VT analytics
# unknown
# python
# 4 years ago
# 7.4 kB
# 10
# Indexable
"""
Paper trade analytics data script
What this script does:
1. Finds active monthly users of paper trade
(user who creates at least MIN_ACTIVE_USER_ORDER_COUNT_THRESHOLD
number of virtual trade orders is deemed 'active')
2. Aggregates number of users who have been continually active over
1, 2, 3, ..., n months
"""
# pylint: disable=logging-fstring-interpolation
from dataclasses import dataclass
from datetime import date, timedelta
from typing import List, Any, Dict
import csv
import logging
# import pprint
from db_analytics.util import connect_to_db
# Directory (relative to the working directory) that receives the CSV output
DATA_OUTPUT_DIR = "output_data"

# Min no. of orders placed in a month for a user
# to qualify as an active papertrade user
MIN_ACTIVE_USER_ORDER_COUNT_THRESHOLD = 5

# Counts orders per portfolio user within a date range.
# Parameters, in order: range start, range end (both inclusive, because
# of "between"), minimum order count.
# The "having" clause uses ">=" so that a user with exactly the minimum
# number of orders qualifies, matching the "at least" definition of an
# active user.
ORDERS_PER_USER_IN_DATE_RANGE_QUERY = """
select
paper_trade_portfolio.paper_portfolio_user_id,
count(paper_trade_order.id)
from
paper_trade_order
join
paper_trade_group
on
paper_trade_group.id = paper_trade_order.paper_trade_group_id
join
paper_trade_portfolio
on
paper_trade_portfolio.id = paper_trade_group.paper_trade_portfolio_id
where
paper_trade_order.created_at between %s and %s
group by
paper_trade_portfolio.paper_portfolio_user_id
having
count(paper_trade_order.id) >= %s;
"""
@dataclass()
class CountPerUser:
    """
    Pairs a user id with the number of paper trade orders
    counted for that user
    """

    # Identifier of the user the count belongs to
    user_id: int
    # Number of orders counted for this user
    order_count: int
@dataclass(frozen=True)
class TimeRange:
    """
    Immutable pair of dates marking the start and end of a period.

    Being frozen makes instances hashable, so they can serve
    as dictionary keys.
    """

    # First date of the period
    start_date: date
    # Last date of the period
    end_date: date
@dataclass
class PerUserDataAcrossMonths:
    """
    Mutable running totals describing one user's activity
    summed over several months
    """

    # -1 is the placeholder used when no user id has been supplied
    user_id: int = -1
    # How many months this user was counted in
    num_months_active: int = 0
    # Sum of the user's order counts over those months
    total_order_count: int = 0
@dataclass()
class AggregateUsage:
    """
    Number of active users associated with a given
    number of months of VT usage
    """

    # Number of months this row describes
    num_months: int
    # Count of users recorded for that number of months
    active_users: int
def construct_monthly_ranges(input_start: date, input_end: date) -> List[TimeRange]:
    """
    Generates consecutive monthly date ranges beginning at input_start.

    The first range starts on input_start; every later range starts on
    the first day of a month. Each range ends on the last day of its
    calendar month (inclusive). Ranges are produced while the range
    start is before input_end, so the final range is NOT clamped to
    input_end and may extend to the end of that month.

    Returns an empty list when input_start is not before input_end.
    """
    month_ranges: List[TimeRange] = []
    range_start = input_start
    while range_start < input_end:
        # First day of the following month. The year is taken from the
        # current range (not hard-coded), and December rolls over into
        # January of the next year instead of producing month 13.
        if range_start.month == 12:
            next_month_start = date(range_start.year + 1, 1, 1)
        else:
            next_month_start = date(range_start.year, range_start.month + 1, 1)
        # Last day of the current month = day before the next month starts
        month_ranges.append(
            TimeRange(range_start, next_month_start - timedelta(days=1))
        )
        range_start = next_month_start
    return month_ranges
def fetch_order_count_per_user_in_date_range(
    conn: Any, time_range: TimeRange, min_count: int
) -> List[CountPerUser]:
    """
    Retrieves count of orders per user within specified date range.

    conn: database connection; only conn.cursor() (used as a context
        manager) is required here.
    time_range: period to query. Both start_date and end_date are
        treated as whole days that belong to the period.
    min_count: order-count threshold handed to the query's HAVING clause.

    Returns one CountPerUser per row returned by the query.
    """
    start_date_str = time_range.start_date.strftime("%Y-%m-%d 00:00:00")
    # end_date is the last day of the range, so the upper bound must be
    # the END of that day; a 00:00:00 bound would drop every order
    # placed during the final day.
    # NOTE(review): assumes the database accepts fractional seconds in
    # timestamp literals - confirm for the target database.
    end_date_str = time_range.end_date.strftime("%Y-%m-%d 23:59:59.999999")
    logging.info(
        f"Fetching order count per user from {start_date_str} to {end_date_str}"
    )
    with conn.cursor() as curs:
        curs.execute(
            ORDERS_PER_USER_IN_DATE_RANGE_QUERY,
            (start_date_str, end_date_str, min_count),
        )
        # Each row is (user id, order count), in the query's select order
        return [
            CountPerUser(user_id, order_count)
            for user_id, order_count in curs.fetchall()
        ]
def aggregate_monthly_per_user_data(
    input_data: Dict[TimeRange, List[CountPerUser]]
) -> List[AggregateUsage]:
    """
    Rolls monthly per-user order counts up into a per-user summary and
    a breakdown of users by number of active months.

    Side effect: the per-user summary is written out through
    write_per_user_aggregate_csv before the breakdown is computed.

    Returns one AggregateUsage per month count, ordered from the number
    of months in input_data down to 1; each entry holds how many users
    appeared in exactly that many months.
    """
    # Pass 1: fold every monthly entry into a running total per user
    usage_by_user: Dict[int, PerUserDataAcrossMonths] = {}
    for monthly_counts in input_data.values():
        for record in monthly_counts:
            if record.user_id not in usage_by_user:
                usage_by_user[record.user_id] = PerUserDataAcrossMonths(
                    user_id=record.user_id
                )
            summary = usage_by_user[record.user_id]
            summary.num_months_active += 1
            summary.total_order_count += record.order_count

    all_users = list(usage_by_user.values())
    write_per_user_aggregate_csv(all_users)

    # Pass 2: for n, n-1, ..., 1 months, count the users whose number
    # of active months equals that value
    breakdown: List[AggregateUsage] = []
    for months in range(len(input_data), 0, -1):
        matching_users = sum(
            1 for user in all_users if user.num_months_active == months
        )
        breakdown.append(AggregateUsage(months, matching_users))
    return breakdown
def write_monthly_data_csv(
    time_range: TimeRange, input_data: List[CountPerUser]
) -> None:
    """
    Outputs downloaded monthly data as CSV.

    The file is created in DATA_OUTPUT_DIR and named after the start and
    end dates of time_range. It holds a header row followed by one
    "user id, order count" row per entry of input_data. An existing
    file with the same name is overwritten.
    """
    start_date_str = time_range.start_date.strftime("%Y-%m-%d")
    end_date_str = time_range.end_date.strftime("%Y-%m-%d")
    output_file = (
        f"{DATA_OUTPUT_DIR}/per_month_data_{start_date_str}_{end_date_str}.csv"
    )
    # newline="" is required by the csv module so that it controls the
    # line endings itself; without it, platforms that translate "\n"
    # end up with an extra blank line after every row.
    with open(output_file, "w", newline="") as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(["User ID", "Num orders"])
        writer.writerows(
            [entry.user_id, entry.order_count] for entry in input_data
        )
def write_per_user_aggregate_csv(input_data: List[PerUserDataAcrossMonths]) -> None:
    """
    Writes aggregate usage pattern on a per user basis,
    i.e. user id vs number of months that this user has
    been using paper trade, plus the user's total order count.

    Output goes to per_user_monthly_usage.csv inside DATA_OUTPUT_DIR;
    an existing file is overwritten.
    """
    output_file = f"{DATA_OUTPUT_DIR}/per_user_monthly_usage.csv"
    # newline="" is required by the csv module so that it controls the
    # line endings itself; without it, platforms that translate "\n"
    # end up with an extra blank line after every row.
    with open(output_file, "w", newline="") as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(["User ID", "Num months active", "Total order count"])
        writer.writerows(
            [entry.user_id, entry.num_months_active, entry.total_order_count]
            for entry in input_data
        )
def write_aggregate_results(input_data: List[AggregateUsage]) -> None:
    """
    Writes aggregate results to CSV
    i.e. num users active over a specified number of months.

    Output goes to monthly_usage_results.csv inside DATA_OUTPUT_DIR;
    an existing file is overwritten.
    """
    output_file = f"{DATA_OUTPUT_DIR}/monthly_usage_results.csv"
    # newline="" is required by the csv module so that it controls the
    # line endings itself; without it, platforms that translate "\n"
    # end up with an extra blank line after every row.
    with open(output_file, "w", newline="") as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(["Num Months", "Num active users"])
        writer.writerows(
            [entry.num_months, entry.active_users] for entry in input_data
        )
# Sampling window handed to construct_monthly_ranges by
# run_paper_trade_analytics
DATA_SAMPLING_START = date(year=2021, month=1, day=1)
DATA_SAMPLING_END = date(year=2021, month=5, day=23)
def run_paper_trade_analytics() -> None:
    """
    Paper trade analytics main.

    For every monthly range between DATA_SAMPLING_START and
    DATA_SAMPLING_END: fetches the per-user order counts, writes them
    to a monthly CSV, then aggregates all months and writes the
    aggregate CSVs. All files land in DATA_OUTPUT_DIR.
    """
    # Local import: os is only needed by this entry point
    import os

    date_ranges = construct_monthly_ranges(DATA_SAMPLING_START, DATA_SAMPLING_END)

    # Create the output directory up front so a missing directory does
    # not abort the run after the database work has already been done
    os.makedirs(DATA_OUTPUT_DIR, exist_ok=True)

    conn = connect_to_db()
    results_over_range: Dict[TimeRange, List[CountPerUser]] = {}
    try:
        for time_range in date_ranges:
            result = fetch_order_count_per_user_in_date_range(
                conn, time_range, MIN_ACTIVE_USER_ORDER_COUNT_THRESHOLD
            )
            write_monthly_data_csv(time_range, result)
            results_over_range[time_range] = result
    finally:
        # Release the connection even when a query or a write fails.
        # NOTE(review): assumes connect_to_db returns a DB-API style
        # connection exposing close() - confirm.
        conn.close()

    aggregate_result = aggregate_monthly_per_user_data(results_over_range)
    write_aggregate_results(aggregate_result)
    print(f"Output data written as CSV to {DATA_OUTPUT_DIR} directory")
# Run the analytics only when executed as a script, not on import
if __name__ == "__main__":
    run_paper_trade_analytics()