Repository With Sample Code
https://github.com/imwatsi/crypto-market-samples
Find the complete Python script on GitHub: bitfinex_websocket_multi.py
My Profile On GitHub
You can find code used in this tutorial as well as other tools and examples in the GitHub repositories under my profile:
What Will I Learn
- Subscribing to multiple ticker and candle channels
- Maintaining ticker and candle state in memory
- Reducing websocket latency using multithreading
Requirements
- Python 3.6+
- Dependencies:
websocket-client
- Active internet connection
Difficulty
Intermediate
Tutorial Content
To follow this tutorial, you should already know how to setup a basic websocket connection to the Bitfinex API. You can learn how to do this by following the previous tutorial in this series.
In this tutorial, you will learn how to subscribe to multiple ticker and candle channels, and maintain their state in memory, with low latency. Python's multithreading feature helps reduce latency by allowing operations to be performed synchronously.
Latency bottleneck created by on_message() function
Using the websocket-client
Python module to create a connection for fast data streams to Bitfinex's API introduces a potential bottleneck that can create latency. For strategies or scripts that subscribe to a large number of data streams, for example hundreds of order books or candles, the time it takes to receive and process updates needs to be as low as possible.
Since all updates are received on the on_message
function and it is not asynchronous, it means that any delay in that function will delay the processing of other messages. This can cause an outdated state of market data to be maintained in memory.
In this tutorial, we explore various optimizations we can use to reduce latency and improve the performance of this function as part of a larger trading system.
Using new threads to process messages
We use two functions to handle the parsing of candle and ticker updates, and start them in a new thread each time they are called. Like so:
chan_id = data[0]
if chan_id in channels:
if 'ticker' in channels[chan_id]:
# if channel is for ticker
if data[1] == 'hb':
# Ignore heartbeat messages
pass
else:
# parse ticker and save to memory
Thread(target=update_tickers, args=(data,)).start()
elif 'candles' in channels[chan_id]:
# if channel is for candles
if data[1] == 'hb':
# Ignore heartbeat messages
pass
else:
# parse candle update and save to memory
Thread(target=update_candles, args=(data,)).start()
This means that the message function will only have to start the new thread and free itself up for more messages, while the process of parsing and other related processes can be managed by individual threads that all update the same variables (candles
and tickers
).
Below is code for the two functions used in the script for this tutorial:
update_tickers()
def update_tickers(data):
global tickers
sym = channels[data[0]][1]
ticker_raw = data[1]
ticker_parsed = {
'bid': ticker_raw[0],
'ask': ticker_raw[2],
'last_price': ticker_raw[6],
'volume': ticker_raw[7],
}
tickers[sym] = ticker_parsed
This function takes the input and parses ticker info from it, and then saves it to the tickers
variable, with the symbol as a key.
update_candles()
def update_candles(data):
global candles
def truncate_market(str_data):
# Get market symbol from channel key
col1 = str_data.find(':t')
res = str_data[col1+2:]
return res
def parse_candle(lst_data):
# Get candle dictionary from list
return {
'mts': lst_data[0],
'open': lst_data[1],
'close': lst_data[2],
'high': lst_data[3],
'low': lst_data[4],
'vol': lst_data[5]
}
market = truncate_market(channels[data[0]][1])
# Identify snapshot (list=snapshot, int=update)
if type(data[1][0]) is list:
lst_candles = []
for raw_candle in data[1]:
candle = parse_candle(raw_candle)
lst_candles.append(candle)
candles[market] = lst_candles
elif type(data[1][0]) is int:
raw_candle = data[1]
lst_candles = candles[market]
candle = parse_candle(raw_candle)
if candle['mts'] == candles[market][0]['mts']:
# Update latest candle
lst_candles[0] = candle
candles[market] = lst_candles
elif candle['mts'] > candles[market][0]['mts']:
# Insert new (latest) candle
lst_candles.insert(0, candle)
candles[market] = lst_candles
This function takes a message from the candles channel, parses it and does one of the following:
- Initial Snapshot: save the whole list of candles
- Update to latest candle: save the new values to memory
- New candle addition: create a new candle entry and insert it in memory
Two nested functions, truncate_market
and parse_candle
help retrieve market symbol and parse individual candles to a format that is easy to work with.
Less code is desirable
Here is how the whole on_message()
function looks in our tutorial's script:
def on_message(ws, message):
global channels, balances, tickers
data = json.loads(message)
# Handle events
if 'event' in data:
if data['event'] == 'info':
pass # ignore info messages
elif data['event'] == 'auth':
if data['status'] == 'OK':
print('API authentication successful')
else:
print(data['status'])
# Capture all subscribed channels
elif data['event'] == 'subscribed':
if data['channel'] == 'ticker':
channels[data['chanId']] = [data['channel'], data['pair']]
elif data['channel'] == 'candles':
channels[data['chanId']] = [data['channel'], data['key']]
# Handle channel data
else:
chan_id = data[0]
if chan_id in channels:
if 'ticker' in channels[chan_id]:
# if channel is for ticker
if data[1] == 'hb':
# Ignore heartbeat messages
pass
else:
# parse ticker and save to memory
Thread(target=update_tickers, args=(data,)).start()
elif 'candles' in channels[chan_id]:
# if channel is for candles
if data[1] == 'hb':
# Ignore heartbeat messages
pass
else:
# parse candle update and save to memory
Thread(target=update_candles, args=(data,)).start()
By taking processes like parsing candles and tickers away from this function, the script can handle messages quickly. The general guideline here is to keep the code as lightweight as possible. Any other calculations such as technical analysis or strategy implementation should be in different threads, all referencing main variables (e.g. tickers
and candles
in this tutorial), that are updated frequently and in realtime by many different threads spawned off websocket messages.
You can further optimize the code in this tutorial by delegating most of the code found in the on_message()
function to another function opened in a new thread of its own. Things like checking message contents for its type, extracting channel IDs, etc can be handled by that separate function. We did not implement that in this example, for simplicity's sake, but it is possible.
Subscribe to multiple ticker channels
The rest of the code that makes up this tutorial's script includes the following:
def on_open(ws):
print('API connected')
for sym in symbols:
sub_tickers = {
'event': 'subscribe',
'channel': 'ticker',
'symbol': sym
}
ws.send(json.dumps(sub_tickers))
sub_candles = {
'event': 'subscribe',
'channel': 'candles',
'key': 'trade:15m:t' + sym
}
ws.send(json.dumps(sub_candles))
# start printing the books
Thread(target=print_details).start()
This subscribes to tickers and symbols for all chosen symbols. As an example, the script attached to this tutorial gets a list of all symbols available on the Bitfinex exchange and then filters the results to take only USD pairs. The code for this is below:
# load USD tickers
res = requests.get("https://api.bitfinex.com/v1/symbols")
all_sym = json.loads(res.content)
for x in all_sym:
if "usd" in x:
symbols.append(x.upper())
print('Found (%s) USD symbols' %(len(symbols)))
The interactive tool
There is an interactive function running in the background, that you can use to query for a symbol's latest ticker info and candle data. Here's the code:
def print_details():
# interactive function to view tickers and candles
while len(tickers) == 0 or len(candles) == 0:
# wait for tickers to populate
time.sleep(1)
print('Tickers and candles loaded. You may query a symbol now.')
while True:
symbol = input()
symbol = symbol.upper()
if symbol not in symbols:
print('%s not in list of symbols.' %(symbol))
continue
details = tickers[symbol]
print('%s: Bid: %s, Ask: %s, Last Price: %s, Volume: %s'\
%(symbol, details['bid'], details['ask'],\
details['last_price'], details['volume']))
print('%s: currently has (%s) candles, latest candle: %s'\
%(symbol, len(candles[symbol]), str(candles[symbol][0])))
To initialize the script, this is added at the end:
# initialize api connection
connect_api()
Running the script
The script is interactive, because of the print_details()
function. Once all data is loaded, you can type in a symbol name, e.g. BTCUSD and it will print the current ticker and latest candle data for it, as seen below.
Using the interactive tool
- After running the script, wait for it to print a statement saying
Tickers and candles loaded. You may query a symbol now.
- Type in a symbol name and press ENTER, e.g. btcusd (it is not case-sensitive) or ltcusd
- The current ticker details as well as the latest candle will be printed
Thank you for your contribution @imwatsi
After reviewing your contribution, we suggest you following points:
The tutorial is well structured and explained, however it is necessary to put more functionality into your contributions.
The GIF with the script results looks great.
Thank you for following some suggestions we put on your previous tutorial.
Looking forward to your upcoming tutorials.
Your contribution has been evaluated according to Utopian policies and guidelines, as well as a predefined set of questions pertaining to the category.
To view those questions and the relevant answers related to your post, click here.
Need help? Chat with us on Discord.
[utopian-moderator]
Downvoting a post can decrease pending rewards and make it less visible. Common reasons:
Submit
Thank you for the review.
Downvoting a post can decrease pending rewards and make it less visible. Common reasons:
Submit
Thank you for your review, @portugalcoin! Keep up the good work!
Downvoting a post can decrease pending rewards and make it less visible. Common reasons:
Submit
Hi @imwatsi!
Your post was upvoted by @steem-ua, new Steem dApp, using UserAuthority for algorithmic post curation!
Your post is eligible for our upvote, thanks to our collaboration with @utopian-io!
Feel free to join our @steem-ua Discord server
Downvoting a post can decrease pending rewards and make it less visible. Common reasons:
Submit
Hey, @imwatsi!
Thanks for contributing on Utopian.
We’re already looking forward to your next contribution!
Get higher incentives and support Utopian.io!
Simply set @utopian.pay as a 5% (or higher) payout beneficiary on your contribution post (via SteemPlus or Steeditor).
Want to chat? Join us on Discord https://discord.gg/h52nFrV.
Vote for Utopian Witness!
Downvoting a post can decrease pending rewards and make it less visible. Common reasons:
Submit