Untitled

mail@pastecode.io avatar
unknown
python
2 years ago
3.1 kB
4
Indexable
Never
# 1638177396000POST/api/v1/orders{"symbol": "XBTUSDM", "side": "buy", "type": "market", "clientOid": "1", "leverage": "10", "size": "10"}

# 2021-11-29 10:16:37,172 ERROR :: Error while making POST request to /api/v1/orders: {'code': '400005', 'msg': 'Invalid KC-API-SIGN'} (error code 401)
#
    def _generate_signature(self, expires: str, method: str, endpoint: str, data: typing.Any) -> bytes:
        if method == "POST":
            message = expires + method + endpoint + data
        else:
            message = expires + method + endpoint + "?" + urlencode(data) if len(data) > 0 else expires + method + endpoint
        return base64.b64encode(hmac.new(self._secret_key.encode('utf-8'), message.encode('utf-8'), hashlib.sha256).digest())

    def _b64_encode_string(self, string: str) -> bytes:
        return base64.b64encode(hmac.new(self._secret_key.encode('utf-8'), string.encode('utf-8'), hashlib.sha256).digest())

    def _make_request(self, method: str, endpoint: str, data: typing.Dict):
        headers = dict()
        expires = str(int(time.time()) * 1000)
        data_json = data
        if method == "POST":
            data_json = json.dumps(data)
        headers["KC-API-SIGN"] = self._generate_signature(expires, method, endpoint, data_json)
        headers["KC-API-TIMESTAMP"] = expires
        headers["KC-API-KEY"] = self._public_key
        headers["KC-API-PASSPHRASE"] = self._b64_encode_string(self._pass_phrase)
        headers["Content-Type"] = "application/json"
        headers["KC-API-KEY-VERSION"] = "2"
        if method == "GET":
            try:
                response = requests.get(self._base_url + endpoint, headers=headers, params=data)
            except Exception as e:
                logger.error("Connection error while making %s request to %s: %s", method, endpoint, e)
                return None
        elif method == "POST":
            try:
                response = requests.post(self._base_url + endpoint, headers=headers, params=data_json)
            except Exception as e:
                logger.error("Connection error while making %s request to %s: %s", method, endpoint, e)
                return None


    def place_order(self, client_oid: str, side: str, contract: Contract, order_type: str, leverage: str, order_size: str, limit_price: Optional[str] = None) -> OrderStatus:
        data = dict()
        data['symbol'] = contract.symbol
        data['side'] = side.lower()
        data['type'] = order_type.lower()
        data['clientOid'] = client_oid
        data['leverage'] = leverage
        data['size'] = order_size

        if limit_price is not None:
            data['price'] = limit_price

        if kwargs:
            data.update(kwargs)

        if self.futures:
            order_status = self._make_request("POST", "/api/v1/orders", data)
        else:
            order_status = self._make_request("POST", "", data)

        if order_status is not None:
            order_status = OrderStatus(order_status["data"], self.platform)

        return order_status