Перейти к основному содержимому
Версия: 2.0.x

Apache Airflow

Обеспечивает оркестрацию и планирование рабочих процессов данных с Selena с использованием DAG (Directed Acyclic Graphs - направленных ациклических графов) и SQL-операторов. Используйте Airflow для загрузки и трансформации данных с помощью SQLExecuteQueryOperator и MySQLHook без какой-либо реализации или сложной конфигурации. Репозиторий Apache Airflow на GitHub.

Поддерживаемые функции

  • Выполнение SQL через протокол MySQL
  • Управление подключениями
  • Поддержка транзакций
  • Параметризованные запросы
  • Зависимости задач
  • Логика повторных попыток

Установка

Предварительные требования

Установка

Пакет провайдера MySQL требуется для использования Selena, так как Selena использует протокол MySQL.

pip install apache-airflow-providers-mysql

Проверьте установку, проверив установленные провайдеры:

airflow providers list

Это должно показать apache-airflow-providers-mysql в выводе.

Конфигурация

Создание подключения Selena

Создайте подключение Selena в UI Airflow или через переменную окружения. Имя подключения будет использоваться DAG позже.

Через UI Airflow

  1. Перейдите в Admin > Connections
  2. Нажмите кнопку + для добавления нового подключения
  3. Настройте подключение:
  • Connection Id: selena_default
  • Connection Type: MySQL
  • Host: your-selena-host.com
  • Schema: your_database
  • Login: your_username
  • Password: your_password
  • Port: 9030

Через CLI Airflow

airflow connections add 'selena_default' \
--conn-type 'mysql' \
--conn-host 'your-selena-host.com' \
--conn-schema 'your_database' \
--conn-login 'your_username' \
--conn-password 'your_password' \
--conn-port 9030

Примеры использования

Эти примеры демонстрируют общие паттерны интеграции Selena с Airflow. Каждый пример основывается на базовых концепциях, демонстрируя различные подходы к загрузке данных, трансформации и оркестрации рабочих процессов.

Что вы узнаете:

  • Загрузка данных: Эффективная загрузка данных из CSV-файлов и облачного хранилища в Selena
  • Трансформация данных: Выполнение SQL-запросов и обработка результатов с помощью Python
  • Продвинутые паттерны: Реализация инкрементальной загрузки, асинхронных операций и оптимизации запросов
  • Лучшие практики для продакшена: Корректная обработка ошибок и построение устойчивых конвейеров

Все примеры используют таблицы данных об авариях из Быстрого старта.

Загрузка данных

Потоковая загрузка данных

Эффективная загрузка больших CSV-файлов с использованием API Stream Load Selena. Stream Load - рекомендуемый подход для:

  • Высокопроизводительной загрузки данных (поддерживает параллельные загрузки)
  • Загрузки данных с трансформациями столбцов и фильтрацией

Stream Load обеспечивает лучшую производительность, чем операторы INSERT INTO VALUES для больших наборов данных, и включает встроенные функции, такие как допуск ошибок. Обратите внимание, что это требует, чтобы CSV-файл был доступен в файловой системе воркера Airflow.

from airflow.sdk import dag, task
from airflow.hooks.base import BaseHook
from datetime import datetime
import requests
from requests.auth import HTTPBasicAuth
from urllib.parse import urlparse


class PreserveAuthSession(requests.Session):
"""
Пользовательская сессия, сохраняющая заголовок Authorization при редиректах.
Selena FE может перенаправлять запросы Stream Load на узлы BE.
"""
def rebuild_auth(self, prepared_request, response):
old = urlparse(response.request.url)
new = urlparse(prepared_request.url)

# Сохранять авторизацию только при редиректе на тот же хост
if old.hostname == new.hostname:
prepared_request.headers["Authorization"] = response.request.headers.get("Authorization")
@dag(
dag_id="selena_stream_load_example",
schedule=None,
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["selena", "stream_load", "example"],
)
def selena_stream_load_example():
@task
def load_csv_to_selena():
# Конфигурация
DATABASE = "quickstart"
TABLE = "crashdata"
CSV_PATH = "/path/to/crashdata.csv"

conn = BaseHook.get_connection("selena_default")
url = f"http://{conn.host}:{conn.port}/api/{DATABASE}/{TABLE}/_stream_load"

