mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 16:12:01 +00:00
Fix parallel_reading_from_replicas
with clickhouse-bechmark
(#34751)
* Use INITIAL_QUERY for clickhouse-benchmark Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Fix parallel_reading_from_replicas with clickhouse-bechmark Before it produces the following error: $ clickhouse-benchmark --stacktrace -i1 --query "select * from remote('127.1', default.data_mt) limit 10" --allow_experimental_parallel_reading_from_replicas=1 --max_parallel_replicas=3 Loaded 1 queries. Logical error: 'Coordinator for parallel reading from replicas is not initialized'. Aborted (core dumped) Since it uses the same code, i.e RemoteQueryExecutor -> MultiplexedConnections, which enables coordinator if it was requested from settings, but it should be done only for non-initial queries, i.e. when server send connection to another server. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> * Fix 02226_parallel_reading_from_replicas_benchmark for older shellcheck By shellcheck 0.8 does not complains, while on CI shellcheck 0.7.0 and it does complains [1]: In 02226_parallel_reading_from_replicas_benchmark.sh line 17: --allow_experimental_parallel_reading_from_replicas=1 ^-- SC2191: The = here is literal. To assign by index, use ( [index]=value ) with no spaces. To keep as literal, quote it. Did you mean: "--allow_experimental_parallel_reading_from_replicas=1" [1]: https://s3.amazonaws.com/clickhouse-test-reports/34751/d883af711822faf294c876b017cbf745b1cda1b3/style_check__actions_/shellcheck_output.txt Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
parent
c4b6342853
commit
a871036361
@ -435,6 +435,8 @@ private:
|
||||
Progress progress;
|
||||
executor.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
|
||||
|
||||
executor.sendQuery(ClientInfo::QueryKind::INITIAL_QUERY);
|
||||
|
||||
ProfileInfo info;
|
||||
while (Block block = executor.read())
|
||||
info.update(block);
|
||||
|
@ -133,7 +133,12 @@ void MultiplexedConnections::sendQuery(
|
||||
modified_settings.group_by_two_level_threshold_bytes = 0;
|
||||
}
|
||||
|
||||
if (settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas)
|
||||
bool parallel_reading_from_replicas = settings.max_parallel_replicas > 1
|
||||
&& settings.allow_experimental_parallel_reading_from_replicas
|
||||
/// To avoid trying to coordinate with clickhouse-benchmark,
|
||||
/// since it uses the same code.
|
||||
&& client_info.query_kind != ClientInfo::QueryKind::INITIAL_QUERY;
|
||||
if (parallel_reading_from_replicas)
|
||||
{
|
||||
client_info.collaborate_with_initiator = true;
|
||||
client_info.count_participating_replicas = replica_info.all_replicas_count;
|
||||
|
@ -210,7 +210,7 @@ static Block adaptBlockStructure(const Block & block, const Block & header)
|
||||
return res;
|
||||
}
|
||||
|
||||
void RemoteQueryExecutor::sendQuery()
|
||||
void RemoteQueryExecutor::sendQuery(ClientInfo::QueryKind query_kind)
|
||||
{
|
||||
if (sent_query)
|
||||
return;
|
||||
@ -237,13 +237,7 @@ void RemoteQueryExecutor::sendQuery()
|
||||
|
||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
|
||||
ClientInfo modified_client_info = context->getClientInfo();
|
||||
modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
|
||||
/// Set initial_query_id to query_id for the clickhouse-benchmark.
|
||||
///
|
||||
/// (since first query of clickhouse-benchmark will be issued as SECONDARY_QUERY,
|
||||
/// due to it executes queries via RemoteBlockInputStream)
|
||||
if (modified_client_info.initial_query_id.empty())
|
||||
modified_client_info.initial_query_id = query_id;
|
||||
modified_client_info.query_kind = query_kind;
|
||||
if (CurrentThread::isInitialized())
|
||||
{
|
||||
modified_client_info.client_trace_context = CurrentThread::get().thread_trace_context;
|
||||
|
@ -83,7 +83,13 @@ public:
|
||||
~RemoteQueryExecutor();
|
||||
|
||||
/// Create connection and send query, external tables and scalars.
|
||||
void sendQuery();
|
||||
///
|
||||
/// @param query_kind - kind of query, usually it is SECONDARY_QUERY,
|
||||
/// since this is the queries between servers
|
||||
/// (for which this code was written in general).
|
||||
/// But clickhouse-benchmark uses the same code,
|
||||
/// and it should pass INITIAL_QUERY.
|
||||
void sendQuery(ClientInfo::QueryKind query_kind = ClientInfo::QueryKind::SECONDARY_QUERY);
|
||||
|
||||
/// Query is resent to a replica, the query itself can be modified.
|
||||
std::atomic<bool> resent_query { false };
|
||||
|
29
tests/queries/0_stateless/02226_parallel_reading_from_replicas_benchmark.sh
Executable file
29
tests/queries/0_stateless/02226_parallel_reading_from_replicas_benchmark.sh
Executable file
@ -0,0 +1,29 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
$CLICKHOUSE_CLIENT -nm -q "
|
||||
drop table if exists data_02226;
|
||||
create table data_02226 (key Int) engine=MergeTree() order by key
|
||||
as select * from numbers(1);
|
||||
"
|
||||
|
||||
# Regression for:
|
||||
#
|
||||
# Logical error: 'Coordinator for parallel reading from replicas is not initialized'.
|
||||
opts=(
|
||||
--allow_experimental_parallel_reading_from_replicas 1
|
||||
--max_parallel_replicas 3
|
||||
|
||||
--iterations 1
|
||||
)
|
||||
$CLICKHOUSE_BENCHMARK --query "select * from remote('127.1', $CLICKHOUSE_DATABASE, data_02226)" "${opts[@]}" >& /dev/null
|
||||
ret=$?
|
||||
|
||||
$CLICKHOUSE_CLIENT -nm -q "
|
||||
drop table data_02226;
|
||||
"
|
||||
|
||||
exit $ret
|
Loading…
Reference in New Issue
Block a user