Hadoopライブラリが必要な場合は、ローカルで実行してS3に書き込むためにHadoopをインストールする必要はありません。私は、Avroスキーマに基づくParquet出力を作成し、SpecificRecordをS3に生成してこれを試してみました。私はSBTとIntellij Ideaを通じて、次のコードをローカルで実行しています。必要な部分:
1)必要なHadoopプロパティを指定して次のファイルを作成してください(注:AWSアクセスキー/秘密鍵の定義は推奨されません)。適切なIAMロールを持つEC2インスタンスでの読み取り/書き込みあなたのS3バケットしかし、テストのために地元のために必要)
<configuration>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<!-- Comma separated list of local directories used to buffer
large results prior to transmitting them to S3. -->
<property>
<name>fs.s3a.buffer.dir</name>
<value>/tmp</value>
</property>
<!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
<property>
<name>fs.s3a.access.key</name>
<value>YOUR_ACCESS_KEY</value>
</property>
<!-- set your AWS access key -->
<property>
<name>fs.s3a.secret.key</name>
<value>YOUR_SECRET_KEY</value>
</property>
</configuration>
2)輸入:。 インポートは
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.avro.AvroParquetOutputFormat
3をcom.uebercomputing.eventrecord.EventOnlyRecord)FLINKコードは、上記の構成でHadoopOutputFormatを使用しています:
val events: DataSet[(Void, EventOnlyRecord)] = ...
val hadoopConfig = getHadoopConfiguration(hadoopConfigFile)
val outputFormat = new AvroParquetOutputFormat[EventOnlyRecord]
val outputJob = Job.getInstance
//Note: AvroParquetOutputFormat extends FileOutputFormat[Void,T]
//so key is Void, value of type T - EventOnlyRecord in this case
val hadoopOutputFormat = new HadoopOutputFormat[Void, EventOnlyRecord](
outputFormat,
outputJob
)
val outputConfig = outputJob.getConfiguration
outputConfig.addResource(hadoopConfig)
val outputPath = new Path("s3://<bucket>/<dir-prefix>")
FileOutputFormat.setOutputPath(outputJob, outputPath)
AvroParquetOutputFormat.setSchema(outputJob, EventOnlyRecord.getClassSchema)
events.output(hadoopOutputFormat)
env.execute
...
def getHadoopConfiguration(hadoodConfigPath: String): HadoopConfiguration = {
val hadoopConfig = new HadoopConfiguration()
hadoopConfig.addResource(new Path(hadoodConfigPath))
hadoopConfig
}
4)ビルドの依存関係とバージョンが使用される:
val awsSdkVersion = "1.7.4"
val hadoopVersion = "2.7.3"
val flinkVersion = "1.1.4"
val flinkDependencies = Seq(
("org.apache.flink" %% "flink-scala" % flinkVersion),
("org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
)
val providedFlinkDependencies = flinkDependencies.map(_ % "provided")
val serializationDependencies = Seq(
("org.apache.avro" % "avro" % "1.7.7"),
("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
("org.apache.parquet" % "parquet-avro" % "1.8.1")
)
val s3Dependencies = Seq(
("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
)
編集S3にwriteAsTextを使用する:
1)Hadoopの設定ディレクトリを作成し(Hadoopの-CONF-DIRとしてこれを参照します)にcore-site.xmlというファイルがあります。例えば
:
mkdir /home/<user>/hadoop-config
cd /home/<user>/hadoop-config
vi core-site.xml
#content of core-site.xml
<configuration>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<!-- Comma separated list of local directories used to buffer
large results prior to transmitting them to S3. -->
<property>
<name>fs.s3a.buffer.dir</name>
<value>/tmp</value>
</property>
<!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
<property>
<name>fs.s3a.access.key</name>
<value>YOUR_ACCESS_KEY</value>
</property>
<!-- set your AWS access key -->
<property>
<name>fs.s3a.secret.key</name>
<value>YOUR_SECRET_KEY</value>
</property>
</configuration>
2)にファイルFLINK-conf.yamlと(FLINK-CONF-DIRとしてこれを参照する)ディレクトリを作成します。 - 実行 - 編集の設定を -
mkdir /home/<user>/flink-config
cd /home/<user>/flink-config
vi flink-conf.yaml
//content of flink-conf.yaml - continuing earlier example
fs.hdfs.hadoopconf: /home/<user>/hadoop-config
3)あなたのS3 FLINKジョブを実行するために使用されるあなたのIntelliJの実行構成を編集し、次の環境変数を追加します:たとえば
FLINK_CONF_DIR and set it to your flink-conf-dir
Continuing the example above:
FLINK_CONF_DIR=/home/<user>/flink-config
4)その環境変数を設定してコードを実行します。
events.writeAsText("s3://<bucket>/<prefix-dir>")
env.execute
ポート6123が使用中であることを確認します。そうでなければあなたのファイアウォール/ iptablesを無効にします。応答のために –