2016-11-21 8 views
10

マシン学習モデルをPythonでデプロイすることに興味があります。そのため、サーバーへのリクエストを通じて予測を行うことができます。非スパーク環境でのpyspark MLモデルのロード

私はClouderaクラスタを作成し、ライブラリpysparkを使用して、モデルを開発するためにSparkを利用します。モデルをサーバー上で使用するためにモデルを保存する方法を知りたい。

さまざまなアルゴリズムには(この投稿How to save and load MLLib model in Apache Sparkで回答されているような).save関数があることがわかりましたが、サーバーはSparkなしでClouderaクラスタではない別のマシンにあるため、 .loadと.predict関数を使用できるかどうかを知る。

sparkの下にない予測のライブラリ関数をpysparkで作成できますか?または、モデルを保存して他の場所で使用するために、変換を行う必要がありますか?

+1

私はあなたが火花を持つ必要があると信じています。あなたがPythonで単純な残りのAPIを作成し、モデルファイルを読み込んで応答を送信することができます。 – Backtrack

+0

私は動作する答えを追加しました – Backtrack

答えて

1

これは完全な解決策ではありません

Model.py

from sklearn.externals import joblib 
from sklearn.pipeline import make_pipeline 
from sklearn.feature_extraction.text import HashingVectorizer 
from sklearn.svm import LinearSVC 

# code to load training data into X_train, y_train, split train/test set 

vec = HashingVectorizer() 
svc = LinearSVC() 
clf = make_pipeline(vec, svc) 
svc.fit(X_train, y_train) 

joblib.dump({'class1': clf}, 'models', compress=9) 

myRest.py

from flask import jsonify, request, Flask 
from sklearn.externals import joblib 

models = joblib.load('models') 
app = Flask(__name__) 

@app.route('/', methods=['POST']) 
def predict(): 
    text = request.form.get('text') 
    results = {} 
    for name, clf in models.iteritems(): 
     results[name] = clf.predict([text])[0] 
    return jsonify(results) 

if __name__ == '__main__': 
    app.run() 

あなたが行うことができます。このような何か。 REF:スパークためhttps://loads.pickle.me.uk/2016/04/04/deploying-a-scikit-learn-classifier-to-production/

:私はこの作業コードを持って時間を過ごした後http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html

+0

私はそれがまったく解決策ではないことを恐れています。 PySpark 'ml'は' scikit-learn'ではありません。 –

+1

@LostInOverflow、私もscikit-learnのサンプルを追加したことを知っています。確かに私はあなたのコメントを受け入れる。しかし、このようにspark mlモデルをロードすることさえできます。 sameModel = MatrixFactorizationModel.load(sc、 "target/tmp/myCollaborativeFilter")。このリンクを確認してください:http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html – Backtrack

+0

少なくともローカルモードの "cluster"が必要です。したがって、Spark以外の環境ではありません。 –

2

、これが最適化されないことがあり、

Mymodel.py:

import os 
import sys 

# Path for spark source folder 
os.environ['SPARK_HOME']="E:\\Work\\spark\\installtion\\spark" 

# Append pyspark to Python Path 
sys.path.append("E:\\Work\\spark\\installtion\\spark\\python") 

try: 
    from pyspark.ml.feature import StringIndexer 
    # $example on$ 
    from numpy import array 
    from math import sqrt 
    from pyspark import SparkConf 
    # $example off$ 

    from pyspark import SparkContext 
    # $example on$ 
    from pyspark.mllib.clustering import KMeans, KMeansModel 

    print ("Successfully imported Spark Modules") 

except ImportError as e: 
    sys.exit(1) 


if __name__ == "__main__": 
    sconf = SparkConf().setAppName("KMeansExample").set('spark.sql.warehouse.dir', 'file:///E:/Work/spark/installtion/spark/spark-warehouse/') 
    sc = SparkContext(conf=sconf) # SparkContext 
    parsedData = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2) 
    clusters = KMeans.train(sc.parallelize(parsedData), 2, maxIterations=10, 
          runs=10, initializationMode="random") 
    clusters.save(sc, "mymodel") // this will save model to file system 
    sc.stop() 

このコードは作成されます。 kmeanクラスタモデルを作成し、ファイルシステムに保存します。

API.py

from flask import jsonify, request, Flask 
from sklearn.externals import joblib 
import os 
import sys 

# Path for spark source folder 
os.environ['SPARK_HOME']="E:\\Work\\spark\\installtion\\spark" 

# Append pyspark to Python Path 
sys.path.append("E:\\Work\\spark\\installtion\\spark\\python") 

try: 
    from pyspark.ml.feature import StringIndexer 
    # $example on$ 
    from numpy import array 
    from math import sqrt 
    from pyspark import SparkConf 
    # $example off$ 

    from pyspark import SparkContext 
    # $example on$ 
    from pyspark.mllib.clustering import KMeans, KMeansModel 

    print ("Successfully imported Spark Modules") 

except ImportError as e: 
    sys.exit(1) 


app = Flask(__name__) 

@app.route('/', methods=['GET']) 
def predict(): 

    sconf = SparkConf().setAppName("KMeansExample").set('spark.sql.warehouse.dir', 'file:///E:/Work/spark/installtion/spark/spark-warehouse/') 
    sc = SparkContext(conf=sconf) # SparkContext 
    sameModel = KMeansModel.load(sc, "clus") // load from file system 

    response = sameModel.predict(array([0.0, 0.0])) // pass your data 

    return jsonify(response) 

if __name__ == '__main__': 
    app.run() 

は、上記フラスコ内に書かれた私のREST APIです。

http://127.0.0.1:5000/に電話をかけてください。ブラウザでその応答を見ることができます。

+0

clusters.saveが呼び出されたときに保存されるモデルがどの形式であるかを知りたいと思います。前もって感謝します。 – daloman

+1

こんにちは、あなたの答えに感謝します。しかし、私にはわからないことが1つあります。私はそれがPythonだけがインストールされているマシンでAPI.pyスクリプトを実行できますか?またはSparkをインストールする必要がありますか?その場合、スタンドアロンバージョンをインストールするだけで十分ですか? –

+1

@MarcialGonzalezはい、私たちはサーバーにSparkをインストールするか、別のことをする必要があります。あなたの残りの部分とspark mlサーバーとの間のポートベースの通信を行います。 – Backtrack

2

MLeap(私が貢献するプロジェクト)を見てください - スパークコンテキストに依存しないMLパイプライン(推定器だけでなく)と実行エンジンのシリアル化/非シリアル化、分散データフレームと実行計画。

現在、モデルを実行するためのMLeapのランタイムには、Pythonバインディングはありませんが、scala/javaのみがありますが、複雑にすることはありません。 Sparkで訓練されたパイプラインとモデルからスコアリングエンジンを作成する際には、自分自身や他のMLeap開発者にgithubを公開してください。

+0

https://stackoverflow.com/questions/tagged/mleapタグを作成しました。そのタグに従ってください。主なSparkプロジェクト/ブランチにmleapを統合する計画はありますか? Java 8のサポートはどうですか? – Gevorg

関連する問題