3

私はPySpark(Google Dataproc)を使用して約100万のHTMLファイルを解析し、関連するフィールドを凝縮ファイルに書き出しようとしています。各HTMLファイルは約200KBです。したがって、すべてのデータは約200GBです。PySpark + Google Cloud Storage(wholeTextFiles)

データのサブセットを使用すると、以下のコードは正常に動作しますが、数時間実行されてからデータセット全体を実行するとクラッシュします。さらに、ワーカーノードは利用されていないため(< 5%CPU)、問題があることがわかります。

私は、システムがGCSからのデータを摂取して窒息していると思います。これを行うより良い方法はありますか?また、この方法でwholeTextFilesを使用すると、マスターはすべてのファイルをダウンロードしてエグゼキュータに送信しようとしますか、またはエグゼキュータがそれらをダウンロードできるようにしますか?マスターが含まれているデータのすべてを読んでいないだろうが、それは作業を開始する前に、すべての入力ファイルのステータスを取得します、あなたの質問に答えるために

def my_func(keyval): 
    keyval = (file_name, file_str) 
    return parser(file_str).__dict__ 

data = sc.wholeTextFiles("gs://data/*") 
output = data.map(my_func) 
output.saveAsTextFile("gs://results/a") 
+0

エラーメッセージやスタックトレースなどは参考になります。マスターは含まれているすべてのデータを読み取るわけではありませんが、作業を開始する前にすべての入力ファイルのステータスを取得します。 Dataprocはこのルックアップの時間を改善するために、デフォルトで "mapreduce.input.fileinputformat.list-status.num-threads"プロパティを20に設定しますが、RPCはGCSのファイルごとに実行されます。ルックアップをさらに改善する方法の1つは、ファイル接頭辞を含むRDDを作成し、flatMapを使用してこれらの接頭辞をファイル名に変換し、次にファイル名をファイルコンテンツにマッピングすることによって、このルックアップロジックの一部を実行することです。 –

+0

あなたが提案したようにファイル名のRDDを作成するとします。このファイル名をファイルコンテンツにどのようにマッピングする必要がありますか?私はexecutor内でsc.wholeTextFileを呼び出すことはできません。私はexecutor内でboto APIを使ってファイルをダウンロードすることができました。私はこれを試みたが、それはさらに遅い。私の疑問は、boto APIにはすべての要求に対して多くの認証オーバーヘッドがあることです。 –

答えて

3

。 Dataprocはこのルックアップの時間を改善するために、デフォルトで "mapreduce.input.fileinputformat.list-status.num-threads"プロパティを20に設定しますが、RPCはGCSのファイルごとに実行されます。

スレッドを追加してもあまり役に立たず、ドライバをOOMにすばやく導いているような場合があります。

読んで並列化する方法を拡張すると、2つのアイデアがあります。

しかし、最初に、警告のビットがあります。これらの解決策はいずれも、グロブに含まれるディレクトリに対して非常に堅牢です。おそらく、読み込むファイルのリストに表示されているディレクトリを守る必要があります。

最初はpythonとhadoopコマンドラインツールで行います(これはgsutilでも行うことができます)。それがどのように見えるかの例を以下で、労働者への上場のファイルを実行し、ペアにファイルの内容を読み取り、最後に(ファイル名、ファイル長)のペアを計算します。

from __future__ import print_function 

from pyspark.rdd import RDD 
from pyspark import SparkContext 

import sys 
import subprocess 


def hadoop_ls(file_glob): 
    lines = subprocess.check_output(["/usr/bin/hadoop", "fs", "-ls", file_glob]).split("\n") 
    files = [line.split()[7] for line in lines if len(line) > 0] 
    return files 

def hadoop_cat(file): 
    return subprocess.check_output(["/usr/bin/hadoop", "fs", "-cat", file]).decode("utf-8") 

if __name__ == "__main__": 
    if len(sys.argv) < 2: 
    print("Provide a list of path globs to read.") 
    exit(-1) 

    sc = SparkContext() 
    # This is just for testing. You'll want to generate a list 
    # of prefix globs instead of having a list passed in from the 
    # command line. 
    globs = sys.argv[1:] 
    # Desired listing partition count 
    lpc = 100 
    # Desired 'cat' partition count, should be less than total number of files 
    cpc = 1000 
    files = sc.parallelize(globs).repartition(lpc).flatMap(hadoop_ls) 
    files_and_content = files.repartition(cpc).map(lambda f: [f, hadoop_cat(f)]) 
    files_and_char_count = files_and_content.map(lambda p: [p[0], len(p[1])]) 
    local = files_and_char_count.collect() 
    for pair in local: 
    print("File {} had {} chars".format(pair[0], pair[1])) 

私が最初にこのサブプロセスを開始しますhadoop_lsとhadoop_catの呼び出しを分割して遊んで、受け入れられるものが得られるかどうか確認してください。

2番目の解決策は複雑ですが、多くの多くのexec呼び出しを避けることでよりパフォーマンスの高いパイプラインが得られるでしょう。

この2番目の解決策では、初期化アクションを使用して、そのjarファイルをすべてのワーカーにコピーし、最後にドライバからヘルパーを使用して、専用ヘルパーjarファイルをコンパイルします。私たちは純粋なPythonのソリューションは、上記の場合と同様に多くを機能小さなScalaのクラスを持つことになり、当社のPysparkHelper.scalaファイルで

helper/src/main/scala/com/google/cloud/dataproc/support/PysparkHelper.scala 
helper/build.sbt 

