Apache Spark Quick Start — Как начать работать с PySpark

СТАТЬЯ В ПРОЦЕССЕ НАПИСАНИЯ

СТАТЬЯ В ПРОЦЕССЕ НАПИСАНИЯ

СТАТЬЯ В ПРОЦЕССЕ НАПИСАНИЯ

СТАТЬЯ В ПРОЦЕССЕ НАПИСАНИЯ

СТАТЬЯ В ПРОЦЕССЕ НАПИСАНИЯ

СТАТЬЯ В ПРОЦЕССЕ НАПИСАНИЯ

Table of Contents

Что посмотреть на русском по Spark?

Однозначно плейлист «DataLearn: DE-101 | Модуль 7». Здесь вы получите базовое понимание инструмента.

Краткое описание терминов Spark

  • Application (Приложение) — Пользовательская программа, созданная на Spark. Состоит из программы-драйвера и исполнителей на кластере.
  • Application JAR — JAR-файл, содержащий приложение Spark пользователя. В некоторых случаях пользователи могут захотеть создать «uber JAR», который будет содержать их приложение вместе с его зависимостями. Однако, пользовательский JAR не должен включать библиотеки Hadoop или Spark, так как они будут добавлены во время выполнения.
  • Cluster manager — Внешний сервис для получения ресурсов в кластере.
    Типы Cluster Manager:

    • Standalone (автономный менеджер) – простой менеджер кластеров, входящий в состав Spark, который упрощает настройку кластера.
    • Apache Mesos (Deprecated) — общий менеджер кластера, который также может запускать Hadoop MapReduce и сервисные приложения.
    • Hadoop YARN – менеджер ресурсов в Hadoop 3.
    • Kubernetes – система с открытым исходным кодом для автоматизации развертывания, масштабирования и управления контейнеризированными приложениями.
  • Deploy mode (Режим развертывания) — Определяет, где запускается и работает процесс драйвера:
    • В режиме кластера (cluster mode) фреймворк запускает драйвер внутри кластера.
    • В режиме клиента (client mode) ведомый (подчиненный) запускает драйвер за пределами кластера.
    • Можно определить, в каком режиме вы находитесь, выполнив метод deployMode(). Метод возвращает свойство, защищенное от изменения (только для чтения).
  • Driver program (Программа-драйвер) — Процесс, выполняющий основную функцию main() приложения и создающий контекст SparkContext. Все начинается именно здесь. Это процесс JVM, который управляет приложением Spark, обрабатывая вводимые пользователем данные и распределяя работу между исполнителями.
  • Executor (Исполнитель) — Процесс, запускаемый для приложения на рабочем узле. Исполнитель выполняет задачи и сохраняет промежуточные данные в памяти или на диске. У каждого приложения имеются собственные исполнители.
  • Job (Задание) — Параллельное вычисление (выполнение), состоящее из нескольких задач, которые порождаются в ответ на некоторое действие Spark (например, save() или collect()). В Spark job представляет собой ряд преобразований, применяемых к данным. Он охватывает весь рабочий процесс от начала до конца. Одно приложение Spark может иметь более одного job Spark.
  • Stage (Этап) — Stage — это сегмент задания, выполняемый без data shuffling (перетасовки данных). Spark разбивает job (задание) на разные этапы, когда преобразование требует перетасовки данных между разделами.
  • Task (Задача) — Минимальная единица работы, которая передается одному исполнителю. Каждый этап делится на несколько задач, которые выполняют обработку параллельно в разных data partitions (разделах данных).
  • Worker node (Рабочий узел) — Любой узел, который может выполнять код приложения в кластере.
  • RDD — это первичная абстракция данных. Используются ли DataFrames или Datasets, они компилируются в RDD за кулисами. Он представляет собой неизменяемую, секционированную коллекцию записей, с которой можно работать параллельно. Данные внутри RDD хранятся в памяти как можно дольше и как можно больше.

Схема Spark с компонентами:

Основы архитектуры Apache Spark

Архитектура Apache Spark используется не только для обработки в реальном времени, но и для пакетной обработки, и она может обеспечить производительность в сто раз выше, чем традиционный метод map-reduce, реализованный в Hadoop.

