XInputTest实用指南:深入解析Xbox控制器轮询率检测与性能优化
2026/4/21 23:27:26
JsonDeserializationSchema实现了 Flink 的DeserializationSchema,因此只要某个 connector 支持DeserializationSchema,你就能直接使用它。
典型用法:KafkaSource 只消费 value,反序列化成 POJO:
JsonDeserializationSchema<SomePojo>jsonFormat=newJsonDeserializationSchema<>(SomePojo.class);KafkaSource<SomePojo>source=KafkaSource.<SomePojo>builder().setValueOnlyDeserializer(jsonFormat)// ....build();适用场景:
SomePojo工程建议:
Integer/Long)应对字段缺失或 null写回 Kafka 时,JsonSerializationSchema实现了SerializationSchema,可用于任何支持SerializationSchema的 connector。
典型用法:KafkaSink 写 value,序列化 POJO 为 JSON:
JsonSerializationSchema<SomePojo>jsonFormat=newJsonSerializationSchema<>();KafkaSink<SomePojo>sink=KafkaSink.<SomePojo>builder().setRecordSerializer(newKafkaRecordSerializationSchemaBuilder<SomePojo>().setValueSerializationSchema(jsonFormat)// ....build()).build();适用场景:
Flink 允许你通过构造函数传入SerializableSupplier<ObjectMapper>来定制 mapper,相当于提供一个“ObjectMapper 工厂”。
你可以用它做很多工程级增强,比如:
示例:自定义序列化 mapper,让 map key 有序,并注册模块:
JsonSerializationSchema<SomeClass>jsonFormat=newJsonSerializationSchema<>(()->newObjectMapper().enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS).registerModule(newParameterNamesModule()));你也可以把“兼容字段变更”的设置加进去(强烈建议生产开启类似配置):
FAIL_ON_UNKNOWN_PROPERTIES关闭(这里不展开写完整 mapper 配置,你只要知道:用 supplier 你就能完全掌控 Jackson。)
在 PyFlink 中,Flink 内置了 Row 的 JSON Schema:
JsonRowDeserializationSchemaJsonRowSerializationSchema这对 Python 流处理特别友好,因为 Python 侧更常操作 Row 而不是 POJO 类。
row_type_info=Types.ROW_NAMED(['name','age'],[Types.STRING(),Types.INT()])json_format=JsonRowDeserializationSchema.builder()\.type_info(row_type_info)\.build()source=KafkaSource.builder()\.set_value_only_deserializer(json_format)\.build()row_type_info=Types.ROW_NAMED(['name','age'],[Types.STRING(),Types.INT()])json_format=JsonRowSerializationSchema.builder()\.with_type_info(row_type_info)\.build()sink=KafkaSink.builder()\.set_record_serializer(KafkaRecordSerializationSchema.builder().set_topic('test').set_value_serialization_schema(json_format).build())\.build()适用场景: