暗无天日

=============>DarkSun的个人博客

读:数据管道中Schema变更的四种形状

数据管道跑了几个月没问题,某天突然挂了。翻日志一看,报了 SchemaError ,原因是上游数据加了一列属性。

原文 来自 Polars 官方博客,讨论的就是这个问题:schema 变更不可避免,但不同存储格式应对变更的能力差异很大。原文以 Polars 为工具讲解具体参数,本文提取其中的通用原则,重点看不同格式在 schema 演化上的设计取舍。

四种形状

Schema 变更看似五花八门,归纳起来只有四种:

形状 例子 能否自动处理
新增列(Additive) 上游加了一个 category 字段 可以,旧数据填 null
缺失列(Subtractive) 期望的列不在新数据里了 可以,新数据填 null
类型漂移(Type drift) Int32 变成 Int64 ,因为数据量大了整数范围不够用 部分可以,只支持无损拓宽
破坏性变更(Breaking) 列被重命名、语义变了、类型不兼容 不行,必须手动处理

前三种有自动化的空间,第四种没有。遇到报错时,先判断是哪种形状,再找对应的处理方式。

顺便说一下"类型漂移只支持无损拓宽"这件事。 Int64 缩到 Int32 ,超过 2^31 的值就溢出了;浮点数缩窄精度也会丢失。所以自动处理只能往宽了走( Int32Int64 ),不能往窄了走。

四种格式的设计哲学

CSV:没有元数据,一切靠猜

CSV 文件不携带类型信息,读取时工具只能采样前若干行来推断类型。如果后面的行突然出现小数,而采样推断出来的是整数,就报错。

生产环境应该提前声明已知列的类型(告诉工具"这一列就是浮点数")。否则一旦推断错了就会酿成故障。

Parquet:文件级元数据,多文件时易出错

Parquet 是列式存储格式,同一列的数据连续存放。分析查询(对某列求和、求平均)只需读相关列,不用扫描整行。每个文件自带 schema 元数据(列名和类型),还附带统计信息(每列的最大值、最小值、记录数),查询引擎可以利用这些统计跳过不相关的文件。

单文件没有推断问题。问题出在"多个文件"上:多数工具(如 events_*.parquet )在扫描多文件时,以第一个文件的 schema 为基准,后续不一致就报错。

新增列时,可以声明一个超集 schema,让旧文件里没有的列填 null。类型漂移时,可以让整数自动拓宽。如果两种问题同时出现(列不同 + 类型不同),可以独立读取各文件再按列名对齐,类型取公共超类型。

Delta Lake:事务日志,显式声明演化

Delta Lake 在 Parquet 数据旁边维护一个 _delta_log/ 目录,里面是按序号命名的 JSON 文件( 00001.json00002.json 、...),每次写入操作生成一个新的 JSON,记录这次加了哪些文件、删了哪些文件、schema 有没有变。定期还会合并为 Parquet 格式的 checkpoint 文件防止日志过长。这种设计让 Parquet 有了事务能力:多个写入者通过乐观并发控制协调(先提交者赢,其他人重试),读操作通过日志就知道该读哪些文件,不用扫描全部。

写入时默认严格匹配 schema,不一致就拒绝。要让 schema 演化,需要显式声明允许合并:新旧列的增减自动处理,旧数据自动填 null。重命名和类型不兼容仍然需要手动处理。和 Parquet 相比,Delta Lake 能通过事务日志回溯 schema 变更的历史,知道哪天加了列、哪天改了类型。

Iceberg:field ID 追踪

Iceberg 是四种格式中对 schema 演化支持最完整的。它的元数据是分层的:最上层是 metadata 文件,指向各个 snapshot(快照);每个 snapshot 指向一个 manifest list;manifest list 指向 manifest 文件;manifest 文件记录一组数据文件及其统计信息(记录数、文件大小、列的上下界)。这种分层结构让查询引擎在规划阶段就能跳过大量不相关的文件,不用真正读数据。底层数据文件可以是 Parquet、ORC 或 Avro 格式。

Schema 演化做得好,核心在于一个设计选择:用稳定的 field ID 而不是列名来追踪列。

其他格式都是按列名或位置匹配。列名改了,旧数据文件找不到对应的列;位置变了,列会对不上号。Iceberg 的做法是:每列有一个数字 ID(如 1、2、3),存储在 catalog(元数据服务,通常用 Hive Metastore、AWS Glue 或 REST catalog)里。列名改了,ID 不变。读的时候 catalog 知道 "ID=2 现在叫 category " ,自动映射。加列、删列、重命名,都不需要重写数据文件。

from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, LongType

schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType()),
    NestedField(field_id=2, name="value", field_type=LongType()),
)

注意 field_id 参数:这是 Iceberg 追踪列的标识符,和列名无关。之后通过 catalog 的 update_schema() 方法加列、删列、重命名,都是操作这个 ID 映射,不动数据文件。读取时,扫描器始终读 catalog 中的最新 snapshot(快照,某个时间点的完整表状态),自动处理列的增删和重命名,不需要任何额外参数。

对比与选择

CSV 四项全部手动,不单独列出。

需求 Parquet Delta Lake Iceberg
新增列 声明超集 schema 自动(merge 模式) 自动(field ID)
缺失列 指定参数填充 null 自动(merge 模式) 自动(field ID)
类型漂移 自动拓宽 手动 自动(无损拓宽)
列重命名 手动 手动 自动(field ID 不变)
额外设施 需要 catalog 服务

Iceberg 支持最完整,但需要额外搭建 catalog 服务。Delta Lake 不需要额外服务,文件系统就够了。Parquet 也没有额外依赖,但多个文件 schema 不一致时只能自己写代码处理。CSV 连列类型都没有,全靠工具猜。

你的 schema 变更频繁吗?如果频繁且需要自动化,Iceberg 的 catalog 开销就值。schema 稳定的话,Parquet 或 Delta Lake 就足够。如果连 schema 都还没有,那么先搞定 CSV 的类型推断再说吧。

data-engineering : schema-evolution : parquet : delta-lake : iceberg