心拍等のデータをクラウド上に送信し、機械学習を行ってみたい。
ということで、心拍データをクラウドに送信するところをやってみたいと思います。
心拍データを取得する
最初は心拍データってどうやって取得するの?っていう状態でしたが、こちらの記事を参考にして心拍データの取得に成功しました。
https://qiita.com/kswc/items/6e2e7214738e8d30d91a
用意したもの
せっかく購入してデータが取れないっていうの嫌だったので、Qiitaの記事に紹介されているものと同じものを購入しました。
- Raspberry Pi 3 Model b+
- GARMIN vivosmart HR J
- Suunto Movestick Mini
GARMIN vivosmart HR J
についてはamazonだと中古のみ取り扱っていました。(2019年8月現在)
Suunto Movestick Miniの設定
Suunto Movestick Miniをラズパイにセットします。
セットしたらコマンドを入力します。
$ dmesg | tail
結果が表示されます。2行目のidProduct=1008
が使用する値です。1008という値は環境により変わることがあるようです。
[ 202.018309] usb 1-1.1.2: new full-speed USB device number 10 using dwc_otg
[ 202.160558] usb 1-1.1.2: New USB device found, idVendor=0fcf, idProduct=1008, bcdDevice= 1.00
[ 202.160574] usb 1-1.1.2: New USB device strings: Mfr=1, Product=2, SerialNumber=3
[ 202.160584] usb 1-1.1.2: Product: Movestick mini
[ 202.160593] usb 1-1.1.2: Manufacturer: Suunto
[ 202.160602] usb 1-1.1.2: SerialNumber: 1722800196
次に以下のコマンドを使いgarmin-ant2.rules
を作成します。
$ sudo idle /etc/udev/rules.d/garmin-ant2.rules
以下の1行をgarmin-ant2.rules
にコピペします。
SUBSYSTEM=="usb", ATTRS{idVendor}=="0fcf", ATTRS{idProduct}=="1008", RUN+="/sbin/modprobe usbserial vendor=0x0fcf product=0x1008", MODE="0666", OWNER="pi", GROUP="root"
一度ラズパイを再起動し、python-antをインストールします。
$ sudo pip install git+https://github.com/baderj/python-ant.git
ソースコード
コードについては参考にさせてもらったものとほぼ同じです。
import sys
import time
from ant.core import driver, node, event, message, log
from ant.core.constants import CHANNEL_TYPE_TWOWAY_RECEIVE, TIMEOUT_NEVER
class HeartRateMonitor(event.EventCallback):
DEFAULT_SERIAL = "/dev/ttyUSB0"
DEFAULT_NETKEY = 'B9A521FBBD72C345'.decode('hex')
def __init__(self, call_func, serial = None, netkey = None):
self.call_func = call_func
self.serial = serial or HeartRateMonitor.DEFAULT_SERIAL
self.netkey = netkey or HeartRateMonitor.DEFAULT_NETKEY
self.antnode = None
self.channel = None
def start(self):
self._start_antnode()
self._setup_channel()
self.channel.registerCallback(self)
def stop(self):
if self.channel:
self.channel.close()
self.channel.unassign()
if self.antnode:
self.antnode.stop()
def __enter__(self):
return self
def __exit__(self, type_, value, traceback):
self.stop()
def _start_antnode(self):
stick = driver.USB2Driver(self.serial)
self.antnode = node.Node(stick)
self.antnode.start()
def _setup_channel(self):
key = node.NetworkKey('N:ANT+', self.netkey)
self.antnode.setNetworkKey(0, key)
self.channel = self.antnode.getFreeChannel()
self.channel.name = 'C:HRM'
self.channel.assign('N:ANT+', CHANNEL_TYPE_TWOWAY_RECEIVE)
self.channel.setID(120, 0, 0)
self.channel.setSearchTimeout(TIMEOUT_NEVER)
self.channel.setPeriod(8070)
self.channel.setFrequency(57)
self.channel.open()
def process(self, msg):
if isinstance(msg, message.ChannelBroadcastDataMessage):
heart_rate = int(ord(msg.payload[-1]))
self.call_func(heart_rate)
def my_print(data):
print(data)
if __name__ == '__main__':
hrm = HeartRateMonitor(my_print)
print("monitor start")
hrm.start()
print("monitor start listening")
print("press Ctrl-C to stop this script")
while True:
try:
time.sleep(1)
except KeyboardInterrupt:
print("monitor stop")
hrm.stop()
sys.exit(0)
Garmin vivosmart HR Jを装着して、心拍転送モードに切り替えます。
画面をスライドして心拍数表示画面に切り替え、画面を2秒ほど長押しすると切り替わります。
heart_rate_monitor.pyを実行して、以下のように心拍数が表示されれば成功。
$ python heart_rate_monitor.py
monitor start
monitor start listening
press Ctrl-C to stop this script
68
68
68
...
ここまでできたら心拍データの取得は完了です。
心拍データをAWS IoT Coreへ送信する
AWS IoT Coreへのデータ送信方法は別の記事で詳しく紹介しています。
https://lifetime-engineer.com/pc-aws-iot-connection-part1/
https://lifetime-engineer.com/pc-aws-iot-connection-part2/
ソースコード
参考サイトのコードをベースにMQTTを使ってAWS IoT Coreへデータを送信するようにしています。
# coding:utf-8
import sys
import json
import datetime
from time import sleep
from heart_rate_monitor import HeartRateMonitor
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
class HeartRatePublish:
def __init__(self):
self.monitor = HeartRateMonitor(self.check_heart_rate)
print("monitor start")
self.monitor.start()
print("monitor listening")
def check_heart_rate(self, hr):
dict = {"date":str(datetime.datetime.now()),"HeartRate": str(hr)}
enc = json.dumps(dict)
print(enc)
myMQTTClient.publish("myTopic/HeartRate", enc, 0)
def stop(self):
print("monitor stop")
def my_print(self, hr):
print(hr)
if __name__ == '__main__':
myMQTTClient = AWSIoTMQTTClient('myClientID')
myMQTTClient.configureEndpoint('aujptitwxadlc-ats.iot.ap-northeast-1.amazonaws.com', 8883) # 設定画面のエンドポイントの値
myMQTTClient.configureCredentials("AmazonRootCA1.pem", "e9b3eb7903-private.pem.key", "e9b3eb7903-certificate.pem.crt")
myMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
myMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz
myMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec
myMQTTClient.configureMQTTOperationTimeout(5) # 5 sec
myMQTTClient.connect()
publish = HeartRatePublish()
print("press Ctrl-C to stop this script.")
while True:
try:
sleep(1)
except KeyboardInterrupt:
myMQTTClient.disconnect()
publish.stop()
sys.exit(0)
heart_rate_publish.pyを実行します。
$ python heart_rate_publish.py
実行し以下のような出力がされれば成功です。
AWS IoT Core側でmyTopic/HeartRate
をサブスクライブしておけば以下のようにデータの受信ができます。
今後の展望
- python3での実装をしたい
- Suunto Movestick Miniが認識しないことがあるので、対策を行う