mbed で非同期メッセージドリブンなスレッドクラスを作った

mbed rtosのThreadクラスはそのままでも簡単ではあるのですが、Thread と Mail をより簡単に使うための非同期処理用スレッドクラス Actor を作りました。

http://developer.mbed.org/users/mzta/code/actor/

サンプル1
まず、イベントドリブンにするためのイベント登録処理のサンプルです。

#include "mbed.h"
#include "Actor.h"

class Person : public Actor<Person> {
public:
    Person() {
        subscribe(MSG_GREET, &Person::greet);
    }

    void greet(void *p) {
        printf("Hi! %s\n", (const char *)p);
    }

    enum Messages {
        MSG_GREET = 0x01
    };
};

int main()
{
    Person *taro = new Person();

    sendMail(taro, Person::MSG_GREET, (void *)"Jiro");

    while (true) {}
}

/**
 * execution result :
 * 
 * > Hi! Jiro
 * > 
 */

Actor テンプレートクラスから、Person クラスを作成して、そのコンストラクタメンバ関数 Person::greet() を Person::MSG_GREET というメッセージIDに登録しています。

main 関数では、Person インスタンス taro を作成して、taro に Person::MSG_GREET のメールを送ります。メールでは任意のデータポインタを渡せるので、ここではconst char * "Jiro"を渡します。メールを受信した taro スレッドは、greet() をコールして "Hi! Jiro" を出力しますが、このサンプルでは別スレッドで動いているのがわかりません。次の非同期処理のテストで確認します。

サンプル2
RTOS で非同期処理をやりたいのってどんなシーンでしょう。ストリーミングデータをパケット解析して、逐次上位に渡すようなバケツリレーの処理を思いつくのですが、今回は時間がかかる処理をパイプラインのように処理するようなサンプルを作ってみました。

Taskクラスは、MSG_JOB_START を受けて job() を実行し、完了したらpipelineの次のTaskインスタンスに MSG_JOB_START メッセージを送るクラスです。job() はループで時間稼ぎするだけの無意味な関数で、開始時刻、終了時刻を出力します。TaskA, B, C の3つのタスクを用意し、3段パイプラインのような動きをするテストです。TaskCだけプライオリティを高くしています。

#include <stdlib.h>
#include <string>
#include <list>
#include "mbed.h"
#include "Actor.h"
#include "rtos.h"

Serial pc(USBTX, USBRX);
Timer tim;

class Task : public Actor<Task> {
public:
    typedef std::list<Task *> Pipeline;

    Task(const char *name_str) : name(name_str) {
        subscribe(MSG_JOB_START, &Task::job);
    }

    void job(void *p) {
        int cnt = rand() % 100000000;
        printf("%6d [%s] start\n", tim.read_ms(), name.c_str());
        for (int i = 0; i < cnt; i++) { /* nop */ }
        printf("%6d [%s] end\n", tim.read_ms(), name.c_str());

        Pipeline *pipeline = (Pipeline *)p;
        if (!pipeline->empty()) {
            Task *next = pipeline->front();
            pipeline->pop_front();
            sendMail(next, MSG_JOB_START, pipeline);
        } else {
            delete pipeline;
        }
    }

    static const MessageID MSG_JOB_START = 0x01;

private:
    string name;

public:
    void setPriority(osPriority lv) { thread.set_priority(lv); }
};

int main()
{
    tim.start();

    Task *taskA = new Task("TaskA");
    Task *taskB = new Task("TaskB");
    Task *taskC = new Task("TaskC");

    taskC->setPriority(osPriorityHigh);

    for (int i=0; i<5; i++) {
        Task::Pipeline *pipeline = new Task::Pipeline();
        pipeline->push_back(taskB);
        pipeline->push_back(taskC);
        sendMail(taskA, Task::MSG_JOB_START, pipeline);     // input job
    }

    while (true) {}
}

/**
 * execution result :
 *
 * >     4 [TaskA] start
 * >   509 [TaskA] end
 * >   538 [TaskA] start
 * >   544 [TaskB] start
 * >   636 [TaskA] end
 * >   678 [TaskB] end
 * >   679 [TaskA] start
 * >   713 [TaskC] start
 * >  1215 [TaskC] end
 * >  1237 [TaskB] start
 * >  1341 [TaskA] end
 * >  1385 [TaskA] start
 * >  1819 [TaskA] end
 * >  1863 [TaskA] start
 * >  2559 [TaskB] end
 * >  2603 [TaskC] start
 * >  2890 [TaskC] end
 * >  2908 [TaskB] start
 * >  3341 [TaskA] end
 * >  3867 [TaskB] end
 * >  3897 [TaskC] start
 * >  3952 [TaskC] end
 * >  3970 [TaskB] start
 * >  4831 [TaskB] end
 * >  4860 [TaskC] start
 * >  5276 [TaskC] end
 * >  5294 [TaskB] start
 * >  5818 [TaskB] end
 * >  5847 [TaskC] start
 * >  6033 [TaskC] end
 */

図にすると、投入された5つのジョブがパイプラインのように実行できました。

f:id:mzta:20150528130416p:plain

まとめ
スレッド毎にクラスを定義してやる必要がありますが、簡単にメッセージドリブンのスレッドを作ることができました。
なぜこれを作ったかというと、mruby-mbed でこんな感じのスレッドを試したかったからなのですが、subscribe でメンバ関数しか登録できないのは不便なので、テンプレートクラスをやめて、外の関数ポインタが登録できるようにした方が良い気がしてきました。