From c634012dbf8fa4570f9f60ff3e17b14d457f3c3d Mon Sep 17 00:00:00 2001 From: Jordi Villar Date: Mon, 4 Mar 2024 09:26:25 +0100 Subject: [PATCH 1/5] Reproducer for insert-select + insert_deduplication_token bug --- ...001_insert_threads_deduplication.reference | 10 +++ .../03001_insert_threads_deduplication.sh | 82 +++++++++++++++++++ 2 files changed, 92 insertions(+) create mode 100644 tests/queries/0_stateless/03001_insert_threads_deduplication.reference create mode 100755 tests/queries/0_stateless/03001_insert_threads_deduplication.sh diff --git a/tests/queries/0_stateless/03001_insert_threads_deduplication.reference b/tests/queries/0_stateless/03001_insert_threads_deduplication.reference new file mode 100644 index 00000000000..0c6a5a55576 --- /dev/null +++ b/tests/queries/0_stateless/03001_insert_threads_deduplication.reference @@ -0,0 +1,10 @@ +This bug has been there forever. Present in 22.2 +- When using multiple threads the insert produces 3 parts causing undesired deduplication. +- When using a single thread the insert produces 1 part without deduplication. +1 +4 +This bug has been introduced in CH 24.2+. See https://github.com/ClickHouse/ClickHouse/pull/59448 +- When using remote function and multiple threads the insert produces 3 parts causing undesired deduplication. +- When using remote function and a single thread the insert produces 1 part without deduplication. +1 +4 diff --git a/tests/queries/0_stateless/03001_insert_threads_deduplication.sh b/tests/queries/0_stateless/03001_insert_threads_deduplication.sh new file mode 100755 index 00000000000..cf87f7c2c67 --- /dev/null +++ b/tests/queries/0_stateless/03001_insert_threads_deduplication.sh @@ -0,0 +1,82 @@ +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CUR_DIR"/../shell_config.sh + +echo " +DROP TABLE IF EXISTS landing SYNC; +CREATE TABLE landing +( + timestamp DateTime64(3), + status String, + id String +) +ENGINE = MergeTree() +ORDER BY timestamp; + +SYSTEM STOP MERGES landing; -- Stopping merges to force 3 parts + +INSERT INTO landing (status, id, timestamp) SELECT * FROM generateRandom() LIMIT 1; +INSERT INTO landing (status, id, timestamp) SELECT * FROM generateRandom() LIMIT 1; +INSERT INTO landing (status, id, timestamp) SELECT * FROM generateRandom() LIMIT 1; + +DROP TABLE IF EXISTS ds SYNC; + +CREATE TABLE ds +( + timestamp DateTime64(3), + status String, + id String +) +ENGINE = MergeTree() +ORDER BY timestamp +SETTINGS non_replicated_deduplication_window=1000; + +SELECT 'This bug has been there forever. Present in 22.2'; +SELECT '- When using multiple threads the insert produces 3 parts causing undesired deduplication.'; +SELECT '- When using a single thread the insert produces 1 part without deduplication.'; + +INSERT INTO ds SELECT * FROM landing +SETTINGS insert_deduplicate=1, insert_deduplication_token='token1', + max_insert_threads=5; + +SELECT count() FROM ds; + +INSERT INTO ds SELECT * FROM landing +SETTINGS insert_deduplicate=1, insert_deduplication_token='token2', + max_insert_threads=1; + +SELECT count() FROM ds; +" | $CLICKHOUSE_CLIENT -n + +echo " +CREATE TABLE ds_remote +( + timestamp DateTime64(3), + status String, + id String +) +ENGINE = MergeTree() +ORDER BY timestamp +SETTINGS non_replicated_deduplication_window=1000; + +SELECT 'This bug has been introduced in CH 24.2+. 
See https://github.com/ClickHouse/ClickHouse/pull/59448'; +SELECT '- When using remote function and multiple threads the insert produces 3 parts causing undesired deduplication.'; +SELECT '- When using remote function and a single thread the insert produces 1 part without deduplication.'; + +INSERT INTO ds_remote SELECT * FROM remote('localhost:$CLICKHOUSE_PORT_TCP', $CLICKHOUSE_DATABASE, landing) +SETTINGS insert_deduplicate=1, insert_deduplication_token='token1', + max_insert_threads=5; + +SELECT count() FROM ds_remote; + +INSERT INTO ds_remote SELECT * FROM remote('localhost:$CLICKHOUSE_PORT_TCP', $CLICKHOUSE_DATABASE, landing) +SETTINGS insert_deduplicate=1, insert_deduplication_token='token2', + max_insert_threads=1; + +SELECT count() FROM ds_remote; +" | $CLICKHOUSE_LOCAL -n + +echo " +DROP TABLE IF EXISTS landing SYNC; +DROP TABLE IF EXISTS ds SYNC; +" | $CLICKHOUSE_CLIENT -n From 63747271e8b57076467811f5a537d63c60e71cb3 Mon Sep 17 00:00:00 2001 From: Jordi Villar Date: Mon, 4 Mar 2024 09:53:09 +0100 Subject: [PATCH 2/5] Fix test --- tests/queries/0_stateless/03001_insert_threads_deduplication.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/queries/0_stateless/03001_insert_threads_deduplication.sh b/tests/queries/0_stateless/03001_insert_threads_deduplication.sh index cf87f7c2c67..154e578a7a8 100755 --- a/tests/queries/0_stateless/03001_insert_threads_deduplication.sh +++ b/tests/queries/0_stateless/03001_insert_threads_deduplication.sh @@ -1,3 +1,5 @@ +#!/bin/bash + CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh . 
"$CUR_DIR"/../shell_config.sh From 1da1bbeae2aee280fca052acd2b19672188a1ce1 Mon Sep 17 00:00:00 2001 From: Jordi Villar Date: Mon, 4 Mar 2024 14:35:57 +0100 Subject: [PATCH 3/5] Set streams to 1 when using insert_deduplication_token --- src/Interpreters/InterpreterInsertQuery.cpp | 6 ++++++ .../03001_insert_threads_deduplication.reference | 8 ++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index e27a8bd414b..df833803970 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -513,6 +513,12 @@ BlockIO InterpreterInsertQuery::execute() const bool resize_to_max_insert_threads = !table->isView() && views.empty(); pre_streams_size = resize_to_max_insert_threads ? settings.max_insert_threads : std::min(settings.max_insert_threads, pipeline.getNumStreams()); + + /// Deduplication when passing insert_deduplication_token breaks if using more than one thread + const String & deduplication_token = settings.insert_deduplication_token; + if (!deduplication_token.empty()) + pre_streams_size = 1; + if (table->supportsParallelInsert()) sink_streams_size = pre_streams_size; } diff --git a/tests/queries/0_stateless/03001_insert_threads_deduplication.reference b/tests/queries/0_stateless/03001_insert_threads_deduplication.reference index 0c6a5a55576..0791b98cc09 100644 --- a/tests/queries/0_stateless/03001_insert_threads_deduplication.reference +++ b/tests/queries/0_stateless/03001_insert_threads_deduplication.reference @@ -1,10 +1,10 @@ This bug has been there forever. Present in 22.2 - When using multiple threads the insert produces 3 parts causing undesired deduplication. - When using a single thread the insert produces 1 part without deduplication. -1 -4 +3 +6 This bug has been introduced in CH 24.2+. 
See https://github.com/ClickHouse/ClickHouse/pull/59448 - When using remote function and multiple threads the insert produces 3 parts causing undesired deduplication. - When using remote function and a single thread the insert produces 1 part without deduplication. -1 -4 +3 +6 From b72507fdf6fa74e40e42b2e8bc1af56b7bf93725 Mon Sep 17 00:00:00 2001 From: Jordi Villar Date: Mon, 4 Mar 2024 15:48:38 +0100 Subject: [PATCH 4/5] Simplify test now that the cause is clear --- ...001_insert_threads_deduplication.reference | 8 -- .../03001_insert_threads_deduplication.sh | 84 ------------------- .../03001_insert_threads_deduplication.sql | 42 ++++++++++ 3 files changed, 42 insertions(+), 92 deletions(-) delete mode 100755 tests/queries/0_stateless/03001_insert_threads_deduplication.sh create mode 100644 tests/queries/0_stateless/03001_insert_threads_deduplication.sql diff --git a/tests/queries/0_stateless/03001_insert_threads_deduplication.reference b/tests/queries/0_stateless/03001_insert_threads_deduplication.reference index 0791b98cc09..2559e5c49e7 100644 --- a/tests/queries/0_stateless/03001_insert_threads_deduplication.reference +++ b/tests/queries/0_stateless/03001_insert_threads_deduplication.reference @@ -1,10 +1,2 @@ -This bug has been there forever. Present in 22.2 -- When using multiple threads the insert produces 3 parts causing undesired deduplication. -- When using a single thread the insert produces 1 part without deduplication. -3 -6 -This bug has been introduced in CH 24.2+. See https://github.com/ClickHouse/ClickHouse/pull/59448 -- When using remote function and multiple threads the insert produces 3 parts causing undesired deduplication. -- When using remote function and a single thread the insert produces 1 part without deduplication. 
3 6 diff --git a/tests/queries/0_stateless/03001_insert_threads_deduplication.sh b/tests/queries/0_stateless/03001_insert_threads_deduplication.sh deleted file mode 100755 index 154e578a7a8..00000000000 --- a/tests/queries/0_stateless/03001_insert_threads_deduplication.sh +++ /dev/null @@ -1,84 +0,0 @@ -#!/bin/bash - -CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -# shellcheck source=../shell_config.sh -. "$CUR_DIR"/../shell_config.sh - -echo " -DROP TABLE IF EXISTS landing SYNC; -CREATE TABLE landing -( - timestamp DateTime64(3), - status String, - id String -) -ENGINE = MergeTree() -ORDER BY timestamp; - -SYSTEM STOP MERGES landing; -- Stopping merges to force 3 parts - -INSERT INTO landing (status, id, timestamp) SELECT * FROM generateRandom() LIMIT 1; -INSERT INTO landing (status, id, timestamp) SELECT * FROM generateRandom() LIMIT 1; -INSERT INTO landing (status, id, timestamp) SELECT * FROM generateRandom() LIMIT 1; - -DROP TABLE IF EXISTS ds SYNC; - -CREATE TABLE ds -( - timestamp DateTime64(3), - status String, - id String -) -ENGINE = MergeTree() -ORDER BY timestamp -SETTINGS non_replicated_deduplication_window=1000; - -SELECT 'This bug has been there forever. Present in 22.2'; -SELECT '- When using multiple threads the insert produces 3 parts causing undesired deduplication.'; -SELECT '- When using a single thread the insert produces 1 part without deduplication.'; - -INSERT INTO ds SELECT * FROM landing -SETTINGS insert_deduplicate=1, insert_deduplication_token='token1', - max_insert_threads=5; - -SELECT count() FROM ds; - -INSERT INTO ds SELECT * FROM landing -SETTINGS insert_deduplicate=1, insert_deduplication_token='token2', - max_insert_threads=1; - -SELECT count() FROM ds; -" | $CLICKHOUSE_CLIENT -n - -echo " -CREATE TABLE ds_remote -( - timestamp DateTime64(3), - status String, - id String -) -ENGINE = MergeTree() -ORDER BY timestamp -SETTINGS non_replicated_deduplication_window=1000; - -SELECT 'This bug has been introduced in CH 24.2+. 
See https://github.com/ClickHouse/ClickHouse/pull/59448'; -SELECT '- When using remote function and multiple threads the insert produces 3 parts causing undesired deduplication.'; -SELECT '- When using remote function and a single thread the insert produces 1 part without deduplication.'; - -INSERT INTO ds_remote SELECT * FROM remote('localhost:$CLICKHOUSE_PORT_TCP', $CLICKHOUSE_DATABASE, landing) -SETTINGS insert_deduplicate=1, insert_deduplication_token='token1', - max_insert_threads=5; - -SELECT count() FROM ds_remote; - -INSERT INTO ds_remote SELECT * FROM remote('localhost:$CLICKHOUSE_PORT_TCP', $CLICKHOUSE_DATABASE, landing) -SETTINGS insert_deduplicate=1, insert_deduplication_token='token2', - max_insert_threads=1; - -SELECT count() FROM ds_remote; -" | $CLICKHOUSE_LOCAL -n - -echo " -DROP TABLE IF EXISTS landing SYNC; -DROP TABLE IF EXISTS ds SYNC; -" | $CLICKHOUSE_CLIENT -n diff --git a/tests/queries/0_stateless/03001_insert_threads_deduplication.sql b/tests/queries/0_stateless/03001_insert_threads_deduplication.sql new file mode 100644 index 00000000000..5b5cb1d6845 --- /dev/null +++ b/tests/queries/0_stateless/03001_insert_threads_deduplication.sql @@ -0,0 +1,42 @@ +DROP TABLE IF EXISTS landing SYNC; +DROP TABLE IF EXISTS ds SYNC; + +CREATE TABLE landing +( + timestamp DateTime64(3), + status String, + id String +) +ENGINE = MergeTree() +ORDER BY timestamp; + +SYSTEM STOP MERGES landing; -- Stopping merges to force 3 parts + +INSERT INTO landing (status, id, timestamp) SELECT * FROM generateRandom() LIMIT 1; +INSERT INTO landing (status, id, timestamp) SELECT * FROM generateRandom() LIMIT 1; +INSERT INTO landing (status, id, timestamp) SELECT * FROM generateRandom() LIMIT 1; + +CREATE TABLE ds +( + timestamp DateTime64(3), + status String, + id String +) +ENGINE = MergeTree() +ORDER BY timestamp +SETTINGS non_replicated_deduplication_window=1000; + +INSERT INTO ds SELECT * FROM landing +SETTINGS insert_deduplicate=1, insert_deduplication_token='token1', 
+ max_insert_threads=5; + +SELECT count() FROM ds; + +INSERT INTO ds SELECT * FROM landing +SETTINGS insert_deduplicate=1, insert_deduplication_token='token2', + max_insert_threads=1; + +SELECT count() FROM ds; + +DROP TABLE IF EXISTS landing SYNC; +DROP TABLE IF EXISTS ds SYNC; From b4e90e512115071bc0edd9d25a9b9dcac00e9214 Mon Sep 17 00:00:00 2001 From: Jordi Villar Date: Tue, 5 Mar 2024 08:49:33 +0100 Subject: [PATCH 5/5] Address PR comments --- src/Interpreters/InterpreterInsertQuery.cpp | 8 ++++-- ...001_insert_threads_deduplication.reference | 2 ++ .../03001_insert_threads_deduplication.sql | 27 +++++++++++++++++++ 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index df833803970..5680857ed3d 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -515,9 +515,13 @@ BlockIO InterpreterInsertQuery::execute() : std::min(settings.max_insert_threads, pipeline.getNumStreams()); /// Deduplication when passing insert_deduplication_token breaks if using more than one thread - const String & deduplication_token = settings.insert_deduplication_token; - if (!deduplication_token.empty()) + if (!settings.insert_deduplication_token.toString().empty()) + { + LOG_DEBUG( + getLogger("InsertQuery"), + "Insert-select query using insert_deduplication_token, setting streams to 1 to avoid deduplication issues"); pre_streams_size = 1; + } if (table->supportsParallelInsert()) sink_streams_size = pre_streams_size; diff --git a/tests/queries/0_stateless/03001_insert_threads_deduplication.reference b/tests/queries/0_stateless/03001_insert_threads_deduplication.reference index 2559e5c49e7..b6d6006f84c 100644 --- a/tests/queries/0_stateless/03001_insert_threads_deduplication.reference +++ b/tests/queries/0_stateless/03001_insert_threads_deduplication.reference @@ -1,2 +1,4 @@ 3 6 +12 +18 diff --git 
a/tests/queries/0_stateless/03001_insert_threads_deduplication.sql b/tests/queries/0_stateless/03001_insert_threads_deduplication.sql index 5b5cb1d6845..093d2b3185d 100644 --- a/tests/queries/0_stateless/03001_insert_threads_deduplication.sql +++ b/tests/queries/0_stateless/03001_insert_threads_deduplication.sql @@ -1,4 +1,7 @@ +-- Tags: distributed + DROP TABLE IF EXISTS landing SYNC; +DROP TABLE IF EXISTS landing_dist SYNC; DROP TABLE IF EXISTS ds SYNC; CREATE TABLE landing @@ -10,6 +13,14 @@ CREATE TABLE landing ENGINE = MergeTree() ORDER BY timestamp; +CREATE TABLE landing_dist +( + timestamp DateTime64(3), + status String, + id String +) +ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), 'landing', rand()); + SYSTEM STOP MERGES landing; -- Stopping merges to force 3 parts INSERT INTO landing (status, id, timestamp) SELECT * FROM generateRandom() LIMIT 1; @@ -38,5 +49,21 @@ SETTINGS insert_deduplicate=1, insert_deduplication_token='token2', SELECT count() FROM ds; +-- When reading from the distributed table, 6 rows are going to be retrieved +-- because it is using the two shards cluster + +INSERT INTO ds SELECT * FROM landing_dist +SETTINGS insert_deduplicate=1, insert_deduplication_token='token3', + max_insert_threads=5; + +SELECT count() FROM ds; + +INSERT INTO ds SELECT * FROM landing_dist +SETTINGS insert_deduplicate=1, insert_deduplication_token='token4', + max_insert_threads=1; + +SELECT count() FROM ds; + DROP TABLE IF EXISTS landing SYNC; +DROP TABLE IF EXISTS landing_dist SYNC; DROP TABLE IF EXISTS ds SYNC;