* feat: Preserve number of streams after evaluation the window functions to allow parallel stream processing
* fix style
* fix style
* fix style
* setting query_plan_preserve_num_streams_after_window_functions default true
* fix tests by SETTINGS query_plan_preserve_num_streams_after_window_functions=0
* fix test references
* Resize the streams after the last window function, to keep the order between WindowTransforms (and WindowTransform works on single stream anyway).
* feat: Preserve number of streams after evaluation the window functions to allow parallel stream processing
* fix style
* fix style
* fix style
* setting query_plan_preserve_num_streams_after_window_functions default true
* fix tests by SETTINGS query_plan_preserve_num_streams_after_window_functions=0
* fix test references
* Resize the streams after the last window function, to keep the order between WindowTransforms (and WindowTransform works on single stream anyway).
* add perf test
* perf: change the dataset from 50M to 5M
* rename query_plan_preserve_num_streams_after_window_functions -> query_plan_enable_multithreading_after_window_functions
* update test reference
* fix clang-tidy
---------
Co-authored-by: Nikita Taranov <nikita.taranov@clickhouse.com>
* Cache cast function in set during execution
Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>
* minor fix for performance test
Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>
* Update src/Interpreters/castColumn.cpp
Co-authored-by: Nikita Taranov <nickita.taranov@gmail.com>
* improvement
Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>
* fix use-after-free
Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>
---------
Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>
Co-authored-by: Nikita Taranov <nickita.taranov@gmail.com>
* Optimize the merge if all hashSets are singleLevel
In PR(https://github.com/ClickHouse/ClickHouse/pull/50748), it has added new phase
`parallelizeMergePrepare` before merge if all the hashSets are not all singleLevel
or not all twoLevel. Then it will convert all the singleLevelSet to twoLevelSet in
parallel, which will increase the CPU utilization and QPS.
But if all the hashtables are singleLevel, it could also benefit from the
`parallelizeMergePrepare` optimization in most cases if the hashtable size are not
too small. By tuning the Query `SELECT COUNT(DISTINCT SearchPhase) FROM hits_v1`
in different threads, we have got the mild threshold 6,000.
Test patch with the Query 'SELECT COUNT(DISTINCT Title) FROM hits_v1' on 2x80 vCPUs
server. If the threads are less than 48, the hashSets are all twoLevel or mixed by
singleLevel and twoLevel. If the threads are over 56, all the hashSets are singleLevel.
And the QPS has got at most 2.35x performance gain.
Threads Opt/Base
8 100.0%
16 99.4%
24 110.3%
32 99.9%
40 99.3%
48 99.8%
56 183.0%
64 234.7%
72 233.1%
80 229.9%
88 224.5%
96 229.6%
104 235.1%
112 229.5%
120 229.1%
128 217.8%
136 222.9%
144 217.8%
152 204.3%
160 203.2%
Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
* Add the comment and explanation for PR#52973
Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
---------
Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
* Convert hashSets in parallel before merge
Before merge, if one of the lhs and rhs is singleLevelSet and the other is twoLevelSet,
then the SingleLevelSet will call convertToTwoLevel(). The convert process is not in parallel
and it will cost lots of cycle if it cosume all the singleLevelSet.
The idea of the patch is to convert all the singleLevelSets to twoLevelSets in parallel if
the hashsets are not all singleLevel or not all twoLevel.
I have tested the patch on Intel 2 x 112 vCPUs SPR server with clickbench and latest upstream
ClickHouse.
Q5 has got a big 264% performance improvement and 24 queries have got at least 5% performance
gain. The overall geomean of 43 queries has gained 7.4% more than the base code.
Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
* add resize() for the data_vec in parallelizeMergePrepare()
Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
* Add the performance test prepare_hash_before_merge.xml
Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
* Fit the CI to rename the data set from hits_v1 to test.hits.
Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
* remove the redundant branch in UniqExactSet
Co-authored-by: Nikita Taranov <nickita.taranov@gmail.com>
* Remove the empty methods and add throw exception in parallelizeMergePrepare()
Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
---------
Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
Co-authored-by: Nikita Taranov <nickita.taranov@gmail.com>
Adding more processors for parallelize_output_from_storages is not a
costless operation (I've experienced some issues in production because
of this), and it is not easy to fix in a normal way, so let's disable it
for now.
Before this patch:
- INSERT INTO input SELECT * FROM numbers(10e6) SETTINGS parallelize_output_from_storages=1, min_insert_block_size_rows=1000
0 rows in set. Elapsed: 3.648 sec. Processed 20.00 million rows, 120.00 MB (5.48 million rows/s., 32.90 MB/s.)
- INSERT INTO input SELECT * FROM numbers(10e6) SETTINGS parallelize_output_from_storages=0, min_insert_block_size_rows=1000
0 rows in set. Elapsed: 1.851 sec. Processed 20.00 million rows, 120.00 MB (10.80 million rows/s., 64.82 MB/s.)
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
As it turns out, HashMap/PackedHashMap works great even with max load
factor of 0.99. By "great" I mean it least it works faster then
google sparsehash, and not to mention it's friendliness to the memory
allocator (it has zero fragmentation since it works with a continuious
memory region, in comparison to the sparsehash that doing lots of
realloc, which jemalloc does not like, due to it's slabs).
Here is a table of different setups:
settings | load (sec) | read (sec) | read (million rows/s) | bytes_allocated | RSS
- | - | - | - | - | -
HASHED upstream | - | - | - | - | 35GiB
SPARSE_HASHED upstream | - | - | - | - | 26GiB
- | - | - | - | - | -
sparse_hash_map glibc hashbench | - | - | - | - | 17.5GiB
sparse_hash_map packed allocator | 101.878 | 231.48 | 4.32 | - | 17.7GiB
PackedHashMap 0.5 | 15.514 | 42.35 | 23.61 | 20GiB | 22GiB
hashed 0.95 | 34.903 | 115.615 | 8.65 | 16GiB | 18.7GiB
**PackedHashMap 0.95** | **93.6** | **19.883** | **10.68** | **10GiB** | **12.8GiB**
PackedHashMap 0.99 | 26.113 | 83.6 | 11.96 | 10GiB | 12.3GiB
As it shows, PackedHashMap with 0.95 max_load_factor, eats 2.6x less
memory then SPARSE_HASHED in upstream, and it also 2x faster for read!
v2: fix grower
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
In case you want dictionary optimized for memory, SPARSE_HASHED is not
always gives you what you need.
Consider the following example <UInt64, UInt16> as <Key, Value>, but
this pair will also have a 6 byte padding (on amd64), so this is almost
40% of space wastage.
And because of this padding, even google::sparse_hash_map, does not make
picture better, in fact, sparse_hash_map is not very friendly to memory
allocators (especially jemalloc).
Here are some numbers for dictionary with 1e9 elements and UInt64 as
key, and UInt16 as value:
settings | load (sec) | read (sec) | read (million rows/s) | bytes_allocated | RSS
HASHED upstream | - | - | - | - | 35GiB
SPARSE_HASHED upstream | - | - | - | - | 26GiB
- | - | - | - | - | -
sparse_hash_map glibc hashbench | - | - | - | - | 17.5GiB
sparse_hash_map packed allocator | 101.878 | 231.48 | 4.32 | - | 17.7GiB
PackedHashMap | 15.514 | 42.35 | 23.61 | 20GiB | 22GiB
As you can see PackedHashMap looks way more better then HASHED, and
even better then SPARSE_HASHED, but slightly worse then sparse_hash_map
with packed allocator (it is done with a custom patch to google
sparse_hash_map).
v2: rebase on top of bucket_count fix
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
Reasons:
1. The original Gorilla paper proposed a compression schema for pairs of
time stamps and double-precision FP values. ClickHouse's Gorilla
codec only implements compression of the latter and it does not
impose any data type restrictions.
- Data types != Float* or (U)Int* (e.g. Decimal, Point etc.) are
definitely not supposed to be used with Gorilla.
- (U)Int* types are debatable. The paper only considers
integers-stored-as-FP-values, a practical use case for which
Gorilla works well. Standalone integers are not considered which
makes them at least suspicious.
2. Achieve consistency with FPC, another specialized floating-point
timeseries codec, which rejects non-float data.
3. On practical datasets, ZSTD is often "good enough" (**) so it should
be okay to disincentive non-ZSTD codecs a little bit. If needed,
Delta and DoubleDelta codecs are viable alternative for slowly
changing (time-series-like) integer sequences.
Since on-prem and hosted users may still have Gorilla-compressed
non-float data, this combination is only deprecated for now. No warning
or error will be emitted. Users are encouraged to migrate
Gorilla-compressed non-float data to an alternative codec. It is planned
to treat Gorilla-compressed non-float columns as "suspicious" six months
after this commit (i.e. in v23.6). Even then, it will still be possible
to set "allow_suspicious_codecs = true" and read and write
Gorilla-compressed non-float data.
(*) Sec. 4.1.2, "Gorilla restricts the value element in its tuple to a
double floating point type.", https://doi.org/10.14778/2824032.2824078
(**) https://clickhouse.com/blog/optimize-clickhouse-codecs-compression-schema
Right now dictionaries (here I will talk about only
HASHED/SPARSE_HASHED/COMPLEX_KEY_HASHED/COMPLEX_KEY_SPARSE_HASHED)
can load data only in one thread, since it uses one hash table that
cannot be filled from multiple threads.
And in case you have very big dictionary (i.e. 10e9 elements), it can
take a awhile to load them, especially for SPARSE_HASHED variants (and
if you have such amount of elements there, you are likely use
SPARSE_HASHED, since it requires less memory), in my env it takes ~4
hours, which is enormous amount of time.
So this patch add support of shards for dictionaries, number of shards
determine how much hash tables will use this dictionary, also, and which
is more important, how much threads it can use to load the data.
And with 16 threads this works 2x faster, not perfect though, see the
follow up patches in this series.
v0: PARTITION BY
v1: SHARDS 1
v2: SHARDS(1)
v3: tried optimized mod - logical and, but it does not gain even 10%
v4: tried squashing more (max_block_size * shards), but it does not gain even 10% either
v5: move SHARDS into layout parameters (unknown simply ignored)
v6: tune params for perf tests (to avoid too long queries)
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
* impl
* fix style
* make executeQueryWithParallelReplicas similar to executeQuery
* impl for parallel replicas
* cleaner code for remote sorting properties
* update test
* fix
* handle when nodes of old versions participate
* small fixes
* temporary enable for testing
* fix after merge
* Revert "temporary enable for testing"
This reverts commit cce7f8884c.
* review fixes
* add bc test
* Update src/Core/Settings.h