VT analytics

 avatar
unknown
python
4 years ago
7.4 kB
6
Indexable
"""
Paper trade analytics data script

What this script does:
1. Finds active monthly users of paper trade
    (user who creates at least MIN_ACTIVE_USER_ORDER_COUNT_THRESHOLD
        number of virtual trade orders is deemed 'active')
2. Aggregates number of users who have been continually active over
    1, 2, 3, ..., n months
"""

# pylint: disable=logging-fstring-interpolation

from dataclasses import dataclass
from datetime import date, timedelta
from typing import List, Any, Dict
import csv
import logging

# import pprint

from db_analytics.util import connect_to_db

DATA_OUTPUT_DIR = "output_data"

# Min no. of orders placed in a month for a user
# to qualify as an active papertrade user
MIN_ACTIVE_USER_ORDER_COUNT_THRESHOLD = 5

ORDERS_PER_USER_IN_DATE_RANGE_QUERY = """
    select
        paper_trade_portfolio.paper_portfolio_user_id,
        count(paper_trade_order.id)
    from
        paper_trade_order
    join 
        paper_trade_group
    on
        paper_trade_group.id = paper_trade_order.paper_trade_group_id
    join 
        paper_trade_portfolio
    on 
        paper_trade_portfolio.id = paper_trade_group.paper_trade_portfolio_id
    where
        paper_trade_order.created_at between %s and %s
    group by 
        paper_trade_portfolio.paper_portfolio_user_id
    having 
        count(paper_trade_order.id) > %s;
"""


@dataclass
class CountPerUser:
    """
    User id vs num papertrade orders
    """

    user_id: int
    order_count: int


@dataclass(frozen=True)
class TimeRange:
    """
    Time range with start and end dates
    """

    start_date: date
    end_date: date


@dataclass(frozen=False)
class PerUserDataAcrossMonths:
    """
    Usage pattern of a specific user aggregated
    over multiple months
    """

    user_id: int = -1
    num_months_active: int = 0
    total_order_count: int = 0


@dataclass
class AggregateUsage:
    """
    Active users who kept repeatedly using VT
    over a specified number of months
    """

    num_months: int
    active_users: int


def construct_monthly_ranges(input_start: date, input_end: date) -> List[TimeRange]:
    """
    Generates monthly date ranges within given start
    and end dates
    """
    month_ranges: List[TimeRange] = []
    end_date = input_start
    while end_date < input_end:
        start_date = end_date
        end_date = date(2021, start_date.month + 1, 1)
        month_ranges.append(TimeRange(start_date, end_date - timedelta(days=1)))
    return month_ranges


def fetch_order_count_per_user_in_date_range(
    conn: Any, time_range: TimeRange, min_count: int
) -> List[CountPerUser]:
    """
    Retrieves count of orders per user within specified date range
    """
    start_date_str = time_range.start_date.strftime("%Y-%m-%d 00:00:00")
    end_date_str = time_range.end_date.strftime("%Y-%m-%d 00:00:00")
    logging.info(
        f"Fetching order count per user from {start_date_str} to {end_date_str}"
    )
    result: List[CountPerUser] = []
    with conn.cursor() as curs:
        curs.execute(
            ORDERS_PER_USER_IN_DATE_RANGE_QUERY,
            (start_date_str, end_date_str, min_count),
        )
        fetched_data = curs.fetchall()
        for entry in fetched_data:
            user_id, order_count = entry
            result.append(CountPerUser(user_id, order_count))
    return result


def aggregate_monthly_per_user_data(
    input_data: Dict[TimeRange, List[CountPerUser]]
) -> List[AggregateUsage]:
    """
    Aggregate per user monthly data across months

    Computes how many users used paper trade over 1 month, 2 months, ... etc.
    """
    # Aggregate per user data across months to find
    # number of months a specific user was active, and total
    # number of orders placed by user across months
    per_user_data_across_months: Dict[int, PerUserDataAcrossMonths] = {}
    month_count = 0
    for _, month_data in input_data.items():
        for entry in month_data:
            user_entry = per_user_data_across_months.get(
                entry.user_id, PerUserDataAcrossMonths(user_id=entry.user_id)
            )
            user_entry.total_order_count += entry.order_count
            user_entry.num_months_active += 1
            per_user_data_across_months[user_entry.user_id] = user_entry
        month_count += 1

    per_user_usage_across_months = list(per_user_data_across_months.values())
    write_per_user_aggregate_csv(per_user_usage_across_months)

    # Count number of users active across n, n-1, n-2... months
    result: List[AggregateUsage] = []
    while month_count >= 1:
        active_users = [
            x
            for x in per_user_usage_across_months
            if x.num_months_active == month_count
        ]
        result.append(AggregateUsage(month_count, len(active_users)))
        month_count -= 1
    return result


def write_monthly_data_csv(
    time_range: TimeRange, input_data: List[CountPerUser]
) -> None:
    """
    Outputs downloaded monthly data as CSV
    """
    start_date_str = time_range.start_date.strftime("%Y-%m-%d")
    end_date_str = time_range.end_date.strftime("%Y-%m-%d")
    output_file = (
        f"{DATA_OUTPUT_DIR}/per_month_data_{start_date_str}_{end_date_str}.csv"
    )
    with open(output_file, "w") as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(["User ID", "Num orders"])
        for entry in input_data:
            writer.writerow([entry.user_id, entry.order_count])


def write_per_user_aggregate_csv(input_data: List[PerUserDataAcrossMonths]) -> None:
    """
    Writes aggregate usage pattern on a per user basis,
    i.e. user id vs number of months that this user has
    been using paper trade
    """
    output_file = f"{DATA_OUTPUT_DIR}/per_user_monthly_usage.csv"
    with open(output_file, "w") as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(["User ID", "Num months active", "Total order count"])
        for entry in input_data:
            writer.writerow(
                [entry.user_id, entry.num_months_active, entry.total_order_count]
            )


def write_aggregate_results(input_data: List[AggregateUsage]) -> None:
    """
    Writes aggregate results to CSV
    i.e. num users active over a specified number of months
    """
    output_file = f"{DATA_OUTPUT_DIR}/monthly_usage_results.csv"
    with open(output_file, "w") as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(["Num Months", "Num active users"])
        for entry in input_data:
            writer.writerow([entry.num_months, entry.active_users])


DATA_SAMPLING_START = date(2021, 1, 1)
DATA_SAMPLING_END = date(2021, 5, 23)


def run_paper_trade_analytics() -> None:
    """
    Paper trade analytics main
    """
    date_ranges = construct_monthly_ranges(DATA_SAMPLING_START, DATA_SAMPLING_END)
    conn = connect_to_db()
    results_over_range: Dict[TimeRange, List[CountPerUser]] = {}
    for time_range in date_ranges:
        result = fetch_order_count_per_user_in_date_range(
            conn, time_range, MIN_ACTIVE_USER_ORDER_COUNT_THRESHOLD
        )
        write_monthly_data_csv(time_range, result)
        results_over_range[time_range] = result
    aggregate_result = aggregate_monthly_per_user_data(results_over_range)
    write_aggregate_results(aggregate_result)
    print(f"Output data written as CSV to {DATA_OUTPUT_DIR} directory")


if __name__ == "__main__":
    run_paper_trade_analytics()
Editor is loading...