OOM Still Occurs for the hashAgg Operator in Non-Parallel Mode

Note:
This topic has been translated from a Chinese forum by GPT and might contain errors.

Original topic: 对于hashAgg算子非并行模式下还是发生OOM

| username: 人如其名

[TiDB Usage Environment] Test
[TiDB Version] v7.0.0
Because the hashAgg operator cannot spill to disk when parallelism is enabled, its memory usage is uncontrollable and it frequently triggers OOM (Out of Memory) errors. To avoid interruptions in production as much as possible, we run hashAgg single-threaded so that it can spill to disk and keep memory usage down. In practice, however, OOM errors still occur frequently even with non-parallel hashAgg.

The general execution logic for non-parallel hashAgg operators is as follows:
Take select col1, count(*) from a group by col1 as an example. For each chunk (a batch of rows) flowing in from the child operator, the operator looks up col1 (the deduplicated group key) in a hash map and evaluates each row. If col1 is already in the map, the aggregate function (here, count) is updated; if not, the operator checks whether it should start spilling. Once a spill is triggered, every subsequent row in the chunk whose key is not already in the map is written to disk row by row. The first batch of keys and aggregate results that were in the map before the spill can then be finalized and returned to the parent operator. Next, the previously unprocessed records are read back from disk one by one (leading to slow performance, issue 1) and the same steps are repeated; if memory is still insufficient, a second spill is performed (tracked by a spill counter, spillTimes), but it cannot exceed maxSpillTimes=10 (why limit the maximum number of spills? issue 2). A second spill keeps appending to the same temporary spill file without clearing the previous records, which may lead to large disk space usage (issue 3). A simplified sketch of this loop is shown below.
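
To make the flow easier to follow, here is a highly simplified, self-contained sketch of this loop (this is not TiDB code: the types, the map-size limit standing in for the memory quota, and the slice standing in for the spill file are all illustrative):

package main

import "fmt"

// A highly simplified model of non-parallel hash aggregation with spilling.
// All names and thresholds are illustrative; this is not TiDB's implementation.
type hashAgg struct {
	groups     map[string]int64 // group key -> count(*)
	spilled    []string         // rows written to "disk" (modeled here as a slice)
	inSpill    bool
	memLimit   int // max distinct keys kept in memory, standing in for a byte quota
	spillTimes int
}

const maxSpillTimes = 10

func (a *hashAgg) processChunk(rows []string) {
	for _, key := range rows {
		if _, ok := a.groups[key]; ok {
			a.groups[key]++ // key already in the map: just update the aggregate
			continue
		}
		// New key: decide whether to enter spill mode before growing the map.
		if !a.inSpill && len(a.groups) >= a.memLimit && a.spillTimes < maxSpillTimes {
			a.inSpill = true
			a.spillTimes++
		}
		if a.inSpill {
			a.spilled = append(a.spilled, key) // unseen key during spill mode: write the row out
		} else {
			a.groups[key] = 1
		}
	}
}

func main() {
	a := &hashAgg{groups: map[string]int64{}, memLimit: 2}
	a.processChunk([]string{"a", "b", "a", "c", "d", "b"})
	// The first batch of keys that fit in memory is returned to the parent operator.
	fmt.Println("in-memory groups:", a.groups)
	// The spilled rows are then read back and aggregated in another pass,
	// possibly spilling again, up to maxSpillTimes rounds.
	fmt.Println("spilled rows:", a.spilled)
}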

Parameter settings are as follows:

mysql> show variables like 'tidb_hashagg%';
+----------------------------------+-------+
| Variable_name                    | Value |
+----------------------------------+-------+
| tidb_hashagg_final_concurrency   | 1     |
| tidb_hashagg_partial_concurrency | 1     |
+----------------------------------+-------+
2 rows in set (0.00 sec)

mysql> show variables like 'tidb_mem_quota_query';
+----------------------+------------+
| Variable_name        | Value      |
+----------------------+------------+
| tidb_mem_quota_query | 1073741824 |
+----------------------+------------+
1 row in set (0.00 sec)

mysql> show variables like 'tidb_enable_rate_limit_action';
+-------------------------------+-------+
| Variable_name                 | Value |
+-------------------------------+-------+
| tidb_enable_rate_limit_action | OFF   |
+-------------------------------+-------+
1 row in set (0.01 sec)
mysql> show variables like '%oom%';
+--------------------------------+--------+
| Variable_name                  | Value  |
+--------------------------------+--------+
| tidb_enable_tmp_storage_on_oom | ON     |
| tidb_mem_oom_action            | CANCEL |
+--------------------------------+--------+
2 rows in set (0.00 sec)

Test as follows:


mysql> explain analyze select C_NAME, count(C_PHONE) from customer group by c_name;
+----------------------------+-------------+----------+-----------+----------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+----------+---------+
| id                         | estRows     | actRows  | task      | access object  | execution info                                                                                                                                                                                                                                                                                           | operator info                                                                                                                                       | memory   | disk    |
+----------------------------+-------------+----------+-----------+----------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+----------+---------+
| Projection_4               | 15000000.00 | 15000000 | root      |                | time:2m6s, loops:14650, RRU:48513.136572, WRU:0.000000, Concurrency:5                                                                                                                                                                                                                                    | tpch100.customer.c_name, Column#9                                                                                                                   | 427.8 KB | N/A     |
| └─HashAgg_7                | 15000000.00 | 15000000 | root      |                | time:2m5.9s, loops:14650                                                                                                                                                                                                                                                                                 | group by:tpch100.customer.c_name, funcs:count(tpch100.customer.c_phone)->Column#9, funcs:firstrow(tpch100.customer.c_name)->tpch100.customer.c_name | 971.6 MB | 1.38 GB |
|   └─TableReader_12         | 15000000.00 | 15000000 | root      |                | time:146.5ms, loops:14675, cop_task: {num: 565, max: 58.6ms, min: 393µs, avg: 12.5ms, p95: 26.4ms, max_proc_keys: 50144, p95_proc_keys: 50144, tot_proc: 5.39s, tot_wait: 65.6ms, rpc_num: 565, rpc_time: 7.04s, copr_cache_hit_ratio: 0.00, build_task_duration: 37.6µs, max_distsql_concurrency: 15}   | data:TableFullScan_11                                                                                                                               | 37.5 MB  | N/A     |
|     └─TableFullScan_11     | 15000000.00 | 15000000 | cop[tikv] | table:customer | tikv_task:{proc max:38ms, min:0s, avg: 8.29ms, p80:15ms, p95:20ms, iters:16879, tasks:565}, scan_detail: {total_process_keys: 15000000, total_process_keys_size: 3052270577, total_keys: 15000565, get_snapshot_time: 22.2ms, rocksdb: {key_skipped_count: 15000000, block: {cache_hit_count: 103777}}}  | keep order:false                                                                                                                                    | N/A      | N/A     |
+----------------------------+-------------+----------+-----------+----------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+----------+---------+
4 rows in set (2 min 6.19 sec)

mysql> explain analyze select C_NAME, count(C_PHONE), count(C_ADDRESS) from customer group by c_name;
ERROR 1105 (HY000): Out Of Memory Quota![conn=3978033069293983]

Observe the tidb logs during its execution:

[2023/05/03 23:17:15.596 +08:00] [INFO] [controller.go:282] ["[resource group controller] create resource group cost controller"] [name=default]
[2023/05/03 23:17:16.310 +08:00] [INFO] [aggregate.go:1949] ["memory exceeds quota, set aggregate mode to spill-mode"] [spillTimes=1] [consumed=165914156] [quota=207374182]
[2023/05/03 23:17:28.915 +08:00] [WARN] [expensivequery.go:118] ["memory exceeds quota"] [cost_time=13.329033457s] [cop_time=0.015257304s] [process_time=6.481480135s] [wait_time=0.047572265s] [request_count=565] [total_keys=14977315] [process_keys=14976768] [num_cop_tasks=565] [process_avg_time=0.011471646s] [process_p90_time=0.024542849s] [process_max_time=0.035438311s] [process_max_addr=192.168.31.201:20160] [wait_avg_time=0.000084198s] [wait_p90_time=0.000135554s] [wait_max_time=0.000884638s] [wait_max_addr=192.168.31.201:20160] [stats=customer:441211688017461249] [conn=3978033069293983] [user=root] [database=tpch100] [table_ids="[369]"] [txn_start_ts=441221653614297089] [mem_max="232134581 Bytes (221.4 MB)"] [sql="explain analyze select C_NAME, count(C_PHONE), count(C_ADDRESS) from customer group by c_name"]
[2023/05/03 23:17:28.915 +08:00] [ERROR] [projection.go:466] ["projection executor panicked"] [error="Out Of Memory Quota![conn=3978033069293983]"] [stack="github.com/pingcap/tidb/executor.recoveryProjection\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/executor/projection.go:466\ngithub.com/pingcap/tidb/executor.(*projectionInputFetcher).run.func1\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/executor/projection.go:364\nruntime.gopanic\n\t/usr/local/go/src/runtime/panic.go:884\ngithub.com/pingcap/tidb/util/memory.(*PanicOnExceed).Action\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/util/memory/action.go:148\ngithub.com/pingcap/tidb/util/memory.(*Tracker).Consume.func1\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/util/memory/tracker.go:437\ngithub.com/pingcap/tidb/util/memory.(*Tracker).Consume\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/util/memory/tracker.go:478\ngithub.com/pingcap/tidb/executor.(*HashAggExec).getPartialResults\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/executor/aggregate.go:1121\ngithub.com/pingcap/tidb/executor.(*HashAggExec).execute\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/executor/aggregate.go:1045\ngithub.com/pingcap/tidb/executor.(*HashAggExec).unparallelExec\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/executor/aggregate.go:970\ngithub.com/pingcap/tidb/executor.(*HashAggExec).Next\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/executor/aggregate.go:789\ngithub.com/pingcap/tidb/executor.Next\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/executor/executor.go:326\ngithub.com/pingcap/tidb/executor.(*projectionInputFetcher).run\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/executor/projection.go:388"]
[2023/05/03 23:17:29.098 +08:00] [INFO] [conn.go:1151] ["command dispatched failed"] [conn=3978033069293983] [connInfo="id:3978033069293983, addr:192.168.31.200:37786 status:10, collation:utf8mb4_0900_ai_ci, user:root"] [command=Query] [status="inTxn:0, autocommit:1"] [sql="explain analyze select C_NAME, count(C_PHONE), count(C_ADDRESS) from customer group by c_name"] [txn_mode=PESSIMISTIC] [timestamp=441221653614297089] [err="Out Of Memory Quota![conn=3978033069293983]\ngithub.com/pingcap/tidb/executor.recoveryProjection\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/executor/projection.go:464\ngithub.com/pingcap/tidb/executor.(*projectionInputFetcher).run.func1\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/executor/projection.go:364\nruntime.gopanic\n\t/usr/local/go/src/runtime/panic.go:884\ngithub.com/pingcap/tidb/util/memory.(*PanicOnExceed).Action\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/util/memory/action.go:148\ngithub.com/pingcap/tidb/util/memory.(*Tracker).Consume.func1\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/util/memory/tracker.go:437\ngithub.com/pingcap/tidb/util/memory.(*Tracker).Consume\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/util/memory/tracker.go:478\ngithub.com/pingcap/tidb/executor.(*HashAggExec).getPartialResults\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/executor/aggregate.go:1121\ngithub.com/pingcap/tidb/executor.(*HashAggExec).execute\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/executor/aggregate.go:1045\ngithub.com/pingcap/tidb/executor.(*HashAggExec).unparallelExec\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/executor/aggregate.go:970\ngithub.com/pingcap/tidb/executor.(*HashAggExec).Next\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/executor/aggregate.go:789\ngithub.com/pingcap/tidb/executor.Next\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/executor/executor.go:326\ngithub.com/pingcap/tidb/executor.(*projectionInputFetcher).run\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb/executor/projection.go:388\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1598"]

The log shows that hashAgg spilled to disk only once (spillTimes=1). According to the definition of its spill behavior, it should only fall back to oom-action=cancel (PanicOnExceed, the lowest priority, DefPanicPriority) after exceeding maxSpillTimes=10 spills; otherwise spilling (DefSpillPriority) takes priority. Relevant code:

// Action set HashAggExec spill mode.
func (a *AggSpillDiskAction) Action(t *memory.Tracker) {
	// Guarantee that processed data is at least 20% of the threshold, to avoid spilling too frequently.
	if atomic.LoadUint32(&a.e.inSpillMode) == 0 && a.spillTimes < maxSpillTimes && a.e.memTracker.BytesConsumed() >= t.GetBytesLimit()/5 {
		a.spillTimes++
		logutil.BgLogger().Info("memory exceeds quota, set aggregate mode to spill-mode",
			zap.Uint32("spillTimes", a.spillTimes),
			zap.Int64("consumed", t.BytesConsumed()),
			zap.Int64("quota", t.GetBytesLimit()))
		atomic.StoreUint32(&a.e.inSpillMode, 1)
		memory.QueryForceDisk.Add(1)
		return
	}
	// Cannot (or should not) spill any further: delegate to the lower-priority
	// fallback action, if one is registered.
	if fallback := a.GetFallback(); fallback != nil {
		fallback.Action(t)
	}
}

Since only one spill is performed, why is it still canceled (issue 4)?

  1. For issue 1, almost every spill path is slow in this way; can it be optimized?
  2. For issue 2, why is the number of repeated spills limited to maxSpillTimes=10?
  3. For issue 3, multiple spills not only amplify issue 1 but also make disk usage grow rapidly. Could the spill file be split into multiple "segments" so that unused space can be reclaimed in time?
  4. For issue 4, even with spilling enabled the query is still often canceled. What is the main reason, and can it be optimized?
  5. Additionally, is there a plan to add spill support for parallel hashAgg?
| username: 人如其名 | Original post link

For question 4, the cancel most likely happened while hashAgg was spilling to disk: the fallback action detected that memory was still insufficient and chose a lower-priority action (in this case PanicOnExceed, i.e. oom-action=cancel), which killed the current session.

This generally happens because TiKV keeps sending data to TiDB while the spill is in progress. The spill is written row by row, which can be slower than the rate at which TiKV delivers data, so an increasing backlog of RPC responses builds up in TiDB. While the spill is still ongoing (a.e.inSpillMode=1), the action falls through to the degradation logic (looking for a lower-priority action); since oom-action=cancel (PanicOnExceed) is configured and the in-memory backlog keeps growing, the session ends up being canceled.

| username: Raymond | Original post link

For question 3, I wonder if it could be designed like this:
MySQL 8.0 maintains a pool of temporary tablespaces. The pool is initialized with 10 tablespaces (and can be expanded as needed), and each MySQL session takes temporary tablespaces from it. Could TiDB do something similar? If a query spills multiple times, the data from each spill could go into a different temporary segment; once the spilled data is no longer needed, the corresponding segment could be truncated. That would avoid putting everything into a single file, which is hard to truncate and keeps file-system space from being released in time. (A rough sketch of the idea follows.)
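
As an illustration only (this is not TiDB code and all names here are made up), such a segmented spiller might look roughly like this, with one temporary file per spill round that can be removed as soon as that round has been consumed:

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// segmentedSpiller writes each spill round into its own temporary file, so a
// round's disk space can be reclaimed as soon as it has been read back.
type segmentedSpiller struct {
	dir      string
	segments []string // one file per spill round
}

// newSegment starts a new spill round and returns the file to write rows into.
func (s *segmentedSpiller) newSegment() (*os.File, error) {
	name := filepath.Join(s.dir, fmt.Sprintf("spill-seg-%d.tmp", len(s.segments)))
	f, err := os.Create(name)
	if err != nil {
		return nil, err
	}
	s.segments = append(s.segments, name)
	return f, nil
}

// releaseSegment deletes one segment once its rows have been re-aggregated,
// instead of letting a single ever-growing file hold all rounds.
func (s *segmentedSpiller) releaseSegment(i int) error {
	return os.Remove(s.segments[i])
}

func main() {
	dir, err := os.MkdirTemp("", "hashagg-spill")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	s := &segmentedSpiller{dir: dir}
	f, _ := s.newSegment()
	f.WriteString("spilled rows for round 0\n")
	f.Close()

	// ... read the segment back and aggregate it, then free its space immediately:
	_ = s.releaseSegment(0)
	fmt.Println("segment 0 reclaimed")
}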

| username: XuHuaiyu-PingCAP | Original post link

  1. For Question 1, almost every spill path is slow in this way; can it be optimized?
  2. For Question 2, why is the maximum number of repeated spills limited to maxSpillTimes=10?
  3. For Question 3, multiple spills not only exacerbate the issue in Question 1 but also cause a rapid increase in disk space usage. Can it be designed with multiple "segments" to promptly reclaim unused space?
  4. For Question 4, even with spilling enabled, cancellations still frequently occur. What are the main causes, and can it be optimized?
  5. Additionally, for parallel hashAgg, is there a plan to add spill support?

Question 1: "Next, read the previously unprocessed records from the disk one by one (leading to slow performance, issue 1)."
I checked the code; it should read the file back from disk in chunks (1024 rows at a time) rather than row by row.

Question 2: "but it cannot exceed maxSpillTimes=10 times (why limit the maximum number of spills, issue 2)."
This is a magic number chosen during implementation. The initial idea was to handle roughly 10% of the data per pass and finish within 10 passes; the concern was that too many rounds of spilling and re-reading would keep resources occupied for too long.

Question 3: "a second spill will continue to append to the temporary spill file without clearing the previous records, which may lead to large disk space usage."
This is an implementation issue and can indeed be optimized.

Question 4: Even with spilling enabled, cancellations still frequently occur. What are the main causes, and can it be optimized?

  1. The design goal of agg spilling is to ensure that the memory held by the agg operator does not grow any further.
  2. Because agg spilling cannot reduce the memory the query has already consumed, its trigger point is tidb_mem_quota_query * 80%.
  3. In TiDB's implementation, the control behaviors triggered at tidb_mem_quota_query * 80% (the soft limit) and at tidb_mem_quota_query (the hard limit) are tracked on different action chains: agg spilling sits on the soft-limit chain (ActionForSoftLimit), while cancelling the SQL sits on the hard-limit chain (ActionForHardLimit).
  4. Because the actions live on different chains, agg spilling does not fall back to the cancel-query behavior.
  5. The reason SQL statements are still cancelled even with agg spilling enabled is that, during the spill, the query's memory consumption keeps growing until it exceeds tidb_mem_quota_query (the hard limit), which triggers the cancel action on the hard-limit chain (see the sketch after this list).
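
As a purely illustrative model (none of these names are TiDB internals), the behavior described in points 3–5 can be sketched as follows: the soft limit switches the aggregate into spill mode, while the hard limit is checked independently and cancels the query.

package main

import (
	"errors"
	"fmt"
)

// A toy memory tracker with two independent action chains: one fired at the
// soft limit (spill) and one at the hard limit (cancel). Illustrative only;
// this is not TiDB's actual memory.Tracker implementation.
type tracker struct {
	consumed  int64
	softLimit int64 // e.g. tidb_mem_quota_query * 80%
	hardLimit int64 // e.g. tidb_mem_quota_query
	inSpill   bool
	onSoft    func()       // soft-limit chain: switch agg to spill mode
	onHard    func() error // hard-limit chain: cancel the query
}

func (t *tracker) consume(n int64) error {
	t.consumed += n
	if !t.inSpill && t.consumed >= t.softLimit {
		t.inSpill = true
		t.onSoft() // spilling stops further growth of the agg map, but frees nothing
	}
	if t.consumed >= t.hardLimit {
		return t.onHard() // memory kept growing during the spill: cancel
	}
	return nil
}

func main() {
	quota := int64(1 << 30) // 1 GiB, like tidb_mem_quota_query in the test above
	t := &tracker{
		softLimit: quota / 10 * 8,
		hardLimit: quota,
		onSoft:    func() { fmt.Println("soft limit reached: set aggregate mode to spill-mode") },
		onHard:    func() error { return errors.New("Out Of Memory Quota!") },
	}
	// Data keeps arriving from TiKV faster than the row-by-row spill drains it,
	// so tracked memory climbs past the soft limit and finally hits the hard limit.
	for i := 0; i < 12; i++ {
		if err := t.consume(100 << 20); err != nil { // +100 MiB per incoming batch
			fmt.Println("query cancelled:", err)
			return
		}
	}
}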

Question 5:
There are plans, but it has not yet been decided in which version it will be introduced.

| username: 人如其名 | Original post link

Thank you, but I still have some doubts. Although the function called here, func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error), fetches one chunk at a time, it interacts with the OS with a separate IO call for every record.
The code interacting with IO:

// ReadFrom reads data of r, deserializes it from the format of diskFormatRow
// into Row.
func (row *rowInDisk) ReadFrom(r io.Reader) (n int64, err error) {
	// A first ReadFull call reads this row's header of per-column sizes.
	b := make([]byte, 8*row.numCol)
	var n1 int
	n1, err = io.ReadFull(r, b)
	n += int64(n1)
	if err != nil {
		return
	}
	row.sizesOfColumns = bytesToI64Slice(b)
	row.cells = make([][]byte, 0, row.numCol)
	for _, size := range row.sizesOfColumns {
		if size == -1 {
			continue
		}
		cell := make([]byte, size)
		row.cells = append(row.cells, cell)
		// Then one more ReadFull call per non-NULL column cell.
		n1, err = io.ReadFull(r, cell)
		n += int64(n1)
		if err != nil {
			return
		}
	}
	return
}
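
For what it's worth, one possible mitigation (just a sketch of the idea, not something TiDB does today) would be to wrap the spill file in a buffered reader, so that the many small io.ReadFull calls are served from an in-memory buffer instead of hitting the OS once per column:

package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// readRow stands in for rowInDisk.ReadFrom: it issues many small ReadFull calls.
func readRow(r io.Reader, sizes []int64) ([][]byte, error) {
	cells := make([][]byte, 0, len(sizes))
	for _, size := range sizes {
		cell := make([]byte, size)
		if _, err := io.ReadFull(r, cell); err != nil {
			return nil, err
		}
		cells = append(cells, cell)
	}
	return cells, nil
}

func main() {
	// Pretend this is the spill file; in real code it would be an *os.File.
	spill := strings.NewReader("Customer#000000001" + "25-989-741-2988")

	// Wrapping the reader in a bufio.Reader means the small per-column ReadFull
	// calls are served from a 64 KiB in-memory buffer rather than reaching the
	// OS (or the disk) once per column.
	br := bufio.NewReaderSize(spill, 64<<10)

	cells, err := readRow(br, []int64{18, 15})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", cells) // ["Customer#000000001" "25-989-741-2988"]
}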
| username: XuHuaiyu-PingCAP | Original post link

Okay, from this method it is indeed read row by row.
That is probably to avoid correctness issues when reading variable-length data such as strings.

| username: 人如其名 | Original post link

In fact it takes multiple IO operations per row, reading column by column. :zipper_mouth_face:

| username: system | Original post link

This topic was automatically closed 60 days after the last reply. New replies are no longer allowed.