# Генерация уникальной метки
from airflow.sdk import get_current_context
context = get_current_context()
execution_date = context['logical_date'].strftime('%Y%m%d_%H%M%S')
label = f"{TABLE}_load_{execution_date}"

headers = {
"label": label,
"column_separator": ",",
"skip_header": "1",
"max_filter_ratio": "0.1", # Допускать до 10% ошибок
"Expect": "100-continue",
"columns": """
tmp_CRASH_DATE, tmp_CRASH_TIME,
CRASH_DATE=str_to_date(concat_ws(' ', tmp_CRASH_DATE, tmp_CRASH_TIME), '%m/%d/%Y %H:%i'),
BOROUGH, ZIP_CODE, LATITUDE, LONGITUDE, LOCATION,
ON_STREET_NAME, CROSS_STREET_NAME, OFF_STREET_NAME,
NUMBER_OF_PERSONS_INJURED, NUMBER_OF_PERSONS_KILLED,
NUMBER_OF_PEDESTRIANS_INJURED, NUMBER_OF_PEDESTRIANS_KILLED,
NUMBER_OF_CYCLIST_INJURED, NUMBER_OF_CYCLIST_KILLED,
NUMBER_OF_MOTORIST_INJURED, NUMBER_OF_MOTORIST_KILLED,
CONTRIBUTING_FACTOR_VEHICLE_1, CONTRIBUTING_FACTOR_VEHICLE_2,
CONTRIBUTING_FACTOR_VEHICLE_3, CONTRIBUTING_FACTOR_VEHICLE_4,
CONTRIBUTING_FACTOR_VEHICLE_5, COLLISION_ID,
VEHICLE_TYPE_CODE_1, VEHICLE_TYPE_CODE_2,
VEHICLE_TYPE_CODE_3, VEHICLE_TYPE_CODE_4, VEHICLE_TYPE_CODE_5
""".replace("\n", "").replace(" ", ""),
}
session = PreserveAuthSession()
with open(CSV_PATH, "rb") as f:
response = session.put(
url,
headers=headers,
data=f,
auth=HTTPBasicAuth(conn.login, conn.password or ""),
timeout=3600,
)

result = response.json()
print(f"\nОтвет Stream Load:")
print(f" Статус: {result.get('Status')}")
print(f" Загружено строк: {result.get('NumberLoadedRows', 0):,}")

if result.get("Status") == "Success":
return result
else:
error_msg = result.get("Message", "Unknown error")
raise Exception(f"Stream Load завершился с ошибкой: {error_msg}")
load_csv_to_selena()

selena_stream_load_example()

Вставка из файлов

Используйте табличную функцию FILES() Selena для загрузки данных напрямую из файлов. Этот подход идеален для:

  • Загрузки данных из S3, HDFS, Google Cloud Storage
  • Одноэтапного поглощения данных с применением трансформаций во время загрузки
  • Разовых загрузок данных из различных файловых источников

FILES() поддерживает несколько форматов файлов и систем хранения, что делает его гибкой альтернативой Stream Load для определённых случаев использования. Данные читаются и вставляются в одном SQL-операторе.

from airflow.sdk import dag, task
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime

FILE_PATH = "path_to_file_here"

@dag(
dag_id='crashdata_dynamic_files_load',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['selena', 'files', 'dynamic'],
)
def crashdata_files():
@task
def load_file(file_path: str):
hook = MySqlHook(mysql_conn_id='selena_default')

sql = f"""
INSERT INTO crashdata (
CRASH_DATE, BOROUGH, ZIP_CODE, LATITUDE, LONGITUDE,
LOCATION, ON_STREET_NAME, CROSS_STREET_NAME, OFF_STREET_NAME,
CONTRIBUTING_FACTOR_VEHICLE_1, CONTRIBUTING_FACTOR_VEHICLE_2,
COLLISION_ID, VEHICLE_TYPE_CODE_1, VEHICLE_TYPE_CODE_2
)
SELECT
STR_TO_DATE(CONCAT_WS(' ', `CRASH DATE`, `CRASH TIME`), '%m/%d/%Y %H:%i'),
BOROUGH,
`ZIP CODE`,
CAST(LATITUDE as INT),
CAST(LONGITUDE as INT),
LOCATION,
`ON STREET NAME`,
`CROSS STREET NAME`,
`OFF STREET NAME`,
`CONTRIBUTING FACTOR VEHICLE 1`,
`CONTRIBUTING FACTOR VEHICLE 2`,
CAST(`COLLISION_ID` as INT),
`VEHICLE TYPE CODE 1`,
`VEHICLE TYPE CODE 2`
FROM FILES(
"path" = "s3://{file_path}",
"format" = "parquet",
"aws.s3.access_key" = "XXXXXXXXXX",
"aws.s3.secret_key" = "YYYYYYYYYY",
"aws.s3.region" = "us-west-2"
)
"""
result = hook.run(sql)

