Apache Doris Python UDF:让 SQL 直接调用 Python 生态,支撑 Agent 时代复杂业务逻辑

发布时间:2026/7/2 3:58:08
Apache Doris Python UDF:让 SQL 直接调用 Python 生态,支撑 Agent 时代复杂业务逻辑 随着 AI 应用和实时分析场景深入进入数据平台的不再只是结构化业务表。日志、JSON、文本内容、行为事件、模型推理结果等半结构化和非结构化数据正在成为实时分析的常见对象。与此同时分析链路中要完成的工作也在变化它不再只是COUNT、SUM、GROUP BY还包括规则判断、字段解析、特征加工、标签抽取、模型打分等更复杂的业务逻辑。这些逻辑往往更适合用 Python 实现。但如果把数据导出到外部脚本或服务中处理就会带来链路拉长、时效下降、排查困难和治理复杂等问题。Doris Python UDF 要解决的正是这个割裂让开发者可以在 SQL 中创建并调用 Python 函数将 Pandas、PyArrow 等 Python 生态能力引入 Doris 查询链路在数据不离开分析链路的前提下完成更复杂的数据处理和业务计算。SelectDB 也已将这一能力纳入商业化产品体系面向企业生产环境提供更完整的运维、稳定性、安全合规和技术支持能力。像调用 SQL 函数一样调用 Python UDF使用 Doris Python UDF 的心智模型非常简单准备 Python 环境声明函数然后在 SQL 中直接调用。前置条件在创建 Python UDF/UDAF/UDTF 前需要在所有 BE 节点开启 Python UDF 相关配置并在目标 Python 环境中安装pandas与pyarrow。Python UDF Server 日志可在output/be/log/python_udf_output.log中查看。下面以支付金额风险等级评估为例创建一个 Python UDFDROP FUNCTION IF EXISTS py_risk_level(DOUBLE); CREATE FUNCTION py_risk_level(DOUBLE) RETURNS STRING PROPERTIES ( type PYTHON_UDF, symbol evaluate, runtime_version 3.12.11, always_nullable true, volatility immutable ) AS $$ def evaluate(amount): if amount is None: return None if amount 10000: return high if amount 1000: return medium return low $$;创建完成后该函数即可像 Doris 内置函数一样使用SELECT user_id, amount, py_risk_level(amount) AS risk_level FROM payment_events WHERE dt 2026-06-17 ORDER BY user_id;开发者可以把已有 Python 逻辑直接嵌入 SQL 查询链路在 Doris 内完成数据处理、规则判断和特征加工。Python UDF 核心特点基于 Arrow RecordBatch 的批量执行Python UDF 既要具备 Python 的灵活性也需要尽可能降低跨语言、跨进程调用带来的额外开销。Doris Python UDF 采用列式批量处理机制。执行过程中Doris BE 会将输入数据组织为 Arrow RecordBatch并通过 Arrow Flight 高效传输至独立的 Python Server。Python 函数完成批量计算后结果再以列式数据形式返回 Doris 查询链路。这种方式避免了传统逐行调用造成的频繁进程切换和序列化开销使 Python 扩展能力能够与 Doris 的列式执行框架保持一致在支持复杂业务逻辑的同时尽可能保持查询执行效率。支持 Pandas Series 向量化计算对于字符串处理、特征计算、字段转换、分桶映射等列式处理场景Python UDF 支持基于 Pandas Series 的向量化实现。例如可以使用 Pandas 对金额进行分桶CREATE FUNCTION py_amount_bucket(DOUBLE) RETURNS INT PROPERTIES ( type PYTHON_UDF, symbol evaluate, runtime_version 3.10.12, always_nullable true, volatility immutable ) AS $$ import pandas as pd # 显式声明pd.Series类型使用向量化实现 def evaluate(amount: pd.Series) - pd.Series: return pd.cut( amount, bins[-float(inf), 100, 1000, 10000, float(inf)], labels[0, 1, 2, 3] ).astype(Int64) $$;相比在 Python 中逐行循环处理向量化计算可以更好利用 Pandas 底层能力减少解释器循环开销适合大批量数据转换和特征加工场景。完整支持 UDF、UDAF、UDTFDoris Python 扩展能力覆盖三类函数形态同一套 Python 扩展框架能够覆盖标量计算、聚合计算和展开型处理降低不同业务逻辑接入 Doris 的复杂度。支持内联与模块化加载Doris Python UDF 支持灵活的代码组织方式。方式一内联方式。对于简单函数可以直接将 Python 代码写在CREATE FUNCTION语句中适合快速验证和小规模试验CREATE FUNCTION py_add_one(INT) RETURNS INT PROPERTIES ( type PYTHON_UDF, symbol evaluate, runtime_version 3.10.12, volatility immutable ) AS $$ def evaluate(x): return None if x is None else x 1 $$;方式二模块方式。对于复杂函数可以将 Python 代码打成 ZIP 包并通过file与symbol指定模块入口CREATE FUNCTION py_add_one(INT) RETURNS INT PROPERTIES ( type PYTHON_UDF, file file:///opt/doris/udf/math_ops.zip, symbol math_ops.add_one, runtime_version 3.10.12, volatility immutable );这种方式更适合团队协作、代码评审、依赖管理和版本发布也便于将生产级 Python 逻辑稳定接入 Doris。面向生产环境的隔离、复用与自愈Python UDF 的重点不只是把 Python 跑起来而是让它以可隔离、可复用、可恢复的方式进入生产查询链路。关键机制设计及收益介绍用户感知对业务开发者来说Python UDF 仍然只是一次普通 SQL 调用进程管理、资源复用和故障恢复由 Doris 在底层自动完成。适用场景从复杂数据处理到 AI 分析Doris Python UDF 可以应用于多类业务场景。具体如下。这使得 Doris 不仅能够完成传统实时分析中的统计计算也能够更自然地承接 AI 应用和复杂业务逻辑。结束语Doris Python UDF 提供的不只是一个函数扩展机制而是一条连接 Doris 高性能分析能力与 Python 生态的通道。当规则判断、文本处理、特征加工和模型打分可以直接嵌入 SQL 查询链路开发者就不必为了复用 Python 生态而拆分实时分析流程。数据留在 Doris 内复杂逻辑也能更自然地进入分析链路。对于希望在生产环境中稳定使用这一能力的企业用户SelectDB也已经提供对 Python、 UDF/UDAF/UDTF 的支持并结合企业级运维、稳定版本、安全合规和技术支持能力帮助用户更高效地将复杂 Python 逻辑接入实时分析与 AI 分析场景。未来随着更多 AI 能力进入数据分析链路Python UDF 的意义在于在保留 Doris 高性能 SQL 分析能力的同时把 Python 生态的灵活性引入到生产查询中让新的业务逻辑更快被验证、复用和上线。特别鸣谢来自字节跳动的WencongLiu和来自腾讯的sjyango两位同学感谢他们在社区 Python UDF 功能开发中的合作与贡献。也欢迎更多对 Doris 感兴趣的朋友积极参与社区建设共同推动 Doris 生态持续发展。