Untitled
unknown
python
a year ago
5.2 kB
9
Indexable
from textual.app import App, ComposeResult from textual.widgets import Header, Header, Footer, Label from textual.reactive import Reactive from utils.views.buy.buy import BuyApp from utils.views.sell.sell import SellApp from utils.views.transfer.transfer import TransferApp from utils.views.balance.balancescreen import BalanceApp # to revisit later: # from utils.api.CDSCore import CDSCore import multiprocessing from multiprocessing import Pipe from utils.api.apis.BinanceUsAPIConnector import BinanceUsAPIConnector from utils.api.apis.BinanceComAPIConnector import BinanceComAPIConnector from utils.api.apis.KucoinComAPISDK import KucoinComAPISDK import os from dotenv import load_dotenv import logging.config logging.config.fileConfig('logging.ini', disable_existing_loggers=False) logger = logging.getLogger(__name__) load_dotenv() # get api key BINANCE_US_API_KEY = os.getenv('BINANCE_US_API_KEY') BINANCE_US_SECRET_KEY = os.getenv('BINANCE_US_SECRET_KEY') BINANCE_COM_API_KEY = os.getenv('BINANCE_COM_API_KEY') BINANCE_COM_SECRET_KEY = os.getenv('BINANCE_COM_SECRET_KEY') KUCOIN_COM_API_KEY = os.getenv('KUCOIN_COM_API_KEY') KUCOIN_COM_SECRET_KEY = os.getenv('KUCOIN_COM_SECRET_KEY') KUCOIN_COM_PASSPHRASE = os.getenv('KUCOIN_COM_PASSPHRASE') class MainApp(App): current_screen = Reactive("") CSS_PATH = "app.css" BINDINGS = [ ("escape", "quit", "Quit"), ("f1", "open_buy_screen", "Buy"), ("f2", "open_sell_screen", "Sell"), ("f3", "open_transfer_screen", "Transfer"), ("f4", "open_balance_screen", "Balance"), ] TITLE = "CDSApp Main Page" SUB_TITLE = "This is the landing page" def __init__(self, **kwargs) -> None: super().__init__(**kwargs) self.accounts = {} self.transfer_list = [] self.manager = multiprocessing.Manager() if BINANCE_COM_API_KEY is not None: self.accounts['Binance.com'] = BinanceComAPIConnector(api_key=BINANCE_COM_API_KEY, secret_key=BINANCE_COM_SECRET_KEY, recv_window=60000) if KUCOIN_COM_API_KEY is not None: self.accounts['Kucoin.com'] = KucoinComAPISDK(api_key=KUCOIN_COM_API_KEY, secret_key=KUCOIN_COM_SECRET_KEY, passphrase=KUCOIN_COM_PASSPHRASE) # if BINANCE_US_API_KEY is not None: # self.accounts['Binance.us'] = BinanceUsAPIConnector(api_key=BINANCE_US_API_KEY, # secret_key=BINANCE_US_SECRET_KEY, recv_window=60000) # with Pool() as pool: # pool.map(get_exchange_info, list(self.accounts.values())) self.log(f"{self.__class__} __init__") for exchange in self.accounts: self.log(f"{exchange}") self.accounts[exchange].get_exchange_pair() self.data = {} self.cds = None self.cds_timer = None # self.cds_transfer_status = { # 'a': {'is_complete': False, 'timestamp': 'aa', 'is_error': False}} self.cds_transfer_status = {} self.conn = None def set_data(self, key, value): self.log(f"{self.__class__} on_mount") self.data[key] = value self.log(self.data) def get_data(self, key): self.log(f"{self.__class__} on_mount") return self.data[key] if key in self.data else None def set_cds(self, cds): if cds is not None: self.cds = cds if not self.cds.get_status(): parent_conn, child_conn = Pipe() self.p = multiprocessing.Process( target=self.cds.transfer_with_coin, args=(child_conn,)) self.conn = parent_conn self.p.start() self.check_transfer_status() self.cds_timer = self.set_interval( 10, self.check_transfer_status) else: self.cds = None def check_transfer_status(self): if self.cds is None: return None try: if self.conn.poll(): self.cds_transfer_status = dict(self.conn.recv()) # self.log(self.cds_transfer_status) except Exception as error: self.log(error) def compose(self) -> ComposeResult: yield Header() yield Label("Please press the respective buttons located on the footer below this screen", id="question") yield Footer() def action_open_buy_screen(self) -> None: buy_app = BuyApp(accounts=self.accounts, data=self.data) self.push_screen(buy_app) def action_open_sell_screen(self) -> None: sell_app = SellApp(accounts=self.accounts, data=self.data) self.push_screen(sell_app) def action_open_transfer_screen(self) -> None: transfer_app = TransferApp() self.push_screen(transfer_app) def action_open_balance_screen(self) -> None: balance_app = BalanceApp() self.push_screen(balance_app) if __name__ == "__main__": app = MainApp() app.run()
Editor is loading...
Leave a Comment