2016-04-06 6 views
0

私はfilesystem状態のバックエンドとzookeeperリカバリモードに設定します:HDFSかのように(私がcheckpointdirstorageDirパラメータを指定する必要があります見ることができますが、私は、Apache FLINKでサポートされている任意のファイルシステムを持っていないのでApache FlinkでRiak CSを使用することはできますか?

state.backend: filesystem 
state.backend.fs.checkpointdir: ??? 

recovery.mode: zookeeper 
recovery.zookeeper.storageDir: ??? 

をAmazon S3)。しかし、私はRiak CSクラスタをインストールしました(それはcompatible with S3のようです)。

Apache Flinkと一緒にRiak CSを使用できますか?可能であれば:Apache FlinkをRiak CSと連携するように設定する方法は?

答えて

3

回答:Apache FlinkとRiak CSに参加するにはどうすればいいですか?

Riak CSには、S3(バージョン2)互換インタフェースがあります。したがって、HadoopのS3ファイルシステムアダプタを使用してRiak CSを使用することは可能です。

私はなぜ知られていないが、ApacheのFLINKは脂肪ジャー内部のHadoopのファイルシステムアダプタの一部のみ(lib/flink-dist_2.11-1.0.1.jar)すなわち、それはFTPファイルシステム(org.apache.hadoop.fs.ftp.FTPFileSystem)を持っていますが、S3ファイルシステム(すなわちorg.apache.hadoop.fs.s3a.S3AFileSystem)を持っていませんがあります。したがって、この問題を解決する2つの方法があります。

  • これらのアダプターをHadoopインストールから使用してください。私はこれを試していないが、HADOOP_CLASSPATHまたはHADOOP_HOME evn変数を設定する必要があるようだ。
  • モンキーパッチのApache FLINKので<flink home>/libディレクトリ

に必要なJARファイルをダウンロードし、私の環境で規定のHadoopにしたくないので、私は2番目の道を選択してください。あなたはのHadoop distのか、インターネットからJARをコピーすることができます。

curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar 
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar 
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar 
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar 

あなたは、このようなバージョンは、Hadoopの2.7.2で使用して、私はHadoopののこのバージョンと互換性FLINKを使用するので、私は古いバージョンを使用しています見ることができるように。

FYI:このようなハックは、これらのJARの最新バージョンを独自のフローで使用している場合に問題を引き起こす可能性があります。

// Relocate org.apache.http packages because Apache Flink include old version of this library (we place them for using S3 compatible FS) 
shadowJar { 
    dependencies { 
     include(dependency('.*:.*:.*')) 
    } 

    relocate 'org.apache.http', 'relocated.org.apache.http' 
    relocate 'org.apache.commons', 'relocated.org.apache.commons' 
} 

その後、あなたはHadoopの互換性のあるファイルシステムのでflink-conf.yamlcore-site.xmlへのパスを指定する必要があります別のバージョンに関連する問題を回避するために、あなたは、フローを使用して脂肪を瓶にのようなものを(私はGradleのを使用しています)を構築する際に、パッケージを再配置することができます負荷設定のため、この設定を使用して:

... 
fs.hdfs.hadoopconf: /flink/conf 
... 

あなたは私がちょうど<fink home>/confディレクトリにそれを置く見ることができるように。それは、次の設定があります。

<?xml version="1.0" encoding="UTF-8" ?> 
<configuration> 
    <property> 
     <name>fs.s3a.impl</name> 
     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> // because S3A better then other: https://wiki.apache.org/hadoop/AmazonS3 
    </property> 
    <property> 
     <name>fs.s3a.endpoint</name> 
     <value>my-riak-cs.stage.local</value> // this is my Riak CS host 
    </property> 
    <property> 
     <name>fs.s3a.connection.ssl.enabled</name> // my Riak CS in staging doesn't support SSL 
     <value>false</value> 
    </property> 
    <property> 
     <name>fs.s3a.access.key</name> 
     <value>????</value> // this is my access key for Riak CS 
    </property> 
    <property> 
     <name>fs.s3a.secret.key</name> 
     <value>????</value> // this is my secret key for Riak CS 
    </property> 
</configuration> 

その後、あなたは、推薦hereとしてflink-conf.yamlにRiakにCSバケットを構成する必要があります。

... 
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints 
... 
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery 
... 

とRiakにCSでバケットを作成します。私はs3cmdを使用しています(私のOS X用のdevのENVにbrewの上に設置さ):FYI

s3cmd mb s3://example-staging-flink 

は:

signature_v2 = True // because Riak CS using S3 V2 interface 
use_https = False // if your don't use SSL 
access_key = ??? 
secret_key = ??? 
host_base = my-riak-cs.stage.local // your Riak CS host 
host_bucket = %(bucket).my-riak-cs.stage.local // format of bucket used by Riak CS 

s3cmdを使用する前に、あなたはそれが~/.s3cmdファイルにいくつかの設定をs3cmd --configureを使用して、修正する設定する必要がありますつまり、Riak CSのスタンドアロンHA Apache Flinkクラスタの保存/復元状態を設定する必要があります。

関連する問題