Using Apache Zeppelin with Apache Airflow (1)

Contents: Preface / I. Installing Airflow / II. Usage Steps / 1. Goal / 2. Writing the DAG / 3. Loading and Running the DAG / Summary

Preface

Having covered the basics of Zeppelin earlier, today we start using Airflow to chain tasks together.
Apache Airflow and Apache Zeppelin are two different tools with different purposes: Airflow orchestrates and schedules workflows, while Zeppelin is an interactive notebook for data analysis and visualization. Although their primary uses differ, they can be combined to cover more complex data-processing and analysis needs.
Here are some of the ways Airflow and Zeppelin can be combined:

- Scheduling Zeppelin notebooks from Airflow: write Airflow tasks that run Zeppelin notebooks at a specific time or on a triggering event, calling Zeppelin's REST API or CLI from within Airflow.
- Data pipelines: orchestrate data processing and transformation in Airflow (e.g. extracting data from sources, cleaning and transforming it), build notebooks in Zeppelin for further analysis, visualization, and reporting, and have the Airflow tasks trigger the notebooks once fresh data is ready.
- Parameter passing: pass parameter values from Airflow into Zeppelin notebooks so information can be shared between tasks; the notebook reads the values and adapts its analysis accordingly (see the sketch after this list).
- Logging and monitoring: use Airflow to monitor workflow runs and inspect task logs and execution state; record and visualize key workflow metrics in Zeppelin for a fuller view of pipeline performance.
- Integrating data stores: use Airflow to extract data from different sources and hand it to Zeppelin for further analysis; Zeppelin can then mine and analyze the data produced by the Airflow tasks in more depth.
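For the parameter-passing pattern above, here is a minimal sketch of the Airflow side. It assumes Zeppelin's synchronous run-paragraph endpoint and a {"params": ...} request body as I read the Zeppelin REST API docs; the host, note ID, paragraph ID, and the "date" form name are all placeholders:

import json
import urllib.request

def trigger_paragraph(host, note_id, paragraph_id, params):
    # Run one paragraph synchronously; the JSON body supplies values
    # for the paragraph's dynamic-form inputs (e.g. z.input("date")).
    url = f"http://{host}/api/notebook/run/{note_id}/{paragraph_id}"
    data = json.dumps({"params": params}).encode("utf-8")
    req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)

# e.g. hand the Airflow execution date to the notebook
print(trigger_paragraph("IP:PORT", "NOTE_ID", "PARAGRAPH_ID", {"date": "2024-01-20"}))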
Used together, Airflow and Zeppelin play to each other's strengths and give you a more complete, controllable, and visual data-processing and analysis workflow.

I. Installing Airflow
For installation, follow the quickstart: https://airflow.apache.org/docs/apache-airflow/stable/start.html
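For reference, the quickstart essentially boils down to a constrained pip install; with the Python 3.9 / Airflow 2.8.0 combination used in this article it would look like this:

# pip install "apache-airflow==2.8.0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.0/constraints-3.9.txt"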
On CentOS 7.9, startup fails after installation because the system SQLite is too old; you also need to set up SQLite as described in https://airflow.apache.org/docs/apache-airflow/2.8.0/howto/set-up-database.html#setting-up-a-sqlite-database
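You can check in advance whether your environment is affected: the SQLite version that Python's sqlite3 module links against must be at least 3.15.0, otherwise startup fails with the error below.

# python -c "import sqlite3; print(sqlite3.sqlite_version)"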
[root@slas bin]# airflow standalone
Traceback (most recent call last):
  File "/root/.pyenv/versions/3.9.10/bin/airflow", line 5, in <module>
    from airflow.__main__ import main
  File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/__init__.py", line 52, in <module>
    from airflow import configuration, settings
  File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 2326, in <module>
    conf.validate()
  File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 718, in validate
    self._validate_sqlite3_version()
  File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 824, in _validate_sqlite3_version
    raise AirflowConfigException(
airflow.exceptions.AirflowConfigException: error: SQLite C library too old (< 3.15.0). See https://airflow.apache.org/docs/apache-airflow/2.8.0/howto/set-up-database.html#setting-up-a-sqlite-database

II. Usage Steps
1. Goal
I want to build a simple demo with two nodes that implements the flow shown in the figure: read CSV, then deduplicate. Reading the CSV input is handled in Airflow; deduplication is done in Zeppelin.
2. Writing the DAG
First, implement extract_data_script.py, a simple script that reads the specified columns from a CSV file and writes them to a new CSV file.
import argparse

import pandas as pd


def extract_and_write_data(date, output_csv, columns_to_extract):
    # Read only the specified columns
    csv_file_path = f"/home/works/datasets/data_{date}.csv"
    df = pd.read_csv(csv_file_path, usecols=columns_to_extract)
    # Write the data to a new CSV file
    df.to_csv(output_csv, index=False)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--date", type=str, required=True, help="Date parameter passed by Airflow")
    args = parser.parse_args()

    # Output CSV file path; replace with the actual path
    output_csv_path = "/home/works/output/extracted_data.csv"
    # Columns to extract
    columns_to_extract = ["column1", "column2", "column3"]

    # Extract the data and write it out
    extract_and_write_data(args.date, output_csv_path, columns_to_extract)
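Before wiring the script into Airflow, it can be smoke-tested by hand; this assumes a matching input file such as /home/works/datasets/data_2024-01-20.csv already exists:

# python extract_data_script.py --date 2024-01-20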
Then create a Python notebook in Zeppelin containing the code the Airflow DAG will call. It loads the data previously extracted to the output/extracted_data.csv file:
%python

# Import the required library
import pandas as pd

# Path of the data previously extracted to the CSV file
csv_file_path = "/home/works/output/extracted_data.csv"

# Read the CSV file
df = pd.read_csv(csv_file_path)

# Drop rows where column1 is null
df = df[df["column1"].notnull()]

# Deduplicate, using column2 and column3 together as the key
deduplicated_df = df.drop_duplicates(subset=["column2", "column3"])

# Save the deduplicated result to a new CSV file
deduplicated_df.to_csv("/home/works/output/dd_data.csv", index=False)

Save this Zeppelin notebook and note down its note ID and paragraph ID; the Airflow DAG needs them to invoke the notebook.
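If you'd rather not copy the IDs out of the UI, they can also be fetched through Zeppelin's notebook REST API. A small sketch, assuming the GET /api/notebook/{noteId} endpoint and treating the exact JSON layout as an assumption (the note ID here is the one used later in the DAG):

import json
import urllib.request

# Fetch the note and print the ID of each paragraph in it
with urllib.request.urlopen("http://IP:PORT/api/notebook/2JND7T68E") as resp:
    note = json.load(resp)
for p in note["body"]["paragraphs"]:
    print(p["id"], p.get("title"))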
Next, write zeppelin_integration.py in VS Code (code below) and upload it to the $AIRFLOW_HOME/dags directory:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 1, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "zeppelin_integration",
    default_args=default_args,
    schedule=timedelta(days=1),
)

extract_data_task = BashOperator(
    task_id="extract_data",
    bash_command="python /home/works/z/extract_data_script.py --date {{ ds }}",
    dag=dag,
)

run_zeppelin_notebook_task = BashOperator(
    task_id="run_zeppelin_notebook",
    bash_command="curl -X POST -H 'Content-Type: application/json' http://IP:PORT/api/notebook/run/2JND7T68E/paragraph_1705372327640_1111015359",
    dag=dag,
)

# Set the task dependencies
extract_data_task >> run_zeppelin_notebook_task
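One caveat with the BashOperator-plus-curl approach: curl exits 0 as long as the HTTP request completes, even when the paragraph itself fails, so the Airflow task would still be marked SUCCESS. A hedged alternative is a PythonOperator that inspects the JSON response (whose shape matches the {"status":"OK","body":{"code":"SUCCESS",...}} output shown in the test run below) and raises on anything other than SUCCESS; it would be a drop-in replacement for run_zeppelin_notebook_task above, though the test run below sticks with the curl version:

import json
import urllib.request

from airflow.operators.python import PythonOperator

def run_zeppelin_paragraph():
    url = "http://IP:PORT/api/notebook/run/2JND7T68E/paragraph_1705372327640_1111015359"
    req = urllib.request.Request(url, method="POST", headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req) as resp:
        body = json.load(resp)
    # Fail the task unless Zeppelin reports the paragraph ran successfully
    if body.get("body", {}).get("code") != "SUCCESS":
        raise RuntimeError(f"Zeppelin paragraph failed: {body}")

run_zeppelin_notebook_task = PythonOperator(
    task_id="run_zeppelin_notebook",
    python_callable=run_zeppelin_paragraph,
    dag=dag,
)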
3. Loading and Running the DAG
Test it with the commands below: first run the file with python to check that the syntax is valid, then list the DAG's tasks and test them one by one.
# python zeppelin_integration.py
# airflow tasks list zeppelin_integration
extract_data
run_zeppelin_notebook

# airflow tasks test zeppelin_integration extract_data 20240122
[2024-01-22T08:57:45.805+0800] {dagbag.py:538} INFO - Filling up the DagBag from /root/airflow/dags
[2024-01-22T08:57:47.853+0800] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: zeppelin_integration.extract_data __airflow_temporary_run_2024-01-22T00:57:47.740537+00:00__ [None]>
[2024-01-22T08:57:47.860+0800] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: zeppelin_integration.extract_data __airflow_temporary_run_2024-01-22T00:57:47.740537+00:00__ [None]>
[2024-01-22T08:57:47.861+0800] {taskinstance.py:2171} INFO - Starting attempt 1 of 2
[2024-01-22T08:57:47.861+0800] {taskinstance.py:2250} WARNING - cannot record queued_duration for task extract_data because previous state change time has not been saved
[2024-01-22T08:57:47.862+0800] {taskinstance.py:2192} INFO - Executing <Task(BashOperator): extract_data> on 2024-01-20T00:00:00+00:00
[2024-01-22T08:57:47.900+0800] {taskinstance.py:2481} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER=airflow AIRFLOW_CTX_DAG_ID=zeppelin_integration AIRFLOW_CTX_TASK_ID=extract_data AIRFLOW_CTX_EXECUTION_DATE=2024-01-20T00:00:00+00:00 AIRFLOW_CTX_TRY_NUMBER=1 AIRFLOW_CTX_DAG_RUN_ID=__airflow_temporary_run_2024-01-22T00:57:47.740537+00:00__
[2024-01-22T08:57:47.904+0800] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2024-01-22T08:57:47.905+0800] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'python /home/works/z/extract_data_script.py --date 2024-01-20']
[2024-01-22T08:57:47.914+0800] {subprocess.py:86} INFO - Output:
[2024-01-22T08:57:48.553+0800] {subprocess.py:97} INFO - Command exited with return code 0
[2024-01-22T08:57:48.632+0800] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=zeppelin_integration, task_id=extract_data, execution_date=20240120T000000, start_date=, end_date=20240122T005748

# airflow tasks test zeppelin_integration run_zeppelin_notebook 20240122
[2024-01-22T09:01:43.665+0800] {dagbag.py:538} INFO - Filling up the DagBag from /root/airflow/dags
[2024-01-22T09:01:45.835+0800] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: zeppelin_integration.run_zeppelin_notebook __airflow_temporary_run_2024-01-22T01:01:45.733341+00:00__ [None]>
[2024-01-22T09:01:45.843+0800] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: zeppelin_integration.run_zeppelin_notebook __airflow_temporary_run_2024-01-22T01:01:45.733341+00:00__ [None]>
[2024-01-22T09:01:45.844+0800] {taskinstance.py:2171} INFO - Starting attempt 1 of 2
[2024-01-22T09:01:45.844+0800] {taskinstance.py:2250} WARNING - cannot record queued_duration for task run_zeppelin_notebook because previous state change time has not been saved
[2024-01-22T09:01:45.845+0800] {taskinstance.py:2192} INFO - Executing <Task(BashOperator): run_zeppelin_notebook> on 2024-01-22T00:00:00+00:00
[2024-01-22T09:01:45.904+0800] {taskinstance.py:2481} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER=airflow AIRFLOW_CTX_DAG_ID=zeppelin_integration AIRFLOW_CTX_TASK_ID=run_zeppelin_notebook AIRFLOW_CTX_EXECUTION_DATE=2024-01-22T00:00:00+00:00 AIRFLOW_CTX_TRY_NUMBER=1 AIRFLOW_CTX_DAG_RUN_ID=__airflow_temporary_run_2024-01-22T01:01:45.733341+00:00__
[2024-01-22T09:01:45.909+0800] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2024-01-22T09:01:45.910+0800] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', "curl -X POST -H 'Content-Type: application/json' http://100.100.30.220:8181/api/notebook/run/2JND7T68E/paragraph_1705372327640_1111015359"]
[2024-01-22T09:01:45.921+0800] {subprocess.py:86} INFO - Output:
[2024-01-22T09:01:45.931+0800] {subprocess.py:93} INFO -   % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
[2024-01-22T09:01:45.931+0800] {subprocess.py:93} INFO -                                  Dload  Upload   Total   Spent    Left  Speed
100    50  100    50    0     0      8      0  0:00:06  0:00:06 --:--:--    12
[2024-01-22T09:01:52.003+0800] {subprocess.py:93} INFO - {"status":"OK","body":{"code":"SUCCESS","msg":[]}}
[2024-01-22T09:01:52.003+0800] {subprocess.py:97} INFO - Command exited with return code 0
[2024-01-22T09:01:52.098+0800] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=zeppelin_integration, task_id=run_zeppelin_notebook, execution_date=20240122T000000, start_date=, end_date=20240122T010152
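At this point the deduplicated output should exist on disk; a quick sanity check on the file written by the Zeppelin paragraph:

# head /home/works/output/dd_data.csv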
Finally, start the scheduler with the airflow scheduler command so the DAG is picked up by Airflow.
# airflow scheduler
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2024-01-22T09:28:21.829+0800] {task_context_logger.py:63} INFO - Task context logging is enabled
[2024-01-22T09:28:21.831+0800] {executor_loader.py:115} INFO - Loaded executor: SequentialExecutor
[2024-01-22T09:28:21.868+0800] {scheduler_job_runner.py:808} INFO - Starting the scheduler
[2024-01-22T09:28:21.869+0800] {scheduler_job_runner.py:815} INFO - Processing each file at most -1 times
...

A new DAG now appears in the web UI, as shown in the figure, and it can be triggered from the Actions column.

Summary
That wraps up today's content. Overall, integrating the two tools is quite convenient, and I look forward to building more applications on top of this.