Basic PySpark Interview Questions
Каковы основные преимущества использования PySpark по сравнению с традиционным Python для обработки больших данных?
PySpark, Python API для Apache Spark, предлагает несколько преимуществ по сравнению с традиционным Python для обработки больших данных. К ним относятся масштабируемость для работы с массивными наборами данных, высокая производительность за счёт параллельной обработки, отказоустойчивость для обеспечения надёжности данных, а также интеграция с другими инструментами для работы с большими данными внутри экосистемы Apache.
Как создать SparkSession в PySpark? Каковы его основные назначения?
В PySpark SparkSession является точкой входа для использования функциональности Spark и создаётся с помощью API SparkSession.builder. Его основные назначения включают взаимодействие с Spark SQL для обработки структурированных данных, создание DataFrame, конфигурирование свойств Spark, а также управление жизненным циклом SparkContext и SparkSession. Ниже приведён пример того, как может быть создан SparkSession:
|
1 2 3 4 5 6 |
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("MySparkApp") \ .master("local[*]") \ .getOrCreate() |
Опиши различные способы чтения данных в PySpark.
PySpark поддерживает чтение данных из различных источников, таких как CSV, Parquet и JSON, среди прочих. Для этой цели он предоставляет разные методы, включая spark.read.csv(), spark.read.parquet(), spark.read.json(), spark.read.format() и spark.read.load(). Ниже приведён пример того, как данные могут быть прочитаны в PySpark:
|
1 2 3 |
df_from_csv = spark.read.csv("my_file.csv", header=True) df_from_parquet = spark.read.parquet("my_file.parquet") df_from_json = spark.read.json("my_file.json") |
Как обрабатывать пропущенные данные в PySpark?
В PySpark пропущенные данные можно обрабатывать с использованием нескольких методов. Можно удалять строки или столбцы, содержащие пропущенные значения, с помощью метода .dropna(). Также можно заполнять пропущенные данные конкретным значением или использовать методы интерполяции с помощью .fillna(). Кроме того, можно выполнять импутацию пропущенных значений с использованием статистических методов, таких как среднее значение или медиана, применяя Imputer. Ниже приведён пример обработки пропущенных данных в PySpark:
|
1 2 3 4 5 6 7 8 9 10 11 |
# Как удалить строки df_from_csv.dropna(how="any") # Как заполнить пропущенные значения константой df_from_parquet.fillna(value=2) # Как выполнить импутацию значений медианой from pyspark.ml.feature import Imputer imputer = Imputer(strategy="median", inputCols=["price","rooms"], outputCols=["price_imputed","rooms_imputed"]) model = imputer.fit(df_from_json) df_imputed = model.transform(df_from_json) |
Как можно кэшировать данные в PySpark для повышения производительности?
Одним из преимуществ PySpark является возможность использовать методы .cache() или .persist() для хранения данных в памяти или на заданном уровне хранения. Это улучшает производительность за счёт предотвращения повторных вычислений и снижения необходимости сериализации и десериализации данных. Ниже приведён пример того, как кэшировать данные в PySpark:
|
1 2 3 4 5 |
# Как кэшировать данные в памяти df_from_csv.cache() # Как сохранить данные на локальном диске df_from_csv.persist(storageLevel=StorageLevel.DISK_ONLY) |
При использовании cache() применяется только уровень хранения по умолчанию:
MEMORY_ONLYдля RDDMEMORY_AND_DISKдля Dataset
При использовании persist() вы можете указать нужный уровень хранения как для RDD, так и для Dataset.
Из официальной документации:
- Вы можете пометить RDD для сохранения с помощью методов
persist()илиcache(). - Каждый сохранённый RDD может храниться с использованием разного уровня хранения.
- Метод
cache()— это сокращённая форма использования уровня хранения по умолчанию, а именноStorageLevel.MEMORY_ONLY(хранение десериализованных объектов в памяти).
Используйте persist(), если вы хотите назначить уровень хранения, отличный от:
MEMORY_ONLYдля RDD- или
MEMORY_AND_DISKдля Dataset
Подробнее почитать:
Опиши выполнение соединений в PySpark
PySpark позволяет выполнять несколько типов соединений, таких как inner, outer, left и right. Используя метод .join(), можно задать условие соединения через параметр on и тип соединения через параметр how, как показано в примере:
|
1 2 3 4 5 |
# Как выполнить inner join двух наборов данных df_from_csv.join(df_from_json, on="id", how="inner") # Как выполнить outer join наборов данных df_from_json.join(df_from_parquet, on="product_id", how="outer") |
В чём заключаются ключевые различия между RDD, DataFrame и Dataset в PySpark?
Spark Resilient Distributed Dataset (RDD), DataFrame и Dataset являются ключевыми абстракциями в Spark, которые позволяют работать со структурированными данными в распределённой вычислительной среде. Несмотря на то что все они представляют данные, между ними существуют важные различия.
RDD являются низкоуровневыми API, не имеющими схемы и предоставляющими полный контроль над данными; они представляют собой неизменяемые коллекции объектов.
DataFrame являются высокоуровневыми API, построенными поверх RDD и оптимизированными для производительности, но не обладающими типобезопасностью; они организуют структурированные и полуструктурированные данные в именованные столбцы.
Dataset объединяют преимущества RDD и DataFrame, являясь высокоуровневыми API, которые предоставляют типобезопасную абстракцию; они поддерживают Python и Scala, обеспечивают проверку типов во время компиляции и при этом работают быстрее, чем DataFrame.
Объясни концепцию ленивых вычислений в PySpark. Как она влияет на производительность?
PySpark реализует стратегию, называемую ленивыми вычислениями, при которой преобразования, применяемые к распределённым наборам данных, таким как RDD, DataFrame или Dataset, не выполняются немедленно. Вместо этого Spark строит последовательность операций или преобразований, которые должны быть выполнены над данными, называемую ориентированным ациклическим графом, или DAG. Такой подход улучшает производительность и оптимизирует выполнение, поскольку вычисления откладываются до момента, когда вызывается действие и их выполнение становится действительно необходимым.
Какова роль партиционирования в PySpark и каким образом оно может улучшить производительность?
В PySpark партиционирование данных является ключевой возможностью, которая помогает равномерно распределять нагрузку между узлами кластера. Партиционирование означает разделение данных на более мелкие части, называемые партициями, которые обрабатываются независимо и параллельно на разных узлах кластера.
Это повышает производительность за счёт параллельной обработки, уменьшения перемещения данных и более эффективного использования ресурсов. Управлять партиционированием можно с помощью таких методов, как .repartition() и .coalesce().
Объясни концепцию широковещательных переменных в PySpark и приведи пример использования
Широковещательные переменные являются важной возможностью распределённых вычислительных фреймворков Spark.
В PySpark это разделяемые переменные только для чтения, которые кэшируются и распространяются по узлам кластера для того, чтобы избежать операций shuffle. Они могут быть особенно полезны в распределённых приложениях машинного обучения, которым необходимо использовать и загружать предварительно обученную модель. В этом случае модель передаётся как широковещательная переменная, что помогает сократить накладные расходы на передачу данных и повысить производительность.
В чём различия между PySpark и pandas?
PySpark и pandas оба широко используются для обработки данных, однако между ними существуют ключевые различия. PySpark ориентирован на масштабируемость и предназначен для работы с большими данными и распределённой обработки, тогда как pandas подходит для относительно небольших наборов данных, которые помещаются в память.
С точки зрения производительности PySpark выполняет параллельные вычисления на уровне кластера, что делает его значительно быстрее при работе с большими объёмами данных по сравнению с pandas, который работает на одной машине. С точки зрения удобства использования pandas проще для разведочного анализа данных, тогда как PySpark более сложен, но при этом сильно оптимизирован для распределённых вычислений.
Как можно преобразовать DataFrame из pandas в PySpark DataFrame и обратно?
DataFrame pandas можно преобразовать в PySpark DataFrame с помощью метода spark.createDataFrame(), а PySpark DataFrame обратно в pandas DataFrame с помощью метода .toPandas().
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
import pandas as pd from pyspark.sql import SparkSession # Инициализация SparkSession spark = SparkSession.builder.appName("Example").getOrCreate() # Создание Pandas DataFrame pdf = pd.DataFrame({'id': [1, 2, 3], 'value': [10, 20, 30]}) # Преобразование в PySpark DataFrame df_spark = spark.createDataFrame(pdf) # Обратное преобразование в Pandas DataFrame pdf_new = df_spark.toPandas() |
Intermediate PySpark Interview Questions
Рассмотрев основы, перейдём к вопросам для собеседования по PySpark среднего уровня, которые глубже затрагивают архитектуру и модель выполнения приложений Spark.
Что такое Spark Driver и каковы его обязанности?
Spark Driver — это основной процесс, который координирует выполнение приложений Spark, распределяя задачи по кластеру. Он взаимодействует с менеджером кластера для выделения ресурсов, планирования задач и мониторинга выполнения Spark-задач (Tasks).
Что такое Spark DAG?
Ориентированный ациклический граф (DAG) в Spark является ключевым понятием, поскольку он представляет логическую модель выполнения Spark. Он называется ориентированным, потому что каждая вершина представляет преобразование, выполняемое в определённом порядке, заданном рёбрами. Он является ациклическим, так как в плане выполнения отсутствуют циклы или петли. Этот план оптимизируется с использованием конвейерных преобразований, объединения задач и проталкивания предикатов.
Какие типы менеджеров кластеров доступны в Spark?
В настоящее время Spark поддерживает несколько менеджеров кластеров для управления ресурсами и планирования заданий.
- К ним относится Standalone — простой менеджер кластера, встроенный в Spark.
- Hadoop YARN — универсальный менеджер в экосистеме Hadoop, используемый для планирования заданий и управления ресурсами.
- Kubernetes применяется для автоматизации, развёртывания, масштабирования и управления контейнеризованными приложениями.
- Apache Mesos — распределённая система, используемая для управления ресурсами на уровне приложений.
Опиши, как реализовать пользовательское преобразование в PySpark
Для реализации пользовательского преобразования в PySpark можно определить Python-функцию, которая работает с PySpark DataFrame, а затем использовать метод .transform() для вызова этого преобразования. Ниже приведён пример реализации пользовательского преобразования в PySpark:
|
1 2 3 4 5 6 7 |
# Определение Python-функции, работающей с PySpark DataFrame def get_discounted_price(df): return df.withColumn("discounted_price", \ df.price - (df.price * df.discount) / 100) # Вызов преобразования df_discounted = df_from_csv.transfrom(get_discounted_price) |
Объясни концепцию оконных функций в PySpark и приведи пример
Оконные функции в PySpark позволяют применять операции к окну строк, возвращая одно значение для каждой входной строки. С их помощью можно выполнять ранжирование, аналитические операции и агрегатные функции. Ниже приведён пример применения оконной функции в PySpark:
|
1 2 3 4 5 6 7 8 |
from pyspark.sql.window import Window from pyspark.sql.functions import row_number # Определение оконной функции window = Window.orderBy("discounted_price") # Применение оконной функции df = df_from_csv.withColumn("row_number", row_number().over(window)) |
Как обрабатывать ошибки и исключения в PySpark?
Одним из наиболее полезных способов обработки ошибок и исключений в преобразованиях и действиях PySpark является оборачивание кода в блоки try-except для их перехвата. В RDD можно использовать операцию foreach для итерации по элементам и обработки исключений.
Каково назначение чекпойнтов в PySpark?
В PySpark чекпойнтинг означает сохранение RDD на диск, чтобы к этой промежуточной точке можно было обратиться в будущем вместо повторного вычисления RDD из исходного источника. Чекпойнты обеспечивают возможность восстановления после сбоев, поскольку драйвер может быть перезапущен с использованием ранее вычисленного состояния.
Как PySpark выполняет вывод схемы и как можно задать схему явно?
PySpark автоматически выводит схему при загрузке структурированных данных, однако для лучшего контроля и повышения эффективности можно явно определить схему с помощью StructType и StructField.
|
1 2 3 4 5 6 7 8 |
from pyspark.sql.types import StructType, StructField, IntegerType, StringType schema = StructType([ StructField("id", IntegerType(), True), StructField("name", StringType(), True) ]) df = spark.read.csv("data.csv", schema=schema, header=True) |
Advanced PySpark Interview Questions
Для тех, кто претендует на более старшие позиции или стремится продемонстрировать более глубокое понимание PySpark, далее рассматриваются продвинутые вопросы для собеседования, которые углубляются в тонкости преобразований и оптимизаций внутри экосистемы PySpark.
Объясни различия между узкими и широкими преобразованиями в PySpark
В PySpark узкие преобразования выполняются тогда, когда каждый входной партиции соответствует не более одной выходной партиции и при этом не требуется выполнение shuffle. К таким преобразованиям относятся, например, map(), filter() и union(). В противоположность этому, широкие преобразования необходимы для операций, при которых каждая входная партиция может вносить вклад в несколько выходных партиций, и при этом требуется перераспределение данных, выполнение соединений или агрегаций. Примерами таких преобразований являются groupBy(), join() и sortBy().
Что такое оптимизатор Catalyst в Spark и как он работает?
В Spark оптимизатор Catalyst является компонентом Spark SQL, основанным на правилах и предназначенным для оптимизации производительности запросов. Его основная задача заключается в преобразовании и улучшении SQL-запросов или операций DataFrame, заданных пользователем, с целью генерации эффективного физического плана выполнения, адаптированного под конкретный запрос и характеристики набора данных.
Опиши, как реализовать пользовательские агрегации в PySpark
Для реализации пользовательских агрегаций в PySpark можно совместно использовать методы groupBy() и agg(). Внутри вызова agg() можно передавать различные функции из модуля pyspark.sql.functions. Кроме того, можно применять пользовательские агрегации Pandas к группам внутри PySpark DataFrame с помощью метода .applyInPandas(). Ниже приведён пример реализации пользовательских агрегаций в PySpark:
|
1 2 3 4 5 6 7 8 9 10 |
# Использование groupBy и agg с функциями from pyspark.sql import functions as F df_from_csv.groupBy("house_id").agg(F.mean("price_discounted")) # Использование applyInPandas def normalize_price(df): disc_price = df["discounted_price"] df["normalized_price"] = disc_price.mean() / disc_price.std() df_from_csv.groupBy("house_id").applyInPandas(normalize_price) |
С какими трудностями ты сталкивался при работе с большими наборами данных в PySpark и как ты их преодолевал?
С помощью этого вопроса можно обратиться к собственному опыту и рассказать о конкретном случае, в котором возникали сложности при работе с PySpark и большими наборами данных, которые могут включать следующее:
- Управление памятью и использование ресурсов.
- Перекос данных и неравномерное распределение нагрузки.
- Оптимизация производительности, особенно для широких преобразований и операций shuffle.
- Отладка и устранение неисправностей сложных сбоев заданий.
- Эффективное партиционирование и хранение данных.
Для преодоления этих проблем PySpark предоставляет возможности партиционирования наборов данных, кэширования промежуточных результатов, использования встроенных техник оптимизации, надёжного управления кластером и применения механизмов отказоустойчивости.
Как интегрировать PySpark с другими инструментами и технологиями в экосистеме больших данных?
PySpark обладает тесной интеграцией с различными инструментами для работы с большими данными, включая Hadoop, Hive, Kafka и HBase, а также с облачными хранилищами, такими как AWS S3 и Google Cloud Storage.
Такая интеграция осуществляется с использованием встроенных коннекторов, библиотек и API, предоставляемых PySpark.
Какие лучшие практики существуют для тестирования и отладки приложений PySpark?
К числу рекомендуемых лучших практик для тестирования и отладки приложений PySpark относятся написание модульных тестов с использованием pyspark.sql.test.SQLTestUtils совместно с Python-библиотеками, такими как pytest, отладка приложений и логирование сообщений с помощью библиотеки logging, а также Spark UI, и оптимизация производительности с использованием API Spark org.apache.spark.metrics и инструментов мониторинга производительности.
Как бы ты решал вопросы безопасности и конфиденциальности данных в среде PySpark?
В настоящее время обмен данными стал значительно проще, поэтому защита чувствительной и конфиденциальной информации является важным способом предотвращения утечек данных. Одной из лучших практик является применение шифрования данных во время обработки и хранения. В PySpark этого можно добиться, используя функции aes_encrypt() и aes_decrypt() для столбцов DataFrame. Также для достижения этой цели можно использовать сторонние библиотеки, такие как библиотека cryptography.
Опиши, как использовать PySpark для построения и развёртывания модели машинного обучения
PySpark предоставляет библиотеку MLlib — масштабируемую библиотеку машинного обучения для построения и развёртывания моделей машинного обучения на больших наборах данных. API этой библиотеки может использоваться на различных этапах ML-процесса, таких как предварительная обработка данных, инженерия признаков, обучение модели, оценка качества и развёртывание. Используя кластеры Spark, можно развёртывать модели машинного обучения на базе PySpark в промышленной среде, применяя пакетный или потоковый инференс.
Как можно оптимизировать операции shuffle в PySpark?
Операции shuffle возникают, когда данные перераспределяются между партициями, и они могут быть затратными с точки зрения производительности. Для оптимизации shuffle можно применять следующие подходы:
- Стратегически использовать
repartition()для балансировки партиций перед затратными операциями, такими какjoin. - Отдавать предпочтение
coalesce()вместоrepartition()при уменьшении количества партиций, поскольку это минимизирует перемещение данных. - Выполнять широковещательные соединения небольших таблиц с помощью
broadcast()перед соединением с большими таблицами, чтобы избежать операций, интенсивно использующихshuffle. - Настраивать конфигурации Spark, такие как
spark.sql.shuffle.partitions, для оптимизации количества партиций при shuffle-операциях.
Вопросы для собеседования по PySpark для Data Engineer
Если вы проходите собеседование на позицию инженера данных, ожидайте вопросы, которые оценивают вашу способность проектировать, оптимизировать и устранять проблемы в приложениях PySpark в промышленной среде.
Ниже приведены типичные вопросы, с которыми можно столкнуться.
Опиши, как бы ты оптимизировал задание PySpark, которое работает медленно. На какие ключевые факторы ты бы обратил внимание?
Если задание PySpark работает медленно, существует несколько аспектов, которые можно улучшить для оптимизации его производительности:
- Обеспечение корректного размера и количества партиций данных для минимизации перераспределения данных во время преобразований.
- Использование DataFrame вместо RDD, поскольку они уже используют несколько модулей оптимизации для повышения производительности рабочих нагрузок Spark.
- Использование широковещательных соединений и широковещательных переменных при соединении небольшого набора данных с большим набором данных.
- Кэширование и сохранение промежуточных DataFrame, которые используются повторно.
- Настройка количества партиций, ядер исполнителей и числа экземпляров для эффективного использования ресурсов кластера.
Выбор подходящих форматов файлов для уменьшения объёма данных.
Как обеспечить отказоустойчивость в приложениях PySpark?
Для обеспечения отказоустойчивости в приложениях PySpark можно использовать несколько стратегий:
- Использование чекпойнтинга для сохранения данных в определённых точках.
- Репликация данных путём их сохранения на разных машинах.
- Ведение журнала изменений, выполняемых над данными до их применения.
- Выполнение проверок валидации данных для выявления ошибок.
- Выбор корректного уровня сохранения данных.
Использование встроенных механизмов отказоустойчивости Spark для автоматического повторного выполнения задач, которые завершились с ошибкой.
Какие существуют способы развертывания и управления приложениями PySpark?
Мы можем развертывать и управлять приложениями PySpark с помощью следующих инструментов:
- YARN: менеджер ресурсов, который помогает развертывать и управлять приложениями в Hadoop-кластерах.
- Kubernetes: Spark предоставляет поддержку для развертывания приложений в кластерах Kubernetes.
- Databricks: предоставляет полностью управляемую платформу для приложений PySpark, абстрагируя сложность управления кластерами.
Как вы бы мониторили и устраняли проблемы в заданиях PySpark, работающих в production-среде?
PySpark предоставляет следующие инструменты для мониторинга и устранения проблем заданий, работающих в production-среде:
- Spark UI: веб-интерфейс, который помогает отслеживать прогресс выполнения заданий, использование ресурсов и выполнение задач.
- Логирование: мы можем настроить логирование для сбора детальной информации об ошибках и предупреждениях.
- Метрики: мы можем использовать системы мониторинга для сбора и анализа метрик, связанных с состоянием кластера и производительностью заданий.
Объясните разницу между динамическим и статическим распределением ресурсов в Spark и в каких случаях вы бы выбрали каждый из них
В Spark статическое распределение ресурсов означает предварительное и постоянное выделение фиксированных ресурсов, таких как память и количество ядер executors, на всё время выполнения приложения. В отличие от этого, динамическое распределение ресурсов позволяет Spark динамически изменять количество executors в зависимости от нагрузки. Ресурсы могут добавляться или удаляться по мере необходимости, что улучшает использование ресурсов и снижает затраты.
Как вы принимаете решение между использованием DataFrames и RDD в PySpark?
Выбор между DataFrames и RDD зависит от структуры данных и типа операций, которые необходимо выполнять.
Используйте DataFrames, когда:
- требуется обработка структурированных данных со схемой;
- нужна оптимизированная обработка с использованием Catalyst и Tungsten;
- используются SQL-запросы и встроенные трансформации.
Используйте RDD, когда:
- нужны низкоуровневые трансформации и более детальный контроль над вычислениями;
- вы работаете с неструктурированными или полуструктурированными данными;
- требуется большая гибкость в определении трансформаций.
Как бы вы реализовали инкрементальную обработку данных в PySpark?
Инкрементальная обработка необходима для эффективной работы с постоянно растущими наборами данных. Она может быть реализована с помощью:
- Использования Delta Lake: хранение обновлений в формате Delta позволяет эффективно обрабатывать инкрементальные изменения.
- Использования watermarking в structured streaming: помогает отбрасывать устаревшие данные, сохраняя при этом агрегаты с состоянием.
- Партиционирования и фильтрации: загрузка только новых или изменённых данных вместо переработки всего объёма.
- Использования checkpointing: сохранение промежуточных результатов для предотвращения переработки с нуля в случае сбоя.






Leave a Reply