Issue of Garbled Results When Reading TiDB Database Data with Flink CDC

Note:
This topic has been translated from a Chinese forum by GPT and might contain errors.

Original topic: flink cdc 读取TIDB数据库数据时,结果乱码问题

| username: 消息终结者


[TiDB Usage Environment] Testing
[TiDB Version]

  • TiDB v4.0.8
  • flink-sql-connector-tidb-cdc 2.2.1
  • flink-clients_2.12 1.13.5

[Encountered Problem]
I am using the Flink CDC component to read data from TiKV, but the results come back in some other encoding. I would like to know what the encoding format is and how to convert the data into JSON so that it can be parsed. Converting to UTF-8 with record.getKey().toStringUtf8() does not solve it: only the Chinese characters come out readable, and the rest is still garbled.

[Reproduction Path]
Running the example from the official Flink CDC documentation unmodified produces this result.
Link: https://ververica.github.io/flink-cdc-connectors/master/content/connectors/tidb-cdc.html

[Problem Phenomenon and Impact]
Code as follows:

Map<String, String> map = new HashMap<>();
map.put("tikv.grpc.timeout_in_ms", "20000");
SourceFunction<String> tidbSource =
        TiDBSource.<String>builder()
                .database("db_bill_shop") // set captured database
                .tableName("test") // set captured table
                .tiConf(
                        TDBSourceOptions.getTiConfiguration(pdAddrs, map)
                )
                .snapshotEventDeserializer(
                        new TiKVSnapshotEventDeserializationSchema<String>() {
                            @Override
                            public void deserialize(Kvrpcpb.KvPair record, Collector<String> out) throws Exception {
                                // System.out.println("-------key---------"+record.getKey().toStringUtf8());
                                // System.out.println("--------value------"+record.getValue().toStringUtf8());
                                out.collect(record.toString());
                            }

                            @Override
                            public TypeInformation<String> getProducedType() {
                                return BasicTypeInfo.STRING_TYPE_INFO;
                            }
                        })
                .changeEventDeserializer(
                        new TiKVChangeEventDeserializationSchema<String>() {
                            @Override
                            public void deserialize(Cdcpb.Event.Row record, Collector<String> out) throws Exception {
                                out.collect(record.toString());
                            }

                            @Override
                            public TypeInformation<String> getProducedType() {
                                return BasicTypeInfo.STRING_TYPE_INFO;
                            }
                        })
                .startupOptions(StartupOptions.initial())
                .build();

env.enableCheckpointing(3000);
env.addSource(tidbSource).print().setParallelism(1);
try {
    env.execute("Tidb-CDC-JOB");
} catch (Exception e) {
    // Pass the exception to the logger so the stack trace goes to the log.
    log.error("Tidb-CDC-JOB execution failed!", e);
}

Print result:

Has anyone encountered the same problem?

| username: Billmay表妹

What is the version of TiDB?

| username: 消息终结者

v4.0.8

| username: zzzzzz

It is recommended to check the encoding settings of the development framework and the display window.
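
As background for the question about the encoding format: the bytes in record.getKey() are not text in any character set, so toStringUtf8() cannot make them readable. For a table with an integer row handle (the only kind in TiDB v4.0), a record key is laid out as the byte 't', an 8-byte table ID, the bytes '_r', and an 8-byte row handle, where each integer is big-endian with its sign bit flipped so that keys sort correctly. The following is a minimal sketch of decoding that layout using only the JDK; the class and method names (RowKeyDecoder, decode, encode) are hypothetical and are not part of Flink CDC or the TiKV client.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper: decodes a TiDB record key with an integer handle. */
public class RowKeyDecoder {
    // TiDB flips the sign bit of each int64 so that the bytes sort in numeric order.
    private static final long SIGN_MASK = 0x8000000000000000L;
    // 't' + 8-byte table ID + "_r" + 8-byte handle
    private static final int KEY_LENGTH = 19;

    /** Table ID and row handle extracted from a record key. */
    public static final class RowKey {
        public final long tableId;
        public final long handle;

        RowKey(long tableId, long handle) {
            this.tableId = tableId;
            this.handle = handle;
        }

        @Override
        public String toString() {
            return "{\"tableId\":" + tableId + ",\"handle\":" + handle + "}";
        }
    }

    /** Decodes a record key; rejects index keys and anything with another layout. */
    public static RowKey decode(byte[] key) {
        if (key.length != KEY_LENGTH || key[0] != 't' || key[9] != '_' || key[10] != 'r') {
            throw new IllegalArgumentException("not a record key with an integer handle");
        }
        ByteBuffer buf = ByteBuffer.wrap(key); // ByteBuffer is big-endian by default
        long tableId = buf.getLong(1) ^ SIGN_MASK;
        long handle = buf.getLong(11) ^ SIGN_MASK;
        return new RowKey(tableId, handle);
    }

    /** Builds a key in the same layout; only used here to exercise decode(). */
    public static byte[] encode(long tableId, long handle) {
        return ByteBuffer.allocate(KEY_LENGTH)
                .put((byte) 't')
                .putLong(tableId ^ SIGN_MASK)
                .put((byte) '_')
                .put((byte) 'r')
                .putLong(handle ^ SIGN_MASK)
                .array();
    }
}
```

Inside the snapshot deserializer this could be called as RowKeyDecoder.decode(record.getKey().toByteArray()). The value bytes use TiDB's row encoding and can only be decoded with the table schema, so they are not covered by this sketch.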

| username: system

This topic was automatically closed 60 days after the last reply. New replies are no longer allowed.