Untitled

 avatar
unknown
plain_text
a month ago
2.5 kB
1
Indexable
{#
  Model configuration.

  - Builds this model as a table with the MergeTree() engine.
  - After the build, the post-hook copies rows from this model into
    prod_transfer_transform, skipping every id that table already contains.
    The column list is spelled out on both sides of the INSERT so the copy
    does not depend on column order in either table.
  - The hook is a single statement, so it carries no trailing terminator.
#}
{{ config(
    materialized='table',
    engine='MergeTree()',
    post_hook="INSERT INTO prod_transfer_transform (id, slot, transactionHash, timestamp, sourceAddress, destinationAddress, mintAddress, authorityAddress, signerAddress, unadjustedTokenAmount, tokenDecimals, adjustedTokenAmount, instructionType, mint_token_symbol, avg_token_price, price_minute, transfer_usd_value)
    SELECT id, slot, transactionHash, timestamp, sourceAddress, destinationAddress, mintAddress, authorityAddress, signerAddress, unadjustedTokenAmount, tokenDecimals, adjustedTokenAmount, instructionType, mint_token_symbol, avg_token_price, price_minute, transfer_usd_value
    FROM {{ this }}
    WHERE id NOT IN (SELECT id FROM prod_transfer_transform)"
) }}

-- Attaches a token symbol and a per-minute average USD price to every
-- transfer, then derives the USD value of the transferred amount.
--
-- NOTE(review): under ClickHouse's default join_use_nulls = 0, LEFT JOIN rows
-- with no match receive the column type's default value instead of NULL when
-- the column is not Nullable. Confirm that mint_token_symbol and price_minute
-- hold the intended value for unmatched transfers.
WITH token_info AS (
    -- Token address -> symbol lookup.
    SELECT
        token_address,
        token_symbol
    FROM {{ ref('token_data') }}
),

transfer_mapped AS (
    -- Every transfer column, plus the symbol of the transferred mint.
    SELECT
        tr.*,
        ti.token_symbol AS mint_token_symbol
    FROM {{ ref('transfer_data') }} AS tr
    LEFT JOIN token_info AS ti
        ON tr.mintAddress = ti.token_address
),

exchange_prices AS (
    -- Each exchange row yields two price observations: usdVolume divided by
    -- the amount on the "from" side, and usdVolume divided by the amount on
    -- the "to" side. NULLIF turns a zero amount into a NULL price rather than
    -- a division by zero.
    SELECT
        ex.fromToken AS token_address,
        ex.timestamp,
        toStartOfMinute(ex.timestamp) AS minute,
        ex.usdVolume / NULLIF(ex.fromAmount, 0) AS token_price
    FROM {{ ref('temp_exchange_staging') }} AS ex

    UNION ALL

    SELECT
        ex.toToken AS token_address,
        ex.timestamp,
        toStartOfMinute(ex.timestamp) AS minute,
        ex.usdVolume / NULLIF(ex.toAmount, 0) AS token_price
    FROM {{ ref('temp_exchange_staging') }} AS ex
),

pumpfun_prices AS (
    -- One price observation per pumpfun row: usd_volume / tokenAmount, with
    -- the same zero-amount guard as above.
    SELECT
        pf.tokenAddress AS token_address,
        pf.timestamp,
        toStartOfMinute(pf.timestamp) AS minute,
        pf.usd_volume / NULLIF(pf.tokenAmount, 0) AS token_price
    FROM {{ ref('temp_pumpfun_staging') }} AS pf
),

all_prices AS (
    -- Price observations from both sources, stacked without deduplication.
    SELECT
        token_address,
        timestamp,
        minute,
        token_price
    FROM exchange_prices

    UNION ALL

    SELECT
        token_address,
        timestamp,
        minute,
        token_price
    FROM pumpfun_prices
),

aggregated_prices AS (
    -- Average observed price per token per minute.
    SELECT
        token_address,
        minute,
        AVG(token_price) AS avg_token_price
    FROM all_prices
    GROUP BY
        token_address,
        minute
),

transfer_with_prices AS (
    -- Match each transfer to the average price of its mint in the minute the
    -- transfer happened; transfers without a price row are kept.
    SELECT
        mapped.*,
        px.avg_token_price AS avg_token_price,
        px.minute AS price_minute
    FROM transfer_mapped AS mapped
    LEFT JOIN aggregated_prices AS px
        ON mapped.mintAddress = px.token_address
        AND toStartOfMinute(mapped.timestamp) = px.minute
)

-- A NULL product (for example, no usable price) is reported as 0 USD.
SELECT
    *,
    COALESCE(avg_token_price * adjustedTokenAmount, 0) AS transfer_usd_value
FROM transfer_with_prices
Leave a Comment