2016-10-14 11 views
0

私は、状態、年齢、性別、給与などのCSVファイルを独立変数として持っています。spark CSVをlibsvm形式に変換する

従属変数はチャーンです。

sparkでは、データフレームをlibsvm形式に変換する必要があります。あなたは私にそれをする方法を教えてもらえますか?

LIBSVMフォーマットがある:0〜128:51

HERE特徴値が値51は、カラム128

+0

問題を詳しく説明してください。私はあなたの考えに従わない。 – Jerry

答えて

0

INがあることを意味するように私は同じのためのHadoopを用いたが、ロジックが同じでなければなりません。あなたのユースケースのサンプルサンプルを作成しました。ここではまず、データフレームを作成していて、nullまたは空白の値を持つすべての行を削除します。その後、RDDを作成し、Rowをlibsvm形式に変換します。 "repartition(1)"とは、すべてが1つのファイルのみに入ることを意味します.1つの結果カラムがあります。 CTR予測の場合、1または0のみになります。

サンプルファイル入力:

"zip","city","state","latitude","longitude","timezone","dst" 
"00210","Portsmouth","NH","43.005895","-71.013202","-5","1" 
"00211","Portsmouth","NH","43.005895","-71.013202","-5","1" 
"00212","Portsmouth","NH","43.005895","-71.013202","-5","1" 
"00213","Portsmouth","NH","43.005895","-71.013202","-5","1" 
"00214","Portsmouth","NH","43.005895","-71.013202","-5","1" 
"00215","Portsmouth","NH","43.005895","-71.013202","-5","1" 
"00501","Holtsville","NY","40.922326","-72.637078","-5","1" 
"00544","Holtsville","NY","40.922326","-72.637078","-5","1" 

public class LibSvmConvertJob { 

    private static final String SPACE = " "; 
    private static final String COLON = ":"; 

    public static void main(String[] args) { 

     SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("Libsvm Convertor"); 

     JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); 

     SQLContext sqlContext = new SQLContext(javaSparkContext); 

     DataFrame inputDF = sqlContext.read().format("com.databricks.spark.csv").option("header", "true") 
       .load("/home/raghunandangupta/inputfiles/zipcode.csv"); 

     inputDF.printSchema(); 

     sqlContext.udf().register("convertToNull", (String v1) -> (v1.trim().length() > 0 ? v1.trim() : null), DataTypes.StringType); 

     inputDF = inputDF.selectExpr("convertToNull(zip)","convertToNull(city)","convertToNull(state)","convertToNull(latitude)","convertToNull(longitude)","convertToNull(timezone)","convertToNull(dst)").na().drop(); 

     inputDF.javaRDD().map(new Function<Row, String>() { 
      private static final long serialVersionUID = 1L; 
      @Override 
      public String call(Row v1) throws Exception { 
       StringBuilder sb = new StringBuilder(); 
       sb.append(hashCode(v1.getString(0))).append("\t") //Resultant column 
       .append("1"+COLON+hashCode(v1.getString(1))).append(SPACE) 
       .append("2"+COLON+hashCode(v1.getString(2))).append(SPACE) 
       .append("3"+COLON+hashCode(v1.getString(3))).append(SPACE) 
       .append("4"+COLON+hashCode(v1.getString(4))).append(SPACE) 
       .append("5"+COLON+hashCode(v1.getString(5))).append(SPACE) 
       .append("6"+COLON+hashCode(v1.getString(6))); 
       return sb.toString(); 
      } 
      private String hashCode(String value) { 
       return Math.abs(Hashing.murmur3_32().hashString(value, StandardCharsets.UTF_8).hashCode()) + ""; 
      } 
     }).repartition(1).saveAsTextFile("/home/raghunandangupta/inputfiles/zipcode"); 

    } 
} 
+0

ありがとうたくさん:) – Nirmal

+0

正しいとupvoteをマークしてください。 TY – cody123

0
/* 
/Users/mac/matrix.txt 
1 0.5 2.4 3.0 
1 99 34 6454 
2 0.8 3.0 4.5 
*/ 
def concat(a:Array[String]):String ={ 
    var result=a(0)+" " 
    for(i<-1 to a.size.toInt-1) 
    result=result+i+":"+a(i)(0)+" " 
    return result 
} 
val rfile=sc.textFile("file:///Users/mac/matrix.txt") 
val f=rfile.map(line => line.split(' ')).map(i=>concat(i)) 

私ははるかに簡単な解決策を持っていると信じています。

関連する問題