2016-08-29 5 views
-1

私はHDFSからデータを読んでいます。私は各ユーザーの複数の行を持って、私はすべてのユーザーの最新の行を選択する必要があります。Apache Spark RDD:ペアドRDDキーと値に基づいて最新のデータを取得する方法

行の例(RDD [Id: Int, DateTime: String, Name: STRING]

1,2016-05-01 01:01:01,testa 
2,2016-05-02 01:01:01,testb 
1,2016-05-05 01:01:01,testa 

上記の例であり、ID = 1 2つの行があり、私はすべてのIDが一度だけ必要(だけ最新のものと、それは、対応するデータ'S)iは以下のように出力RDDを望ん。

2,2016-05-02 01:01:01,testb 
1,2016-05-05 01:01:01,testa 

私の考え

私はすべてのユーザーに対して最新のデータを維持することによって、配列にこのデータを収集し、望ましい結果を得るために、forループを実行することができます。

私は収集データをマスターノードに与えます。私のデータは30 GBで、MasterのRAMは25 GBです。だから私はこれを試してみません。

あなたはこのタスクを達成するためのアイデアとコードを共有できますか?

+0

アレイを使用する場合は、なぜSparkで気になるのですか? –

+0

okこれはアプローチになるはずです。私はapacheのスパークに新しいです –

+0

そして、 '2016-05-01 01:01:01'は'ロング 'のように見えますか? –

答えて

0

これは、必要としている誰かを助けるかもしれません。

val yourRdd = sc.parallelize(List(
(30, ("1122112211111".toLong, "testa", "testa", "testa")), 
(1, ("1122112211111".toLong, "testa", "testa", "testa")), 
(1, ("1122112211119".toLong, "testa", "testa", "testa")), 
(1, ("1122112211112".toLong, "testa", "testa", "testa")), 
(2, ("1122112211111".toLong, "testa", "testa", "testa")), 
(2, ("1122112211110".toLong, "testa", "testa", "testa")) 
)) 

val addToSet1 = (
    s: (Int, (Long, String, String, String)), 
    v: ((Long, String, String, String)) 
) => if (s._2._1 > v._1) s else (s._1,v) 

val mergePartitionSets1 = (
    s: (Int, (Long, String, String, String)), 
    v: (Int, (Long, String, String, String)) 
) => if (s._2._1 > v._2._1) s else v 

val ab1 = yourRdd 
    .aggregateByKey(initialSet)(addToSet1, mergePartitionSets1) 

ab1.take(10).foreach(println) 
+0

あなたのコードを適切な書式で読めるようにすることが、主な貢献点です。すべてを1行に入れることは必ずしも良いことではありません。 –

1

日付ストリングをタイムスタンプに変換し、最新のタイムスタンプを持つタプルを選択してIDを集計します。

import java.time.format.DateTimeFormatter 
import java.time.LocalDateTime 

val yourRdd: RDD[Int, String, String] = sc.parallelize(List(
    1, "2016-05-01 01:01:01", "testa" 
    2, "2016-05-02 01:01:01", "testb" 
    1, "2016-05-05 01:01:01", "testa" 
)) 

val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH-mm-ss"); 

val zeroVal = ("", Long.MinValue, "", "") 

val rddWithTimestamp = yourRdd 
    .map({ 
    case (id, datetimeStr, name) => { 
     val timestamp: Long = LocalDateTime.parse(datetimeStr, dateFormetter) 
     .toInstant().toEpochMilli() 

     (id, (id, timestamp, datetimeStr, name)) 
    } 
    }) 

val yourRequiredRdd = rddWithTimestamp 
    .aggregateByKey(zeroValue)(
    (t1, t2) => if (t1._2 > t2._2) t1 else t2 
    (t1, t2) => if (t1._2 > t2._2) t1 else t2 
) 
1

あなたはDataFrame API使用することができます

import org.apache.spark.sql.functions._ 

val df = sc.parallelize(Seq(
    (1, "2016-05-01 01:01:01", "testA"), 
    (2, "2016-05-02 01:01:01", "testB"), 
    (1, "2016-05-05 01:01:01", "testA"))) 
    .toDF("id", "dateTime", "name") 

df.withColumn("dateTime", unix_timestamp($"dateTime")) 
    .groupBy("id", "name") 
    .max("dateTime") 
    .withColumnRenamed("max(dateTime)", "dateTime") 
    .withColumn("dateTime", from_unixtime($"dateTime")) 
    .show() 

をこれはあなたのSQLContextとしてHiveContextが必要です。

import org.apache.spark.sql.hive.HiveContext 

val sqlContext = new HiveContext(sc) 
import sqlContext.implicits._ 
関連する問題