63 lines
1.5 KiB
Python
63 lines
1.5 KiB
Python
import os
|
|
from threading import Thread
|
|
import json
|
|
from rethinkdb import RethinkDB
|
|
from pybit.unified_trading import WebSocket
|
|
from time import sleep
|
|
|
|
r = RethinkDB()
|
|
|
|
# RethinkDB settings
|
|
RDB_HOST = os.environ.get('RETHINKDB_URL')
|
|
RDB_PORT = os.environ.get('RETHINKDB_PORT')
|
|
DB_NAME = "finfree"
|
|
TABLE_NAME = 'conditions'
|
|
|
|
# Initialize RethinkDB connection
|
|
rdb_conn = r.connect(host=RDB_HOST, port=RDB_PORT, db=DB_NAME)
|
|
|
|
# WebSocket setup
|
|
WS_URL = "wss://stream.bybit.com/v5/public/linear"
|
|
ws = WebSocket(
|
|
testnet=True,
|
|
channel_type="linear",
|
|
)
|
|
|
|
subscribed = set()
|
|
|
|
def fetch_symbols():
|
|
try:
|
|
symbols = r.table(TABLE_NAME).pluck('symbol').run(rdb_conn)
|
|
return list(set([symbol['symbol'] for symbol in symbols]))
|
|
except Exception as e:
|
|
print(f"Error fetching symbols: {e}")
|
|
return []
|
|
|
|
def handle_message(message):
|
|
print(message)
|
|
|
|
def watch_symbols_table():
|
|
feed = r.table(TABLE_NAME).changes().run(rdb_conn)
|
|
for change in feed:
|
|
print(f"Change detected: {change}")
|
|
if change['new_val'] and not change['old_val']: # New symbol added
|
|
symbol = change['new_val']['symbol']
|
|
subscribe_to_symbol(symbol)
|
|
|
|
def subscribe_to_symbol(symbol):
|
|
if symbol not in subscribed:
|
|
subscribed.add(symbol)
|
|
ws.ticker_stream(
|
|
symbol=symbol,
|
|
callback=handle_message
|
|
)
|
|
|
|
if __name__ == "__main__":
|
|
for symbol in fetch_symbols():
|
|
subscribe_to_symbol(symbol)
|
|
|
|
watch_symbols_table()
|
|
|
|
while True:
|
|
sleep(1)
|