Note:
This topic has been translated from a Chinese forum by GPT and might contain errors. Original topic: Garbled results when reading TiDB data with Flink CDC
[TiDB Usage Environment] Testing
[TiDB Version]
- TiDB v4.0.8
- flink-sql-connector-tidb-cdc 2.2.1
- flink-clients_2.12 1.13.5
[Encountered Problem]
When I use the Flink CDC connector to read data from TiKV, the results come back in an unreadable encoding. What encoding is this, and how can I convert the records into JSON so they are easier to parse? Decoding with record.getKey().toStringUtf8() only makes the Chinese characters readable; everything else is still garbled.
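Some context that may help: judging by the deserializer signatures in the code below (Kvrpcpb.KvPair and Cdcpb.Event.Row), this connector version hands over raw TiKV key-value pairs rather than decoded rows, so the bytes are TiDB's internal key/row encoding, not a text charset. For a table with an integer row handle, a record key is laid out as 't' + 8-byte table ID + "_r" + 8-byte handle, each integer written big-endian with the sign bit flipped. String column values are most likely stored as their raw UTF-8 bytes inside the row value while other types are binary, which would explain why toStringUtf8() only makes the Chinese text readable. The sketch below decodes just the key into a small JSON object. RecordKeyDecoder and its methods are hypothetical names, and it assumes an integer handle (19-byte key); it does not decode row values.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper: decodes a TiDB record key of the form t{tableId}_r{handle} (integer handle only). */
public final class RecordKeyDecoder {

    /** Decoded parts of a record key. */
    public static final class RecordKey {
        public final long tableId;
        public final long handle;

        RecordKey(long tableId, long handle) {
            this.tableId = tableId;
            this.handle = handle;
        }

        /** Renders the key as a small JSON object. */
        public String toJson() {
            return "{\"tableId\":" + tableId + ",\"handle\":" + handle + "}";
        }
    }

    // Flipping the sign bit makes signed int64 values sort correctly as unsigned bytes.
    private static final long SIGN_MASK = 0x8000000000000000L;

    /** Reads a comparable int64: 8 bytes big-endian with the sign bit flipped. */
    private static long decodeCmpInt(byte[] key, int offset) {
        return ByteBuffer.wrap(key, offset, 8).getLong() ^ SIGN_MASK;
    }

    /** Layout: [0]='t', [1..8]=table id, [9]='_', [10]='r', [11..18]=handle. */
    public static RecordKey decode(byte[] key) {
        if (key.length != 19 || key[0] != 't' || key[9] != '_' || key[10] != 'r') {
            throw new IllegalArgumentException("not an integer-handle record key");
        }
        return new RecordKey(decodeCmpInt(key, 1), decodeCmpInt(key, 11));
    }

    /** Builds a record key; the inverse of decode, handy for testing. */
    public static byte[] encode(long tableId, long handle) {
        ByteBuffer buf = ByteBuffer.allocate(19);
        buf.put((byte) 't').putLong(tableId ^ SIGN_MASK)
           .put((byte) '_').put((byte) 'r').putLong(handle ^ SIGN_MASK);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] key = encode(45, 7);
        System.out.println(decode(key).toJson()); // prints {"tableId":45,"handle":7}
    }
}
```

Inside the snapshot deserializer this would presumably be called as RecordKeyDecoder.decode(record.getKey().toByteArray()); index keys ("_i") and non-integer handles would need separate handling.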
[Reproduction Path]
Running the example from the official Flink CDC documentation as-is produces this result:
Link: https://ververica.github.io/flink-cdc-connectors/master/content/connectors/tidb-cdc.html
[Problem Phenomenon and Impact]
Code as follows:
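// Assumed imports for flink-sql-connector-tidb-cdc 2.2.x and Flink 1.13
import com.ververica.cdc.connectors.tidb.TDBSourceOptions;
import com.ververica.cdc.connectors.tidb.TiDBSource;
import com.ververica.cdc.connectors.tidb.TiKVChangeEventDeserializationSchema;
import com.ververica.cdc.connectors.tidb.TiKVSnapshotEventDeserializationSchema;
import com.ververica.cdc.connectors.tidb.table.StartupOptions;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.tikv.kvproto.Cdcpb;
import org.tikv.kvproto.Kvrpcpb;
import java.util.HashMap;
import java.util.Map;

// Inside the job's main method; env, pdAddrs and log are defined elsewhere
Map<String, String> map = new HashMap<>();
map.put("tikv.grpc.timeout_in_ms", "20000");
SourceFunction<String> tidbSource =
        TiDBSource.<String>builder()
                .database("db_bill_shop") // set captured database
                .tableName("test") // set captured table
                .tiConf(TDBSourceOptions.getTiConfiguration(pdAddrs, map))
                .snapshotEventDeserializer(
                        new TiKVSnapshotEventDeserializationSchema<String>() {
                            @Override
                            public void deserialize(Kvrpcpb.KvPair record, Collector<String> out) throws Exception {
                                // System.out.println("-------key---------" + record.getKey().toStringUtf8());
                                // System.out.println("--------value------" + record.getValue().toStringUtf8());
                                // Emits the protobuf text form of the raw key/value pair, not decoded column values
                                out.collect(record.toString());
                            }

                            @Override
                            public TypeInformation<String> getProducedType() {
                                return BasicTypeInfo.STRING_TYPE_INFO;
                            }
                        })
                .changeEventDeserializer(
                        new TiKVChangeEventDeserializationSchema<String>() {
                            @Override
                            public void deserialize(Cdcpb.Event.Row record, Collector<String> out) throws Exception {
                                // Raw change-event row, still in TiDB's internal encoding
                                out.collect(record.toString());
                            }

                            @Override
                            public TypeInformation<String> getProducedType() {
                                return BasicTypeInfo.STRING_TYPE_INFO;
                            }
                        })
                .startupOptions(StartupOptions.initial())
                .build();
env.enableCheckpointing(3000);
env.addSource(tidbSource).print().setParallelism(1);
try {
    env.execute("Tidb-CDC-JOB");
} catch (Exception e) {
    log.error("Tidb-CDC-JOB execution failed!", e);
}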
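As a stopgap for getting parseable output before the values are properly decoded, one option is to emit the raw bytes as hex strings inside a JSON envelope instead of calling record.toString(). This is only a debugging sketch: KvJson and its methods are hypothetical names, and inside the snapshot deserializer it would be called as KvJson.toJson(record.getKey().toByteArray(), record.getValue().toByteArray()). It makes the output unambiguous but does not decode any columns.

```java
/** Hypothetical debugging helper: wraps raw TiKV key/value bytes in a JSON object as hex strings. */
public final class KvJson {

    private static final char[] HEX = "0123456789abcdef".toCharArray();

    /** Lower-case hex encoding of a byte array. */
    public static String hex(byte[] bytes) {
        char[] out = new char[bytes.length * 2];
        for (int i = 0; i < bytes.length; i++) {
            int b = bytes[i] & 0xff; // treat the byte as unsigned
            out[2 * i] = HEX[b >>> 4];
            out[2 * i + 1] = HEX[b & 0x0f];
        }
        return new String(out);
    }

    /** Hex strings need no JSON escaping, so plain concatenation is safe here. */
    public static String toJson(byte[] key, byte[] value) {
        return "{\"key\":\"" + hex(key) + "\",\"value\":\"" + hex(value) + "\"}";
    }

    public static void main(String[] args) {
        byte[] key = {'t', (byte) 0x80, 0x01};
        // UTF-8 bytes of a single CJK character (U+4E2D)
        byte[] value = {(byte) 0xe4, (byte) 0xb8, (byte) 0xad};
        System.out.println(toJson(key, value)); // prints {"key":"748001","value":"e4b8ad"}
    }
}
```

In a hex dump like this, UTF-8 text columns show up as recognizable byte runs (e4 b8 ad for U+4E2D), which can help locate where each string field sits in the row value.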
Print result:
Has anyone encountered the same problem?
[Attachments]
Please provide the version information of each component, such as cdc/tikv, which can be obtained by executing cdc version and /tikv-server --version.
Related Logs and Monitoring
- TiUP Cluster Display information
- TiUP Cluster Edit Config information
- TiDB-Overview monitoring
- Corresponding module logs (including logs one hour before and after the problem)