Архитектура Spark или Apache Spark — это фреймворк параллельной обработки, поддерживающий обработку в памяти для повышения производительности аналитических приложений больших данных и ядро Apache Spark, ресурсы которого управляются YARN. Проще говоря, архитектура Spark известна своей скоростью и эффективностью.

Вот ее основные особенности:

  • Он обрабатывает данные быстрее, что экономит время при операциях чтения и записи.
  • Он обеспечивает потоковую передачу данных в режиме реального времени, что делает его действительно востребованной технологией в современном мире больших данных.

ПРИМЕЧАНИЕ: YARN — это своего рода менеджер ресурсов, который в основном используется для управления ресурсами вашей архитектуры Spark.

Архитектура Spark основана на двух важных абстракциях: устойчивом распределенном наборе данных (RDD) и направленном ациклическом графе (DAG).

Понимание операций DAG в Spark

Общая иерархия приложения Spark представлена на картинке:

В этом разделе будет описано, как код Spark разбивается на Job и Stage Tasks через DAG.


DAG (Directed Acyclic Graph) — это направленный ациклический граф, другими словами это набор вершин и ребер. Вершины представляют RDD, а ребра представляют операцию, которая должна быть применена к RDD. В Spark DAG каждое ребро направляется от более раннего к более позднему в последовательности. При вызове Action созданный DAG отправляется в DAG Scheduler, который далее разбивает граф на этапы задачи.

DAGScheduler — это уровень планирования Apache Spark, который реализует поэтапное планирование с использованием Jobs (заданий) и Stages (стадий).

DAGScheduler преобразует логический план выполнения (RDD lineage of dependencies built using RDD transformations) в физический план выполнения (с использованием Stages).

После вызова действия RDD в SparkContext передает логический план DAGScheduler, который он в свою очередь преобразует в набор Stages (этапов), отправляемых в виде TaskSets для выполнения.

PySpark DAG Visualization:

Как это работает в Spark?

  • Sequence of Computations (Последовательность вычислений): в Spark, когда вы выполняете действия или преобразования с данными, они изначально представлены как ряд операций. Вместо того, чтобы выполнять каждую операцию немедленно, Spark создает DAG этих операций.
  • Lazy Evaluation (Ленивая оценка): этот DAG формирует план выполнения, который оценивается и оптимизируется до фактического вычисления. Ленивая оценка Spark означает, что преобразования не вычисляются сразу, а накапливаются до тех пор, пока не будет вызвано действие. Это позволяет Spark оптимизировать план выполнения, например, путем комбинирования преобразований.
  • Fault Tolerance (Отказоустойчивость): структура DAG способствует отказоустойчивости. Если узел выходит из строя во время вычислений, Spark может пересчитать только раздел потерянных данных, используя отношения, представленные DAG, вместо того, чтобы переделывать всю операцию.
  • Optimized Execution (Оптимизированное выполнение): DAG позволяет Spark эффективно перестраивать вычислительные задачи. Зная зависимости и преобразования, Spark может выполнять этапы параллельно, оптимизировать распределение ресурсов и выполнять конвейеризацию операций.
  • Task Scheduling (Планирование задач): После того, как DAG создан и оптимизирован, Spark делит его на набор этапов с наборами задач, которые запланированы для выполнения. Каждый этап в DAG может выполняться параллельно через доступные исполнители.

По сути, DAG гарантирует, что Выполнение задач Spark выполняется эффективно, с оптимизацией скорости и управлением ресурсами, интегрированными на каждом этапе. Это ключевой механизм, который лежит в основе надежной производительностиApache Spark в задачах обработки данных.

Еще одна иллюстрация как работает DAG (источник):

Понимание компонентов RDD: Resilient, Distributed, Dataset

RDD (Resilient Distributed Dataset) — является фундаментальной концепцией в Apache Spark, которая обозначает три основных атрибута.

