Untitled

mail@pastecode.io avatar
unknown
plain_text
7 days ago
5.5 kB
9
Indexable
Never
  async def get_signals(self):
      """Return the latest ENTRY/EXIT signal row for each underlying symbol.

      In test mode, ``self.iter`` is advanced to the next snapshot in
      ``self.test_file`` (wrapping back to 0 past the end), that snapshot's
      ``EXIT`` column is recomputed, and signals are selected from it.
      Otherwise signals are selected from ``self.sweep_df``.

      Returns:
          A DataFrame with one row per ``usymbol``, built from the rows where
          ``ENTRY == 1`` or ``EXIT == 1`` and keeping the latest values by
          ``time`` for each symbol. When not in test mode and
          ``self.sweep_df`` is empty, an empty copy of ``self.sweep_df`` is
          returned instead of raising.
      """
      if self.test_mode:
          # Step to the next recorded snapshot, wrapping to the first one.
          self.iter = self.iter + 1 if self.iter + 1 < len(self.test_file) else 0
          source = pd.DataFrame(self.test_file[self.iter])
          # Column-wise form of the exit rule; unlike a row-wise ``apply`` it
          # also yields a proper (empty) Series for a snapshot with no rows.
          source["EXIT"] = (
              (source["norm_mid_edge"] <= -500)
              & (source["size"] >= 500)
              & (source["mid_edge"] <= -5000)
          ).astype(int)
      elif not self.sweep_df.empty:
          source = self.sweep_df
      else:
          # No sweep data available: hand back an empty frame rather than
          # falling through to an unbound local.
          return self.sweep_df.copy()

      is_signal = (source["ENTRY"] == 1) | (source["EXIT"] == 1)
      return (
          source[is_signal]
          .sort_values(by="time", ascending=True)
          .groupby("usymbol")
          .last()
          .reset_index()
      )



async def _ta_sweep_query(self, scheduled_time, *args, **kwargs):
      """Query sweep trades for the current interval and derive signal fields.

      Does nothing in test mode. Otherwise the result of ``self.ta.query`` is
      stored in ``self.sweep_df``, enriched in place with derived columns
      (``flag``, ``mid``, ``symbol``, ``theo_side``, ``mkt_side``, ``edge``,
      ``norm_edge``, ``mid_edge``, ``norm_mid_edge``, ``time``, ``side``,
      ``delta_impact``, ``dtx``, ``ENTRY``, ``EXIT``, ``edge_pct``) and appended
      to ``top_sweeps.jsonl`` under ``self.path``.

      If the query returns no rows, ``self.sweep_df`` is left as that empty
      frame and nothing is derived or written.

      Args:
          scheduled_time: Value stamped into the ``timestamp`` column and into
              the persisted record.
          *args: Accepted and ignored.
          **kwargs: Accepted and ignored.
      """
      if self.test_mode:
          logging.info("Skipping ta_sweep_query Test Mode")
          return
      logging.info("Running ta_sweep_query")
      interval = self.ta.get_5min_interval()
      logging.info(f"TA query triggered with interval {interval}")
      start, end = interval["start"], interval["end"]
      self.sweep_df: pd.DataFrame = await self.ta.query(
          {
              "cmd": "sweep",
              "symbol": "@ALL",
              "limit": 500,
              "output": "jsn",
              "where": (
                  "(size >= (0.001) * adv) and not index "
                  f'and time >= "{start}" and time < "{end}"'
              ),
              "fields": ";".join(self.ta.trade_fields),
          }
      )
      # Same object as self.sweep_df; every assignment below mutates it in place.
      df = self.sweep_df
      if df.empty:
          # A row-wise ``apply`` on a frame with no rows does not produce a
          # Series, so the derived-column assignments below cannot work.
          # Leave the empty frame in place (so stale signals from a previous
          # interval are not reused) and skip derivation and persistence.
          logging.info("ta_sweep_query returned no rows; nothing to derive")
          return

      # derive fields
      time_info = self.ta.get_current_timestamp()
      df["current_time"] = time_info["current_time"]
      # +1.0 for calls, -1.0 for everything else.
      df["flag"] = df["option"].apply(lambda x: 1.0 if x == "Call" else -1.0)
      df["mid"] = (df["bid"] + df["ask"]) / 2.0
      # Contract label: "<expiry> <strike> <C|P>".
      df["symbol"] = (
          df["expiry"]
          + " "
          + df["strike"].astype(str)
          + " "
          + df["option"].apply(lambda x: "C" if x == "Call" else "P")
      )
      df["theo_side"] = df.apply(
          lambda x: self.ta.get_side(x["price"], x["theo"]), axis=1
      )
      # Compared against the bid/ask midpoint computed above.
      df["mkt_side"] = df.apply(
          lambda x: self.ta.get_side(x["price"], x["mid"]), axis=1
      )
      # NOTE(review): 100.0 is presumably the option contract multiplier - confirm.
      df["edge"] = 100.0 * df["size"] * (df["price"] - df["theo"]) * df["flag"]
      df["norm_edge"] = df["edge"] / df["spot"]
      df["mid_edge"] = 100.0 * df["size"] * (df["price"] - df["mid"]) * df["flag"]
      df["norm_mid_edge"] = df["mid_edge"] / df["spot"]
      # Must be built before "timestamp" is overwritten with scheduled_time below.
      df["time"] = df["date"] + " " + df["timestamp"]
      df["side"] = df.apply(lambda x: self.ta.get_side_custom(x), axis=1)
      df["delta_impact"] = (
          100.0 * df["size"] * df["side"] * df["flag"] * df["delta"]
      )
      # Whole days between trade date and expiry.
      df["dtx"] = (
          pd.to_datetime(df["expiry"], format="%Y-%m-%d")
          - pd.to_datetime(df["date"], format="%Y-%m-%d")
      ).dt.days

      excluded_symbols = ("META", "ORCL")

      def _entry_flag(row):
          """Return 1.0 when the row meets every entry condition, else 0.0."""
          qualifies = (
              (row["norm_edge"] >= 500 or row["edge"] >= 10000)
              and row["size"] >= 1000
              and row["ivol_chg"] > 0
              and row["flag"] == 1.0
              and row["strike"] > row["spot"]
              and row["size"] > row["open_int"]
              and row["dtx"] < 90
              and row["price"] > row["mid"]
              and row["usymbol"] not in excluded_symbols
          )
          return 1.0 if qualifies else 0.0

      def _exit_flag(row):
          """Return 1 when the row meets every exit condition, else 0."""
          qualifies = (
              row["norm_edge"] <= -500
              and row["size"] >= 1000
              and row["edge"] <= -5000
          )
          return 1 if qualifies else 0

      df["ENTRY"] = df.apply(_entry_flag, axis=1)
      df["EXIT"] = df.apply(_exit_flag, axis=1)
      # Relative edge versus theo; defaults to 1.0 when theo is exactly zero.
      df["edge_pct"] = df.apply(
          lambda x: (x["price"] - x["theo"]) / x["theo"] if x["theo"] != 0.0 else 1.0,
          axis=1,
      )
      df["timestamp"] = scheduled_time
      data = {"timestamp": scheduled_time, "data": df.to_dict()}
      path = os.path.join(self.path, "top_sweeps.jsonl")
      await write_jsonl(path, data)
Leave a Comment