2017-07-27 1 views
1

スパーク2.1、ETLプロセスはソースシステムからファイルをパーケットに変換し、小さなパーツをfolder1に配置します。 folder1のスパークストリーミングは正常に機能していますが、folder1のパーケットファイルはHDFSにとって小さすぎます。小さい方の寄木細工ファイルを大きなものにマージする必要がありますが、folder1からファイルを削除しようとすると、スパークストリーミングプロセスの例外例外が発生します:Spark Streamingフォルダからファイルを削除することはできますか?

17/07/26 17:16:23 ERROR StreamExecution:Query [id = f29783ea- BDFB-4b59-a6f6-b77f79509a5a、RUNID = cbcce2b2-7d7b-4e31-a15aは-7efed420f974]エラー java.io.FileNotFoundExceptionで終了します。ファイルが存在しない

それはスパークストリーミングフォルダ内の寄木細工のファイルをマージすることは可能ですか?

Welcome to 
     ____    __ 
    /__/__ ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/ '_/ 
    /___/ .__/\_,_/_/ /_/\_\ version 2.1.0.2.6.0.3-8 
     /_/ 

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131) 
Type in expressions to have them evaluated. 
Type :help for more information. 

scala> :paste 
// Entering paste mode (ctrl-D to finish) 

import org.apache.spark.sql.types._ 

val userSchema = new StructType() 
    .add("itemId", "string") 
    .add("tstamp", "integer") 
    .add("rowtype", "string") 
    .add("rowordernumber", "integer") 
    .add("parentrowordernumber", "integer") 
    .add("fieldname", "string") 
    .add("valuestr", "string") 

val csvDF = spark.readStream.schema(userSchema).parquet("/folder1/folder2") 

csvDF.createOrReplaceTempView("tab1") 
val aggDF = spark.sql("select distinct count(itemId) as cases_count from tab1") 
aggDF 
    .writeStream 
    .outputMode("complete") 
    .format("console") 
    .start() 

aggDF 
.writeStream 
.queryName("aggregates") // this query name will be the table name 
.outputMode("complete") 
    .format("memory") 
    .start() 
spark.sql("select * from aggregates").show() 

// Exiting paste mode, now interpreting. 

+-----------+ 
|cases_count| 
+-----------+ 
+-----------+ 

import org.apache.spark.sql.types._ 
userSchema: org.apache.spark.sql.types.StructType = StructType(StructField(itemId,StringType,true), StructField(tstamp,IntegerType,true), StructField(rowtype,StringType,true), StructField(rowordernumber,IntegerType,true), StructField(parentrowordernumber,IntegerType,true), StructField(fieldname,StringType,true), StructField(valuestr,StringType,true)) 
csvDF: org.apache.spark.sql.DataFrame = [itemId: string, tstamp: int ... 5 more fields] 
aggDF: org.apache.spark.sql.DataFrame = [cases_count: bigint] 

scala> ------------------------------------------- 
Batch: 0 
------------------------------------------- 
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". 
SLF4J: Defaulting to no-operation (NOP) logger implementation 
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 
+-----------+ 
|cases_count| 
+-----------+ 
| 292086106| 
+-----------+ 

------------------------------------------- 
Batch: 1 
------------------------------------------- 
+-----------+ 
|cases_count| 
+-----------+ 
| 292086106| 
+-----------+ 

------------------------------------------- 
Batch: 2 
------------------------------------------- 
+-----------+ 
|cases_count| 
+-----------+ 
| 292086106| 
+-----------+ 

------------------------------------------- 
Batch: 3 
------------------------------------------- 
+-----------+ 
|cases_count| 
+-----------+ 
| 292086106| 
+-----------+ 

------------------------------------------- 
Batch: 4 
------------------------------------------- 
+-----------+ 
|cases_count| 
+-----------+ 
| 324016758| 
| 292086106| 
+-----------+ 

------------------------------------------- 
Batch: 5 
------------------------------------------- 
+-----------+ 
|cases_count| 
+-----------+ 
| 355839229| 
| 324016758| 
| 292086106| 
+-----------+ 

17/07/26 17:16:23 ERROR StreamExecution: Query [id = f29783ea-bdfb-4b59-a6f6-b77f79509a5a, runId = cbcce2b2-7d7b-4e31-a15a-7efed420f974] terminated with error 
java.io.FileNotFoundException: File does not exist: /folder1/folder2/P-FMVDBAF-4021-20161107152556-1_006.gz.parquet 
+0

このSpark StreamingまたはStructured Streamingはありますか?いくつかのコードを共有するための注意?構造化ストリーミングのように見えます。スタックトレース全体を含めることはできますか? –

+0

メイン・ポストをサンプル・コードで更新しました。はい、ストラクチャード・ストリーミングです。コードを実行するためにspark-shellを使用します。 – Triffids

答えて

1

グロビングを使用すると、必要なファイルだけを処理できます。このように:

val csvDF = spark.readStream.schema(userSchema).parquet("/folder1/folder2/bigger_file*.parquet") 
関連する問題