TiCDC does not respect the Kafka partition dispatcher

I have configured my TiCDC with the following dispatchers:

      "dispatchers": [
        {
          "matcher": [
            "task*.*"
          ],
          "partition": "table",
          "topic": ""
        }
      ],

and I also have:

"enable_old_value": true,

This should mean that each table under the task-pipeline database is always deterministically mapped to the same partition, correct? E.g., table foo will always be replicated to partition 2.

However, very rarely, I find a table being replicated to another partition. Today this happened during a large database migration.

Any suggestions on how I can ensure that the same table is ALWAYS written to the same Kafka partition?

Application environment:

Running TiDB and TiCDC in Kubernetes

TiDB version:

ticdc:v7.1.0
pd:v7.1.0
tidb:v7.1.0
tikv:v7.1.0

Reproduction method:

I have not been able to reproduce this problem. From my understanding, the configuration above should ensure this never happens.

Problem:

TiCDC dispatches table changes to the wrong partition.

Attachment:

Here are the full configs:

[root@tidb-ticdc-0 /]# ./cdc cli changefeed query  --pd=http://tidb-pd.tidb:2379  --changefeed-id=tidb-replication-task
{
  "upstream_id": 7154035701378375041,
  "namespace": "default",
  "id": "tidb-replication-task",
  "sink_uri": "",
  "config": {
    "memory_quota": 1073741824,
    "case_sensitive": true,
    "enable_old_value": true,
    "force_replicate": false,
    "ignore_ineligible_table": false,
    "check_gc_safe_point": true,
    "enable_sync_point": false,
    "bdr_mode": false,
    "sync_point_interval": 600000000000,
    "sync_point_retention": 86400000000000,
    "filter": {
      "rules": [
        "*.*"
      ],
      "event_filters": null
    },
    "mounter": {
      "worker_num": 16
    },
    "sink": {
      "protocol": "open-protocol",
      "schema_registry": "",
      "csv": {
        "delimiter": ",",
        "quote": "\"",
        "null": "\\N",
        "include_commit_ts": false
      },
      "dispatchers": [
        {
          "matcher": [
            "task*.*"
          ],
          "partition": "table",
          "topic": ""
        }
      ],
      "column_selectors": null,
      "transaction_atomicity": "",
      "encoder_concurrency": 16,
      "terminator": "\r\n",
      "date_separator": "day",
      "enable_partition_separator": true,
      "file_index_digit": 0,
      "enable_kafka_sink_v2": false,
      "only_output_updated_columns": null
    },
    "consistent": {
      "level": "none",
      "max_log_size": 64,
      "flush_interval": 2000,
      "storage": "",
      "use_file_backend": false
    },
    "scheduler": {
      "enable_table_across_nodes": false,
      "region_threshold": 100000,
      "write_key_threshold": 0
    },
    "integrity": {
      "integrity_check_level": "none",
      "corruption_handle_level": "warn"
    }
  },
  "create_time": "2024-01-12 05:43:15.603",
  "start_ts": 446965157349818368,
  "resolved_ts": 447092825593741314,
  "target_ts": 0,
  "checkpoint_tso": 447092825554157575,
  "checkpoint_time": "2024-01-17 20:36:19.122",
  "state": "normal",
  "error": null,
  "error_history": null,
  "creator_version": "v7.1.0",
  "task_status": [
    {
      "capture_id": "1eac59f3-5d67-4647-b1d8-c827cd7a631f",
      "table_ids": [
        86,
        84,
        110,
        160,
        196,
        82,
        83,
        93,
        114,
        189,
        200,
        85,
        87,
        179,
        186
      ],
      "table_operations": null
    }
  ]
}

Based on the provided configuration, TiCDC should deterministically map each table under the task-pipeline database to a fixed Kafka partition. However, you mentioned that, on rare occasions, a table's changes end up on a different partition, most recently during a large database migration.
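
For context, a "table" (table-name) dispatcher is expected to behave roughly like the sketch below: the target partition is derived from a hash of the schema and table name modulo the topic's partition count, so the mapping is deterministic as long as the partition count does not change. This is a simplified illustration rather than TiCDC's actual hash implementation, and the schema name, table name, and partition counts are placeholders.

package main

import (
	"fmt"
	"hash/fnv"
)

// tablePartition models a table-name dispatcher: the target partition
// depends only on the schema/table name and the partition count, so it
// is deterministic while the topic's partition count stays fixed.
// Illustrative only; TiCDC uses its own hash internally.
func tablePartition(schema, table string, partitionNum uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(schema))
	h.Write([]byte(table))
	return h.Sum32() % partitionNum
}

func main() {
	// Same inputs, same partition count -> always the same partition.
	fmt.Println(tablePartition("task_pipeline", "foo", 6))
	fmt.Println(tablePartition("task_pipeline", "foo", 6)) // identical

	// Changing the partition count changes the mapping.
	fmt.Println(tablePartition("task_pipeline", "foo", 12))
}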

There could be a few reasons why this is happening:

  1. Concurrency: TiCDC uses multiple workers to replicate data, and each worker may handle different tables or partitions concurrently. This concurrency can lead to tables being replicated to different partitions, especially during high load or migration scenarios.

  2. Partition rebalancing: TiCDC may rebalance partitions based on load or other factors. This can cause tables to be moved to different partitions, even if they were initially mapped to a specific partition.

To ensure that the same table is always written to the same Kafka partition, you can try the following suggestions:

  1. Increase the number of partitions: If you have a large number of tables or high write throughput, increasing the number of Kafka partitions can help distribute the load more evenly. Keep in mind, however, that because the table dispatcher derives the partition from a hash of the schema and table name modulo the partition count, changing the partition count remaps existing tables to new partitions, so plan for a one-time change in the mapping if you resize the topic.

  2. Adjust the TiCDC configuration: You can experiment with different TiCDC configuration options to optimize the replication behavior. For example, you can try adjusting the number of mounter workers (mounter.worker_num) or the flush interval (consistent.flush_interval) to see whether they improve the consistency of the table-to-partition mapping.

  3. Monitor TiCDC performance: Monitoring TiCDC, including resource usage and replication lag, can help identify bottlenecks or issues that may affect the consistency of the table-to-partition mapping. You can use TiDB's built-in monitoring tools or third-party monitoring solutions to track TiCDC's performance; a small consumer sketch for auditing the table-to-partition mapping on the Kafka side follows this list.
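
As a way to audit the mapping directly, the sketch below consumes the changefeed topic and reports whenever the same schema.table shows up on more than one partition. It assumes the open-protocol message key layout (an 8-byte version prefix followed by length-prefixed JSON keys carrying scm/tbl fields) and the github.com/segmentio/kafka-go client; the broker address, topic name, and consumer group are placeholders you would need to adjust.

package main

import (
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

// eventKey mirrors the JSON key of a TiCDC open-protocol event.
// Resolved-ts events carry no scm/tbl fields, so they are skipped below.
type eventKey struct {
	Schema string `json:"scm"`
	Table  string `json:"tbl"`
	Type   int    `json:"t"`
}

func main() {
	// Placeholder broker, topic, and group ID -- adjust for your setup.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"kafka:9092"},
		Topic:   "ticdc-topic",
		GroupID: "partition-audit",
	})
	defer r.Close()

	// First partition observed for each "schema.table".
	seen := map[string]int{}

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		if len(m.Key) < 8 {
			continue
		}
		// Open-protocol key: 8-byte version, then length-prefixed
		// JSON keys, one per event batched into the Kafka message.
		buf := m.Key[8:]
		for len(buf) >= 8 {
			n := int(binary.BigEndian.Uint64(buf[:8]))
			if n < 0 || 8+n > len(buf) {
				break
			}
			var k eventKey
			if err := json.Unmarshal(buf[8:8+n], &k); err == nil && k.Table != "" {
				name := k.Schema + "." + k.Table
				if p, ok := seen[name]; !ok {
					seen[name] = m.Partition
				} else if p != m.Partition {
					fmt.Printf("ALERT: %s seen on partitions %d and %d\n", name, p, m.Partition)
				}
			}
			buf = buf[8+n:]
		}
	}
}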

It’s important to note that TiCDC’s behavior is influenced by various factors, and achieving strict determinism in table-to-partition mapping may not always be possible. However, the suggestions above can help improve the consistency and reduce the chances of tables being replicated to different partitions.

For more information on TiCDC configuration options and best practices, you can refer to the official TiCDC documentation, in particular the TiCDC FAQ and TiCDC Overview pages.