TiCDC synchronizes data to Kafka, but the data order cannot be guaranteed

Note:
This topic has been translated from a Chinese forum by GPT and might contain errors.

Original topic: TiCDC同步数据到Kafka,数据无法保证顺序

| username: TiDBer_PyGYJEEA

【TiDB Usage Environment】Production Environment
【TiDB Version】v6.5.0
【Reproduction Path】TiCDC synchronizes data to Kafka, Logstash consumes Kafka data and saves it to ES
【Encountered Problem: Phenomenon and Impact】
TiCDC synchronizes data to Kafka, but the order of the data is not preserved, which causes data anomalies in ES.
The dispatcher is set to rowid, which in theory should preserve the order of changes to each row, but testing shows events arriving out of order.
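
To illustrate why a key-based dispatcher is expected to keep one row's changes together, here is a minimal Python sketch of hashing a row key to a partition. It is only an assumption-laden model: `partition_for` is a hypothetical helper, CRC32 is a stand-in and not TiCDC's actual hash function, and the key is assumed to be the primary key columns (`id`, `send_tm`) with made-up sample values.

```python
import zlib

NUM_PARTITIONS = 60  # matches partition-num=60 in the sink URI


def partition_for(row_key: tuple, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a row key to a partition with a stable hash.

    CRC32 is a stand-in here; it is NOT TiCDC's actual hash function.
    """
    encoded = "|".join(str(part) for part in row_key).encode("utf-8")
    return zlib.crc32(encoded) % num_partitions


# Three changes to the same row (same id and send_tm) plus one other row.
# The values are invented for illustration.
events = [
    ("INSERT", (8975043, "2023-10-20 08:00:00")),
    ("UPDATE", (8975043, "2023-10-20 08:00:00")),
    ("INSERT", (8975044, "2023-10-20 08:00:01")),
    ("DELETE", (8975043, "2023-10-20 08:00:00")),
]

assignments = [(op, key, partition_for(key)) for op, key in events]
for op, key, partition in assignments:
    print(f"{op:6} key={key} -> partition {partition}")
```

In this sketch, every event whose key is unchanged lands in the same partition, so Kafka's per-partition ordering would apply to it. An event whose key columns changed would hash independently and could land elsewhere.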

TiDB table structure (a partitioned table):

CREATE TABLE `tbsendrcd` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Primary Key ID',
		.....
  PRIMARY KEY (`id`,`send_tm`) /*T![clustered_index] CLUSTERED */,
  UNIQUE KEY `sms_id` (`sms_id`,`send_tm`),
  KEY `send_tm` (`send_tm`),
  KEY `job_id` (`idx_job_id`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin AUTO_INCREMENT=8975043 COMMENT='SMS Record Table'
PARTITION BY RANGE (UNIX_TIMESTAMP(`send_tm`))
(PARTITION `p20231020` VALUES LESS THAN (1697817600),
 PARTITION `p20231021` VALUES LESS THAN (1697904000),
 PARTITION `p20231022` VALUES LESS THAN (1697990400),
 PARTITION `p20231023` VALUES LESS THAN (1698076800),
 PARTITION `p20231024` VALUES LESS THAN (1698163200),
 PARTITION `p20231025` VALUES LESS THAN (1698249600),
 PARTITION `p20231026` VALUES LESS THAN (1698336000),
 PARTITION `p20231027` VALUES LESS THAN (1698422400),
 PARTITION `p20231028` VALUES LESS THAN (1698508800),
 PARTITION `p20231029` VALUES LESS THAN (1698595200),
 PARTITION `p20231030` VALUES LESS THAN (1698681600),
 PARTITION `p20231031` VALUES LESS THAN (1698768000),
 PARTITION `p20231101` VALUES LESS THAN (1698854400),
 PARTITION `p20231102` VALUES LESS THAN (1698940800),
 PARTITION `p20231103` VALUES LESS THAN (1699027200),
 PARTITION `p20231104` VALUES LESS THAN (1699113600),
 PARTITION `p20231105` VALUES LESS THAN (1699200000),
 PARTITION `p20231106` VALUES LESS THAN (1699286400),
 PARTITION `p20231107` VALUES LESS THAN (1699372800),
 PARTITION `p20231108` VALUES LESS THAN (1699459200),
 PARTITION `p20231109` VALUES LESS THAN (1699545600),
 PARTITION `p20231110` VALUES LESS THAN (1699632000),
 PARTITION `p20231111` VALUES LESS THAN (1699718400),
 PARTITION `p20231112` VALUES LESS THAN (1699804800),
 PARTITION `p20231113` VALUES LESS THAN (1699891200),
 PARTITION `p20231114` VALUES LESS THAN (1699977600),
 PARTITION `p20231115` VALUES LESS THAN (1700064000),
 PARTITION `p20231116` VALUES LESS THAN (1700150400),
 PARTITION `p20231117` VALUES LESS THAN (1700236800),
 PARTITION `p20231118` VALUES LESS THAN (1700323200),
 PARTITION `p20231119` VALUES LESS THAN (1700409600),
 PARTITION `p20231120` VALUES LESS THAN (1700496000),
 PARTITION `p20231121` VALUES LESS THAN (1700582400),
 PARTITION `p20231122` VALUES LESS THAN (1700668800),
 PARTITION `p20231123` VALUES LESS THAN (1700755200),
 PARTITION `p20231124` VALUES LESS THAN (1700841600),
 PARTITION `p20231125` VALUES LESS THAN (1700928000),
 PARTITION `p20231126` VALUES LESS THAN (1701014400),
 PARTITION `p20231127` VALUES LESS THAN (1701100800),
 PARTITION `p20231128` VALUES LESS THAN (1701187200),
 PARTITION `p20231129` VALUES LESS THAN (1701273600),
 PARTITION `p20231130` VALUES LESS THAN (1701360000),
 PARTITION `p20231201` VALUES LESS THAN (1701446400),
 PARTITION `p20231202` VALUES LESS THAN (1701532800),
 PARTITION `p20231203` VALUES LESS THAN (1701619200),
 PARTITION `p20231204` VALUES LESS THAN (1701705600),
 PARTITION `p20231205` VALUES LESS THAN (1701792000),
 PARTITION `p20231206` VALUES LESS THAN (1701878400),
 PARTITION `p20231207` VALUES LESS THAN (1701964800),
 PARTITION `p20231208` VALUES LESS THAN (1702051200),
 PARTITION `p20231209` VALUES LESS THAN (1702137600),
 PARTITION `p20231210` VALUES LESS THAN (1702224000),
 PARTITION `p20231211` VALUES LESS THAN (1702310400),
 PARTITION `p20231212` VALUES LESS THAN (1702396800),
 PARTITION `p20231213` VALUES LESS THAN (1702483200),
 PARTITION `p20231214` VALUES LESS THAN (1702569600),
 PARTITION `p20231215` VALUES LESS THAN (1702656000),
 PARTITION `p20231216` VALUES LESS THAN (1702742400),
 PARTITION `p20231217` VALUES LESS THAN (1702828800),
 PARTITION `p20231218` VALUES LESS THAN (1702915200),
 PARTITION `p20231219` VALUES LESS THAN (1703001600),
 PARTITION `p20231220` VALUES LESS THAN (1703088000),
 PARTITION `p20231221` VALUES LESS THAN (1703174400),
 PARTITION `p20231222` VALUES LESS THAN (1703260800),
 PARTITION `p20231223` VALUES LESS THAN (1703347200),
 PARTITION `p20231224` VALUES LESS THAN (1703433600),
 PARTITION `p20231225` VALUES LESS THAN (1703520000),
 PARTITION `p20231226` VALUES LESS THAN (1703606400),
 PARTITION `p20231227` VALUES LESS THAN (1703692800),
 PARTITION `p20231228` VALUES LESS THAN (1703779200),
 PARTITION `p20231229` VALUES LESS THAN (1703865600),
 PARTITION `p20231230` VALUES LESS THAN (1703952000),
 PARTITION `p20231231` VALUES LESS THAN (1704038400),
 PARTITION `p20240101` VALUES LESS THAN (1704124800),
 PARTITION `p20240102` VALUES LESS THAN (1704211200),
 PARTITION `p20240103` VALUES LESS THAN (1704297600),
 PARTITION `p20240104` VALUES LESS THAN (1704384000),
 PARTITION `p20240105` VALUES LESS THAN (1704470400),
 PARTITION `p20240106` VALUES LESS THAN (1704556800),
 PARTITION `p20240107` VALUES LESS THAN (1704643200),
 PARTITION `p20240108` VALUES LESS THAN (1704729600),
 PARTITION `p20240109` VALUES LESS THAN (1704816000),
 PARTITION `p20240110` VALUES LESS THAN (1704902400),
 PARTITION `p20240111` VALUES LESS THAN (1704988800),
 PARTITION `p20240112` VALUES LESS THAN (1705075200),
 PARTITION `p20240113` VALUES LESS THAN (1705161600),
 PARTITION `p20240114` VALUES LESS THAN (1705248000),
 PARTITION `p20240115` VALUES LESS THAN (1705334400),
 PARTITION `p20240116` VALUES LESS THAN (1705420800),
 PARTITION `p20240117` VALUES LESS THAN (1705507200),
 PARTITION `p20240118` VALUES LESS THAN (1705593600),
 PARTITION `p20240119` VALUES LESS THAN (1705680000),
 PARTITION `p20240120` VALUES LESS THAN (1705766400),
 PARTITION `p20240121` VALUES LESS THAN (1705852800),
 PARTITION `p20240122` VALUES LESS THAN (1705939200),
 PARTITION `p20240123` VALUES LESS THAN (1706025600),
 PARTITION `p20240124` VALUES LESS THAN (1706112000),
 PARTITION `p20240125` VALUES LESS THAN (1706198400),
 PARTITION `p20240126` VALUES LESS THAN (1706284800),
 PARTITION `p20240127` VALUES LESS THAN (1706371200),
 PARTITION `p20240128` VALUES LESS THAN (1706457600),
 PARTITION `p20240129` VALUES LESS THAN (1706544000),
 PARTITION `p20240130` VALUES LESS THAN (1706630400),
 PARTITION `p20240131` VALUES LESS THAN (1706716800),
 PARTITION `p20240201` VALUES LESS THAN (1706803200),
 PARTITION `p20240202` VALUES LESS THAN (1706889600),
 PARTITION `p20240203` VALUES LESS THAN (1706976000),
 PARTITION `p20240204` VALUES LESS THAN (1707062400),
 PARTITION `p20240205` VALUES LESS THAN (1707148800),
 PARTITION `p20240206` VALUES LESS THAN (1707235200),
 PARTITION `p20240207` VALUES LESS THAN (1707321600),
 PARTITION `p20240208` VALUES LESS THAN (1707408000),
 PARTITION `p20240209` VALUES LESS THAN (1707494400),
 PARTITION `p20240210` VALUES LESS THAN (1707580800),
 PARTITION `p20240211` VALUES LESS THAN (1707667200),
 PARTITION `p20240212` VALUES LESS THAN (1707753600),
 PARTITION `p20240213` VALUES LESS THAN (1707840000),
 PARTITION `p20240214` VALUES LESS THAN (1707926400),
 PARTITION `p20240215` VALUES LESS THAN (1708012800),
 PARTITION `p20240216` VALUES LESS THAN (1708099200),
 PARTITION `p20240217` VALUES LESS THAN (1708185600),
 PARTITION `p20240218` VALUES LESS THAN (1708272000),
 PARTITION `p20240219` VALUES LESS THAN (1708358400),
 PARTITION `p20240220` VALUES LESS THAN (1708444800),
 PARTITION `p20240221` VALUES LESS THAN (1708531200),
 PARTITION `p20240222` VALUES LESS THAN (1708617600),
 PARTITION `p20240223` VALUES LESS THAN (1708704000),
 PARTITION `p20240224` VALUES LESS THAN (1708790400),
 PARTITION `p20240225` VALUES LESS THAN (1708876800),
 PARTITION `p20240226` VALUES LESS THAN (1708963200),
 PARTITION `p20240227` VALUES LESS THAN (1709049600),
 PARTITION `p20240228` VALUES LESS THAN (1709136000),
 PARTITION `p20240229` VALUES LESS THAN (1709222400),
 PARTITION `p20240301` VALUES LESS THAN (1709308800),
 PARTITION `p20240302` VALUES LESS THAN (1709395200),
 PARTITION `p20240303` VALUES LESS THAN (1709481600),
 PARTITION `p20240304` VALUES LESS THAN (1709568000),
 PARTITION `p20240305` VALUES LESS THAN (1709654400),
 PARTITION `p20240306` VALUES LESS THAN (1709740800),
 PARTITION `p20240307` VALUES LESS THAN (1709827200),
 PARTITION `p20240308` VALUES LESS THAN (1709913600),
 PARTITION `p20240309` VALUES LESS THAN (1710000000));

TiCDC synchronization task script

curl -X POST -H 'Content-Type: application/json' http://127.0.0.1:8300/api/v1/changefeeds -d '{"changefeed_id":"sms-record-task","sink_uri":"kafka://127.0.0.1:9092/sms-record-sync?protocol=canal-json&kafka-version=2.8.1&partition-num=60&max-message-bytes=67108864&replication-factor=3","filter_rules":["itnio_sms_record.tbsendrcd"],"sink_config":{"dispatchers":[{"matcher":["itnio_sms_record.tbsendrcd"], "dispatcher":"rowid"}],"protocol":"canal-json"}}'

Logstash logstash.conf configuration

# Sample Logstash configuration for creating a simple
# kafka -> Logstash -> Elasticsearch pipeline.

# Input source part
input {
	kafka{
		add_field => {"inputType" => "tbsendrcd"}
		bootstrap_servers => "127.0.0.1:9092"
		topics => "sms-record-sync"
		codec => json {
			charset => "UTF-8"
			}
		group_id => "itnio-sms-sync"
		auto_offset_reset => "earliest"
		consumer_threads => 5
	}
}
filter {
	if [inputType] == "tbsendrcd" {
		split {
			field => "data"
			remove_field => ["mysqlType","sqlType","pkNames","sql","isDdl","old","es","ts","id","@version","message","database","@timestamp"]
		}
		mutate {
			add_field => { "dataTemp" => "%{data}" }
		}
		json {
			source => "dataTemp"
			remove_field => [ "data","dataTemp","keyId" ]
		}
		mutate {
			add_field => { "keyId" => "%{id}" }
			convert => {
				"id" => "integer"
				"keyId" => "integer"
				"idxUserId" => "integer"
				"idxSupplierId" => "integer"
				"idxAgentId" => "integer"
				"srcType" => "integer"
				"sendType" => "integer"
				"idxJobId" => "integer"
				"subJobId" => "integer"
				"idxRouteId" => "integer"
				"idxChannelId" => "integer"
				"idxGoipId" => "integer"
				"portNum" => "integer"
				"flash" => "integer"
				"duration" => "integer"
				"mcc" => "integer"
				"mnc" => "integer"
				"pay" => "integer"
				"cost" => "integer"
				"chargeCnt" => "integer"
				"result" => "integer"
				"pwdKeyId" => "integer"
				"payType" => "integer"
				"statusResult" => "integer"
				"idxAppId" => "integer"
				"idxAppTypeId" => "integer"
				"backfillStatus" => "integer"
				"setMealId" => "integer"
				"rate" => "integer"
			}
		}
		date {
			match => [ "createTime",  "YYYY-MM-dd HH:mm:ss"]
			target => "createTime"
			timezone => "Etc/GMT"
		}
		date {
			match => [ "doneTm",  "YYYY-MM-dd HH:mm:ss"]
			target => "doneTm"
			timezone => "Etc/GMT"
		}
		date {
			match => [ "mdfTm",  "YYYY-MM-dd HH:mm:ss"]
			target => "mdfTm"
			timezone => "Etc/GMT"
		}
		date {
			match => [ "sendTm",  "YYYY-MM-dd HH:mm:ss"]
			target => "sendTm"
			timezone => "Etc/GMT"
		}
		date {
			match => [ "backfillStatusTm",  "YYYY-MM-dd HH:mm:ss"]
			target => "backfillStatusTm"
			timezone => "Etc/GMT"
		}
		mutate {
			add_field => {"day" => "%{sendTm}"}
		}
		ruby {
			code => "event.set('day', Date.strptime(event.get('[day]'), '%Y-%m-%dT%H:%M:%S').strftime('%Y%m'))"
		}
    }
}

output {
	if [inputType] == "tbsendrcd" and "tbsendrcd" == [table] and ![isDdl] and [smsId] != "" {
		stdout { codec => rubydebug }
		if [type] == "INSERT" {
			elasticsearch {
				document_id => "%{smsId}"
				hosts => ["https://itnio-es-01.itnio-common.svc.dev.praise:9200"]
				ssl_certificate_verification => false
				index => "send_record_%{day}"
				user => "elastic"
				password => ""
			}
		}
		if [type] == "UPDATE" {
			elasticsearch {
				document_id => "%{smsId}"
				hosts => ["https://itnio-es-01.itnio-common.svc.dev.praise:9200"]
				ssl_certificate_verification => false
				index => "send_record_%{day}"
				user => "elastic"
				password => ""
				doc_as_upsert => true
				manage_template => true
				action => "update"
			}
		}
		if [type] == "DELETE" {
			elasticsearch {
				document_id => "%{smsId}"
				action => "delete"
				hosts => ["https://itnio-es-01.itnio-common.svc.dev.praise:9200"]
				ssl_certificate_verification => false
				index => "send_record_%{day}"
				user => "elastic"
				password => ""
			}
		}
	}
}

Logstash logs show that insert is executed after update.
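
One way to narrow down where the reordering happens is to read a single Kafka partition in offset order and check whether any row's UPDATE or DELETE appears before its INSERT. The following Python sketch shows only the check itself, under several assumptions: `find_out_of_order` is a hypothetical helper, the messages are canal-json strings trimmed to the `type`, `isDdl`, and `data` fields, the sample values are invented, and rows are identified by the `id` column alone.

```python
import json


def find_out_of_order(messages):
    """Return (row id, type) pairs for changes seen before that row's INSERT."""
    inserted = set()
    anomalies = []
    for raw in messages:
        msg = json.loads(raw)
        if msg.get("isDdl"):
            continue  # DDL events carry no row data
        for row in msg.get("data") or []:
            row_id = row["id"]
            if msg["type"] == "INSERT":
                inserted.add(row_id)
            elif row_id not in inserted:
                anomalies.append((row_id, msg["type"]))
    return anomalies


# Hypothetical sample, trimmed to the fields the check needs.
sample = [
    json.dumps({"type": "UPDATE", "isDdl": False, "data": [{"id": "1", "status": "2"}]}),
    json.dumps({"type": "INSERT", "isDdl": False, "data": [{"id": "1", "status": "1"}]}),
    json.dumps({"type": "INSERT", "isDdl": False, "data": [{"id": "2", "status": "1"}]}),
    json.dumps({"type": "UPDATE", "isDdl": False, "data": [{"id": "2", "status": "2"}]}),
]

print(find_out_of_order(sample))
```

Rows that were inserted before the changefeed started would be flagged by this check as well, so it is only meaningful when the messages are read from the beginning of the topic.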

| username: forever | Original post link

Is the Kafka topic partitioned, and what are the partitioning rules? When Logstash consumes, does it consume the partitions concurrently?

| username: TiDBer_PyGYJEEA | Original post link

What do you mean by Logstash consuming partitions concurrently?

| username: 小龙虾爱大龙虾 | Original post link

I didn't understand what you meant by the dispatcher being rowid. I didn't see it in the official documentation, 同步数据到 Kafka | PingCAP 文档中心 (Replicate Data to Kafka | PingCAP Docs). Can you share the CDC configuration? Normally, it should preserve the order of changes to an individual row.

| username: TiDBer_PyGYJEEA | Original post link

You can check this document
CDC task script

curl -X POST -H 'Content-Type: application/json' http://127.0.0.1:8300/api/v1/changefeeds -d '{"changefeed_id":"sms-record-task","sink_uri":"kafka://127.0.0.1:9092/sms-record-sync?protocol=canal-json&kafka-version=2.8.1&partition-num=60&max-message-bytes=67108864&replication-factor=3","filter_rules":["itnio_sms_record.tbsendrcd"],"sink_config":{"dispatchers":[{"matcher":["itnio_sms_record.tbsendrcd"], "dispatcher":"rowid"}],"protocol":"canal-json"}}'

| username: yiduoyunQ | Original post link

So which protocol does the changefeed creation command use?

| username: forever | Original post link

Regarding Kafka partitions: for example, if a topic is divided into 4 partitions and four consumers are started to consume them concurrently, order is guaranteed within each partition, but not across the 4 partitions.
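
A small Python simulation of that behavior, as a sketch only: it does not talk to Kafka, the events and row ids are made up, dispatch by `row_id % NUM_PARTITIONS` stands in for a real partitioner, and the interleaving shown is just one of many that concurrent consumers could produce.

```python
from collections import defaultdict
from itertools import zip_longest

NUM_PARTITIONS = 4  # the example count used above

# (sequence number, row id, operation), listed in the order they were produced.
produced = [
    (1, 101, "INSERT"),
    (2, 102, "INSERT"),
    (3, 101, "UPDATE"),
    (4, 103, "INSERT"),
    (5, 102, "UPDATE"),
    (6, 101, "DELETE"),
]

# Keyed dispatch: every event for a row id goes to the same partition.
partitions = defaultdict(list)
for event in produced:
    partitions[event[1] % NUM_PARTITIONS].append(event)

# One possible interleaving when partitions are read concurrently:
# visit partitions from highest to lowest, one event at a time.
ordered_partitions = [partitions[p] for p in sorted(partitions, reverse=True)]
consumed = [
    event
    for batch in zip_longest(*ordered_partitions)
    for event in batch
    if event is not None
]

global_order_kept = [e[0] for e in consumed] == sorted(e[0] for e in consumed)

per_row = defaultdict(list)
for seq, row_id, op in consumed:
    per_row[row_id].append(seq)
per_row_order_kept = all(seqs == sorted(seqs) for seqs in per_row.values())

print("global order kept: ", global_order_kept)
print("per-row order kept:", per_row_order_kept)
```

In this run the global order is lost while each row's events stay in order, because each row lives in exactly one partition and each partition is read front to back. Anything downstream that processes events from one partition in parallel would not be covered by this guarantee.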

| username: TiDBer_PyGYJEEA | Original post link

Does this ensure that all changes to a given row go to the same partition? And are the inserts, updates, and deletes for that row kept in order within that partition?

| username: TiDBer_PyGYJEEA | Original post link

Yes, is there a problem with this?