From 4efa847a1f08dc71d5062dc6f3e36e4aecb9ddf2 Mon Sep 17 00:00:00 2001
From: Azat Khuzhin
Date: Wed, 20 Jul 2022 22:29:46 +0300
Subject: [PATCH 1/2] Fix LOGICAL_ERROR on race between DROP and INSERT with
 materialized views

In case of parallel INSERT (max_insert_threads > 1) it is possible for a
VIEW to be DROP/DETACH'ed while the pipeline is being built for the
parallel streams, and in this case the headers will not match: while the
VIEW is there you get an empty header, and a non-empty header otherwise.
This leads to a LOGICAL_ERROR later, when the output headers are checked
for equality (in QueryPipelineBuilder::addChains() -> Pipe::addChains()).

However, it also makes the pipeline different across the parallel
streams, and it looks like it is better to fail in this case, so instead
of always returning an empty header from buildChainImpl(), an explicit
check has been added.

Note that I wasn't able to reproduce the issue with the added test, but
CI may have more "luck" (although I've verified it manually).

Fixes: #35902
Cc: @KochetovNicolai
Signed-off-by: Azat Khuzhin
---
 src/Interpreters/InterpreterInsertQuery.cpp | 10 +++
 .../02380_insert_mv_race.reference          |  0
 .../0_stateless/02380_insert_mv_race.sh     | 66 +++++++++++++++++++
 3 files changed, 76 insertions(+)
 create mode 100644 tests/queries/0_stateless/02380_insert_mv_race.reference
 create mode 100755 tests/queries/0_stateless/02380_insert_mv_race.sh

diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp
index 7b6066575ae..493d8df57ff 100644
--- a/src/Interpreters/InterpreterInsertQuery.cpp
+++ b/src/Interpreters/InterpreterInsertQuery.cpp
@@ -44,6 +44,7 @@ namespace ErrorCodes
     extern const int NO_SUCH_COLUMN_IN_TABLE;
     extern const int ILLEGAL_COLUMN;
     extern const int DUPLICATE_COLUMN;
+    extern const int TABLE_IS_DROPPED;
 }
 
 InterpreterInsertQuery::InterpreterInsertQuery(
@@ -424,6 +425,15 @@ BlockIO InterpreterInsertQuery::execute()
             for (size_t i = 0; i < out_streams_size; ++i)
             {
                 auto out = buildChainImpl(table, metadata_snapshot, query_sample_block, nullptr, nullptr);
+                if (!out_chains.empty())
+                {
+                    if (out.getProcessors().size() != out_chains.back().getProcessors().size())
+                    {
+                        throw Exception(ErrorCodes::TABLE_IS_DROPPED,
+                            "Some VIEW is gone in between ({} vs {} processors, on {} parallel stream)",
+                            out.getProcessors().size(), out_chains.back().getProcessors().size(), i);
+                    }
+                }
                 out_chains.emplace_back(std::move(out));
             }
         }
diff --git a/tests/queries/0_stateless/02380_insert_mv_race.reference b/tests/queries/0_stateless/02380_insert_mv_race.reference
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/queries/0_stateless/02380_insert_mv_race.sh b/tests/queries/0_stateless/02380_insert_mv_race.sh
new file mode 100755
index 00000000000..ba002832715
--- /dev/null
+++ b/tests/queries/0_stateless/02380_insert_mv_race.sh
@@ -0,0 +1,66 @@
+#!/usr/bin/env bash
+# Tags: long, race
+
+# Regression test for INSERT into a table with a MV attached,
+# to avoid possible errors if some table disappears
+# while multiple insert streams are used (i.e. max_insert_threads > 1)
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
"$CUR_DIR"/../shell_config.sh + +function bootstrap() +{ + $CLICKHOUSE_CLIENT -nm -q " + DROP TABLE IF EXISTS null; + CREATE TABLE null (key Int) ENGINE = Null; + + DROP TABLE IF EXISTS mv; + CREATE MATERIALIZED VIEW mv ENGINE = Null() AS SELECT * FROM null; + " +} + +function insert_thread() +{ + local opts=( + --max_insert_threads 100 + --max_threads 100 + ) + local patterns=( + -e UNKNOWN_TABLE + -e TABLE_IS_DROPPED + ) + + while :; do + $CLICKHOUSE_CLIENT "${opts[@]}" -q "INSERT INTO null SELECT * FROM numbers_mt(1e6)" |& { + grep -F "DB::Exception: " | grep -v -F "${patterns[@]}" + } + done +} +export -f insert_thread + +function drop_thread() +{ + local opts=( + --database_atomic_wait_for_drop_and_detach_synchronously 1 + ) + + while :; do + $CLICKHOUSE_CLIENT -nm "${opts[@]}" -q "DETACH TABLE mv" + sleep 0.01 + $CLICKHOUSE_CLIENT -nm "${opts[@]}" -q "ATTACH TABLE mv" + done +} +export -f drop_thread + +function main() +{ + local test_timeout=1m + + bootstrap + timeout "$test_timeout" bash -c insert_thread & + timeout "$test_timeout" bash -c drop_thread & + + wait +} +main "$@" From cf342326755973e1f0f54d1c9346fa3d8f2ff71d Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 1 Aug 2022 11:48:36 +0000 Subject: [PATCH 2/2] Output header is now empty for every MV chain. Instead of checking that number of processors different for different threads, simply always return empty header from buildChainImpl(), by adding explicit conversion. v2: ignore UNKNOWN_TABLE errors in test --- src/Interpreters/InterpreterInsertQuery.cpp | 10 --- .../Transforms/buildPushingToViewsChain.cpp | 8 +++ .../0_stateless/02380_insert_mv_race.sh | 68 ++++--------------- 3 files changed, 22 insertions(+), 64 deletions(-) diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 493d8df57ff..7b6066575ae 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -44,7 +44,6 @@ namespace ErrorCodes extern const int NO_SUCH_COLUMN_IN_TABLE; extern const int ILLEGAL_COLUMN; extern const int DUPLICATE_COLUMN; - extern const int TABLE_IS_DROPPED; } InterpreterInsertQuery::InterpreterInsertQuery( @@ -425,15 +424,6 @@ BlockIO InterpreterInsertQuery::execute() for (size_t i = 0; i < out_streams_size; ++i) { auto out = buildChainImpl(table, metadata_snapshot, query_sample_block, nullptr, nullptr); - if (!out_chains.empty()) - { - if (out.getProcessors().size() != out_chains.back().getProcessors().size()) - { - throw Exception(ErrorCodes::TABLE_IS_DROPPED, - "Some VIEW is gone in between ({} vs {} processors, on {} parallel stream)", - out.getProcessors().size(), out_chains.back().getProcessors().size(), i); - } - } out_chains.emplace_back(std::move(out)); } } diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp index a8890f5bccb..d71d6901cee 100644 --- a/src/Processors/Transforms/buildPushingToViewsChain.cpp +++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp @@ -410,6 +410,14 @@ Chain buildPushingToViewsChain( if (result_chain.empty()) result_chain.addSink(std::make_shared(storage_header)); + if (result_chain.getOutputHeader().columns() != 0) + { + /// Convert result header to empty block. 
+        auto dag = ActionsDAG::makeConvertingActions(result_chain.getOutputHeader().getColumnsWithTypeAndName(), {}, ActionsDAG::MatchColumnsMode::Name);
+        auto actions = std::make_shared(std::move(dag));
+        result_chain.addSink(std::make_shared(result_chain.getOutputHeader(), std::move(actions)));
+    }
+
     return result_chain;
 }
 
diff --git a/tests/queries/0_stateless/02380_insert_mv_race.sh b/tests/queries/0_stateless/02380_insert_mv_race.sh
index ba002832715..d66e7b62d89 100755
--- a/tests/queries/0_stateless/02380_insert_mv_race.sh
+++ b/tests/queries/0_stateless/02380_insert_mv_race.sh
@@ -9,58 +9,18 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 # shellcheck source=../shell_config.sh
 . "$CUR_DIR"/../shell_config.sh
 
-function bootstrap()
-{
-    $CLICKHOUSE_CLIENT -nm -q "
-        DROP TABLE IF EXISTS null;
-        CREATE TABLE null (key Int) ENGINE = Null;
+$CLICKHOUSE_CLIENT -nm -q "
+    DROP TABLE IF EXISTS null;
+    CREATE TABLE null (key Int) ENGINE = Null;
+    DROP TABLE IF EXISTS mv;
+    CREATE MATERIALIZED VIEW mv ENGINE = Null() AS SELECT * FROM null;
+"
 
-        DROP TABLE IF EXISTS mv;
-        CREATE MATERIALIZED VIEW mv ENGINE = Null() AS SELECT * FROM null;
-    "
-}
-
-function insert_thread()
-{
-    local opts=(
-        --max_insert_threads 100
-        --max_threads 100
-    )
-    local patterns=(
-        -e UNKNOWN_TABLE
-        -e TABLE_IS_DROPPED
-    )
-
-    while :; do
-        $CLICKHOUSE_CLIENT "${opts[@]}" -q "INSERT INTO null SELECT * FROM numbers_mt(1e6)" |& {
-            grep -F "DB::Exception: " | grep -v -F "${patterns[@]}"
-        }
-    done
-}
-export -f insert_thread
-
-function drop_thread()
-{
-    local opts=(
-        --database_atomic_wait_for_drop_and_detach_synchronously 1
-    )
-
-    while :; do
-        $CLICKHOUSE_CLIENT -nm "${opts[@]}" -q "DETACH TABLE mv"
-        sleep 0.01
-        $CLICKHOUSE_CLIENT -nm "${opts[@]}" -q "ATTACH TABLE mv"
-    done
-}
-export -f drop_thread
-
-function main()
-{
-    local test_timeout=1m
-
-    bootstrap
-    timeout "$test_timeout" bash -c insert_thread &
-    timeout "$test_timeout" bash -c drop_thread &
-
-    wait
-}
-main "$@"
+$CLICKHOUSE_CLIENT -q "INSERT INTO null SELECT * FROM numbers_mt(1000) settings max_threads=1000, max_insert_threads=1000, max_block_size=1" |& {
+    # To avoid handling the stacktrace here, take only the first line (-m1);
+    # this should be OK, since there cannot be multiple exceptions from the client anyway.
+    grep -m1 -F 'DB::Exception:' | grep -F -v -e 'UNKNOWN_TABLE'
+} &
+sleep 0.05
+$CLICKHOUSE_CLIENT -q "DETACH TABLE mv"
+wait
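
Note for reviewers (not part of the patches): the sketch below is a minimal,
self-contained toy model of why the second approach removes the race. The
Chain/Header types and function names here are invented for illustration and
are not the real ClickHouse interfaces. The idea: once every per-stream chain
is forced to end with an empty header, a concurrent DETACH of the materialized
view can no longer make the parallel streams' headers diverge, which is the
invariant Pipe::addChains() checks.

    // Toy model only -- names and types are invented for illustration.
    #include <cassert>
    #include <string>
    #include <vector>

    using Header = std::vector<std::string>;   // column names a chain outputs

    struct Chain
    {
        Header output_header;
    };

    // One parallel insert stream builds its pushing-to-views chain.
    // If the materialized view is still attached, the chain ends with the
    // view's columns; if the view was detached in between, it ends empty.
    Chain buildChainToy(bool view_still_attached)
    {
        Chain chain;
        if (view_still_attached)
            chain.output_header = {"key"};
        return chain;
    }

    // The idea of the second patch in toy form: unconditionally convert the
    // output header to an empty block, so every stream agrees.
    void convertToEmptyHeader(Chain & chain)
    {
        chain.output_header.clear();
    }

    int main()
    {
        // Two parallel streams; the view is detached between building them.
        Chain first = buildChainToy(/*view_still_attached=*/ true);
        Chain second = buildChainToy(/*view_still_attached=*/ false);

        convertToEmptyHeader(first);
        convertToEmptyHeader(second);

        // Pipe::addChains() effectively requires identical output headers.
        assert(first.output_header == second.output_header);
        return 0;
    }

Without the convertToEmptyHeader() calls the assert fires, which corresponds
to the original LOGICAL_ERROR; the first patch instead detected the mismatch
and reported TABLE_IS_DROPPED.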