== Работа с комплексом Apache Spark для обучения на больших данных ==
=== Об инструментах Примеры реализации алгоритмов с использованием Spark MLlib ===Рассмотрим удобство использования Apache Spark на примере. Задача нашей модели предугадать захочет ли клиент оформить срочный вклад. Для этого воспользуемся [https://www.kaggle.com/rouseguy/bankbalanced| данными из Machine Learning Repository]. Напишем нашу модель на Python. Для начала работы с Apache Spark его необходимо установить, выполнив <code style="display: inline-block"><font color ='green'>pip install</font> pyspark</code>Считаем данные из нашего файла и выведем информацию о датасете на экран <code style="display: inline-block"><font color ='green'>from</font> pyspark.sql <font color ='green'>import</font> SparkSession spark = SparkSession.builder.appName('ml-bank').getOrCreate() df = spark.read.csv('bank.csv', header = True, inferSchema = True) df.printSchema() </code>Результат: root |-- age: integer (nullable = true) |-- job: string (nullable = true) |-- marital: string (nullable = true) |-- education: string (nullable = true) |-- default: string (nullable = true) |-- balance: integer (nullable = true) |-- housing: string (nullable = true) |-- loan: string (nullable = true) |-- contact: string (nullable = true) |-- day: integer (nullable = true) |-- month: string (nullable = true) |-- duration: integer (nullable = true) |-- campaign: integer (nullable = true) |-- pdays: integer (nullable = true) |-- previous: integer (nullable = true) |-- poutcome: string (nullable = true) |-- deposit: string (nullable = true)Как видно наши данные состоят из множества столбцов, содержащих числа и строки Для большей информации выведем наши данные с помощью таблицы pandas. Для примера выведем 7 первых значений import pandas as pd pd.DataFrame(df.take(7), columns=df.columns).transpose()[[Файл:SparkMLFirstTable.png]] Нас будут интересовать только численные данные. Для них построим таблицу с основной информацией (количество/ среднее по всей таблице/ среднеквадратичное отклонение / минимальное значение / максимальное значение) numeric_features =[t[0] for t in df.dtypes if t[1] =='int'] df.select(numeric_features).describe().toPandas().transpose()[[Файл:SparkMLSecondTable.png]]
Многие компании на сегодняшний день уже столкнулись с необходимостью обработки больших массивов данныхОценим корреляцию между оставшимися данными from pandas.plotting import scatter_matrix numeric_data = df.select(numeric_features).toPandas() axs = scatter_matrix(numeric_data, figsize=(8, 8)) n = len(numeric_data. Для этой цели они начали использовать проекты экосистемы columns) for i in range(n): v = axs[https://hadoopi, 0] v.yaxis.label.set_rotation(0) v.yaxis.apachelabel.org/ Apache Hadoopset_ha('right') v.set_yticks(()) h = axs[n-1, i] h. Данная экосистема базируется на xaxis.label.set_rotation(90) h.set_xticks(()) [[httpsФайл://ruSparkMLThirdTable.wikipediapng]] На данных графиках можно увидеть зависимость, к примеру, между возрастом и балансом на карте.org/wiki/MapReduce#:~:textНе будем учитывать эти корреляции при построении наших моделей, однако избавимся от дня и месяца рождения, так как эти параметры не влияют на желание клиента оформить быстрый кредит. df =MapReduce%20%E2%80%94%20%D0%BC%D0%BE%D0%B4%D0%B5%D0%BB%D1%8C%20%D1%80%D0%B0%D1%81%D0%BF%D1%80%D0%B5%D0%B4%D0%B5%D0%BB%D1%91%D0%BD%D0%BD%D1%8B%D1%85%20%D0%B2%D1%8B%D1%87%D0%B8%D1%81%D0%BB%D0%B5%D0%BD%D0%B8%D0%B9%2C%20%D0%BF%D1%80%D0%B5%D0%B4%D1%81%D1%82%D0%B0%D0%B2%D0%BB%D0%B5%D0%BD%D0%BD%D0%B0%D1%8Fdf.select('age', 'job', 'marital', 'education', 'default', 'balance',%D0%BD%D0%B0%D0%B1%D0%BE%D1%80%D0%B0%D0%BC%D0%B8%20%D0%B4%D0%B0%D0%BD%D0%BD%D1%8B%D1%85%20%D0%B2%20%D0%BA%D0%BE%D0%BC%D0%BF%D1%8C%D1%8E%D1%82%D0%B5%D1%80%D0%BD%D1%8B%D1%85%20%D0%BA%D0%BB%D0%B0%D1%81%D1%82%D0%B5%D1%80%D0%B0%D1%85. MapReduce] 'housing', 'loan', 'contact', 'duration', 'campaign', 'pdays', 'previous', парадигме параллельного программирования 'poutcome', разработанного компанией Google'deposit') cols = df.columnsОсновные достоинства MapReduce:* Масштабируемость;Подготовим оставшиеся данные для построения моделей.* Устойчивость к сбоям;* Простота использования from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoderНо при всех достоинствах данного инструмента categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', наблюдалась низкая производительность на итеративных алгоритмах 'poutcome'] stages = [] for categoricalCol in categoricalColumns: stringIndexer = StringIndexer(напримерinputCol = categoricalCol, алгоритмы машинного обученияoutputCol = categoricalCol + 'Index') encoder = OneHotEncoder(inputCols=[stringIndexer. Решение проблемы было найдено в университете Беркли: была разработана модель распределенных вычисленийgetOutputCol()], outputCols=[categoricalCol + "classVec"]) stages += [stringIndexer, encoder] label_stringIdx = StringIndexer(inputCol = 'deposit', outputCol = 'label') stages += [label_stringIdx] numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous'] assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols assembler = VectorAssembler(inputCols=assemblerInputs, которая имеет устойчивость к сбоям при пользовании распределенной коллекцией данных outputCol="features") stages += [assembler] from pyspark.ml import Pipeline pipeline = Pipeline(англstages = stages) pipelineModel = pipeline. resilient distributed datasetfit(df) df = pipelineModel.transform(df) selectedCols = ['label', RDD'features'] + cols df = df.select(selectedCols) df.printSchema()Наконец, поделим нашу выборку на обучающую и тестирующуюНа основе RDD по сей день развивается система train, test = df.randomSplit([https://spark0.apache7, 0.org/ Apache Spark3]<ref name, seed =2018) print("Training Dataset Count: " + str(train.count())) print("sparkTest Dataset Count: ">[https+ str(test.count()))Построим модели и выведем точность(площадь под ROC-кривой) для://sparkLogistic Regression from pyspark.ml.apacheclassification import LogisticRegression lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10) lrModel = lr.org/ Apache Sparkfit(train) trainingSummary = lrModel.summary roc = trainingSummary.roc.toPandas() plt.plot(roc['FPR']</ref>, которая обладает сравнительно высокой эффективностью при работе итеративных алгоритмов за счет кэширования результатов в памятиroc['TPR']) plt.ylabel('False Positive Rate') plt.xlabel('True Positive Rate') plt.title('ROC-кривая') plt. На основе концепции распределенных коллекций разрабатываются распределенные системыshow() print('Точность:' + str(trainingSummary.areaUnderROC))* [https Точность://spark0.apache8865478305561797Binary Classification from pyspark.org/docs/1ml.0evaluation import BinaryClassificationEvaluator evaluator = BinaryClassificationEvaluator() print('Точность: ', evaluator.evaluate(predictions)) Точность: 0/sql-programming-guide.html Shark] – хранилище данных;8837112925002687Decision Tree* [https://spark from pyspark.ml.apacheclassification import DecisionTreeClassifier dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3) dtModel = dt.org/docs/latest/graphx-programming-guidefit(train) predictions = dtModel.html GraphX] – система обработки графовых данных;transform(test) evaluator = BinaryClassificationEvaluator()* [https print("Точность://spark" + str(evaluator.apacheevaluate(predictions, {evaluator.org/docs/latest/streaming-programming-guide.html Spark Streaming] – система обработки потоковых данных;metricName: "areaUnderROC"}))) * [https Точность://spark0.apache7808118726917547 Random Forest from pyspark.org/docs/latest/ml-guide.html Spark MLlib] – библиотека алгоритмов машинного обученияclassification import RandomForestClassifier rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label') rfModel = rf.fit(train)Все из перечисленных систем совместимы со стеком технологий Hadoop predictions = rfModel.transform(test)MLlib – основная библиотека Spark evaluator = BinaryClassificationEvaluator() print("Точность: " + str(evaluator. Она предоставляет множество служебных программevaluate(predictions, полезных для задач машинного обучения{evaluator.metricName:"areaUnderROC"})))* Классификация;* Регрессия; Точность: 0.8777131493473223* Кластеризация;Gradient-Boosted Tree * Моделирование; from pyspark.ml.classification import GBTClassifier gbt = GBTClassifier(maxIter=10) gbtModel = gbt.fit(train) predictions = gbtModel.transform(test) evaluator = BinaryClassificationEvaluator() print("Точность: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))* Сингулярное разложение и анализ по методу главных компонент;* Проверка гипотез и статистической выборки Точность: 0.8935091626908479
=== Примеры реализации алгоритмов с использованием Spark MLlib ===