Raspberry Piで取得した心拍データをAWSへ送信する方法

心拍等のデータをクラウド上に送信し、機械学習を行ってみたい。
ということで、心拍データをクラウドに送信するところをやってみたいと思います。

心拍データを取得する

最初は心拍データってどうやって取得するの?っていう状態でしたが、こちらの記事を参考にして心拍データの取得に成功しました。

https://qiita.com/kswc/items/6e2e7214738e8d30d91a

用意したもの

せっかく購入してデータが取れないっていうの嫌だったので、Qiitaの記事に紹介されているものと同じものを購入しました。

  • Raspberry Pi 3 Model b+
  • GARMIN vivosmart HR J
  • Suunto Movestick Mini

GARMIN vivosmart HR Jについてはamazonだと中古のみ取り扱っていました。(2019年8月現在)

Suunto Movestick Miniの設定

Suunto Movestick Miniをラズパイにセットします。
セットしたらコマンドを入力します。

$ dmesg | tail

結果が表示されます。2行目のidProduct=1008が使用する値です。1008という値は環境により変わることがあるようです。

[  202.018309] usb 1-1.1.2: new full-speed USB device number 10 using dwc_otg
[  202.160558] usb 1-1.1.2: New USB device found, idVendor=0fcf, idProduct=1008, bcdDevice= 1.00
[  202.160574] usb 1-1.1.2: New USB device strings: Mfr=1, Product=2, SerialNumber=3
[  202.160584] usb 1-1.1.2: Product: Movestick mini
[  202.160593] usb 1-1.1.2: Manufacturer: Suunto
[  202.160602] usb 1-1.1.2: SerialNumber: 1722800196

次に以下のコマンドを使いgarmin-ant2.rulesを作成します。

$ sudo idle /etc/udev/rules.d/garmin-ant2.rules

以下の1行をgarmin-ant2.rulesにコピペします。

SUBSYSTEM=="usb", ATTRS{idVendor}=="0fcf", ATTRS{idProduct}=="1008", RUN+="/sbin/modprobe usbserial vendor=0x0fcf product=0x1008", MODE="0666", OWNER="pi", GROUP="root"

一度ラズパイを再起動し、python-antをインストールします。

$ sudo pip install git+https://github.com/baderj/python-ant.git

ソースコード

コードについては参考にさせてもらったものとほぼ同じです。

import sys
import time
from ant.core import driver, node, event, message, log
from ant.core.constants import CHANNEL_TYPE_TWOWAY_RECEIVE, TIMEOUT_NEVER

class HeartRateMonitor(event.EventCallback):

    DEFAULT_SERIAL = "/dev/ttyUSB0"
    DEFAULT_NETKEY = 'B9A521FBBD72C345'.decode('hex')

    def __init__(self, call_func, serial = None, netkey = None):
        self.call_func = call_func
        self.serial = serial or HeartRateMonitor.DEFAULT_SERIAL
        self.netkey = netkey or HeartRateMonitor.DEFAULT_NETKEY
        self.antnode = None
        self.channel = None

    def start(self):
        self._start_antnode()
        self._setup_channel()
        self.channel.registerCallback(self)

    def stop(self):
        if self.channel:
            self.channel.close()
            self.channel.unassign()
        if self.antnode:
            self.antnode.stop()

    def __enter__(self):
        return self

    def __exit__(self, type_, value, traceback):
        self.stop()

    def _start_antnode(self):
        stick = driver.USB2Driver(self.serial)
        self.antnode = node.Node(stick)
        self.antnode.start()

    def _setup_channel(self):
        key = node.NetworkKey('N:ANT+', self.netkey)
        self.antnode.setNetworkKey(0, key)
        self.channel = self.antnode.getFreeChannel()
        self.channel.name = 'C:HRM'
        self.channel.assign('N:ANT+', CHANNEL_TYPE_TWOWAY_RECEIVE)
        self.channel.setID(120, 0, 0)
        self.channel.setSearchTimeout(TIMEOUT_NEVER)
        self.channel.setPeriod(8070)
        self.channel.setFrequency(57)
        self.channel.open()

    def process(self, msg):
        if isinstance(msg, message.ChannelBroadcastDataMessage):
            heart_rate = int(ord(msg.payload[-1]))
            self.call_func(heart_rate)

def my_print(data):
    print(data)

if __name__ == '__main__':
    hrm = HeartRateMonitor(my_print)
    print("monitor start")
    hrm.start()
    print("monitor start listening")
    print("press Ctrl-C to stop this script")
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            print("monitor stop")
            hrm.stop()
            sys.exit(0)

Garmin vivosmart HR Jを装着して、心拍転送モードに切り替えます。
画面をスライドして心拍数表示画面に切り替え、画面を2秒ほど長押しすると切り替わります。
heart_rate_monitor.pyを実行して、以下のように心拍数が表示されれば成功。

$ python heart_rate_monitor.py
monitor start
monitor start listening
press Ctrl-C to stop this script
68
68
68
...

ここまでできたら心拍データの取得は完了です。

心拍データをAWS IoT Coreへ送信する

AWS IoT Coreへのデータ送信方法は別の記事で詳しく紹介しています。

https://lifetime-engineer.com/pc-aws-iot-connection-part1/

https://lifetime-engineer.com/pc-aws-iot-connection-part2/

ソースコード

参考サイトのコードをベースにMQTTを使ってAWS IoT Coreへデータを送信するようにしています。

# coding:utf-8

import sys
import json
import datetime
from time import sleep
from heart_rate_monitor import HeartRateMonitor
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient

class HeartRatePublish:

    def __init__(self):
        self.monitor = HeartRateMonitor(self.check_heart_rate)
        print("monitor start")
        self.monitor.start()
        print("monitor listening")

    def check_heart_rate(self, hr):
        dict = {"date":str(datetime.datetime.now()),"HeartRate": str(hr)}
        enc = json.dumps(dict)
        print(enc)
        myMQTTClient.publish("myTopic/HeartRate", enc, 0)

    def stop(self):
        print("monitor stop")

    def my_print(self, hr):
        print(hr)

if __name__ == '__main__':
    myMQTTClient = AWSIoTMQTTClient('myClientID')
    myMQTTClient.configureEndpoint('aujptitwxadlc-ats.iot.ap-northeast-1.amazonaws.com', 8883) # 設定画面のエンドポイントの値
    myMQTTClient.configureCredentials("AmazonRootCA1.pem", "e9b3eb7903-private.pem.key", "e9b3eb7903-certificate.pem.crt")
    myMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
    myMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz
    myMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec
    myMQTTClient.configureMQTTOperationTimeout(5) # 5 sec

    myMQTTClient.connect()

    publish = HeartRatePublish()
    print("press Ctrl-C to stop this script.")

    while True:
        try:
            sleep(1)
        except KeyboardInterrupt:
            myMQTTClient.disconnect()
            publish.stop()
            sys.exit(0)

heart_rate_publish.pyを実行します。

$ python heart_rate_publish.py

実行し以下のような出力がされれば成功です。

AWS IoT Core側でmyTopic/HeartRateをサブスクライブしておけば以下のようにデータの受信ができます。

今後の展望

  • python3での実装をしたい
  • Suunto Movestick Miniが認識しないことがあるので、対策を行う

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

CAPTCHA