TiCDC Unable to Read Data Content, Only Basic Metadata is Retrieved

Note:
This topic has been translated from a Chinese forum by GPT and might contain errors.

Original topic: ticdc读取不到数据内容,只读到了基本元数据

| username: juecong

[TiDB Usage Environment]
TiDB: 6.0.0, Kafka: 3.2.0

[Overview] Scenario + Problem Overview
TiCDC replicates change data to the downstream Kafka, but no row data is received, only metadata, for example:
{"timestamp":1656502486040,"name":"records_consumed","count":2,"partitions":[{"topic":"tidb-to-kafka-test","partition":0,"count":2,"minOffset":10,"maxOffset":11}]}

[Background] Operations performed
Added a changefeed config file that contains only the sync-ddl=true property, but it still doesn't work (a config-file sketch follows at the end of this post).

[Phenomenon] Business and database phenomenon
The table has a primary key and data.

[TiDB Version]
6.0.0
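
A minimal sketch of such a changefeed config file, assuming TOML keys from the TiCDC changefeed configuration (the file name changefeed.toml and the filter rule are placeholders, not taken from this thread; the file is passed to tiup cdc cli changefeed create with --config changefeed.toml):

# changefeed.toml -- minimal TiCDC changefeed configuration sketch
# case-sensitive controls whether table filter rules are case sensitive
case-sensitive = true
[filter]
# only replicate tables under the testcdc schema (placeholder rule)
rules = ['testcdc.*']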

| username: xinyuzhao | Original post link

Could you please provide the command to create a changefeed?

| username: juecong | Original post link

tiup cdc cli changefeed create --pd=http://192.168.60.4:2379 --sink-uri="kafka://192.168.60.207:9092/tidb-to-kafka-test5?protocol=canal-json&kafka-version=3.2.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&enable-tidb-extension=true" --changefeed-id="tidb-to-kafka-test1" --sort-engine="unified"
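
For reference, one way to confirm the changefeed was created and its checkpoint is advancing, reusing the PD address and changefeed ID from the command above (a sketch, not from the thread):

tiup cdc cli changefeed list --pd=http://192.168.60.4:2379
tiup cdc cli changefeed query --pd=http://192.168.60.4:2379 --changefeed-id="tidb-to-kafka-test1"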

| username: xinyuzhao | Original post link

Can data be produced and consumed normally using Kafka’s built-in ./kafka-console-producer.sh and ./kafka-console-consumer.sh?
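
For reference, such a sanity check could look like the following, reusing the broker and topic from the changefeed command above (a sketch using standard Kafka 3.x flags):

bin/kafka-console-producer.sh --bootstrap-server 192.168.60.207:9092 --topic tidb-to-kafka-test5
bin/kafka-console-consumer.sh --bootstrap-server 192.168.60.207:9092 --topic tidb-to-kafka-test5 --from-beginning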

| username: xinyuzhao | Original post link

You can simply create a table in the upstream TiDB, then insert a few rows of data into this table, and post the TiCDC logs.

| username: juecong | Original post link

TiCDC log; there are no errors in it:

[2022/06/30 11:11:21.046 +08:00] [INFO] [ddl_puller.go:148] [“receive new ddl job”] [changefeed=tidb-to-kafka-test11_ddl_puller] [job=“ID:97, Type:create table, State:done, SchemaState:public, SchemaID:90, TableID:96, RowCount:0, ArgLen:0, start time: 2022-06-30 11:11:19.131 +0800 CST, Err:, ErrCount:0, SnapshotVersion:0”]
[2022/06/30 11:11:21.046 +08:00] [INFO] [schema_storage.go:847] [“handle DDL”] [DDL=“CREATE TABLE testcdc.test333 (\r
id int NOT NULL,\r
name varchar(255) NULL,\r
PRIMARY KEY (id)\r
)”] [job=“ID:97, Type:create table, State:done, SchemaState:public, SchemaID:90, TableID:96, RowCount:0, ArgLen:0, start time: 2022-06-30 11:11:19.131 +0800 CST, Err:, ErrCount:0, SnapshotVersion:0”] [changefeed=tidb-to-kafka-test11] [finishTs=434256918394961922]
[2022/06/30 11:11:21.046 +08:00] [WARN] [ddl_puller.go:144] [“ignore duplicated DDL job”] [changefeed=tidb-to-kafka-test11_ddl_puller] [job=“ID:97, Type:create table, State:done, SchemaState:public, SchemaID:90, TableID:96, RowCount:0, ArgLen:0, start time: 2022-06-30 11:11:19.131 +0800 CST, Err:, ErrCount:0, SnapshotVersion:0”]
[2022/06/30 11:11:21.046 +08:00] [INFO] [ddl_puller.go:135] [“ddl job is nil after unmarshal”] [changefeed=tidb-to-kafka-test11_ddl_puller]
[2022/06/30 11:11:21.046 +08:00] [INFO] [ddl_puller.go:135] [“ddl job is nil after unmarshal”] [changefeed=tidb-to-kafka-test11_ddl_puller]
[2022/06/30 11:11:21.046 +08:00] [INFO] [schema_storage.go:832] [“ignore foregone DDL”] [jobID=97] [DDL=“CREATE TABLE testcdc.test333 (\r
id int NOT NULL,\r
name varchar(255) NULL,\r
PRIMARY KEY (id)\r
)”] [changefeed=tidb-to-kafka-test11] [finishTs=434256918394961922]
[2022/06/30 11:11:22.901 +08:00] [INFO] [schema.go:124] [“handle DDL”] [changefeed=tidb-to-kafka-test11] [DDL=“CREATE TABLE testcdc.test333 (\r
id int NOT NULL,\r
name varchar(255) NULL,\r
PRIMARY KEY (id)\r
)”] [job=“ID:97, Type:create table, State:done, SchemaState:public, SchemaID:90, TableID:96, RowCount:0, ArgLen:0, start time: 2022-06-30 11:11:19.131 +0800 CST, Err:, ErrCount:0, SnapshotVersion:0”] [role=owner]
[2022/06/30 11:11:22.902 +08:00] [INFO] [ddl_sink.go:235] [“ddl is sent”] [changefeed=tidb-to-kafka-test11] [ddlSentTs=434256918394961922]
[2022/06/30 11:11:22.902 +08:00] [INFO] [scheduler.go:144] [“schedulerV2: DispatchTable”] [message=“{"owner-rev":811337,"epoch":"031790eb-949a-4502-a602-18f64801c4b8","id":96,"is-delete":false}”] [successful=true] [changefeedID=tidb-to-kafka-test11] [captureID=b290c64b-489c-4bfa-8a4f-677fb879e33d]
[2022/06/30 11:11:22.902 +08:00] [INFO] [ddl_sink.go:179] [“begin emit ddl event”] [changefeed=tidb-to-kafka-test11] [DDL=“{"StartTs":434256918382116873,"CommitTs":434256918394961922,"TableInfo":{"Schema":"testcdc","Table":"test333","TableID":96,"ColumnInfo":[{"Name":"id","Type":3},{"Name":"name","Type":15}]},"PreTableInfo":null,"Query":"CREATE TABLE testcdc.test333 (id INT NOT NULL,name VARCHAR(255) NULL,PRIMARY KEY(id))","Type":3}”]
[2022/06/30 11:11:22.902 +08:00] [INFO] [agent.go:389] [OnOwnerDispatchedTask] [changefeed=tidb-to-kafka-test11] [ownerCaptureID=b290c64b-489c-4bfa-8a4f-677fb879e33d] [ownerRev=811337] [op=“{"TableID":96,"IsDelete":false,"Epoch":"031790eb-949a-4502-a602-18f64801c4b8","FromOwnerID":"b290c64b-489c-4bfa-8a4f-677fb879e33d"}”]
[2022/06/30 11:11:22.907 +08:00] [INFO] [ddl_sink.go:187] [“Execute DDL succeeded”] [changefeed=tidb-to-kafka-test11] [ignored=false] [ddl=“{"StartTs":434256918382116873,"CommitTs":434256918394961922,"TableInfo":{"Schema":"testcdc","Table":"test333","TableID":96,"ColumnInfo":[{"Name":"id","Type":3},{"Name":"name","Type":15}]},"PreTableInfo":null,"Query":"CREATE TABLE testcdc.test333 (id INT NOT NULL,name VARCHAR(255) NULL,PRIMARY KEY(id))","Type":3}”]
[2022/06/30 11:11:22.997 +08:00] [INFO] [agent.go:295] [“Agent start processing operation”] [changefeed=tidb-to-kafka-test11] [op=“{"TableID":96,"IsDelete":false,"Epoch":"031790eb-949a-4502-a602-18f64801c4b8","FromOwnerID":"b290c64b-489c-4bfa-8a4f-677fb879e33d"}”]
[2022/06/30 11:11:22.997 +08:00] [INFO] [processor.go:109] [“adding table”] [tableID=96] [changefeed=tidb-to-kafka-test11]
[2022/06/30 11:11:22.998 +08:00] [INFO] [processor.go:1015] [“Add table pipeline”] [tableID=96] [changefeed=tidb-to-kafka-test11] [name=testcdc.test333] [replicaInfo=“{"start-ts":434256918394961922,"mark-table-id":0}”] [globalResolvedTs=434256918394961922]
[2022/06/30 11:11:22.998 +08:00] [INFO] [client.go:512] [“event feed started”] [span=“[7480000000000000ff605f720000000000fa, 7480000000000000ff605f730000000000fa)”] [startTs=434256918394961922] [changefeed=tidb-to-kafka-test11]
[2022/06/30 11:11:22.999 +08:00] [INFO] [region_range_lock.go:222] [“range locked”] [changefeed=tidb-to-kafka-test11] [lockID=105] [regionID=11005] [version=82] [startKey=7480000000000000ff605f720000000000fa] [endKey=7480000000000000ff605f730000000000fa] [checkpointTs=434256918394961922]
[2022/06/30 11:11:22.999 +08:00] [INFO] [client.go:755] [“creating new stream to store to send request”] [changefeed=tidb-to-kafka-test11] [regionID=11005] [requestID=230] [storeID=5] [addr=192.168.60.4:20161]
[2022/06/30 11:11:22.999 +08:00] [INFO] [puller.go:217] [“puller is initialized”] [changefeed=tidb-to-kafka-test11] [duration=1.31316ms] [tableID=96] [spans=“["[7480000000000000ff605f720000000000fa, 7480000000000000ff605f730000000000fa)"]”] [resolvedTs=434256918394961922]
[2022/06/30 11:11:23.000 +08:00] [INFO] [client.go:801] [“start new request”] [changefeed=tidb-to-kafka-test11] [request=“{"header":{"cluster_id":7103783547263542403,"ticdc_version":"6.0.0"},"region_id":11005,"region_epoch":{"conf_ver":5,"version":82},"checkpoint_ts":434256918394961922,"start_key":"dIAAAAAAAAD/YF9yAAAAAAD6","end_key":"dIAAAAAAAAD/YF9zAAAAAAD6","request_id":230,"extra_op":1,"Request":null}”] [addr=192.168.60.4:20161]
[2022/06/30 11:11:23.196 +08:00] [INFO] [processor.go:175] [“Add Table finished”] [changefeed=tidb-to-kafka-test11] [tableID=96]
[2022/06/30 11:11:23.197 +08:00] [INFO] [agent.go:330] [“Agent finish processing operation”] [changefeed=tidb-to-kafka-test11] [op=“{"TableID":96,"IsDelete":false,"Epoch":"031790eb-949a-4502-a602-18f64801c4b8","FromOwnerID":"b290c64b-489c-4bfa-8a4f-677fb879e33d"}”]
[2022/06/30 11:11:23.197 +08:00] [INFO] [agent.go:212] [“SchedulerAgent: FinishTableOperation”] [message=“{"id":96,"epoch":"031790eb-949a-4502-a602-18f64801c4b8"}”] [successful=true] [changefeedID=tidb-to-kafka-test11] [ownerID=b290c64b-489c-4bfa-8a4f-677fb879e33d]
[2022/06/30 11:11:23.197 +08:00] [INFO] [schedule_dispatcher.go:567] [“owner received dispatch finished”] [changefeed=tidb-to-kafka-test11] [captureID=b290c64b-489c-4bfa-8a4f-677fb879e33d] [tableID=96] [epoch=031790eb-949a-4502-a602-18f64801c4b8]
[2022/06/30 11:11:23.299 +08:00] [INFO] [ddl_sink.go:218] [“ddl already executed”] [changefeed=tidb-to-kafka-test11] [ddlFinishedTs=434256918394961922] [DDL=“{"StartTs":434256918382116873,"CommitTs":434256918394961922,"TableInfo":{"Schema":"testcdc","Table":"test333","TableID":96,"ColumnInfo":[{"Name":"id","Type":3},{"Name":"name","Type":15}]},"PreTableInfo":null,"Query":"CREATE TABLE testcdc.test333 (id INT NOT NULL,name VARCHAR(255) NULL,PRIMARY KEY(id))","Type":3}”]

| username: juecong | Original post link

Right now another strange phenomenon has appeared: Kafka keeps printing logs continuously, and the offset keeps changing! (screenshot attached)

| username: xinyuzhao | Original post link

What is the full name of Kafka used in the screenshot above?

| username: juecong | Original post link

It can produce and consume normally, but there may be an issue with Kafka consumption, as there is no message body.

| username: xinyuzhao | Original post link

I checked the log and saw “Execute DDL succeeded,” which indicates that the DDL has been successfully written to the downstream Kafka.

| username: juecong | Original post link

tiup cdc cli changefeed create --pd=http://192.168.60.4:2379 --sink-uri="kafka://192.168.60.207:9092/tidb-to-kafka-test11?protocol=canal-json&kafka-version=3.2.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&enable-tidb-extension=true" --changefeed-id="tidb-to-kafka-test11" --sort-engine="unified"

| username: juecong | Original post link

I use Flink to monitor the current Kafka topic, and it keeps printing logs. However, Flink can see the newly inserted data.

| username: xinyuzhao | Original post link

This Flink screenshot looks normal; it is the normal output of canal-json.

| username: xinyuzhao | Original post link

As for the screenshot above that only shows metadata, I suspect the Kafka consumption command might be incorrect.

| username: juecong | Original post link

My consumption command:
bin/kafka-verifiable-consumer.sh --broker-list 192.168.60.207:9092 --topic tidb-to-kafka-test11 --group-id test11

| username: juecong | Original post link

The Kafka topic that Flink monitors is also constantly refreshing metadata, and the offset keeps changing as well.

| username: xinyuzhao | Original post link

Try using kafka-console-consumer.sh to see if it still only outputs metadata.
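
As a note, kafka-verifiable-consumer.sh is a test tool that only prints JSON progress events such as records_consumed and offsets_committed; it does not print message values, which would explain the metadata-only output at the top of this thread. kafka-console-consumer.sh prints the actual message bodies, for example (broker and topic reused from the commands above):

bin/kafka-console-consumer.sh --bootstrap-server 192.168.60.207:9092 --topic tidb-to-kafka-test11 --from-beginning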

| username: juecong | Original post link

Yes, it is still refreshing.

| username: xinyuzhao | Original post link

Do you mean that Flink sees the metadata and data mixed together in the output?

| username: juecong | Original post link

Yes:

{"id":0,"database":"testcdc","table":"test333","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1656559518981,"ts":1656559520925,"sql":"","sqlType":{"id":4,"name":12},"mysqlType":{"id":"int","name":"varchar"},"data":[{"id":"22","name":"bb"}],"old":null,"_tidb":{"commitTs":434257138543755266}}
{"id":0,"database":"","table":"","pkNames":null,"isDdl":false,"type":"TIDB_WATERMARK","es":1656559520880,"ts":1656559523120,"sql":"","sqlType":null,"mysqlType":null,"data":null,"old":null,"_tidb":{"watermarkTs":434257139041566722}}
{"id":0,"database":"","table":"","pkNames":null,"isDdl":false,"type":"TIDB_WATERMARK","es":1656559521880,"ts":1656559524120,"sql":"","sqlType":null,"mysqlType":null,"data":null,"old":null,"_tidb":{"watermarkTs":434257139303710722}}
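
A closing note on this output: the first line is the canal-json row-change event for the inserted row, while the TIDB_WATERMARK lines are resolved-ts (watermark) events that TiCDC emits periodically when enable-tidb-extension=true is combined with protocol=canal-json; they keep the topic offset moving even when no rows change. If those periodic events are not wanted, the changefeed can be created without that parameter, for example (the changefeed ID and topic below are placeholders):

tiup cdc cli changefeed create --pd=http://192.168.60.4:2379 --sink-uri="kafka://192.168.60.207:9092/tidb-to-kafka-test12?protocol=canal-json&kafka-version=3.2.0&partition-num=1&max-message-bytes=67108864&replication-factor=1" --changefeed-id="tidb-to-kafka-test12" --sort-engine="unified"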