FIWARE Banner

FIWARE Core Context Management License: MIT Support badge NGSI v2
Documentation

このチュートリアルは FIWARE Cosmos Orion Flink Connector の紹介です。これは、最も 人気のあるビッグデータ・プラットフォームの1つである Apache Flink との統合により、コンテキスト ・データのビッグデータ分析を容易にします。Apache Flink は、無制限および有界のデータ・ストリーム上でステートフルな計算を 行うためのフレームワークおよび分散処理エンジンです。Flink は、すべての一般的なクラスタ環境で実行され、メモリ内の速度と 任意の規模で計算を実行するように設計されています。

チュートリアルでは cUrl コマンドを使用しますが、 Postman ドキュメント としても利用可能です。

Run in Postman


リアルタイム処理とビッグデータ分析

"Who controls the past controls the future: who controls the present controls the past."

— George Orwell. "1984"

FIWARE に基づくスマート・ソリューションは、マイクロサービスを中心に設計されています。したがって、シンプルな アプリケーション (スーパーマーケット・チュートリアルなど) から、IoT センサやその他のコンテキスト・データ・プロバイダの 大規模な配列に基づく都市全体のインストールにスケールアップするように設計されています。

関与する膨大な量のデータは、1台のマシンで分析、処理、保存するには膨大な量になるため、追加の分散サービスに作業を委任する 必要があります。これらの分散システムは、いわゆる ビッグデータ分析 の基礎を形成します。タスクの分散により、開発者は、 従来の方法では処理するには複雑すぎる巨大なデータ・セットから洞察を抽出することができます。隠れたパターンと相関関係を 明らかにします。

これまで見てきたように、コンテキスト・データはすべてのスマート・ソリューションの中核であり、Context Broker は状態の変化を 監視し、コンテキストの変化に応じて、サブスクリプション・イベント を発生させることができます。小規模なインストールの場合、各サブスクリプション・イベントは単一の受信エンドポイントで1つずつ 処理できますが、システムが大きくなると、リスナーを圧倒し、潜在的にリソースをブロックし、更新が失われないようにするために 別の手法が必要になります。

Apache Flink は、データ・フロー・プロセスの委任を可能にする Java/Scala ベースのストリーム処理フレームワークです。 したがって、イベントの到着時にデータを処理するために、追加の計算リソースを呼び出すことができます。 Cosmos Flink コネクタを使用すると、開発者はカスタム・ビジネスロジックを記述して、コンテキスト・データのサブスクリプション・イベントを リッスンし、コンテキスト・データのフローを処理できます。 Flink はこれらのアクションを他のワーカーに委任することができ、 そこで必要に応じて順次または並行してアクションが実行されます。データフローの処理自体は、任意に複雑にすることができます。

実際には、明らかに、既存のスーパーマーケット・シナリオは小さすぎてビッグデータ・ソリューションを使用する必要はありませんが、 コンテキスト・データ・イベントの連続ストリームを処理する大規模ソリューションで必要となる可能性のある、リアルタイム処理の タイプを実証するための基盤として機能します。

アーキテクチャ

このアプリケーションは、以前のチュートリアルで作成されたコンポーネントと ダミー IoT デバイス上に構築されます。 3つの FIWARE コンポーネントを使用します。 Orion Context Broker, IoT Agent for Ultralight 2.0 および Orion を Apache Flink cluster クラスタに接続する ための Cosmos Orion Flink Connector です。Flink クラスタ自体は、 実行を調整する単一の JobManager master と、タスクを実行する単一の TaskManager worker で構成されます。

Orion Context Broker と IoT Agent はどちらも、オープンソースの MongoDB テクノロジーに依存して、 保持している情報の永続性を維持しています。また、以前のチュートリアルで 作成したダミー IoT デバイスを使用します。

