2016-11-14 5 views
0

jsonファイルがあり、SHA 256で1フィールドをハッシュしようとしています。これらのファイルはAWS S3にあります。私は現在、Apache Zeppelin上でPythonでsparkを使用しています。jsonスキーマを使用したsparkデータフレームのカラムの更新

ここは私のjsonスキーマです。私は 'mac'フィールドをハッシュしようとしています。

|-- Document: struct (nullable = true) 
| |-- data: array (nullable = true) 
| | |-- element: struct (containsNull = true) 
| | | |-- mac: string (nullable = true) 

私はいくつかのことを試しました。

from pyspark.sql.functions import UserDefinedFunction 
from pyspark.sql.types import StringType 
import hashlib 

hcData = sqlc.read.option("inferSchema","true").json(inputPath) 
hcData.registerTempTable("hcData") 


name = 'Document' 
udf = UserDefinedFunction(lambda x: hashlib.sha256(str(x).encode('utf-8')).hexdigest(), StringType()) 
new_df = hcData.select(*[udf(column).alias(name) if column == name else column for column in hcData.columns]) 

このコードは正常に動作します。しかし、私はMacフィールドをハッシュしようとすると何も起こりません名前の変数を変更しようとすると、

特定の名前の列が見つかりませんでした。

私は少しコードを変更しようとしました。

def valueToCategory(value): 
    return hashlib.sha256(str(value).encode('utf-8')).hexdigest() 


udfValueToCategory = udf(valueToCategory, StringType()) 
df = hcData.withColumn("Document.data[0].mac",udfValueToCategory("Document.data.mac")) 

このコードは、 "Document.data.mac" をハッシュし、は、ハッシュされたMACアドレスを持つ新しい列を作成します。既存の列を更新したい入れ子になっていない変数については、更新することはできますが、問題はありませんが、入れ子にされた変数に対しては、更新する方法を見つけることができませんでした。

基本的に、spark pythonを使ってネストされたjsonファイルのフィールドをハッシュしたいと思います。誰もがスキーマとスパークのデータフレームを更新する方法を知っている?

答えて

0

は、以下の私の質問にはのpythonソリューションです。

from pyspark.sql.functions import UserDefinedFunction 
from pyspark.sql.types import StringType 
import hashlib 
import re 


def find(s, r): 
    l = re.findall(r, s) 
    if(len(l)!=0): 
     return l 
    else: 
     lis = ["null"] 
     return lis 



def hash(s): 
    return hashlib.sha256(str(s).encode('utf-8')).hexdigest() 



def hashAll(s, r): 
    st = s 
    macs = re.findall(r, s) 
    for mac in macs: 
     st = st.replace(mac, hash(mac)) 
    return st 


rdd = sc.textFile(inputPath) 

regex = "([0-9A-Z]{1,2}[:-]){5}([0-9A-Z]{1,2})" 
hashed_rdd = rdd.map(lambda line: hashAll(line, regex)) 

hashed_rdd.saveAsTextFile(outputPath) 
0

私はscalaと私の質問の解決策を見つけました。とにかく冗長なコードがあるかもしれませんが、とにかくそれは働いたここで

import scala.util.matching.Regex 
import java.security.MessageDigest 

val inputPath = "" 
val outputPath = "" 

//finds mac addresses with given regex 
def find(s: String, r: Regex): List[String] = { 
    val l = r.findAllIn(s).toList 
    if(!l.isEmpty){ 
     return l 
    } else { 
     val lis: List[String] = List("null") 
     return lis 
    } 
} 

//hashes given string with sha256 
def hash(s: String): String = { 
    return MessageDigest.getInstance("SHA-256").digest(s.getBytes).map(0xFF & _).map { "%02x".format(_) }.foldLeft(""){_ + _} 
} 

//hashes given line 
def hashAll(s: String, r:Regex): String = { 
    var st = s 
    val macs = find(s, r) 
    for (mac <- macs){ 
     st = st.replaceAll(mac, hash(mac)) 
    } 
    return st 
} 

//read data 
val rdd = sc.textFile(inputPath) 

//mac address regular expression 
val regex = "(([0-9A-Z]{1,2}[:-]){5}([0-9A-Z]{1,2}))".r 

//hash data 
val hashed_rdd = rdd.map(line => hashAll(line, regex)) 

//write hashed data 
hashed_rdd.saveAsTextFile(outputPath) 
関連する問題