私の仕事は簡単です - Apache NiFiを使用してHBase(カウンタ)のカラム値を増やしたいと思います。Apache NiFiを使用したHbaseカウンタ
私は列キーとしてaccountidを持っています。私は、ストリーミング値に基づいてincr/decrバランスカラムを作成したいと思います。何がNiFiでそれを行うための最良の方法でしょう。
たとえば、アカウントAの残高= 100の開始値です。私は(A、-20)をイベントとして取得します。これを行うには、ボックスプロセッサーの中で最高のものは何ですか(A balance = 80)。それらのすべてが価値を置き換えるように思える。私はスキーマを変更することにもオープンしています...
私はgroovyスクリプトを書こうとしましたが、このエラーをnifiで取得しようとしました。私の基本的な構造が間違っているのは単なる単純な疑問です。
2017-03-10 06:38:54,067エラー[タイマー駆動プロセススレッド6] oanifi.processors.script.ExecuteScript ExecuteScript [id = b5a0e7b7-015a-1000-ab9c-0696c8297e8d] ExecuteScript [id = b5a0e7b7-015a-1000-ab9c-0696c8297e8d]がjava.lang.NoClassDefFoundErrorのために処理に失敗しました:org/apache/hadoop/conf/Configuration;ロールバックセッション:
import org.apache.nifi.controller.ControllerService
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.ResultScanner
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.client.Table
import org.apache.hadoop.hbase.util.Bytes
def lookup = context.controllerServiceLookup
def HbaseServiceName =HBaseClient.value
def HBaseServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
cs -> lookup.getControllerServiceName(cs) == HBaseServiceName
}
def conn = lookup.getControllerService(HBaseServiceId)?.getConnection()
try {
flowFile = session.create()
def table = conn.getTable(TableName.valueOf("crap"))
myfile = flowFile.getAttribute("filename")
def p = new Put(Bytes.toBytes("crap"));
p.add(Bytes.toBytes("crap"), Bytes.toBytes("cf1"),Bytes.toBytes("SomeValue"))
table.put(p);
session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
log.error('Scripting error', e)
session.transfer(flowFile, REL_FAILURE)
}
conn?.close()
同意する。 GetHbase(現在の値を取得)とストリーミング値を追加し、最後にPutHBase(更新する)の問題 - これは2つの操作で、データはクライアント(NiFi)に戻ってきますか?私はパラダイムを増やすのが好きです。なぜなら、それはずっと速く、現在の値を読むのは気にしません(私はそれを増やしたいだけです)。スループットは大きな問題です。私は各NiFiマシン上でphoenixサーバーを実行し、NiFiからのカウンタインクリメントでphoenix-jdbcクライアントを呼び出すアイデアを跳ね返しています。 –
HBaseの経験はあまりありません。関連するモジュールフォルダにHBaseライブラリを含めることで、GroovyでExecuteScriptプロセッサを使用できますか? HBaseの 'Increment'オブジェクトを使用する例(https://github.com/larsgeorge/hbase-book/blob/master/ch04/src/main/java/client/IncrementMultipleExample.java)です[HBaseコントローラサービスをスクリプトから参照できる](https://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html)も可能です。 – Andy
ありがとうAndyこれは、私たちが望んでいたよりも少しコーディングをしていることを除いて、私の要件を満たしているようです。完了後に解決策を投稿します。 –