return file_path

load_file(FILE_PATH)

crashdata_files()

Трансформация данных

Выполняйте SQL-запросы к Selena для создания таблиц и вставки данных. Это полезно для:

  • Настройки схемы базы данных
  • Загрузки небольших наборов данных
  • Выполнения разовых запросов
from airflow.sdk import dag, chain
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime

@dag(
dag_id='crashdata_basic_setup',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['selena', 'crashdata'],
)
def crashdata_basic_pipeline():
"""Создание таблицы crashdata и вставка примерных данных об авариях в NYC."""

create_table = SQLExecuteQueryOperator(
task_id='create_crashdata_table',
conn_id='selena_default',
sql="""
CREATE TABLE IF NOT EXISTS crashdata (
CRASH_DATE DATETIME,
BOROUGH STRING,
ZIP_CODE STRING,
LATITUDE INT,
LONGITUDE INT,
LOCATION STRING,
ON_STREET_NAME STRING,
CROSS_STREET_NAME STRING,
OFF_STREET_NAME STRING,
CONTRIBUTING_FACTOR_VEHICLE_1 STRING,
CONTRIBUTING_FACTOR_VEHICLE_2 STRING,
COLLISION_ID INT,
VEHICLE_TYPE_CODE_1 STRING,
VEHICLE_TYPE_CODE_2 STRING
)
DUPLICATE KEY(CRASH_DATE)
DISTRIBUTED BY HASH(COLLISION_ID) BUCKETS 10
PROPERTIES (
"replication_num" = "1"
)
""",
)

insert_data = SQLExecuteQueryOperator(
task_id='insert_sample_data',
conn_id='selena_default',
sql="""
INSERT INTO crashdata VALUES
('2024-01-15 08:30:00', 'MANHATTAN', '10001', 40748817, -73985428,
'(40.748817, -73.985428)', '5 AVENUE', 'WEST 34 STREET', NULL,
'Driver Inattention/Distraction', 'Unspecified', 4567890, 'Sedan', 'Taxi'),
('2024-01-15 14:20:00', 'BROOKLYN', '11201', 40693139, -73987664,
'(40.693139, -73.987664)', 'FLATBUSH AVENUE', 'ATLANTIC AVENUE', NULL,
'Failure to Yield Right-of-Way', 'Unspecified', 4567891, 'SUV', 'Sedan'),
('2024-01-15 18:45:00', 'QUEENS', '11354', 40767689, -73827426,
'(40.767689, -73.827426)', 'NORTHERN BOULEVARD', 'MAIN STREET', NULL,
'Following Too Closely', 'Driver Inattention/Distraction', 4567892, 'Sedan', 'Sedan'),
('2024-01-16 09:15:00', 'BRONX', '10451', 40820679, -73925300,
'(40.820679, -73.925300)', 'GRAND CONCOURSE', 'EAST 161 STREET', NULL,
'Unsafe Speed', 'Unspecified', 4567893, 'Truck', 'Sedan')
""",
)

create_table >> insert_data

crashdata_basic_pipeline()

Более сложные операции с MySqlHook

Используйте MySqlHook для продвинутого анализа данных и обработки в задачах Python. Этот подход полезен для:

  • Выполнения аналитических запросов и обработки результатов в Python
  • Комбинирования запросов Selena с библиотеками Python (pandas, numpy и т.д.)
  • Реализации сложной бизнес-логики, требующей как SQL, так и Python
  • Создания проверок качества данных и рабочих процессов валидации

