2017-01-10 9 views
0

件名にちょっと触れる唯一の投稿はhereですが、私の問題は解決しません。ここでHDFSのgetmergeリカバリを使った寄木細工の構築

は、我々がローカルバックアップに寄木細工を収集する問題がある:

$ hadoop fs -getmerge /dir/on/hdfs /local/dir 

作られたエラーは、我々は寄木細工複数のファイルの組織はHDFSの書き込みによるものであると思ったが、我々はそれが本当にだった理解していなかったということです寄せ木張りのファイル "通常の"組織。だから、私たちはHDFSのgetmergeを使ってバックアップを行いました。問題は、私たちのデータが消去されており、現在回復していないということです。

寄木細工を解析して(そしてを読むと)、すべてのファイルは元々マジックナンバー 'PAR1'の間にデータ+メタデータを含むチャンクで構成され、2 - _metadataと_common_metadata - filesメタデータの

getmergeのプロセスファイルがhdfs上の元の寄せ木細工ディレクトリの順番になっていることを観察すると、私は2つの 'PAR1'の間のデータをとり、それをチャンクファイルにするスクリプトを考え出しました。 作成される最初の2つのファイルは(_common_metadata、_metadata)です。それが作成されたすべての部分は、元のファイル(後にされた後、名前のディレクトリに移動され

  • パラメータに「getmerge」寄木細工のファイルを取る

    1. filePrefix='part-' 
      finalFilePrefix='part-r-' 
      
      awk 'NR%2==0{ print $0 > "part-"i++ }' RS='PAR1' $1 
      
      nbFiles=$(ls -lah | grep 'part-' | wc -l) 
      
      for num in $(seq 0 $nbFiles) 
           do 
           fileName="$filePrefix$num" 
           lastName="" 
           if [ "$num" -eq "0" ]; then 
             lastName="_common_metadata" 
             awk '{print "PAR1" $0 "PAR1"}' $fileName > $lastName 
           else  
      
             if [ "$num" -eq "1" ]; then 
               lastName="_metadata" 
               awk '{print "PAR1" $0 "PAR1"}' $fileName > $lastName 
             else  
               if [ -e $fileName ]; then 
                 count=$(printf "%05d" $(($num-2))) 
                 lastName="$finalFilePrefix$count.gz.parquet" 
                 awk '{print "PAR1" $0 "PAR1"}' $fileName > $lastName 
               fi  
             fi    
           fi 
           echo $lastName 
           truncate --size=-1 $lastName 
           rm -f "$fileName" 
      done 
      
      mv $1 $1.backup 
      mkdir $1 
      mv _* $1 
      mv part* $1 
      

      スクリプトに関するいくつかの観察

    2. 各ファイルの末尾にバイトを記入する必要があります - 切り捨て - sparkとして経験的に行われましたsc.load.parquet()はメタデータファイルを読み取れませんでした
    3. 最終的には、hadoop fs -putを使用してhdfsにアップロードします。
    4. 私は(明らかと_common_metadateファイル)_metadataを言ったようにDATAFRAME としてそれをロードしようとOK読まなく、チャンクを読み込むときに、まだ我々は持っているとエラーされています

    コードを:

    val newDataDF = sqlContext.read.parquet("/tmp/userActionLog2-leclerc-culturel-2016.09.04.parquet") 
    newDataDF.take(1) 
    

    エラー:誰かが助けることができる任意のアイデアを持っていたならば我々のデータはここに危機に瀕していることを

    newDataDF: org.apache.spark.sql.DataFrame = [bson: binary] 
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 5, hdp-node4.affinytix.com): java.io.IOException: can not read class org.apache.parquet.format.PageHeader: don't know what type: 13 
    at org.apache.parquet.format.Util.read(Util.java:216) 
    at org.apache.parquet.format.Util.readPageHeader(Util.java:65) 
    at org.apache.parquet.hadoop.ParquetFileReader$WorkaroundChunk.readPageHeader(ParquetFileReader.java:668) 
    at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:546) 
    at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:496) 
    at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader.checkEndOfRowGroup(UnsafeRowParquetRecordReader.java:604) 
    at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader.loadBatch(UnsafeRowParquetRecordReader.java:218) 
    at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader.nextKeyValue(UnsafeRowParquetRecordReader.java:196) 
    at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:194) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
    Caused by: parquet.org.apache.thrift.protocol.TProtocolException: don't know what type: 13 
    at parquet.org.apache.thrift.protocol.TCompactProtocol.getTType(TCompactProtocol.java:806) 
    at parquet.org.apache.thrift.protocol.TCompactProtocol.readFieldBegin(TCompactProtocol.java:500) 
    at org.apache.parquet.format.InterningProtocol.readFieldBegin(InterningProtocol.java:158) 
    at org.apache.parquet.format.PageHeader.read(PageHeader.java:828) 
    at org.apache.parquet.format.Util.read(Util.java:213) 
    ... 32 more 
    

    を考えると、私は暖かく(彼に感謝しますエル)。

    bye

  • 答えて

    0

    私は質問に答えました。

    私が最初に持っていた基本的な考え方は大丈夫です。問題はちょうどawk(ソリューションスクリプト内)が多数の文字を追加していることです。 それでは、寄木細工の塊が読めなくなりました。

    解決策は、プログラミング(python、perl ...)によってマージされたファイルを操作することです。 ここで私が思いつくpythonの解決策です。それは、無用の文字を追加しないことを除いて、前のものと同等です。

    コード:

    print "create parquet script." 
    import sys 
    filename = sys.argv[1] 
    import locale 
    currencode=locale.getpreferredencoding() 
    
    import io 
    print "=====================================================================" 
    print "Create parquet from: ", filename 
    print "defautl buffer size: ", io.DEFAULT_BUFFER_SIZE 
    print "default encoding of the system: ", currencode 
    print "=====================================================================" 
    
    import re 
    magicnum = "PAR1" 
    with io.open(filename, mode='rb') as f: 
         content = f.read() 
    res = [ magicnum + chunk + magicnum for chunk in filter(lambda s: s!="", re.split(magicnum, content)) ] 
    
    szcontent = len(res[2:]) 
    for i in range(0,szcontent) : 
         si = str(i) 
         write_to_binfile("part-r-{}.gz.parquet".format(si.zfill(5)), res[i+2]) 
    
    write_to_binfile("_common_metadata", res[0]) 
    write_to_binfile("_metadata", res[1]) 
    
    import os 
    os.system("mv {} {}.backup".format(filename, filename)) 
    os.system("mkdir {}".format(filename)) 
    os.system("mv _* {}".format(filename)) 
    os.system("mv part* {}".format(filename)) 
    

    観察:Pythonの関数は、文字列(数十メガバイトはOKです)などのメモリに全部を読み込むよう 寄木細工のファイルは非常に大きいのであってはなりません! 最後にシステムコールがUNIXベースであるため、linux/unixで実行する必要があります。

    bye

    関連する問題