Merge branch 'master' into tighten-limits-functional-tests

Alexey Milovidov 2024-07-26 14:11:59 +02:00
commit ffe15e2e37
12 changed files with 75 additions and 30 deletions

View File

@@ -20,7 +20,7 @@
 </max_execution_time>
 <max_memory_usage>
-<max>10G</max>
+<max>5G</max>
 </max_memory_usage>
 <table_function_remote_max_addresses>

View File

@@ -33,7 +33,7 @@ namespace DB
 namespace
 {
 #if defined(OS_LINUX)
-//thread_local size_t write_trace_iteration = 0;
+thread_local size_t write_trace_iteration = 0;
 #endif
 /// Even after timer_delete() the signal can be delivered,
 /// since it does not do anything with pending signals.
@@ -57,7 +57,7 @@ namespace
 auto saved_errno = errno; /// We must restore previous value of errno in signal handler.
-#if defined(OS_LINUX) && false //asdqwe
+#if defined(OS_LINUX)
 if (info)
 {
 int overrun_count = info->si_overrun;
@@ -92,7 +92,7 @@ namespace
 constexpr bool sanitizer = false;
 #endif
-//asdqwe asynchronous_stack_unwinding = true;
+asynchronous_stack_unwinding = true;
 if (sanitizer || 0 == sigsetjmp(asynchronous_stack_unwinding_signal_jump_buffer, 1))
 {
 stack_trace.emplace(signal_context);

View File

@@ -24,7 +24,6 @@
 #include <Processors/Sources/NullSource.h>
 #include <QueryPipeline/Pipe.h>
 #include <Storages/Distributed/DistributedSettings.h>
-#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
 #include <Storages/SelectQueryInfo.h>
 #include <Storages/StorageReplicatedMergeTree.h>
 #include <Storages/StorageSnapshot.h>
@@ -517,14 +516,11 @@ void executeQueryWithParallelReplicas(
 "`cluster_for_parallel_replicas` setting refers to cluster with several shards. Expected a cluster with one shard");
 }
-auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(
-new_cluster->getShardsInfo().begin()->getAllNodeCount(), settings.parallel_replicas_mark_segment_size);
 auto external_tables = new_context->getExternalTables();
 auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
 query_ast,
 new_cluster,
 storage_id,
-std::move(coordinator),
 header,
 processed_stage,
 new_context,

View File

@@ -21,7 +21,7 @@
 #include <Client/ConnectionPoolWithFailover.h>
 #include <QueryPipeline/QueryPipelineBuilder.h>
 #include <Parsers/ASTFunction.h>
+#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
 #include <boost/algorithm/string/join.hpp>
 namespace DB
@@ -362,7 +362,6 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
 ASTPtr query_ast_,
 ClusterPtr cluster_,
 const StorageID & storage_id_,
-ParallelReplicasReadingCoordinatorPtr coordinator_,
 Block header_,
 QueryProcessingStage::Enum stage_,
 ContextMutablePtr context_,
@@ -375,7 +374,6 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
 , cluster(cluster_)
 , query_ast(query_ast_)
 , storage_id(storage_id_)
-, coordinator(std::move(coordinator_))
 , stage(std::move(stage_))
 , context(context_)
 , throttler(throttler_)
@@ -438,6 +436,9 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
 shuffled_pool = shard.pool->getShuffledPools(current_settings, priority_func);
 }
+coordinator
+    = std::make_shared<ParallelReplicasReadingCoordinator>(max_replicas_to_use, current_settings.parallel_replicas_mark_segment_size);
 for (size_t i=0; i < max_replicas_to_use; ++i)
 {
 IConnections::ReplicaInfo replica_info

View File

@@ -70,7 +70,6 @@ public:
 ASTPtr query_ast_,
 ClusterPtr cluster_,
 const StorageID & storage_id_,
-ParallelReplicasReadingCoordinatorPtr coordinator_,
 Block header_,
 QueryProcessingStage::Enum stage_,
 ContextMutablePtr context_,

View File

@@ -33,7 +33,10 @@ def create_tables(cluster, table_name):
 @pytest.mark.parametrize("skip_unavailable_shards", [1, 0])
-def test_skip_all_replicas(start_cluster, skip_unavailable_shards):
+@pytest.mark.parametrize("max_parallel_replicas", [2, 3, 100])
+def test_skip_all_replicas(
+    start_cluster, skip_unavailable_shards, max_parallel_replicas
+):
 cluster_name = "test_1_shard_3_unavaliable_replicas"
 table_name = "tt"
 create_tables(cluster_name, table_name)
@@ -43,7 +46,7 @@ def test_skip_all_replicas(start_cluster, skip_unavailable_shards):
 f"SELECT key, count() FROM {table_name} GROUP BY key ORDER BY key",
 settings={
 "allow_experimental_parallel_reading_from_replicas": 2,
-"max_parallel_replicas": 3,
+"max_parallel_replicas": max_parallel_replicas,
 "cluster_for_parallel_replicas": cluster_name,
 "skip_unavailable_shards": skip_unavailable_shards,
 },
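A note on the parametrization change above: stacking a second @pytest.mark.parametrize decorator makes pytest run test_skip_all_replicas once for every (skip_unavailable_shards, max_parallel_replicas) pair, i.e. 2 x 3 = 6 cases, with max_parallel_replicas forwarded into the query settings instead of the previously hard-coded 3. A minimal, self-contained sketch of that pattern follows; the function name here is illustrative and is not the test itself.

import pytest

# Stacked parametrize decorators produce the cross-product of their value lists:
# 2 values x 3 values = 6 generated test cases.
@pytest.mark.parametrize("skip_unavailable_shards", [1, 0])
@pytest.mark.parametrize("max_parallel_replicas", [2, 3, 100])
def test_cross_product_example(skip_unavailable_shards, max_parallel_replicas):
    # In the real test these values go into the settings dict of the query call.
    assert skip_unavailable_shards in (1, 0)
    assert max_parallel_replicas in (2, 3, 100)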

View File

@@ -4,8 +4,13 @@
 </settings>
 <create_query>CREATE TABLE t (x UInt64, d32 Decimal32(3), d64 Decimal64(4), d128 Decimal128(5)) ENGINE = Memory</create_query>
-<!-- use less threads to save memory -->
-<fill_query>INSERT INTO t SELECT number AS x, x % 1000000 AS d32, x AS d64, x d128 FROM numbers_mt(500000000) SETTINGS max_threads = 8</fill_query>
+<!-- use less threads and several queries to save memory -->
+<fill_query>INSERT INTO t SELECT number AS x, x % 1000000 AS d32, x AS d64, x d128 FROM numbers_mt(100000000) SETTINGS max_threads = 2</fill_query>
+<fill_query>INSERT INTO t SELECT number AS x, x % 1000000 AS d32, x AS d64, x d128 FROM numbers_mt(100000000, 100000000) SETTINGS max_threads = 2</fill_query>
+<fill_query>INSERT INTO t SELECT number AS x, x % 1000000 AS d32, x AS d64, x d128 FROM numbers_mt(200000000, 100000000) SETTINGS max_threads = 2</fill_query>
+<fill_query>INSERT INTO t SELECT number AS x, x % 1000000 AS d32, x AS d64, x d128 FROM numbers_mt(300000000, 100000000) SETTINGS max_threads = 2</fill_query>
+<fill_query>INSERT INTO t SELECT number AS x, x % 1000000 AS d32, x AS d64, x d128 FROM numbers_mt(400000000, 100000000) SETTINGS max_threads = 2</fill_query>
 <drop_query>DROP TABLE IF EXISTS t</drop_query>
 <query>SELECT min(d32), max(d32), argMin(x, d32), argMax(x, d32) FROM t</query>

View File

@@ -1,6 +1,6 @@
 12 -> 102
 13 -> 103
 14 -> -1
-12(r) -> 102
-13(r) -> 103
-14(r) -> 104
+12 (after reloading) -> 102
+13 (after reloading) -> 103
+14 (after reloading) -> 104

View File

@@ -1,4 +1,6 @@
 #!/usr/bin/env bash
+# Tags: no-random-settings
+# Dictionaries are updated using the server time.
 CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 # shellcheck source=../shell_config.sh
@@ -6,8 +8,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 set -e -o pipefail
-# NOTE: dictionaries TTLs works with server timezone, so session_timeout cannot be used
-$CLICKHOUSE_CLIENT --session_timezone '' --multiquery <<EOF
+$CLICKHOUSE_CLIENT --multiquery <<EOF
 CREATE TABLE ${CLICKHOUSE_DATABASE}.table(x Int64, y Int64, insert_time DateTime) ENGINE = MergeTree ORDER BY tuple();
 INSERT INTO ${CLICKHOUSE_DATABASE}.table VALUES (12, 102, now());
@@ -28,16 +29,21 @@ $CLICKHOUSE_CLIENT --query "SELECT '12 -> ', dictGetInt64('${CLICKHOUSE_DATABASE
 $CLICKHOUSE_CLIENT --query "INSERT INTO ${CLICKHOUSE_DATABASE}.table VALUES (13, 103, now())"
 $CLICKHOUSE_CLIENT --query "INSERT INTO ${CLICKHOUSE_DATABASE}.table VALUES (14, 104, now() - INTERVAL 1 DAY)"
+# Wait when the dictionary will update the value for 13 on its own:
 while [ "$(${CLICKHOUSE_CLIENT} --query "SELECT dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(13))")" = -1 ]
-do
-sleep 0.5
-done
+do
+sleep 0.5
+done
 $CLICKHOUSE_CLIENT --query "SELECT '13 -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(13))"
+# By the way, the value for 14 is expected to not be updated at this moment,
+# because the values were selected by the update field insert_time, and for 14 it was set as one day ago.
 $CLICKHOUSE_CLIENT --query "SELECT '14 -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(14))"
+# SYSTEM RELOAD DICTIONARY reloads it completely, regardless of the update field, so we will see new values, even for key 14.
 $CLICKHOUSE_CLIENT --query "SYSTEM RELOAD DICTIONARY '${CLICKHOUSE_DATABASE}.dict'"
-$CLICKHOUSE_CLIENT --query "SELECT '12(r) -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(12))"
-$CLICKHOUSE_CLIENT --query "SELECT '13(r) -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(13))"
-$CLICKHOUSE_CLIENT --query "SELECT '14(r) -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(14))"
+$CLICKHOUSE_CLIENT --query "SELECT '12 (after reloading) -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(12))"
+$CLICKHOUSE_CLIENT --query "SELECT '13 (after reloading) -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(13))"
+$CLICKHOUSE_CLIENT --query "SELECT '14 (after reloading) -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(14))"

View File

@@ -13,7 +13,7 @@ function insert1()
 {
 local TIMELIMIT=$((SECONDS+$1))
 while [ $SECONDS -lt "$TIMELIMIT" ]; do
-${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 0 -q 'INSERT INTO async_inserts_race FORMAT CSV 1,"a"'
+${MY_CLICKHOUSE_CLIENT} --insert_keeper_fault_injection_probability=0 --wait_for_async_insert 0 -q 'INSERT INTO async_inserts_race FORMAT CSV 1,"a"'
 done
 }
@@ -21,7 +21,7 @@ function insert2()
 {
 local TIMELIMIT=$((SECONDS+$1))
 while [ $SECONDS -lt "$TIMELIMIT" ]; do
-${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 0 -q 'INSERT INTO async_inserts_race FORMAT JSONEachRow {"id": 5, "s": "e"} {"id": 6, "s": "f"}'
+${MY_CLICKHOUSE_CLIENT} --insert_keeper_fault_injection_probability=0 --wait_for_async_insert 0 -q 'INSERT INTO async_inserts_race FORMAT JSONEachRow {"id": 5, "s": "e"} {"id": 6, "s": "f"}'
 done
 }
@@ -29,7 +29,7 @@ function insert3()
 {
 local TIMELIMIT=$((SECONDS+$1))
 while [ $SECONDS -lt "$TIMELIMIT" ]; do
-${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 1 -q "INSERT INTO async_inserts_race VALUES (7, 'g') (8, 'h')" &
+${MY_CLICKHOUSE_CLIENT} --insert_keeper_fault_injection_probability=0 --wait_for_async_insert 1 -q "INSERT INTO async_inserts_race VALUES (7, 'g') (8, 'h')" &
 sleep 0.05
 done

View File

@@ -0,0 +1,35 @@
+SET alter_sync = 2;
+SET max_parallel_replicas = 3, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_for_non_replicated_merge_tree = true;
+DROP TABLE IF EXISTS t1;
+DROP TABLE IF EXISTS t1__fuzz_26;
+CREATE TABLE t1__fuzz_26 (`a` Nullable(Float64), `b` Nullable(Float32), `pk` Int64) ENGINE = MergeTree ORDER BY pk;
+CREATE TABLE t1 ( a Float64, b Int64, pk String) Engine = MergeTree() ORDER BY pk;
+ALTER TABLE t1
+(MODIFY COLUMN `a` Float64 TTL toDateTime(b) + toIntervalMonth(viewExplain('EXPLAIN', 'actions = 1', (
+SELECT
+toIntervalMonth(1),
+2
+FROM t1__fuzz_26
+GROUP BY
+toFixedString('%Prewhere%', 10),
+toNullable(12)
+WITH ROLLUP
+)), 1)) settings allow_experimental_parallel_reading_from_replicas = 1; -- { serverError INCORRECT_RESULT_OF_SCALAR_SUBQUERY }
+ALTER TABLE t1
+(MODIFY COLUMN `a` Float64 TTL toDateTime(b) + toIntervalMonth(viewExplain('EXPLAIN', 'actions = 1', (
+SELECT
+toIntervalMonth(1),
+2
+FROM t1__fuzz_26
+GROUP BY
+toFixedString('%Prewhere%', 10),
+toNullable(12)
+WITH ROLLUP
+)), 1)) settings allow_experimental_parallel_reading_from_replicas = 0; -- { serverError INCORRECT_RESULT_OF_SCALAR_SUBQUERY }
+DROP TABLE t1;
+DROP TABLE t1__fuzz_26;