22
правки
Изменения
→Примеры реализации алгоритмов с использованием Spark MLlib
=== Примеры реализации алгоритмов с использованием Spark MLlib ===
Рассмотрим удобство использования Apache Spark на примере. Решим задачу “Hello world” Задача нашей модели предугадать захочет ли клиент оформить срочный вклад. Для этого воспользуемся [https://www.kaggle.com/rouseguy/bankbalanced| данными из мира Big Data {{---}} подсчет слов в файлеMachine Learning Repository]. Напишем нашу модель на Python. Для начала работы с Apache Spark его необходимо установить, выполнив pip install pysparkСчитаем данные из нашего файла и выведем информацию о датасете на экран from pyspark. sql import SparkSession sparkContextspark = SparkSession.textFilebuilder.appName('ml-bank').getOrCreate("hdfs://) df = spark.read.csv('bank."csv', header = True, inferSchema = True) df.flatMapprintSchema()Результат: root |-- age: integer (nullable = true) |-- job: string (nullable = true) |-- marital: string (line nullable => line.splittrue) |-- education: string (nullable = true) |-- default: string (nullable = true) |-- balance: integer (nullable = true) |-- housing: string (nullable = true) |-- loan: string (nullable = true) |-- contact: string (" "nullable = true) |-- day: integer (nullable = true) .map |-- month: string (word nullable => true) |-- duration: integer (word, 1nullable = true) |-- campaign: integer (nullable = true).reduceByKey |-- pdays: integer (nullable = true) |-- previous: integer (_ + _nullable = true) .saveAsTextFile |-- poutcome: string ("hdfsnullable = true) |-- deposit://..."string (nullable = true)Рассмотрим еще один пример анализа данных Как видно наши данные состоят из множества столбцов, содержащих числа и строки Для большей информации выведем наши данные с помощью Spark MLlib. Поставим задачу проанализировать наличие землетрясений на основе твитов. Отберем необходимые твиты с информацией о “тряске” или “землетрясении”таблицы pandas. Для примера выведем 7 первых значений TwitterUtils.createStream(...)import pandas as pd pd.filterDataFrame(_.getTextdf.containstake("earthquake"7) || _, columns=df.getTextcolumns).containstranspose("shaking"))Далее подготовим эти Нас будут интересовать только численные данные к построению модели. Разобьем выборку на обучающую и тестирующуюДля них построим таблицу с основной информацией (количество/ среднее по всей таблице/ среднеквадратичное отклонение / минимальное значение / максимальное значение) <font colornumeric_features = [t[0] for t in df.dtypes if t[1] ="blue">val</font> data = MLUtils'int'] df.loadLibSVMFileselect(sc, "sample_earthquate_tweetsnumeric_features).describe().toPandas().txt"transpose()
Оценим корреляцию между оставшимися данными from pandas.plotting import scatter_matrix numeric_data = df.select(numeric_features).toPandas() axs = scatter_matrix(numeric_data, figsize=(8, 8)) n = len(numeric_data.columns) for i in range(n): v = axs[i, 0] v.yaxis.label.set_rotation(0) v.yaxis.label.set_ha('right') v.set_yticks(()) h = axs[n-1, i] h.xaxis.label.set_rotation(90) h.set_xticks(()) На данных графиках можно увидеть зависимость, к примеру, между возрастом и балансом на карте. Не будем учитывать эти корреляции при построении наших моделей, однако избавимся от дня и месяца рождения, так как эти параметры не влияют на желание клиента оформить быстрый кредит. df = df.select('age', 'job', 'marital', 'education', 'default', 'balance', 'housing', 'loan', 'contact', 'duration', 'campaign', 'pdays', 'previous', 'poutcome', 'deposit') cols = df.columns Подготовим оставшиеся данные для построения моделей. from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome'] stages = [] <font colorfor categoricalCol in categoricalColumns: stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index') encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "greenclassVec">// Очищаем пороговое значение]) stages += [stringIndexer, заданное по умолчанию</font>encoder] model.clearThresholdlabel_stringIdx = StringIndexer(inputCol = 'deposit', outputCol = 'label')Выведем интересующий нас результат stages += [label_stringIdx] <font colornumericCols =['age', 'balance', 'duration', 'campaign', 'pdays', 'previous'] assemblerInputs = [c + "blueclassVec">val</font> metrics for c in categoricalColumns] + numericCols assembler = new BinaryClassificationMetricsVectorAssembler(scoreAndLabelsinputCols=assemblerInputs, outputCol="features") <font colorstages +=[assembler] from pyspark.ml import Pipeline pipeline = Pipeline(stages = stages) pipelineModel = pipeline.fit(df) df = pipelineModel.transform(df) selectedCols = ['label', 'features'] + cols df = df.select(selectedCols) df.printSchema()Наконец, поделим нашу выборку на обучающую и тестирующую train, test = df.randomSplit([0.7, 0.3], seed = 2018) print("blueTraining Dataset Count: ">val</font> auROC = metrics+ str(train.count())) print("Test Dataset Count: " + str(test.areaUnderROCcount()))Построим модели и выведем площадь под ROC-кривой для:Logistic Regression
Random Forest <font color="orange">import</font> orgfrom pyspark.apacheml.sparkclassification import RandomForestClassifier rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label') rfModel = rf.mllibfit(train) predictions = rfModel.optimization.L1Updatertransform(test) <font colorevaluator =BinaryClassificationEvaluator() print("blueПлощадь под ROC-кривой: ">val</font> svmAlg = new SVMWithSGD+ str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"}))) Площадь под ROC-кривой: 0.8777131493473223Gradient-Boosted Tree svmAlgfrom pyspark.optimizerml.classification import GBTClassifier setNumIterations gbt = GBTClassifier(200maxIter=10) gbtModel = gbt.fit(train) setRegParam predictions = gbtModel.transform(0.1test). setUpdater evaluator = BinaryClassificationEvaluator(new L1Updater) <font color=print("blueПлощадь под ROC-кривой: ">val</font> modelL1 = svmAlg+ str(evaluator.runevaluate(trainingpredictions, {evaluator.metricName: "areaUnderROC"}))) Площадь под ROC-кривой: 0.8935091626908479
== Практическое применение Big Data ==