2015-12-29 2 views
6

アップデート:SparkやHiveのインストール方法が原因でエラーが発生したようです。ウィンドウ関数の操作は、Databricks(ホステッド)ノートブックでは非常に簡単です。私はこれをローカルに設定する方法を理解する必要があります。Spark 1.5.2でHiveContextを使用して作成したPySpark DataFrameを入手するにはどうすればよいですか?

私は、Window関数を使用する必要があるSpark DataFrameを持っています。* here以上の手順に従ってみましたが、いくつか問題が発生しました。

私の環境設定:データフレームと撮影を取得

def ts_to_df(ts): 
    data = [] 
    for line in ts['data']: 
     data.append((dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), line['value'])) 
    return sc.parallelize(data).toDF(['Date', ts['name'].replace('&', '').replace(' ', '_')]) 

:DATAFRAMEにそのJSONをオンにする

test_ts = {'adminDistrict': None, 
'city': None, 
'country': {'code': 'NA', 'name': 'UNKNOWN'}, 
'data': [{'timestamp': '2005-08-25T00:00:00Z', 'value': 369.89}, 
    {'timestamp': '2005-08-26T00:00:00Z', 'value': 362.44}, 
    {'timestamp': '2005-08-29T00:00:00Z', 'value': 368.3}, 
    {'timestamp': '2005-08-30T00:00:00Z', 'value': 382.6}, 
    {'timestamp': '2005-08-31T00:00:00Z', 'value': 377.84}, 
    {'timestamp': '2005-09-01T00:00:00Z', 'value': 380.74}, 
    {'timestamp': '2005-09-02T00:00:00Z', 'value': 370.33}, 
    {'timestamp': '2005-09-05T00:00:00Z', 'value': 370.33}, 
    {'timestamp': '2005-09-06T00:00:00Z', 'value': 361.5}, 
    {'timestamp': '2005-09-07T00:00:00Z', 'value': 352.79}, 
    {'timestamp': '2005-09-08T00:00:00Z', 'value': 354.3}, 
    {'timestamp': '2005-09-09T00:00:00Z', 'value': 353.0}, 
    {'timestamp': '2005-09-12T00:00:00Z', 'value': 349.35}, 
    {'timestamp': '2005-09-13T00:00:00Z', 'value': 348.82}, 
    {'timestamp': '2005-09-14T00:00:00Z', 'value': 360.24}, 
    {'timestamp': '2005-09-15T00:00:00Z', 'value': 357.61}, 
    {'timestamp': '2005-09-16T00:00:00Z', 'value': 347.14}, 
    {'timestamp': '2005-09-19T00:00:00Z', 'value': 370.0}, 
    {'timestamp': '2005-09-20T00:00:00Z', 'value': 362.82}, 
    {'timestamp': '2005-09-21T00:00:00Z', 'value': 366.11}, 
    {'timestamp': '2005-09-22T00:00:00Z', 'value': 364.46}, 
    {'timestamp': '2005-09-23T00:00:00Z', 'value': 351.8}, 
    {'timestamp': '2005-09-26T00:00:00Z', 'value': 360.74}, 
    {'timestamp': '2005-09-27T00:00:00Z', 'value': 356.63}, 
    {'timestamp': '2005-09-28T00:00:00Z', 'value': 363.64}, 
    {'timestamp': '2005-09-29T00:00:00Z', 'value': 366.05}], 
'maxDate': '2015-12-28T00:00:00Z', 
'minDate': '2005-08-25T00:00:00Z', 
'name': 'S&P GSCI Crude Oil Spot', 
'offset': 0, 
'resolution': 'DAY', 
'sources': ['trf'], 
'subtype': 'Index', 
'type': 'Commodities', 
'uid': 'TRF_INDEX_Z39824_PI'} 

機能:私のデータを設定する

import os 
import sys 
import datetime as dt 

os.environ["SPARK_HOME"] = '/usr/bin/spark-1.5.2' 
os.environ["PYTHONPATH"] = '/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip' 
sys.path.append('/usr/bin/spark-1.5.2/python') 
sys.path.append('/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip') 

import pyspark 
sc = pyspark.SparkContext() 
hiveContext = pyspark.sql.HiveContext(sc) 
sqlContext = pyspark.sql.SQLContext(sc) 
from pyspark.sql import Row 
from pyspark.sql.functions import struct 
from pyspark.sql import DataFrame 
from collections import OrderedDict 

を内部のものを見る:

私は私がやっている見当がつかないし、すべてが間違って行くことに始まる

