TiCDC can't read the data content. It only reads basic metadata

Note that this question was originally posted on the Chinese TiDB Forum. We copied it here to help new users solve their problems quickly.

Application environment

Production environment

TiDB version

TiDB v6.0.0 + kafka v3.2.0

Reproduction method

I added a config file with only the sync-ddl=true attribute

The command to create changefeed:

tiup cdc cli changefeed create --pd=http://192.168.60.4:2379 --sink-uri=“kafka://192.168.60.207:9092/tidb-to-kafka-test5?protocol=canal-json&kafka-version=3.2.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&enable-tidb-extension=true” --changefeed-id=“tidb-to-kafka-test1” --sort-engine=“unified”

Problem

TiCDC replicates the changes into the downstream Kafka. But the change data includes no data content, only metadata, e.g. {“timestamp”:1656502486040,“name”:“records_consumed”,“count”:2,“partitions”:[{“topic”:“tidb-to-kafka-test”,“partition”:0,“count”:2,“minOffset”:10,“maxOffset”:11}]}

TiCDC has no error logs:

[2022/06/30 11:11:21.046 +08:00] [INFO] [ddl_puller.go:148] [“receive new ddl job”] [changefeed=tidb-to-kafka-test11_ddl_puller] [job=“ID:97, Type:create table, State:done, SchemaState:public, SchemaID:90, TableID:96, RowCount:0, ArgLen:0, start time: 2022-06-30 11:11:19.131 +0800 CST, Err:, ErrCount:0, SnapshotVersion:0”]
[2022/06/30 11:11:21.046 +08:00] [INFO] [schema_storage.go:847] [“handle DDL”] [DDL=“CREATE TABLE testcdc.test333 (\r\ id int NOT NULL,\r\ name varchar(255) NULL,\r\ PRIMARY KEY (id)\r\ )”] [job=“ID:97, Type:create table, State:done, SchemaState:public, SchemaID:90, TableID:96, RowCount:0, ArgLen:0, start time: 2022-06-30 11:11:19.131 +0800 CST, Err:, ErrCount:0, SnapshotVersion:0”] [changefeed=tidb-to-kafka-test11] [finishTs=434256918394961922]
[2022/06/30 11:11:21.046 +08:00] [WARN] [ddl_puller.go:144] [“ignore duplicated DDL job”] [changefeed=tidb-to-kafka-test11_ddl_puller] [job=“ID:97, Type:create table, State:done, SchemaState:public, SchemaID:90, TableID:96, RowCount:0, ArgLen:0, start time: 2022-06-30 11:11:19.131 +0800 CST, Err:, ErrCount:0, SnapshotVersion:0”]
[2022/06/30 11:11:21.046 +08:00] [INFO] [ddl_puller.go:135] [“ddl job is nil after unmarshal”] [changefeed=tidb-to-kafka-test11_ddl_puller]
[2022/06/30 11:11:21.046 +08:00] [INFO] [ddl_puller.go:135] [“ddl job is nil after unmarshal”] [changefeed=tidb-to-kafka-test11_ddl_puller]
[2022/06/30 11:11:21.046 +08:00] [INFO] [schema_storage.go:832] [“ignore foregone DDL”] [jobID=97] [DDL=“CREATE TABLE testcdc.test333 (\r\ id int NOT NULL,\r\ name varchar(255) NULL,\r\ PRIMARY KEY (id)\r\ )”] [changefeed=tidb-to-kafka-test11] [finishTs=434256918394961922]
[2022/06/30 11:11:22.901 +08:00] [INFO] [schema.go:124] [“handle DDL”] [changefeed=tidb-to-kafka-test11] [DDL=“CREATE TABLE testcdc.test333 (\r\ id int NOT NULL,\r\ name varchar(255) NULL,\r\ PRIMARY KEY (id)\r\ )”] [job=“ID:97, Type:create table, State:done, SchemaState:public, SchemaID:90, TableID:96, RowCount:0, ArgLen:0, start time: 2022-06-30 11:11:19.131 +0800 CST, Err:, ErrCount:0, SnapshotVersion:0”] [role=owner]
[2022/06/30 11:11:22.902 +08:00] [INFO] [ddl_sink.go:235] [“ddl is sent”] [changefeed=tidb-to-kafka-test11] [ddlSentTs=434256918394961922]
[2022/06/30 11:11:22.902 +08:00] [INFO] [scheduler.go:144] [“schedulerV2: DispatchTable”] [message="{“owner-rev”:811337,“epoch”:“031790eb-949a-4502-a602-18f64801c4b8”,“id”:96,“is-delete”:false}"] [successful=true] [changefeedID=tidb-to-kafka-test11] [captureID=b290c64b-489c-4bfa-8a4f-677fb879e33d]
[2022/06/30 11:11:22.902 +08:00] [INFO] [ddl_sink.go:179] [“begin emit ddl event”] [changefeed=tidb-to-kafka-test11] [DDL="{“StartTs”:434256918382116873,“CommitTs”:434256918394961922,“TableInfo”:{“Schema”:“testcdc”,“Table”:“test333”,“TableID”:96,“ColumnInfo”:[{“Name”:“id”,“Type”:3},{“Name”:“name”,“Type”:15}]},“PreTableInfo”:null,“Query”:“CREATE TABLE testcdc.test333 (id INT NOT NULL,name VARCHAR(255) NULL,PRIMARY KEY(id))”,“Type”:3}"]
[2022/06/30 11:11:22.902 +08:00] [INFO] [agent.go:389] [OnOwnerDispatchedTask] [changefeed=tidb-to-kafka-test11] [ownerCaptureID=b290c64b-489c-4bfa-8a4f-677fb879e33d] [ownerRev=811337] [op="{“TableID”:96,“IsDelete”:false,“Epoch”:“031790eb-949a-4502-a602-18f64801c4b8”,“FromOwnerID”:“b290c64b-489c-4bfa-8a4f-677fb879e33d”}"]
[2022/06/30 11:11:22.907 +08:00] [INFO] [ddl_sink.go:187] [“Execute DDL succeeded”] [changefeed=tidb-to-kafka-test11] [ignored=false] [ddl="{“StartTs”:434256918382116873,“CommitTs”:434256918394961922,“TableInfo”:{“Schema”:“testcdc”,“Table”:“test333”,“TableID”:96,“ColumnInfo”:[{“Name”:“id”,“Type”:3},{“Name”:“name”,“Type”:15}]},“PreTableInfo”:null,“Query”:“CREATE TABLE testcdc.test333 (id INT NOT NULL,name VARCHAR(255) NULL,PRIMARY KEY(id))”,“Type”:3}"]
[2022/06/30 11:11:22.997 +08:00] [INFO] [agent.go:295] [“Agent start processing operation”] [changefeed=tidb-to-kafka-test11] [op="{“TableID”:96,“IsDelete”:false,“Epoch”:“031790eb-949a-4502-a602-18f64801c4b8”,“FromOwnerID”:“b290c64b-489c-4bfa-8a4f-677fb879e33d”}"]
[2022/06/30 11:11:22.997 +08:00] [INFO] [processor.go:109] [“adding table”] [tableID=96] [changefeed=tidb-to-kafka-test11]
[2022/06/30 11:11:22.998 +08:00] [INFO] [processor.go:1015] [“Add table pipeline”] [tableID=96] [changefeed=tidb-to-kafka-test11] [name=testcdc.test333] [replicaInfo="{“start-ts”:434256918394961922,“mark-table-id”:0}"] [globalResolvedTs=434256918394961922]
[2022/06/30 11:11:22.998 +08:00] [INFO] [client.go:512] [“event feed started”] [span="[7480000000000000ff605f720000000000fa, 7480000000000000ff605f730000000000fa)"] [startTs=434256918394961922] [changefeed=tidb-to-kafka-test11]
[2022/06/30 11:11:22.999 +08:00] [INFO] [region_range_lock.go:222] [“range locked”] [changefeed=tidb-to-kafka-test11] [lockID=105] [regionID=11005] [version=82] [startKey=7480000000000000ff605f720000000000fa] [endKey=7480000000000000ff605f730000000000fa] [checkpointTs=434256918394961922]
[2022/06/30 11:11:22.999 +08:00] [INFO] [client.go:755] [“creating new stream to store to send request”] [changefeed=tidb-to-kafka-test11] [regionID=11005] [requestID=230] [storeID=5] [addr=192.168.60.4:20161]
[2022/06/30 11:11:22.999 +08:00] [INFO] [puller.go:217] [“puller is initialized”] [changefeed=tidb-to-kafka-test11] [duration=1.31316ms] [tableID=96] [spans="["[7480000000000000ff605f720000000000fa, 7480000000000000ff605f730000000000fa)"]"] [resolvedTs=434256918394961922]
[2022/06/30 11:11:23.000 +08:00] [INFO] [client.go:801] [“start new request”] [changefeed=tidb-to-kafka-test11] [request="{“header”:{“cluster_id”:7103783547263542403,“ticdc_version”:“6.0.0”},“region_id”:11005,“region_epoch”:{“conf_ver”:5,“version”:82},“checkpoint_ts”:434256918394961922,“start_key”:“dIAAAAAAAAD/YF9yAAAAAAD6”,“end_key”:“dIAAAAAAAAD/YF9zAAAAAAD6”,“request_id”:230,“extra_op”:1,“Request”:null}"] [addr=192.168.60.4:20161]
[2022/06/30 11:11:23.196 +08:00] [INFO] [processor.go:175] [“Add Table finished”] [changefeed=tidb-to-kafka-test11] [tableID=96]
[2022/06/30 11:11:23.197 +08:00] [INFO] [agent.go:330] [“Agent finish processing operation”] [changefeed=tidb-to-kafka-test11] [op="{“TableID”:96,“IsDelete”:false,“Epoch”:“031790eb-949a-4502-a602-18f64801c4b8”,“FromOwnerID”:“b290c64b-489c-4bfa-8a4f-677fb879e33d”}"]
[2022/06/30 11:11:23.197 +08:00] [INFO] [agent.go:212] [“SchedulerAgent: FinishTableOperation”] [message="{“id”:96,“epoch”:“031790eb-949a-4502-a602-18f64801c4b8”}"] [successful=true] [changefeedID=tidb-to-kafka-test11] [ownerID=b290c64b-489c-4bfa-8a4f-677fb879e33d]
[2022/06/30 11:11:23.197 +08:00] [INFO] [schedule_dispatcher.go:567] [“owner received dispatch finished”] [changefeed=tidb-to-kafka-test11] [captureID=b290c64b-489c-4bfa-8a4f-677fb879e33d] [tableID=96] [epoch=031790eb-949a-4502-a602-18f64801c4b8]
[2022/06/30 11:11:23.299 +08:00] [INFO] [ddl_sink.go:218] [“ddl already executed”] [changefeed=tidb-to-kafka-test11] [ddlFinishedTs=434256918394961922] [DDL="{“StartTs”:434256918382116873,“CommitTs”:434256918394961922,“TableInfo”:{“Schema”:“testcdc”,“Table”:“test333”,“TableID”:96,“ColumnInfo”:[{“Name”:“id”,“Type”:3},{“Name”:“name”,“Type”:15}]},“PreTableInfo”:null,“Query”:“CREATE TABLE testcdc.test333 (id INT NOT NULL,name VARCHAR(255) NULL,PRIMARY KEY(id))”,“Type”:3}"]