MySqlHook предоставляет полный программный доступ к результатам запросов, позволяя выполнять сложные трансформации и анализ данных в вашем DAG.

from airflow.sdk import dag, task
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime

@dag(
dag_id='crashdata_python_analysis',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['selena', 'python', 'analytics'],
)
def crashdata_python_pipeline():
@task
def analyze_crash_hotspots():
"""Определение горячих точек аварий по районам и улицам."""
hook = MySqlHook(mysql_conn_id='selena_default')

# Запрос для поиска локаций с высокой частотой аварий
sql = """
SELECT
BOROUGH,
ON_STREET_NAME,
COUNT(*) as crash_count,
COUNT(DISTINCT DATE(CRASH_DATE)) as days_with_crashes
FROM crashdata
WHERE ON_STREET_NAME IS NOT NULL
GROUP BY BOROUGH, ON_STREET_NAME
HAVING crash_count >= 3
ORDER BY crash_count DESC
LIMIT 10
"""

results = hook.get_records(sql)

print("Топ-10 горячих точек аварий:")
for row in results:
borough, street, count, days = row
print(f"{borough:15} | {street:40} | {count:3} аварий за {days} дней")

return len(results)

@task
def calculate_contributing_factors():
"""Расчёт процентного распределения факторов, способствующих авариям."""
hook = MySqlHook(mysql_conn_id='selena_default')

sql = """
SELECT
CONTRIBUTING_FACTOR_VEHICLE_1 as factor,
COUNT(*) as count,
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentage
FROM crashdata
WHERE CONTRIBUTING_FACTOR_VEHICLE_1 != 'Unspecified'
GROUP BY CONTRIBUTING_FACTOR_VEHICLE_1
ORDER BY count DESC
"""

results = hook.get_records(sql)

print("\nАнализ факторов, способствующих авариям:")
for factor, count, percentage in results:
print(f"{factor:50} | {count:4} ({percentage}%)")

return results

# Определение порядка выполнения задач
hotspots = analyze_crash_hotspots()
factors = calculate_contributing_factors()

hotspots >> factors

crashdata_python_pipeline()

Продвинутые паттерны

Инкрементальная загрузка данных

Загружайте данные инкрементально, чтобы избежать повторной обработки существующих записей. Инкрементальная загрузка необходима для:

  • Эффективного обновления таблиц только новыми данными
  • Сокращения времени обработки и использования ресурсов
  • Управления большими наборами данных, растущими со временем
  • Поддержания актуальности данных без полных перезагрузок

Этот паттерн использует промежуточные таблицы и фильтрацию на основе временных меток, чтобы гарантировать загрузку только новых записей, что делает его идеальным для запланированных пакетных обновлений.

from airflow.sdk import dag, chain
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime

@dag(
dag_id='crashdata_incremental_load',
schedule='@hourly',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['selena', 'incremental'],
)
def crashdata_incremental_pipeline():
"""Инкрементальная загрузка новых отчётов об авариях из промежуточной таблицы."""

create_staging = SQLExecuteQueryOperator(
task_id='create_staging_table',
conn_id='selena_default',
sql="""
CREATE TABLE IF NOT EXISTS crashdata_staging (
CRASH_DATE DATETIME,
BOROUGH STRING,
ZIP_CODE STRING,
LATITUDE INT,
LONGITUDE INT,
LOCATION STRING,
ON_STREET_NAME STRING,
CROSS_STREET_NAME STRING,
OFF_STREET_NAME STRING,
CONTRIBUTING_FACTOR_VEHICLE_1 STRING,
CONTRIBUTING_FACTOR_VEHICLE_2 STRING,
COLLISION_ID INT,
VEHICLE_TYPE_CODE_1 STRING,
VEHICLE_TYPE_CODE_2 STRING,
loaded_at DATETIME
)
DUPLICATE KEY(CRASH_DATE)
DISTRIBUTED BY HASH(COLLISION_ID) BUCKETS 10
PROPERTIES ("replication_num" = "1")
""",
)

