Python, ANT+, a dude and his Bike...

in exhaust •  5 years ago  (edited)

I've had an idea floating around in my head recently -- mostly inspired by my buddies who are into cycling, and by my endeavouring to learn some Python. Now that I think about it -- I've learned quite a lot of Python over the last year or so... Picking a project and running with it is a pretty great way to learn (bad habits).


  • I've got a bike at home w/ a neat indoor trainer set up. Means I can get sweaty while going precisely nowhere, with the added benefit of terrifying my dog the whole time;
  • Said bicycle has ANT+ speed and cadence sensors, and I've got a compatible heart-rate monitor as well;
  • I've got an ANT+ USB dongle that acts as a receiver; and
  • I recently discovered THIS Python library that implements the ANT protocols:

Python implementation of the ANT, ANT+, and ANT-FS protocols. For more information about ANT, see

Tape that shit on
Bike and Sensors

Let's play!

Add the aforementioned Python library to your project's requirements.txt file:

# requirements.txt

pip install -r requirements.txt


Start up a new Python file (it gets imported as cadence later on, so cadence.py fits), and copy/paste the following:

import math
import re
import sys
import time

from ant.core import driver, node, event, message
from ant.core.constants import *

NETKEY = b'\xB9\xA5\x21\xFB\xBD\x72\xC3\x45'

class ANTListener(event.EventCallback):
    last_speed_time = 0
    last_speed_revs = 0
    now_speed_revs = 0
#    fivesec_speed_revs = 0
#    tensec_speed_revs = 0
    last_cadence_time = 0
    last_cadence_revs = 0
    now_cadence_revs = 0
#    fives_cadence_revs = 0
#    tens_cadence_revs = 0
    wheel_diameter = 0.673 ## Wheel diameter in m

    @staticmethod
    def _u16(lsb, msb):
        ## Combine two payload bytes (least-significant byte first) into one 16-bit integer
        return (msb << 8) | lsb

    def process(self, msg, _channel):
        if isinstance(msg, message.ChannelBroadcastDataMessage):
            self.last_cadence_revs = self.now_cadence_revs
            self.last_speed_revs = self.now_speed_revs
            ## Process Cadence Cumulative Revolutions
            ## ctimeLSB = msg.payload[1],ctimeMSB = msg.payload[2],cadenceLSB = msg.payload[3], cadenceMSB = msg.payload[4]
            ctime = self._u16(msg.payload[1], msg.payload[2])
            self.now_cadence_revs = self._u16(msg.payload[3], msg.payload[4])
            ## Process Speed Cumulative Revolutions
            ## stimeLSB = msg.payload[5],stimeMSB = msg.payload[6],speedLSB = msg.payload[7],speedMSB = msg.payload[8]
            stime = self._u16(msg.payload[5], msg.payload[6])
            self.now_speed_revs = self._u16(msg.payload[7], msg.payload[8])

            ## All four counters are 16-bit and wrap at 65536, so mask the differences
            speed_rev_delta = (self.now_speed_revs - self.last_speed_revs) & 0xFFFF
            speed_time_delta = (stime - self.last_speed_time) & 0xFFFF
            cadence_rev_delta = (self.now_cadence_revs - self.last_cadence_revs) & 0xFFFF
            ## Cadence time is measured against the previous cadence timestamp
            cadence_time_delta = (ctime - self.last_cadence_time) & 0xFFFF
            ## Timestamps tick at 1/1024 s. A repeated event gives a zero time delta, so report 0 instead of dividing by zero
            ## rev/sec x 0.673*math.pi m/rev x 1km/1000m x 60s/min x 60min/hr
            speed = (speed_rev_delta/speed_time_delta)*1024*(self.wheel_diameter*math.pi*60*60/1000) if speed_time_delta else 0.0
            cadence = (cadence_rev_delta/cadence_time_delta)*1024*60 if cadence_time_delta else 0.0

            print("Cadence Timestamps: ",ctime/1024,"|| Cadence Revolutions: ",self.now_cadence_revs)
            print("Cadence Rev Delta: ", cadence_rev_delta," Cadence Time Delta", cadence_time_delta," cadence: ",cadence," RPM")
            print("Speed Timestamps: ",stime/1024,"|| Speed Revolutions: ",self.now_speed_revs)
            print("Speed Rev Delta: ", speed_rev_delta," Speed Time Delta", speed_time_delta," Speed: ",speed," km/hr")
            self.last_speed_time = stime
            self.last_cadence_time = ctime

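    antDevice = '/dev/ttyUSB0'
    # USB IDs as listed by lsusb: 0x0fcf is the Dynastream (Garmin) vendor ID;
    # set antProduct to whatever your own stick reports.
    antVendor = 0x0fcf
    antProduct = 0x1009
    stick = driver.USB2Driver(idVendor=antVendor, idProduct=antProduct)
    antnode = node.Node(stick)
    channel = None  # claimed in setup(), once the node is running

    # NOTE: several calls in the methods below are missing from this listing.
    # They are filled in after python-ant's demo scripts, so treat them as a
    # best guess and check them against the version you install.
    def setup(self):
        # Start shit up
        #stick = driver.USB2Driver()
        if not self.antnode.running:
            self.antnode.start()

        # Setup channel
        net = node.Network(name='N:ANT+', key=NETKEY)
        self.antnode.setNetworkKey(0, net)
        self.channel = self.antnode.getFreeChannel()
        self.channel.name = 'C:HRM'
        self.channel.assign(net, CHANNEL_TYPE_TWOWAY_RECEIVE)
        # 121 = ANT+ device type for combined speed/cadence sensors
        # (assumed: the value itself is missing from the listing)
        self.channel.setID(121, 0, 0)
        self.channel.searchTimeout = TIMEOUT_NEVER
        self.channel.period = 8070
        self.channel.frequency = 57

    def start_listen(self):
        if not self.antnode.running:
            self.setup()
        # Setup callback
        # Note: We could also register an event listener for non-channel events by
        # calling registerEventListener() on antnode rather than channel.
        self.channel.registerCallback(self)
        self.channel.open()

    def stop_listen(self):
        # Shutdown
        self.channel.close()
        self.channel.unassign()
        self.antnode.stop()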

The above script generally does the following:

  • setup() preps the ANT stick for the appropriate idVendor and idProduct, and finds a free channel to work on;
    • Currently, we set up a channel only for our speed/cadence sensors;
    • You can find your stick's idVendor and idProduct (if you're a Linux user) via lsusb.
  • start_listen() starts the device listening for broadcast events from any sensors matching our specified type;
  • stop_listen() stops the device and closes the channel and antnode -- freeing the USB device to use again;
    • For whatever reason, I end up having to unplug / re-plug the device to get it connected again.. whatever...
  • process() handles each broadcast event received on the channel.
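
If you'd rather not squint at lsusb output, here's a rough sketch of pulling the two IDs out of one line of it. The sample line and the parse_usb_ids name are made up for illustration (your bus/device numbers and description text will differ); the only thing it relies on is the usual "ID vvvv:pppp" field in lsusb output.

```python
import re

# Illustrative lsusb line only; bus/device numbers and description are assumptions.
SAMPLE_LSUSB_LINE = "Bus 001 Device 004: ID 0fcf:1009 Dynastream Innovations, Inc. ANTUSB-m Stick"


def parse_usb_ids(lsusb_line):
    """Return (vendor_id, product_id) as ints from one line of lsusb output."""
    match = re.search(r"ID ([0-9a-fA-F]{4}):([0-9a-fA-F]{4})", lsusb_line)
    if match is None:
        raise ValueError("no 'ID vvvv:pppp' field found in: %r" % lsusb_line)
    vendor, product = (int(part, 16) for part in match.groups())
    return vendor, product


ant_vendor, ant_product = parse_usb_ids(SAMPLE_LSUSB_LINE)
print(hex(ant_vendor), hex(ant_product))
```

The two integers are what you'd hand to idVendor and idProduct when building the driver.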

Some notes on how this stuff works:

  • Interpreting / handling the data was a massive pain in the ass... for whatever reason, the data is sent out in byte-arrays and required a fair deal of converting back and forth between binary and integer formatting...
  • Had to learn a bit about MSB and LSB bit numbering... That was fun;
  • If your wheels are a different size, you'll probably want to update the wheel_diameter value -- mine is 673mm;
  • Script spits out your instantaneous cadence in RPM, and your instantaneous speed in km/hr -- if you want your shit Americanized, you can 'GIT OUT:
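
To make the LSB/MSB business a bit more concrete, here's a minimal sketch of turning two payload bytes into one 16-bit number. The u16_le helper and the sample payload are hypothetical: the bytes are picked so they decode to the first readings in the output further down, and byte 0 is just a placeholder so the indices line up with the comments in process().

```python
def u16_le(lsb, msb):
    """Combine two bytes (least-significant byte first) into an unsigned 16-bit integer."""
    return (msb << 8) | lsb


# Hypothetical payload: [unused, ctimeLSB, ctimeMSB, cadenceLSB, cadenceMSB,
#                        stimeLSB, stimeMSB, speedLSB, speedMSB]
payload = bytearray([0x00, 0xD4, 0x1B, 0x1D, 0x01, 0xF7, 0x1C, 0x2D, 0x06])

# Shift-and-or version
ctime = u16_le(payload[1], payload[2])
cadence_revs = u16_le(payload[3], payload[4])

# Same thing using the standard library's little-endian decoder
stime = int.from_bytes(payload[5:7], "little")
speed_revs = int.from_bytes(payload[7:9], "little")

# Timestamps are in 1/1024 s ticks, so divide to get seconds
print(ctime / 1024, cadence_revs, stime / 1024, speed_revs)
```

Both approaches give the same integers as gluing the binary strings together; they just skip the round trip through text.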

So assuming that you've punched in the appropriate antProduct listed in your lsusb, and updated for any difference in wheel size -- you SHOULD be up and rolling:

Open up your terminal:

source env/bin/activate
python
import cadence
listener = cadence.ANTListener()
listener.start_listen()

You SHOULD start seeing outputs like the following:

Cadence Timestamps:  6.95703125 || Cadence Revolutions:  285
Cadence Rev Delta:  4  Cadence Time Delta 2742  cadence:  89.62800875273523  RPM
Speed Timestamps:  7.2412109375 || Speed Revolutions:  1581
Speed Rev Delta:  8  Speed Time Delta 3033  Speed:  20.55819452018244  km/hr
Cadence Timestamps:  9.6435546875 || Cadence Revolutions:  289
Cadence Rev Delta:  4  Cadence Time Delta 2460  cadence:  99.90243902439025  RPM
Speed Timestamps:  10.1884765625 || Speed Revolutions:  1589
Speed Rev Delta:  8  Speed Time Delta 3018  Speed:  20.66037242535233  km/hr
Cadence Timestamps:  13.0166015625 || Cadence Revolutions:  294
Cadence Rev Delta:  5  Cadence Time Delta 2896  cadence:  106.07734806629834  RPM
Speed Timestamps:  13.140625 || Speed Revolutions:  1597
Speed Rev Delta:  8  Speed Time Delta 3023  Speed:  20.62620045640534  km/hr
Cadence Timestamps:  15.744140625 || Cadence Revolutions:  298
Cadence Rev Delta:  4  Cadence Time Delta 2666  cadence:  92.18304576144037  RPM
Speed Timestamps:  16.130859375 || Speed Revolutions:  1605
Speed Rev Delta:  8  Speed Time Delta 3062  Speed:  20.363489216104945  km/hr
Cadence Timestamps:  18.486328125 || Cadence Revolutions:  302
Cadence Rev Delta:  4  Cadence Time Delta 2412  cadence:  101.8905472636816  RPM
Speed Timestamps:  18.7626953125 || Speed Revolutions:  1612
Speed Rev Delta:  7  Speed Time Delta 2695  Speed:  20.24448181159524  km/hr
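
As a sanity check on those numbers, here's a small sketch that redoes the arithmetic from the printed deltas alone. The speed_kmh and cadence_rpm names are mine, not part of the script; the 0.673 m wheel diameter and the 1/1024 s tick are the values the script itself uses.

```python
import math

WHEEL_DIAMETER_M = 0.673   # same value as wheel_diameter in the script
TICKS_PER_SECOND = 1024    # time deltas are printed in 1/1024 s ticks


def speed_kmh(rev_delta, time_delta_ticks, wheel_diameter_m=WHEEL_DIAMETER_M):
    """Wheel revolutions over a tick interval -> speed in km/hr."""
    revs_per_second = rev_delta / time_delta_ticks * TICKS_PER_SECOND
    metres_per_rev = wheel_diameter_m * math.pi
    return revs_per_second * metres_per_rev * 3600 / 1000


def cadence_rpm(rev_delta, time_delta_ticks):
    """Crank revolutions over a tick interval -> cadence in RPM."""
    return rev_delta / time_delta_ticks * TICKS_PER_SECOND * 60


# First pair of readings from the output above
print(round(speed_kmh(8, 3033), 2))     # 20.56
print(round(cadence_rpm(4, 2742), 2))   # 89.63
```

Pass a different wheel_diameter_m if your wheels aren't 673mm.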

So that's it, that's all! That's how you can whip up a quick and dirty ANT+ speed/cadence reader...

Next steps are up to you -- I've got some cool ideas for what I want to accomplish... Images below paint a bit of a story for what I'm thinking...

Zwift Cycling Game
Open Source Game Engine

What projects / ideas are you working on? Do you like to have some kind of creative fire burning? I feel like most people here are pretty creative -- that's kind of the "early adopter" crowd that Steem has attracted.

Do you think you'd play a cycling game where you can meetup digitally and race other Steemians / Exhaust-o-nauts? I probably would, so long as I didn't have to go up against or @jgrieco. This would probably be a fun way to have another "Solstice Race" event...


I can see that was too cool to resist. I really ought to look into what I can do with Python as I use it at work. Maybe I can automate some of my posts a bit.

Yeah, man! Play around and see what you can come up with.

@holger80's beem library is a pretty great resource for playing w/ python and Steem.

I've seen @felixxx has some tutorials I will bookmark. It's just a matter of finding time.

I am quite proud of part 3 tbh :)

Looks cool! I might give 'em a read!

You probably won't need them anymore. It is fairly basic stuff and only meant to bridge the gap between knowing some Python and actually getting a tool out for STEEM.

I think I would. In fact, I stopped cycling when the rainy season started. I'm happy to say that is mostly gone, which means I'll be able to enjoy cycling around more, but I would def play at night or maybe when the rain is too bad.

Cool! Good to know!

I think it would be a strong community builder, and a great way to chat w/ some of the stronger athletes. I'm actually really excited about working on this.... Hoping to have a proof-of-concept fleshed out in the coming weeks....

Ideally -- it would integrate w/ EXHAUST nicely, too!

Think about building a developer support group as well; I could also help if needed. Maybe start adding a few tasks in @utopian-io and maybe get some support from the developers.