Давайте разберемся, что означает каждый термин:

  • Resilient (Устойчивый)
    • Отказоустойчивость: RDD разработаны для автоматического восстановления после ошибок или сбоев. Если узел в кластере выходит из строя, потерянные данные могут быть пересчитаны из его родословной, что гарантирует целостность данных.
    • Эффективное восстановление: с помощью процесса, известного как родословная, RDD могут повторно вычислять только утраченную часть данных, что делает восстановление эффективным без существенного нарушения процессов.
  • Distributed (Распределенный)
    • Масштабируемость по узлам: данные распределяются по различным узлам в кластере, что позволяет выполнять параллельную обработку. Такое распределение повышает как скорость, так и эффективность, поскольку задачи могут использовать несколько машин.
    • Балансировка нагрузки: рабочие нагрузки равномерно распределяются по узлам, что позволяет избежать возникновения узких мест, обеспечивая оптимальное использование ресурсов и повышение производительности.
  • Dataset (Набор данных)
    • Логическая группировка данных: RDD представляют собой набор данных в логической структуре, позволяя пользователям манипулировать большими объемами данных с помощью единого унифицированного интерфейса.
    • Преобразования и действия: с помощью RDD можно выполнять различные преобразования (например, сопоставление и фильтрация) и действия (например, подсчет и сбор) для беспрепятственного манипулирования и анализа данных.

Внутри каждый RDD в Spark имеет пять ключевых свойств:

  • List of Partitions (Список разделов): RDD разделен на разделы, которые являются единицами параллелизма в Spark.
  • Computation Function (Функция вычисления): Функция определяет, как вычислять данные для каждого раздела.
  • Dependencies (Зависимости): RDD отслеживает свои зависимости от других RDD, которые описывают, как он был создан.
  • Partitioner — Разделитель (необязательно): для RDD «ключ-значение» разделитель определяет, как разделяются данные, например, с помощью хэш-разделителя.
  • Preferred Locations — Предпочтительные расположения (необязательно): в этом свойстве перечислены предпочтительные расположения для вычисления каждого раздела, например расположения блоков данных в HDFS.

Lazy Evaluation (Ленивая оценка)

Когда вы определяете RDD, его внутренние данные не доступны или не преобразуются немедленно, пока действие не запустит выполнение. Такой подход позволяет Spark определить наиболее эффективный способ выполнения преобразований.

RDD в Apache Spark поддерживает два типа операций:

  • Трансформации — это функции, которые принимают существующие RDD в качестве входных данных и выводят один или несколько RDD. Однако данные в существующем RDD в Spark не изменяются, поскольку они неизменяемы.
  • Действия в Spark — это функции, которые возвращают конечный результат вычислений RDD. Он использует lineage graph для загрузки данных в RDD в определенном порядке. После того, как все преобразования выполнены, действия возвращают конечный результат в драйвер Spark.

Partitions (Разделы)

При создании RDD Spark делит данные на несколько фрагментов, называемых разделами. Каждый раздел представляет собой логическое подмножество данных и может обрабатываться независимо с помощью разных исполнителей. Это позволяет Spark выполнять операции с большими наборами данных параллельно.

RDD операции:

Существуют narrow (узкие) и wide (широкие) преобразования.

  • В узком преобразовании все элементы данных, необходимые для вычисления записей в single partition (одном разделе), находятся в одном разделе родительского RDD.
  • В широком преобразовании все элементы данных, необходимые для вычисления записей в одном разделе, могут находиться во многих разделах родительского RDD, поэтому для завершения преобразования разделы необходимо перетасовать (shuffled).

Narrow vs Wide Transformations:

Two-stage Spark job:

DAGScheduler проходит по родословной RDD от конечного RDD (с действием) обратно к исходному RDD, создавая DAG этапов на основе границ перемешивания. Этапы формируются там, где существуют широкие зависимости (границы перемешивания). Каждый этап состоит из параллельных задач, которые могут выполняться на разных разделах. Этапы создаются как ResultStages (конечный этап) или ShuffleMapStages (промежуточные этапы, которые выполняют перемешивание).

Shuffling происходит, когда данные перераспределяются между разделами. Это необходимо, когда преобразование требует информации из других разделов, например, суммирования всех значений в столбце. Spark соберет требуемые данные из каждого раздела и объединит их в новый раздел, вероятно, на другом исполнителе. Shuffling — это механизм, который Spark использует для перераспределения данных между разными исполнителями и даже между машинами. Перемешивание Spark срабатывает, когда мы выполняем определенные операции преобразования, такие как gropByKey(), reducebyKey(), join() на RDD и DataFrame.

Еще одна иллюстрация работы приложения Spark

Apache Spark Internals: Understanding Physical Planning (Stages, Tasks & Pipelining)

Apache Spark Internals: Task Scheduling — Execution of a Physical Plan

Что такое кластер Apache Spark?

