Untitled
unknown
plain_text
a year ago
5.5 kB
24
Indexable
async def get_signals(self):
"""Return signal df
In test mode, return a random selection of tickers from a predefined list.
"""
tickers = []
if self.test_mode:
# Return a handful of tickers for testing.
self.iter = self.iter + 1 if self.iter + 1 < len(self.test_file) else 0
signals = pd.DataFrame(self.test_file[self.iter])
signals["EXIT"] = signals.apply(
lambda x: (
1
if (
(
x["norm_mid_edge"] <= -500
and x["size"] >= 500
and x["mid_edge"] <= -5000
)
)
else 0
),
axis=1,
)
signals = (
signals[(signals["ENTRY"] == 1) | (signals["EXIT"] == 1)]
.sort_values(by="time", ascending=True)
.groupby("usymbol")
.last()
.reset_index()
)
elif not self.sweep_df.empty:
signals = (
self.sweep_df[
(self.sweep_df["ENTRY"] == 1) | (self.sweep_df["EXIT"] == 1)
]
.sort_values(by="time", ascending=True)
.groupby("usymbol")
.last()
.reset_index()
)
return signals
async def _ta_sweep_query(self, scheduled_time, *args, **kwargs):
if self.test_mode:
logging.info("Skipping ta_sweep_query Test Mode")
return
logging.info("Running ta_sweep_query")
interval = self.ta.get_5min_interval()
logging.info(f"TA query triggered with interval {interval}")
self.sweep_df: pd.DataFrame = await self.ta.query(
{
"cmd": "sweep",
"symbol": "@ALL",
"limit": 500,
"output": "jsn",
"where": '(size >= (0.001) * adv) and not index and time >= "'
+ interval["start"]
+ '" and time < "'
+ interval["end"]
+ '"',
"fields": ";".join(self.ta.trade_fields),
}
)
# derive fields
time_info = self.ta.get_current_timestamp()
self.sweep_df["current_time"] = time_info["current_time"]
self.sweep_df["flag"] = self.sweep_df["option"].apply(
lambda x: 1.0 if x == "Call" else -1.0
)
self.sweep_df["mid"] = (self.sweep_df["bid"] + self.sweep_df["ask"]) / 2.0
self.sweep_df["symbol"] = (
self.sweep_df["expiry"]
+ " "
+ self.sweep_df["strike"].astype(str)
+ " "
+ self.sweep_df["option"].apply(lambda x: "C" if x == "Call" else "P")
)
self.sweep_df["theo_side"] = self.sweep_df.apply(
lambda x: self.ta.get_side(x["price"], x["theo"]), axis=1
)
self.sweep_df["mkt_side"] = self.sweep_df.apply(
lambda x: self.ta.get_side(x["price"], (x["bid"] + x["ask"]) / 2.0), axis=1
)
self.sweep_df["edge"] = (
100.0
* self.sweep_df["size"]
* (self.sweep_df["price"] - self.sweep_df["theo"])
* self.sweep_df["flag"]
)
self.sweep_df["norm_edge"] = self.sweep_df["edge"] / self.sweep_df["spot"]
self.sweep_df["mid_edge"] = (
100.0
* self.sweep_df["size"]
* (self.sweep_df["price"] - self.sweep_df["mid"])
* self.sweep_df["flag"]
)
self.sweep_df["norm_mid_edge"] = (
self.sweep_df["mid_edge"] / self.sweep_df["spot"]
)
self.sweep_df["time"] = self.sweep_df["date"] + " " + self.sweep_df["timestamp"]
self.sweep_df["side"] = self.sweep_df.apply(
lambda x: self.ta.get_side_custom(x), axis=1
)
self.sweep_df["delta_impact"] = (
100.0
* self.sweep_df["size"]
* self.sweep_df["side"]
* self.sweep_df["flag"]
* self.sweep_df["delta"]
)
self.sweep_df["dtx"] = (
pd.to_datetime(self.sweep_df["expiry"], format="%Y-%m-%d")
- pd.to_datetime(self.sweep_df["date"], format="%Y-%m-%d")
).dt.days
self.sweep_df["ENTRY"] = self.sweep_df.apply(
lambda x: (
1.0
if (
(x["norm_edge"] >= 500 or x["edge"] >= 10000)
and x["size"] >= 1000
and x["ivol_chg"] > 0
and x["flag"] == 1.0
and x["strike"] > x["spot"]
and x["size"] > x["open_int"]
and x["dtx"] < 90
and x["price"] > x["mid"]
and x["usymbol"] not in ["META", "ORCL"]
)
else 0.0
),
axis=1,
)
self.sweep_df["EXIT"] = self.sweep_df.apply(
lambda x: (
1
if (
(
x["norm_edge"] <= -500
and x["size"] >= 1000
and x["edge"] <= -5000
)
)
else 0
),
axis=1,
)
self.sweep_df["edge_pct"] = self.sweep_df.apply(
lambda x: (x["price"] - x["theo"]) / x["theo"] if x["theo"] != 0.0 else 1.0,
axis=1,
)
self.sweep_df["timestamp"] = scheduled_time
data = {"timestamp": scheduled_time, "data": self.sweep_df.to_dict()}
path = os.path.join(self.path, "top_sweeps.jsonl")
await write_jsonl(path, data)Editor is loading...
Leave a Comment