"Управляем ошибками. Часть 2 📼 Late Data Посмотрим на 3 паттерна для работы с данными, которые пришли позже, чем ожидалось 1️⃣Pattern: Late Data Detector Пока что для меня сложная и непонятная история про стриминг. В целом, книга легко читается, но требуется время, чтобы переварить. Все концепты сжаты, но очень насыщенны. Перечитаю, когда нужно будет с этим работать Интересная мысль - ""shifting the late data problem"" Пример: мы пишем партиции по времени обработки. В партицию 21:00 к нам залетел кусок данных за 20:00 и 19:00. А наши пользователи используют партиции по времени события. Тогда мы перекладываем ответственность ковыряться в этих партициях на них 😁 2️⃣Pattern: Static Late Data Integrator Как вообще можно перегрузить данные за прошлое? 1. Создать кучу дагранов, где каждый перегружает 1 день. Если упало - перезапускаем конкретный день 2. Создать один дагран, где в коде генерируется список нужных дат. И по каждой дате запускается загрузка. Если упало - просто перезапускаем, пойдет считаться с упавшего дня. Это и есть Static Late Data Integrator. А статическое - потому что мы сами задаем 14 дней или сколько угодно И тут я поняла, что неосознанно это и делала. У нас часто была проблема, что данные в источники просто не приходили 😁 Потом мы шли разбираться с владельцами, и данные заливались, но позднее. Чтобы это учитывать, в моем подходе был такой алгоритм: 1. Задаем стартовую и конечную даты расчета 2. Создаем диапазон значений ``` full_range = pd.date_range(start=str(start_dt), end=str(end_dt)).strftime(""%Y-%m-%d"") ``` 3. Из меты достаем существующие партиции ``` def get_existing_partitions(table_name): partitions = ( spark.sql(f""show partitions {table_name}"") .select(F.split(F.col(""partition""), ""="")[1].alias(""dt"")) .collect() ) return [p[0] for p in partitions] ``` 4. Находим разницу ``` lost_range = full_range.difference(existing_partitions_pdf) ``` 5. Итерируемся по потеряшкам ``` for dt in lost_range: calc_mart(dt) ``` Если в будущем снова будет пустая дата, нам не придется перезапускать определенный день - он пойдет считаться сам 3️⃣Pattern: Dynamic Late Data Integrator Предлагается завести табличку с 4 полями: 🤩партиция 🤩время обработки 🤩время добавления новых записей 🤩флаг обработано или нет Так мы запросом можем найти партиции, которые уже обрабатывались, но в которые попали новые данные. А в iceberg есть удобное свойство last_updated_at на уровне таблицы 🤩 Filtering Pattern: Filter Interceptor Как будто это антипаттерн. Предлагается создать доп колонки с фильтрами id_is_not_null, status_is_not_failed и выводить количество отфильтрованных записей, чтобы понимать, на каком этапе ошибка в коде или в данных. Но прям пробегаться по каждой записи в датафрейме… Как будто это все-таки dq 🌳 Fault Tolerance Pattern: Checkpointer Просто нужно создавать чекпоинты и хранить последний оффсет обработанной записи и состояние, если оно есть Еще раз напомнили про семантики доставки: 🤩exactly once - нужны другие паттерны, расскажу, когда дойду 🤩at least once - чекпоинт после обработки, могут быть дубликаты при перезапуске после падения 🤩at most once - чекпоинт до обработки, данные потеряются при падении #depatterns"
"Управляем ошибками. Часть 2 📼 Late Data Посмотрим на 3 паттерна для работы с…
Из этого канала
- #495Мои заметки У меня скопилось много заметок с митапов, из книг и разговоров. Я…
Мои заметки У меня скопилось много заметок с митапов, из книг и разговоров. Я решила все собрать в один пост, а у себя почистить) Это будут просто рандомные…
- #497"Возможен ли Sort-Merge Join без шафла? Ага, в спарке есть такая оптимизация -…
"Возможен ли Sort-Merge Join без шафла? Ага, в спарке есть такая оптимизация - Storage Partition Join (SPJ).
- #499"Минутка рефлексии Не люблю рефлексировать, потому что это всегда вгоняет меня…
"Минутка рефлексии Не люблю рефлексировать, потому что это всегда вгоняет меня в тоску😔 Но мем прям метчится со мной.
- #493"Управляем ошибками. Часть 1 Unprocessable Records К нам пришли строки,…
"Управляем ошибками. Часть 1 Unprocessable Records К нам пришли строки, которые падают при обработке. Есть 2 пути: 1. Сразу падать.
- #492Козырной вопрос А у вас есть прикольный вопрос, который вы спрашиваете у…
Козырной вопрос А у вас есть прикольный вопрос, который вы спрашиваете у руководителя на собесе? Команда, стек, задачи, скрам/не скрам - это все понятно.