diff --git a/app.py b/app.py index 33836af..969dcd2 100644 --- a/app.py +++ b/app.py @@ -5,6 +5,8 @@ from rethinkdb import RethinkDB from pybit.unified_trading import WebSocket from time import sleep from messages.TickerData import Message,TickerData +from messages.Condition import Condition +from typing import List, Set r = RethinkDB() @@ -24,42 +26,107 @@ ws = WebSocket( channel_type="linear", ) -subscribed = set() +_subscribed = set() +_conditions = list() +_last = dict() -def fetch_symbols(): +def getSubscribed() -> Set[str]: + return _subscribed + +def addToSubscribed(symbol): + global _subscribed + _subscribed.add(symbol) + +def getConditions() -> List[Condition]: + return _conditions + +def setConditions(condition): + global _conditions + _conditions = condition + +def getLast(): + return _last + +def setLast(key, value): + global _last + _last[key] = value + +def replace(new_object, array): + index = next((i for i, obj in enumerate(array) if obj.id == new_object.id), None) + if index is not None: + array[index] = new_object + +def fetch_conditions(): try: - symbols = r.table(TABLE_NAME).pluck('symbol').run(rdb_conn) - return list(set([symbol['symbol'] for symbol in symbols])) + symbols = r.table(TABLE_NAME).run(rdb_conn) + return list(symbols) except Exception as e: print(f"Error fetching symbols: {e}") return [] -def handle_message(message): +def get_symbols(conditions: List[Condition]): + return list(set([symbol['symbol'] for symbol in conditions])) + + +def get_symbol(symbol: str, conditions: List[Condition]) -> List[Condition]: + return list(filter(lambda c: c["symbol"] == symbol, conditions)) + +def handle_message(message: dict): msg = Message.from_json(message) if (isinstance(msg.data, TickerData)): - print(msg.data) + ticker = msg.data + symbol = ticker.symbol + price = ticker.lastPrice + last = getLast() + + if symbol not in last or last[symbol] != price: + conditions = getConditions() + + filtered = get_symbol(ticker.symbol, conditions) + + for condition in filtered: + if (condition["condition"] == "lt"): + lower_than(condition, price) + elif (condition["condition"] == "gt"): + greater_than(condition, price) + + setLast(symbol, price) def watch_symbols_table(): feed = r.table(TABLE_NAME).changes().run(rdb_conn) for change in feed: print(f"Change detected: {change}") + + replace(change['new_val'], getConditions()) + if change['new_val'] and not change['old_val']: # New symbol added symbol = change['new_val']['symbol'] subscribe_to_symbol(symbol) def subscribe_to_symbol(symbol): + subscribed = getSubscribed() + if symbol not in subscribed: - subscribed.add(symbol) + addToSubscribed(symbol) ws.ticker_stream( symbol=symbol, callback=handle_message ) +def lower_than(condition: Condition, price: float): + print(f"is {condition['symbol']} price {condition['value']} lower than {price}? {condition['value'] < price}") + +def greater_than(condition: Condition, price): + print(f"is {condition['symbol']} price {condition['value']} greater than {price}? {condition['value'] > price}") + def main(): - for symbol in fetch_symbols(): - subscribe_to_symbol(symbol) + setConditions(fetch_conditions()) + + for symbol in get_symbols(getConditions()): + subscribe_to_symbol(symbol) + watch_symbols_table() while True: diff --git a/messages/Condition.py b/messages/Condition.py new file mode 100644 index 0000000..9def66d --- /dev/null +++ b/messages/Condition.py @@ -0,0 +1,8 @@ +from dataclasses import dataclass + +@dataclass +class Condition: + id: str + symbol: str + condition: str + value: float \ No newline at end of file diff --git a/messages/TickerData.py b/messages/TickerData.py index d5296c9..ff8fb3c 100644 --- a/messages/TickerData.py +++ b/messages/TickerData.py @@ -1,6 +1,5 @@ from dataclasses import dataclass from typing import Generic, TypeVar, Optional -import json T = TypeVar('T')