Untitled

 avatar
unknown
plain_text
a year ago
4.8 kB
5
Indexable
# All pair addresses have already been collected through a simple API that gathers data from DEXScreener, and are stored in datalist.
# dfByPairX returns the pair addresses and the prices for a specific pair (e.g. "WETH/WBNB").



def dfByPairX(coppia):
    """Build a table of every pool in ``datalist`` whose symbols match ``coppia``.

    ``datalist`` is read as a sequence of sequences of pair dicts (the file
    header says they come from DEXScreener).  An entry is kept only when

    * ``baseToken.symbol + "/" + quoteToken.symbol`` equals ``coppia`` exactly
      (the reversed pair, e.g. "WBNB/WETH" for "WETH/WBNB", is NOT included), and
    * its ``pairAddress`` starts with ``0x``, contains no ``-`` and is
      42 characters long.

    For each kept entry the price is read through ``rPrice``.

    Parameters:
        coppia: pair label such as "WETH/WBNB".

    Returns:
        A DataFrame with columns ``chainId, dexId, price, pair, base, quote,
        liquidity_base, liquidity_quote`` and a 1-based integer index, or an
        empty ``pd.DataFrame()`` (no columns) when nothing matches.
    """
    columns = ['chainId', 'dexId', 'price', 'pair', 'base', 'quote',
               'liquidity_base', 'liquidity_quote']

    rows = []
    for group in datalist:
        for entry in group:
            symbols = entry['baseToken']['symbol'] + "/" + entry['quoteToken']['symbol']
            if symbols != coppia:
                continue

            address = entry['pairAddress']
            if address[0:2] != '0x' or "-" in address or len(address) != 42:
                continue

            # cs() is defined elsewhere; presumably it normalises the address
            # (checksum form) before the on-chain query -- TODO confirm.
            price = rPrice(cs(address), entry['chainId'])

            rows.append([
                entry['chainId'],
                entry['dexId'],
                float(price),
                address,
                entry['baseToken']['address'],
                entry['quoteToken']['address'],
                _pair_liquidity(entry, 'base'),
                _pair_liquidity(entry, 'quote'),
            ])

    if not rows:
        return pd.DataFrame()

    # Labels start at 1 so that callers relying on the historical row labels
    # (1..n) keep working.
    return pd.DataFrame(rows, columns=columns, index=range(1, len(rows) + 1))


def _pair_liquidity(entry, side):
    """Return ``entry['liquidity'][side]`` as a float, or 0.0 when missing or unusable."""
    try:
        return float(entry['liquidity'][side])
    except (KeyError, TypeError, ValueError):
        # No 'liquidity' section, a None value, or a non-numeric value.
        return 0.0




# rPrice is meant to get real-time prices by querying the pair smart contracts directly, so that a
# transaction can be made as soon as an opportunity arises, avoiding price slippage.



def rPrice(pair,chain):
    """Read the current price of a pool directly from its contract.

    Parameters:
        pair: pool contract address; anything not starting with ``0x`` is rejected.
        chain: chain identifier passed through to ``getContract`` / ``version``.

    Returns:
        * ``0`` when the address is not ``0x``-prefixed, when ``getContract``
          returns an error string, or when either V2 reserve is zero;
        * for a pool reported as version 2: ``reserve1 / reserve0`` taken from
          ``getReserves()`` (raw reserves, no decimals adjustment);
        * for a pool reported as version 3:
          ``fromSqrtPrice(slot0()[0]) * 10 ** (decimals0 - decimals1)``;
        * ``1`` for any other value returned by ``version``.
    """
    if pair[0:2] != '0x':
        return 0

    P = getContract(pair, chain)
    if isinstance(P, str):
        # getContract signals every failure with a descriptive string.
        return 0

    v = version(pair, chain)

    if v == 2:
        res = P.functions.getReserves().call()
        # res[0] is the divisor, so it must be guarded too; a zero res[1]
        # keeps yielding 0 as before.
        if res[0] == 0 or res[1] == 0:
            return 0
        return res[1] / res[0]

    if v == 3:
        s = P.functions.slot0().call()[0]
        d0 = _token_decimals(P.functions.token0, chain)
        d1 = _token_decimals(P.functions.token1, chain)
        return fromSqrtPrice(s) * 10 ** (d0 - d1)

    # NOTE(review): unsupported pools yield 1 rather than 0 -- callers
    # presumably rely on this placeholder; confirm before changing.
    return 1


def _token_decimals(token_getter, chain):
    """Return the decimals of the token whose address ``token_getter().call()`` yields.

    Tries the token contract's ``decimals()`` first, then ``tokenInfo``'s
    ``'decimals'`` field, and falls back to 18 when both fail.
    """
    try:
        token = token_getter().call()
        try:
            return getContract(token, chain).functions.decimals().call()
        except Exception:
            # Covers getContract returning an error string (no .functions)
            # as well as a failing on-chain call.
            return int(tokenInfo(token, chain)['decimals'])
    except Exception:
        return 18



# getContract() and version() are simply wrappers to connect to the pair and call functions


def getContract(contract_address,chain):
    """Return a web3 contract object for ``contract_address`` on ``chain``.

    Failures are reported by returning a string instead of raising, so callers
    test the result with ``isinstance(result, str)``:

    * ``"No RPC"``                - ``chain`` is not a key of ``RPC``;
    * ``"Invalid Address"``       - the address does not start with ``0x`` or
                                    contains a ``-``;
    * ``"Chain not covered yet"`` - ``getABI`` raised;
    * ``"NotVerified"``           - ``getABI`` returned ``"NotVerified"``;
    * ``"NoContract"``            - building the contract object failed.

    Parameters:
        contract_address: contract address as a string.
        chain: key into the module-level ``RPC`` mapping.
    """
    from web3 import Web3

    if chain not in RPC:
        return "No RPC"

    # Reject when EITHER condition is violated; the previous `and` only
    # rejected strings that lacked the prefix *and* had no dash.
    if contract_address[0:2] != '0x' or "-" in contract_address:
        return "Invalid Address"

    try:
        ABI = getABI(cs(contract_address), chain)
    except Exception:
        return "Chain not covered yet"

    if ABI == "NotVerified":
        return "NotVerified"

    ww = Web3(Web3.HTTPProvider(RPC[chain]))
    try:
        return ww.eth.contract(address=Web3.to_checksum_address(contract_address), abi=ABI)
    except Exception:
        return "NoContract"




def version(pair,chain):
    """Classify a pool contract by probing which read functions it answers.

    Parameters:
        pair: pool contract address.
        chain: chain identifier passed to ``getContract``.

    Returns:
        * ``0`` - ``getContract`` returned an error string;
        * ``2`` - ``getReserves()`` can be called;
        * ``3`` - ``getReserves()`` fails but ``slot0()`` can be called;
        * ``4`` - neither call succeeds.
    """
    P = getContract(pair, chain)
    if isinstance(P, str):
        return 0

    # Any failure of a probe (missing function, reverted call, RPC error)
    # moves on to the next candidate.  Only Exception is caught so that
    # Ctrl-C / SystemExit still interrupt a long scan.
    try:
        P.functions.getReserves().call()
    except Exception:
        try:
            P.functions.slot0().call()
        except Exception:
            return 4
        return 3
    return 2




# version() returning 0 means the pair contract is not verified or, for some reason, it is not possible to connect to the contract; 4 means
# the pair is of a type other than Uniswap V2/V3-like.






Editor is loading...