したがって、全体的なアーキテクチャは次の要素で構成されます :

  • 独立したマイクロサービスとしての2つの FIWARE Generic Enablers :
    • FIWARE Orion Context Brokerは、 NGSI-v2 を使用してリクエストを受信します
    • FIWARE IoT Agent for Ultralight 2.0 は、ダミー IoT デバイスから Ultralight 2.0 形式のノースバウンド測定値を受信し、Context Broker の NGSI-v2 リクエストに変換して、コンテキスト・ エンティティの状態を変更します
  • Apache Flink cluster は、 単一の JobManager と単一の TaskManager で構成されます
    • FIWARE Cosmos Orion Flink Connector は、 コンテキストの変更をサブスクライブし、リアルタイムでそれらの操作を実際に行うデータフローの一部として デプロイされます
  • 1つの MongoDB データベース :
    • Orion Context Broker がデータ・エンティティ、サブスクリプション、レジストレーションなどの コンテキスト・データ情報を保持するために使用します
    • IoT Agent がデバイスの URL やキーなどのデバイス情報を保持するために使用します
  • 3つのコンテキスト・プロバイダ :
    • HTTP 上で実行される Ultralight 2.0 を使用する、ダミー IoT デバイス のセットとして 機能する Webサーバ
    • 在庫管理フロントエンド は、このチュートリアルでは使用しません。次のことを行います :
      • ストア情報を表示し、ユーザがダミー IoT デバイスと対話できるようにします
      • 各ストアで購入できる製品を表示します
      • ユーザが製品を "購入" して在庫数を減らすことを許可します
    • Context Provider NGSI プロキシは、このチュートリアルでは使用しません。次のことを行います :
      • NGSI-v2 を使用してリクエストを受信します
      • 独自形式の独自 API を使用して、公開されているデータソースへのリクエストを行います
      • コンテキスト・データをNGSI-v2 形式で Orion Context Broker に返します

全体のアーキテクチャを以下に示します :

要素間の相互作用はすべて HTTP リクエストによって開始されるため、エンティティはコンテナ化され、公開されたポートから 実行できます。

Apache Flink クラスタの設定情報は、関連する docker-compose.yml ファイルの jobmanager および taskmanager セクションで確認できます :

Flink Cluster の設定

jobmanager:
    image: flink:1.9.0-scala_2.11
    hostname: jobmanager
    container_name: flink-jobmanager
    expose:
        - "8081"
        - "9001"
    ports:
        - "6123:6123"
        - "8081:8081"
        - "9001:9001"
    command: jobmanager
    environment:
        - JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
    image: flink:1.9.0-scala_2.11
    hostname: taskmanager
    container_name: flink-taskmanager
    ports:
        - "6121:6121"
        - "6122:6122"
    depends_on:
        - jobmanager
    command: taskmanager
    links:
        - "jobmanager:jobmanager"
    environment:
        - JOB_MANAGER_RPC_ADDRESS=jobmanager

jobmanager コンテナは3つのポートでリッスンしています :

  • ポート 8081 が公開されているため、Apache Flink ダッシュボードの Web フロントエンドを確認できます
  • ポート 9001 が公開されていまため、インストレーションがコンテキスト・データのサブスクリプションを 受信できます
  • ポート 6123 は標準の JobManager RPC ポートで、内部通信に使用されます

taskmanager コンテナは2つのポートでリッスンしています :

  • ポート 61216122 が使用され、RPC ポートは TaskManager によって、内部通信に使用されます

Flinki クラスタ内のコンテナは、次のように単一の環境変数によって駆動されます。

キー 説明
JOB_MANAGER_RPC_ADDRESS jobmanager タスク処理をコーディネートする master Job Manager のURL

前提条件

Docker および Docker Compose

物事を単純にするために、すべてのコンポーネントは Docker を使用して実行されます。 Docker は、さまざまなコンポーネントをそれぞれの環境に分離できるようにするコンテナ・テクノロジーです。

  • Windows に Docker をインストールするには、こちらの指示に従って ください
  • Mac に Docker をインストールするには、こちらの指示に従ってください
  • Linux に Docker をインストールするには、こちらの指示に従ってください

