Untitled

 avatar
unknown
python
a year ago
5.2 kB
9
Indexable
from textual.app import App, ComposeResult
from textual.widgets import Header, Header, Footer, Label
from textual.reactive import Reactive
from utils.views.buy.buy import BuyApp
from utils.views.sell.sell import SellApp
from utils.views.transfer.transfer import TransferApp
from utils.views.balance.balancescreen import BalanceApp
# to revisit later:
# from utils.api.CDSCore import CDSCore
import multiprocessing
from multiprocessing import Pipe
from utils.api.apis.BinanceUsAPIConnector import BinanceUsAPIConnector
from utils.api.apis.BinanceComAPIConnector import BinanceComAPIConnector
from utils.api.apis.KucoinComAPISDK import KucoinComAPISDK
import os
from dotenv import load_dotenv
import logging.config
logging.config.fileConfig('logging.ini',
                          disable_existing_loggers=False)
logger = logging.getLogger(__name__)

load_dotenv()

# get api key
BINANCE_US_API_KEY = os.getenv('BINANCE_US_API_KEY')
BINANCE_US_SECRET_KEY = os.getenv('BINANCE_US_SECRET_KEY')

BINANCE_COM_API_KEY = os.getenv('BINANCE_COM_API_KEY')
BINANCE_COM_SECRET_KEY = os.getenv('BINANCE_COM_SECRET_KEY')

KUCOIN_COM_API_KEY = os.getenv('KUCOIN_COM_API_KEY')
KUCOIN_COM_SECRET_KEY = os.getenv('KUCOIN_COM_SECRET_KEY')
KUCOIN_COM_PASSPHRASE = os.getenv('KUCOIN_COM_PASSPHRASE')


class MainApp(App):
    current_screen = Reactive("")
    CSS_PATH = "app.css"
    BINDINGS = [
        ("escape", "quit", "Quit"),
        ("f1", "open_buy_screen", "Buy"),
        ("f2", "open_sell_screen", "Sell"),
        ("f3", "open_transfer_screen", "Transfer"),
        ("f4", "open_balance_screen", "Balance"),

    ]
    TITLE = "CDSApp Main Page"
    SUB_TITLE = "This is the landing page"

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.accounts = {}
        self.transfer_list = []
        self.manager = multiprocessing.Manager()

        if BINANCE_COM_API_KEY is not None:
            self.accounts['Binance.com'] = BinanceComAPIConnector(api_key=BINANCE_COM_API_KEY,
                                                                  secret_key=BINANCE_COM_SECRET_KEY, recv_window=60000)

        if KUCOIN_COM_API_KEY is not None:
            self.accounts['Kucoin.com'] = KucoinComAPISDK(api_key=KUCOIN_COM_API_KEY,
                                                          secret_key=KUCOIN_COM_SECRET_KEY,
                                                          passphrase=KUCOIN_COM_PASSPHRASE)

        # if BINANCE_US_API_KEY is not None:
        #    self.accounts['Binance.us'] = BinanceUsAPIConnector(api_key=BINANCE_US_API_KEY,
        #                                                        secret_key=BINANCE_US_SECRET_KEY, recv_window=60000)

        # with Pool() as pool:
        #     pool.map(get_exchange_info, list(self.accounts.values()))
        self.log(f"{self.__class__} __init__")
        for exchange in self.accounts:
            self.log(f"{exchange}")
            self.accounts[exchange].get_exchange_pair()
        self.data = {}
        self.cds = None
        self.cds_timer = None
        # self.cds_transfer_status = {
        #     'a': {'is_complete': False, 'timestamp': 'aa', 'is_error': False}}
        self.cds_transfer_status = {}
        self.conn = None

    def set_data(self, key, value):
        self.log(f"{self.__class__} on_mount")
        self.data[key] = value
        self.log(self.data)

    def get_data(self, key):
        self.log(f"{self.__class__} on_mount")
        return self.data[key] if key in self.data else None

    def set_cds(self, cds):
        if cds is not None:
            self.cds = cds
            if not self.cds.get_status():
                parent_conn, child_conn = Pipe()
                self.p = multiprocessing.Process(
                    target=self.cds.transfer_with_coin, args=(child_conn,))
                self.conn = parent_conn
                self.p.start()
                self.check_transfer_status()
                self.cds_timer = self.set_interval(
                    10, self.check_transfer_status)
        else:
            self.cds = None

    def check_transfer_status(self):
        if self.cds is None:
            return None
        try:
            if self.conn.poll():
                self.cds_transfer_status = dict(self.conn.recv())
                # self.log(self.cds_transfer_status)
        except Exception as error:
            self.log(error)

    def compose(self) -> ComposeResult:
        yield Header()
        yield Label("Please press the respective buttons located on the footer below this screen", id="question")
        yield Footer()

    def action_open_buy_screen(self) -> None:
        buy_app = BuyApp(accounts=self.accounts, data=self.data)
        self.push_screen(buy_app)

    def action_open_sell_screen(self) -> None:
        sell_app = SellApp(accounts=self.accounts, data=self.data)
        self.push_screen(sell_app)

    def action_open_transfer_screen(self) -> None:
        transfer_app = TransferApp()
        self.push_screen(transfer_app)

    def action_open_balance_screen(self) -> None:
        balance_app = BalanceApp()
        self.push_screen(balance_app)


if __name__ == "__main__":
    app = MainApp()
    app.run()
Editor is loading...
Leave a Comment