TiSpark writes 100,000 records successfully, but starts to report errors at 500,000 records

Note:
This topic has been translated from a Chinese forum by GPT and might contain errors.

Original topic: tispark写入10万记录OK,50万开始报错

| username: Hacker_APbTle51

[TiDB Usage Environment] Testing
[TiDB Version] 6.5
[Reproduction Path] We are currently using Spark to rework an existing business workflow. In one scenario, Spark writes the computed results into TiDB: roughly 8 million records, each with 10 fields. Writing directly into TiDB from Spark takes 5 minutes. Hoping that TiSpark might be faster, I set up a simple Spark standalone cluster with one master and two workers. The Spark version is 3.3.2, the TiSpark jar is tispark-assembly-3.3_2.12-3.1.3.jar, and the data is written into the same TiDB test database.
Writing 50,000 records takes 48 seconds and 100,000 records takes 1.2 minutes, but when the volume reaches 500,000, an error occurs.
I have just started using TiSpark and am not sure what is causing this. Guidance from anyone with experience would be appreciated.

[Encountered Problem: Problem Phenomenon and Impact] For the 500,000-record write, the log shows the job was divided into 37 tasks. Task 20 reported “Error reading region,” and subsequent tasks reported the same error.
23/08/25 14:53:36 WARN TaskSetManager: Lost task 20.0 in stage 0.0 (TID 20) (10.120.3.63 executor 1): com.pingcap.tikv.exception.TiClientInternalException: Error reading region:
at com.pingcap.tikv.operation.iterator.DAGIterator.doReadNextRegionChunks(DAGIterator.java:191)
at com.pingcap.tikv.operation.iterator.DAGIterator.readNextRegionChunks(DAGIterator.java:168)
at com.pingcap.tikv.operation.iterator.DAGIterator.hasNext(DAGIterator.java:114)
at org.apache.spark.sql.tispark.TiRowRDD$$anon$1.hasNext(TiRowRDD.scala:70)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.ExecutionException: com.pingcap.tikv.exception.RegionTaskException: Handle region task failed:
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.pingcap.tikv.operation.iterator.DAGIterator.doReadNextRegionChunks(DAGIterator.java:186)
… 20 more
Caused by: com.pingcap.tikv.exception.RegionTaskException: Handle region task failed:
at com.pingcap.tikv.operation.iterator.DAGIterator.process(DAGIterator.java:243)
at com.pingcap.tikv.operation.iterator.DAGIterator.lambda$submitTasks$1(DAGIterator.java:92)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
… 3 more
Caused by: com.pingcap.tikv.exception.GrpcException: retry is exhausted.
at com.pingcap.tikv.util.ConcreteBackOffer.doBackOffWithMaxSleep(ConcreteBackOffer.java:153)
at com.pingcap.tikv.util.ConcreteBackOffer.doBackOff(ConcreteBackOffer.java:124)
at com.pingcap.tikv.region.RegionStoreClient.handleCopResponse(RegionStoreClient.java:708)
at com.pingcap.tikv.region.RegionStoreClient.coprocess(RegionStoreClient.java:689)
at com.pingcap.tikv.operation.iterator.DAGIterator.process(DAGIterator.java:229)
… 7 more
Caused by: com.pingcap.tikv.exception.GrpcException: TiKV down or Network partition
… 10 more

23/08/25 14:53:36 INFO TaskSetManager: Lost task 18.0 in stage 0.0 (TID 18) on 10.120.3.67, executor 0: com.pingcap.tikv.exception.TiClientInternalException (Error reading region:) [duplicate 1]
23/08/25 14:53:36 INFO TaskSetManager: Starting task 18.1 in stage 0.0 (TID 26) (10.120.3.67, executor 0, partition 18, ANY, 9207 bytes) taskResourceAssignments Map()

[Attachments: Screenshots/Logs/Monitoring]
tispark_write_500k_records_log.txt (146.7 KB)

| username: RenlySir | Original post link

Is the TiKV status normal?

| username: RenlySir | Original post link

Additionally, is the 500,000 write a single write or done in batches? If in batches, what is the batch size?

| username: Hacker_APbTle51 | Original post link

Dataset dataSet = tispark.sqlContext().sql("SELECT ta_account_no, ta_transaction_account_no, pos_date, cust_manager_id, product_code, cust_vol_belong_to_mg, percent, " +
        "saler_code, update_time, essential_saler_code, ta_system_code from dcdb.cr_cust_pos_custmg_mapping limit 500000");
dataSet.write().format("tidb").option("database", "dcdb").option("table", "cr_cust_pos_custmg_map20230819").options(tidbOptions).mode("append").save();

| username: Hacker_APbTle51 | Original post link

It is a direct write; no batching or other controls were applied.

| username: RenlySir | Original post link

It is recommended to write in batches of 5,000 to 10,000 rows at a time (see the sketch below). Additionally, you can try enabling compression for TiKV (https://docs.pingcap.com/zh/tidb/v7.3/tikv-configuration-file#compression-per-level), or increase the TiKV region size to 256 MiB and try again.
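
For example, something along these lines could produce disjoint batches and write them one by one through TiSpark. This is a rough Scala sketch: dataSet and tidbOptions are assumed to be the ones from the snippet above, and the batch count is only illustrative.

import org.apache.spark.sql.functions.{col, lit, monotonically_increasing_id, pmod}

// Assign every row to exactly one of numBatches disjoint buckets, then write bucket by bucket.
// 50 buckets over ~500,000 rows gives roughly 10,000 rows per batch.
val numBatches = 50
val bucketed = dataSet
  .withColumn("bucket", pmod(monotonically_increasing_id(), lit(numBatches)))
  .persist() // keep the bucket assignment stable across the repeated write actions

(0 until numBatches).foreach { i =>
  bucketed.filter(col("bucket") === i).drop("bucket")
    .write.format("tidb")
    .option("database", "dcdb")
    .option("table", "cr_cust_pos_custmg_map20230819")
    .options(tidbOptions)
    .mode("append")
    .save()
}
bucketed.unpersist()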

| username: RenlySir | Original post link

Another option is to pre-split dcdb.cr_cust_pos_custmg_map20230819: run ALTER TABLE t ATTRIBUTES 'merge_option=deny'; so that the new regions are not merged back, and then use the Split Region syntax to split the table (see the Split Region usage documentation in the PingCAP docs).
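
For reference, the pre-split could be issued over JDBC from the Spark driver before the TiSpark write. A rough sketch, with the connection details, split bounds, and region count as placeholders (the bounds assume the table uses the default integer row ID):

import java.sql.DriverManager

// Illustrative pre-split before the bulk write; adjust bounds and region count to the real key range.
val conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:4000/dcdb", "root", "")
val stmt = conn.createStatement()
// Keep PD from merging the freshly split (still empty) regions back together.
stmt.execute("ALTER TABLE cr_cust_pos_custmg_map20230819 ATTRIBUTES 'merge_option=deny'")
// Split the table's row-key range into 16 regions up front.
stmt.execute("SPLIT TABLE cr_cust_pos_custmg_map20230819 BETWEEN (0) AND (9223372036854775807) REGIONS 16")
stmt.close()
conn.close()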

| username: Hacker_APbTle51 | Original post link

Thank you, I’ll give it a try.
I have another question. From what I understand about TiSpark’s write mechanism, it involves data preprocessing, pre-partitioning, two-phase commit, and other steps. Will batch writing cause the total time to be too long? Also, under the current conditions, TiSpark’s write performance doesn’t seem to show a significant advantage. Under what conditions will TiSpark’s write performance see a noticeable improvement?

| username: RenlySir | Original post link

Writing directly to TiKV using TiSpark can indeed be faster, but there is one point to note with TiDB: when performing large batch writes, it is easy to encounter region splits, which can lead to errors such as “region is unavailable.” Pay attention to the following points:

  1. Use NVMe SSDs for TiKV disks;
  2. Pre-split the target table appropriately;
  3. Increase the region size;
  4. Enable compression in TiKV (zstd).

compression-per-level

  • Default compression algorithm for each level.

  • Default value for defaultcf: ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"]

  • Default value for writecf: ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"]

  • Default value for lockcf: ["no", "no", "no", "no", "no", "no", "no"]

Reference: TiKV Configuration File Description | PingCAP Documentation Center

Enable compression in the configuration file:

[rocksdb.defaultcf]
compression-per-level = ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"]

Change it to:

compression-per-level = ["no", "no", "zstd", "zstd", "zstd", "zstd", "zstd"]
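
For point 3 above (a larger region size), the corresponding TiKV settings live under the [coprocessor] section; a sketch with illustrative values:

[coprocessor]
# Size at which a region is split; raising it reduces split activity during bulk writes.
region-split-size = "256MiB"
# Hard limit a region may reach before it must split; commonly about 1.5x region-split-size.
region-max-size = "384MiB"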

| username: Hacker_APbTle51 | Original post link

Thank you for the explanation, I will give it a try.

| username: RenlySir | Original post link

Sure, after trying it out, I hope you can also share your experience here.

| username: RenlySir | Original post link

I asked about it, and the suggestions are as follows:

  1. For reading, it is recommended to read directly from TiKV or TiFlash using TiSpark;
  2. For writing, if the batch is relatively large, it is recommended to use the JDBC method and split the batch for writing.
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

val customer = spark.sql("select * from customer limit 100000")
// To balance the load across the nodes and increase concurrency, repartition the data source
val df = customer.repartition(32)
df.write
  .mode(saveMode = "append")
  .format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  // Replace with your own host and port, and make sure batched statement rewriting is enabled
  .option("url", "jdbc:mysql://127.0.0.1:4000/test?rewriteBatchedStatements=true")
  .option("useSSL", "false")
  // For testing, a batch size of 150 is recommended
  .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 150)
  .option("dbtable", "cust_test_select") // database and table name
  .option("isolationLevel", "NONE") // for a large DataFrame, set isolationLevel to NONE
  .option("user", "root") // TiDB username
  .save()

| username: redgame | Original post link

It looks like a resource issue. I suggest splitting the batch write.

| username: Hacker_APbTle51 | Original post link

I couldn’t find a particularly good way to split the RDD. I tried Spark SQL, but it doesn’t support LIMIT ... OFFSET, and LIMIT alone can’t split a complete dataset into non-overlapping subsets. Do you have any good suggestions? So far, writing millions of records into TiDB with TiSpark has turned out quite different from what I initially envisioned.