TiDB CDC Data Synchronization Exception

Note:
This topic has been translated from a Chinese forum by GPT and might contain errors.

Original topic: tidb cdc 同步数据异常

| username: 表渣渣渣

[TiDB Usage Environment] Test
[TiDB Version] v7.5.1
[Reproduction Path] Multiple rows of data are merged together into a single Kafka message
[Encountered Issue: Problem Phenomenon and Impact]

Table creation statement:

CREATE TABLE ods.`ods_test` (
  `id` int(11) NOT NULL,
  `user_id` int(11) DEFAULT NULL COMMENT 'User ID',
  `user_name` varchar(48) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '',
  `mysql_delete_type` int(11) NOT NULL DEFAULT '0' COMMENT 'MySQL data type',
  PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='';

Creating the CDC changefeed with the following configuration:

case-sensitive = true

[filter]
rules = ['!*.*', 'ods.*']

[mounter]
worker-num = 16
[sink]
dispatchers = [
    {matcher = ['*.*'], dispatcher = "table"}
]
protocol = "maxwell"

Executing SQL:

INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(1, 10749, '111', 0);

Data shown in Kafka:

{
  "database": "ods",
  "table": "ods_test",
  "type": "insert",
  "ts": 1719833732,
  "data": {
    "id": 1,
    "mysql_delete_type": 0,
    "user_id": 10749,
    "user_name": "111"
  }
}

Executing SQL again:

INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(4, 10596, '555', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(5, 10749, '666', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(6, 10596, '777', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(7, 10749, '888', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(8, 10596, '999', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(9, 10749, '000', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(10, 10596, '111', 0);

Data shown in Kafka:

First message:

{
  "database": "ods",
  "table": "ods_test",
  "type": "insert",
  "ts": 1719833912,
  "data": {
    "id": 4,
    "mysql_delete_type": 0,
    "user_id": 10596,
    "user_name": "555"
  }
}

Second message (the remaining six rows merged together):

{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":5,"mysql_delete_type":0,"user_id":10749,"user_name":"666"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":6,"mysql_delete_type":0,"user_id":10596,"user_name":"777"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":7,"mysql_delete_type":0,"user_id":10749,"user_name":"888"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":8,"mysql_delete_type":0,"user_id":10596,"user_name":"999"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":9,"mysql_delete_type":0,"user_id":10749,"user_name":"000"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":10,"mysql_delete_type":0,"user_id":10596,"user_name":"111"}}

Change the dispatcher to dispatcher = "ts" and clear the table data.
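For reference, only the dispatcher value in the [sink] section of the changefeed config changes; everything else stays as above:

[sink]
dispatchers = [
    {matcher = ['*.*'], dispatcher = "ts"}
]
protocol = "maxwell"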
Executing SQL:

INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(2, 10596, '222', 0),
(3, 10749, '333', 0),
(4, 10596, '555', 0);

Kafka data (still merged together):

{"database":"ods","table":"ods_test","type":"insert","ts":1719884437,"data":{"id":2,"mysql_delete_type":0,"user_id":10596,"user_name":"222"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719884437,"data":{"id":3,"mysql_delete_type":0,"user_id":10749,"user_name":"333"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719884437,"data":{"id":4,"mysql_delete_type":0,"user_id":10596,"user_name":"555"}}
| username: 表渣渣渣 | Original post link

Setting max-batch-size = 1 in the configuration file has no effect.

| username: 表渣渣渣 | Original post link

Our current requirement is that the downstream parses the JSON, so multiple entries must not be merged together.

| username: 表渣渣渣 | Original post link

So far in testing, dispatcher = "table" results in multiple rows being merged.
dispatcher = "rowid" reports an error: Error: [CDC:ErrDispatcherFailed] index not found when verifying the table, table: ods.ods_consult_qy_wx_user, index:

dispatcher = "ts" eliminates the multiple-row merging.

| username: 表渣渣渣 | Original post link

Executing multiple inserts:
INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(2, 10596, '222', 0),
(3, 10749, '333', 0),
(4, 10596, '555', 0);

The data is still merged together.

| username: xfworld | Original post link

Aren't these three separate rows of data? Do you mean that all three end up in the same Kafka message?

| username: 表渣渣渣 | Original post link

Yes, they arrive as a single message in Kafka.

| username: rebelsre | Original post link

  1. Distribute according to the number of Regions, i.e., each CDC node handles an approximately equal number of regions. When the number of Regions in a table exceeds the region-threshold value, the table will be distributed across multiple nodes for processing. The default value of region-threshold is 10000.
    region-threshold = 10000
  2. Distribute according to the write traffic, i.e., each CDC node handles an approximately equal number of total modified rows in the regions. This only takes effect when the number of modified rows per minute in a table exceeds the write-key-threshold value.
    write-key-threshold = 30000
  3. The maximum number of messages in a single batch when the Pulsar Producer sends messages, with a default value of 1000.
    batching-max-messages = 1000

Try these parameters and see if they help; a sketch of where they would sit in the config follows below. In theory, merging improves the incremental synchronization rate, since processing rows one by one is too slow.
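For what it's worth, a rough sketch of where these options would sit in the changefeed config file; the section names ([scheduler] and [sink.pulsar-config]) are assumed from recent TiCDC docs, so please verify them against the v7.5.1 reference:

# Scheduling options: spread a large table across multiple TiCDC nodes.
# These influence scheduling, not how many rows land in one Kafka message.
[scheduler]
enable-table-across-nodes = true
region-threshold = 10000
write-key-threshold = 30000

# Pulsar producer batching; only meaningful when the downstream sink is Pulsar.
[sink.pulsar-config]
batching-max-messages = 1000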

| username: 表渣渣渣 | Original post link

This parameter does not take effect for a Kafka downstream.
The parameter below only takes effect when the downstream is Pulsar.
Error with a Kafka downstream:
Error: component TiCDC changefeed’s config file /Cluster-data/ticdc/changefeed_ods.toml contained unknown configuration options: batching-max-message

| username: 有猫万事足 | Original post link

I see that when your JSON messages contain multiple events, they are separated by \n.
Could your code split on \n and then parse each piece?
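For example, a minimal parsing sketch in Python (my own assumption, not TiCDC code) that splits one Kafka message value into individual events, working whether or not the events are separated by \n, assuming the value is just back-to-back JSON objects as pasted above:

import json

def split_events(message_value: bytes):
    """Yield each JSON object found in one Kafka message value."""
    text = message_value.decode("utf-8")
    decoder = json.JSONDecoder()
    pos = 0
    while pos < len(text):
        # Skip whitespace, which also covers the newline-separated case.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        event, pos = decoder.raw_decode(text, pos)  # parse the next JSON object
        yield event

# Example: two of the merged insert events from above in a single message value.
raw = (b'{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":5,"mysql_delete_type":0,"user_id":10749,"user_name":"666"}}'
       b'{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":6,"mysql_delete_type":0,"user_id":10596,"user_name":"777"}}')
for event in split_events(raw):
    print(event["type"], event["data"]["id"])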

| username: 表渣渣渣 | Original post link

I added that \n myself for readability; the actual message does not contain a newline character.

| username: 表渣渣渣 | Original post link

Moreover, since this is CDC, it should behave consistently with mainstream CDC tools; otherwise it won't be compatible with the rest of the ecosystem.

| username: 有猫万事足 | Original post link

The current TiCDC output protocols no longer include Maxwell.
Why not try the simple protocol?
The format looks similar, just with some additional fields.

| username: 表渣渣渣 | Original post link

First of all, this was supported before, and our downstream development requires the Maxwell format. Additionally, the documentation states that it is supported.

| username: 有猫万事足 | Original post link

Understood. I don’t have any better solutions either.

| username: flow-PingCAP | Original post link

We apologize, but the Maxwell protocol support for TiCDC is currently deprecated, and we will remove it from the documentation as soon as possible. We recommend using other clearly supported formats, refer to TiCDC Avro Protocol | PingCAP 文档中心