2016-09-19 1 views
1

私の目標は、大きなR data.frameをSparkに読み込むことです。 data.frameのサイズは5 milです。行と7種類のさまざまな種類の列があります。 Rにロードされると、このdata.frameは約1秒かかる。 200MBのメモリしかし、as.DataFrame()機能を使用してSparkにロードしようとすると、Rセッションは永遠に占有され、1時間実行されていたため、操作をキャンセルする必要がありました。SparkRのas.DataFrame()を使用して大きなR data.framesをSparkにロードするにはどうすればよいですか?

私は、この例で使用して、次のデータセットを作成しています:上記のサンプルdata.frameに

サイズ、約200メガバイトを作成

n=5e6 # set sample size 

d <- data.frame(
    v1=base::sample(1:9,n,replace=TRUE), 
    v2=base::sample(1000:9000,n,replace=TRUE), 
    v3=seq(as.Date("2016-01-01"), as.Date("2016-12-31"), by = "day")[base::sample(1:365,n,replace=TRUE)], 
    v4=LETTERS[base::sample(1:length(LETTERS),n,replace=TRUE)], 
    v5=base::sample(1000:9000,n,replace=TRUE)/1000, 
    v6=seq(ISOdate(2016,1,1), ISOdate(2018,1,1), "sec")[base::sample(1:63158401,n,replace=TRUE)], 
    v7=c(TRUE,FALSE)[base::sample(1:2,n,replace=TRUE)] 
) 

:ここ

は詳細です

paste0("size: ", round(as.numeric(object.size(d))/1000000,1)," mb") 

次に、私はSparkセッションを作成します:

Sys.setenv(SPARK_HOME='C:\\soft\\spark-2.0.0-bin-hadoop2.7',HADOOP_HOME='C:\\soft\\hadoop') 
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'),.libPaths())) 
Sys.setenv('SPARKR_SUBMIT_ARGS'='"sparkr-shell"') 

library(SparkR) 
library(rJava) 
sparkR.session(enableHiveSupport = FALSE,master = "local[*]", sparkConfig = list(spark.driver.memory = "1g",spark.sql.warehouse.dir="C:\\soft\\hadoop\\bin")) 

今、私はスパークに上記で作成したdata.frameをロードしようとしています:

d_sd <- as.DataFrame(d) 

上記のコマンドを実行するように永遠にかかります。

私は何か間違っていますか? 元のR data.frameの列のclass()に関連付けることはできますか? RからSparkに大規模なデータセットをロードする別のアプローチをとるべきでしょうか?はいの場合は、何かを提案してください。

ありがとうございます。

PS:

私はすぐにこのメソッドを使用してスパークの小さなデータセットを変換して操作することができています。

Rバージョン3.2.5(2016年4月14日) プラットフォーム:x86_64版-W64-MINGW32/x64(64ビット)をここで

は私のRセッションやOS上でいくつかの背景情報は、私が実行しているあります 実行中:Windows 7 x64(ビルド7601)サービスパック1

私はWindows 7 Professional(64ビット)、8 GBのRAMの下でMicrosoftのバージョンのR(Revolution)を実行しています。プロセッサ:i5-2520M @ 2.50GHz


EDIT 2016年9月19日:

、ZeydyオルティスとMohit Bansal氏、ありがとうございました。あなたの回答に基づいて、私は次のことを試してみましたが、私はまだ同じ問題に直面しています:

Sys.setenv(SPARK_HOME='C:\\soft\\spark-2.0.0-bin-hadoop2.7',HADOOP_HOME='C:\\soft\\hadoop') 
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'),.libPaths())) 
Sys.setenv('SPARKR_SUBMIT_ARGS'='"sparkr-shell"') 

library(SparkR) 
library(rJava) 
sparkR.session(enableHiveSupport = FALSE,master = "local[*]", sparkConfig = list(spark.driver.memory = "1g",spark.sql.warehouse.dir="C:\\soft\\hadoop\\bin")) 


n=5e6 # set sample size 

d_sd <- createDataFrame(sqlContext,data=data.frame(
     v1=base::sample(1:9,n,replace=TRUE), 
     v2=base::sample(1000:9000,n,replace=TRUE), 
     v3=seq(as.Date("2016-01-01"), as.Date("2016-12-31"), by = "day")[base::sample(1:365,n,replace=TRUE)], 
     v4=LETTERS[base::sample(1:length(LETTERS),n,replace=TRUE)], 
     v5=base::sample(1000:9000,n,replace=TRUE)/1000, 
     v6=seq(ISOdate(2016,1,1), ISOdate(2018,1,1), "sec")[base::sample(1:63158401,n,replace=TRUE)], 
     v7=c(TRUE,FALSE)[base::sample(1:2,n,replace=TRUE)] 
    )) 

DFが数時間走っていたスパークするためにRのDFを変換コマンド。キャンセルする必要がありました。手伝ってください。


EDIT 2016年12月14日:

上記スパーク1.6.1及びR 3.2.0を用いて試みました。私は最近、Spark 2.0.2(最新)とR 3.2.5を使用してこれを試しましたが、同じ問題が発生しました。

ご協力いただければ幸いです。

答えて

2

これはメモリの制約と関係があります。なぜ、まずベースデータフレームを作成し、それをSpark DataFrameに変換する必要がありますか?あなたが一つに手順の両方を組み合わせて、結果を得ることができます

Sys.setenv(SPARK_HOME='C:\\soft\\spark-2.0.0-bin-hadoop2.7',HADOOP_HOME='C:\\soft\\hadoop') 
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'),.libPaths())) 
Sys.setenv('SPARKR_SUBMIT_ARGS'='"sparkr-shell"') 

library(SparkR) 
library(rJava) 
sparkR.session(enableHiveSupport = FALSE,master = "local[*]", sparkConfig = list(spark.driver.memory = "1g",spark.sql.warehouse.dir="C:\\soft\\hadoop\\bin")) 

次に、あなたがあなたのSDFをロードすることができます。

n=5e6 # set sample size 

d_sd <- as.DataFrame(data.frame(
    v1=base::sample(1:9,n,replace=TRUE), 
    v2=base::sample(1000:9000,n,replace=TRUE), 
    v3=seq(as.Date("2016-01-01"), as.Date("2016-12-31"), by = "day")[base::sample(1:365,n,replace=TRUE)], 
    v4=LETTERS[base::sample(1:length(LETTERS),n,replace=TRUE)], 
    v5=base::sample(1000:9000,n,replace=TRUE)/1000, 
    v6=seq(ISOdate(2016,1,1), ISOdate(2018,1,1), "sec")[base::sample(1:63158401,n,replace=TRUE)], 
    v7=c(TRUE,FALSE)[base::sample(1:2,n,replace=TRUE)] 
)) 

また、同様の質問を参照することができます。How best to handle converting a large local data frame to a SparkR data frame?

+0

ありがとう、Mohit Bansal、私は新しいRセッションを開始しました。あなたのアプローチを試みたが、それは役に立たなかった。 Rは数分間実行されています。 –

+0

SparkRで大きなデータセットを操作するにはどうすればよいですか? –

+0

@KirillSavineは次の質問に従います - http://stackoverflow.com/questions/39392327/how-best-to-handle-converting-a-large-local-data-frame-to-a-sparkr-data-frame –

0

Spark 2.0.0では、createDataFrame(d)

+0

ありがとう、ゼディオルティス、私はあなたのアプローチを試みた。私の質問のデータを使って、createDataFrame()関数も非常に遅いです。それはしばらくの間実行されています。決して終わらない。 –

関連する問題