Кластеры Apache Spark — это группа компьютеров, которые работают как один компьютер и обрабатывают выполнение различных команд, выдаваемых из блокнотов. Apache Spark использует архитектуру типа «master-worker», которая по сути может иметь master process (главный процесс) и worker processes (рабочие процессы), которые могут выполняться на нескольких машинах.

В Master node (главном узле) у вас есть программа драйвера (driver), которая управляет приложением, поэтому код, который вы пишете, ведет себя как программа драйвера. Внутри программы драйвера первое, что мы делаем, это создаем Spark context. Предполагая, что к Spark context может быть шлюзом ко всем или любым функциям Spark при идентичном подключении к базе данных. Worker nodes (рабочие узлы) — это подчиненные узлы, чья работа заключается в основном в выполнении задач.

Главный узел содержит программу драйвера, которая управляет приложением, создавая объект контекста Spark. Объект контекста Spark работает с менеджером кластера для управления различными заданиями. Задача рабочих узлов — выполнять задания и возвращать результаты главному узлу.

Основные компоненты архитектуры Apache Spark

  • Программа драйвера: запускает основную функцию и отвечает за координацию задач в кластере. Она определяет один или несколько объектов SparkContext, которые являются основными точками входа для взаимодействия с механизмом выполнения Spark.
  • Spark Context: центральный координатор для приложений Spark. Он взаимодействует с менеджером кластера для распределения ресурсов и планирования задач. Spark Context также управляет кэшированием и распределенным хранением RDD.
  • Менеджер кластера: отвечает за управление ресурсами и планирование задач в кластере. Spark может работать с различными менеджерами кластера, такими как автономные, Apache Mesos, Hadoop YARN и Kubernetes.
  • Executor: это процессы JVM, работающие на рабочих узлах, которые выполняют задачи, запланированные программой драйвера. Каждый executor запускает несколько задач одновременно в отдельных потоках и напрямую взаимодействует с программой драйвера, чтобы сообщать о состоянии задач. Executors также отвечают за кэширование разделов RDD в памяти или на диске.
  • Задача: это наименьшая единица работы в Spark, представляющая собой одну операцию над разделом данных. Задачи объединяются в этапы, которые представляют собой группы задач с одинаковой операцией и зависимостями.
  • RDD (Resilient Distributed Dataset): это неизменяемая, распределенная коллекция объектов, которые могут обрабатываться параллельно. RDD распределяются по узлам кластера, и Spark автоматически управляет их распределением, отказоустойчивостью и восстановлением.
  • Планировщик DAG: делит приложение Spark на этапы на основе зависимостей между RDD. Он планирует задачи на узлах-исполнителях, принимая во внимание локальность данных и ресурсы кластера.
  • Выполнение запроса: оптимизатор запросов Spark Catalyst и механизм выполнения Tungsten оптимизируют и выполняют запросы SQL и операции DataFrame/Dataset. Catalyst оптимизирует планы запросов, используя такие методы, как predicate pushdown и project pruning, в то время как Tungsten оптимизирует физическое выполнение, используя такие методы, как генерация кода и управление памятью.

Как Executor обрабатывает данные в приложении Spark?

В приложении Spark Executor играет решающую роль в управлении обработкой и хранением данных.

Вот как это работает:

  • Инициирование на Worker Nodes (рабочих узлах): каждый Executor инициируется как процесс на рабочем узле, гарантируя, что приложение эффективно использует распределенные вычислительные ресурсы.
  • Выполнение задач и управление данными: Executors отвечают за выполнение назначенных им задач. Они предназначены для хранения данных либо в памяти для быстрого доступа, либо на диске для более постоянного хранения. Эта двойная возможность позволяет исполнителям сбалансировать скорость и доступность ресурсов, оптимизируя производительность.
  • Взаимодействие с данными: Executors активно взаимодействуют с внешними источниками данных. Они считывают входные данные, необходимые для обработки, и записывают выходные данные обратно в эти источники, поддерживая бесперебойный поток информации в среду Spark и из нее.
  • Application-Specific Allocation: каждое приложение Spark инициализирует свой собственный набор Executors. Это гарантирует, что задачи будут выполнены в изоляции от других запущенных приложений, предотвращая конфликты ресурсов и оптимизируя производительность приложения.

