DataFrameの飛行遅延情報を取り込んでランダムフォレストを実行するパイプラインを作成しようとしています。私はMLLibの新機能で、下のコードでどこが間違っているのか理解できません。PySparkトレーニングランダムフォレストパイプライン
マイDATAFRAMEがこの形式で、寄木細工のファイルから読み込まれます。(私は、カテゴリの列をOneHotEncodeに進み、そしてFeatures
列にすべての機能を組み合わせ
Table before Encoding
+-----+-----+---+---+----+--------+-------+------+----+-----+-------+
|Delay|Month|Day|Dow|Hour|Distance|Carrier|Origin|Dest|HDays|Delayed|
+-----+-----+---+---+----+--------+-------+------+----+-----+-------+
| -8| 8| 4| 2| 11| 224| OO| GEG| SEA| 31| 0|
| -12| 8| 5| 3| 11| 224| OO| GEG| SEA| 32| 0|
| -9| 8| 6| 4| 11| 224| OO| GEG| SEA| 32| 0|
+-----+-----+---+---+----+--------+-------+------+----+-----+-------+
only showing top 3 rows
Delayed
は私がしようとしているものです予測する)。ここではそのためのコードは次のとおりです。
import os
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
spark = SparkSession.builder \
.master('local[3]') \
.appName('Flight Delay') \
.getOrCreate()
# read in the pre-processed DataFrame from the parquet file
base_dir = '/home/william/Projects/flight-delay/data/parquet'
flights_df = spark.read.parquet(os.path.join(base_dir, 'flights.parquet'))
print('Table before Encoding')
flights_df.show(3)
# categorical columns that will be OneHotEncoded
cat_cols = ['Month', 'Day', 'Dow', 'Hour', 'Carrier', 'Dest']
# numeric columns that will be a part of features used for prediction
non_cat_cols = ['Delay', 'Distance', 'HDays']
# NOTE: StringIndexer does not have multiple col support yet (PR #9183)
# Create StringIndexer for each categorical feature
cat_indexers = [ StringIndexer(inputCol=col, outputCol=col+'_Index')
for col in cat_cols ]
# OneHotEncode each categorical feature after being StringIndexed
encoders = [ OneHotEncoder(dropLast=False, inputCol=indexer.getOutputCol(),
outputCol=indexer.getOutputCol()+'_Encoded')
for indexer in cat_indexers ]
# Assemble all feature columns (numeric + categorical) into `features` col
assembler = VectorAssembler(inputCols=[encoder.getOutputCol()
for encoder in encoders] + non_cat_cols,
outputCol='Features')
# Train a random forest model
rf = RandomForestClassifier(labelCol='Delayed',featuresCol='Features', numTrees=10)
# Chain indexers, encoders, and forest into one pipeline
pipeline = Pipeline(stages=[ *cat_indexers, *encoders, assembler, rf ])
# split the data into training and testing splits (70/30 rn)
(trainingData, testData) = flights_df.randomSplit([0.7, 0.3])
# Train the model -- which also runs indexers and coders
model = pipeline.fit(trainingData)
# use model to make predictions
precitions = model.trainsform(testData)
predictions.show(10)
私はこれを実行すると、私は非常に任意の助けに感謝 Py4JJavaError: An error occurred while calling o46.fit. : java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Double
を入手します!
(あなたが参照しているコメントを見つけることができませんが)。 Spark ML/MLlibの他の同様の厄介な機能もあります。 – desertnaut