Failed to Parse Data Consumed from Kafka in TiCDC Data Synchronization

Note:
This topic has been translated from a Chinese forum by GPT and might contain errors.

Original topic: ticbc数据同步kafka消费数据解析失败 (TiCDC data synchronization to Kafka: consumed data fails to parse)

| username: TiDBer_UPGPH4wh

[TiDB Usage Environment] Production Environment / Testing / PoC
[TiDB Version] 4.0.9
[Reproduction Path] What operations were performed when the issue occurred
[Encountered Issue: Problem Phenomenon and Impact] Data is synchronized to Kafka through TiCDC. A Java project reads the Kafka messages, and an error occurs when parsing the data using the official TiDB example.


When the approve_date field is null, the data can be parsed successfully; when it is not null, parsing fails. The reason for the failure is that when the data stream is read, the last segment is truncated, so it is no longer a valid JSON object and the JSON conversion fails.
[Resource Configuration]
[Attachments: Screenshots/Logs/Monitoring]

| username: tidb狂热爱好者 | Original post link

That’s an issue with your code. Python can parse this JSON.

| username: TiDBer_UPGPH4wh | Original post link

The parsing code is taken from the official example.

| username: Billmay表妹 | Original post link

The JSON data format feature only went GA in version 6.3; please upgrade to a newer version.

| username: tidb狂热爱好者 | Original post link

You still need to debug; you can’t just copy and paste to use it.

| username: Billmay表妹 | Original post link

Based on your description, it is possible that the data format synchronized by TiCDC to Kafka does not meet your expectations, causing parsing failures when reading Kafka messages in the Java project. The specific reason might be that the data format synchronized by TiCDC to Kafka is incompatible with the way your Java project parses the data, or there might be an issue with the data itself synchronized by TiCDC to Kafka.

To better resolve your issue, I suggest you follow these steps for troubleshooting:

  1. Confirm whether the data format synchronized by TiCDC to Kafka meets your expectations. You can use Kafka’s built-in command-line tool kafka-console-consumer to view the message content in Kafka and check if the message format meets your expectations. For example, you can use the following command to view the messages in the Kafka topic named my_topic:

    kafka-console-consumer --bootstrap-server localhost:9092 --topic my_topic --from-beginning
    

    If the data format synchronized by TiCDC to Kafka does not meet your expectations, you can try adjusting TiCDC’s configuration or modifying the way your Java project parses the data.

  2. Confirm whether the way your Java project parses the data is correct. Check the code in your Java project that parses the data to ensure it is correct and matches the data format synchronized by TiCDC to Kafka. If there is an issue with the way your Java project parses the data, you can try modifying the code or using other methods to parse the data (a sketch for dumping the raw message bytes appears after this list).

  3. If the above two steps do not resolve the issue, you can try adding a data format conversion middleware between TiCDC and Kafka, such as Apache Kafka Connect, to convert the data format synchronized by TiCDC to Kafka into a format that your Java project can parse.
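
For step 2, a quick way to check is to dump each record's raw value as hex and compare it byte-for-byte against what your decoder expects. Below is a minimal sketch using the standard Kafka Java client; the broker address, topic name, and consumer group are placeholder assumptions, not values from this thread:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class RawMessageDump {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholders: point these at your own brokers, group, and topic.
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "ticdc-debug");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my_topic"));
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // Print the raw value as hex so framing problems are visible.
                    StringBuilder hex = new StringBuilder();
                    for (byte b : record.value()) {
                        hex.append(String.format("%02x ", b));
                    }
                    System.out.printf("offset=%d valueLen=%d%n%s%n",
                            record.offset(), record.value().length, hex);
                }
            }
        }
    }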

I hope the above suggestions can help you resolve the issue. If you need more assistance, please provide more detailed information, and I will do my best to help you.

| username: TiDBer_UPGPH4wh | Original post link

TiDB synchronizes data to Kafka through TiCDC, and the first 8 bytes (bytes 0–7) of the value indicate the value's length. If there are multiple events in one message, the first value may be parsed with one extra byte and the subsequent values with one byte less, which causes errors. Is there a solution for this? The parsing method is not written by me; it is done through TicdcEventDecoder(), and I haven't made any changes.

| username: Billmay表妹 | Original post link

Based on your description, the first 8 bytes of the value in the data synchronized by TiCDC to Kafka indicate the length of the value. However, in the case of multiple Events in one Message, the value of the first Event may have an extra byte parsed, and the value of the subsequent Events may have one byte less parsed, leading to parsing failure. You want to know if there is a solution.

For this issue, I suggest you try the following two solutions:

  1. Modify the TiCDC configuration to merge multiple Events into one Message. This can be achieved by modifying the sink configuration of TiCDC. Specifically, you can set the format parameter in the sink configuration to canal-json and set the batch-size parameter of canal-json to a larger value, such as 1000. This way, TiCDC will merge multiple Events into one Message, avoiding the issue with the length flag of multiple Event values (an example changefeed command appears after this list).

  2. Modify the way data is parsed in the Java project to avoid the issue with the length flag of multiple Event values. Specifically, when using TicdcEventDecoder to parse data in the Java project, you can first read the first 8 bytes of the value to get the length of the value, and then read the corresponding length of bytes as the value. This way, even if there are multiple Events in one Message, each Event's value can be correctly parsed (see the decoding sketch at the end of this reply).
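
As a reference for the first suggestion, a changefeed creation command might look roughly like the sketch below. The PD address, topic, and changefeed ID are placeholders; note that in the TiCDC documentation the sink-URI option that selects the message format is named protocol, so verify the exact parameter names against the docs for your version:

    cdc cli changefeed create \
        --pd=http://127.0.0.1:2379 \
        --sink-uri="kafka://127.0.0.1:9092/my_topic?protocol=canal-json" \
        --changefeed-id="kafka-canal-json"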

I hope the above suggestions can help you solve the problem. If you need more help, please provide more detailed information, and I will do my best to assist you.
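
To make the second suggestion concrete, here is a minimal sketch of length-prefixed decoding, assuming the layout described above: an 8-byte big-endian length followed by exactly that many bytes of payload, repeated until the message value is exhausted. The class and method names are illustrative and are not part of the official TicdcEventDecoder API:

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    public class LengthPrefixedValueSplitter {

        // Splits one Kafka message value into the individual event payloads
        // it contains, assuming [8-byte big-endian length][payload] framing.
        public static List<byte[]> split(byte[] messageValue) {
            List<byte[]> events = new ArrayList<>();
            ByteBuffer buf = ByteBuffer.wrap(messageValue); // big-endian by default
            while (buf.remaining() >= 8) {
                long length = buf.getLong();
                if (length < 0 || length > buf.remaining()) {
                    // The declared length does not fit in the remaining bytes,
                    // so the framing has drifted (e.g. the off-by-one above).
                    throw new IllegalStateException("declared length " + length
                            + " exceeds remaining " + buf.remaining() + " bytes");
                }
                byte[] event = new byte[(int) length];
                buf.get(event);
                events.add(event);
            }
            return events;
        }
    }

If the declared length ever exceeds the remaining bytes, the framing has drifted, which is consistent with the one-byte shift described in the question.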

| username: TiDBer_UPGPH4wh | Original post link

My current TiDB version is 4.0.9, and this version does not have the canal-json protocol format, only the canal protocol format. Additionally, batch-size conflicts with the canal format.



The second method is what I am currently using for parsing, and it is the one that has issues. Are there any other solutions?

| username: tidb狂热爱好者 | Original post link

Your 4.0 deployment definitely doesn't support the 6.5 features; you'll need to code them yourself.

| username: tidb狂热爱好者 | Original post link

If you don’t have coding skills, it’s recommended to upgrade the database.

| username: tidb狂热爱好者 | Original post link

This can synchronize Kafka data to S3. You can directly read from S3.

| username: system | Original post link

This topic was automatically closed 60 days after the last reply. New replies are no longer allowed.