Docker Compose は、マルチ・コンテナ Docker アプリケーションを定義および実行するためのツールです。一連の YAML files は、アプリケーション に必要なサービスを構成するために使用されます。これは、すべてのコンテナ・サービスを単一のコマンドで起動できることを 意味します。Docker Compose は、デフォルトで Docker for Windows および Docker for Mac の一部としてインストール されますが、Linux ユーザはこちらにある指示に従う必要があります。

次のコマンドを使用して、現在の Docker および Docker Compose バージョンを確認できます :

docker-compose -v
docker version

Docker バージョン18.03 以降および Docker Compose 1.21 以降を使用していることを確認し、必要に応じてアップグレード してください。

Maven

Apache Maven は、ソフトウェア・プロジェクト管理ツールです。プロジェクト・ オブジェクト・モデル (POM) の概念に基づいて、Maven は情報の中心部分からプロジェクトのビルド、レポート、および ドキュメントを管理できます。Maven を使用して、依存関係を定義およびダウンロードし、コードをビルドして JAR ファイルに パッケージ化します。

Cygwin for Windows

簡単な Bash スクリプトを使用してサービスを開始します。Windows ユーザは、cygwin を ダウンロードして、Windows 上の Linux ディストリビューションに類似したコマンドライン機能を提供する必要があります。

起動

開始する前に、必要な Docker イメージをローカルで取得または構築したことを確認する必要があります。以下に示すコマンド を実行して、リポジトリを複製し、必要なイメージを作成してください。いくつかのコマンドを特権ユーザとして実行する 必要がある場合があることに注意してください :

git clone https://github.com/FIWARE/tutorials.Big-Data-Analysis.git
cd tutorials.Big-Data-Analysis
./services create

このコマンドは、以前のチュートリアルからシードデータをインポートし、起動時にダミー IoT センサをプロビジョニング します。

システムを起動するには、次のコマンドを実行します :

./services start

注 : クリーンアップしてやり直す場合は、次のコマンドを使用します :

./services stop

リアルタイム・プロセシング・オペレーション

Apache Flink 内のデータフローは、 Flink ドキュメント で次のように定義されています。

"Flink プログラムの基本的な構成要素はストリームと変換です。概念的には、ストリームはデータ・レコードの潜在的に終わりの ないフローであり、変換は入力として1つ以上のストリームを受け取り、結果として1つ以上の出力ストリームを生成する操作です。

Flink プログラムは、実行されると、ストリームと変換オペレータで構成されるストリーミング・データフローにマッピング されます。各データフローは、1つ以上のソースで始まり、1つ以上のシンクで終わります。 データフローは、任意の有向非巡回 グラフ (DAG) に似ています。反復の構造を介して特殊な形式のサイクルが許可されますが、ほとんどの場合、これを単純化するため にこれを変更できます。"

つまり、ストリーミング・データフローを作成するには、次のものを指定する必要があります :

  • Source Operator としてコンテキスト・データを読み取るためのメカニズム
  • 変換操作を定義するビジネスロジック
  • Sink Operatorとしてコンテキスト・データを Context Broker にプッシュバックするメカニズム

orion-flink.connect.jarSourceSink の両方の操作を提供します。 したがって、ストリーミング・データフローの パイプライン操作を接続するために必要な Scala コードを記述するだけです。処理コードは、flink クラスターにアップロードできる JAR ファイルにコンパイルできます。 以下に2つの例を詳しく説明します。このチュートリアルのすべてのソースコードは、 cosmos-examples ディレクトリ内に あります。

その他の Flink 処理の例は、 Apache Flink サイト および Flink Connector の例にあります。

Flink 用の JAR ファイルのコンパイル

サンプル JAR ファイルをビルドするために必要な前提条件を保持する既存の pom.xml ファイルが作成されました。