Job — это параллельное вычисление, состоящее из нескольких задач, и каждое распределенное действие — это job. Группа задач, которые могут быть выполнены для выполнения одной и той же задачи на нескольких исполнителях, представляет Stage. Каждое job разбивается на наборы задач, которые зависят друг от друга.

Ресурсы, предоставляемые всем рабочим узлам в соответствии с их потребностями, и управление всеми узлами осуществляется с помощью Cluster Manager, т.е. Cluster Manager — это режим, в котором мы можем запустить Spark.

Что такое Spark Session | Точка входа в Spark

Spark Session — это унифицированная точка входа для приложений Spark; она была представлена в Spark 2.0. Она действует как соединитель для всех базовых функций Spark, включая RDD, DataFrames и Datasets, предоставляя унифицированный интерфейс для работы со структурированной обработкой данных. Это один из самых первых объектов, которые вы создаете при разработке приложения Spark PySpark или Spark SQL. Как разработчик Spark, вы создаете SparkSession с помощью метода SparkSession.builder().

Spark Session объединяет несколько ранее отдельных контекстов, таких как SQLContext, HiveContext и StreamingContext, в одну точку входа, упрощая взаимодействие со Spark и его различными API. Он позволяет пользователям выполнять различные операции, такие как чтение данных из различных источников, выполнение SQL-запросов, создание DataFrames и Datasets, а также эффективное выполнение действий с распределенными наборами данных.

  • SparkSession.builder()– Возвращаемый SparkSession.Builder класс. Это конструктор для SparkSession.master(), appName() и getOrCreate() являются методами SparkSession.Builder.
  • master() – Это позволяет приложениям Spark подключаться и работать в разных режимах (локальный, автономный кластер, Mesos, YARN) в зависимости от конфигурации.
  • Используйте local[x] при запуске на локальном ноутбуке. x должно быть целым числом и должно быть больше 0; это показывает, сколько разделов он должен создать при использовании RDD, DataFrame и Dataset. В идеале xзначение должно быть числом ядер ЦП, которые у вас есть.
  • Для standalone использования spark://master:7077
  • appName() – Задает имя для приложения Spark, которое отображается в веб-интерфейсе Spark. Если имя приложения не задано, задается случайное имя.
  • getOrCreate() – Возвращает объект SparkSession, если он уже существует. Создает новый, если он не существует.

Создать еще один Spark Session

Иногда вам может потребоваться создать несколько сеансов, что можно легко сделать с помощью newSession() метода. Он использует то же имя приложения и мастера, что и существующий сеанс. Базовый SparkContext будет одинаковым для обоих сеансов, так как у вас может быть только один контекст на приложение Spark.

Spark Session vs Spark Context

Сессия Spark — это основной объект в Spark — точка входа каждого приложения Spark. Сессия представляет собой обертку вокруг sparkContext и содержит все метаданные, необходимые для начала работы с распределенными данными.

SparkContext — это переменная объекта Spark Session, которая используется для работы с RDD.

Контекст SQL — это то же самое, что и контекст Spark, переменная объекта сеанса Spark, которая используется для выполнения операций с DataFrames и DataSets.

Когда мы закончим анализ, мы можем очистить кэш Spark и завершить сессию с помощью SparkSession.stop().

Сериализация данных в PySpark

Сериализация данных в Spark — это процесс преобразования объектов в байтовый поток, который может быть передан по сети, сохранён в файл или использован в распределённой среде для хранения и обработки данных. В контексте Spark, сериализация имеет особое значение, потому что данные и объекты должны быть переданы между различными узлами или процессами, а также сохранены в формате, который позволяет их быстро считывать и восстанавливать.

Когда мы говорим о физической сериализации данных в Spark, мы имеем в виду преобразование объектов Python (или других языков, которые поддерживает Spark) в бинарный формат, который может быть эффективно передан и обработан на разных узлах кластера или в локальном режиме.

Что происходит при сериализации

Сериализация — это процесс, при котором объект (например, список, словарь, или даже пользовательский класс) преобразуется в последовательность байтов. Эти байты могут быть сохранены в файл, переданы по сети или использованы для передачи данных между разными процессами (например, между рабочими узлами кластера).

Типы сериализаторов в PySpark

В Spark доступны два основных сериализатора для общего использования:

JavaSerializer (по умолчанию):

Класс: org.apache.spark.serializer.JavaSerializer

