# Untitled paste (python, 12 kB) copied from pastecode.io.
from textual.screen import ModalScreen, Screen
from textual.app import ComposeResult
from textual.reactive import reactive
from textual.containers import Vertical, Horizontal, Grid
from textual.widget import Widget
from textual.binding import Binding
from textual.widgets import Button, Footer, Static, Input, Label, Select
from textual import on
from model.CoinPair import CoinPair
from utils.views.balance.balancescreen import BalanceApp

import logging
logger = logging.getLogger(__name__)


class BaseCoin(Widget):
    """Read-only line that echoes the base coin currently entered."""

    # Placeholder underscores are shown until a value is assigned.
    base_coin = reactive("______")

    def render(self) -> str:
        """Return the text displayed by this widget."""
        return "You are buying: {}".format(self.base_coin)


class Quote(Widget):
    """Read-only line that echoes the quote coin currently entered."""

    # Placeholder underscores are shown until a value is assigned.
    quote_coin = reactive("______")

    def render(self) -> str:
        """Return the text displayed by this widget."""
        return "You are using: {}".format(self.quote_coin)


class Quantity(Widget):
    """Read-only line that echoes the quantity currently entered."""

    # Kept as text; it mirrors whatever is assigned to it.
    quantity = reactive("0")

    def render(self) -> str:
        """Return the text displayed by this widget."""
        return "You are using this for Quantity: {}".format(self.quantity)


class Price(Widget):
    """Read-only line that echoes the price value currently entered."""

    # Kept as text; it mirrors whatever is assigned to it.
    price = reactive("0")

    def render(self) -> str:
        """Return the text displayed by this widget."""
        return "You are using this for Price Percentage: {}".format(self.price)


class MarketPriceBuy(Widget):
    """Read-only line that shows the current market price."""

    # Starts at 0 until a price is assigned.
    marketprice = reactive(0)

    def render(self) -> str:
        """Return the text displayed by this widget.

        The result is always a string (the annotation used to say ``int``).
        """
        return f"Market Price: {self.marketprice}"


class TotalPriceBuy(Widget):
    """Read-only line that shows the computed total price."""

    # Empty until a total has been assigned.
    total = reactive("")

    def render(self) -> str:
        """Return the text displayed by this widget."""
        return "Total Price: {}".format(self.total)


class SystemMessageBuy(Widget):
    """Read-only line used for status and error messages."""

    # Empty means there is nothing to report.
    message = reactive("")

    def render(self) -> str:
        """Return the current message as display text."""
        return format(self.message)


# Order types offered in the "Order" drop-down, in display order.
ORDER = ["MARKET", "LIMIT"]


