Source code for btfx_trader.public

from contextlib import suppress
from queue import Queue, Empty
import logging
import time

import requests
import btfxwss
import btfxwss.queue_processor
import btfxwss.connection

log = logging.getLogger(__name__)
# Multiprocessing queue is slow...
btfxwss.connection.Queue = btfxwss.queue_processor.Queue = Queue


[docs]def get_symbols(): return list( map(lambda q: q.upper(), filter(lambda i: 'usd' in i, requests.get('https://api.bitfinex.com/v1/symbols').json())) )
[docs]def get_symbols_as_updated(check_every=43200): symbols = set(get_symbols()) for s in list(sorted(symbols)): yield s while True: time.sleep(check_every) current_symbols = set(get_symbols()) new_symbols = current_symbols.difference(symbols) symbols.update(new_symbols) for s in list(sorted(new_symbols)): yield s
def _handle_ticker(data): return { 'bid': data[0][0], 'bid_size': data[0][1], 'ask': data[0][2], 'ask_size': data[0][3], 'mid': round((data[0][0] + data[0][2]) / 2, 4), 'weighted_mid': round((data[0][0] * data[0][1] + data[0][2] * data[0][3]) / (data[0][1] + data[0][3]), 4), 'last_price': data[0][6], 'spread': round(2 * (data[0][2] - data[0][0]) / (data[0][0] + data[0][2]), 8), 'change': data[0][5], 'volume': data[0][7], 'high': data[0][8], 'low': data[0][9], 'timestamp': round(time.time(), 0) } def _handle_trade(data): if data[0] == 'tu': return { 'id': data[1][0], 'timestamp': data[1][1] / 1000, 'price': data[1][3], 'amount': data[1][2] } def _handle_candle(data): return { 'timestamp': data[0][0] / 1000, 'open': data[0][1], 'high': data[0][2], 'low': data[0][3], 'close': data[0][4], 'volume': data[0][5] }
[docs]class PublicData: """ Wrapper for BtfxWss for access to public bitfinex data :param types: list: (optional) data types to filter data by (tickers, trades, candles) :param symbols: list: (optional) symbol to filter data by (e.g. ['BTCUSD', 'ETHUSD']) """ candle_periods = [ '1m', '5m', '15m', '30m', '1h', '3h', '6h', '12h', '1D', '7D', '14D', '1M' ] def __init__(self, types=None, symbols=None): self.types = set(types or []) self.symbols = set(symbols or []) self._assert_types_are_correct(self.types) self.wss = btfxwss.BtfxWss(key='', secret='', log_level='CRITICAL') def __iter__(self): while True: types, symbols = self.types.copy(), self.symbols.copy() for _type in types: for symbol in symbols: try: yield _type, symbol, self.get_nowait(_type, symbol) except Empty: continue
[docs] def get(self, _type, symbol): """ Gets next available data (blocks until new datum is received) :param _type: str: type of data to get (e.g. 'tickers', 'candles'...) :param symbol: str: symbol for which to get data (e.g. 'BTCUSD', 'ETHUSD'...) :return: dict: requested data in json format :raises: AssertionError: if either of the provided type and symbol are unrecognized """ assert _type in self.types, "Unknown type '%s'" % _type assert symbol in self.symbols, "Unknown symbol '%s'" % symbol while True: try: return self._get_nowait(_type, symbol) except Empty: time.sleep(1e-6)
[docs] def get_nowait(self, _type, symbol): """ Gets currently available data with no blocking :param _type: str: type of data to get (e.g. 'tickers', 'candles'...) :param symbol: str: symbol for which to get data (e.g. 'BTCUSD', 'ETHUSD'...) :return: dict: requested data in json format :raises: AssertionError: if either of the provided type and symbol are unrecognized Empty: if no new data is available (same as queue.Queue api) """ assert _type in self.types, "Unknown type '%s'" % _type assert symbol in self.symbols, "Unknown symbol '%s'" % symbol return self._get_nowait(_type, symbol)
[docs] def subscribe(self, _type, symbol): """ Subscribe to a stream of data :param _type: str: type of data to get (e.g. 'tickers', 'candles'...) :param symbol: str: symbol for which to get data (e.g. 'BTCUSD', 'ETHUSD'...) """ self._assert_types_are_correct([_type]) self.types.add(_type) self.symbols.add(symbol) if _type == 'tickers': self.wss.subscribe_to_ticker(symbol) elif _type == 'trades': self.wss.subscribe_to_trades(symbol) elif isinstance(_type, tuple): self.wss.subscribe_to_candles(symbol, timeframe=_type[1])
[docs] def connect(self): """Open a connection to a Bitfinex websocket""" self.wss.start() while not self.wss.conn.connected.is_set(): time.sleep(1e-4) for _type in self.types: for symbol in self.symbols: self.subscribe(_type, symbol)
[docs] def close(self): """Close the connection to the Bitfinex websocket""" for symbol in self.symbols: with suppress(KeyError): self.wss.unsubscribe_from_ticker(symbol) with suppress(KeyError): self.wss.unsubscribe_from_trades(symbol) with suppress(KeyError): self.wss.unsubscribe_from_candles(symbol) self.wss.stop()
@staticmethod def _assert_types_are_correct(types): candle_types = [] for _type in types: if isinstance(_type, str): assert _type in {'tickers', 'trades'}, \ "String type must be either 'tickers' or 'trades'" else: candle_types.append(_type) for name, period in candle_types: assert name == 'candles', \ 'Candles types must be in the format ("candles", <period>)' assert period in PublicData.candle_periods, \ 'Candle period must be one of %s' % PublicData.candle_periods @staticmethod def _clean_data(_type, data): result = data[0] if _type == 'trades': result = data[0] if isinstance(data[0][0], str) \ else ['tu', data[0][0][0]] elif isinstance(_type, tuple): result = data[0] if isinstance(data[0][0][0], (int, float)) \ else [data[0][0][0]] return result def _get_queue(self, _type, symbol): if _type == 'tickers': pair = ('ticker', symbol) elif _type == 'trades': pair = ('trades', symbol) else: pair = ('candles', symbol, _type[1]) _type = 'candles' return getattr(self.wss.queue_processor, _type)[pair] def _get_nowait(self, _type, symbol): if _type == 'tickers': handler = _handle_ticker elif _type == 'trades': handler = _handle_trade else: handler = _handle_candle log.debug("Getting %s - %s from queues", str(_type), symbol) data = self._get_queue(_type, symbol).get_nowait() result = handler(self._clean_data(_type, data)) if result is None: log.debug("%s - %s queue is empty", _type, symbol) raise Empty return result