Этот сериализатор используется по умолчанию в Spark, в основном из-за его совместимости с объектами Java. Он использует встроенный механизм сериализации Java, что делает его высокосовместимым со всеми объектами Java.

  • Плюсы: Простота, не требует дополнительных настроек, работает с широким спектром типов объектов.
  • Минусы: медленнее и производит больше сериализованных данных по сравнению с Kryo. Не идеально подходит для высокопроизводительных приложений с интенсивным использованием памяти.

KryoSerializer

Класс: org.apache.spark.serializer.KryoSerializer

Kryo — это более оптимизированный сериализатор, разработанный для производительности и эффективности использования пространства. Он быстрее и генерирует меньшие сериализованные данные, что делает его предпочтительным выбором при работе со сложными структурами данных.

  • Плюсы: более высокая производительность для больших или сложных объектов, минимизирует размер данных.
  • Минусы: требуется ручная регистрация пользовательских классов (т.е. нестандартных типов Java), что добавляет дополнительные шаги и накладные расходы на настройку.

Как работает сериализация в PySpark

В Spark данные сериализуются, когда они перемещаются между узлами или сохраняются на диске.

Вот упрощенный вид процесса:

  1. Данные подготавливаются на узле драйвера и сериализуются в двоичный формат с помощью настроенного сериализатора.
  2. Сериализованные данные передаются на рабочие узлы кластера.
  3. Рабочие узлы десериализуют данные для обработки.
  4. Результаты с рабочих узлов снова сериализуются перед отправкой обратно на узел драйвера.

Сериализатор влияет как на задержку, так и на объем памяти этих шагов. Сериализация Kryo быстрее и генерирует меньше данных, что может снизить использование памяти и ускорить передачу по сети, что особенно полезно для итеративных задач или больших наборов данных.

Spark Partitioning и Bucketing

Spark Partitioning и Bucketing — это техники организации данных в Apache Spark, которые помогают улучшить производительность при обработке больших объемов данных.

Partitioning в Spark — это процесс распределения данных по разным частям (партициям) для параллельной обработки. Каждая партиция — это подмножество данных, которое может обрабатываться отдельным потоком или задачей.

Партиционирование данных позволяет Spark эффективно распределять работу между кластерами.
При правильном partitioning можно уменьшить количество операций shuffle (перераспределение данных между узлами) в кластере, что значительно ускоряет обработку данных.

Bucketing — это метод организации данных в дисковых файлах с использованием более контролируемого подхода. В отличие от partitioning, который делит данные на основе значений столбцов, bucketing делит данные на несколько фиксированных «ведер» (buckets) по хэш-значению.

Каждый «bucket» представляет собой раздел данных, который соответствует определенному диапазону или значению.

Partitioning — это метод, который делит данные на части на уровне кластеров.
Bucketing — это метод на уровне файловой системы, где данные делятся на несколько частей в рамках одного файла.

Партиционирование: Используйте партиционирование, когда вам нужно распределить данные для параллельной обработки и оптимизировать локальность данных. Это полезно для фильтрации и объединений (joins), где условие фильтрации соответствует ключу партиции.

Бакетирование: Используйте бакетирование, когда у вас есть большие наборы данных, и вы хотите оптимизировать производительность запросов, особенно для фильтров на основе равенства. Это полезно для сценариев, когда данные часто запрашиваются по конкретным критериям.

Комбинирование обоих методов: В некоторых случаях сочетание партиционирования и бакетирования может дать наилучшие результаты. Вы можете разделить данные на партиции по высокоуровневой категории, а затем применить бакетирование внутри каждой партиции.

На следующей диаграмме показан поток распределенной обработки:

  • Драйвер Spark создает план выполнения для распределенной обработки между множеством исполнителей Spark.
  • Драйвер Spark назначает задачи каждому исполнителю на основе плана выполнения. По умолчанию драйвер Spark создает разделы RDD (каждый соответствует задаче Spark) для каждого объекта S3 (Part1 … N). Затем драйвер Spark назначает задачи каждому исполнителю.
  • Каждая задача Spark загружает свой назначенный объект S3 и сохраняет его в памяти в разделе RDD. Таким образом, несколько исполнителей Spark загружают и обрабатывают свою назначенную задачу параллельно.

Оптимизатор Catalyst