The TiCDC log shows “Execute DDL succeeded.” It means that the DDL operations have been written to the downstream Kafka properly.

I’m using Flink to monitor the current Kafka topic. It was swiping logs. I can see that there was new data.

{“id”:0,“database”:“testcdc”,“table”:“test333”,“pkNames”:[“id”],“isDdl”:false,“type”:“INSERT”,“es”:1656559518981,“ts”:1656559520925,“sql”:“”,“sqlType”:{“id”:4,“name”:12},“mysqlType”:{“id”:“int”,“name”:“varchar”},“data”:[{“id”:“22”,“name”:“bb”}],“old”:null,“_tidb”:{“commitTs”:434257138543755266}}
{“id”:0,“database”:“”,“table”:“”,“pkNames”:null,“isDdl”:false,“type”:“TIDB_WATERMARK”,“es”:1656559520880,“ts”:1656559523120,“sql”:“”,“sqlType”:null,“mysqlType”:null,“data”:null,“old”:null,“_tidb”:{“watermarkTs”:434257139041566722}}
{“id”:0,“database”:“”,“table”:“”,“pkNames”:null,“isDdl”:false,“type”:“TIDB_WATERMARK”,“es”:1656559521880,“ts”:1656559524120,“sql”:“”,“sqlType”:null,“mysqlType”:null,“data”:null,“old”:null,“_tidb”:{“watermarkTs”:434257139303710722}}

It’s the normal data output in canal-json format. For example, this data represents a watermark event.

{“id”:0,“database”:"",“table”:"",“pkNames”:null,“isDdl”:false,“type”:“TIDB_WATERMARK”,“es”:1656559521880,“ts”:1656559524120,“sql”:"",“sqlType”:null,“mysqlType”:null,“data”:null,“old”:null,"_tidb":{“watermarkTs”:434257139303710722}}

Can I output only the data? I don’t care about the metadata and empty data, and I need to filter them.

You can specify enable-tidb-extension=false when creating changefeed, or leave it out. For a detailed description of the canal-json watermark event, please refer to this doc WATERMARK Event.