TiCDC to Kafka Error

Note:
This topic has been translated from a Chinese forum by GPT and might contain errors.

Original topic: TICDC到kafka报错

| username: TiDBer_8rWAgqMU

After running the command:

tiup ctl:v6.1.0 cdc changefeed create --pd=http://slave007:2379 \
--sink-uri="kafka://slave002:9092,slave003:9092,slave004:9092,slave005:9092,slave006:9092/tidb-example-t1?kafka-version=2.4.1&replication-factor=3" \
--changefeed-id="simple-kafka-task" \
--sort-engine="unified" \
--config ../config/cdc/tidb_example_kafka.toml

an error occurred:

[2022/08/18 10:40:47.537 +08:00] [WARN] [event_router.go:236] ["This index-value distribution mode does not guarantee row-level orderliness when switching on the old value, so please use caution!"]
Error: [CDC:ErrKafkaTopicExprInvalid]invalid topic expression

Additionally, I still encountered the same error even after writing only one IP for Kafka.

CDC version: V6.1.0

The configuration of the tidb_example_kafka.toml file is as follows:

case-sensitive = true

enable-old-value = true

[filter]
ignore-txn-start-ts = [1, 2]

rules = ['tidb_example.*']

[mounter]
worker-num = 16

[sink]
dispatchers = [
    {matcher = ['tidb_example.*'], topic = "tidb_example_{table}", partition = "index-value" }
]

protocol = "canal-json"
| username: 爱白话的晓辉 | Original post link

What is the version of Kafka?

| username: TiDBer_8rWAgqMU | Original post link

2.4.1

| username: 爱白话的晓辉 | Original post link

Try manually creating the topic.

| username: TiDBer_8rWAgqMU | Original post link

Still the same error.

| username: TiDBer_8rWAgqMU | Original post link

Here is all the printed information after executing the create task command:
Starting component `ctl`: /root/.tiup/components/ctl/v6.1.0/ctl cdc changefeed create --pd=http://slave007:2379 --sink-uri=kafka://slave002:9092/tidb-example-t?kafka-version=2.4.1&replication-factor=5 --changefeed-id=simple-kafka-task --sort-engine=unified --config …/config/cdc/tidb_example_kafka.toml
[WARN] This index-value distribution mode does not guarantee row-level orderliness when switching on the old value, so please use caution! dispatch-rules: &config.DispatchRule{Matcher:[]string{"tidb_example.*"}, DispatcherRule:"", PartitionRule:"index-value", TopicRule:"tidb_example_{table}"}[2022/08/18 11:29:45.311 +08:00] [WARN] [kafka.go:442] ["topic already exist, TiCDC will not create the topic"] [topic=tidb-example-t] [detail="{\"NumPartitions\":3,\"ReplicationFactor\":5,\"ReplicaAssignment\":{\"0\":[3,5,1,4,2],\"1\":[5,1,4,2,3],\"2\":[1,4,2,3,5]},\"ConfigEntries\":{\"compression.type\":\"producer\",\"flush.messages\":\"10000\",\"flush.ms\":\"1000\",\"index.interval.bytes\":\"4096\",\"max.message.bytes\":\"10485760\",\"min.insync.replicas\":\"2\",\"segment.bytes\":\"1073741824\",\"segment.index.bytes\":\"10485760\"}}"]
[2022/08/18 11:29:45.388 +08:00] [WARN] [event_router.go:236] ["This index-value distribution mode does not guarantee row-level orderliness when switching on the old value, so please use caution!"]
Error: [CDC:ErrKafkaTopicExprInvalid]invalid topic expression
Usage:
  cdc cli changefeed create [flags]

Flags:
  -c, --changefeed-id string              Replication task (changefeed) ID
      --config string                     Path of the configuration file
      --cyclic-filter-replica-ids uints   (Experimental) Cyclic replication filter replica ID of changefeed (default [])
      --cyclic-replica-id uint            (Experimental) Cyclic replication replica ID of changefeed
      --cyclic-sync-ddl                   (Experimental) Cyclic replication sync DDL of changefeed (default true)
      --disable-gc-check                  Disable GC safe point check
  -h, --help                              help for create
      --no-confirm                        Don't ask user whether to ignore ineligible table
      --opts key=value                    Extra options, in the key=value format
      --schema-registry string            Avro Schema Registry URI
      --sink-uri string                   sink uri
      --sort-engine string                sort engine used for data sort (default "unified")
      --start-ts uint                     Start ts of changefeed
      --sync-interval duration            (Experimental) Set the interval for syncpoint in replication(default 10min) (default 10m0s)
      --sync-point                        (Experimental) Set and Record syncpoint in replication(default off)
      --target-ts uint                    Target ts of changefeed
      --tz string                         timezone used when checking sink uri (changefeed timezone is determined by cdc server) (default "SYSTEM")

Global Flags:
      --ca string          CA certificate path for TLS connection
      --cert string        Certificate path for TLS connection
  -i, --interact           Run cdc cli with readline
      --key string         Private key path for TLS connection
      --log-level string   log level (etc: debug|info|warn|error) (default "warn")
      --pd string          PD address, use ',' to separate multiple PDs (default "http://127.0.0.1:2379")

[CDC:ErrKafkaTopicExprInvalid]invalid topic expression
Error: exit status 1

| username: 爱白话的晓辉 | Original post link

It should be an issue with this part. Your table name contains characters that are not supported by topic.

| username: TiDBer_8rWAgqMU | Original post link

use tidb_example;
show tables;
Persons

There is only one table named Persons.

| username: TiDBer_8rWAgqMU | Original post link

I just changed it to:
dispatchers = [
    {matcher = ['tidb_example.*'], topic = "cnsbdnc", partition = "index-value" }
]
But it still reports the same error.

| username: db_user | Original post link

Change the topic name and recreate the topic. Nothing else needs to be changed.

When a topic is created, Kafka also checks whether the name contains the characters "." or "_". Why check these two characters? Because Kafka names its internal metrics after the topic and replaces the dot "." with an underscore "_". Suppose there is a topic named "topic.1_2" and another named "topic_1.2": both end up with the metric name "topic_1_2", which causes a name conflict. For example, creating a topic named "topic.1_2" first prompts a WARNING, and then creating "topic_1.2" raises an InvalidTopicException.
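
The collision described above can be illustrated with a small sketch. This is not Kafka's own code; the helper names `metric_name` and `collides` are made up for illustration, and the only rule modeled is the "." to "_" replacement mentioned in this post.

```python
from typing import Iterable


def metric_name(topic: str) -> str:
    """Apply the renaming described above: every '.' becomes '_'."""
    return topic.replace(".", "_")


def collides(new_topic: str, existing: Iterable[str]) -> bool:
    """Return True if a different existing topic maps to the same metric name."""
    target = metric_name(new_topic)
    return any(t != new_topic and metric_name(t) == target for t in existing)


if __name__ == "__main__":
    existing_topics = ["topic.1_2"]
    # "topic_1.2" and "topic.1_2" share one metric name, so they conflict.
    print(collides("topic_1.2", existing_topics))
    # A name without '.' or '_' cannot conflict in this way.
    print(collides("abcd", existing_topics))
```

Under this rule a plain name such as "abcd" never collides, so in this thread the collision check is unlikely to be the cause of the error.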

| username: TiDBer_8rWAgqMU | Original post link

Still the same

[root@slave005 bin]# tiup ctl:v6.1.0 cdc changefeed create --pd=http://slave007:2379 --sink-uri="kafka://slave002:9092/abcd?kafka-version=2.4.1&replication-factor=5" --changefeed-id="simple-kafka-task" --sort-engine="unified" --config ../config/cdc/tidb_example_kafka.toml          
Starting component `ctl`: /root/.tiup/components/ctl/v6.1.0/ctl cdc changefeed create --pd=http://slave007:2379 --sink-uri=kafka://slave002:9092/abcd?kafka-version=2.4.1&replication-factor=5 --changefeed-id=simple-kafka-task --sort-engine=unified --config ../config/cdc/tidb_example_kafka.toml
[WARN] This index-value distribution mode does not guarantee row-level orderliness when switching on the old value, so please use caution! dispatch-rules: &config.DispatchRule{Matcher:[]string{"tidb_example.*"}, DispatcherRule:"", PartitionRule:"index-value", TopicRule:"abcd"}[2022/08/18 14:05:18.375 +08:00] [WARN] [kafka.go:442] ["topic already exist, TiCDC will not create the topic"] [topic=abcd] [detail="{\"NumPartitions\":3,\"ReplicationFactor\":5,\"ReplicaAssignment\":{\"0\":[1,3,4,5,2],\"1\":[2,4,5,1,3],\"2\":[3,5,1,2,4]},\"ConfigEntries\":{\"compression.type\":\"producer\",\"flush.messages\":\"10000\",\"flush.ms\":\"1000\",\"index.interval.bytes\":\"4096\",\"max.message.bytes\":\"10485760\",\"min.insync.replicas\":\"2\",\"segment.bytes\":\"1073741824\",\"segment.index.bytes\":\"10485760\"}}"]
[2022/08/18 14:05:18.409 +08:00] [WARN] [event_router.go:236] ["This index-value distribution mode does not guarantee row-level orderliness when switching on the old value, so please use caution!"]
Error: [CDC:ErrKafkaTopicExprInvalid]invalid topic expression
Usage:
  cdc cli changefeed create [flags]

Flags:
  -c, --changefeed-id string              Replication task (changefeed) ID
      --config string                     Path of the configuration file
      --cyclic-filter-replica-ids uints   (Experimental) Cyclic replication filter replica ID of changefeed (default [])
      --cyclic-replica-id uint            (Experimental) Cyclic replication replica ID of changefeed
      --cyclic-sync-ddl                   (Experimental) Cyclic replication sync DDL of changefeed (default true)
      --disable-gc-check                  Disable GC safe point check
  -h, --help                              help for create
      --no-confirm                        Don't ask user whether to ignore ineligible table
      --opts key=value                    Extra options, in the key=value format
      --schema-registry string            Avro Schema Registry URI
      --sink-uri string                   sink uri
      --sort-engine string                sort engine used for data sort (default "unified")
      --start-ts uint                     Start ts of changefeed
      --sync-interval duration            (Experimental) Set the interval for syncpoint in replication(default 10min) (default 10m0s)
      --sync-point                        (Experimental) Set and Record syncpoint in replication(default off)
      --target-ts uint                    Target ts of changefeed
      --tz string                         timezone used when checking sink uri (changefeed timezone is determined by cdc server) (default "SYSTEM")

Global Flags:
      --ca string          CA certificate path for TLS connection
      --cert string        Certificate path for TLS connection
  -i, --interact           Run cdc cli with readline
      --key string         Private key path for TLS connection
      --log-level string   log level (etc: debug|info|warn|error) (default "warn")
      --pd string          PD address, use ',' to separate multiple PDs (default "http://127.0.0.1:2379")

[CDC:ErrKafkaTopicExprInvalid]invalid topic expression
Error: exit status 1


| username: db_user | Original post link

Try creating the topic directly in Kafka with the same settings, without going through CDC, and check what error Kafka returns and what its log says.

| username: TiDBer_8rWAgqMU | Original post link

My test above was conducted after creating the topic on Kafka, and no errors were reported.

[root@slave002 kafka]# bin/kafka-topics.sh --create --topic abcd --zookeeper slave002:2181,slave003:2181,slave004:2181 --partitions 3 --replication-factor 5
Created topic abcd.
[root@slave002 kafka]#

| username: db_user | Original post link

I think I roughly understand now. It seems the dispatch rules are not set correctly. Try writing them like this:

[sink]
dispatchers = [
    {matcher = ['tidb_example.*'], topic = "{schema}_{table}", partition = "index-value" }
]

| username: TiDBer_8rWAgqMU | Original post link

The issue is, in my recent test, I have already changed the topic to the constant “abcd” instead of using a variable.

| username: TiDBer_8rWAgqMU | Original post link

Okay, I changed the configuration [before and after screenshots not available], then created the synchronization task, and it actually succeeded.
So why is that?

[root@slave005 bin]# tiup ctl:v6.1.0 cdc changefeed create --pd=http://slave007:2379 --sink-uri="kafka://slave002:9092/abcd?kafka-version=2.4.1&replication-factor=5" --changefeed-id="simple-kafka-task" --sort-engine="unified" --config …/config/cdc/tidb_example_kafka.toml
Create changefeed successfully!
ID: simple-kafka-task

| username: db_user | Original post link

You can check the official documentation.
{schema} is a required part of the topic expression, and you did not include it. The literal database name you wrote is treated as a prefix.
PS: If this solves your problem, please mark it as useful to help others find it easily.
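
To make the rule concrete, here is a minimal sketch of a check for topic expressions. It assumes the expression has the shape [prefix]{schema}[middle][{table}][suffix] and that the literal parts may contain only letters, digits, ".", "_" and "-". The regular expression and the function name `looks_valid` are written for this illustration and are not copied from the TiCDC source.

```python
import re

# Assumption: literal parts (prefix, middle, suffix) use only these characters.
_LITERAL = r"[A-Za-z0-9._-]*"

# Assumed shape: [prefix]{schema}[middle][{table}][suffix]; {schema} is mandatory.
_TOPIC_EXPR = re.compile(
    _LITERAL + r"\{schema\}(?:" + _LITERAL + r"\{table\})?" + _LITERAL
)


def looks_valid(expr: str) -> bool:
    """Return True if expr fits the assumed topic expression shape."""
    return _TOPIC_EXPR.fullmatch(expr) is not None


if __name__ == "__main__":
    for expr in ["tidb_example_{table}", "abcd", "cnsbdnc", "{schema}_{table}"]:
        print(expr, looks_valid(expr))
```

Under these assumptions, the expressions tried earlier in the thread ("tidb_example_{table}", "cnsbdnc", "abcd") are all rejected because none of them contains {schema}, while "{schema}_{table}" is accepted.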

| username: TiDBer_8rWAgqMU | Original post link

Liked and marked the best answer.
Also, there is a warning after creation, do I need to worry about it?
[WARN] This index-value distribution mode does not guarantee row-level orderliness when switching on the old value, so please use caution!

My settings: partition = "index-value", enable-old-value = true

| username: db_user | Original post link

That depends on whether it affects your business. The warning only says that when enable-old-value is turned on, the index-value distribution mode may not guarantee row-level ordering. If that does not matter for your workload, you can ignore it. Otherwise, you can disable old value or switch to another distribution mode, such as table-level distribution.
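
As a rough illustration of why ordering can be affected, the sketch below picks a partition by hashing a row's index values. The hash (CRC32) and the function name `partition_for` are stand-ins chosen for this example; the thread does not show which hash TiCDC actually uses.

```python
import zlib
from typing import Sequence


def partition_for(index_values: Sequence[object], num_partitions: int) -> int:
    """Pick a partition from the row's index values (illustrative hash only)."""
    key = "|".join(str(v) for v in index_values).encode("utf-8")
    return zlib.crc32(key) % num_partitions


if __name__ == "__main__":
    partitions = 3
    # Hypothetical UPDATE that changes the primary key of a row from 1 to 2.
    before = partition_for([1], partitions)
    after = partition_for([2], partitions)
    print("events keyed by id=1 go to partition", before)
    print("events keyed by id=2 go to partition", after)
```

Whenever the two keys hash to different partitions, the earlier events for the row and the events after the key change sit in separate partitions, and Kafka only preserves order within one partition. Dispatching by table keeps all events of a table together, at the cost of less parallelism.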

| username: TiDBer_8rWAgqMU | Original post link

Got it. Regarding the ordering issues that index-value distribution might cause, please refer to the issue I raised yesterday, where I made some suggestions. Please check whether TiDB can make improvements in this area in the future.