当社のScalaのjarプロジェクトの最終的なディレクトリ構造は次のようになります。最初に、ファイルグロブのRDD、ファイル名のRDD、最後にファイル名とファイルの内容のペアのRDDを作成します。

package com.google.cloud.dataproc.support 

import collection.JavaConversions._ 

import org.apache.commons.io.IOUtils 
import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.fs.{FileSystem, Path} 
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext} 

import java.util.ArrayList 
import java.nio.charset.StandardCharsets 

class PysparkHelper extends Serializable { 
    def wholeTextFiles(
    context: JavaSparkContext, 
    paths: ArrayList[String], 
    partitions: Int): JavaPairRDD[String, String] = { 

    val globRDD = context.sc.parallelize(paths).repartition(partitions) 
    // map globs to file names: 
    val filenameRDD = globRDD.flatMap(glob => { 
     val path = new Path(glob) 
     val fs: FileSystem = path.getFileSystem(new Configuration) 
     val statuses = fs.globStatus(path) 
     statuses.map(s => s.getPath.toString) 
    }) 
    // Map file name to (name, content) pairs: 
    // TODO: Consider adding a second parititon count parameter to repartition before 
    // the below map. 
    val fileNameContentRDD = filenameRDD.map(f => { 
     Pair(f, readPath(f, new Configuration)) 
    }) 

    new JavaPairRDD(fileNameContentRDD) 
    } 

    def readPath(file: String, conf: Configuration) = { 
    val path = new Path(file) 
    val fs: FileSystem = path.getFileSystem(conf) 
    val stream = fs.open(path) 
    try { 
     IOUtils.toString(stream, StandardCharsets.UTF_8) 
    } finally { 
     stream.close() 
    } 
    } 
} 

ヘルパー/ビルド。私たちは、その後、SBTでヘルパーを構築することができ

organization := "com.google.cloud.dataproc.support" 
name := "pyspark_support" 
version := "0.1" 
scalaVersion := "2.10.5" 
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided" 
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.7.1" % "provided" 
exportJars := true 

$ cd helper && sbt package 

出力ヘルパージャーは、ターゲット/ Scalaの-2.10/pyspark_support_2.10-0.1.jarあるべきSBTファイルには、次のようになります

このクラスタをクラスタに追加する必要があります.1つはGCSにjarをアップロードし、2はGCSでJarをクラスタノードにコピーするための初期化アクションを作成します。

説明のために、あなたのバケツの名前がMY_BUCKETであると仮定しましょう(ここに適切な海洋関連のメモを挿入してください)。

$ gsutil cp target/scala-2.10/pyspark_support_2.10-0.1.jar gs://MY_BUCKET/pyspark_support.jar 

(のは、必要に応じてMY_BUCKETを交換し、pyspark_init_action.shそれを呼びましょう)初期化アクションを作成します。

#!/bin/bash 

gsutil cp gs://MY_BUCKET/pyspark_support.jar /usr/lib/hadoop/lib/ 

、最終的にGCSに初期化アクションをアップロード:

$ gsutil cp pyspark_init_action.sh gs://MY_BUCKET/pyspark_init_action.sh 

クラスタ次のフラグをgcloudに渡すことで開始できます。

--initialization-actions gs://MY_BUCKET/pyspark_init_action.sh 

ついに pysparkからそれを利用する我々はできる、アップロードを構築し、私たちの新しいライブラリをインストールした後:

from __future__ import print_function 

from pyspark.rdd import RDD 
from pyspark import SparkContext 
from pyspark.serializers import PairDeserializer, UTF8Deserializer 

import sys 

class DataprocUtils(object): 

    @staticmethod 
    def wholeTextFiles(sc, glob_list, partitions): 
    """ 
    Read whole text file content from GCS. 
    :param sc: Spark context 
    :param glob_list: List of globs, each glob should be a prefix for part of the dataset. 
    :param partitions: number of partitions to use when creating the RDD 
    :return: RDD of filename, filecontent pairs. 
    """ 
    helper = sc._jvm.com.google.cloud.dataproc.support.PysparkHelper() 
    return RDD(helper.wholeTextFiles(sc._jsc, glob_list, partitions), sc, 
       PairDeserializer(UTF8Deserializer(), UTF8Deserializer())) 

if __name__ == "__main__": 
    if len(sys.argv) < 2: 
    print("Provide a list of path globs to read.") 
    exit(-1) 

    sc = SparkContext() 
    globs = sys.argv[1:] 
    partitions = 10 
    files_and_content = DataprocUtils.wholeTextFiles(sc, globs, partitions) 
    files_and_char_count = files_and_content.map(lambda p: (p[0], len(p[1]))) 
    local = files_and_char_count.collect() 
    for pair in local: 
    print("File {} had {} chars".format(pair[0], pair[1])) 
0

ありがとう!私は最初の方法を試しました。動作しますが、exec呼び出しとRPC/authオーバーヘッドのためにパフォーマンスがあまり良くありません。 32ノードクラスタで実行するのに約10時間かかります。私は、Amazon S3コネクターでawsのdatabricksを使用して、4ノードのクラスターで30分で実行することができました。そこにはオーバーヘッドがほとんどないようです。私はGoogleがGCSからSparkにデータを取り込むためのよりよい方法を提供したいと考えています。

+0

これをもう少し見てみたいと思います。 10時間は非常に高いようです - ファイル/ディレクトリのレイアウトを共有できますか(たとえば、1つのディレクトリに100万のオブジェクト、1,000のオブジェクトがある1,000のディレクトリなど)。また、一般的な形式のglobを共有できますか(たとえば、/ bucket/*/dir/*など)? –

関連する問題