Внутри Spark используется движок, называемый оптимизатором Catalyst.для оптимизации планов выполнения. Catalyst имеет оптимизатор запросов, который можно использовать при запуске высокоуровневых API Spark, таких как Spark SQL, DataFrame и наборы данных, как показано на следующей диаграмме.

Поскольку оптимизатор Catalyst не работает напрямую с RDD API, высокоуровневые API обычно быстрее низкоуровневого RDD API. Для сложных соединений оптимизатор Catalyst может значительно повысить производительность, оптимизируя план выполнения задания. Вы можете увидеть оптимизированный план вашего задания Spark на вкладке SQL пользовательского интерфейса Spark.

Адаптивное выполнение запросов

Оптимизатор Catalyst выполняет оптимизацию времени выполнения с помощью процесса, называемого Adaptive Query Execution.

Adaptive Query Execution использует статистику времени выполнения для повторной оптимизации плана выполнения запросов во время выполнения вашего задания. Adaptive Query Execution предлагает несколько решений проблем производительности, включая объединение разделов после перемешивания, преобразование объединения сортировки-слияния в широковещательное объединение и оптимизацию перекошенного объединения.

Объединение разделов после перемешивания

Эта функция уменьшает разделы RDD (объединяет) после каждого перемешивания на основе map выходной статистики. Она упрощает настройку номера раздела перемешивания при выполнении запросов. Вам не нужно устанавливать номер раздела перемешивания, чтобы он соответствовал вашему набору данных. Spark может выбрать правильный номер раздела перемешивания во время выполнения после того, как у вас будет достаточно большое начальное количество разделов перемешивания.

Преобразование сортировочно-слиятельного соединения в широковещательное соединение

Эта функция распознает, когда вы объединяете два набора данных существенно разного размера, и использует более эффективный алгоритм объединения на основе этой информации. Для получения более подробной информации см. документацию Apache Spark. Стратегии присоединения обсуждаются в разделе «Оптимизация перемешиваний» .

Оптимизация косого соединения

Перекос данных — одно из наиболее распространенных узких мест для заданий Spark. Он описывает ситуацию, в которой данные перекошены в определенные разделы RDD (и, следовательно, определенные задачи), что задерживает общее время обработки приложения. Это часто может снизить производительность операций соединения. Функция оптимизации перекоса соединения динамически обрабатывает перекос в соединениях сортировки-слияния, разделяя (и реплицируя при необходимости) перекошенные задачи на задачи примерно одинакового размера.

Spark RDD vs. DataFrame vs. Dataset

Понимание разницы между Spark RDD, DataFrames и наборами данных имеет важное значение в мире распределенной обработки данных. Эти различия, начиная от контроля и удобства и заканчивая сложностью и ясностью, позволяют специалистам по данным эффективно использовать возможности Apache Spark, независимо от характера их данных или сложности их задач.

Вот некоторая информация о Spark RDD, DataFrames и Datasets:

  • Spark RDD: самая базовая абстракция данных в Spark. Это неизменяемые и распределенные коллекции данных. RDD предлагают детальный контроль над обработкой данных, но они могут быть сложны в использовании.
  • Spark DataFrames: абстракция более высокого уровня, чем RDD. Это структурированные данные, организованные в именованные столбцы, похожие на таблицу в базе данных. DataFrames проще в использовании, чем RDD, но они предлагают меньший контроль над обработкой данных.
  • Spark Datasets: это новая абстракция данных в Spark, которая объединяет преимущества RDD и DataFrames. Datasets — это распределенные коллекции данных с дополнительной схемой. Они предлагают баланс контроля и удобства, что делает их хорошим выбором для различных задач обработки данных.

Таблица сравнения:

YouTube Videos Apache Spark

Прежде чем мы перейдем к настройке среды, я публикую подборку видео по Apache Spark

PySpark Tutorial for Beginners

PlayList PySpark Tutorial for Beginners

Apache Spark Architecture — EXPLAINED!

Java и Apache Spark

Установка Java необходима для работы с PySpark, потому что Apache Spark написан на языке программирования Scala, который работает на платформе Java (JVM — Java Virtual Machine). Spark использует компоненты и библиотеки, которые требуют наличия Java.

Scala компилируется под Java VM.


Когда вы запускаете PySpark, Python-код взаимодействует с кластером Spark через Py4J — библиотеку, которая позволяет Python-коду общаться с Java-объектами.

