IBM Compose for ElasticsearchをNode-REDから使ってみる
投稿者:大島
皆様、Elasticsearch(以下Elastic)を使ってますか? IBM CloudでもElasticが使えるのですが、以前はちょっと使いづらい仕組みになっていたのですが、いつの間にかアップデートされ簡単にオーダーして利用出来るようになっておりました。 今回は、Node-REDからIBM Compose for Elasticsearchを利用する場合の手順を紹介したいと思います。
※Node-REDは既にオーダーされている状況からスタートしたいと思います。
IBM Compose for Elasticsearchをオーダーする
IBM CloudのカタログよりIBM Compose for Elasticsearchをオーダーします。データ&分析にElasticがありますので、クリックしてください。
Elasticのオーダー画面に遷移しますが、Composeは他のサービスと違い仮想サーバーのようなプランニングになります(同じぐらいの使用量だとStandardプランのCloudantより安くなるではと思います)。オーダー画面では特に選択することはないので、サクッと作成ボタンをクリックしてください。
Node-REDからElasticを使ってみる
Elasticのオーダーが完了したら、早速使っていきたいと思います。 まずはElasticの管理画面からAPIコール用のURLを控えておきます。
次にNode-REDのエディター画面にアクセスして、Elasticを利用するための準備をしていくのですが。。。
今回はElastic用のノードは使わずに、直接APIをコールする方法を実装していきます。
なぜ、Elastic用のノードを使わずに直接APIをコールするかといいますと、実はElasticをオーダーした時点ではデータを格納する事が出来ないので、Elasticに対して設定が必要になります。
この設定を行うためのノードが、私が見た限りでは現在インストール出来るElastic用のノードには含まれていなかったので、今回はAPIを直接コールする方式をとりました。
でも、そんなに難しくはないので皆様気楽に読んで頂ければと思います。
ということで、Elasticへの設定ですが、何が必要かといいますと、RDBで言うところの「データベース」の作成と「テーブル」の作成が必要です。この「データベース」にあたるのが「Index」、「テーブル」にあたるのが「Type」になります。 まずは「Index」と「Type」を作る方法をご紹介していきたいと思います。
Indexの一覧取得方法
作る前にやっぱり確認方法が必要ですよねということで、現在Elasticに登録されているIndexの一覧を取得する方法を紹介します。
– コード
[{"id":"47fec6f3.45a168","type":"inject","z":"f01daee3.e5712","name":"","topic":"","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":100,"y":80,"wires":[["7f3319f2.e61bc8"]]},{"id":"a8feee2d.44587","type":"debug","z":"f01daee3.e5712","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","x":590,"y":80,"wires":[]},{"id":"7f3319f2.e61bc8","type":"function","z":"f01daee3.e5712","name":"URL設定","func":"var url = \"\";\nmsg.url = url + \"_cat/indices?v\";\n\nmsg.method = \"GET\";\nmsg.headers ={\n \"content-type\": \"application/json\"\n};\nreturn msg;","outputs":1,"noerr":0,"x":260,"y":80,"wires":[["fcb98e14.b5bcd"]]},{"id":"fcb98e14.b5bcd","type":"http request","z":"f01daee3.e5712","name":"","method":"use","ret":"txt","url":"","tls":"","x":430,"y":80,"wires":[["a8feee2d.44587"]]},{"id":"65812275.fbab2c","type":"comment","z":"f01daee3.e5712","name":"Indexの一覧取得","info":"","x":100,"y":40,"wires":[]}]
– フロー
– コードのキモ
URL設定と記載されたfunctionノードを開き、エディター上に表示されたurlに先程控えたElasticのURLを記載してください。URIに「_cat/indices?v」を追記しておくとIndexの一覧が取得できます。
var url = ""; msg.url = url + "_cat/indices?v"; msg.method = "GET"; msg.headers ={ "content-type": "application/json" }; return msg;
– 結果
msg.payloadを確認するとこんな項目が表示されると思います。今は何も作っていないので、空の状態となります。
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
IndexとTypeの作成方法
Indexを作りつつ、一緒にTypeも作る方法を紹介したいと思います。 今回は”elastic”というIndex(テーブル)に”api_counter”というType(テーブル)を作成したいと思います。”api_counter”には”api”・”user”・”date”というデータを格納するフィールド(カラム)を作ります。
– コード
[{"id":"94584f5d.ca507","type":"inject","z":"f01daee3.e5712","name":"","topic":"","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":100,"y":180,"wires":[["6f932776.a38488"]]},{"id":"6f932776.a38488","type":"function","z":"f01daee3.e5712","name":"Index作成設定","func":"var url = \"\";\nmsg.url = url + \"elastic\";\nmsg.method = \"PUT\";\nmsg.headers ={\n \"content-type\": \"application/json\"\n};\nmsg.payload = {\n mappings: {\n \"api_counter\": {\n \"properties\": {\n \"api\": { \"type\": \"text\" },\n \"user\": { \"type\": \"text\" },\n \"date\": { \n \"type\": \"date\",\n \"format\": \"YYYY-MM-DD kk:mm:ss\" \n \n }\n }\n }\n }\n };\n\n\nreturn msg;","outputs":1,"noerr":0,"x":280,"y":180,"wires":[["5444bbeb.8619b4"]]},{"id":"ba91a11c.902a5","type":"comment","z":"f01daee3.e5712","name":"Indexの作成と確認","info":"","x":110,"y":140,"wires":[]},{"id":"5444bbeb.8619b4","type":"http request","z":"f01daee3.e5712","name":"","method":"use","ret":"obj","url":"","tls":"","x":450,"y":180,"wires":[["8d604026.e49b","f941289e.d84f28"]]},{"id":"8d604026.e49b","type":"debug","z":"f01daee3.e5712","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","x":610,"y":180,"wires":[]},{"id":"f941289e.d84f28","type":"function","z":"f01daee3.e5712","name":"URL設定","func":"var url = \"\";\nmsg.url = url + \"_cat/indices?v\";\n\nmsg.method = \"GET\";\nmsg.headers ={\n \"content-type\": \"application/json\"\n};\nreturn msg;","outputs":1,"noerr":0,"x":440,"y":220,"wires":[["71467d69.30b7f4"]]},{"id":"71467d69.30b7f4","type":"http request","z":"f01daee3.e5712","name":"","method":"use","ret":"txt","url":"","tls":"","x":450,"y":260,"wires":[["d64d7dda.0f9ab"]]},{"id":"d64d7dda.0f9ab","type":"debug","z":"f01daee3.e5712","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","x":610,"y":260,"wires":[]}]
– フロー
– コードのキモ
Index作成設定というfunctionノードを開いて、先程と同じくurlにElasticのURLを記載してください。
Indexの作成方法はURIにIndex名を与えると作成ができます。
今回の場合はmsg.urlの文字列結合で最後に渡している部分になります。typeとフィールドはmsg.payloadにmappingsというフィールドを作り、その中にtypeとなる”api_counter”というフィールドを作成。”api”・”user”・”date”は”api_counter”のpropertiesの中に設定します。
もっと詳しく知りたい方はElasticのサイトをご確認ください。
因みに、日付フィールドのフォーマットは自由に定義できますが、hh:mm:ss形式の場合はkk:mm:ssにしないとだめです(仕様で)。
var url = ""; msg.url = url + "elastic"; msg.method = "PUT"; msg.headers ={ "content-type": "application/json" }; msg.payload = { mappings: { "api_counter": { "properties": { "api": { "type": "text" }, "user": { "type": "text" }, "date": { "type": "date", "format": "YYYY-MM-DD kk:mm:ss" } } } } }; return msg;
フローの中にURL設定というfunctionノードがありますが、これは上記で紹介したIndex一覧取得の設定ノードになります。
ノードを開いて頂き、urlに先程と同様にElasticのURLを記載いしてください。 さて、実行すると最後のdebugノードのmsg.payloadで以下のような結果が得られると思います。 elasticというIndexが表示されたら無事成功です。おめでとうございます。
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size green open elastic t75vuW2mSXabZtE8YWfaAg 3 1 0 0 131.7kb 67.4kb
データを登録する
それではデータを登録していきたいと思います。
– コード
[{"id":"ccf5f4f3.ab41a8","type":"comment","z":"f01daee3.e5712","name":"データの登録","info":"","x":90,"y":480,"wires":[]},{"id":"c434d024.53cac","type":"inject","z":"f01daee3.e5712","name":"","topic":"","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":100,"y":520,"wires":[["a7b8ea4f.5ba318"]]},{"id":"a7b8ea4f.5ba318","type":"function","z":"f01daee3.e5712","name":"登録データの作成","func":"//登録データの設定\n//// ミリ秒取得\nvar date1 = new Date();\nvar jisa = 9;\nvar here= date1.getTime();\nvar gmt = here + date1.getTimezoneOffset()*60*1000;\nvar dateMili = gmt+jisa*60*60*1000;\n\n/// 日時変換\nformat = 'YYYY-MM-DD hh:mm:ss';\nvar date2 = new Date(dateMili);\nformat = format.replace(/YYYY/g, date2.getFullYear());\nformat = format.replace(/MM/g, ('0' + (date2.getMonth() + 1)).slice(-2));\nformat = format.replace(/DD/g, ('0' + date2.getDate()).slice(-2));\nformat = format.replace(/hh/g, ('0' + date2.getHours()).slice(-2));\nformat = format.replace(/mm/g, ('0' + date2.getMinutes()).slice(-2));\nformat = format.replace(/ss/g, ('0' + date2.getSeconds()).slice(-2));\n\n// データ格納\nmsg.payload = {\n \"api\" : \"search\",\n \"user\": \"testUser\",\n \"date\": format\n};\n//URL設定\nvar url = \"\"\nvar index = \"elastic/\";\nvar type = \"api_counter\";\nmsg.url = url + index + type;\nmsg.method = \"POST\";\n\nreturn msg;","outputs":1,"noerr":0,"x":270,"y":520,"wires":[["b4045497.cb9028","ca3147e7.1fff88"]]},{"id":"e6af7f68.7dfe4","type":"debug","z":"f01daee3.e5712","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","x":230,"y":600,"wires":[]},{"id":"b4045497.cb9028","type":"http request","z":"f01daee3.e5712","name":"","method":"use","ret":"obj","url":"","tls":"","x":250,"y":560,"wires":[["e6af7f68.7dfe4"]]},{"id":"ca3147e7.1fff88","type":"debug","z":"f01daee3.e5712","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","x":490,"y":520,"wires":[]}]
– フロー
– コードのキモ
登録データの作成というfunctionノードを開いてください。
先ほどと同様にurlにElasticのURLを貼ります。 データ登録時のURIは「https://~/<Index名>/<Type名>/」となります。
登録するデータはmsg.payloadへ設定します。 今回はapiにsearchとう文字を、userにtestUserという文字を、dateにYYYY-MM-DD hh:mm:ss形式の日付データを設定します。
因みになんでこんなTypeを作ったのかというと、自分でAPIを作った際にどれ位APIが利用されているのかを計測するためのDB兼分析サービスとして定義してみました。 使い方のイメージとしては、自作APIがコールされる度にこのapi_counterにデータを格納していき、計測する際はapiフィールドとdateフィールドでフィルターを掛けてAPIコール数を算出するような想定です。(CloudantやDb2のスタンダードプランはRead/Writeも課金対象なので、ほぼ容量課金だけのElasticって実は便利?という事もありいろいろ触ってみました。)
//登録データの設定 //// ミリ秒取得 var date1 = new Date(); var jisa = 9; var here= date1.getTime(); var gmt = here + date1.getTimezoneOffset()*60*1000; var dateMili = gmt+jisa*60*60*1000; /// 日時変換 format = 'YYYY-MM-DD hh:mm:ss'; var date2 = new Date(dateMili); format = format.replace(/YYYY/g, date2.getFullYear()); format = format.replace(/MM/g, ('0' + (date2.getMonth() + 1)).slice(-2)); format = format.replace(/DD/g, ('0' + date2.getDate()).slice(-2)); format = format.replace(/hh/g, ('0' + date2.getHours()).slice(-2)); format = format.replace(/mm/g, ('0' + date2.getMinutes()).slice(-2)); format = format.replace(/ss/g, ('0' + date2.getSeconds()).slice(-2)); // データ格納 msg.payload = { "api" : "search", "user": "testUser", "date": format }; //URL設定 var url = "" var index = "elastic/"; var type = "api_counter"; msg.url = url + index + type; msg.method = "POST"; return msg;
このフローを実行すると以下のような結果が得られます。「”result”:”created”」となっていれば正常にデータが登録されています。 おめでとうございます。
{"_index":"elastic","_type":"api_counter","_id":"JMthRWQBn7c-9pw4LAYD","_version":1,"result":"created","_shards":{"total":2,"successful":2,"failed":0},"_seq_no":69,"_primary_term":1}
検索方法
– コード
[{"id":"b05fea03.438a68","type":"comment","z":"f01daee3.e5712","name":"データ検索","info":"","x":100,"y":1000,"wires":[]},{"id":"d8d47101.6dd13","type":"inject","z":"f01daee3.e5712","name":"","topic":"","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":100,"y":1040,"wires":[["9f55646d.40b7b8"]]},{"id":"9f55646d.40b7b8","type":"function","z":"f01daee3.e5712","name":"URL設定","func":"var url = \"\"\nvar index = \"elastic/\";\nvar type = \"api_counter/\";\nmsg.url = url + index + type + \"_search\";\nmsg.method = \"POST\";\nmsg.headers ={\n \"content-type\": \"application/json\"\n};\n\nmsg.payload = {\n \"size\": 10000,//10000が上限\n \"query\":{\n \"bool\" : {\n \"must\": [\n {\"term\" : { \"api\" : \"search\" }},\n {\"range\" : {\n \"date\":{\n \"lte\": \"2018-06-21\",\n \"gte\": \"2018-06-20\",\n \"format\": \"YYYY-MM-DD||YYYY-MM-DD kk:mm:ss\"\n }\n }}\n ]\n }\n }\n};\n\nreturn msg;","outputs":1,"noerr":0,"x":260,"y":1040,"wires":[["4516c25f.a2bd7c","32de9cce.3ac7c4"]]},{"id":"4516c25f.a2bd7c","type":"http request","z":"f01daee3.e5712","name":"","method":"use","ret":"obj","url":"","tls":"","x":270,"y":1080,"wires":[["4f1a3545.32a17c"]]},{"id":"4f1a3545.32a17c","type":"debug","z":"f01daee3.e5712","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","x":250,"y":1120,"wires":[]},{"id":"32de9cce.3ac7c4","type":"debug","z":"f01daee3.e5712","name":"","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","x":430,"y":1040,"wires":[]}]
– フロー
– コードのキモ
データ検索時のURIは「https://~/<Index名>/<Type名>/_search」となります。
複数の検索条件を指定する場合はメソッドがPOSTになりますので気をつけてください。
検索用のパラメーターはmsg.payloadに設定します。 複数条件で検索する場合はBool Queryという仕組みを使いますが、細かいことはドキュメントを確認してください。
条件がmustフィールドに格納されているAND条件になります。termは文字列検索、rangeは期間を指定することができます。
rangeのlteは「<=」の意味、gteは「>=」という意味です。
var url = "" var index = "elastic/"; var type = "api_counter/"; msg.url = url + index + type + "_search"; msg.method = "POST"; msg.headers ={ "content-type": "application/json" }; msg.payload = { "size": 10000,//10000が上限 "query":{ "bool" : { "must": [ {"term" : { "api" : "search" }}, {"range" : { "date":{ "lte": "2018-06-21", "gte": "2018-06-20", "format": "YYYY-MM-DD||YYYY-MM-DD kk:mm:ss" } }} ] } } }; return msg;
このフローを実行すると以下のような結果がmsg.payloadに格納されます。取得できた方はおめでとうございます。
{ "_shards": { "failed": 0, "skipped": 0, "successful": 3, "total": 3 }, "hits": { "hits": [ { "_id": "Gf0wHGQBZG0faYEmXsdU", "_index": "elastic", "_score": 1.6931472, "_source": { "api": "search", "date": "2018-06-20 16:54:49", "user": "testUser" }, "_type": "api_counter" }, { "_id": "jFcwHGQBvxLwG2CqsycM", "_index": "elastic", "_score": 1.6931472, "_source": { "api": "search", "date": "2018-06-20 16:55:10", "user": "testUser" }, "_type": "api_counter" }, { "_id": "G_0xHGQBZG0faYEmLcfe", "_index": "elastic", "_score": 1.6931472, "_source": { "api": "search", "date": "2018-06-20 16:55:42", "user": "testUser" }, "_type": "api_counter" }, { "_id": "J_0tIGQBZG0faYEmxMed", "_index": "elastic", "_score": 1.6931472, "_source": { "api": "search", "date": "2018-06-21 11:30:27", "user": "testUser" }, "_type": "api_counter" }, { "_id": "mFcJIWQBvxLwG2CqfieR", "_index": "elastic", "_score": 1.6931472, "_source": { "api": "search", "date": "2018-06-21 15:30:27", "user": "testUser" }, "_type": "api_counter" }, { "_id": "9MvlIWQBn7c-9pw4OAWT", "_index": "elastic", "_score": 1.6931472, "_source": { "api": "search", "date": "2018-06-21 19:30:27", "user": "testUser" }, "_type": "api_counter" }, { "_id": "mldTImQBvxLwG2CqFSeo", "_index": "elastic", "_score": 1.6931472, "_source": { "api": "search", "date": "2018-06-21 21:30:27", "user": "testUser" }, "_type": "api_counter" }, { "_id": "Hv0zHGQBZG0faYEmEMes", "_index": "elastic", "_score": 1.6271892, "_source": { "api": "search", "date": "2018-06-20 16:57:46", "user": "testUser" }, "_type": "api_counter" }, { "_id": "kFczHGQBvxLwG2CqESdU", "_index": "elastic", "_score": 1.6271892, "_source": { "api": "search", "date": "2018-06-20 16:57:46", "user": "testUser" }, "_type": "api_counter" }, { "_id": "7ssIHmQBn7c-9pw4cwUD", "_index": "elastic", "_score": 1.6271892, "_source": { "api": "search", "date": "2018-06-21 01:30:27", "user": "testUser" }, "_type": "api_counter" }, { "_id": "lFd2HmQBvxLwG2CqUCcS", "_index": "elastic", "_score": 1.6271892, "_source": { "api": "search", "date": "2018-06-21 03:30:27", "user": "testUser" }, "_type": "api_counter" }, { "_id": "Jf3kHmQBZG0faYEmLcdj", "_index": "elastic", "_score": 1.6271892, "_source": { "api": "search", "date": "2018-06-21 05:30:27", "user": "testUser" }, "_type": "api_counter" }, { "_id": "8MtSH2QBn7c-9pw4CgVR", "_index": "elastic", "_score": 1.6271892, "_source": { "api": "search", "date": "2018-06-21 07:30:27", "user": "testUser" }, "_type": "api_counter" }, { "_id": "jVcxHGQBvxLwG2CqLCfj", "_index": "elastic", "_score": 1.6241543, "_source": { "api": "search", "date": "2018-06-20 16:55:42", "user": "testUser" }, "_type": "api_counter" }, { "_id": "58sxHGQBn7c-9pw4LQWJ", "_index": "elastic", "_score": 1.6241543, "_source": { "api": "search", "date": "2018-06-20 16:55:42", "user": "testUser" }, "_type": "api_counter" }, { "_id": "6cszHGQBn7c-9pw4DwXR", "_index": "elastic", "_score": 1.6241543, "_source": { "api": "search", "date": "2018-06-20 16:57:45", "user": "testUser" }, "_type": "api_counter" }, { "_id": "H_0zHGQBZG0faYEmIce9", "_index": "elastic", "_score": 1.6241543, "_source": { "api": "search", "date": "2018-06-20 16:57:50", "user": "testUser" }, "_type": "api_counter" }, { "_id": "IP1pHGQBZG0faYEml8ex", "_index": "elastic", "_score": 1.6241543, "_source": { "api": "search", "date": "2018-06-20 17:57:19", "user": "testUser" }, "_type": "api_counter" }, { "_id": "7Mu-HGQBn7c-9pw42wXl", "_index": "elastic", "_score": 1.6241543, "_source": { "api": "search", "date": "2018-06-20 19:30:27", "user": "testUser" }, "_type": "api_counter" }, { "_id": "klcsHWQBvxLwG2CquCfe", "_index": "elastic", "_score": 1.6241543, "_source": { "api": "search", "date": "2018-06-20 21:30:27", "user": "testUser" }, "_type": "api_counter" }, { "_id": "I_2aHWQBZG0faYEmlcfj", "_index": "elastic", "_score": 1.6241543, "_source": { "api": "search", "date": "2018-06-20 23:30:27", "user": "testUser" }, "_type": "api_counter" }, { "_id": "lle_H2QBvxLwG2Cq5ydZ", "_index": "elastic", "_score": 1.6241543, "_source": { "api": "search", "date": "2018-06-21 09:30:27", "user": "testUser" }, "_type": "api_counter" }, { "_id": "8subIGQBn7c-9pw4oQV3", "_index": "elastic", "_score": 1.6241543, "_source": { "api": "search", "date": "2018-06-21 13:30:27", "user": "testUser" }, "_type": "api_counter" }, { "_id": "Kf13IWQBZG0faYEmW8ei", "_index": "elastic", "_score": 1.6241543, "_source": { "api": "search", "date": "2018-06-21 17:30:27", "user": "testUser" }, "_type": "api_counter" }, { "_id": "K_3AImQBZG0faYEm8sfd", "_index": "elastic", "_score": 1.6241543, "_source": { "api": "search", "date": "2018-06-21 23:30:27", "user": "testUser" }, "_type": "api_counter" } ], "max_score": 1.6931472, "total": 25 }, "timed_out": false, "took": 22 }
## おまけ
Elasticではフィールドに対して設定できるタイプがいろいろあり、日付についてもいろいろなフォーマットを設定する事が出来ます。例えばUNIXタイムを格納することができますが、私が検証した限りではUNIXタイムでデータを格納して、検索時はYYYY-MM-DD hh:mm:ssで検索は出来ませんでした。残念です。