Orion Flink Connector を使用するには、最初に Maven を使用してアーティファクト (artifact) としてコネクタ JAR を手動で インストールする必要があります :

cd cosmos-examples
mvn install:install-file \
  -Dfile=./orion.flink.connector-1.2.3.jar \
  -DgroupId=org.fiware.cosmos \
  -DartifactId=orion.flink.connector \
  -Dversion=1.2.3 \
  -Dpackaging=jar

その後、同じディレクトリ内で mvn package コマンドを実行することでソースコードをコンパイルできます :

cd cosmos-examples
mvn package

cosmos-examples-1.0.jar という新しい JAR ファイルが cosmos-examples/target ディレクトリ内に作成されます。

コンテキスト・データのストリームの生成

このチュートリアルでは、コンテキストが定期的に更新されるシステムを監視する必要があります。これを行うには、ダミー IoT センサーを使用できます。http://localhost:3000/device/monitor のデバイス・モニターのページを開き、Smart Door の ロックを解除して、Smart Lamp をオンにします。 これは、ドロップ・ダウン・リストから適切なコマンドを選択し、send ボタンを押すことで実行できます。デバイスからの測定値のストリームは、同じページで見ることができます :

ロガー - コンテキスト・データのストリームの読み取り

最初の例では、Orion Context Broker から通知を受信するために、OrionSource オペレータを使用します。具体的には、 この例では、各タイプのデバイスが1分で送信する通知の数をカウントします。 サンプルのソースコードは org/fiware/cosmos/tutorial/Logger.scala にあります。

ロガー - JAR のインストール

http://localhost:8081/#/submit を開きます

新しいジョブを設定します

  • Filename: cosmos-examples-1.0.jar
  • Entry Class: org.fiware.cosmos.tutorial.Logger

ロガー - コンテキスト変更のサブスクライブ

動的コンテキスト・システムが起動して実行されると (Logger を実行)、Flink にコンテキストの変更を通知する 必要があります。

これは、Orion Context Broker の /v2/subscription エンドポイントに POST リクエストを行うことで実行できます。

  • fiware-service および fiware-servicepath ヘッダは、これらの設定を使用してプロビジョニングされている ため、接続された IoT センサからの測定値のみをリッスンするようにサブスクリプションをフィルター処理する ために使用されます

  • 通知 URL は、Flink プログラムがリッスンしている URL と一致する必要があります

  • throttling 値は、変更がサンプリングされるレートを定義します

1 リクエスト :

curl -iX POST \
  'http://localhost:1026/v2/subscriptions' \
  -H 'Content-Type: application/json' \
  -H 'fiware-service: openiot' \
  -H 'fiware-servicepath: /' \
  -d '{
  "description": "Notify Flink of all context changes",
  "subject": {
    "entities": [
      {
      "idPattern": ".*"
      }
    ]
  },
  "notification": {
    "http": {
    "url": "http://jobmanager:9001
  }
}'

レスポンスは 201 - Created になります

サブスクリプションが作成されている場合、/v2/subscriptions エンドポイントに対して GET リクエストを行うことで、 サブスクリプションが起動しているかどうかを確認できます。

2 リクエスト :

curl -X GET \
'http://localhost:1026/v2/subscriptions/' \
-H 'fiware-service: openiot' \
-H 'fiware-servicepath: /'

レスポンス :

[
    {
        "id": "5d76059d14eda92b0686f255",
        "description": "Notify Flink of all context changes",
        "status": "active",
        "subject": {
            "entities": [
                {
                    "idPattern": ".*"
                }
            ],
            "condition": {
                "attrs": []
            }
        },
        "notification": {
            "timesSent": 362,
            "lastNotification": "2019-09-09T09:36:33.00Z",
            "attrs": [],
            "attrsFormat": "normalized",
            "http": {
                "url": "http://jobmanager:9001"
            },
            "lastSuccess": "2019-09-09T09:36:33.00Z",
            "lastSuccessCode": 200
        }
    }
]

