2016-08-03 11 views
0

SPARK sqlを使用してsparkアプリケーションを開発しています。私の仕事の1つは2つのテーブルから値を選択し、それを空のテーブルに挿入することです。しかし、このために私は空のテーブルを作成するための火花を試しています、私は、空のデータフレームを作成しているが、ここでそれを表として登録カント今私の状況でコードsparkの空のデータフレームに値を挿入する方法

from pyspark import SQLContext 
from pyspark.sql.types import StructType,StructField,StringType,DateType,IntegerType 
sqlc=SQLContext(sc) 

schema= StructType([StructField("Name",StringType(),False),StructField("AGE",IntegerType(),False),StructField("DATE",DateType(),False)]) 

dataframe=sqlc.createDataFrame([],schema) 
dataframe.show() 

それがデータフレームを示しているが、私はないですそれは私がこの問題のために、私はテストのためのいくつかの値を挿入したいが、私はそれを行うことができませんので、私は、ここで間違いを犯しています考えて何の価値

dataframe.registerTempTable("Exp") 
experiment=sqlc.sql("SELECT * FROM Exp") 
experiment.show() 

が存在しない原因の登録であるかどうかを確認してください。私はスパークの新しいので、どのように私はこの状況を解決することができますか分からない。私はいくつかのフォーラムを通過しました私はいくつかのフォーラムユーザーがテーブルを作成するためのparallelize()メソッドについて述べたが、私はそれがworks.Isを正しく理解していない操作目的のハイブコンテキストを使用する必要がありますか?私の要求は、ハイブの文脈なしにこの仕事をしています。私を案内してください

+0

タグにHadoopが記述されているのはなぜですか?私が間違っていれば私を訂正しますが、質問にはHadoopはまったく言及されていません。 –

+0

申し訳ありませんが、私は誤ってそれを削除していませんでした – Kalyan

答えて

2

短い答え:あなたはしていません。

スパークデータフレームは、RDDの上に構築され、は変更不可能です。これは最初は慣れにくいですが、あなたはそれに取り組むことができます。

新しいデータフレームを作成するには、既存のデータ変換(たとえば、spark-sqlのSQL文を使用)、入力データの読み込み、または手動でsqlContext.createDataFrame(.. 。)

結果で新しいDataFrameを作成し、必要に応じてテーブルに登録するだけの理由がありますか?

編集:あなたの質問をよく理解していれば分かりません...私が提案していることは、まさにあなたがしたいことです。その場合:テーブルを作成するための

df = sqlContext.createDataFrame([("val1","val2","val1")], ["colName1", "colName2", "colName3"]) 
+0

返信ありがとうございました。私のシナリオは、リモートデータベースからデータを取得していて、スパークにロードしてregisterTempTableを作成しています。私は2つのテンポラリテーブルを持っています。私はそれを結合して新しい空の空のテーブルに結果を挿入します。私は空のデータフレームとテーブルを作成しましたが、挿入できませんでした。 – Kalyan

+0

grepe私はあなたの上記のコードを試してみました。これにより、私はこのような空の値を持つデータフレームを作成しました( ""、 ""、 "")、また一時テーブルを作成しました。ここではsqlcontext.sql(INSERT INTO TABLE(列名)VALUES( "values"))のようなSQLクエリを使用していくつかの値を挿入しようとしましたが、失敗しました。再びあなたに気を遣うためにこのような問い合わせをすることは可能でしょうか?:) – Kalyan

+0

@Kalyanは私が以前に書いたように、一度DFが作成されると、石に設定されます。それを変更することはできないので、SQLの更新や挿入などの操作はできません。変更するたびに* new *データフレームを作成する必要があります。古いDFのデータと新しいエントリが必要な場合は、新しいレコードでDFを作成し、(union)[http://spark.apache.org/docs/latest/api/python/pyspark.sql.html #pyspark.sql.DataFrame.union]を使用して、両方のデータを含むDFを作成します。あなたはSQLのようなステートメントを実行すると、結果はすでに新しいDFです。おそらく、あなたは挿入が必要な場合は火花の代わりにハイブのようなものを使用したいですか? – grepe

0

並列化()メソッドが、私はきちんとそれがどのように動作するか理解していませんでした。

簡単に言えば - parallizeメソッドは一連の入力を受け取り、そこから入力分割を作成します。私はあなたのいずれかがデータフレームの行オブジェクトのリストをparallizeまたはファイルを作成し、その

上でSQLスキーマをマップする必要が

をテストするためのいくつかの値を挿入するこの問題の

私の仕事は2つのテーブルから値を選択しています

そこから始めてください。 2つのテーブルに対してSELECTステートメントを作成すると、新しいテーブルが返されます。登録してデータベース/ディスクに書き出すことができます

+0

あなたの返信をありがとう、私はdatabricksのparallelize()メソッドについては見たことがあるが、私は、registerTempTable()の代わりにsavedasTable()を使用しなければならないと言いましたが、私はHivecontextまたはハイブメタテーブル。そこにはこの言葉が私を混乱させています。どのように感謝私はあなたの考えを試します:) – Kalyan

+0

私はあなたが何を求めているのか分かりません。 HiveContextなしでDataframe APIを使用できます。しかし、データがHiveにロードされている場合、HiveContextを使用する必要があります。 –

+0

私はちょうどスパークスタンドアローンを使用しています。ハイブにデータをロードしていないので、ハイブにデータがロードされている場合は、ハイブコンテキストを使用する必要があります。しかしどこかで私はテーブルを永久に保存したいのであれば、混乱したhivecontextで作業する必要があることを読んだことがあります。私のコンセプトは不明ですが、私はいくつか考えました – Kalyan

関連する問題