mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 01:51:59 +00:00
Merge branch 'master' into select_many_merge_selector
This commit is contained in:
commit
3f9183c69f
@ -131,16 +131,6 @@ Type: UInt64
|
||||
|
||||
Default: 8
|
||||
|
||||
## background_pool_size
|
||||
|
||||
Sets the number of threads performing background merges and mutations for tables with MergeTree engines. You can only increase the number of threads at runtime. To lower the number of threads you have to restart the server. By adjusting this setting, you manage CPU and disk load. Smaller pool size utilizes less CPU and disk resources, but background processes advance slower which might eventually impact query performance.
|
||||
|
||||
Before changing it, please also take a look at related MergeTree settings, such as `number_of_free_entries_in_pool_to_lower_max_size_of_merge` and `number_of_free_entries_in_pool_to_execute_mutation`.
|
||||
|
||||
Type: UInt64
|
||||
|
||||
Default: 16
|
||||
|
||||
## background_schedule_pool_size
|
||||
|
||||
The maximum number of threads that will be used for constantly executing some lightweight periodic operations for replicated tables, Kafka streaming, and DNS cache updates.
|
||||
|
@ -25,8 +25,10 @@ struct BitShiftLeftImpl
|
||||
{
|
||||
if constexpr (is_big_int_v<B>)
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "BitShiftLeft is not implemented for big integers as second argument");
|
||||
else if (b < 0 || static_cast<UInt256>(b) > 8 * sizeof(A))
|
||||
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "The number of shift positions needs to be a non-negative value and less or equal to the bit width of the value to shift");
|
||||
else if (b < 0)
|
||||
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "The number of shift positions needs to be a non-negative value");
|
||||
else if (static_cast<UInt256>(b) > 8 * sizeof(A))
|
||||
return static_cast<Result>(0);
|
||||
else if constexpr (is_big_int_v<A>)
|
||||
return static_cast<Result>(a) << static_cast<UInt32>(b);
|
||||
else
|
||||
@ -43,9 +45,10 @@ struct BitShiftLeftImpl
|
||||
const UInt8 word_size = 8 * sizeof(*pos);
|
||||
size_t n = end - pos;
|
||||
const UInt128 bit_limit = static_cast<UInt128>(word_size) * n;
|
||||
if (b < 0 || static_cast<decltype(bit_limit)>(b) > bit_limit)
|
||||
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "The number of shift positions needs to be a non-negative value and less or equal to the bit width of the value to shift");
|
||||
if (b == bit_limit)
|
||||
if (b < 0)
|
||||
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "The number of shift positions needs to be a non-negative value");
|
||||
|
||||
if (b == bit_limit || static_cast<decltype(bit_limit)>(b) > bit_limit)
|
||||
{
|
||||
// insert default value
|
||||
out_vec.push_back(0);
|
||||
@ -111,9 +114,10 @@ struct BitShiftLeftImpl
|
||||
const UInt8 word_size = 8;
|
||||
size_t n = end - pos;
|
||||
const UInt128 bit_limit = static_cast<UInt128>(word_size) * n;
|
||||
if (b < 0 || static_cast<decltype(bit_limit)>(b) > bit_limit)
|
||||
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "The number of shift positions needs to be a non-negative value and less or equal to the bit width of the value to shift");
|
||||
if (b == bit_limit)
|
||||
if (b < 0)
|
||||
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "The number of shift positions needs to be a non-negative value");
|
||||
|
||||
if (b == bit_limit || static_cast<decltype(bit_limit)>(b) > bit_limit)
|
||||
{
|
||||
// insert default value
|
||||
out_vec.resize_fill(out_vec.size() + n);
|
||||
|
@ -26,8 +26,10 @@ struct BitShiftRightImpl
|
||||
{
|
||||
if constexpr (is_big_int_v<B>)
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "BitShiftRight is not implemented for big integers as second argument");
|
||||
else if (b < 0 || static_cast<UInt256>(b) > 8 * sizeof(A))
|
||||
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "The number of shift positions needs to be a non-negative value and less or equal to the bit width of the value to shift");
|
||||
else if (b < 0)
|
||||
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "The number of shift positions needs to be a non-negative value");
|
||||
else if (static_cast<UInt256>(b) > 8 * sizeof(A))
|
||||
return static_cast<Result>(0);
|
||||
else if constexpr (is_big_int_v<A>)
|
||||
return static_cast<Result>(a) >> static_cast<UInt32>(b);
|
||||
else
|
||||
@ -59,9 +61,10 @@ struct BitShiftRightImpl
|
||||
const UInt8 word_size = 8;
|
||||
size_t n = end - pos;
|
||||
const UInt128 bit_limit = static_cast<UInt128>(word_size) * n;
|
||||
if (b < 0 || static_cast<decltype(bit_limit)>(b) > bit_limit)
|
||||
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "The number of shift positions needs to be a non-negative value and less or equal to the bit width of the value to shift");
|
||||
if (b == bit_limit)
|
||||
if (b < 0)
|
||||
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "The number of shift positions needs to be a non-negative value");
|
||||
|
||||
if (b == bit_limit || static_cast<decltype(bit_limit)>(b) > bit_limit)
|
||||
{
|
||||
/// insert default value
|
||||
out_vec.push_back(0);
|
||||
@ -99,9 +102,10 @@ struct BitShiftRightImpl
|
||||
const UInt8 word_size = 8;
|
||||
size_t n = end - pos;
|
||||
const UInt128 bit_limit = static_cast<UInt128>(word_size) * n;
|
||||
if (b < 0 || static_cast<decltype(bit_limit)>(b) > bit_limit)
|
||||
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "The number of shift positions needs to be a non-negative value and less or equal to the bit width of the value to shift");
|
||||
if (b == bit_limit)
|
||||
if (b < 0)
|
||||
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "The number of shift positions needs to be a non-negative value");
|
||||
|
||||
if (b == bit_limit || static_cast<decltype(bit_limit)>(b) > bit_limit)
|
||||
{
|
||||
// insert default value
|
||||
out_vec.resize_fill(out_vec.size() + n);
|
||||
|
@ -150,7 +150,7 @@ bool allow(
|
||||
double sum_size,
|
||||
double max_size,
|
||||
double min_age,
|
||||
double range_size,
|
||||
size_t range_size,
|
||||
double partition_size,
|
||||
double min_size_to_lower_base_log,
|
||||
double max_size_to_lower_base_log,
|
||||
@ -159,6 +159,9 @@ bool allow(
|
||||
if (settings.min_age_to_force_merge && min_age >= settings.min_age_to_force_merge)
|
||||
return true;
|
||||
|
||||
if (settings.min_parts_to_merge_at_once && range_size < settings.min_parts_to_merge_at_once)
|
||||
return false;
|
||||
|
||||
/// Map size to 0..1 using logarithmic scale
|
||||
/// Use log(1 + x) instead of log1p(x) because our sum_size is always integer.
|
||||
/// Also log1p seems to be slow and significantly affect performance of merges assignment.
|
||||
|
@ -90,6 +90,8 @@ public:
|
||||
{
|
||||
/// Zero means unlimited. Can be overridden by the same merge tree setting.
|
||||
size_t max_parts_to_merge_at_once = 100;
|
||||
/// Zero means no minimum. Can be overridden by the same merge tree setting.
|
||||
size_t min_parts_to_merge_at_once = 0;
|
||||
|
||||
/// Some sort of a maximum number of parts in partition. Can be overridden by the same merge tree setting.
|
||||
size_t parts_to_throw_insert = 3000;
|
||||
|
@ -83,6 +83,7 @@ namespace MergeTreeSetting
|
||||
extern const MergeTreeSettingsBool merge_selector_enable_heuristic_to_remove_small_parts_at_right;
|
||||
extern const MergeTreeSettingsFloat merge_selector_base;
|
||||
extern const MergeTreeSettingsUInt64 merge_selector_max_ranges_to_select_at_once;
|
||||
extern const MergeTreeSettingsUInt64 min_parts_to_merge_at_once;
|
||||
}
|
||||
|
||||
namespace ErrorCodes
|
||||
@ -597,6 +598,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
|
||||
simple_merge_settings.max_parts_to_merge_at_once = (*data_settings)[MergeTreeSetting::max_parts_to_merge_at_once];
|
||||
simple_merge_settings.enable_heuristic_to_remove_small_parts_at_right = (*data_settings)[MergeTreeSetting::merge_selector_enable_heuristic_to_remove_small_parts_at_right];
|
||||
simple_merge_settings.base = (*data_settings)[MergeTreeSetting::merge_selector_base];
|
||||
simple_merge_settings.min_parts_to_merge_at_once = (*data_settings)[MergeTreeSetting::min_parts_to_merge_at_once];
|
||||
|
||||
if (!(*data_settings)[MergeTreeSetting::min_age_to_force_merge_on_partition_only])
|
||||
simple_merge_settings.min_age_to_force_merge = (*data_settings)[MergeTreeSetting::min_age_to_force_merge_seconds];
|
||||
|
@ -103,6 +103,7 @@ namespace ErrorCodes
|
||||
DECLARE(MergeSelectorAlgorithm, merge_selector_algorithm, MergeSelectorAlgorithm::SIMPLE, "The algorithm to select parts for merges assignment", EXPERIMENTAL) \
|
||||
DECLARE(Bool, merge_selector_enable_heuristic_to_remove_small_parts_at_right, true, "Enable heuristic for selecting parts for merge which removes parts from right side of range, if their size is less than specified ratio (0.01) of sum_size. Works for Simple and StochasticSimple merge selectors", 0) \
|
||||
DECLARE(Float, merge_selector_base, 5.0, "Affects write amplification of assigned merges (expert level setting, don't change if you don't understand what it is doing). Works for Simple and StochasticSimple merge selectors", 0) \
|
||||
DECLARE(UInt64, min_parts_to_merge_at_once, 0, "Minimal amount of data parts which merge selector can pick to merge at once (expert level setting, don't change if you don't understand what it is doing). 0 - disabled. Works for Simple and StochasticSimple merge selectors.", 0) \
|
||||
\
|
||||
/** Inserts settings. */ \
|
||||
DECLARE(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
|
||||
|
@ -10,7 +10,7 @@ DROP TABLE IF EXISTS t1;
|
||||
CREATE TABLE t0 (vkey UInt32, pkey UInt32, c0 UInt32) engine = TinyLog;
|
||||
CREATE TABLE t1 (vkey UInt32) ENGINE = AggregatingMergeTree ORDER BY vkey;
|
||||
INSERT INTO t0 VALUES (15, 25000, 58);
|
||||
SELECT ref_5.pkey AS c_2_c2392_6 FROM t0 AS ref_5 WHERE 'J[' < multiIf(ref_5.pkey IN ( SELECT 1 ), bitShiftLeft(multiIf(ref_5.c0 > NULL, '1', ')'), 40), NULL); -- { serverError ARGUMENT_OUT_OF_BOUND }
|
||||
SELECT ref_5.pkey AS c_2_c2392_6 FROM t0 AS ref_5 WHERE 'J[' < multiIf(ref_5.pkey IN ( SELECT 1 ), bitShiftLeft(multiIf(ref_5.c0 > NULL, '1', ')'), 40), NULL);
|
||||
DROP TABLE t0;
|
||||
DROP TABLE t1;
|
||||
|
||||
|
@ -1,3 +1,9 @@
|
||||
-- bitShiftRight
|
||||
0
|
||||
|
||||
\0\0\0\0\0\0\0\0
|
||||
-- bitShiftLeft
|
||||
0
|
||||
|
||||
\0\0\0\0\0\0\0\0
|
||||
OK
|
||||
|
@ -1,17 +1,17 @@
|
||||
SELECT '-- bitShiftRight';
|
||||
SELECT bitShiftRight(1, -1); -- { serverError ARGUMENT_OUT_OF_BOUND }
|
||||
SELECT bitShiftRight(toUInt8(1), 8 + 1); -- { serverError ARGUMENT_OUT_OF_BOUND }
|
||||
SELECT bitShiftRight(toUInt8(1), 8 + 1);
|
||||
SELECT bitShiftRight('hola', -1); -- { serverError ARGUMENT_OUT_OF_BOUND }
|
||||
SELECT bitShiftRight('hola', 4 * 8 + 1); -- { serverError ARGUMENT_OUT_OF_BOUND }
|
||||
SELECT bitShiftRight('hola', 4 * 8 + 1);
|
||||
SELECT bitShiftRight(toFixedString('hola', 8), -1); -- { serverError ARGUMENT_OUT_OF_BOUND }
|
||||
SELECT bitShiftRight(toFixedString('hola', 8), 8 * 8 + 1); -- { serverError ARGUMENT_OUT_OF_BOUND }
|
||||
SELECT bitShiftRight(toFixedString('hola', 8), 8 * 8 + 1);
|
||||
|
||||
SELECT '-- bitShiftLeft';
|
||||
SELECT bitShiftLeft(1, -1); -- { serverError ARGUMENT_OUT_OF_BOUND }
|
||||
SELECT bitShiftLeft(toUInt8(1), 8 + 1); -- { serverError ARGUMENT_OUT_OF_BOUND }
|
||||
SELECT bitShiftLeft(toUInt8(1), 8 + 1);
|
||||
SELECT bitShiftLeft('hola', -1); -- { serverError ARGUMENT_OUT_OF_BOUND }
|
||||
SELECT bitShiftLeft('hola', 4 * 8 + 1); -- { serverError ARGUMENT_OUT_OF_BOUND }
|
||||
SELECT bitShiftLeft('hola', 4 * 8 + 1);
|
||||
SELECT bitShiftLeft(toFixedString('hola', 8), -1); -- { serverError ARGUMENT_OUT_OF_BOUND }
|
||||
SELECT bitShiftLeft(toFixedString('hola', 8), 8 * 8 + 1); -- { serverError ARGUMENT_OUT_OF_BOUND }
|
||||
SELECT bitShiftLeft(toFixedString('hola', 8), 8 * 8 + 1);
|
||||
|
||||
SELECT 'OK';
|
@ -18,12 +18,12 @@ ${CLICKHOUSE_CLIENT} -q "SELECT count() FROM ghdata WHERE NOT ignore(*)"
|
||||
|
||||
${CLICKHOUSE_CLIENT} -q \
|
||||
"SELECT data.repo.name, count() AS stars FROM ghdata \
|
||||
WHERE data.type = 'WatchEvent' GROUP BY data.repo.name ORDER BY stars DESC, data.repo.name LIMIT 5"
|
||||
WHERE data.type = 'WatchEvent' GROUP BY data.repo.name ORDER BY stars DESC, data.repo.name LIMIT 5" --allow_suspicious_types_in_group_by=1 --allow_suspicious_types_in_order_by=1
|
||||
|
||||
${CLICKHOUSE_CLIENT} --enable_analyzer=1 -q \
|
||||
"SELECT data.payload.commits[].author.name AS name, count() AS c FROM ghdata \
|
||||
ARRAY JOIN data.payload.commits[].author.name \
|
||||
GROUP BY name ORDER BY c DESC, name LIMIT 5"
|
||||
GROUP BY name ORDER BY c DESC, name LIMIT 5" --allow_suspicious_types_in_group_by=1 --allow_suspicious_types_in_order_by=1
|
||||
|
||||
${CLICKHOUSE_CLIENT} -q "SELECT max(data.payload.pull_request.assignees[].size0) FROM ghdata"
|
||||
|
||||
|
@ -0,0 +1,4 @@
|
||||
2
|
||||
3
|
||||
4
|
||||
1
|
42
tests/queries/0_stateless/03267_min_parts_to_merge_at_once.sh
Executable file
42
tests/queries/0_stateless/03267_min_parts_to_merge_at_once.sh
Executable file
@ -0,0 +1,42 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS t;"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "CREATE TABLE t (key UInt64) ENGINE = MergeTree() ORDER BY tuple() SETTINGS min_parts_to_merge_at_once=5, merge_selector_base=1"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "INSERT INTO t VALUES (1)"
|
||||
$CLICKHOUSE_CLIENT --query "INSERT INTO t VALUES (2);"
|
||||
|
||||
# doesn't make test flaky
|
||||
sleep 1
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.parts WHERE active and database = currentDatabase() and table = 't'"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "INSERT INTO t VALUES (3)"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.parts WHERE active and database = currentDatabase() and table = 't'"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "INSERT INTO t VALUES (4)"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.parts WHERE active and database = currentDatabase() and table = 't'"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "INSERT INTO t VALUES (5)"
|
||||
|
||||
counter=0 retries=60
|
||||
|
||||
while [[ $counter -lt $retries ]]; do
|
||||
result=$($CLICKHOUSE_CLIENT --query "SELECT count() FROM system.parts WHERE active and database = currentDatabase() and table = 't'")
|
||||
if [ "$result" -eq "1" ];then
|
||||
break;
|
||||
fi
|
||||
sleep 0.5
|
||||
counter=$((counter + 1))
|
||||
done
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.parts WHERE active and database = currentDatabase() and table = 't'"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS t"
|
Loading…
Reference in New Issue
Block a user