incremental_load = SQLExecuteQueryOperator(
task_id='load_new_crashes',
conn_id='selena_default',
sql="""
INSERT INTO crashdata
SELECT
CRASH_DATE,
BOROUGH,
ZIP_CODE,
LATITUDE,
LONGITUDE,
LOCATION,
ON_STREET_NAME,
CROSS_STREET_NAME,
OFF_STREET_NAME,
CONTRIBUTING_FACTOR_VEHICLE_1,
CONTRIBUTING_FACTOR_VEHICLE_2,
COLLISION_ID,
VEHICLE_TYPE_CODE_1,
VEHICLE_TYPE_CODE_2
FROM crashdata_staging
WHERE loaded_at >= '{{ data_interval_start }}'
AND loaded_at < '{{ data_interval_end }}'
AND COLLISION_ID NOT IN (SELECT COLLISION_ID FROM crashdata)
""",
)

create_staging >> incremental_load

crashdata_incremental_pipeline()

Асинхронные крупномасштабные задания с SUBMIT TASK

Используйте SUBMIT TASK для долго выполняющихся запросов, которые не должны блокировать задачу Airflow. Этот паттерн полезен для:

  • Сложных аналитических запросов, выполняющихся минуты или часы
  • Крупномасштабных трансформаций данных (копирование таблиц, агрегации)
  • Ресурсоёмких операций, которые могут завершиться по таймауту в синхронном режиме
  • Параллельного выполнения нескольких тяжёлых запросов
  • Разделения отправки задания и мониторинга завершения

SUBMIT TASK позволяет Airflow отслеживать долго выполняющиеся операции Selena без удержания открытых подключений к базе данных, улучшая эффективность использования ресурсов и надёжность.

from airflow.sdk import dag, chain, task
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime
import time

@dag(
dag_id='crashdata_submit_task',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['selena', 'submit-task'],
)
def crashdata_submit_task_pipeline():
"""
Пример использования SUBMIT TASK для долго выполняющихся запросов.
Требуется Selena 3.4+ для поддержки SUBMIT TASK.
"""

@task
def submit_long_running_query():
"""Отправка долго выполняющегося запроса как асинхронной задачи."""
hook = MySqlHook(mysql_conn_id='selena_default')

submit_sql = """
SUBMIT TASK backup_crashdata AS
CREATE TABLE crash_data_backup AS
SELECT * FROM crashdata
"""
conn = hook.get_conn()
cursor = conn.cursor()
cursor.execute(submit_sql)

# Получение имени задачи из результата
result = cursor.fetchone()
task_name = result[0] if result else None

cursor.close()
conn.close()

if task_name:
print(f"Задача успешно отправлена: {task_name}")
return task_name
else:
raise Exception("Не удалось отправить задачу")

@task
def monitor_task_completion(task_name: str):
"""Мониторинг отправленной задачи до завершения."""
hook = MySqlHook(mysql_conn_id='selena_default')

max_wait_time = 600 # 10 минут
poll_interval = 10 # Проверка каждые 10 секунд
elapsed_time = 0

while elapsed_time < max_wait_time:
conn = hook.get_conn()
cursor = conn.cursor()

# Проверка статуса задачи в information_schema
check_sql = f"""
SELECT STATE, ERROR_MESSAGE
FROM information_schema.task_runs
WHERE TASK_NAME = '{task_name}'
"""
cursor.execute(check_sql)
result = cursor.fetchone()

if result:
state, error_msg = result
print(f"[{elapsed_time}с] Статус задачи: {state}")

if state == 'SUCCESS':
cursor.close()
conn.close()
return {'status': 'SUCCESS', 'task_name': task_name}
elif state == 'FAILED':
cursor.close()
conn.close()
raise Exception(f"Задача завершилась с ошибкой: {error_msg}")

cursor.close()
conn.close()

time.sleep(poll_interval)
elapsed_time += poll_interval

raise Exception(f"Задача не завершилась за {max_wait_time} секунд")

@task
def process_results():
"""Обработка или проверка результатов завершённой задачи."""
print("Задача успешно завершена - результаты теперь доступны")
return "Обработка завершена"

# Определение потока задач
task_name = submit_long_running_query()
monitor_result = monitor_task_completion(task_name)
result = process_results()

chain(task_name, monitor_result, result)

crashdata_submit_task_pipeline()

Обратите внимание, что имя задачи уникально в Selena, поэтому будущим запускам может потребоваться квалификатор (например, uuid).

Материализованные представления

