アップデート:SparkやHiveのインストール方法が原因でエラーが発生したようです。ウィンドウ関数の操作は、Databricks(ホステッド)ノートブックでは非常に簡単です。私はこれをローカルに設定する方法を理解する必要があります。Spark 1.5.2でHiveContextを使用して作成したPySpark DataFrameを入手するにはどうすればよいですか?
私は、Window関数を使用する必要があるSpark DataFrameを持っています。* here以上の手順に従ってみましたが、いくつか問題が発生しました。
私の環境設定:データフレームと撮影を取得
def ts_to_df(ts):
data = []
for line in ts['data']:
data.append((dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), line['value']))
return sc.parallelize(data).toDF(['Date', ts['name'].replace('&', '').replace(' ', '_')])
:DATAFRAMEにそのJSONをオンにする
test_ts = {'adminDistrict': None,
'city': None,
'country': {'code': 'NA', 'name': 'UNKNOWN'},
'data': [{'timestamp': '2005-08-25T00:00:00Z', 'value': 369.89},
{'timestamp': '2005-08-26T00:00:00Z', 'value': 362.44},
{'timestamp': '2005-08-29T00:00:00Z', 'value': 368.3},
{'timestamp': '2005-08-30T00:00:00Z', 'value': 382.6},
{'timestamp': '2005-08-31T00:00:00Z', 'value': 377.84},
{'timestamp': '2005-09-01T00:00:00Z', 'value': 380.74},
{'timestamp': '2005-09-02T00:00:00Z', 'value': 370.33},
{'timestamp': '2005-09-05T00:00:00Z', 'value': 370.33},
{'timestamp': '2005-09-06T00:00:00Z', 'value': 361.5},
{'timestamp': '2005-09-07T00:00:00Z', 'value': 352.79},
{'timestamp': '2005-09-08T00:00:00Z', 'value': 354.3},
{'timestamp': '2005-09-09T00:00:00Z', 'value': 353.0},
{'timestamp': '2005-09-12T00:00:00Z', 'value': 349.35},
{'timestamp': '2005-09-13T00:00:00Z', 'value': 348.82},
{'timestamp': '2005-09-14T00:00:00Z', 'value': 360.24},
{'timestamp': '2005-09-15T00:00:00Z', 'value': 357.61},
{'timestamp': '2005-09-16T00:00:00Z', 'value': 347.14},
{'timestamp': '2005-09-19T00:00:00Z', 'value': 370.0},
{'timestamp': '2005-09-20T00:00:00Z', 'value': 362.82},
{'timestamp': '2005-09-21T00:00:00Z', 'value': 366.11},
{'timestamp': '2005-09-22T00:00:00Z', 'value': 364.46},
{'timestamp': '2005-09-23T00:00:00Z', 'value': 351.8},
{'timestamp': '2005-09-26T00:00:00Z', 'value': 360.74},
{'timestamp': '2005-09-27T00:00:00Z', 'value': 356.63},
{'timestamp': '2005-09-28T00:00:00Z', 'value': 363.64},
{'timestamp': '2005-09-29T00:00:00Z', 'value': 366.05}],
'maxDate': '2015-12-28T00:00:00Z',
'minDate': '2005-08-25T00:00:00Z',
'name': 'S&P GSCI Crude Oil Spot',
'offset': 0,
'resolution': 'DAY',
'sources': ['trf'],
'subtype': 'Index',
'type': 'Commodities',
'uid': 'TRF_INDEX_Z39824_PI'}
機能:私のデータを設定する
import os
import sys
import datetime as dt
os.environ["SPARK_HOME"] = '/usr/bin/spark-1.5.2'
os.environ["PYTHONPATH"] = '/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip'
sys.path.append('/usr/bin/spark-1.5.2/python')
sys.path.append('/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip')
import pyspark
sc = pyspark.SparkContext()
hiveContext = pyspark.sql.HiveContext(sc)
sqlContext = pyspark.sql.SQLContext(sc)
from pyspark.sql import Row
from pyspark.sql.functions import struct
from pyspark.sql import DataFrame
from collections import OrderedDict
を内部のものを見る:
私は私がやっている見当がつかないし、すべてが間違って行くことに始まる+----------+----------------------+
| Date|SP_GSCI_Crude_Oil_Spot|
+----------+----------------------+
|2005-08-25| 369.89|
|2005-08-26| 362.44|
|2005-08-29| 368.3|
|2005-08-30| 382.6|
|2005-08-31| 377.84|
|2005-09-01| 380.74|
|2005-09-02| 370.33|
|2005-09-05| 370.33|
|2005-09-06| 361.5|
|2005-09-07| 352.79|
|2005-09-08| 354.3|
|2005-09-09| 353.0|
|2005-09-12| 349.35|
|2005-09-13| 348.82|
|2005-09-14| 360.24|
|2005-09-15| 357.61|
|2005-09-16| 347.14|
|2005-09-19| 370.0|
|2005-09-20| 362.82|
|2005-09-21| 366.11|
+----------+----------------------+
そして、ここでは、次のとおりです:
from pyspark.sql.functions import lag, col, lead
from pyspark.sql.window import Window
w = Window().partitionBy().orderBy(col('Date'))
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show()
私は、このエラーを与えること:私はこのことを示し
test_df = ts_to_df(test_ts)
test_df.show()
Py4JJavaError: An error occurred while calling o59.select. : org.apache.spark.sql.AnalysisException: Could not resolve window function 'lead'. Note that, using window functions currently requires a HiveContext;
私はHiveContextが必要なようですね。 HiveContextを使用してDataFrameを作成する必要がありますか?
def ts_to_hive_df(ts):
data = []
for line in ts['data']:
data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(),
ts['name'].replace('&', '').replace(' ', '_'):line['value']})
temp_rdd = sc.parallelize(data).map(lambda x: Row(**x))
return hiveContext.createDataFrame(temp_rdd)
test_df = ts_to_hive_df(test_ts)
test_df.show()
しかし、それは私にこのエラーを与える:そして、私は明示的にHiveContextを使用してデータフレームを作成してみましょう
TypeError: 'JavaPackage' object is not callable
は、どのように私は、Window関数を使用していますか? HiveContextを使用してDataFramesを作成する必要がありますか?もしそうなら、それをどうすればいいのですか?誰かが私が間違っていることを教えてもらえますか?
*データに隙間があるかどうかを知る必要があります。 「Date」という列があり、Dateごとに並べられた各行について、次の行に何があるかを知りたいのですが、欠落している日やデータがない場合は、その行の最後の日のデータを使用します。あなたがそれをするより良い方法を知っているなら、私に知らせてください。しかし、私はまだこれらのウィンドウ関数を動作させる方法を知りたいです。
申し訳ありませんを取得します。特定のコードを追加しました。それが私たちをどこかに導くことを願っています見ていただきありがとうございます。 – Nathaniel
さて、Spark(またはHive?)がローカルにインストールされていると、DataBricksノートブックでこれを動作させることができるため、何かがうんざりしているように見えます。 DataBricksは独自のHiveContextまたはSQLContextを作成することを望んでいません。そこで動作させるために、自分のコンテキストの作成を省き、上記のts_to_hive_df関数を使用して、hiveContextをsqlContextに置き換えました。私は最終的に私自身のインストールでこれを動作させる必要があります。私はそれを理解するときに戻って解決策を書くでしょう。 – Nathaniel
SparkバイナリがHiveサポートなしでビルドされているようです。 – zero323