"Возможен ли Sort-Merge Join без шафла? Ага, в спарке есть такая оптимизация - Storage Partition Join (SPJ). Если таблицы одинаково партиционированы и джойнятся по этому ключу, то не приходится перемещать кучу данных. Правда, это работает только в форматах, поддерживающих DataSource API V2. Например, в Iceberg, Hudi, Delta Lake Что нужно: 🌸 Spark 3.3+ 🌸 Условие равенства в джойне 🌸 У таблиц должно быть общее поле-партиция, которое входит в ключ джойна 🎈 Пример Я взяла 2 таблицы, партицировала, заполнила данными и положила. Потом их поджойнила, и вот какой план запроса у меня сначала получился (картинка 1): ``` == Physical Plan == CollectLimit +- * Project +- * SortMergeJoin Inner :- * Sort : +- Exchange : +- * Project : +- BatchScan spark_catalog.test.table1 +- * Sort +- Exchange +- * Project +- BatchScan spark_catalog.test.table2 ``` У нас есть Exchange - происходит шафл даных. Потом я накинула несколько конфигов: ``` # включает Storage Partition Join spark.conf.set(""spark.sql.sources.v2.bucketing.enabled"", ""true"") # сохраняет текущее партиционирование при планировании запроса spark.conf.set(""spark.sql.iceberg.planning.preserve-data-grouping"", ""true"") # убирает шафл, когда партиции между таблицами не совпадают по количеству spark.conf.set(""spark.sql.sources.v2.bucketing.pushPartValues.enabled"", ""true"") # иначе ключи джойна должны быть такие же, как партиции, в том же порядке spark.conf.set(""spark.sql.requireAllClusterKeysForCoPartition"", ""false"") # борется с перекосом данных spark.conf.set(""spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled"", ""true"") # разрешает включать части партиций в ключ джойна spark.conf.set(""spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled"", ""true"") # на всякий можно отключить broadcast, адаптивку и sortmerge (если это точно нужно) spark.conf.set(""spark.sql.autoBroadcastJoinThreshold"", -1) spark.conf.set(""spark.sql.adaptive.enabled"", ""false"") spark.conf.set(""spark.sql.join.preferSortMergeJoin"", ""false"") ``` План запроса чуть поменялся (картинка 2) - исчез шафл: ``` == Physical Plan == CollectLimit +- * Project +- * SortMergeJoin Inner :- * Sort : +- * Project : +- BatchScan spark_catalog.test.table1 +- * Sort +- * Project +- BatchScan spark_catalog.test.table2``` В одной из статей говорится, что перформанс увеличился на 45-70% на джойнах. А кто-то уже использовал на практике? @data_engineerette"
"Возможен ли Sort-Merge Join без шафла? Ага, в спарке есть такая оптимизация -…
Из этого канала
- #499"Минутка рефлексии Не люблю рефлексировать, потому что это всегда вгоняет меня…
"Минутка рефлексии Не люблю рефлексировать, потому что это всегда вгоняет меня в тоску😔 Но мем прям метчится со мной.
- #500"Как ускорить ClickHouse? Обсуждали с одним де перформанс систем и связку Trino…
"Как ускорить ClickHouse? Обсуждали с одним де перформанс систем и связку Trino + ClickHouse.
- #501Понедельничная мантра Всем прекрасной рабочей недели🐾
Понедельничная мантра Всем прекрасной рабочей недели🐾
- #495Мои заметки У меня скопилось много заметок с митапов, из книг и разговоров. Я…
Мои заметки У меня скопилось много заметок с митапов, из книг и разговоров. Я решила все собрать в один пост, а у себя почистить) Это будут просто рандомные…
- #494"Управляем ошибками. Часть 2 📼 Late Data Посмотрим на 3 паттерна для работы с…
"Управляем ошибками. Часть 2 📼 Late Data Посмотрим на 3 паттерна для работы с данными, которые пришли позже, чем ожидалось 1️⃣Pattern: Late Data Detector Пока…