网站搭建 成都,wordpress 列表页面,wordpress主题盗取,池州做网站培训系列文章#xff1a;
Flyte工作流平台调研#xff08;一#xff09;——整体架构
Flyte工作流平台调研#xff08;二#xff09;——核心概念说明
Flyte工作流平台调研#xff08;三#xff09;——核心组件原理
Flyte工作流平台调研#xff08;四#xff09;——…系列文章
Flyte工作流平台调研一——整体架构
Flyte工作流平台调研二——核心概念说明
Flyte工作流平台调研三——核心组件原理
Flyte工作流平台调研四——服务部署
Flyte工作流平台调研五——扩展集成
Flyte工作流平台调研六——跟Ray框架对比
Flyte工作流平台调研七——核心源码走读
正文
Flyte的设计具有高度的可扩展性可以通过多种方式进行定制。这是官方提供的贡献开发一个集成的样例文档。
以下是Flyte主要的几种集成方式
Flytekit plugins
简介
Flytekit是Flyte提供的Python SDK帮助用户使用Python编程设计新的Workflow。它可以解析Python代码将其编译为有效的工作流DAG并将其提交给Flyte执行。
Flytekit plugins就是指用户通过 Python 编写自定义代码来增强 FlytekitFlyte 的 Python SDK的功能。通过这种方式能在 Flyte 工作流Workflow中添加自定义任务Task、数据处理步骤或其他功能。
插件运行时基本上直接依赖Python代码或者基础镜像而不依赖其他服务。 1. Flytekit 插件的作用
Flytekit 插件允许用户在 Flyte 中集成外部工具、服务或库或在 Flyte 的任务和工作流中引入自定义的执行逻辑。这些插件可以帮助 Flyte 执行各种类型的作业例如运行特定的机器学习平台、数据库操作、数据验证等。
例如
通过 Comet 插件Flyte 工作流可以与 Comet.ml 进行集成自动追踪机器学习实验。通过 MLFlow 插件Flyte 可以追踪和记录机器学习模型的训练过程。
2. Flytekit 插件的特点
纯 Python 实现Flytekit 插件完全使用 Python 编写这使得用户可以轻松地开发、测试和调试插件功能。单元测试用户可以在本地进行单元测试确保插件功能的正确性而无需依赖 Flyte 的集群或其他外部服务。扩展 Flytekit插件主要用于扩展 Flytekit 的功能。比如用户可以编写插件来处理特定的任务或将 Flyte与特定的服务如 Comet、MLFlow集成。
3. 插件的类型
Flytekit 插件可以分为几种类型根据它们的功能和目标服务进行分类
机器学习平台插件如 Comet、MLFlow这些插件允许在 Flyte 中直接集成和使用机器学习平台用于模型追踪、实验管理等。数据处理插件如 Great Expectations、Pandera用于验证数据质量、进行数据验证等操作。数据库和查询插件如 Dolt、DuckDB、SQL 插件提供数据存储、数据库查询等功能。服务集成插件如 AWS SageMaker、Airflow这些插件允许将 Flyte 与 AWS 服务或其他外部系统如 Airflow、BigQuery集成进行模型推理、任务执行等。
实现方式
本文档以Flyte的SQLAlchemy这个插件为例进行说明。具体实现一个自定义插件的方式是
1. 下载Flytekit的代码
Flytekit的代码仓库地址GitHub - flyteorg/flytekit: Extensible Python SDK for developing Flyte tasks and workflows. Simple to get started and learn and highly extensible.
代码主要路径 说明flytekit是核心源码的实现plugins是插件的实现。
2. 在代码中实现插件具体的代码
1创建在plugins路径中创建对应的插件包比如 2在代码中实现自定义的task或者workflow这个插件只有task 3定义自定义task的自定义配置 4定义运行当前Task的工作流Pod也就是一个Node默认依赖的镜像 3. 构建插件依赖的镜像
如果第1步中默认的镜像不是镜像仓库中已经存在的镜像还需要自己构建当前自定义task的镜像并上传到镜像仓库。
4. 构建并发布插件
使用项目根目录下创建 setup.py用于打包插件
from setuptools import setup, find_packagessetup(namemy_flytekit_plugin,version0.1.0,descriptionA custom Flytekit plugin,authorYour Name,packagesfind_packages(),install_requires[flytekit, requests],
)构建并安装插件
pip install .
将插件发布到 PyPI便于其他 Flyte 用户使用
python setup.py sdist bdist_wheel
twine upload dist/*
使用方式
还是以Flyte的SQLAlchemy这个插件为例进行说明。
1. 安装 Flyte 的 SQLAlchemy 插件
pip install flytekitplugins-sqlalchemy
导入所需的库:
from flytekit import kwtypes, task, workflow
from flytekit.types.schema import FlyteSchema
from flytekitplugins.sqlalchemy import SQLAlchemyConfig, SQLAlchemyTask
2. 定义 SQLAlchemy Task
首先定义一个 SQLAlchemyTask它将从 RNA 中央数据库的 rna 表返回前 n 条记录。由于该数据库是公共的可以将数据库 URI 硬编码在字符串中包括用户名和密码。
注意 SQLAlchemyTask 的输出默认是 FlyteSchema。 警告 切勿存储用于私有或敏感数据库的密码 DATABASE_URI postgresql://reader:NWDMCE5xdipIjRrphh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs# 定义查询的输出 schema我们稍后在 get_mean_length 任务中复用
DataSchema FlyteSchema[kwtypes(sequence_lengthint)]sql_task SQLAlchemyTask(rna,query_template select len as sequence_length from rna where len {{ .inputs.min_length }} and len {{ .inputs.max_length }} limit {{ .inputs.limit }},inputskwtypes(min_lengthint, max_lengthint, limitint),output_schema_typeDataSchema,task_configSQLAlchemyConfig(uriDATABASE_URI),
)
3. 定义计算平均长度的任务
接下来定义一个任务用于计算我们查询返回的 RNA 序列子集的平均长度。注意如果你在 Flyte 后端中运行此任务通过 pyflyte run如果没有指定镜像pyflyte run 会使用默认的 flytekit 镜像。默认的 flytekit 镜像没有安装 sqlalchemy 插件。为了正确启动此任务的执行需要使用以下命令。
bash
复制代码
pyflyte --config ~/.flyte/your-config.yaml run --destination-dir /app --remote --image ghcr.io/flyteorg/flytekit:py3.8-sqlalchemy-latest integrations/flytekit_plugins/sql/sql_alchemy.py my_wf --min_length 3 --max_length 100 --limit 50
另外添加了 destination-dir 参数因为 pyflyte run 默认会将代码复制到 /root 目录而该镜像的工作目录被设置为 /app。 task
def get_mean_length(data: DataSchema) - float:dataframe data.open().all()return dataframe[sequence_length].mean().item()4. 将任务组合成一个工作流
最后将所有任务组合成一个工作流
workflow
def my_wf(min_length: int, max_length: int, limit: int) - float:return get_mean_length(datasql_task(min_lengthmin_length, max_lengthmax_length, limitlimit))运行代码
if __name__ __main__:print(fRunning {__file__} main...)print(my_wf(min_length50, max_length200, limit5))这样就完成了一个基于 SQLAlchemy 的 Flyte 工作流能够从公共 PostgreSQL 数据库中查询 RNA 序列的长度并计算这些序列的平均长度。
Native backend plugins
简介
原生后端插件是指直接依赖 Flyte 的原生计算和编排能力通常是 Kubernetes 集群中的工作负载无需额外的外部服务支持。这种插件由 Flyte 的内部组件如 FlytePropeller 和 Kubernetes协同管理用于运行特定类型的工作负载或任务。
比如Native backend的Spark 插件支持执行Spark Task使用这种插件时执行Spark Task时会先直接在Flyte的k8s集群中启动一个Spark集群然后再启动Task Pod去访问这个Spark集群执行 Task的具体内容。Spark Task Pods和Spark Cluster Pods都是FlytePropeller启动的而FlytePropeller具有启动Spark Cluster Pods的能力就是这个插件扩展的这个插件集成在FlytePropeller中调用Spark的k8s operator启动一个Spark集群。 主要插件类型
分布式训练 Kubeflow PyTorch运行 PyTorch 分布式训练任务。Kubeflow TensorFlow运行 TensorFlow 分布式训练任务。MPI Operator运行基于 Horovod 的深度学习训练任务。大数据处理 Kubernetes Cluster Spark运行 Spark 作业。Kubernetes Cluster Dask运行 Dask 作业。灵活任务 Kubernetes Pods运行任意 Kubernetes Pod 工作负载。Ray运行 Ray 分布式作业。
主要作用
Native Backend Plugins 的主要作用是提供一种无缝的方式使用 Kubernetes 原生能力执行特定类型的工作负载同时保持对 Flyte 工作流的高度集成。以下是其具体作用
高效资源管理 Flyte 利用 Kubernetes 的资源调度能力根据任务的资源需求动态分配和管理资源如 CPU、内存、GPU。分布式任务支持 原生插件支持常见的分布式计算框架如 PyTorch、TensorFlow、Spark、Ray简化了大规模分布式任务的开发和运行。统一工作流编排 无需外部服务支持用户可以将各种任务类型统一集成到 Flyte 的工作流中实现端到端的数据处理和模型训练。简化操作和维护 通过 Kubernetes 的原生能力Native Backend Plugins 避免了对外部服务的依赖简化了部署和运维。
使用方式
这里以一个Ray插件为例通过Flyte的Ray插件提交一个Ray任务。
1. flytekitplugins-ray基本说明
KubeRay 是一个开源工具包旨在简化在 Kubernetes 上运行 Ray 应用程序的过程。它提供了一系列工具增强了在 Kubernetes 上运行和管理 Ray 的操作能力。
插件的关键组件
Ray Operator用于集群资源创建和删除的后台服务管理 CRDCustom Resource Definition对象的 kubectl 插件/命令行工具与集群功能无缝集成的作业Jobs和服务Serving功能
2. 安装插件
pip install flytekitplugins-ray
3. 如果直接给已存在的Ray集群提交任务
ray.remote
def f(x):return x * xtask(task_configRayJobConfig(addressRAY_CLUSTER_ADDRESSruntime_env{pip: [numpy, pandas]})
)
def ray_task() - typing.List[int]:futures [f.remote(i) for i in range(5)]return ray.get(futures)
4. 使用插件通过Flyte提交一个Ray任务并且Ray集群也有Flyte管理
task(task_configRayJobConfig(worker_node_config[WorkerNodeConfig(group_nametest-group, replicas10)]))
def ray_task() - typing.List[int]:futures [f.remote(i) for i in range(5)]return ray.get(futures)
5. 在 Flyte 集群上运行示例
要在 Flyte 集群上运行上述示例使用以下命令
pyflyte run --remote ray_example.py \ray_workflow --n 10
此命令将远程运行 ray_example.py 脚本中的 ray_workflow并传递参数 --n 10。Flyte会先通过KubeRay在自己的K8S集群中创建一个Ray Cluster然后再运行一个Pod把Ray任务提交到这个Ray Cluster中。
Flyte Agent
简介
Flyte Agent 是一种 长时间运行的无状态服务通过 gRPC 接收执行请求并与适当的外部或内部服务交互以启动任务。它的功能定位在支持特定类型的任务并通过插件化扩展为不同的服务场景。
简单来说就是在Flyte的K8S集群中启动一个无状态的Agent Pod执行这种类型的任务时这个Agent Pod给真正的服务转发请求。
1. 基本的工作流程
任务触发当用户触发特定类型的任务时FlytePropeller 将通过 gRPC 向相应的 Agent 发送请求。任务处理Agent 接收到请求后会与指定的服务交互例如 BigQuery、AWS SageMaker 等并启动对应的任务。运行环境每个 Agent 是一个 Kubernetes 部署运行于 Flyte 的集群中。
2. 核心特点
gRPC 通信 Flyte Agent 是通过 gRPC 接收来自 FlytePropeller 的任务请求的。这些任务通常是特定类型如 BigQuery、Databricks的工作负载。Kubernetes 部署 每个 Agent 是一个 Kubernetes 部署运行在集群中专注于某类任务的管理。Agent 本身无状态任务的状态由后端服务如 BigQuery 或 SageMaker提供。任务初始化 Agent 服务在接收任务后会与适配的服务进行交互启动作业或任务。插件化扩展 Flyte 提供了许多预构建的 Agent如 Airflow Agent、Snowflake Agent。用户可以根据需求开发自定义 Agent用以支持其他任务类型。
3. 常见的 Flyte Agents
以下是一些内置的 Flyte Agents 及其用途
AWS SageMaker Inference Agent部署模型、创建推理端点并触发推理任务。Airflow Agent在 Flyte 的工作流中运行 Airflow 作业。BigQuery Agent在 Flyte 中运行 BigQuery 查询任务。ChatGPT Agent在工作流中运行 ChatGPT 相关任务。Databricks Agent在 Flyte 中运行 Databricks 作业。Memory Machine Cloud Agent使用 MemVerge Memory Machine Cloud 执行任务。OpenAI Batch Agent提交 OpenAI 异步批处理请求。PERIAN Job Platform Agent在 PERIAN 平台上执行任务。Sensor Agent用于工作流中的传感器任务。Snowflake Agent在 Snowflake 中运行作业。
使用方式
以一个ChatGPT agent为例ChatGPT 可用于多种场景例如情感分析、语言翻译、SQL 查询生成以及文本摘要。以下示例展示了如何在 Flyte 中运行 ChatGPT 任务
1. 安装
pip install flytekitplugins-openai
2. 样例代码
from typing import List
import flytekit
from flytekit import ImageSpec, Secret, dynamic, task, workflow
from flytekitplugins.openai import ChatGPTTask# 需要指定 name、openai_organization 和 chatgpt_config
# name用于 Flyte必须唯一。
# openai_organizationOpenAI API 的组织 ID可以在 [此处](https://platform.openai.com/account/org-settings) 找到。
# chatgpt_config用于 OpenAI Chat Completion 的配置可参考 [API 文档](https://platform.openai.com/docs/api-reference/completions)。chatgpt_small_job ChatGPTTask(name3.5-turbo,openai_organizationorg-NayNG68kGnVXMJ8Ak4PMgQv7,chatgpt_config{model: gpt-3.5-turbo,temperature: 0.7,},
)chatgpt_big_job ChatGPTTask(namegpt-4,openai_organizationorg-NayNG68kGnVXMJ8Ak4PMgQv7,chatgpt_config{model: gpt-4,temperature: 0.7,},
)workflow
def my_chatgpt_job(message: str) - str:message chatgpt_small_job(messagemessage)message chatgpt_big_job(messagemessage)return message# 本地运行工作流
if __name__ __main__:print(fRunning {__file__} main...)print(fRunning my_chatgpt_job(messagehi) {my_chatgpt_job(messagehi)})Agent的部署配置
如果使用的是 Flyte 的托管部署需要联系部署管理员以在部署中配置代理。比如要在 Flyte 部署中启用 ChatGPT 代理需要在代理服务器中设置 OpenAI API 密钥才能运行 ChatGPT 任务。详细的方式如下
1. 指定代理配置
1) 通过Flyte binary
编辑相关的 YAML 文件以指定代理配置
kubectl edit configmap flyte-sandbox-config -n flyte在 YAML 文件中添加以下配置
tasks:task-plugins:enabled-plugins:- container- sidecar- k8s-array- agent-servicedefault-for-task-types:- container: container- container_array: k8s-array- chatgpt: agent-serviceplugins:agent-service:# 配置超时是可选的。# 像使用大模型的 ChatGPT 任务可能需要更长的时间# 因此可以在这里调整超时设置。defaultAgent:timeouts:ExecuteTaskSync: 10s
2) 通过Flyte core
创建一个名为values-override.yaml的文件并向其中添加以下配置
configmap:enabled_plugins:# -- Tasks specific configuration [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#GetConfig)tasks:# -- Plugins configuration, [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#TaskPluginConfig)task-plugins:# -- [Enabled Plugins](https://pkg.go.dev/github.com/flyteorg/flyteplugins/go/tasks/config#Config). Enable sagemaker*, athena if you install the backendenabled-plugins:- container- sidecar- k8s-array- agent-servicedefault-for-task-types:container: containersidecar: sidecarcontainer_array: k8s-arraychatgpt: agent-serviceplugins:agent-service:# Configuring the timeout is optional.# Tasks like using ChatGPT with a large model might require a longer time,# so we have the option to adjust the timeout setting here.defaultAgent:timeouts:ExecuteTaskSync: 10s
2. 添加 OpenAI API 密钥
1) 使用 Helm 安装 flyteagent pod
helm repo add flyteorg https://flyteorg.github.io/flyte
helm install flyteagent flyteorg/flyteagent --namespace flyte2将 OpenAI API 密钥设置为秘密Base64 编码
SECRET_VALUE$(echo -n OPENAI_API_TOKEN | base64) \
kubectl patch secret flyteagent -n flyte --patch {\data\:{\flyte_openai_api_key\:\$SECRET_VALUE\}}
3重启开发环境
kubectl rollout restart deployment flyteagent -n flyte
3. 升级 Flyte 的 Helm 发行版
1使用 Flyte binary
helm upgrade RELEASE_NAME flyteorg/flyte-binary -n YOUR_NAMESPACE --values YOUR_YAML_FILE将 RELEASE_NAME 替换为您的发行版名称例如 flyte-backendYOUR_NAMESPACE 替换为您的命名空间名称例如 flyteYOUR_YAML_FILE 替换为您的 YAML 文件名称。
2使用Flyte core
helm upgrade RELEASE_NAME flyte/flyte-core -n YOUR_NAMESPACE --values values-override.yaml
将 RELEASE_NAME 替换为您的发行版名称例如 flyte 将 YOUR_NAMESPACE 替换为您的命名空间名称例如 flyte。
其他集成方式
除了前面三种主要的集成方式Flyte还有几种集成方式分布如下
External service backend plugins
外部服务后端插件正如其名称所示这些插件依赖于外部服务来处理使用这些插件定义的 Flyte 任务中的工作负载。
特点:
依赖外部服务这些插件不是通过 Flyte 自身直接处理任务而是将任务交由外部服务完成例如 AWS Athena、AWS Batch、Hive 等。支持大规模计算任务借助这些插件Flyte 可以处理需要大规模资源或专用计算平台的工作负载例如批量作业、SQL 查询和分布式计算。灵活性和扩展性Flyte 的任务定义通过插件扩展了支持范围使其能够与多种外部服务无缝集成满足不同工作流的需求。异步执行能力外部服务插件通常支持异步工作流Flyte 会等待外部服务完成任务并返回结果。
原理:
任务提交Flyte 的任务在定义时通过插件标记为特定类型例如Athena 查询、Batch 任务。任务调度Flyte 将任务的执行请求发送到相应的外部服务如 AWS Batch 或 Hive。任务处理外部服务根据 Flyte 提供的配置处理任务例如执行 SQL 查询或运行容器化的批处理任务。结果返回一旦外部服务完成任务Flyte 接收返回结果并继续后续的工作流步骤。
这种模式允许 Flyte 保持轻量化同时利用强大的外部计算服务来满足复杂工作流的需求。
主要类型
AWS Athena使用 AWS Athena 执行查询。AWS Batch在 AWS Batch 服务上运行任务和工作流。Flyte Interactive使用 Flyte Interactive 执行任务以进行调试。Hive在工作流中运行 Hive 作业。
SDKs for writing tasks and workflows
用于编写任务和工作流的 SDK社区非常乐意帮助您构建新的 SDK。目前可用的 SDK 包括
flytekitFlyte 的 Python SDK。flytekit-javaFlyte 的 Java/Scala SDK。
通过这些 SDK开发者可以更好地利用 Flyte 的功能来构建分布式任务和复杂工作流同时保持开发过程的高效性和灵活性。
Flyte operators
Flyte 提供Operator可以与其他编排工具集成帮助用户在这些工具中原生利用 Flyte 的构建功能。
目前集成了Airflow从 Airflow 中触发 Flyte 的执行。