レスポンスの notification セクション内で、サブスクリプションの正常性を説明するいくつかの追加の attributes を確認できます

サブスクリプションの基準が満たされている場合、timesSent0 より大きくなければなりません。ゼロの値は、 サブスクリプションの subject が正しくないか、サブスクリプションが間違った fiware-service-path または fiware-service ヘッダで作成されたことを示します。

lastNotification は最新のタイムスタンプである必要があります。そうでない場合、デバイスは定期的にデータを 送信していません。Smart Door のロックを解除し、Smart Lamp をオンにしてください。

lastSuccesslastNotification の日付と一致する必要があります-そうでない場合、Cosmos はサブスクリプション を適切に受信していません。ホスト名とポートが正しいことを確認してください。

最後に、サブスクリプションの statusactive であることを確認します。有効期限が切れたサブスクリプションは 実行されません。

ロガー - 出力の確認

サブスクリプションを1分間実行したままにして、次を実行します :

docker logs flink-taskmanager -f --until=60s > stdout.log 2>stderr.log
cat stderr.log

サブスクリプションを作成すると、コンソールの出力は次のようになります :

Sensor(Bell,3)
Sensor(Door,4)
Sensor(Lamp,7)
Sensor(Motion,6)

ロガー - コードの分析

package org.fiware.cosmos.tutorial


import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.fiware.cosmos.orion.flink.connector.{OrionSource}

object Logger{

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Create Orion Source. Receive notifications on port 9001
    val eventStream = env.addSource(new OrionSource(9001))

    // Process event stream

    val processedDataStream = eventStream
    .flatMap(event => event.entities)
    .map(entity => new Sensor(entity.`type`,1))
    .keyBy("device")
    .timeWindow(Time.seconds(60))
    .sum(1)

    // print the results with a single thread, rather than in parallel
    processedDataStream.print().setParallelism(1)
    env.execute("Socket Window NgsiEvent")
  }
  case class Sensor(device: String, sum: Int)
}

プログラムの最初の行は、コネクタを含む必要な依存関係をインポートすることを目的としています。次のステップは、コネクタが 提供するクラスを使用して OrionSource のインスタンスを作成し、Flink が提供する環境に追加することです。

OrionSource コンストラクタはパラメータとしてポート番号 (9001) を受け入れます。このポートは、Orion からの サブスクリプション通知をリッスンするために使用され、NgsiEvent オブジェクトの DataStream に変換されます。これらの オブジェクトの定義は、 Orion-Flink Connector ドキュメント に記載されています。

ストリーム処理は、5つの個別のステップで構成されています。最初のステップ (flatMap()) は、一定期間内に受信したすべての NGSI イベントのエンティティ・オブジェクトをまとめるために実行されます。その後、コードはそれらを map() 操作で繰り返し、 目的の属性を抽出します。この場合、センサの type (Door, Motion, Bell or Lamp) に関心があります。

各反復内で、必要なプロパティを持つカスタム・オブジェクトを作成します : センサの type と各通知の増分。このために、 次のようにケース・クラスを定義できます :

case class Sensor(device: String, sum: Int)

その後、作成されたオブジェクトをデバイスのタイプ (keyBy("device")) でグループ化し、それらに対して timeWindow()sum() などの操作を実行できます。

処理後、結果がコンソールに出力されます :

processedDataStream.print().setParallelism(1)

フィードバック・ループ - コンテキスト・データの永続化

2番目の例では、モーション・センサが動きを検出するとランプをオンにします。

データフロー・ストリームは、通知を受信するために OrionSource オペレータを使用し、モーション・センサにのみ応答する ように入力をフィルタし、OrionSink を使用して処理されたコンテキストを Context Broker にプッシュします。サンプルの ソースコードは org/fiware/cosmos/tutorial/Feedback.scala にあります。

フィードバック・ループ - JAR のインストール

