2017-03-07 12 views
3

私の仕事は簡単です - Apache NiFiを使用してHBase(カウンタ)のカラム値を増やしたいと思います。Apache NiFiを使用したHbaseカウンタ

私は列キーとしてaccountidを持っています。私は、ストリーミング値に基づいてincr/decrバランスカラムを作成したいと思います。何がNiFiでそれを行うための最良の方法でしょう。

たとえば、アカウントAの残高= 100の開始値です。私は(A、-20)をイベントとして取得します。これを行うには、ボックスプロセッサーの中で最高のものは何ですか(A balance = 80)。それらのすべてが価値を置き換えるように思える。私はスキーマを変更することにもオープンしています...

私はgroovyスクリプトを書こうとしましたが、このエラーをnifiで取得しようとしました。私の基本的な構造が間違っているのは単なる単純な疑問です。

2017-03-10 06:38:54,067エラー[タイマー駆動プロセススレッド6] oanifi.processors.script.ExecuteScript ExecuteScript [id = b5a0e7b7-015a-1000-ab9c-0696c8297e8d] ExecuteScript [id = b5a0e7b7-015a-1000-ab9c-0696c8297e8d]がjava.lang.NoClassDefFoundErrorのために処理に失敗しました:org/apache/hadoop/conf/Configuration;ロールバックセッション:

import org.apache.nifi.controller.ControllerService 
import org.apache.hadoop.hbase.HBaseConfiguration 
import org.apache.hadoop.hbase.TableName 
import org.apache.hadoop.hbase.client.Connection 
import org.apache.hadoop.hbase.client.ConnectionFactory 
import org.apache.hadoop.hbase.client.Get 
import org.apache.hadoop.hbase.client.Put 
import org.apache.hadoop.hbase.client.Result 
import org.apache.hadoop.hbase.client.ResultScanner 
import org.apache.hadoop.hbase.client.Scan 
import org.apache.hadoop.hbase.client.Table 
import org.apache.hadoop.hbase.util.Bytes 

def lookup = context.controllerServiceLookup 
def HbaseServiceName =HBaseClient.value 
def HBaseServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { 
    cs -> lookup.getControllerServiceName(cs) == HBaseServiceName 
} 
def conn = lookup.getControllerService(HBaseServiceId)?.getConnection() 
try { 
    flowFile = session.create() 
    def table = conn.getTable(TableName.valueOf("crap")) 
    myfile = flowFile.getAttribute("filename") 
    def p = new Put(Bytes.toBytes("crap")); 
    p.add(Bytes.toBytes("crap"), Bytes.toBytes("cf1"),Bytes.toBytes("SomeValue")) 
    table.put(p); 
    session.transfer(flowFile, REL_SUCCESS) 
} catch(e) { 
    log.error('Scripting error', e) 
    session.transfer(flowFile, REL_FAILURE) 
} 
conn?.close() 

答えて

0

あなたがPutHBaseCellPutHBaseJSONは、それぞれのHBaseの先にflowfile内容を置くことを正しいです。おそらく実行したいのは、GetHBaseを使用して初期値を取得し、カウンタ(チュートリアルではhereを参照)を使用して実行カウンタを保持してから、HBaseセルを正しい値で更新します。 DistributedMapCacheシステムを使用して、共有メモリ空間の値をフェッチ/計算/格納することもできます。

+0

同意する。 GetHbase(現在の値を取得)とストリーミング値を追加し、最後にPutHBase(更新する)の問題 - これは2つの操作で、データはクライアント(NiFi)に戻ってきますか?私はパラダイムを増やすのが好きです。なぜなら、それはずっと速く、現在の値を読むのは気にしません(私はそれを増やしたいだけです)。スループットは大きな問題です。私は各NiFiマシン上でphoenixサーバーを実行し、NiFiからのカウンタインクリメントでphoenix-jdbcクライアントを呼び出すアイデアを跳ね返しています。 –

+0

HBaseの経験はあまりありません。関連するモジュールフォルダにHBaseライブラリを含めることで、GroovyでExecuteScriptプロセッサを使用できますか? HBaseの 'Increment'オブジェクトを使用する例(https://github.com/larsgeorge/hbase-book/blob/master/ch04/src/main/java/client/IncrementMultipleExample.java)です[HBaseコントローラサービスをスクリプトから参照できる](https://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html)も可能です。 – Andy

+0

ありがとうAndyこれは、私たちが望んでいたよりも少しコーディングをしていることを除いて、私の要件を満たしているようです。完了後に解決策を投稿します。 –

関連する問題