When writing data from Spark to TiDB, a "Communications link failure during rollback(). Transaction resolution unknown" error occurs

Note:
This topic has been translated from a Chinese forum by GPT and might contain errors.

Original topic: spark向TiDB写入数据时报 Communications link failure during rollback(). Transaction resolution unknown.

| username: TTTTiDBer

[TiDB Usage Environment] Production Environment
[TiDB Version] v6.1.0
[Reproduction Path] Error when increasing the number of parallel tasks executed by Spark
[Encountered Problem: Phenomenon and Impact] I am trying to use Spark to parse a 30 GB CSV file (approximately 270 million rows) and write it into TiDB. The target table has about 100 million rows, several ordinary indexes, and one unique index. The write code is as follows:

csvDataFrameTime.write.format("org.apache.spark.sql.execution.datasources.jdbc2").options(
                Map(
                    "savemode" -> JDBCSaveMode.Update.toString,
                    "driver" -> "com.mysql.cj.jdbc.Driver",
                    "url" -> (DBConf.TIDB_TEST.get("url").get + dbConfigMap.get("dbName").get + "?rewriteBatchedStatements=true&autoReconnect=true&allowMultiQueries=true&serverTimezone=UTC"),
                    "user" -> DBConf.TIDB_TEST.get("user").get,
                    "password" -> DBConf.TIDB_TEST.get("password").get,
                    "dbtable" -> dbConfigMap.get("tableName").get,
                    "batchsize" -> "10000", # This is to adapt to this CSV file, adjusting from 1000 to 10000 doubled the execution speed of each task
                    "useSSL" -> "false",
                    "showSql" -> "true"
                )
            ).save()

When I submit the job with spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 --master yarn --deploy-mode cluster --executor-cores 1 --num-executors 10 --executor-memory 2GB --conf spark.default.parallelism=3000 --class com.process.Task /dispose/spark/csvProcess, the program runs normally, executing ten tasks simultaneously at about 210,000 rows of data per hour per task.
Note: spark.default.parallelism=3000 is to split the data to avoid triggering TiDB’s transaction limits and pessimistic lock errors.
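
The same splitting can also be made explicit instead of relying on spark.default.parallelism. A minimal sketch, reusing csvDataFrameTime from the snippet above (the count 3000 is the value from the note; everything else is an assumption, not code from the original post):

```scala
// Force exactly 3000 write partitions: each partition becomes one JDBC
// write task, so each task's transaction stays small regardless of how
// spark.default.parallelism happens to be set for other stages.
val partitioned = csvDataFrameTime.repartition(3000)
// Then write `partitioned` with the same .format(...).options(...).save()
// chain shown above; with "batchsize" -> "10000", each task flushes
// 10,000-row batches inside its own small transaction.
```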
To improve overall throughput, I tried allocating more resources so that more tasks run in parallel, for example spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 --master yarn --deploy-mode cluster --executor-cores 2 --num-executors 10 --executor-memory 4GB --conf spark.default.parallelism=3000 --class com.process.Task /dispose/spark/csvProcess. Now 20 tasks execute in parallel, but after a dozen or a few dozen tasks complete successfully, the remaining tasks collectively fail with java.sql.SQLNonTransientConnectionException: Communications link failure during rollback(). Transaction resolution unknown., which causes the entire Spark job to fail. How can I run more tasks in parallel without triggering this error?
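
Not from the thread, but one hedged avenue to investigate: this exception is MySQL Connector/J reporting that the TCP connection died while the driver was rolling back, so the driver cannot tell how the transaction was resolved. If anything between Spark and TiDB (a proxy, an idle-connection timeout, or an overloaded tidb-server) starts dropping connections at the higher concurrency, raising the client-side timeouts in the JDBC URL is worth trying. connectTimeout, socketTimeout, and tcpKeepAlive are standard Connector/J connection properties; the host and values below are illustrative placeholders:

```
jdbc:mysql://<host>:4000/<db>?rewriteBatchedStatements=true&allowMultiQueries=true&serverTimezone=UTC&connectTimeout=60000&socketTimeout=600000&tcpKeepAlive=true
```

Here connectTimeout and socketTimeout are in milliseconds (60 s to establish a connection, 10 min before an in-flight read is treated as dead). Note that autoReconnect, used in the original URL, is discouraged by the Connector/J documentation and does not help with mid-transaction failures, since the transaction state is lost either way.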
[Resource Configuration] Enter TiDB Dashboard - Cluster Info - Hosts and take a screenshot of this page
Note: the cluster has 20 TiKV nodes, each with 16 cores and 32 GB of memory

[Attachments: Screenshots/Logs/Monitoring]
Spark error information and execution information


| username: Jellybean | Original post link

You can try lowering the number of tasks executed in parallel by Spark and then run it again.

| username: ShawnYan | Original post link

Off-topic, but the resources are quite abundant; the cluster even has 5 TiFlash nodes.

| username: ShawnYan | Original post link

Does this URL point to one specific TiDB instance?

| username: TTTTiDBer | Original post link

Hmm… but my goal is to run more tasks in parallel so that all this data gets written faster. I have already verified that reducing the number of tasks lets the job run normally.

| username: TTTTiDBer | Original post link

More or less. Actually, there is a load balancer that distributes the connections made to this URL across the TiDB instances.

| username: ShawnYan | Original post link

Have you considered writing directly to TiKV? Or directly importing into TiDB, instead of using Spark to write?

| username: TTTTiDBer | Original post link

I would like to avoid such special-case operations as much as possible, because they differ from the parsing flow used for other files and do not fit well with the other processing needed during parsing. That said, how is writing directly to TiKV implemented?

| username: ShawnYan | Original post link

You can learn about the TiKV client for Java
https://tikv.github.io/client-java/

However, didn’t you use TiSpark here?

TiSpark writes directly to TiKV, bypassing TiDB.
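
For reference, a TiSpark batch write looks roughly like the sketch below. This is based on the TiSpark data source documentation, not on code from this thread; the PD address, host names, credentials, and table names are placeholders to verify against your own setup:

```scala
// Prerequisites (Spark conf, with the TiSpark jar on the classpath):
//   spark.sql.extensions       = org.apache.spark.sql.TiExtensions
//   spark.tispark.pd.addresses = <pd-host>:2379
//
// The row data is written straight to TiKV via two-phase commit; the
// tidb.addr/tidb.port options point at one tidb-server, which TiSpark
// uses for metadata.
csvDataFrameTime.write
  .format("tidb")
  .option("tidb.addr", "<tidb-host>")
  .option("tidb.port", "4000")
  .option("tidb.user", "<user>")
  .option("tidb.password", "<password>")
  .option("database", "<db>")
  .option("table", "<table>")
  .option("replace", "true") // upsert on the unique key, roughly analogous to the JDBC savemode Update
  .mode("append")
  .save()
```

Because the write bypasses the MySQL protocol entirely, the JDBC-level "Communications link failure during rollback()" failure mode above does not apply.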

| username: Jellybean | Original post link

Yes, you need to find that balance point; otherwise, if the concurrency is too high, transaction conflicts between different tasks will cause errors and make the program fail.

| username: TTTTiDBer | Original post link

Got it, I haven’t used TiSpark before. I’ll give it a try. Thank you very much.

| username: ShawnYan | Original post link

Regarding TiSpark, you can refer to this post:

| username: TTTTiDBer | Original post link

Oh, thank you, I was just looking for some information.