。 Dataprocはこのルックアップの時間を改善するために、デフォルトで "mapreduce.input.fileinputformat.list-status.num-threads"プロパティを20に設定しますが、RPCはGCSのファイルごとに実行されます。
スレッドを追加してもあまり役に立たず、ドライバをOOMにすばやく導いているような場合があります。
読んで並列化する方法を拡張すると、2つのアイデアがあります。
しかし、最初に、警告のビットがあります。これらの解決策はいずれも、グロブに含まれるディレクトリに対して非常に堅牢です。おそらく、読み込むファイルのリストに表示されているディレクトリを守る必要があります。
最初はpythonとhadoopコマンドラインツールで行います(これはgsutilでも行うことができます)。それがどのように見えるかの例を以下で、労働者への上場のファイルを実行し、ペアにファイルの内容を読み取り、最後に(ファイル名、ファイル長)のペアを計算します。
from __future__ import print_function
from pyspark.rdd import RDD
from pyspark import SparkContext
import sys
import subprocess
def hadoop_ls(file_glob):
lines = subprocess.check_output(["/usr/bin/hadoop", "fs", "-ls", file_glob]).split("\n")
files = [line.split()[7] for line in lines if len(line) > 0]
return files
def hadoop_cat(file):
return subprocess.check_output(["/usr/bin/hadoop", "fs", "-cat", file]).decode("utf-8")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Provide a list of path globs to read.")
exit(-1)
sc = SparkContext()
# This is just for testing. You'll want to generate a list
# of prefix globs instead of having a list passed in from the
# command line.
globs = sys.argv[1:]
# Desired listing partition count
lpc = 100
# Desired 'cat' partition count, should be less than total number of files
cpc = 1000
files = sc.parallelize(globs).repartition(lpc).flatMap(hadoop_ls)
files_and_content = files.repartition(cpc).map(lambda f: [f, hadoop_cat(f)])
files_and_char_count = files_and_content.map(lambda p: [p[0], len(p[1])])
local = files_and_char_count.collect()
for pair in local:
print("File {} had {} chars".format(pair[0], pair[1]))
私が最初にこのサブプロセスを開始しますhadoop_lsとhadoop_catの呼び出しを分割して遊んで、受け入れられるものが得られるかどうか確認してください。
2番目の解決策は複雑ですが、多くの多くのexec呼び出しを避けることでよりパフォーマンスの高いパイプラインが得られるでしょう。
この2番目の解決策では、初期化アクションを使用して、そのjarファイルをすべてのワーカーにコピーし、最後にドライバからヘルパーを使用して、専用ヘルパーjarファイルをコンパイルします。私たちは純粋なPythonのソリューションは、上記の場合と同様に多くを機能小さなScalaのクラスを持つことになり、当社のPysparkHelper.scalaファイルで
helper/src/main/scala/com/google/cloud/dataproc/support/PysparkHelper.scala
helper/build.sbt
:
当社のScalaのjarプロジェクトの最終的なディレクトリ構造は次のようになります。最初に、ファイルグロブのRDD、ファイル名のRDD、最後にファイル名とファイルの内容のペアのRDDを作成します。
package com.google.cloud.dataproc.support
import collection.JavaConversions._
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}
import java.util.ArrayList
import java.nio.charset.StandardCharsets
class PysparkHelper extends Serializable {
def wholeTextFiles(
context: JavaSparkContext,
paths: ArrayList[String],
partitions: Int): JavaPairRDD[String, String] = {
val globRDD = context.sc.parallelize(paths).repartition(partitions)
// map globs to file names:
val filenameRDD = globRDD.flatMap(glob => {
val path = new Path(glob)
val fs: FileSystem = path.getFileSystem(new Configuration)
val statuses = fs.globStatus(path)
statuses.map(s => s.getPath.toString)
})
// Map file name to (name, content) pairs:
// TODO: Consider adding a second parititon count parameter to repartition before
// the below map.
val fileNameContentRDD = filenameRDD.map(f => {
Pair(f, readPath(f, new Configuration))
})
new JavaPairRDD(fileNameContentRDD)
}
def readPath(file: String, conf: Configuration) = {
val path = new Path(file)
val fs: FileSystem = path.getFileSystem(conf)
val stream = fs.open(path)
try {
IOUtils.toString(stream, StandardCharsets.UTF_8)
} finally {
stream.close()
}
}
}
ヘルパー/ビルド。私たちは、その後、SBTでヘルパーを構築することができ
organization := "com.google.cloud.dataproc.support"
name := "pyspark_support"
version := "0.1"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.7.1" % "provided"
exportJars := true
:
$ cd helper && sbt package
出力ヘルパージャーは、ターゲット/ Scalaの-2.10/pyspark_support_2.10-0.1.jarあるべきSBTファイルには、次のようになります
このクラスタをクラスタに追加する必要があります.1つはGCSにjarをアップロードし、2はGCSでJarをクラスタノードにコピーするための初期化アクションを作成します。
説明のために、あなたのバケツの名前がMY_BUCKETであると仮定しましょう(ここに適切な海洋関連のメモを挿入してください)。
$ gsutil cp target/scala-2.10/pyspark_support_2.10-0.1.jar gs://MY_BUCKET/pyspark_support.jar
(のは、必要に応じてMY_BUCKETを交換し、pyspark_init_action.shそれを呼びましょう)初期化アクションを作成します。
#!/bin/bash
gsutil cp gs://MY_BUCKET/pyspark_support.jar /usr/lib/hadoop/lib/
、最終的にGCSに初期化アクションをアップロード:
$ gsutil cp pyspark_init_action.sh gs://MY_BUCKET/pyspark_init_action.sh
クラスタ次のフラグをgcloudに渡すことで開始できます。
--initialization-actions gs://MY_BUCKET/pyspark_init_action.sh
ついに pysparkからそれを利用する我々はできる、アップロードを構築し、私たちの新しいライブラリをインストールした後:
from __future__ import print_function
from pyspark.rdd import RDD
from pyspark import SparkContext
from pyspark.serializers import PairDeserializer, UTF8Deserializer
import sys
class DataprocUtils(object):
@staticmethod
def wholeTextFiles(sc, glob_list, partitions):
"""
Read whole text file content from GCS.
:param sc: Spark context
:param glob_list: List of globs, each glob should be a prefix for part of the dataset.
:param partitions: number of partitions to use when creating the RDD
:return: RDD of filename, filecontent pairs.
"""
helper = sc._jvm.com.google.cloud.dataproc.support.PysparkHelper()
return RDD(helper.wholeTextFiles(sc._jsc, glob_list, partitions), sc,
PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Provide a list of path globs to read.")
exit(-1)
sc = SparkContext()
globs = sys.argv[1:]
partitions = 10
files_and_content = DataprocUtils.wholeTextFiles(sc, globs, partitions)
files_and_char_count = files_and_content.map(lambda p: (p[0], len(p[1])))
local = files_and_char_count.collect()
for pair in local:
print("File {} had {} chars".format(pair[0], pair[1]))
エラーメッセージやスタックトレースなどは参考になります。マスターは含まれているすべてのデータを読み取るわけではありませんが、作業を開始する前にすべての入力ファイルのステータスを取得します。 Dataprocはこのルックアップの時間を改善するために、デフォルトで "mapreduce.input.fileinputformat.list-status.num-threads"プロパティを20に設定しますが、RPCはGCSのファイルごとに実行されます。ルックアップをさらに改善する方法の1つは、ファイル接頭辞を含むRDDを作成し、flatMapを使用してこれらの接頭辞をファイル名に変換し、次にファイル名をファイルコンテンツにマッピングすることによって、このルックアップロジックの一部を実行することです。 –
あなたが提案したようにファイル名のRDDを作成するとします。このファイル名をファイルコンテンツにどのようにマッピングする必要がありますか?私はexecutor内でsc.wholeTextFileを呼び出すことはできません。私はexecutor内でboto APIを使ってファイルをダウンロードすることができました。私はこれを試みたが、それはさらに遅い。私の疑問は、boto APIにはすべての要求に対して多くの認証オーバーヘッドがあることです。 –