私はユーザ名のデータフレームをどのスレッドに投稿したのか、それらの投稿のタイムスタンプを持っています。スレッドの最初のユーザーは誰だったのか、何時だったのかを把握するために何をしようとしているのですか?最初の投稿がスレッドでグループを行い、次にタイムスタンプで分を行うことであることがわかります。しかしそれはユーザー名を削除します。どのようにグループを使用してユーザー名を保持するのですか?グループを使用しているときにSparkで未使用の列を保持していますか?
3
A
答えて
1
これは、HiveContextとHive named_struct関数を使用して1つのgroupByで実行できます。トリックはminです。左から順に列を評価し、現在の列が等しい場合は次の行に移動するだけで、構造体でminが機能します。したがって、この場合、タイムスタンプの列を比較するだけですが、min関数が結果を吐き出した後にアクセスする名前を含む構造体を作成します。
data = [
('user', 'thread', 'ts'),
('ryan', 1, 1234),
('bob', 1, 2345),
('bob', 2, 1234),
('john', 2, 2223)
]
header = data[0]
rdd = sc.parallelize(data[1:])
df = sqlContext.createDataFrame(rdd, header)
df.registerTempTable('table')
sql = """
SELECT thread, min(named_struct('ts', ts, 'user', user)) as earliest
FROM table
GROUP BY thread
"""
grouped = sqlContext.sql(sql)
final = grouped.selectExpr('thread', 'earliest.user as user', 'earliest.ts as timestamp')
1
これは、row_number()ウィンドウ関数を使用して行うことができます。これは、他のすべての列をそのまま維持します。 withColumnを使用して、 "thread_user_order"のような新しい列を作成し、その値はrow_number()PARTITION BYスレッドORDER BY tsである必要があります。 次に、"thread_user_order" == 1をフィルタリングします。あなたが順序でフィールドをソートして、一度に2つの列を維持する構造体のソート順を利用することができます
df.withColumn("thread_user_order", row_number().over(Window.partitionBy(col("thread")).orderBy(col("ts")))).where(col("thread_user_order").equalTo(1))
1
:ここ
は、いくつかの擬似コードです。その後、min
と呼ぶと、最初にタイムスタンプでソートされ、次に2回ネストされたときにユーザー名がソートされます。
user_time = functions.struct(df.timestamp, df.username).alias('user_time')
min_thread_users_df = df.select(df.thread, user_time).groupby('thread').agg(
functions.min('user_time').alias('user_time')).select(
'thread', 'user_time.username', 'user_time.timestamp')
関連する問題
- 1. 名前を保持している列名の配列を使用してSparkデータフレームを集約します。
- 2. ナビゲーションコントローラーを使用しているときにヘッダービューを保持
- 3. shlex.splitを使用しているときに引用符を保持する
- 4. ファイヤーバード1でグループを使用しているときのエラー
- 5. グループ化でカスタムマップサプライヤを使用しているときのClassCastException
- 6. IFRAMEを使用しているときに$ _SESSIONを保持する
- 7. PHPのGDlib imagecopyresampledを使用しているときに、PNGイメージの透明度を保持できますか?
- 8. Djangoグループをフレンドリストとして使用できますか?
- 9. 複数のレイヤーを使用しているときに未使用のCALayerメモリを解放できません
- 10. 共用体を使用しているときにグループ化する
- 11. spark-redshift - Spark 2.1.0を使用してエラーを保存しました。
- 12. IndexedDBを使用していくつのオブジェクト(列)を使用できますか?
- 13. clj-kafkaを使用して保持時間を把握していますか?
- 14. サービスとしてsparkを使用することはできますか?
- 15. .LINQを使用してLINQを使用した保持の保持
- 16. eachとimage.onloadを使用して配列の順序を保持します
- 17. コアデータエンティティを列挙型として使用していますか?
- 18. __blockを使用してもデータを保持できません
- 19. グーグルビジュアライゼーションAPIでグループ化を使用して書式を保持する
- 20. routeconfigを使用しているときに、 'annotations'のプロパティが未定義です。
- 21. spark-cassandra-connectorを使用してsparkとcassandraを接続するにはどうすればいいですか?
- 22. mysqli_real_escape_stringを使用してクエリ文字列を保護していますか?
- 23. Spark、GraphxプログラムはCPUとメモリを使用していません
- 24. アプリケーションサーバーとしてsparkを使用しますか?
- 25. AzureのsqlデータベースとMSMQを使用してHangfireを有効にしているメッセージを保持します
- 26. AndroidでJCIFSライブラリを使用しているときにURLでSMBプロトコルを使用しています
- 27. ダイヤモンドのアスタリスクを使用しているときにPythonを使用してループ
- 28. 'utf-8'エンコーディングを使用してto_csvを使用して文字列を保存できるので、pandas to_exelを使用して 'utf-8'形式の文字列を保存する方法(to_csvは.csvとして保存できます)
- 29. NSbundleをforループで使用しているときにInstrumentがNSbundleを使用しているとき
- 30. 行列でcbindを使用しているときのエラー
ユーザー名別 – eliasah