On Manually Controlling Whether the hashAgg Operator Is Pushed Down to TiKV Nodes

Note:
This topic has been translated from a Chinese forum by GPT and might contain errors.

Original topic: 关于手工调整hashAgg算子下推到tikv节点的支持 (On support for manually adjusting hashAgg pushdown to TiKV nodes)

| username: 人如其名

When aggregating on non-indexed columns, the optimizer generally chooses the hashAgg operator based on cost. hashAgg can run in multiple stages: a partial aggregation on the TiKV storage nodes, followed by a final aggregation on the TiDB compute nodes. When the group-by key has many duplicate values, pushing the partial aggregation down to TiKV reduces the amount of data sent to the TiDB nodes, which cuts both computation time and memory usage. When the key has few duplicate values, however, pushing the aggregation down to TiKV is counterproductive: it wastes CPU on the storage nodes without meaningfully reducing the data volume, and overall aggregation performance suffers. Specifically:

  1. If the key field of the aggregation has low duplication, it is preferable not to perform aggregation on the TiKV nodes and instead aggregate directly on the TiDB nodes.
  2. If the key field of the aggregation has high duplication, it is preferable to perform an initial aggregation on the TiKV nodes to reduce the aggregation pressure on the TiDB nodes, significantly improving performance.
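The asymmetry between the two cases can be sketched with a toy model of two-phase hash aggregation. This is a minimal Python sketch under stated assumptions, not TiDB's implementation: each inner list stands in for the rows scanned by one TiKV coprocessor task, and `partial_agg`, `final_agg`, and `rows_shipped` are hypothetical helpers. It counts how many rows would cross from the storage side to the compute side for a `sum(...) ... group by key` query, with and without the partial stage.

```python
from collections import defaultdict
from decimal import Decimal


def partial_agg(rows):
    """Storage-side stage: pre-aggregate one task's rows into key -> [sum, count]."""
    acc = defaultdict(lambda: [Decimal(0), 0])
    for key, value in rows:
        acc[key][0] += value
        acc[key][1] += 1
    return dict(acc)


def final_agg(partials):
    """Compute-side stage: merge the partial [sum, count] states by key."""
    acc = defaultdict(lambda: [Decimal(0), 0])
    for part in partials:
        for key, (s, c) in part.items():
            acc[key][0] += s
            acc[key][1] += c
    return dict(acc)


def rows_shipped(tasks, push_down):
    """Rows sent from storage to compute: partial groups if pushed down, raw rows otherwise."""
    if push_down:
        return sum(len(partial_agg(t)) for t in tasks)
    return sum(len(t) for t in tasks)


# Four hypothetical tasks of 10,000 rows each.
# High duplication: only 5 distinct keys (similar to O_ORDERPRIORITY).
high_dup = [[(i % 5, Decimal("1.00")) for i in range(t * 10_000, (t + 1) * 10_000)]
            for t in range(4)]
# Low duplication: every key is unique (similar to a primary key).
low_dup = [[(i, Decimal("1.00")) for i in range(t * 10_000, (t + 1) * 10_000)]
           for t in range(4)]

print(rows_shipped(high_dup, push_down=False), rows_shipped(high_dup, push_down=True))  # 40000 20
print(rows_shipped(low_dup, push_down=False), rows_shipped(low_dup, push_down=True))    # 40000 40000
```

With five distinct keys the partial stage shrinks 40,000 rows to 20; with unique keys it ships just as many rows and only adds storage-side work, which mirrors points 1 and 2.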

However, statistics may be stale, or the optimizer’s estimates may be wrong (for example, when several columns are correlated through business logic, which causes large selectivity errors), and the result is a poor execution plan. This hurts most in the second scenario: data that could have been greatly reduced on TiKV is instead shipped to TiDB and aggregated there. For example, in a query like select col1, sum(col2_decimal), sum(col3_decimal), sum(col4_decimal)... from T group by col1;, all rows are sent to the TiDB side, and the memory amplification of the decimal data type often leads to OOM (out of memory) errors or very slow aggregation.

It would therefore be useful to have a session/global-level variable that controls whether hashAgg is pushed down to the storage nodes. Ideally, this control would also be available as a hint for finer-grained use, so users can target the specific hashAgg operators that need it (a single statement can contain several hashAgg operators, although this is rare in practice).

Note: The tidb_opt_agg_push_down variable does not push hashAgg down to the TiKV nodes. It controls whether the optimizer pushes aggregate functions down to the position before Join, Projection, and UnionAll operators.
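For contrast, the rewrite that tidb_opt_agg_push_down governs happens in the logical plan rather than on TiKV. Below is a minimal Python sketch of the UnionAll case, assuming a decomposable aggregate such as SUM; `sum_by_key`, `agg_after_union`, and `agg_before_union` are hypothetical names. Aggregating each branch first and then re-aggregating the partial sums gives the same result as aggregating the full UNION ALL.

```python
from collections import defaultdict


def sum_by_key(rows):
    """SUM(value) GROUP BY key over an iterable of (key, value) pairs."""
    totals = defaultdict(int)
    for key, value in rows:
        totals[key] += value
    return dict(totals)


def agg_after_union(*branches):
    # Plan shape: Agg(UnionAll(t1, t2, ...))
    return sum_by_key(row for branch in branches for row in branch)


def agg_before_union(*branches):
    # Plan shape: Agg(UnionAll(Agg(t1), Agg(t2), ...)):
    # each branch is pre-aggregated, then the partial sums are summed again.
    partials = [sum_by_key(branch) for branch in branches]
    return sum_by_key(item for p in partials for item in p.items())


t1 = [("a", 1), ("b", 2), ("a", 3)]
t2 = [("a", 10), ("c", 5)]
print(agg_after_union(t1, t2))   # {'a': 14, 'b': 2, 'c': 5}
print(agg_before_union(t1, t2))  # {'a': 14, 'b': 2, 'c': 5}
```

Both plan shapes run entirely in the compute layer; neither decides whether the storage nodes perform a partial aggregation.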

Here is an example (ignoring optimizations such as partitioned tables or splitting the data across multiple tables). A large historical table (tens of billions of rows) receives new data loaded by date O_ORDERDATE (millions of rows per date), and the newly loaded rows have O_ORDERSTATUS set to “not started.” Immediately after each load, batch query tasks that aggregate this new data must run. Because the new data is small relative to the whole table, it may not trigger automatic statistics collection, so the optimizer’s estimates for the new data can be badly off. Collecting statistics on the entire table takes a long time and is expensive, possibly longer than the batch processing that follows.
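Why newly loaded rows can be effectively invisible to the optimizer can be illustrated with a deliberately simplified model: statistics as a frequency snapshot taken at ANALYZE time, scaled to the current row count. This is a hypothetical sketch (`SnapshotStats` and `estimate_eq` are made-up names), not TiDB's histogram/TopN/Count-Min Sketch estimator, which applies additional heuristics. It only shows that a value absent from the snapshot gets an estimate of zero no matter how many such rows now exist, which is consistent with the estRows of 0.00 in the demonstration's execution plan.

```python
from collections import Counter


class SnapshotStats:
    """Toy equality-predicate estimator built from the rows seen at ANALYZE time."""

    def __init__(self, analyzed_rows):
        self.freq = Counter(analyzed_rows)
        self.analyzed_total = len(analyzed_rows)

    def estimate_eq(self, value, current_total):
        # Scale the snapshot frequency of `value` to the table's current size.
        if self.analyzed_total == 0:
            return 0.0
        return self.freq[value] / self.analyzed_total * current_total


# Historical rows keyed by (O_ORDERDATE, O_ORDERSTATUS); statistics are collected here.
history = [("1992-01-01", "F")] * 1000 + [("1992-01-02", "O")] * 1000
stats = SnapshotStats(history)

# Incremental load with a brand-new status 'N'; no ANALYZE afterwards.
new_rows = [("1998-08-02", "N")] * 62
table = history + new_rows

key = ("1998-08-02", "N")
print(stats.estimate_eq(key, len(table)))  # 0.0
print(table.count(key))                    # 62
```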

The following demonstration uses the orders table from TPC-H at scale factor 100 (database tpch100), in these steps:

  1. Create a new table orders_bak like orders and load a large amount of historical data (rows before a given O_ORDERDATE) from orders.
  2. Add a composite index (O_ORDERDATE, O_ORDERSTATUS) to orders_bak to quickly filter the new data.
  3. Collect statistics on orders_bak, treating it as the full historical data.
  4. Load the latest date’s data into orders_bak without collecting statistics.
  5. Aggregate sum(O_TOTALPRICE) grouped by the O_ORDERPRIORITY field of orders_bak.
  6. Check the execution plan. The aggregation is expected to be pushed down to TiKV, but it is not:
    explain select O_ORDERPRIORITY, sum(O_TOTALPRICE) from orders_bak where O_ORDERDATE='1998-08-02' and O_ORDERSTATUS='N' group by O_ORDERPRIORITY;
mysql> show create table orders \G
*************************** 1. row ***************************
       Table: orders
Create Table: CREATE TABLE `orders` (
  `O_ORDERKEY` bigint(20) NOT NULL,
  `O_CUSTKEY` bigint(20) NOT NULL,
  `O_ORDERSTATUS` char(1) NOT NULL,
  `O_TOTALPRICE` decimal(15,2) NOT NULL,
  `O_ORDERDATE` date NOT NULL,
  `O_ORDERPRIORITY` char(15) NOT NULL,
  `O_CLERK` char(15) NOT NULL,
  `O_SHIPPRIORITY` bigint(20) NOT NULL,
  `O_COMMENT` varchar(79) NOT NULL,
  PRIMARY KEY (`O_ORDERKEY`) /*T![clustered_index] CLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
1 row in set (0.00 sec)

mysql> select count(*) from orders;
+-----------+
| count(*)  |
+-----------+
| 150000000 |
+-----------+
1 row in set (1.83 sec)

mysql> select max(O_ORDERDATE), min(O_ORDERDATE) from orders;
+------------------+------------------+
| max(O_ORDERDATE) | min(O_ORDERDATE) |
+------------------+------------------+
| 1998-08-02       | 1992-01-01       |
+------------------+------------------+
1 row in set (1.55 sec)

mysql> select O_ORDERSTATUS, count(*) from orders group by O_ORDERSTATUS;
+---------------+----------+
| O_ORDERSTATUS | count(*) |
+---------------+----------+
| O             | 73086053 |
| F             | 73072502 |
| P             |  3841445 |
+---------------+----------+
3 rows in set (2.18 sec)

mysql> select O_ORDERPRIORITY, count(*) from orders group by O_ORDERPRIORITY;
+-----------------+----------+
| O_ORDERPRIORITY | count(*) |
+-----------------+----------+
| 4-NOT SPECIFIED | 30004093 |
| 1-URGENT        | 29995209 |
| 5-LOW           | 30002971 |
| 2-HIGH          | 29997467 |
| 3-MEDIUM        | 30000260 |
+-----------------+----------+
5 rows in set (2.69 sec)

Create the orders_bak table and load data:

mysql> create table orders_bak like orders;
Query OK, 0 rows affected (0.12 sec)

-- Load a little over one million rows as historical data (loading all of the history would take too long)
-- Allow the read part of INSERT ... SELECT to use TiFlash to speed up loading
mysql> set tidb_enable_tiflash_read_for_write_stmt=ON;
Query OK, 0 rows affected (0.00 sec)

mysql> explain insert into orders_bak select * from orders where O_ORDERDATE != '1998-08-02' limit 1000000;
+----------------------------------+------------+--------------+---------------+------------------------------------------------------------+
| id                               | estRows    | task         | access object | operator info                                              |
+----------------------------------+------------+--------------+---------------+------------------------------------------------------------+
| Insert_1                         | N/A        | root         |               | N/A                                                        |
| └─Limit_12                       | 1000000.00 | root         |               | offset:0, count:1000000                                    |
|   └─TableReader_23               | 1000000.00 | root         |               | MppVersion: 1, data:ExchangeSender_22                      |
|     └─ExchangeSender_22          | 1000000.00 | mpp[tiflash] |               | ExchangeType: PassThrough                                  |
|       └─Limit_21                 | 1000000.00 | mpp[tiflash] |               | offset:0, count:1000000                                    |
|         └─Selection_20           | 1000000.00 | mpp[tiflash] |               | ne(tpch100.orders.o_orderdate, 1998-08-02 00:00:00.000000) |
|           └─TableFullScan_19     | 1002486.09 | mpp[tiflash] | table:orders  | keep order:false                                           |
+----------------------------------+------------+--------------+---------------+------------------------------------------------------------+
7 rows in set (0.01 sec)

mysql> batch on orders.O_ORDERKEY limit 5000 insert into orders_bak select * from orders where O_ORDERDATE  like '1992-01-%';
+----------------+---------------+
| number of jobs | job status    |
+----------------+---------------+
|            387 | all succeeded |
+----------------+---------------+
1 row in set (1 min 37.83 sec)
Records: 2789  Duplicates: 0  Warnings: 0

mysql> alter table orders_bak add index (O_ORDERDATE, O_ORDERSTATUS);
Query OK, 0 rows affected (7.34 sec)

mysql> analyze table orders_bak;
Query OK, 0 rows affected, 1 warning (2.29 sec)

Next, add incremental data to orders_bak. Note that no historical row has the same O_ORDERDATE and O_ORDERSTATUS values as the incremental data: the new rows use a new status value N (“uninitialized”). Load the data as follows:

mysql> insert into orders_bak select O_ORDERKEY, O_CUSTKEY, 'N', O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT from orders where O_ORDERDATE='1998-08-02';
Query OK, 62388 rows affected (7.58 sec)
Records: 62388  Duplicates: 0  Warnings: 0

The SQL execution plan is as follows:

mysql> explain select O_SHIPPRIORITY, sum(O_TOTALPRICE), count(O_CUSTKEY) from orders_bak where O_ORDERDATE='1998-08-02' and O_ORDERSTATUS='N' group by O_SHIPPRIORITY;
+------------------------------------+---------+-----------+-----------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id                                 | estRows | task      | access object                                                   | operator info                                                                                                                                                                                                                                 |
+------------------------------------+---------+-----------+-----------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Projection_5                       | 1.00    | root      |                                                                 | tpch100.orders_bak.o_shippriority, Column#10, Column#11                                                                                                                                                                                       |
| └─HashAgg_9                        | 1.00    | root      |                                                                 | group by:tpch100.orders_bak.o_shippriority, funcs:sum(tpch100.orders_bak.o_totalprice)->Column#10, funcs:count(tpch100.orders_bak.o_custkey)->Column#11, funcs:firstrow(tpch100.orders_bak.o_shippriority)->tpch100.orders_bak.o_shippriority |
|   └─IndexLookUp_31                 | 0.00    | root      |                                                                 |                                                                                                                                                                                                                                               |
|     ├─IndexRangeScan_29(Build)     | 0.00    | cop[tikv] | table:orders_bak, index:O_ORDERDATE(O_ORDERDATE, O_ORDERSTATUS) | range:[1998-08-02 "N",1998-08-02 "N"], keep order:false                                                                                                                                                                                       |
|     └─TableRowIDScan_30(Probe)     | 0.00    | cop[tikv] | table:orders_bak                                                | keep order:false                                                                                                                                                                                                                              |
+------------------------------------+---------+-----------+-----------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
5 rows in set (0.01 sec)

mysql> explain analyze select O_SHIPPRIORITY, sum(O_TOTALPRICE), count(O_CUSTKEY) from orders_bak where O_ORDERDATE='1998-08-02' and O_ORDERSTATUS='N' group by O_SHIPPRIORITY;

| id                                 | estRows | actRows | task      | access object                                                   | execution info| operator info                                                                                                                                                                                                                                 | memory  | disk    |

| Projection_5                       | 1.00    | 1       | root      |                                                                 | time:361.1ms, loops:2, RRU:436.527075, WRU:0.000000, Concurrency| tpch100.orders_bak.o_shippriority, Column#10, Column#11                                                                                                                                                                                       | 2.14 KB | N/A     |
| └─HashAgg_9                        | 1.00    | 1       | root      |                                                                 | time:361.1ms, loops| group by:tpch100.orders_bak.o_shippriority, funcs:sum(tpch100.orders_bak.o_totalprice)->Column#10, funcs:count(tpch100.orders_bak.o_custkey)->Column#11, funcs:firstrow(tpch100.orders_bak.o_shippriority)->tpch100.orders_bak.o_shippriority | 83.2 KB | 0 Bytes |
|   └─IndexLookUp_31                 | 0.00    | 62388   | root      |                                                                 | time:354.1ms, loops:62, index_task: {total_time: 94.3ms, fetch_handle: 94.2ms, build: 18.9µs, wait: 68.8µs}, table_task: {total_time: 897.2ms, num: 7, concurrency: 5}, next: {wait_index: 31.5ms, wait_table_lookup_build: 459.3µs, wait_table_lookup_resp: 318.5ms}                                                                                                                                                                                                                                                                                                                                                                                                               |                                                                                                                                                                                                                                               | 5.28 MB | N/A     |
|     ├─IndexRangeScan_29(Build)     | 0.00    | 62388   | cop[tikv] | table:orders_bak, index:O_ORDERDATE(O_ORDERDATE, O_ORDERSTATUS) | time:91.1ms, loops:64, cop_task: {num: 9, max: 26.2ms, min: 5.22ms, avg: 10.2ms, p95: 26.2ms, max_proc_keys: 23988, p95_proc_keys: 23988, tot_proc: 74.1ms, tot_wait: 688.7µs, rpc_num: 9, rpc_time: 91.9ms, copr_cache_hit_ratio: 0.00, build_task_duration: 30.2µs, max_distsql_concurrency: 1}, tikv_task:{proc max:15ms, min:4ms, avg: 8.22ms, p80:15ms, p95:15ms, iters:96, tasks:9}, scan_detail: {total_process_keys: 62388, total_process_keys_size: 3431340, total_keys: 62397, get_snapshot_time: 205.9µs, rocksdb: {delete_skipped_count: 337313, key_skipped_count: 399701, block: {cache_hit_count: 1891, read_count: 257, read_byte: 3.98 MB, read_time: 5.44ms}}}    | range:[1998-08-02 "N",1998-08-02 "N"], keep order:false                                                                                                                                                                                       | N/A     | N/A     |
|     └─TableRowIDScan_30(Probe)     | 0.00    | 62388   | cop[tikv] | table:orders_bak                                                | time:855ms, loops:71, cop_task: {num: 9, max: 278.6ms, min: 11ms, avg: 99.2ms, p95: 278.6ms, max_proc_keys: 18170, p95_proc_keys: 18170, tot_proc: 631.3ms, tot_wait: 3.71ms, rpc_num: 9, rpc_time: 892.1ms, copr_cache_hit_ratio: 0.00, build_task_duration: 1.68ms, max_distsql_concurrency: 2}, tikv_task:{proc max:260ms, min:8ms, avg: 88.2ms, p80:188ms, p95:260ms, iters:102, tasks:9}, scan_detail: {total_process_keys: 62388, total_process_keys_size: 9472269, total_keys: 62401, get_snapshot_time: 3.01ms, rocksdb: {delete_skipped_count: 24, key_skipped_count: 50, block: {cache_hit_count: 275613, read_count: 12, read_byte: 190.6 KB, read_time: 338.2µs}}}      | keep order:false                                                                                                                                                                                                                              | N/A     | N/A     |

5 rows in set (0.36 sec)

As shown, the hashAgg is not pushed down. If the table is very large and the incremental data reaches hundreds of millions of rows, running hashAgg entirely on the TiDB layer puts heavy pressure on it. To keep hashAgg from triggering an OOM kill, its concurrency is usually set to 1 so that it can spill to disk. However, non-parallel hashAgg is relatively slow, so IndexLookUp_31 may accumulate a large amount of data waiting to be consumed. In practice, when there are many aggregated columns (sum_col1, sum_col2, sum_col3…), this buffered data can occupy a lot of memory and get the IndexLookUp operator itself OOM-killed.
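The memory-for-disk trade-off mentioned above can be illustrated with a generic hash-partitioned (Grace-style) aggregation. This is a minimal Python sketch of that textbook technique, not TiDB's actual spill mechanism: every input row is first written to a per-partition temporary file chosen by the hash of its group key, then each partition is aggregated on its own, so the in-memory hash table holds only one partition's groups at a time. `spilling_sum_by_key` and `num_partitions` are hypothetical names, and keys are assumed to be strings because they round-trip through CSV.

```python
import csv
import os
import tempfile
from collections import defaultdict
from decimal import Decimal


def spilling_sum_by_key(rows, num_partitions=4):
    """Yield (key, SUM(value)) pairs, aggregating one hash partition at a time."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        paths = [os.path.join(tmp_dir, f"part{i}.csv") for i in range(num_partitions)]
        # Phase 1: spill every row to the partition file selected by hash(key).
        files = [open(p, "w", newline="") for p in paths]
        try:
            writers = [csv.writer(f) for f in files]
            for key, value in rows:
                writers[hash(key) % num_partitions].writerow([key, str(value)])
        finally:
            for f in files:
                f.close()

        # Phase 2: a given key always lands in the same partition, so each
        # partition can be aggregated independently and its groups emitted.
        for path in paths:
            groups = defaultdict(Decimal)  # Decimal() == Decimal("0")
            with open(path, newline="") as f:
                for key, value in csv.reader(f):
                    groups[key] += Decimal(value)
            yield from groups.items()


rows = ((f"prio-{i % 5}", Decimal("1.50")) for i in range(1000))
result = dict(spilling_sum_by_key(rows))
print(len(result), result["prio-0"])  # 5 300.00
```

The price of bounded memory is that every row is written to and read back from disk once, on top of single-threaded processing.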

After collecting statistics for the entire table, the execution plan is as follows:

mysql> explain select O_SHIPPRIORITY, sum(O_TOTALPRICE), count(O_CUSTKEY) from orders_bak where O_ORDERDATE='1998-08-02' and O_ORDERSTATUS='N' group by O_SHIPPRIORITY;
+-------------------------------+------------+-----------+------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id                            | estRows    | task      | access object    | operator info                                                                                                                                                                                        |
+-------------------------------+------------+-----------+------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Projection_5                  | 1.00       |

| username: time-and-fate

TiDB currently provides the agg_to_cop() hint, which tells the optimizer to push the aggregate operation in the specified query block down to the coprocessor. You can try it and see whether it solves the problem.
Documentation: https://docs.pingcap.com/zh/tidb/dev/optimizer-hints#agg_to_cop

| username: 人如其名

Thanks a lot, it works!

| username: system

This topic was automatically closed 60 days after the last reply. New replies are no longer allowed.