2016-11-09 8 views
-1

私はScalaとSparkを初めて使用し、発見したサンプルをベースにしています。基本的に私は、Google APIを使用して状態を郵便番号から取得するために、データフレーム内から関数を呼び出そうとしています。 コードは別々に動作していますが、一緒ではありません; ここには動作しないコードがあります...Spark DF:ユニットタイプのスキーマはサポートされていません。

Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type Unit is not supported 
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716) 
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:654) 
    at org.apache.spark.sql.functions$.udf(functions.scala:2837) 
    at MovieRatings$.getstate(MovieRatings.scala:51) 
    at MovieRatings$$anonfun$4.apply(MovieRatings.scala:48) 
    at MovieRatings$$anonfun$4.apply(MovieRatings.scala:47)... 
Line 51 starts with def getstate = udf {(zipcode:String)... 
... 

コード:

userDF.createOrReplaceTempView("Users") 
    // SQL statements can be run by using the sql methods provided by Spark 
    val zipcodesDF = spark.sql("SELECT distinct zipcode, zipcode as state FROM Users") 
// zipcodesDF.map(zipcodes => "zipcode: " + zipcodes.getAs[String]("zipcode") + getstate(zipcodes.getAs[String]("zipcode"))).show() 
    val colNames = zipcodesDF.columns 
val cols = colNames.map(cName => zipcodesDF.col(cName)) 
val theColumn = zipcodesDF("state") 
val mappedCols = cols.map(c => 
    if (c.toString() == theColumn.toString()) getstate(c).as("transformed") else c) 
    val newDF = zipcodesDF.select(mappedCols:_*).show() 
    } 
def getstate = udf {(zipcode:String) => { 
val url = "http://maps.googleapis.com/maps/api/geocode/json?address="+zipcode 
val result = scala.io.Source.fromURL(url).mkString 
val address = parse(result) 
val shortnames = for { 
     JObject(address_components) <- address 
     JField("short_name", short_name) <- address_components 
      } yield short_name 
val state = shortnames(3) 
//return state.toString() 
val stater = state.toString() 

} 
    } 
+0

'return state.toString()'部分をコメントアウトしたため、あなたの 'UDF'は何も返しません。 – cheseaux

答えて

0

応答のおかげで..私は、私はそれを考え出したと思うここで動作するコードがあるもう一つ注意すべきは、GoogleのAPIである制限をので、いくつかの有効な郵便番号を持っています。しかし、私のための問題ではありません。

  private def loaduserdata(spark: SparkSession): Unit = { 
      import spark.implicits._ 
      // Create an RDD of User objects from a text file, convert it to a Dataframe 
      val userDF = spark.sparkContext 
      .textFile("examples/src/main/resources/users.csv") 
      .map(_.split("::")) 
      .map(attributes => users(attributes(0).trim.toInt, attributes(1), attributes(2).trim.toInt, attributes(3), attributes(4))) 
      .toDF() 
      // Register the DataFrame as a temporary view 
      userDF.createOrReplaceTempView("Users") 
      // SQL statements can be run by using the sql methods provided by Spark 
     val zipcodesDF = spark.sql("SELECT distinct zipcode, substr(zipcode,1,5) as state FROM Users ORDER BY zipcode desc") // zipcodesDF.map(zipcodes => "zipcode: " + zipcodes.getAs[String]("zipcode") + getstate(zipcodes.getAs[String]("zipcode"))).show() 
     val colNames = zipcodesDF.columns 
     val cols = colNames.map(cName => zipcodesDF.col(cName)) 
     val theColumn = zipcodesDF("state") 
     val mappedCols = cols.map(c => 
     if (c.toString() == theColumn.toString()) getstate(c).as("state") else c) 
     val geoDF = zipcodesDF.select(mappedCols:_*)//.show() 
     geoDF.createOrReplaceTempView("Geo") 
     } 
     val getstate = udf {(zipcode: String) => 
      val url = "http://maps.googleapis.com/maps/api/geocode/json?address="+zipcode 
      val result = scala.io.Source.fromURL(url).mkString 
      val address = parse(result) 
      val statenm = for { 
         JObject(statename) <- address 
         JField("types", JArray(types)) <- statename 
         JField("short_name", JString(short_name)) <- statename 
        if types.toString().equals("List(JString(administrative_area_level_1), JString(political))") 
        // if types.head.equals("JString(administrative_area_level_1)") 
        } yield short_name 
      val str = if (statenm.isEmpty.toString().equals("true")) "N/A" else statenm.head   
      } 
+0

興味のある方は、Tableau Public上のこのデータをご覧ください:https://public.tableau.com/views/MovieRatings_7/Geo?:embed=y&:display_count=yes –

関連する問題