1. 从原始数据到目标表:转换器的完整工作流
想象一下你刚拿到一份用户数据表,里面混杂着各种格式混乱的信息:日期可能是"2023-05-12"也可能是"20230512",电话号码有的带区号有的不带,姓名字段里还夹杂着特殊字符。这时候就需要一套完整的数据清洗流水线,而Seatunnel的转换器就是这条流水线上的智能机器人。
我最近处理过一个电商用户数据集,原始数据包含23个字段,但数据仓库只需要15个标准化字段。通过组合使用Copy、Replace和Split转换器,三天的工作量压缩到15分钟就完成了。比如地址字段"上海市-浦东新区-张江镇",用Split转换器按"-"分隔后,直接生成省市区三个标准字段,后续分析效率提升了70%。
2. 基础转换器:数据清洗的瑞士军刀
2.1 字段复制与重命名实战
Copy转换器看似简单,但在实际项目中能解决大问题。上周我遇到个典型场景:原始表的region_name字段要被两个下游系统使用,但一个系统要求字段名保持原样,另一个系统要求改名为area_name。用以下配置就搞定了:
transform { Copy { source_table_name = "user_source" result_table_name = "user_temp" fields { id = id region_name = region_name # 保持原名 area_name = region_name # 复制为新字段 } } }特别注意:字段复制会占用额外内存,当处理千万级数据时,建议先用FieldSelector过滤掉不需要的字段再复制。有次我直接复制包含50个字段的百万行数据,内存直接飙到90%,后来优化成先筛选后复制,内存消耗降了60%。
2.2 智能过滤的两种姿势
Filter和DataFilter这对组合特别实用。Filter像精确的剪刀,只保留指定字段:
transform { Filter { fields = [id, name, phone] # 只留这三个字段 } }而DataFilter则是智能筛子,按条件过滤行数据。最近帮客户处理用户数据时,就用它过滤了无效记录:
transform { DataFilter { condition = "age >= 18 AND phone IS NOT NULL" # 只要成年且有手机号的用户 } }踩坑提醒:条件表达式中的字段名要完全匹配源字段名。有次我把"user_age"写成"age",过滤直接失效,排查了半小时才发现这个大小写问题。
3. 进阶转换器:处理复杂数据变形
3.1 字符串手术刀:Replace转换器
Replace转换器是处理脏数据的利器。上周清洗一批商品数据时,遇到价格字段里混着"¥"符号和空格:
transform { Replace { replace_field = "price" pattern = "[¥ ]" # 匹配¥符号和空格 replacement = "" # 替换为空 } }更强大的地方在于支持正则表达式。比如统一格式化手机号:
pattern = "(\\d{3})(\\d{4})(\\d{4})" replacement = "$1-$2-$3"3.2 字段拆分与类型转换组合拳
Split转换器配合TypeConverter能解决很多格式问题。处理用户注册日志时,我用这套组合把"2023-05-12 14:30:00"拆解并转换:
transform { Split { split_field = "register_time" separator = " " output_fields = [reg_date, reg_time] } TypeConverter { field_conversion { reg_date { from = "string" to = "date" } reg_time { from = "string" to = "time" } } } }性能提示:类型转换比较耗资源,建议先过滤再转换。实测显示,对10万行数据先Filter再TypeConverter,比直接转换快2倍。
4. SQL转换器:终极武器
4.1 用SQL函数处理复杂逻辑
当内置转换器不够用时,SQL函数就是救星。比如清洗用户地址时:
transform { SqlFunction { function = "REGEXP_REPLACE" field = "address" pattern = "([0-9]+)号" replacement = "No.$1" } }还支持函数嵌套,像这个把手机号中间四位打码的例子:
function = "CONCAT(SUBSTRING(phone,1,3),'****',SUBSTRING(phone,8))"4.2 自定义SQL的威力
对于需要多步计算的场景,直接写SQL更高效:
transform { Sql { query = """ SELECT id, CASE WHEN score>90 THEN 'A' WHEN score>60 THEN 'B' ELSE 'C' END AS level, DATEDIFF(expire_date,CURRENT_DATE) AS remain_days FROM source_table """ } }实战经验:复杂SQL建议先在数据库客户端测试通过再放入配置。有次直接在配置里写5层嵌套SQL,出错后调试特别困难,后来改成分步SQL就清晰多了。