Создавайте и управляйте материализованными представлениями для ускорения производительности запросов. Материализованные представления идеальны для:

  • Предварительного вычисления сложных агрегаций для дашбордов
  • Ускорения часто выполняемых аналитических запросов
  • Поддержания сводных таблиц, которые обновляются автоматически
  • Сокращения вычислительных затрат путём избежания повторных вычислений
  • Обслуживания аналитики реального времени из предагрегированных данных

Материализованные представления в Selena обновляются автоматически или по запросу, поддерживая актуальность агрегированных данных при значительном улучшении производительности запросов.

from airflow.sdk import dag, task
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime, timedelta


@dag(
dag_id="selena_materialized_view_example",
schedule="0 2 * * *", # Запуск ежедневно в 2 часа ночи
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["selena", "materialized_view", "example"],
doc_md=__doc__,
)
def selena_materialized_view_example():
@task
def create_materialized_view():
hook = MySqlHook(mysql_conn_id="selena_conn")

drop_sql = """
DROP MATERIALIZED VIEW IF EXISTS quickstart.mv_daily_crash_stats
"""

create_sql = """
CREATE MATERIALIZED VIEW IF NOT EXISTS quickstart.mv_daily_crash_stats
DISTRIBUTED BY HASH(`crash_date`)
REFRESH ASYNC
AS
SELECT
DATE(CRASH_DATE) as crash_date,
BOROUGH,
COUNT(*) as total_crashes,
COUNT(DISTINCT COLLISION_ID) as unique_collisions
FROM quickstart.crashdata
WHERE CRASH_DATE IS NOT NULL
GROUP BY DATE(CRASH_DATE), BOROUGH
"""

hook.run(drop_sql)
hook.run(create_sql)

return "mv_daily_crash_stats"

@task
def refresh_materialized_view(mv_name: str):
hook = MySqlHook(mysql_conn_id="selena_conn")
refresh_sql = f"REFRESH MATERIALIZED VIEW quickstart.{mv_name}"
hook.run(refresh_sql)
return mv_name

@task
def check_materialized_view_status(mv_name: str):
hook = MySqlHook(mysql_conn_id="selena_conn")

# Получение имени задачи для MV
task_query = f"""
SELECT TASK_NAME
FROM information_schema.tasks
WHERE `DATABASE` = 'quickstart'
AND DEFINITION LIKE '%{mv_name}%'
ORDER BY CREATE_TIME DESC
LIMIT 1
"""

task_name = hook.get_first(task_query)[0]

# Получение последнего состояния запуска задачи
state_query = f"""
SELECT STATE
FROM information_schema.task_runs
WHERE TASK_NAME = '{task_name}'
ORDER BY CREATE_TIME DESC
LIMIT 1
"""

state = hook.get_first(state_query)[0]

print(f"MV: {mv_name} | Task: {task_name} | State: {state}")

if state not in ('SUCCESS', 'RUNNING'):
raise Exception(f"Обновление материализованного представления в неожиданном состоянии: {state}")

return {'task_name': task_name, 'state': state}


create = create_materialized_view()
refresh = refresh_materialized_view(create)
status = check_materialized_view_status(refresh)

status

selena_materialized_view_example()

Обработка ошибок

Реализуйте надёжную обработку ошибок для производственной надёжности. Правильная обработка ошибок критична для:

  • Автоматического восстановления после временных сбоев (сетевые проблемы, таймауты)
  • Предотвращения нарушений конвейера данных из-за временных проблем
  • Обеспечения видимости паттернов сбоев

Встроенные механизмы повторных попыток Airflow обрабатывают большинство временных ошибок.

from airflow.sdk import dag
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime, timedelta

@dag(
dag_id='selena_with_retries',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
default_args={
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'max_retry_delay': timedelta(hours=1),
},
)
def selena_resilient_pipeline():
critical_query = SQLExecuteQueryOperator(
task_id='critical_query',
conn_id='selena_default',
sql='SELECT * FROM important_table',
)
selena_resilient_pipeline()

Устранение неполадок

  • Убедитесь, что порт 9030 доступен из экземпляра Airflow
  • Протестируйте подключение (если включено) из UI Airflow
  • При использовании localhost используйте 127.0.0.1 вместо него