2017-02-18 5 views
0

2つの.txtデータファイルがあります。次の例に示すように、最初の列には2つの列(ムービー、シネマ)が含まれ、2つ目の列には2つの列(ムービー、ビューア)が含まれます。私がしたいことは、に表示された映画をcinema_1に最大視聴人数で見つけることです。上記の例では2つのデータフレームを結合し、値を合計して最大値を取得します。

+----------+---------+ 
| movie | cinema | 
+----------+---------+ 
| movie_1 | cinema_2 | 
| movie_2 | cinema_3 | 
| movie_4 | cinema_1 | 
| movie_3 | cinema_1 | 
+------+-------------+ 

+----------+---------+ 
| movie | viewers | 
+----------+---------+ 
| movie_1 | 10 | 
| movie_2 | 98 | 
| movie_4 | 100 | 
| movie_3 | 19 | 
| movie_1 | 340 | 
| movie_3 | 31 | 
+------+-------------+ 

すなわち2つの候補は、movie_3movie_4(cinema_1に示されている)であり、(movie_3は50(19 + 31)のビューを有している)正しい答えが100件の景色をmovie_4です。

ステップ1:取得データは

val moviesCinemas = sparkSession.read 
     .format("com.databricks.spark.csv") 
     .option("header", "true") 
     .option("mode", "DROPMALFORMED") 
     .load("moviesCinemas.txt"); 

    val moviesViewers = sparkSession.read 
     .format("com.databricks.spark.csv") 
     .option("header", "true") 
     .option("mode", "DROPMALFORMED") 
     .load("moviesViewers.txt"); 

ステップ2:映画はcinema_1

大手
val cinema1Movies = moviesCinemas.filter(col("cinema").like("cinema_1")) 

に示す取得し、私はこれまで何をやったか

〜へ:

+----------+---------+ 
| movie | cinema | 
+----------+---------+ 
| movie_4 | cinema_1 | 
| movie_3 | cinema_1 | 
+------+-------------+ 

ステップ3:これらの2つの映画について、視聴者(データフレームmoviesViewersから)を合計して、最大数のものを報告する必要があります。これは私が実際につかまれている場所です。

+----------+---------+ 
| movie | viewers | 
+----------+---------+ 
| movie_4 | 100 | 
| movie_3 | 19 | 
| movie_3 | 31 | 
+------+-------------+ 

は、今私は、各movieためviewersを合計するかどうかはかなりわからない:次のような結果になります、私はcinema1MoviesmoviesViewersデータフレーム

val joinMoviesViewers = moviesViewers.join(cinema1Movies, Seq("movie")) 

に参加しようとした

このようなものを得るために(そして最後に最大の視聴者で映画を入手する):

+----------+---------+ 
| movie | viewers | 
+----------+---------+ 
| movie_4 | 100 | 
| movie_3 | 50 | 
+------+-------------+ 

答えて

1

スタートが参加したデータフレームから:

val aggJoin = joinMoviesViewers.groupBy("movie").agg(sum($"viewers").as("viewers")) 
// aggJoin: org.apache.spark.sql.DataFrame = [movie: string, viewers: bigint] 

val maxViewers = aggJoin.agg(max($"viewers")).first().getLong(0) 
// maxViewers: Long = 100 

// depending on what data type you have for viewers, you might use getDouble here 
// val maxViewers = aggJoin.agg(max($"viewers")).first().getDouble(0) 

aggJoin.filter($"viewers" === maxViewers).show 
+-------+-------+ 
| movie|viewers| 
+-------+-------+ 
|movie_4| 100| 
+-------+-------+ 
+0

これは動作しますが、 '.getLong(0)'の代わりに '.getDouble(0)'を追加してください。それ以外の場合は、例外が発生します。それを受け入れるためにあなたの答えを編集してください。どうもありがとう。 –

1

以下は、結果を導き出すためのAPIアプローチです。

import org.apache.spark.sql.functions._ 

val result = moviesCinemas 
    .filter($"cinema" === "cinema_1") 
    .join(moviesViewers, "movie") 
    .select(moviesCinemas("movie"),moviesViewers("viewers")) 
    .groupBy($"movie") 
    .agg(sum($"viewers").as("sum_cnt")) 
    .orderBy($"sum_cnt".desc) 

    result.first 
    res34: org.apache.spark.sql.Row = [movie_4,100] 

以下では、spark sqlを使用して同じ結果を得ています。

moviesCinemas.registerTempTable("movies_cinemas") 
moviesViewers.registerTempTable("movies_viewers") 

val spark = SparkSession.builder. 
    master("local") // set your master here 
    .appName("spark session example") 
    .getOrCreate() 

val result = spark.sql( 
""" 
SELECT 
t0.movie, 
sum(viewers) as total_viewers 
FROM 
movies_cinemas t0 JOIN movies_viewers t1 
on t0.movie = t1.movie 
WHERE t0.cinema = "cinema_1" 
GROUP BY t0.movie 
ORDER BY total_viewers desc 
""" 
) 

result.first 

res6: org.apache.spark.sql.Row = [movie_4,100] 
+0

また、これは動作します!どうもありがとう。 –

関連する問題