2016-11-06 8 views
0

マップ内でrichfatMapFunctionを使用してhbaseから読み込むと、シリアル化エラーが発生します。私がしようとしているのは、datastreamがhbase else ignoreから読み取られた特定の文字列に等しい場合です。以下は、私が得ているサンプルプログラムとエラーです。hbaseからの読み取り時にflink thowingシリアル化エラーが発生する

package com.abb.Flinktest 
import java.text.SimpleDateFormat 
import java.util.Properties 

import scala.collection.concurrent.TrieMap 
import org.apache.flink.addons.hbase.TableInputFormat 
import org.apache.flink.api.common.functions.RichFlatMapFunction 
import org.apache.flink.api.common.io.OutputFormat 
import org.apache.flink.api.java.tuple.Tuple2 
import org.apache.flink.streaming.api.scala.DataStream 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
import org.apache.flink.streaming.api.scala.createTypeInformation 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema 
import org.apache.flink.util.Collector 
import org.apache.hadoop.hbase.HBaseConfiguration 
import org.apache.hadoop.hbase.TableName 
import org.apache.hadoop.hbase.client.ConnectionFactory 
import org.apache.hadoop.hbase.client.HTable 
import org.apache.hadoop.hbase.client.Put 
import org.apache.hadoop.hbase.client.Result 
import org.apache.hadoop.hbase.client.Scan 
import org.apache.hadoop.hbase.filter.BinaryComparator 
import org.apache.hadoop.hbase.filter.CompareFilter 
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter 
import org.apache.hadoop.hbase.util.Bytes 
import org.apache.log4j.Level 
import org.apache.flink.api.common.functions.RichMapFunction 

object Flinktesthbaseread { 

    def main(args:Array[String]) 
    { 
    val env = StreamExecutionEnvironment.createLocalEnvironment() 
    val kafkaStream = env.fromElements("hello") 
    val c=kafkaStream.map(x => if(x.equals("hello"))kafkaStream.flatMap(new ReadHbase()))  
    env.execute() 
    } 
     class ReadHbase extends RichFlatMapFunction[String,Tuple11[String,String,String,String,String,String,String,String,String,String,String]] with Serializable 
    { 
     var conf: org.apache.hadoop.conf.Configuration = null; 
    var table: org.apache.hadoop.hbase.client.HTable = null; 
    var hbaseconnection:org.apache.hadoop.hbase.client.Connection =null 
    var taskNumber: String = null; 
    var rowNumber = 0; 
    val serialVersionUID = 1L; 

    override def open(parameters: org.apache.flink.configuration.Configuration) { 
     println("getting table") 
     conf = HBaseConfiguration.create() 
     val in = getClass().getResourceAsStream("/hbase-site.xml") 

     conf.addResource(in) 
     hbaseconnection = ConnectionFactory.createConnection(conf) 
     table = new HTable(conf, "testtable"); 
    // this.taskNumber = String.valueOf(taskNumber); 
    } 

    override def flatMap(msg:String,out:Collector[Tuple11[String,String,String,String,String,String,String,String,String,String,String]]) 
     { 
       //flatmap operation here 
     } 

     override def close() { 

     table.flushCommits(); 
     table.close(); 
    } 

    } 
} 

エラー:

log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$). 
log4j:WARN Please initialize the log4j system properly. 
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable 
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172) 
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164) 
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:617) 
    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:959) 
    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:484) 
    at com.abb.Flinktest.Flinktesthbaseread$.main(Flinktesthbaseread.scala:45) 
    at com.abb.Flinktest.Flinktesthbaseread.main(Flinktesthbaseread.scala) 
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.scala.DataStream 
    - field (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun$1", name: "kafkaStream$1", type: "class org.apache.flink.streaming.api.scala.DataStream") 
    - root object (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun$1", <function1>) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301) 
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170) 
    ... 6 more 

私はwelのが、運としてクラスはシリアライズすることによって、メソッドとクラス内のフィールドをラップしようとしました。誰かがこれにいくつかのライトを投げたり、これに対するいくつかの回避策を提案することができます。

答えて

3

問題は、単純にシリアライズできないマップ関数のkafkaストリーム変数にアクセスしようとしていることです。これは単にデータの抽象的な表現です。最初にあなたの機能を無効にするものは何も含まれていません。

代わりに、このような何かを:

kafkaStream.filter(x => x.equals("hello")).flatMap(new ReadHBase()) 

フィルタ目的球は唯一の条件が真である要素を保持し、かつそれらはあなたのflatMap関数に渡されます。

変換を指定したときに実際に何が起こったのか誤解されているように、基本API概念のドキュメントを読むことを強くお勧めします。

+0

ありがとうございました。さらに進める前に、APIドキュメントをお読みください。 –

関連する問題