2016-11-13 3 views
0

おはようございますstackoverflow-Community
私は次の問題があります。
sparkのclouderaでクラスタを設定しました。
Clustermanagerと3つの作業ノードがあり、ローカルではありません。
私はいくつかのデータでPythonのプログラムを実行するクラスタをします。私は私のpython-プログラムの開発にデータを作成し、ローカル上で、これは正常に動作した瞬間のためにSparkclusterにPythonでデータを配布する方法

from pyspark import SparkConf, SparkContext 
import matplotlib.pyplot as plt 
import numpy as np 
from time import time as t 

def mapper(point, data): 
    counter = 0 
    for elem in data: 
     dominate = False 
     for i in range(len(elem)): 
      if point[i] < elem[i]: 
       dominate = True 
     if dominate: 
      counter += 1 
    return (point,counter) 


if __name__=="__main__": 

    xx = np.array([-0.51, 51.2]) 
    yy = np.array([0.33, 51.6]) 
    means = [xx.mean(), yy.mean()] 
    stds = [xx.std()/3, yy.std()/3] 
    corr = 0.8 # correlation 
    covs = [[stds[0]**2   , stds[0]*stds[1]*corr], 
      [stds[0]*stds[1]*corr,   stds[1]**2]] 

    m = np.random.multivariate_normal(means, covs, 1000).T 
    data = list(zip(m[0],m[1])) 

    conf = SparkConf().setAppName("Naive_Spark") 
    sc = SparkContext(conf=conf) 
    data_rdd = sc.parallelize(data).partitionBy(3).persist() 

    start = t() 

    mapped = data_rdd.map(lambda x: mapper(x, data)).filter(lambda x: x[1] == len(data)-1).collect()  
    print(mapped) 
    time = str(t()-start) 
    print(mapped) 
    with open('/home/.../Schreibtisch/Naive.txt','a') as f: 
     f.write('Spark: ' + str(mapped) + ' in ' + time + ' ms\n\n') 
    sc.stop() 
    plt.scatter(*zip(*data)) 
    plt.show() 



通常、テキストファイルのコードとデータをクラスタに送信し、spark-submitで実行します。
私の質問は、どのようにして3つのノードがそれぞれのノードを使ってデータを扱うようにデータを分割するのかです。ただ、

挨拶
サム

答えて

0

:例えば使用して、すべてのノードで使用可能

  • メイクファイルHDFS
  • (SparkContex.texFileのように)適用可能な方法を使用してLETスパークハンドルをそれを読みます残り。
+0

しかし、ノードを制御する方法が必要なので、それらのうちの2つだけを使用したい場合は、ノードの1つをシャットダウンする必要があります。 – Sam

関連する問題