"Управляем ошибками. Часть 1 Unprocessable Records К нам пришли строки, которые падают при обработке. Есть 2 пути: 1. Сразу падать. Но если в стриминге много плохих событий, то мы замучаемся постоянно переподнимать 2. Не падать, а игнорить их - но особым образом 📒 Pattern: Dead-Letter Что делаем: 1. Определяем места в коде, где что-то может упасть 2. Оборачиваем в try/catch, if/else 3. Добавляем мету для анализа ошибки 4. Пишем зафейленные строки/файлы в другую папку 5. Добавляем алерты 6. Пишем пайплайн для перезапуска из зафейленной папки (опционально) У нас есть подобная штука - если поля критичные и точно не должны быть пустыми, то такие данные сразу складываются отдельно. Но разбираются ли причины - это загадка)🤷♀️ Опасности и решения: 1. Если просто отфильтровать некорректные записи, то другие пайплайны будут использовать неполные данные. Но если мы поправим ошибки и перепроцессим, то пользователям тоже придется все перезапускать. А там может быть 20 пайплайнов, которые ссылаются друг на друга. И вообще у них может быть не реализован пересчет за прошлое 🙂 2. Чтобы отличать скорректированные записи, может понадобиться какой-нибудь флаг 3. Можно заполнять NULL при ошибке, но тогда придется сравнивать: это действительно NULL-значение в источнике, или что-то пошло не так? (мне не нравится) 4. Обязательно алертить, если количество проигноренных данных очень большое, и даже останавливать джобу. Прям с остановкой дальнейших тасок я не сталкивалась, интересный подход на полном доверии к dq) 🌿🌿 Duplicated Records Pattern: Windowed Deduplicator Для батча все просто: distinct/dropDuplicates или окно с row_number = 1 Для стриминга нужно выделить временное окно и сохранять уже обработанные уникальные ключи: ``` .withWatermark(""visit_time"", ""10 minutes"") .dropDuplicates([""visit_id"", ""visit_time""]) ``` В этом примере ключи будут храниться в течение 10 минут. Если для новой записи ключ уже существует, он скипнется. Если тот же самый ключ придет через 11 минут, то будут дубликаты #depatterns"
"Управляем ошибками. Часть 1 Unprocessable Records К нам пришли строки,…
Из этого канала
- #494"Управляем ошибками. Часть 2 📼 Late Data Посмотрим на 3 паттерна для работы с…
"Управляем ошибками. Часть 2 📼 Late Data Посмотрим на 3 паттерна для работы с данными, которые пришли позже, чем ожидалось 1️⃣Pattern: Late Data Detector Пока…
- #495Мои заметки У меня скопилось много заметок с митапов, из книг и разговоров. Я…
Мои заметки У меня скопилось много заметок с митапов, из книг и разговоров. Я решила все собрать в один пост, а у себя почистить) Это будут просто рандомные…
- #497"Возможен ли Sort-Merge Join без шафла? Ага, в спарке есть такая оптимизация -…
"Возможен ли Sort-Merge Join без шафла? Ага, в спарке есть такая оптимизация - Storage Partition Join (SPJ).
- #492Козырной вопрос А у вас есть прикольный вопрос, который вы спрашиваете у…
Козырной вопрос А у вас есть прикольный вопрос, который вы спрашиваете у руководителя на собесе? Команда, стек, задачи, скрам/не скрам - это все понятно.
- #490Сопутки загрузок 🍔 Data Compaction Pattern: Compactor Стриминг пишет очень…
Сопутки загрузок 🍔 Data Compaction Pattern: Compactor Стриминг пишет очень много мелких файлов, и эта проблема актуальна не только для hdfs, но и для s3.