+----------+----------------------+ 
|  Date|SP_GSCI_Crude_Oil_Spot| 
+----------+----------------------+ 
|2005-08-25|    369.89| 
|2005-08-26|    362.44| 
|2005-08-29|     368.3| 
|2005-08-30|     382.6| 
|2005-08-31|    377.84| 
|2005-09-01|    380.74| 
|2005-09-02|    370.33| 
|2005-09-05|    370.33| 
|2005-09-06|     361.5| 
|2005-09-07|    352.79| 
|2005-09-08|     354.3| 
|2005-09-09|     353.0| 
|2005-09-12|    349.35| 
|2005-09-13|    348.82| 
|2005-09-14|    360.24| 
|2005-09-15|    357.61| 
|2005-09-16|    347.14| 
|2005-09-19|     370.0| 
|2005-09-20|    362.82| 
|2005-09-21|    366.11| 
+----------+----------------------+ 

そして、ここでは、次のとおりです:

from pyspark.sql.functions import lag, col, lead 
from pyspark.sql.window import Window 

w = Window().partitionBy().orderBy(col('Date')) 
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show() 

私は、このエラーを与えること:私はこのことを示し

test_df = ts_to_df(test_ts) 
test_df.show() 

Py4JJavaError: An error occurred while calling o59.select. : org.apache.spark.sql.AnalysisException: Could not resolve window function 'lead'. Note that, using window functions currently requires a HiveContext;

私はHiveContextが必要なようですね。 HiveContextを使用してDataFrameを作成する必要がありますか?

def ts_to_hive_df(ts): 
    data = [] 
    for line in ts['data']: 
     data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), 
       ts['name'].replace('&', '').replace(' ', '_'):line['value']}) 
    temp_rdd = sc.parallelize(data).map(lambda x: Row(**x)) 
    return hiveContext.createDataFrame(temp_rdd) 

test_df = ts_to_hive_df(test_ts) 
test_df.show() 

しかし、それは私にこのエラーを与える:そして、私は明示的にHiveContextを使用してデータフレームを作成してみましょう

TypeError: 'JavaPackage' object is not callable

は、どのように私は、Window関数を使用していますか? HiveContextを使用してDataFramesを作成する必要がありますか?もしそうなら、それをどうすればいいのですか?誰かが私が間違っていることを教えてもらえますか?

*データに隙間があるかどうかを知る必要があります。 「Date」という列があり、Dateごとに並べられた各行について、次の行に何があるかを知りたいのですが、欠落している日やデータがない場合は、その行の最後の日のデータを使用します。あなたがそれをするより良い方法を知っているなら、私に知らせてください。しかし、私はまだこれらのウィンドウ関数を動作させる方法を知りたいです。

+0

申し訳ありませんを取得します。特定のコードを追加しました。それが私たちをどこかに導くことを願っています見ていただきありがとうございます。 – Nathaniel

+1

さて、Spark(またはHive?)がローカルにインストールされていると、DataBricksノートブックでこれを動作させることができるため、何かがうんざりしているように見えます。 DataBricksは独自のHiveContextまたはSQLContextを作成することを望んでいません。そこで動作させるために、自分のコンテキストの作成を省き、上記のts_to_hive_df関数を使用して、hiveContextをsqlContextに置き換えました。私は最終的に私自身のインストールでこれを動作させる必要があります。私はそれを理解するときに戻って解決策を書くでしょう。 – Nathaniel

+1

SparkバイナリがHiveサポートなしでビルドされているようです。 – zero323

答えて

0

Sparkの新しいバージョンに移行した可能性があるので、これは古い質問です。私は自分自身でスパーク2.0を実行しているので、これは不正かもしれません。

しかし、2つの問題が考えられます。最初の例では、両方とも呼び出されたので、.toDF()がSQLContextにデフォルト設定されている可能性があります。第二に、あなたがリファクタリングしたとき、関数内のハイブコンテキストを呼び出すことができますか?

関数の外側で2番目のts_to_df関数がhivecontextを呼び出すようにリファクタリングすると、すべて問題ありません。

def ts_to_df(ts): 
    data = [] 
    for line in ts['data']: 
     data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), 
       ts['name'].replace('&', '').replace(' ', '_'):line['value']}) 
    return data 

data = ts_to_df(test_ts) 
test_rdd = sc.parallelize(data).map(lambda x: Row(**x)) 
test_df = hiveContext.createDataFrame(test_rdd) 

from pyspark.sql.functions import lag, col, lead 
from pyspark.sql.window import Window 

w = Window().partitionBy().orderBy(col('Date')) 
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show() 

私は出力

+----------+ 
| Next_Date| 
+----------+ 
|2005-08-26| 
|2005-08-29| 
|2005-08-30| 
|2005-08-31| 
|2005-09-01| 
|2005-09-02| 
..... 
関連する問題