finfree-bybit-ticker/messages/DbConnector.py

68 lines
1.9 KiB
Python

import os
from typing import Callable
from flask import jsonify
from rethinkdb import RethinkDB
from .Condition import Condition
RDB_HOST = os.environ.get('RETHINKDB_URL')
RDB_PORT = os.environ.get('RETHINKDB_PORT')
DB_NAME = "finfree"
TABLE_NAME = 'conditions'
r = RethinkDB()
connection = None
def get_connection():
global connection
if connection is None or not connection.is_open():
connection = r.connect(RDB_HOST, RDB_PORT, db=DB_NAME)
return connection
def getRethinkDB():
return r
def fetch_conditions() -> list[Condition]:
try:
cursor = r.table(TABLE_NAME).run(get_connection())
return [Condition(**doc) for doc in cursor]
except Exception as e:
print(f"Error fetching symbols: {e}")
return []
conditions: list[Condition] = list()
class DbConnector:
@staticmethod
def watch_conditions(callback: Callable[[Condition], None]):
global conditions
conditions = fetch_conditions()
for cond in conditions:
if not cond.disabled and not cond.fulfilled:
callback(cond)
feed = r.table(TABLE_NAME).changes().run(get_connection())
for change in feed:
cond = Condition(**change['new_val'])
if not cond.disabled and not cond.fulfilled:
callback(cond)
@staticmethod
def disable_condition(condition_id: str):
cursor = r.table(TABLE_NAME).get(condition_id).run(get_connection())
json = jsonify(cursor)
json.disabled = True
r.table(TABLE_NAME).get(condition_id).update(json).run(get_connection())
@staticmethod
def fulfill_condition(condition_id: str):
cursor = r.table(TABLE_NAME).get(condition_id).run(get_connection())
json = jsonify(cursor)
json.disabled = True
json.fulfilled = True
r.table(TABLE_NAME).get(condition_id).update(json).run(get_connection())