Изменения

Перейти к: навигация, поиск

Обучение на больших данных

4241 байт добавлено, 18:49, 19 января 2021
Примеры реализации алгоритмов с использованием Spark MLlib
=== Примеры реализации алгоритмов с использованием Spark MLlib ===
Рассмотрим удобство использования Apache Spark на примере. Решим задачу “Hello world” Задача нашей модели предугадать захочет ли клиент оформить срочный вклад. Для этого воспользуемся [https://www.kaggle.com/rouseguy/bankbalanced| данными из мира Big Data {{---}} подсчет слов в файлеMachine Learning Repository]. Напишем нашу модель на Python. Для начала работы с Apache Spark его необходимо установить, выполнив pip install pysparkСчитаем данные из нашего файла и выведем информацию о датасете на экран from pyspark. sql import SparkSession sparkContextspark = SparkSession.textFilebuilder.appName('ml-bank').getOrCreate("hdfs://) df = spark.read.csv('bank."csv', header = True, inferSchema = True) df.flatMapprintSchema()Результат: root |-- age: integer (nullable = true) |-- job: string (nullable = true) |-- marital: string (line nullable => line.splittrue) |-- education: string (nullable = true) |-- default: string (nullable = true) |-- balance: integer (nullable = true) |-- housing: string (nullable = true) |-- loan: string (nullable = true) |-- contact: string (" "nullable = true) |-- day: integer (nullable = true) .map |-- month: string (word nullable => true) |-- duration: integer (word, 1nullable = true) |-- campaign: integer (nullable = true).reduceByKey |-- pdays: integer (nullable = true) |-- previous: integer (_ + _nullable = true) .saveAsTextFile |-- poutcome: string ("hdfsnullable = true) |-- deposit://..."string (nullable = true)Рассмотрим еще один пример анализа данных Как видно наши данные состоят из множества столбцов, содержащих числа и строки Для большей информации выведем наши данные с помощью Spark MLlib. Поставим задачу проанализировать наличие землетрясений на основе твитов. Отберем необходимые твиты с информацией о “тряске” или “землетрясении”таблицы pandas. Для примера выведем 7 первых значений TwitterUtils.createStream(...)import pandas as pd pd.filterDataFrame(_.getTextdf.containstake("earthquake"7) || _, columns=df.getTextcolumns).containstranspose("shaking"))Далее подготовим эти Нас будут интересовать только численные данные к построению модели. Разобьем выборку на обучающую и тестирующуюДля них построим таблицу с основной информацией (количество/ среднее по всей таблице/ среднеквадратичное отклонение / минимальное значение / максимальное значение) <font colornumeric_features = [t[0] for t in df.dtypes if t[1] ="blue">val</font> data = MLUtils'int'] df.loadLibSVMFileselect(sc, "sample_earthquate_tweetsnumeric_features).describe().toPandas().txt"transpose()
<font color="blue">val</font> splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
<font color="blue">val</font> training = splits(0).cache()
<font color="blue">val</font> test = splits(1)
Проводим обучение нашей модели. В качестве алгоритма выберем метод опорных векторов.
<font color="blue">val</font> numIterations = 100
<font color="blue">val</font> model = SVMWithSGD.train(training, numIterations)
Оценим корреляцию между оставшимися данными from pandas.plotting import scatter_matrix numeric_data = df.select(numeric_features).toPandas() axs = scatter_matrix(numeric_data, figsize=(8, 8)) n = len(numeric_data.columns) for i in range(n): v = axs[i, 0] v.yaxis.label.set_rotation(0) v.yaxis.label.set_ha('right') v.set_yticks(()) h = axs[n-1, i] h.xaxis.label.set_rotation(90) h.set_xticks(()) На данных графиках можно увидеть зависимость, к примеру, между возрастом и балансом на карте. Не будем учитывать эти корреляции при построении наших моделей, однако избавимся от дня и месяца рождения, так как эти параметры не влияют на желание клиента оформить быстрый кредит. df = df.select('age', 'job', 'marital', 'education', 'default', 'balance', 'housing', 'loan', 'contact', 'duration', 'campaign', 'pdays', 'previous', 'poutcome', 'deposit') cols = df.columns Подготовим оставшиеся данные для построения моделей. from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome'] stages = [] <font colorfor categoricalCol in categoricalColumns: stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index') encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "greenclassVec">// Очищаем пороговое значение]) stages += [stringIndexer, заданное по умолчанию</font>encoder] model.clearThresholdlabel_stringIdx = StringIndexer(inputCol = 'deposit', outputCol = 'label')Выведем интересующий нас результат stages += [label_stringIdx] <font colornumericCols =['age', 'balance', 'duration', 'campaign', 'pdays', 'previous'] assemblerInputs = [c + "blueclassVec">val</font> metrics for c in categoricalColumns] + numericCols assembler = new BinaryClassificationMetricsVectorAssembler(scoreAndLabelsinputCols=assemblerInputs, outputCol="features") <font colorstages +=[assembler] from pyspark.ml import Pipeline pipeline = Pipeline(stages = stages) pipelineModel = pipeline.fit(df) df = pipelineModel.transform(df) selectedCols = ['label', 'features'] + cols df = df.select(selectedCols) df.printSchema()Наконец, поделим нашу выборку на обучающую и тестирующую train, test = df.randomSplit([0.7, 0.3], seed = 2018) print("blueTraining Dataset Count: ">val</font> auROC = metrics+ str(train.count())) print("Test Dataset Count: " + str(test.areaUnderROCcount()))Построим модели и выведем площадь под ROC-кривой для:Logistic Regression
printlnfrom pyspark.ml.classification import LogisticRegression lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10) lrModel = lr.fit(train) trainingSummary = lrModel.summary roc = trainingSummary.roc.toPandas() plt.plot("Area under roc['FPR'],roc['TPR']) plt.ylabel('False Positive Rate') plt.xlabel('True Positive Rate') plt.title('ROC = " -кривая') plt.show() print('Площадь под ROC-кривой: ' + auROCstr(trainingSummary.areaUnderROC))
Если результат нас устраивает можем дополнить наш код отправкой сообщений о землетрясении пользователям с помощью Spark SQL и пользовательской функции sendEMail Площадь под ROC-кривой: 0.8865478305561797Binary Classification from pyspark.ml.evaluation import BinaryClassificationEvaluator evaluator = BinaryClassificationEvaluator() <font color="blue">val</font> sqlContext = new orgprint('Площадь под ROC-кривой: ', evaluator.apache.spark.sql.hive.HiveContextevaluate(scpredictions))
sqlContextПлощадь под ROC-кривой: 0.sql8837112925002687Decision Tree from pyspark.ml.classification import DecisionTreeClassifier dt = DecisionTreeClassifier("FROM earthquake_warning_users SELECT firstName, lastNamefeaturesCol = 'features', citylabelCol = 'label', email" maxDepth = 3) dtModel = dt.collectfit(train) predictions = dtModel.foreachtransform(test) evaluator = BinaryClassificationEvaluator() print("Площадь под ROC-кривой: " + str(sendEmailevaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))
Еще одной интересной особенностью Spark MLlib является возможность кастомизировать выбранные алгоритмы Площадь под ROC-кривой: 0. К примеру, можно увеличить число итераций или изменить параметр регуляризации 7808118726917547
Random Forest  <font color="orange">import</font> orgfrom pyspark.apacheml.sparkclassification import RandomForestClassifier rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label') rfModel = rf.mllibfit(train) predictions = rfModel.optimization.L1Updatertransform(test) <font colorevaluator =BinaryClassificationEvaluator() print("blueПлощадь под ROC-кривой: ">val</font> svmAlg = new SVMWithSGD+ str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})) Площадь под ROC-кривой: 0.8777131493473223Gradient-Boosted Tree svmAlgfrom pyspark.optimizerml.classification import GBTClassifier setNumIterations gbt = GBTClassifier(200maxIter=10) gbtModel = gbt.fit(train) setRegParam predictions = gbtModel.transform(0.1test). setUpdater evaluator = BinaryClassificationEvaluator(new L1Updater) <font color=print("blueПлощадь под ROC-кривой: ">val</font> modelL1 = svmAlg+ str(evaluator.runevaluate(trainingpredictions, {evaluator.metricName: "areaUnderROC"})))  Площадь под ROC-кривой: 0.8935091626908479
== Практическое применение Big Data ==
22
правки

Навигация