【玩转daft】daft on ray执行数据杂谈

发布时间:2026/7/6 2:52:04
【玩转daft】daft on ray执行数据杂谈 1、daft on ray执行的时候 daft的所有列数据都会读到节点上吗Daft 是惰性执行的先构建逻辑计划优化后才执行。比如通过列裁剪会在计划优化阶段自动下推。几种下推Projection Pushdown只读取用到的列Filter Pushdown过滤条件下推到读取层Predicate Pushdown谓词下推到存储层Parquet/Iceberg/Lance等dfdaft.read_parquet(s3://data)# 假设 100 列dfdf.select(a,b).with_column(c,df[a]df[b])df.show()即使源文件 100 列Ray 节点间只传输 a, b 两列。Parquet 读取阶段就已经只读这两列了。2、daft on ray 数据模型场景 1: Native Runner (单机)数据完全在当前 Python 进程的内存里走 Arrow bufferdaft.context.set_runner_native()dfdaft.read_parquet(data.parquet).collect()# 数据在本进程堆内存 (Arrow buffer)零拷贝共享场景 2: Ray Runner (分布式)Ray Runner 下Daft 的基本传输单位是 MicroPartition一批 RecordBatchDaft DataFrame 底层是分区Partition集合每个分区是一个 Arrow RecordBatch被序列化为 Ray Object 存到 Object Store。DataFrame├── Partition 0 → ray.ObjectRef (在Node A的Object Store)├── Partition 1 → ray.ObjectRef (在Node B的Object Store)├── Partition 2 → ray.ObjectRef (在Node A的Object Store)└── …DataFrame 状态数据位置未 collect (lazy)无数据只有 plan执行中 (task running)各 worker 的本地进程内存 (Arrow)task 完成结果传出Ray Object Store (plasma共享内存)collect() 到 driverDriver 进程内存跨节点传输时Object Store → 网络 → 目标节点 Object Store3、什么情况下会触发跨节点传输所有分区都跨节点传吗算子是否触发跨节点传输传什么select / with_column❌ 本地执行无filter❌ 本地执行无map_partitions / UDF❌ 本地执行无 优先本地执行Ray locality-aware schedulingrepartition✅ 全 shuffle显式重分区剪枝后的所有列groupby().agg()✅ hash shuffleShuffle按key重分区数据跨节点 。剪枝后的所有列join✅ hash shuffleShuffle或Broadcast大量跨节点传输。剪枝后的所有列sort✅ range shuffle全局排序需要shuffle 剪枝后的所有列否本地优先仅shuffle和调度不均时跨节点4、怎么跨节点传输跨节点传输时Daft 通过 Ray Object Store (plasma) 传数据数据以 Arrow IPC 格式序列化零拷贝共享同节点多个 worker 读同一个 object 不复制。默认占 30% 节点内存–object-store-memory 可调 。溢出到磁盘Object Store 满了会 spill to disk/tmp/ray/。引用计数管理DataFrame 变量释放后Object 会被 GC。跨节点走 gRPC 传输会有网络开销5、怎么观察传输行为查看执行计划dfdf.select(...).groupby(...).agg(...)df.explain()# 打印物理计划可看到shuffle和列裁剪