2016-12-29 8 views
2

私はApache Flinkには比較的新しいので、AWS S3バケットにファイルを生成する簡単なプロジェクトを作成しようとしています。ドキュメントに基づいて、これを行うためにHadoopをインストールする必要があるようです。Apache Flink AWS S3 Sinkでは、ローカルテストにHadoopが必要ですか?

この機能をテストできるようにローカル環境を設定するにはどうすればよいですか?私はApache FlinkとHadoopをローカルにインストールしました。私はHadoopのcore-site.xml設定に必要な変更を追加し、HADOOP_CONFパスをflink.yaml設定に追加しました。私がしようとするとFLINK UIを通じてローカルに自分の仕事を提出するとき、私は常にエラー

2016-12-29 16:03:49,861 INFO org.apache.flink.util.NetUtils        - Unable to allocate on port 6123, due to error: Address already in use 
 
2016-12-29 16:03:49,862 ERROR org.apache.flink.runtime.jobmanager.JobManager    - Failed to run JobManager. 
 
java.lang.RuntimeException: Unable to do further retries starting the actor system 
 
    at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2203) 
 
    at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2143) 
 
    at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:2040) 
 
    at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)

を取得し、私は私が私の環境が設定されている方法で何かをしないのですと仮定しています。これをローカルで行うことは可能ですか?どんな助けもありがとう。

+0

ポート6123が使用中であることを確認します。そうでなければあなたのファイアウォール/ iptablesを無効にします。応答のために –

答えて

3

Hadoopライブラリが必要な場合は、ローカルで実行してS3に書き込むためにHadoopをインストールする必要はありません。私は、Avroスキーマに基づくParquet出力を作成し、SpecificRecordをS3に生成してこれを試してみました。私はSBTとIntellij Ideaを通じて、次のコードをローカルで実行しています。必要な部分:

1)必要なHadoopプロパティを指定して次のファイルを作成してください(注:AWSアクセスキー/秘密鍵の定義は推奨されません)。適切なIAMロールを持つEC2インスタンスでの読み取り/書き込みあなたのS3バケットしかし、テストのために地元のために必要)

<configuration> 
    <property> 
     <name>fs.s3.impl</name> 
     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> 
    </property> 

    <!-- Comma separated list of local directories used to buffer 
     large results prior to transmitting them to S3. --> 
    <property> 
     <name>fs.s3a.buffer.dir</name> 
     <value>/tmp</value> 
    </property> 

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants --> 
    <property> 
     <name>fs.s3a.access.key</name> 
     <value>YOUR_ACCESS_KEY</value> 
    </property> 

    <!-- set your AWS access key --> 
    <property> 
     <name>fs.s3a.secret.key</name> 
     <value>YOUR_SECRET_KEY</value> 
    </property> 
</configuration> 

2)輸入:。 インポートは

import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat 
import org.apache.flink.api.scala.{ExecutionEnvironment, _} 

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat 
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration} 
import org.apache.hadoop.fs.Path 
import org.apache.hadoop.mapreduce.Job 

import org.apache.parquet.avro.AvroParquetOutputFormat 

3をcom.uebercomputing.eventrecord.EventOnlyRecord)FLINKコードは、上記の構成でHadoopOutputFormatを使用しています:

val events: DataSet[(Void, EventOnlyRecord)] = ... 

    val hadoopConfig = getHadoopConfiguration(hadoopConfigFile) 

    val outputFormat = new AvroParquetOutputFormat[EventOnlyRecord] 
    val outputJob = Job.getInstance 

    //Note: AvroParquetOutputFormat extends FileOutputFormat[Void,T] 
    //so key is Void, value of type T - EventOnlyRecord in this case 
    val hadoopOutputFormat = new HadoopOutputFormat[Void, EventOnlyRecord](
     outputFormat, 
     outputJob 
    ) 

    val outputConfig = outputJob.getConfiguration 
    outputConfig.addResource(hadoopConfig) 
    val outputPath = new Path("s3://<bucket>/<dir-prefix>") 
    FileOutputFormat.setOutputPath(outputJob, outputPath) 
    AvroParquetOutputFormat.setSchema(outputJob, EventOnlyRecord.getClassSchema) 

    events.output(hadoopOutputFormat) 

    env.execute 

    ... 

    def getHadoopConfiguration(hadoodConfigPath: String): HadoopConfiguration = { 
     val hadoopConfig = new HadoopConfiguration() 
     hadoopConfig.addResource(new Path(hadoodConfigPath)) 
     hadoopConfig 
    } 

