"Управляем ошибками. Часть 1 Unprocessable Records К нам пришли строки, которые падают при обработке. Есть 2 пути: 1. Сразу падать. Но если в стриминге много плохих событий, то мы замучаемся постоянно переподнимать 2. Не падать, а игнорить их - но особым образом 📒 Pattern: Dead-Letter Что делаем: 1. Определяем места в коде, где что-то может упасть 2. Оборачиваем в try/catch, if/else 3. Добавляем мету для анализа ошибки 4. Пишем зафейленные строки/файлы в другую папку 5. Добавляем алерты 6. Пишем пайплайн для перезапуска из зафейленной папки (опционально) У нас есть подобная штука - если поля критичные и точно не должны быть пустыми, то такие данные сразу складываются отдельно. Но разбираются ли причины - это загадка)🤷‍♀️ Опасности и решения: 1. Если просто отфильтровать некорректные записи, то другие пайплайны будут использовать неполные данные. Но если мы поправим ошибки и перепроцессим, то пользователям тоже придется все перезапускать. А там может быть 20 пайплайнов, которые ссылаются друг на друга. И вообще у них может быть не реализован пересчет за прошлое 🙂 2. Чтобы отличать скорректированные записи, может понадобиться какой-нибудь флаг 3. Можно заполнять NULL при ошибке, но тогда придется сравнивать: это действительно NULL-значение в источнике, или что-то пошло не так? (мне не нравится) 4. Обязательно алертить, если количество проигноренных данных очень большое, и даже останавливать джобу. Прям с остановкой дальнейших тасок я не сталкивалась, интересный подход на полном доверии к dq) 🌿🌿 Duplicated Records Pattern: Windowed Deduplicator Для батча все просто: distinct/dropDuplicates или окно с row_number = 1 Для стриминга нужно выделить временное окно и сохранять уже обработанные уникальные ключи: ``` .withWatermark(""visit_time"", ""10 minutes"") .dropDuplicates([""visit_id"", ""visit_time""]) ``` В этом примере ключи будут храниться в течение 10 минут. Если для новой записи ключ уже существует, он скипнется. Если тот же самый ключ придет через 11 минут, то будут дубликаты #depatterns"