Untitled
unknown
plain_text
7 days ago
5.5 kB
9
Indexable
Never
async def get_signals(self): """Return signal df In test mode, return a random selection of tickers from a predefined list. """ tickers = [] if self.test_mode: # Return a handful of tickers for testing. self.iter = self.iter + 1 if self.iter + 1 < len(self.test_file) else 0 signals = pd.DataFrame(self.test_file[self.iter]) signals["EXIT"] = signals.apply( lambda x: ( 1 if ( ( x["norm_mid_edge"] <= -500 and x["size"] >= 500 and x["mid_edge"] <= -5000 ) ) else 0 ), axis=1, ) signals = ( signals[(signals["ENTRY"] == 1) | (signals["EXIT"] == 1)] .sort_values(by="time", ascending=True) .groupby("usymbol") .last() .reset_index() ) elif not self.sweep_df.empty: signals = ( self.sweep_df[ (self.sweep_df["ENTRY"] == 1) | (self.sweep_df["EXIT"] == 1) ] .sort_values(by="time", ascending=True) .groupby("usymbol") .last() .reset_index() ) return signals async def _ta_sweep_query(self, scheduled_time, *args, **kwargs): if self.test_mode: logging.info("Skipping ta_sweep_query Test Mode") return logging.info("Running ta_sweep_query") interval = self.ta.get_5min_interval() logging.info(f"TA query triggered with interval {interval}") self.sweep_df: pd.DataFrame = await self.ta.query( { "cmd": "sweep", "symbol": "@ALL", "limit": 500, "output": "jsn", "where": '(size >= (0.001) * adv) and not index and time >= "' + interval["start"] + '" and time < "' + interval["end"] + '"', "fields": ";".join(self.ta.trade_fields), } ) # derive fields time_info = self.ta.get_current_timestamp() self.sweep_df["current_time"] = time_info["current_time"] self.sweep_df["flag"] = self.sweep_df["option"].apply( lambda x: 1.0 if x == "Call" else -1.0 ) self.sweep_df["mid"] = (self.sweep_df["bid"] + self.sweep_df["ask"]) / 2.0 self.sweep_df["symbol"] = ( self.sweep_df["expiry"] + " " + self.sweep_df["strike"].astype(str) + " " + self.sweep_df["option"].apply(lambda x: "C" if x == "Call" else "P") ) self.sweep_df["theo_side"] = self.sweep_df.apply( lambda x: self.ta.get_side(x["price"], x["theo"]), axis=1 ) self.sweep_df["mkt_side"] = self.sweep_df.apply( lambda x: self.ta.get_side(x["price"], (x["bid"] + x["ask"]) / 2.0), axis=1 ) self.sweep_df["edge"] = ( 100.0 * self.sweep_df["size"] * (self.sweep_df["price"] - self.sweep_df["theo"]) * self.sweep_df["flag"] ) self.sweep_df["norm_edge"] = self.sweep_df["edge"] / self.sweep_df["spot"] self.sweep_df["mid_edge"] = ( 100.0 * self.sweep_df["size"] * (self.sweep_df["price"] - self.sweep_df["mid"]) * self.sweep_df["flag"] ) self.sweep_df["norm_mid_edge"] = ( self.sweep_df["mid_edge"] / self.sweep_df["spot"] ) self.sweep_df["time"] = self.sweep_df["date"] + " " + self.sweep_df["timestamp"] self.sweep_df["side"] = self.sweep_df.apply( lambda x: self.ta.get_side_custom(x), axis=1 ) self.sweep_df["delta_impact"] = ( 100.0 * self.sweep_df["size"] * self.sweep_df["side"] * self.sweep_df["flag"] * self.sweep_df["delta"] ) self.sweep_df["dtx"] = ( pd.to_datetime(self.sweep_df["expiry"], format="%Y-%m-%d") - pd.to_datetime(self.sweep_df["date"], format="%Y-%m-%d") ).dt.days self.sweep_df["ENTRY"] = self.sweep_df.apply( lambda x: ( 1.0 if ( (x["norm_edge"] >= 500 or x["edge"] >= 10000) and x["size"] >= 1000 and x["ivol_chg"] > 0 and x["flag"] == 1.0 and x["strike"] > x["spot"] and x["size"] > x["open_int"] and x["dtx"] < 90 and x["price"] > x["mid"] and x["usymbol"] not in ["META", "ORCL"] ) else 0.0 ), axis=1, ) self.sweep_df["EXIT"] = self.sweep_df.apply( lambda x: ( 1 if ( ( x["norm_edge"] <= -500 and x["size"] >= 1000 and x["edge"] <= -5000 ) ) else 0 ), axis=1, ) self.sweep_df["edge_pct"] = self.sweep_df.apply( lambda x: (x["price"] - x["theo"]) / x["theo"] if x["theo"] != 0.0 else 1.0, axis=1, ) self.sweep_df["timestamp"] = scheduled_time data = {"timestamp": scheduled_time, "data": self.sweep_df.to_dict()} path = os.path.join(self.path, "top_sweeps.jsonl") await write_jsonl(path, data)
Leave a Comment