7

私はカスタムステージでPipelineを作成して保存しようとしていました。 UDFを使用してDataFramecolumnを追加する必要があります。したがって、UDFまたは類似のアクションをTransformerに変換することが可能かどうかは疑問でしたか?UDFからカスタムトランスフォーマを作成する方法は?

私のカスタムUDFはこのように見え、UDFをカスタムとして使用する方法を学習したいと思いますTransformer

def getFeatures(n: String) = { 
    val NUMBER_FEATURES = 4 
    val name = n.split(" +")(0).toLowerCase 
    ((1 to NUMBER_FEATURES) 
     .filter(size => size <= name.length) 
     .map(size => name.substring(name.length - size))) 
} 

val tokenizeUDF = sqlContext.udf.register("tokenize", (name: String) => getFeatures(name)) 

答えて

12

それはフル機能のソリューションではありませんが、あなたはこのような何かを始めることができます:

import org.apache.spark.ml.{UnaryTransformer} 
import org.apache.spark.ml.util.Identifiable 
import org.apache.spark.sql.types.{ArrayType, DataType, StringType} 

class NGramTokenizer(override val uid: String) 
    extends UnaryTransformer[String, Seq[String], NGramTokenizer] { 

    def this() = this(Identifiable.randomUID("ngramtokenizer")) 

    override protected def createTransformFunc: String => Seq[String] = { 
    getFeatures _ 
    } 

    override protected def validateInputType(inputType: DataType): Unit = { 
    require(inputType == StringType) 
    } 

    override protected def outputDataType: DataType = { 
    new ArrayType(StringType, true) 
    } 
} 

クイックチェック:あなたも何かにそれを一般化しようとすることができます

val df = Seq((1L, "abcdef"), (2L, "foobar")).toDF("k", "v") 
val transformer = new NGramTokenizer().setInputCol("v").setOutputCol("vs") 

transformer.transform(df).show 
// +---+------+------------------+ 
// | k|  v|    vs| 
// +---+------+------------------+ 
// | 1|abcdef|[f, ef, def, cdef]| 
// | 2|foobar|[r, ar, bar, obar]| 
// +---+------+------------------+ 

これは:

import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor 
import scala.reflect.runtime.universe._ 

class UnaryUDFTransformer[T : TypeTag, U : TypeTag](
    override val uid: String, 
    f: T => U 
) extends UnaryTransformer[T, U, UnaryUDFTransformer[T, U]] { 

    override protected def createTransformFunc: T => U = f 

    override protected def validateInputType(inputType: DataType): Unit = 
    require(inputType == schemaFor[T].dataType) 

    override protected def outputDataType: DataType = schemaFor[U].dataType 
} 

val transformer = new UnaryUDFTransformer("featurize", getFeatures) 
    .setInputCol("v") 
    .setOutputCol("vs") 

ラップされた関数ではなくUDFを使用する場合は、Transformerを直接拡張し、transformメソッドをオーバーライドする必要があります。残念ながら、有用なクラスの大部分はプライベートですので、ややこしいことがあります。

また、あなたがUDFを登録することができます。

spark.udf.register("getFeatures", getFeatures _) 

SQLTransformer

import org.apache.spark.ml.feature.SQLTransformer 

val transformer = new SQLTransformer() 
    .setStatement("SELECT *, getFeatures(v) AS vs FROM __THIS__") 

transformer.transform(df).show 
// +---+------+------------------+ 
// | k|  v|    vs| 
// +---+------+------------------+ 
// | 1|abcdef|[f, ef, def, cdef]| 
// | 2|foobar|[r, ar, bar, obar]| 
// +---+------+------------------+ 
+0

私はモデルを保存しようとしましたが、 'Message:Pipeline書き込みは、書き込み可能を実装していないステージが含まれているため、このパイプラインでは失敗します。書き込み不可能なステージ:ngramtokenizer_f784079e2124タイプのクラス '私はWritableインターフェイスを実装する必要がありますか? –

+1

これは私が以前に言及した悪い部分です。私が知る限り、最良のアプローチは 'DefaultParamsWritable'と' DefaultParamsReadable'を実装することですが、あなたのコードの少なくとも一部をMLパッケージに入れないと実行できません。あなたは 'MLWritable' /' MLReadable'でも試してみることができます。 – zero323

1

を使用し、私が最初にTransformerUnaryTransformer抄録を拡張しようとしましたが、私のアプリケーションはDefaultParamsWriteableの.asに達することができないとのトラブルに遭遇しましたあなたの問題に関連するかもしれない例、私はthis exampleに沿って続くUDFとして簡単な用語正規化を作成しました。私の目標は、用語をパターンとセットに照合して、それらを一般用語に置き換えることです。例:あなたがいるよう

val testMap: Map[Any, String] = Map("hadoop".r -> "elephant", 
    "spark".r -> "sparky", "cool".r -> "neat", 
    Set("123", "456") -> "set1", 
    Set("789", "10") -> "set2") 

val testTermNormalizer = new TermNormalizer(testMap) 
val termNormalizerUdf = udf(testTermNormalizer.normalizeTerms(_: Seq[String])) 

val trainingTest = sqlContext.createDataFrame(Seq(
    (0L, "spark is cool 123", 1.0), 
    (1L, "adsjkfadfk akjdsfhad 456", 0.0), 
    (2L, "spark rocks my socks 789 10", 1.0), 
    (3L, "hadoop is cool 10", 0.0) 
)).toDF("id", "text", "label") 

val testTokenizer = new Tokenizer() 
    .setInputCol("text") 
    .setOutputCol("words") 

val tokenizedTrainingTest = testTokenizer.transform(trainingTest) 
println(tokenizedTrainingTest 
    .select($"id", $"text", $"words", termNormalizerUdf($"words"), $"label").show(false)) 

今、私はもう少し近くに質問を読んでいること、それが聞こえる:

"\b[A-Z0-9._%+-][email protected][A-Z0-9.-]+\.[A-Z]{2,}\b".r -> "emailaddr" 

これは、私がこのようにそれを使用するクラス

import scala.util.matching.Regex 

class TermNormalizer(normMap: Map[Any, String]) { 
    val normalizationMap = normMap 

    def normalizeTerms(terms: Seq[String]): Seq[String] = { 
    var termsUpdated = terms 
    for ((term, idx) <- termsUpdated.view.zipWithIndex) { 
     for (normalizer <- normalizationMap.keys: Iterable[Any]) { 
     normalizer match { 
      case (regex: Regex) => 
      if (!regex.findFirstIn(term).isEmpty) termsUpdated = 
       termsUpdated.updated(idx, normalizationMap(regex)) 
      case (set: Set[String]) => 
      if (set.contains(term)) termsUpdated = 
       termsUpdated.updated(idx, normalizationMap(set)) 
     } 
     } 
    } 
    termsUpdated 
    } 
} 

がありますこのようにするのを避ける方法を尋ねる。とにかく、将来の誰かが、トランスのような機能を簡単に適用する方法を探している場合には、私はまだ投稿します。

0

トランスを書き込み可能にしたい場合は、選択したパブリックパッケージ内のsharedParamsライブラリのHasInputColなどの特性を使用して、それらをDefaultParamsWritable特性とともに使用して、トランスを永続化できるようにします。

このようにして、コードの一部をspark core mlパッケージ内に置かなくても、自分のパッケージにパラレルセットを保持することができます。これは実際にはほとんど変わらないという点では問題ではありません。

しかし、JIRAのボードhereのバグを追跡して、共通のsharedParamsの一部をmlにプライベートではなく公開するようにして、人々が外部のクラスのものを直接使用できるようにします。

関連する問題