2016-03-21 11 views
0

Hbaseデータを取得していて、その上でスパークジョブを実行しようとしています。私のテーブルは約70,000行あり、各行には列「タイプ」があり、値はです。投稿、コメント、返信です。そのタイプに基づいて、私は以下に示すような異なるペアRDDを(ポストのために)取り出したいと思う。JavaPairRDDからnullデータを削除する方法

JavaPairRDD<ImmutableBytesWritable, FlumePost> postPairRDD = hBaseRDD.mapToPair(
      new PairFunction<Tuple2<ImmutableBytesWritable, Result>, ImmutableBytesWritable, FlumePost>() { 
       private static final long serialVersionUID = 1L; 

       public Tuple2<ImmutableBytesWritable, FlumePost> call(Tuple2<ImmutableBytesWritable, Result> arg0) 
         throws Exception { 
        FlumePost flumePost = new FlumePost(); 
        ImmutableBytesWritable key = arg0._1; 
        Result result = arg0._2; 
        String type = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("t"))); 
        if (type.equals("post")) { 
         return new Tuple2<ImmutableBytesWritable, FlumePost>(key, flumePost); 
        } else { 
         return null; 
        } 

       } 

      }).distinct(); 

ここでの問題は、私は望ましくないnull値を送信する必要がポスト以外のタイプを持つすべての行について、です。そして、繰り返しは3つのタイプすべてで70k回実行され、サイクルが無駄になります。だから私の最初の質問は:

1)これを行うにはどのような効果的な方法がありますか?

これで、70KBの結果が得られたら、distinct()メソッドを使用してヌル値の重複を削除しました。だから私は1つのnull値のオブジェクトを持つことになります。私は20327の結果を期待していますが、私は20328を取得します。

2)このヌルエントリをペアRDDから削除する方法はありますか?

答えて

2

RDDでfilter操作を使用できます。

は単純に呼び出す:

.filter(new Function<Tuple2<ImmutableBytesWritable, FlumePost>, Boolean>() { 
    @Override 
    public Boolean call(Tuple2<ImmutableBytesWritable, FlumePost> v1) throws Exception { 
     return v1 != null; 
    } 
}) 

null Sをフィルタリングするdistinct()を呼び出す前に。

+0

ありがとうTzach :)私の最初の質問にもお答えください。別の型を取得するためにSQLContextを使うべきですか? –

+0

あなたの実装に重大なパフォーマンス上の問題はないと思います。はい、このコードをDataFramesに変換し、SQLとUDFを使用してこれを実行できますが、パフォーマンスにどのくらいの効果が得られるかはわかりません。 –

+0

あなたの助けTzachにもう一度感謝します。 :) –

関連する問題