Таким образом, Java необходима для:

  • Запуска Spark: Для инициализации ядра Spark и работы всех его компонентов требуется Java.
  • Совместимости с компонентами Spark: Spark и его различные компоненты (например, Spark SQL, MLlib и т.д.) используют Java и требуют Java Runtime для своей работы.
  • Использования Py4J: Эта библиотека позволяет взаимодействовать Python-коду с JVM-кодом Spark, обеспечивая выполнение задач и обработки данных в распределенной среде.

Таким образом, для корректной работы PySpark обязательно наличие Java на машине.

Установка и настройка Apache Spark на отдельной машине

В рамках этой статьи основной целью является написание Guide, который позволит начать работу в рамках одной машины (я все инсталляции выполняю на Ubuntu). На других ОС действия должны быть похожими, но скорей всего будут чем-то отличаться.

Я начну с описания установки Java, затем перейду к установки Apache Spark и PySpark. После этого будет приведены примеры скриптов для работы с PySpark.

Установка JAVA

Если у вас на машине отсутствует Java, то начинаем с инсталляции Java (будем использовать Java 11 версии).

Проверяем, что Java установлена:

Далее нам необходимо установить переменную окружения JAVA_HOME.

Запускаем команду:

Копируем

Добавляем в среду переменные в /etc/environment

/etc/environment — это системный файл, который используется для настройки глобальных переменных окружения. Переменные в этом файле доступны для всех пользователей системы.

Команда редактирования переменных:

Добавляем 2 строки:

Перезагружаем с помощью команды:

Проверяем $JAVA_HOME

Результат:

Установка Apache Spark и PySpark — Вариант 1 «Light»

Установка PySpark в виртуальном окружении

Устанавливаем виртуальную среду

Активируем среду

Устанавливаем PySpark:

Запускаем PySpark:

Результат выполнения команды:

Использование Spark UI

При локальном запуске PySpark предоставляет веб-интерфейс Spark UI для мониторинга процессов.

После запуска PySpark откройте http://localhost:4040.

В Spark UI отражается следующая информация:

  • Активные задачи — список всех текущих заданий, которые выполняются на Spark.
  • DAG выполнения — Directed Acyclic Graph (DAG) отображает последовательность операций, которые Spark будет выполнять на данных. Это помогает понять, как данные передаются через различные этапы выполнения.
  • Использование памяти и shuffle — показывает, сколько памяти используется на каждом этапе обработки и как Spark проводит операции shuffle (перемещение данных между узлами или процессами). В локальном режиме это будет важно для отслеживания, не переполняется ли память вашего устройства.

Если PySpark установлен локально через pip, то Spark запускается в локальном режиме (Local mode). В локальном режиме Spark по-прежнему использует свой распределённый движок, разделяя задачи на подзадачи и управляя их параллельным выполнением. Это позволяет работать с большими объемами данных, но ограничивает производительность количеством доступных ядер и памяти вашего устройства.

.master("local[*]") означает запуск Spark с использованием всех доступных ядер процессора. Вы можете также указать конкретное количество потоков, например, .master("local[4]"), чтобы использовать только 4 ядра.

Процесс выполнения PySpark кода локально:

  1. Вы пишете код на Python с использованием PySpark.
  2. Запускаете SparkSession, который инициализирует Spark.
  3. Spark начинает разбиение данных и распределение их по доступным ядрам процессора.
  4. Данные обрабатываются, и результаты возвращаются обратно в ваш код.

Важно! Если вы работаете с большим объёмом данных, которые не помещаются в память вашего устройства, вы можете столкнуться с проблемами производительности или даже с нехваткой памяти. Но для понимания PySpark — локальной машины будет достаточно для экспериментов с инструментом.

Установка Jupyter Notebook и запуск примера PySpark

В той же виртуальной среде запускаем команды установки Jupyter Notebook:

Стартуем Jupyter Notebook:

Откроется http://localhost:8888/tree

Запуск примера приложения PySpark в Jupyter Notebook

todo

Установка Apache Spark и PySpark — Вариант 2 «Docker Compose»

todo

Как работать с Apache Spark в Google Colab?

Пример:

todo

Подборка примеров по Apache Spark

Использованные материалы для подготовки статьи

0
Оставьте комментарий! Напишите, что думаете по поводу статьи.x