Flink SQL from Beginner to Practice: 4 Window Types + TopN + 4 JOIN Types Explained

Published: 2026/7/3 1:06:46
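This article records the complete process of building a real-time data warehouse from scratch on Flink 1.17, Kafka 3.2, Debezium 1.9, and MySQL 5.7. It covers hands-on examples and pitfalls for 4 window types, TopN, and 4 JOIN types.

## 1. Environment Setup

### 1.1 Start the Flink SQL Client

```bash
cd /opt/module/flink-1.17.0
./bin/sql-client.sh
```

### 1.2 Configure the result display

```sql
-- Tableau style (recommended, clearer)
SET 'sql-client.execution.result-mode' = 'tableau';
-- Or changelog mode, which shows +I / -U / +U
SET 'sql-client.execution.result-mode' = 'changelog';
-- Set the session time zone
SET 'table.local-time-zone' = 'Asia/Shanghai';
-- Set the default parallelism
SET 'parallelism.default' = '2';
-- Set a state TTL to keep state from growing without bound
SET 'table.exec.state.ttl' = '1h';
```

### 1.3 The three result modes compared

| Mode | Command | Characteristics |
|---|---|---|
| table | `SET 'sql-client.execution.result-mode' = 'table';` | Default; results displayed as a table |
| tableau | `SET 'sql-client.execution.result-mode' = 'tableau';` | More compact; good for quick browsing |
| changelog | `SET 'sql-client.execution.result-mode' = 'changelog';` | Shows +I, -U, +U so you can watch how data is updated |

## 2. Table Templates

### 2.1 Kafka source table (stream table)

```sql
CREATE TABLE orders_source (
  order_id INT,
  product_id INT,
  quantity INT,
  order_time STRING,
  status STRING,
  op STRING,
  -- UTC → Beijing time (+8 hours): the string is UTC, but UNIX_TIMESTAMP parses it
  -- in the session time zone (Asia/Shanghai), so add 28800 s to get the correct instant
  order_time_ts AS TO_TIMESTAMP_LTZ(
    (UNIX_TIMESTAMP(order_time, 'yyyy-MM-dd''T''HH:mm:ss''Z''') + 28800) * 1000, 3
  ),
  -- Processing-time attribute, required by the lookup joins against products_dim
  proc_time AS PROCTIME(),
  WATERMARK FOR order_time_ts AS order_time_ts - INTERVAL '1' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'retail_db.retail_db.orders',
  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',
  'properties.group.id' = 'flink-orders-group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);
```

Key parameters:

| Parameter | Description |
|---|---|
| `scan.startup.mode` | `earliest-offset` consumes from the beginning; `latest-offset` starts from the newest data |
| `format` | `debezium-json` parses the Debezium CDC format |
| `WATERMARK` | Defines the event-time attribute and the out-of-orderness tolerance |
| `proc_time AS PROCTIME()` | Processing-time attribute used by lookup joins (`FOR SYSTEM_TIME AS OF`) |

### 2.2 MySQL dimension table (lookup table)

```sql
CREATE TABLE products_dim (
  product_id INT,
  product_name STRING,
  category STRING,
  price DECIMAL(10, 2),
  stock INT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop102:3306/retail_db',
  'table-name' = 'products',
  'username' = 'root',
  'password' = '123456',
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '10min'
);
```

### 2.3 JDBC sink table (writing results back)

```sql
CREATE TABLE category_sales_sink (
  window_start TIMESTAMP(3),
  category STRING,
  total_sales DECIMAL(10, 2),
  PRIMARY KEY (window_start, category) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop102:3306/retail_db',
  'table-name' = 'minute_category_sales',
  'username' = 'root',
  'password' = '123456',
  'sink.buffer-flush.max-rows' = '1',
  'sink.buffer-flush.interval' = '1s'
);
```

### 2.4 MySQL target table

```sql
CREATE TABLE IF NOT EXISTS minute_category_sales (
  window_start DATETIME(3) NOT NULL,
  category VARCHAR(50) NOT NULL,
  total_sales DECIMAL(10,2),
  PRIMARY KEY (window_start, category)
);
```

Note: MySQL's `TIMESTAMP` type is subject to time-zone conversion, so `DATETIME(3)` is recommended.

## 3. Four Window Types in Practice

### 3.1 Tumbling window (TUMBLE): independent per-minute statistics

Characteristics: each window is computed independently, and windows do not overlap.

Trigger: fires when the window end time is reached (the watermark passes it) and the window contains data.

```sql
INSERT INTO category_sales_sink
SELECT
  TUMBLE_START(o.order_time_ts, INTERVAL '1' MINUTE) AS window_start,
  p.category,
  SUM(o.quantity * p.price) AS total_sales
FROM orders_source AS o
JOIN products_dim FOR SYSTEM_TIME AS OF o.proc_time AS p
  ON o.product_id = p.product_id
GROUP BY TUMBLE(o.order_time_ts, INTERVAL '1' MINUTE), p.category;
```

### 3.2 Sliding window (HOP): rolling statistics over the last 5 minutes

Characteristics: windows overlap, so one record can belong to several windows. With a 1-minute slide and a 5-minute size, each record falls into 5 windows.

Trigger: fires when each window's end time is reached and the window contains data.

```sql
INSERT INTO category_sales_sink
SELECT
  HOP_START(o.order_time_ts, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS window_start,
  p.category,
  SUM(o.quantity * p.price) AS total_sales
FROM orders_source AS o
JOIN products_dim FOR SYSTEM_TIME AS OF o.proc_time AS p
  ON o.product_id = p.product_id
GROUP BY HOP(o.order_time_ts, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE), p.category;
```

### 3.3 Cumulative window (CUMULATE): today's running total

Characteristics: accumulates continuously from 00:00 of the current day until the day ends. Once the first record activates it, a result is emitted every minute.

Trigger: once data has arrived, it fires at every 1-minute step, like a timer. In event time, each step fires when the watermark passes its end.

```sql
-- CUMULATE exists only as a window TVF (there is no CUMULATE_START / GROUP BY CUMULATE),
-- so enrich the orders in a view first and apply the TVF to that view
CREATE TEMPORARY VIEW orders_enriched AS
SELECT o.order_time_ts, o.quantity, p.category, p.price
FROM orders_source AS o
JOIN products_dim FOR SYSTEM_TIME AS OF o.proc_time AS p
  ON o.product_id = p.product_id;

INSERT INTO category_sales_sink
SELECT
  window_start,
  category,
  SUM(quantity * price) AS total_sales
FROM TABLE(
  CUMULATE(TABLE orders_enriched, DESCRIPTOR(order_time_ts), INTERVAL '1' MINUTE, INTERVAL '1' DAY)
)
WHERE order_time_ts >= TIMESTAMP '2026-07-02 00:00:00'
-- Every step window of the same day shares window_start, so each minute's result
-- upserts the same (window_start, category) row in MySQL
GROUP BY window_start, window_end, category;
```

### 3.4 Session window (SESSION): grouping by activity gap

Characteristics: computation is triggered only after 5 minutes pass with no new data. As long as data keeps arriving, the window keeps extending.

Trigger: fires only when the idle gap reaches the threshold (event-driven). Continuous data never triggers it.

```sql
SELECT
  SESSION_START(o.order_time_ts, INTERVAL '5' MINUTE) AS session_start,
  SESSION_END(o.order_time_ts, INTERVAL '5' MINUTE) AS session_end,
  p.category,
  COUNT(*) AS order_count,
  SUM(o.quantity * p.price) AS total_sales
FROM orders_source AS o
JOIN products_dim FOR SYSTEM_TIME AS OF o.proc_time AS p
  ON o.product_id = p.product_id
GROUP BY SESSION(o.order_time_ts, INTERVAL '5' MINUTE), p.category;
```

## 4. Grouped TopN: Real-Time Leaderboard

Scenario: the top 2 products by sales within each category over the past 5 minutes.

```sql
INSERT INTO top_category_products_sink
SELECT window_start, window_end, category, product_name, total_sales, rank_num
FROM (
  SELECT
    window_start, window_end, category, product_name, total_sales,
    ROW_NUMBER() OVER (
      PARTITION BY window_start, window_end, category
      ORDER BY total_sales DESC
    ) AS rank_num
  FROM (
    SELECT
      HOP_START(o.order_time_ts, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS window_start,
      HOP_END(o.order_time_ts, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS window_end,
      p.category,
      p.product_name,
      SUM(o.quantity * p.price) AS total_sales
    FROM orders_source AS o
    JOIN products_dim FOR SYSTEM_TIME AS OF o.proc_time AS p
      ON o.product_id = p.product_id
    GROUP BY HOP(o.order_time_ts, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE), p.category, p.product_name
  ) t1
  WHERE total_sales > 0
) t2
WHERE rank_num <= 2;
```

⚠️ The `QUALIFY` syntax requires Flink 1.18+. On 1.17, use the subquery form shown above.

## 5. Four JOIN Types in Practice

### 5.1 Regular Join (two-stream join)

Scenario: join two streams in real time on arbitrary conditions, e.g. an order stream JOIN a payment stream.

Characteristics:

- No watermark is required on either stream: a record on one side can match any record on the other, regardless of time
- State keeps all historical data from both sides, so a TTL is required
- Supports CDC updates and deletes (-U/+U/-D)

Test data example:

```sql
WITH orders AS (
  SELECT 1 AS order_id, 1 AS product_id, 2 AS quantity, TIMESTAMP '2026-07-02 22:30:00' AS order_time
  UNION ALL
  SELECT 2, 1, 1, TIMESTAMP '2026-07-02 22:32:00'
),
payments AS (
  SELECT 1 AS payment_id, 1 AS order_id, 6999.00 AS amount, TIMESTAMP '2026-07-02 22:31:00' AS pay_time
  UNION ALL
  SELECT 2, 2, 12999.00, TIMESTAMP '2026-07-02 22:33:00'
)
SELECT o.order_id, o.quantity, p.payment_id, p.amount AS pay_amount
FROM orders o
JOIN payments p ON o.order_id = p.order_id;
```

Production notes:

```sql
-- A state TTL is mandatory, otherwise state grows without bound
SET 'table.exec.state.ttl' = '1h';
-- Neither stream needs a watermark for the join itself
-- Suitable for real-time joins on arbitrary conditions
SELECT ...
FROM orders_source o
JOIN payments_source p ON o.order_id = p.order_id;
```

### 5.2 Lookup Join (dimension table join)

Scenario: enrich a stream with an external, mostly static dimension table, such as the MySQL product table.

Characteristics:

- The dimension table must declare a primary key
- It does not block the watermark, because the dimension table is queried on demand rather than consumed as a stream
- Suited for static or slowly changing dimension data
- The join must use `FOR SYSTEM_TIME AS OF` on the main stream's processing-time attribute; a plain `JOIN` against the JDBC table is a regular join instead

```sql
SELECT
  o.order_id,
  p.product_name,
  p.category,
  o.quantity * p.price AS total_amount
FROM orders_source AS o
JOIN products_dim FOR SYSTEM_TIME AS OF o.proc_time AS p
  ON o.product_id = p.product_id;
```

### 5.3 Interval Join (time-range join)

Scenario: join two streams within a given time range, e.g. match an order with its payment within 5 minutes.

Characteristics:

- Inputs must be append-only; CDC streams are not supported
- State is bounded: only data inside the interval is kept
- Suited for joining two log streams

```sql
WITH orders_test AS (
  SELECT 1 AS order_id, TIMESTAMP '2026-07-02 22:30:00' AS order_time
  UNION ALL SELECT 2, TIMESTAMP '2026-07-02 22:32:00'
  UNION ALL SELECT 3, TIMESTAMP '2026-07-02 22:33:00'
  UNION ALL SELECT 4, TIMESTAMP '2026-07-02 22:38:00'
)
SELECT
  o1.order_id AS left_order_id,
  o2.order_id AS right_order_id,
  o1.order_time AS left_time,
  o2.order_time AS right_time
FROM orders_test o1
JOIN orders_test o2
  ON o1.order_id <> o2.order_id
 AND o1.order_time BETWEEN o2.order_time - INTERVAL '5' MINUTE
                       AND o2.order_time + INTERVAL '5' MINUTE;
```

Because `orders_test` is built from literals, `order_time` here is a plain `TIMESTAMP` rather than a time attribute. This demo therefore only illustrates the matching condition, and Flink runs it as a regular join. A real interval join needs an equi-join key and a time attribute on both streams, e.g. `o.order_id = p.order_id AND p.pay_time BETWEEN o.order_time AND o.order_time + INTERVAL '5' MINUTE`.

### 5.4 Temporal Join (versioned table join)

Scenario: look up the product price that was valid when the order was placed (historical version).

Characteristics:

- The versioned table needs a primary key and a time attribute
- State stays bounded: per primary key, only the versions that can still be matched are kept
- Suited for continuously updated CDC streams

```sql
WITH product_price_history AS (
  SELECT 1 AS product_id, 6999.00 AS price, TIMESTAMP '2026-07-01 00:00:00' AS valid_from
  UNION ALL SELECT 1, 7999.00, TIMESTAMP '2026-07-02 00:00:00'
  UNION ALL SELECT 2, 12999.00, TIMESTAMP '2026-07-01 00:00:00'
  UNION ALL SELECT 2, 13999.00, TIMESTAMP '2026-07-02 00:00:00'
),
orders AS (
  SELECT 1 AS order_id, 1 AS product_id, TIMESTAMP '2026-07-01 12:00:00' AS order_time
  UNION ALL SELECT 2, 1, TIMESTAMP '2026-07-02 14:00:00'
)
SELECT o.order_id, o.product_id, o.order_time, p.price AS price_at_order_time
FROM orders o
JOIN product_price_history p
  ON o.product_id = p.product_id
 AND p.valid_from <= o.order_time
WHERE p.valid_from = (
  SELECT MAX(valid_from)
  FROM product_price_history p2
  WHERE p2.product_id = p.product_id
    AND p2.valid_from <= o.order_time
);
```

This demo emulates the version lookup with a correlated subquery on literal data. On real streams, Flink expresses it as `JOIN product_price_history FOR SYSTEM_TIME AS OF o.order_time AS p ON o.product_id = p.product_id`. In that form, the versioned table has a primary key plus an event-time attribute with a watermark.

## 6. Comparison of the Four JOIN Types

| JOIN type | Use case | State size | Watermark requirement | CDC input | Trigger mechanism |
|---|---|---|---|---|---|
| Regular Join | Two-stream join on arbitrary conditions | Large (full history) | Not required | ✅ Supported | Data-driven |
| Lookup Join | Stream + external dimension table | Bounded (cache) | Not needed on the dimension table | ✅ Supported | Query-driven |
| Interval Join | Two streams joined within a time window | Bounded (within the interval) | Both streams need a time attribute | ❌ Not supported | Time-driven |
| Temporal Join | Join with historical versions of a dimension | Bounded (snapshot) | Event-time version: both sides need watermarks | ✅ Supported | Data-driven |

## 7. Pitfalls

Pitfall 1: a two-stream JOIN blocks the watermark

Symptom: windows never fire.

Cause: the join result feeds an event-time window, but only one of the two stream tables has a watermark. An operator with two inputs takes the minimum of their watermarks, so the combined watermark stays at `Long.MIN_VALUE` and the window never fires.

Fix: turn the dimension table into a Lookup Join, or give the dimension table a time attribute and watermark as well.

Pitfall 2: time zone

Symptom: Flink times and MySQL times differ by 8 hours.

Fix: `UNIX_TIMESTAMP` parses the UTC string in the session time zone (Asia/Shanghai), so add 28800 seconds (8 hours) to recover the correct instant:

```sql
order_time_ts AS TO_TIMESTAMP_LTZ(
  (UNIX_TIMESTAMP(order_time, 'yyyy-MM-dd''T''HH:mm:ss''Z''') + 28800) * 1000, 3
)
```

Pitfall 3: `QUALIFY` syntax not supported (Flink < 1.18)

Fix: use a subquery with `WHERE rn <= N` instead.

Pitfall 4: Interval Join does not support CDC streams

Fix: use a Temporal Join or Lookup Join instead, or use plain JSON data to simulate an append-only stream.

Pitfall 5: insufficient resources (`NoResourceAvailableException`)

Fix:

```bash
# List running jobs
./bin/flink list -r
# Cancel a job that is still holding slots
./bin/flink cancel <JobID>
```

```sql
-- In the SQL Client, lower the default parallelism
SET 'parallelism.default' = '1';
```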
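As an alternative to the `flink list` / `flink cancel` CLI commands in pitfall 5, the same cleanup can be sketched without leaving the SQL Client. This assumes your SQL Client supports the job statements `SHOW JOBS` and `STOP JOB` (documented starting with Flink 1.17; check the docs for your exact version). `<job_id>` is a placeholder for an ID copied from the `SHOW JOBS` output.

```sql
-- List the jobs currently known to the cluster
-- (assumes the SQL Client job statements are available in your Flink version)
SHOW JOBS;

-- Stop a leftover streaming job so it releases its task slots;
-- '<job_id>' is a placeholder: copy the real ID from the SHOW JOBS output
STOP JOB '<job_id>';
```

Each `INSERT INTO` submitted from the SQL Client runs as a separate, long-lived streaming job. Jobs left over from earlier experiments, such as the window queries in section 3, keep holding slots until they are stopped. That is a common cause of this exception.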