Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
2.7 kB
2
Indexable
def prepare_moving_aggregate_query(
    start_dt: str,
    end_dt: str,
    agg_size_in_days: int,
    table_info: TableInfo,
    monthly_table_name: str,
    include_monthly: bool
) -> str:
    
    # If include_monthly is False, just create an empty CTE for monthly_aggregates.
    monthly_aggregates_cte = """
        monthly_aggregates AS (
            SELECT NULL as cust_id, NULL as token_idx, NULL as value_sum, 
                   NULL as agg_days, NULL as source_name, NULL as end_observation
            WHERE FALSE
        )
    """ if not include_monthly else f"""
        monthly_aggregates AS (
            SELECT
                m.cust_id,
                m.token_idx,
                m.value_sum,
                30 as agg_days, -- Assuming 30 days for monthly aggregate
                'monthly_source_name' as source_name, -- Adjust source name as needed
                d.interval_end_dt as end_observation
            FROM
                date_range d
            JOIN
                {monthly_table_name} m
            ON m.end_observation = DATE_TRUNC(d.interval_end_dt, MONTH)
        )
    """

    # Combine everything into the main query
    return f"""
        WITH date_range AS (
                 SELECT (DATE_ADD(DATE('{start_dt}'), INTERVAL x DAY)) as interval_end_dt
                 FROM UNNEST(GENERATE_ARRAY(0, DATE_DIFF(DATE('{end_dt}'), DATE('{start_dt}'), DAY))) as x
             ),
        daily_aggregates AS (
            SELECT
                t.cust_id,
                t.token_idx,
                SUM(t.value_sum) AS value_sum,
                {agg_size_in_days} as agg_days,
                '{table_info.source_name}' as source_name,
                d.interval_end_dt as end_observation
            FROM
                date_range d
            JOIN
                {table_info.input_table_name} t
            ON t.end_observation BETWEEN DATE_SUB(d.interval_end_dt, INTERVAL {agg_size_in_days - 1} DAY)
                AND d.interval_end_dt
            WHERE
              t.end_observation > '2022-01-01' -- to satisfy need to use where clause on partitioned col
              AND t.end_observation BETWEEN DATE_SUB('{start_dt}', INTERVAL {agg_size_in_days} DAY) AND '{end_dt}'
            GROUP BY
                d.interval_end_dt,
                t.cust_id,
                t.token_idx
        ),
        {monthly_aggregates_cte}  -- Use the conditioned CTE here

        SELECT * FROM daily_aggregates
        UNION ALL
        SELECT * FROM monthly_aggregates
        ORDER BY
            end_observation,
            cust_id,
            token_idx
    """

# You'll call this function with the additional parameters for monthly_table_name and include_monthly.