Untitled
{{ config( materialized='table', engine='MergeTree()', post_hook="INSERT INTO prod_transfer_transform (id, slot, transactionHash, timestamp, sourceAddress, destinationAddress, mintAddress, authorityAddress, signerAddress, unadjustedTokenAmount, tokenDecimals, adjustedTokenAmount, instructionType, mint_token_symbol, avg_token_price, price_minute, transfer_usd_value) SELECT id, slot, transactionHash, timestamp, sourceAddress, destinationAddress, mintAddress, authorityAddress, signerAddress, unadjustedTokenAmount, tokenDecimals, adjustedTokenAmount, instructionType, mint_token_symbol, avg_token_price, price_minute, transfer_usd_value FROM {{ this }} WHERE id NOT IN (SELECT id FROM prod_transfer_transform);;" ) }} WITH token_info AS ( SELECT token_address, token_symbol FROM {{ ref('token_data') }} ), transfer_mapped AS ( SELECT t.*, tm.token_symbol AS mint_token_symbol FROM {{ ref('transfer_data') }} t LEFT JOIN token_info tm ON t.mintAddress = tm.token_address ), exchange_prices AS ( SELECT te.fromToken AS token_address, te.timestamp, toStartOfMinute(te.timestamp) AS minute, te.usdVolume / NULLIF(te.fromAmount, 0) AS token_price FROM {{ ref('temp_exchange_staging') }} te UNION ALL SELECT te.toToken AS token_address, te.timestamp, toStartOfMinute(te.timestamp) AS minute, te.usdVolume / NULLIF(te.toAmount, 0) AS token_price FROM {{ ref('temp_exchange_staging') }} te ), pumpfun_prices AS ( SELECT tp.tokenAddress AS token_address, tp.timestamp, toStartOfMinute(tp.timestamp) AS minute, tp.usd_volume / NULLIF(tp.tokenAmount, 0) AS token_price FROM {{ ref('temp_pumpfun_staging') }} tp ), aggregated_prices AS ( SELECT token_address, minute, AVG(token_price) AS avg_token_price FROM ( SELECT * FROM exchange_prices UNION ALL SELECT * FROM pumpfun_prices ) all_prices GROUP BY token_address, minute ), transfer_with_prices AS ( SELECT tm.*, ap.avg_token_price, ap.minute AS price_minute FROM transfer_mapped tm LEFT JOIN aggregated_prices ap ON tm.mintAddress = ap.token_address AND toStartOfMinute(tm.timestamp) = ap.minute ) SELECT *, COALESCE(avg_token_price * adjustedTokenAmount, 0) AS transfer_usd_value FROM transfer_with_prices
Leave a Comment