class BuyApp(Screen):
    """Screen for entering and submitting a spot buy order.

    The left column holds the form (exchange, base coin, quote coin,
    quantity, order type and, for LIMIT orders, a price field); the right
    column shows the total price, the market price and a status message.

    Args:
        accounts: Mapping of exchange name to an account object. The screen
            calls ``get_avg_price``, ``is_base_coin_exist``,
            ``is_quote_coin_exist`` and ``new_spot_buy_order`` on the values.
        data: Optional mapping read by ``action_reset_form``; the keys
            ``'coin'`` and ``'exchange'`` are used when present.
    """

    CSS_PATH = "buy.css"
    BINDINGS = [Binding(key="f5", action="app.pop_screen",
                        description="Close"),]

    def __init__(self, accounts=None, data=None):
        super().__init__()
        self.inputs = []
        # A fresh container per instance: a mutable default argument would be
        # shared between every screen created without explicit accounts.
        self.accounts = {} if accounts is None else accounts
        self.data = data
        self.timer = None
        # Form state mirrored on the screen. Initialised here so validation
        # and price helpers never hit an AttributeError when the user has not
        # touched a field yet. The defaults follow the first option of each
        # drop-down, which is what a non-blank Select starts on.
        self.exchange = str(next(iter(self.accounts), ""))
        self.order_type = ORDER[0]
        self.base_coin = ""
        self.quote_coin = ""
        self.quantity = ""
        self.price = ""
        self.message = ""
        self.total = ""
        self.marketprice = "0"

    def compose(self) -> ComposeResult:
        """Build the two-column form layout and the footer."""
        yield Horizontal(
            Vertical(
                Static("Buy", classes="buy"),
                Static("Exchange:"),
                # A Select that may not be blank rejects an empty option
                # list, so blank is allowed only when there are no accounts.
                Select([(exchange, exchange)
                       for exchange in self.accounts], id="exchange",
                       allow_blank=not self.accounts),
                Static("                                  "),
                Input(placeholder="Base Coin", id="base_coin"),
                BaseCoin(),
                Static("                                  "),
                Input(placeholder="Quote", id="quote_coin"),
                Quote(),
                Static("                                  "),
                Input(placeholder="Quantity", id="quantity"),
                Quantity(),
                Static("                                  "),
                Static("Order:"),
                Select([(line, line)
                       for line in ORDER], id="order", allow_blank=False),
                Static("                                  "),
                Input(placeholder="1 = 100%, 0.01 = 1%",
                      classes="hidden", id="price"),
                Price(classes="hidden"),
                Static("                                  "),
                Button("Confirm Buy", id="buy"),
                Static("                                  "),
                classes="column",
            ),
            Vertical(
                TotalPriceBuy(),
                Static("                                  "),
                MarketPriceBuy(),
                Static("                                  "),
                SystemMessageBuy(),
                Static("                                  "),
                classes="column1",
            ),
        )
        yield Footer()

    @staticmethod
    def _to_float(value, default=0.0):
        """Return ``value`` as a float, or ``default`` if it is not numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _is_positive_number(text) -> bool:
        """Return True when ``text`` parses as a finite number above zero."""
        try:
            number = float(text)
        except (TypeError, ValueError):
            return False
        # The chained comparison is False for NaN and for infinity.
        return 0 < number < float("inf")

    def _refresh_market_price(self) -> None:
        """Reload the market price and the total that depends on it."""
        self.query_one(MarketPriceBuy).marketprice = self.load_price()
        self.query_one(TotalPriceBuy).total = self.load_total_price()

    @on(Select.Changed, "#exchange")
    def select_changed_exchange(self, event: Select.Changed) -> None:
        """Remember the exchange chosen in the drop-down."""
        self.exchange = str(event.value)
        logger.info(self.exchange)

    @on(Select.Changed, "#order")
    def select_changed_order_type(self, event: Select.Changed) -> None:
        """Remember the order type and show the price field only for LIMIT."""
        self.order_type = str(event.value)
        logger.info(self.order_type)
        hide_price = self.order_type != "LIMIT"
        self.query_one(Price).set_class(hide_price, "hidden")
        self.query_one("#price").set_class(hide_price, "hidden")

    def load_price(self):
        """Return the average price of the current pair across all accounts.

        Exchanges whose ``get_avg_price`` call fails are skipped. Returns 0
        when the pair is incomplete or no exchange returned a price. The
        result is also stored (as text) in ``self.marketprice``.
        """
        self.log(f"{self.__class__} load_price")
        if self.base_coin == '' or self.quote_coin == '':
            self.marketprice = "0"
            return 0

        coin_pair = CoinPair(base_coin=self.base_coin,
                             quote_coin=self.quote_coin)
        prices = []
        for exchange in self.accounts:
            try:
                prices.append(
                    float(self.accounts[exchange].get_avg_price(coin_pair)))
            except Exception:
                # Best effort: this runs while the user is still typing, so
                # failures for half-entered symbols are expected. Record them
                # at debug level instead of dropping them silently.
                logger.debug("No price from %s for %s/%s", exchange,
                             self.base_coin, self.quote_coin, exc_info=True)

        if not prices:
            self.marketprice = "0"
            return 0

        average = sum(prices) / len(prices)
        self.marketprice = str(average)
        return average

    def cal_total_price(self):
        """Store quantity times price (as text) in ``self.total``.

        MARKET orders use ``self.marketprice``; any other order type uses
        ``self.price``. ``self.total`` becomes "" when a required value is
        missing or not numeric.
        """
        self.log(f"{self.__class__} cal_total_price")
        # self.price is normally the raw text; an object exposing ``.value``
        # is still accepted for backward compatibility.
        limit_price = getattr(self.price, "value", self.price)
        if self.quantity == '' or (self.order_type == 'LIMIT' and limit_price == ''):
            self.total = ""
            return None

        price = self.marketprice if self.order_type == 'MARKET' else limit_price

        try:
            self.total = str(float(self.quantity) * float(price))
        except (TypeError, ValueError):
            self.total = ""

    async def action_reset_form(self):
        """Reset the mirrored form state, seeding coin/exchange from ``data``.

        Only the attributes on the screen are reset; the input widgets are
        left as they are.
        """
        data = self.data or {}  # data defaults to None
        self.base_coin = data['coin'] if 'coin' in data else ""
        self.exchange = data['exchange'] if 'exchange' in data else ""
        self.quote_coin = ''
        self.quantity = ''
        self.price = ''
        self.message = ''

    def validate_form(self):
        """Check the form before an order is placed.

        Returns:
            ``(True, "")`` when the form is valid, otherwise ``(False, msg)``
            with a message describing the first problem found.
        """
        self.log(f"{self.__class__} validate_form")

        if self.exchange not in self.accounts:
            return False, "Exchange does not exist"

        account = self.accounts[self.exchange]
        base_coin = self.query_one(BaseCoin).base_coin
        quote_coin = self.query_one(Quote).quote_coin

        if not account.is_base_coin_exist(base_coin=base_coin):
            return False, "Base coin is not supported"
        if not account.is_quote_coin_exist(base_coin=base_coin, quote_coin=quote_coin):
            # NOTE(review): "Qoute" is misspelled; the text is kept as-is in
            # case other code matches on it.
            return False, "Qoute coin is not supported"
        # Zero and non-numeric text are rejected too, matching the wording of
        # the messages.
        if not self._is_positive_number(self.query_one(Quantity).quantity):
            return False, "Quantity must be greater than 0"
        if self.order_type != 'LIMIT' and self.order_type != 'MARKET':
            return False, "Order Type must be either MARKET or LIMIT"
        if self.order_type == 'LIMIT' and not self._is_positive_number(self.query_one(Price).price):
            return False, "Price must be greater than 0"
        return True, ""

    async def on_button_pressed(self):
        """Ask for confirmation, then validate the form and place the order."""
        logger.info("on_button_pressed")

        def check_quit(confirmed: bool) -> None:
            """Handle the result of the confirmation dialog."""
            status = self.query_one(SystemMessageBuy)
            if not confirmed:
                logger.info("Cancelled Order")
                status.message = "Cancelled Order"
                return

            self.log(f"{self.__class__} on_button_pressed")
            is_valid, message = self.validate_form()
            if not is_valid:
                status.message = message
                return

            coin_pair = CoinPair(
                base_coin=self.query_one(BaseCoin).base_coin,
                quote_coin=self.query_one(Quote).quote_coin)
            # MARKET orders are sent with a price of 0.
            price = float(self.query_one(Price).price
                          if self.order_type == 'LIMIT' else 0)
            order_id, error = self.accounts[self.exchange].new_spot_buy_order(
                coin_pair,
                quantity=float(self.query_one(Quantity).quantity),
                price=price,
                order_type=self.order_type)

            if error is None:
                logger.info("Buy successful")
                status.message = f"Buy successful! Order ID: {order_id}"
            else:
                status.message = f"{error.code} {error.message}"

        quote_coin = self.query_one(Quote).quote_coin
        quantity = self.query_one(Quantity).quantity
        # The total is "" until a quantity has been typed; the dialog formats
        # it as a number, so it is normalised to a float first.
        total_value = self._to_float(self.query_one(TotalPriceBuy).total)
        self.app.push_screen(ConfirmScreenBuy(
            self.query_one(BaseCoin).base_coin, quote_coin, total_value, quantity), check_quit)

    def on_input_changed(self, event: Input.Changed) -> None:
        """Mirror every input into its display widget and the screen state."""
        field = event.input.id
        if field == "base_coin":
            self.query_one(BaseCoin).base_coin = event.value
            self.base_coin = event.value
            # The pair changed, so the price shown for the old pair is stale.
            self._refresh_market_price()
        elif field == "quote_coin":
            self.query_one(Quote).quote_coin = event.value
            self.base_coin = self.query_one(BaseCoin).base_coin
            self.quote_coin = event.value
            self._refresh_market_price()
        elif field == "quantity":
            self.query_one(Quantity).quantity = event.value
            self.quantity = event.value
        elif field == "price":
            self.query_one(Price).price = event.value
            self.price = event.value

    @on(Input.Changed, "#price")
    def custom_function_price_change(self, event: Input.Changed):
        """Update the price display and recompute the total."""
        self.query_one(Price).price = event.value
        self.query_one(TotalPriceBuy).total = self.load_total_price()

    @on(Input.Changed, "#quantity")
    def custom_function_quantity_change(self, event: Input.Changed):
        """Update the quantity display and recompute the total."""
        self.query_one(Quantity).quantity = event.value
        self.query_one(TotalPriceBuy).total = self.load_total_price()

    def load_total_price(self):
        """Return market price times quantity, or 0 if either is not numeric."""
        try:
            return float(self.query_one(MarketPriceBuy).marketprice) * float(self.query_one(Quantity).quantity)
        except (TypeError, ValueError):
            return 0


class ConfirmScreenBuy(ModalScreen[bool]):
    """Modal dialog that summarises a purchase and asks for confirmation.

    The screen is dismissed with ``True`` when "Continue?" is pressed and
    with ``False`` otherwise.

    Args:
        base_coin: Symbol of the coin being bought.
        quote_coin: Symbol of the coin used to pay.
        total_value: Total cost of the purchase, expressed in the quote coin.
        quantity: Amount of the base coin being bought.
    """

    def __init__(self, base_coin: str, quote_coin: str, total_value: float, quantity: str):
        super().__init__()
        self.base_coin = base_coin
        self.quote_coin = quote_coin
        self.total_value = total_value
        self.quantity = quantity
        self.balance_app = BalanceApp()

    @staticmethod
    def _as_float(value, default: float = 0.0) -> float:
        """Return ``value`` as a float, or ``default`` if it is not numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def retrieve_coin_amount(self, coin_symbol: str):
        """Return the wallet amount reported by ``BalanceApp`` for a coin."""
        return self.balance_app.fetch_wallet_coin_amount(coin_symbol)

    def compose(self) -> ComposeResult:
        """Build the confirmation dialog."""
        # Callers may pass "" for a total that has not been computed yet, and
        # the wallet lookup result is assumed (not verified here) to be
        # numeric, so everything is normalised before formatting.
        total_value = self._as_float(self.total_value)
        quantity = self._as_float(self.quantity)
        amount = self._as_float(self.retrieve_coin_amount(self.base_coin))

        total_price_formatted = "{:.2f}".format(total_value)
        amount_base_coin = "{:.2f}".format(amount)
        # The base-coin balance grows by the quantity bought. The total price
        # is in the quote coin and must not be added to it.
        amount_total = amount + quantity
        total_all_in_all = "{:.2f}".format(amount_total)

        yield Grid(
            Label("Here are the results of your transaction:", id="questions"),
            Label("Current amount of " + self.base_coin +
                  " :          " + amount_base_coin, id="sample"),
            Label("Quote Coin Amount of " + self.quote_coin + " :          " +
                  total_price_formatted + " " + self.quote_coin, id="display"),
            Label(self.base_coin +
                  " after purchase :          " + str(amount_total) + " or " + total_all_in_all, id="total"),
            Button("Continue?", variant="success", id="quit"),
            Button("Cancel?", variant="warning", id="cancel"),
            id="dialog",
        )

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dismiss with True for the "quit" button and False for any other."""
        self.dismiss(event.button.id == "quit")


if __name__ == "__main__":
    # BuyApp is a Screen, and screens have no run() method; only an App can
    # be run, so a minimal host App pushes the screen on start-up.
    from textual.app import App

    class _BuyDemoApp(App):
        """Minimal host application that shows the buy screen."""

        def on_mount(self) -> None:
            # NOTE(review): the screen is created without accounts, exactly
            # as before; pass real accounts here to get a usable form.
            self.push_screen(BuyApp())

    _BuyDemoApp().run()
# End of pasted snippet.