4)ビルドの依存関係とバージョンが使用される:

val awsSdkVersion = "1.7.4" 
    val hadoopVersion = "2.7.3" 
    val flinkVersion = "1.1.4" 

    val flinkDependencies = Seq(
     ("org.apache.flink" %% "flink-scala" % flinkVersion), 
     ("org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion) 
    ) 

    val providedFlinkDependencies = flinkDependencies.map(_ % "provided") 

    val serializationDependencies = Seq(
     ("org.apache.avro" % "avro" % "1.7.7"), 
     ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"), 
     ("org.apache.parquet" % "parquet-avro" % "1.8.1") 
    ) 

    val s3Dependencies = Seq(
     ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion), 
     ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion) 
    ) 

編集S3にwriteAsTextを使用する:

1)Hadoopの設定ディレクトリを作成し(Hadoopの-CONF-DIRとしてこれを参照します)にcore-site.xmlというファイルがあります。例えば

mkdir /home/<user>/hadoop-config 
cd /home/<user>/hadoop-config 
vi core-site.xml 

#content of core-site.xml 
<configuration> 
    <property> 
     <name>fs.s3.impl</name> 
     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> 
    </property> 

    <!-- Comma separated list of local directories used to buffer 
     large results prior to transmitting them to S3. --> 
    <property> 
     <name>fs.s3a.buffer.dir</name> 
     <value>/tmp</value> 
    </property> 

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants --> 
    <property> 
     <name>fs.s3a.access.key</name> 
     <value>YOUR_ACCESS_KEY</value> 
    </property> 

    <!-- set your AWS access key --> 
    <property> 
     <name>fs.s3a.secret.key</name> 
     <value>YOUR_SECRET_KEY</value> 
    </property> 
</configuration> 

2)にファイルFLINK-conf.yamlと(FLINK-CONF-DIRとしてこれを参照する)ディレクトリを作成します。 - 実行 - 編集の設定を -

mkdir /home/<user>/flink-config 
cd /home/<user>/flink-config 
vi flink-conf.yaml 

//content of flink-conf.yaml - continuing earlier example 
fs.hdfs.hadoopconf: /home/<user>/hadoop-config 

3)あなたのS3 FLINKジョブを実行するために使用されるあなたのIntelliJの実行構成を編集し、次の環境変数を追加します:たとえば

FLINK_CONF_DIR and set it to your flink-conf-dir 

Continuing the example above: 
FLINK_CONF_DIR=/home/<user>/flink-config 

4)その環境変数を設定してコードを実行します。

events.writeAsText("s3://<bucket>/<prefix-dir>") 

env.execute 
+0

ありがとう。出力パスを定義せずに、自分のローカルJava実行をhadoop設定ファイルにポイントする方法がありますか?ドキュメントに基づいて、私はちょうどのような何かをすることができるはずです:messageStream.writeAsText( "s3:// ..."); IntelliJを使ってローカル実行を実行すると、そのファイルがどこにあるのかわかりません。私はまた、私が実行時にそれを設定することを許可するフリンク操作を見つけることができないようです。 – medium

+0

問題は、writeAsTextを呼び出すときに使用されるデフォルトのHadoopFileSystemがs3ファイルシステムについて「認識」していないことです。上記の私の元の答えへの編集を参照してください。 – medale

+0

だから私はすべてが機能していると思うが、S3バケツにアクセスする際に問題がある。このエラー: com.amazonaws.services.s3.model.AmazonS3Exception:ステータスコード:403、AWSサービス:Amazon S3、AWSリクエストID:**********、AWSエラーコード:null、AWSエラーメッセージ:禁止S3拡張要求ID: 私のアプリで使用されるキーがS3バケットを作成したのと同じアカウントなので、なぜアクセスエラーが発生するのかわかりません。それは今、すべてがフリンク側で働いているようだ。なぜ私がこのエラーが出てきたのかについてのヒントがあれば教えてください。再度、感謝します! – medium

関連する問題