http://localhost:8081/#/job/running を開きます

実行中のジョブ (存在する場合) を選択し、Cancel Job をクリックします

その後、http://localhost:8081/#/submit を開きます

新しいジョブを設定します

  • Filename: cosmos-examples-1.0.jar
  • Entry Class: org.fiware.cosmos.tutorial.Feedback

フィードバック・ループ - コンテキスト変更のサブスクライブ

以前の例を実行していない場合は、新しいサブスクリプションを設定する必要があります。モーション・センサが動きを検出した ときにのみ通知をトリガーするように、より限定したサブスクリプションを設定できます。

注 : 以前のサブスクリプションが既に存在する場合、2番目のより限定した Motion のみのサブスクリプションを作成する この手順は不要です。Scala タスク自体のビジネスロジック内にフィルタがあります。

curl -iX POST \
  'http://localhost:1026/v2/subscriptions' \
  -H 'Content-Type: application/json' \
  -H 'fiware-service: openiot' \
  -H 'fiware-servicepath: /' \
  -d '{
  "description": "Notify Flink of all context changes",
  "subject": {
    "entities": [
      {
        "idPattern": "Motion.*"
      }
    ]
  },
  "notification": {
    "http": {
      "url": "http://taskmanger:9001"
    }
  }
}'

フィードバック・ループ - 出力の確認

http://localhost:3000/device/monitor を開きます

いずれかのストア内で、ドアのロックを解除して待機します。ドアが開いてモーション・センサがトリガーされると、ランプが 直接オンになります。

フィードバック・ループ - コードの分析

package org.fiware.cosmos.tutorial


import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.fiware.cosmos.orion.flink.connector._


object Feedback{
  final val CONTENT_TYPE = ContentType.Plain
  final val METHOD = HTTPMethod.POST
  final val CONTENT = "{  \"on\": {      \"type\" : \"command\",      \"value\" : \"\"  }}"
  final val HEADERS = Map("fiware-service" -> "openiot","fiware-servicepath" -> "/","Accept" -> "*/*")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Create Orion Source. Receive notifications on port 9001
  val eventStream = env.addSource(new OrionSource(9001))

    // Process event stream
  val processedDataStream = eventStream
      .flatMap(event => event.entities)
      .filter(entity=>(entity.attrs("count").value == "1"))
      .map(entity => new Sensor(entity.id))
      .keyBy("id")
      .timeWindow(Time.seconds(5),Time.seconds(2))
      .min("id")

    // print the results with a single thread, rather than in parallel
  processedDataStream.printToErr().setParallelism(1)

    val sinkStream = processedDataStream.map(node => {
      new OrionSinkObject("urn:ngsi-ld:Lamp"+ node.id.takeRight(3)+ "@on","http://${IP}:3001/iot/lamp"+ node.id.takeRight(3),CONTENT_TYPE,METHOD)
    })
    OrionSink.addSink(sinkStream)
    env.execute("Socket Window NgsiEvent")
  }

  case class Sensor(id: String)
}

ご覧のとおり、コードは以前の例に似ています。主な違いは、処理されたデータを OrionSink を介して Context Broker に書き戻すことです。

OrionSinkObject の引数は次のとおりです :

  • Message: "{ \"on\": { \"type\" : \"command\", \"value\" : \"\" }}". 'on' コマンドを送信します
  • URL: "http://localhost:1026/v2/entities/Lamp:"+node.id.takeRight(3)+"/attrs". TakeRight(3) は部屋の番号を 取得します。例 : '001'
  • Content Type: ContentType.Plain.
  • HTTP Method: HTTPMethod.POST.
  • Headers: Map("fiware-service" -> "openiot","fiware-servicepath" -> "/","Accept" -> "*/*"). オプション・パラメータ HTTP リクエストに必要なヘッダを追加します。

他のチュートリアルを読むことで見つけることができます


License

MIT © 2020 FIWARE Foundation e.V.