diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 2961d643869..0f3df3752cb 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -39,6 +39,7 @@ #include #include #include +#include "base/defines.h" namespace ProfileEvents @@ -398,6 +399,9 @@ Chain InterpreterInsertQuery::buildPreSinkChain( std::pair, std::vector> InterpreterInsertQuery::buildPreAndSyncChains(size_t presink_streams, size_t sink_streams, StoragePtr table, const StorageMetadataPtr & metadata_snapshot, const Block & query_sample_block) { + chassert(presink_streams > 0); + chassert(sink_streams > 0); + ThreadGroupPtr running_group; if (current_thread) running_group = current_thread->getThreadGroup(); @@ -410,8 +414,8 @@ std::pair, std::vector> InterpreterInsertQuery::buildP for (size_t i = 0; i < sink_streams; ++i) { LOG_DEBUG(getLogger("InsertQuery"), - "call buildSink table name {}.{}, stream {}/{}", - table->getStorageID().database_name, table->getStorageID().table_name, i, presink_streams); + "call buildSink sink_streams table name {}.{}, stream {}/{}", + table->getStorageID().database_name, table->getStorageID().table_name, i, sink_streams); auto out = buildSink(table, metadata_snapshot, /* thread_status_holder= */ nullptr, running_group, /* elapsed_counter_ms= */ nullptr); @@ -421,6 +425,10 @@ std::pair, std::vector> InterpreterInsertQuery::buildP for (size_t i = 0; i < presink_streams; ++i) { + LOG_DEBUG(getLogger("InsertQuery"), + "call buildSink presink_streams table name {}.{}, stream {}/{}", + table->getStorageID().database_name, table->getStorageID().table_name, i, presink_streams); + auto out = buildPreSinkChain(sink_chains[0].getInputHeader(), table, metadata_snapshot, query_sample_block); presink_chains.emplace_back(std::move(out)); } @@ -454,6 +462,9 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery & ContextPtr select_context = getContext(); + LOG_DEBUG(getLogger("InsertQuery"), + "execute() is_trivial_insert_select {} prefersLargeBlocks={} max_insert_threads {}", is_trivial_insert_select, table->prefersLargeBlocks(), settings.max_insert_threads); + if (is_trivial_insert_select) { /** When doing trivial INSERT INTO ... SELECT ... FROM table, @@ -462,9 +473,6 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery & * to avoid unnecessary squashing. */ - LOG_DEBUG(getLogger("InsertQuery"), - "execute() is_trivial_insert_select=true prefersLargeBlocks={}", table->prefersLargeBlocks()); - Settings new_settings = select_context->getSettings(); new_settings.max_threads = std::max(1, settings.max_insert_threads); @@ -503,6 +511,11 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery & pipeline.dropTotalsAndExtremes(); + LOG_DEBUG(getLogger("InsertQuery"), + "adding transforms, pipline size {}, threads {}, max_insert_threads {}", + pipeline.getNumStreams(), pipeline.getNumThreads(), settings.max_insert_threads); + + /// Allow to insert Nullable into non-Nullable columns, NULL values will be added as defaults values. if (getContext()->getSettingsRef().insert_null_as_default) { @@ -532,6 +545,56 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery & } } + pipeline.resize(1); + + if (shouldAddSquashingFroStorage(table)) + { + bool table_prefers_large_blocks = table->prefersLargeBlocks(); + + pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr + { + return std::make_shared( + in_header, + table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, + table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL); + }); + } + + pipeline.addSimpleTransform([&](const Block &in_header) -> ProcessorPtr + { + return std::make_shared(in_header); + }); + + if (!settings.insert_deduplication_token.value.empty()) + { + pipeline.addSimpleTransform([&](const Block &in_header) -> ProcessorPtr + { + return std::make_shared(settings.insert_deduplication_token.value, in_header); + }); + + pipeline.addSimpleTransform([&](const Block &in_header) -> ProcessorPtr + { + return std::make_shared(in_header); + }); + } + + /// Number of streams works like this: + /// * For the SELECT, use `max_threads`, or `max_insert_threads`, or whatever + /// InterpreterSelectQuery ends up with. + /// * Use `max_insert_threads` streams for various insert-preparation steps, e.g. + /// materializing and squashing (too slow to do in one thread). That's `presink_chains`. + /// * If the table supports parallel inserts, use max_insert_threads for writing to IStorage. + /// Otherwise ResizeProcessor them down to 1 stream. + + size_t presink_streams_size = std::max(settings.max_insert_threads, pipeline.getNumStreams()); + size_t sink_streams_size = table->supportsParallelInsert() ? std::max(1, settings.max_insert_threads) : 1; + + auto [presink_chains, sink_chains] = buildPreAndSyncChains( + presink_streams_size, sink_streams_size, + table, metadata_snapshot, query_sample_block); + + pipeline.resize(presink_chains.size()); + auto actions_dag = ActionsDAG::makeConvertingActions( pipeline.getHeader().getColumnsWithTypeAndName(), query_sample_block.getColumnsWithTypeAndName(), @@ -560,54 +623,12 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery & return counting; }); - if (shouldAddSquashingFroStorage(table)) - { - bool table_prefers_large_blocks = table->prefersLargeBlocks(); - - pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr - { - return std::make_shared( - in_header, - table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, - table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL); - }); - } - - /// Number of streams works like this: - /// * For the SELECT, use `max_threads`, or `max_insert_threads`, or whatever - /// InterpreterSelectQuery ends up with. - /// * Use `max_insert_threads` streams for various insert-preparation steps, e.g. - /// materializing and squashing (too slow to do in one thread). That's `presink_chains`. - /// * If the table supports parallel inserts, use the same streams for writing to IStorage. - /// Otherwise ResizeProcessor them down to 1 stream. - - size_t presink_streams_size = std::max(1, std::max(settings.max_insert_threads, pipeline.getNumStreams())); - size_t sink_streams_size = table->supportsParallelInsert() ? presink_streams_size : 1; - - auto [presink_chains, sink_chains] = buildPreAndSyncChains( - presink_streams_size, sink_streams_size, - table, metadata_snapshot, query_sample_block); - - if (!settings.insert_deduplication_token.value.empty()) - { - pipeline.resize(1); - - pipeline.addSimpleTransform([&](const Block &in_header) -> ProcessorPtr - { - return std::make_shared(settings.insert_deduplication_token.value, in_header); - }); - pipeline.addSimpleTransform([&](const Block &in_header) -> ProcessorPtr - { - return std::make_shared(in_header); - }); - } - - pipeline.resize(presink_chains.size()); for (auto & chain : presink_chains) pipeline.addResources(chain.detachResources()); pipeline.addChains(std::move(presink_chains)); pipeline.resize(sink_streams_size); + for (auto & chain : sink_chains) pipeline.addResources(chain.detachResources()); pipeline.addChains(std::move(sink_chains)); @@ -655,12 +676,6 @@ QueryPipeline InterpreterInsertQuery::buildInsertPipeline(ASTInsertQuery & query chain.appendChain(std::move(sink_chains.front())); } - if (!settings.insert_deduplication_token.value.empty()) - { - chain.addSource(std::make_shared(chain.getInputHeader())); - chain.addSource(std::make_shared(settings.insert_deduplication_token.value, chain.getInputHeader())); - } - if (shouldAddSquashingFroStorage(table)) { bool table_prefers_large_blocks = table->prefersLargeBlocks(); @@ -673,6 +688,14 @@ QueryPipeline InterpreterInsertQuery::buildInsertPipeline(ASTInsertQuery & query chain.addSource(std::move(squashing)); } + if (!settings.insert_deduplication_token.value.empty()) + { + chain.addSource(std::make_shared(chain.getInputHeader())); + chain.addSource(std::make_shared(settings.insert_deduplication_token.value, chain.getInputHeader())); + } + + chain.addSource(std::make_shared(chain.getInputHeader())); + auto context_ptr = getContext(); auto counting = std::make_shared(chain.getInputHeader(), nullptr, context_ptr->getQuota()); counting->setProcessListElement(context_ptr->getProcessListElement()); diff --git a/src/Processors/Sinks/SinkToStorage.cpp b/src/Processors/Sinks/SinkToStorage.cpp index fff4a881e3d..36bb70f493f 100644 --- a/src/Processors/Sinks/SinkToStorage.cpp +++ b/src/Processors/Sinks/SinkToStorage.cpp @@ -16,9 +16,7 @@ void SinkToStorage::onConsume(Chunk chunk) Nested::validateArraySizes(getHeader().cloneWithColumns(chunk.getColumns())); consume(chunk); - fillDeduplicationTokenForChildren(chunk); - if (!lastBlockIsDuplicate()) // TODO: remove that - cur_chunk = std::move(chunk); + cur_chunk = std::move(chunk); } SinkToStorage::GenerateResult SinkToStorage::onGenerate() diff --git a/src/Processors/Sinks/SinkToStorage.h b/src/Processors/Sinks/SinkToStorage.h index 21e003c4317..c350b9f79b0 100644 --- a/src/Processors/Sinks/SinkToStorage.h +++ b/src/Processors/Sinks/SinkToStorage.h @@ -4,7 +4,6 @@ #include #include #include -#include "Processors/Transforms/NumberBlocksTransform.h" namespace DB { @@ -23,25 +22,6 @@ public: protected: virtual void consume(Chunk & chunk) = 0; - virtual bool lastBlockIsDuplicate() const { return false; } - - void fillDeduplicationTokenForChildren(Chunk & chunk) const - { - auto token_info = chunk.getChunkInfos().get(); - if (token_info) - return; - - SipHash hash; - for (const auto & colunm: chunk.getColumns()) - { - colunm->updateHashFast(hash); - } - const auto hash_value = hash.get128(); - - chunk.getChunkInfos().add(std::make_shared( - fmt::format(":hash-{}", toString(hash_value.items[0]) + "_" + toString(hash_value.items[1])) - )); - } private: std::vector table_locks; diff --git a/src/Processors/Transforms/NumberBlocksTransform.cpp b/src/Processors/Transforms/NumberBlocksTransform.cpp index 61ff3f6bfd5..19ebf94a27a 100644 --- a/src/Processors/Transforms/NumberBlocksTransform.cpp +++ b/src/Processors/Transforms/NumberBlocksTransform.cpp @@ -1 +1,157 @@ #include + +#include + +#include +#include +#include + + +#include + + +namespace DB +{ +namespace DeduplicationToken +{ + +String DB::DeduplicationToken::TokenInfo::getToken(bool enable_assert) const +{ + chassert(stage == MATERIALIZE_VIEW_ID || !enable_assert); + + String result; + result.reserve(getTotalSize()); + + for (const auto & part : parts) + result.append(part); + + return result; +} + +void DB::DeduplicationToken::TokenInfo::setInitialToken(String part) +{ + chassert(stage == INITIAL); + addTokenPart(std::move(part)); + stage = MATERIALIZE_VIEW_ID; +} + +void TokenInfo::setUserToken(const String & token) +{ + chassert(stage == INITIAL); + addTokenPart(fmt::format("user-token-{}", token)); + stage = SOURCE_BLOCK_NUMBER; +} + +void TokenInfo::setSourceBlockNumber(size_t sbn) +{ + chassert(stage == SOURCE_BLOCK_NUMBER); + addTokenPart(fmt::format(":source-number-{}", sbn)); + stage = MATERIALIZE_VIEW_ID; +} + +void TokenInfo::setMaterializeViewID(const String & id) +{ + chassert(stage == MATERIALIZE_VIEW_ID); + addTokenPart(fmt::format(":mv-{}", id)); + stage = MATERIALIZE_VIEW_BLOCK_NUMBER; +} + +void TokenInfo::setMaterializeViewBlockNumber(size_t mvbn) +{ + chassert(stage == MATERIALIZE_VIEW_BLOCK_NUMBER); + addTokenPart(fmt::format(":mv-bn-{}", mvbn)); + stage = MATERIALIZE_VIEW_ID; +} + +void TokenInfo::reset() +{ + stage = INITIAL; + parts.clear(); +} + +void TokenInfo::addTokenPart(String part) +{ + if (!part.empty()) + parts.push_back(std::move(part)); +} + +size_t TokenInfo::getTotalSize() const +{ + size_t size = 0; + for (const auto & part : parts) + size += part.size(); + return size; +} + +void CheckTokenTransform::transform(Chunk & chunk) +{ + auto token_info = chunk.getChunkInfos().get(); + + if (!token_info) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk has to have DedupTokenInfo as ChunkInfo, {}", debug); + + if (!must_be_present) + { + LOG_DEBUG(getLogger("CheckInsertDeduplicationTokenTransform"), "{}, no token required, token {}", debug, token_info->getToken(false)); + return; + } + + LOG_DEBUG(getLogger("CheckInsertDeduplicationTokenTransform"), "{}, token: {}", debug, token_info->getToken(false)); +} + +void SetInitialTokenTransform::transform(Chunk & chunk) +{ + auto token_builder = chunk.getChunkInfos().get(); + chassert(token_builder); + if (token_builder->tokenInitialized()) + return; + + SipHash hash; + for (const auto & colunm : chunk.getColumns()) + colunm->updateHashFast(hash); + + const auto hash_value = hash.get128(); + token_builder->setInitialToken(toString(hash_value.items[0]) + "_" + toString(hash_value.items[1])); +} + +void SetUserTokenTransform::transform(Chunk & chunk) +{ + auto token_info = chunk.getChunkInfos().get(); + chassert(token_info); + chassert(!token_info->tokenInitialized()); + token_info->setUserToken(user_token); +} + +void SetSourceBlockNumberTransform::transform(Chunk & chunk) +{ + auto token_info = chunk.getChunkInfos().get(); + chassert(token_info); + chassert(!token_info->tokenInitialized()); + token_info->setSourceBlockNumber(block_number++); +} + +void SetMaterializeViewIDTransform::transform(Chunk & chunk) +{ + auto token_info = chunk.getChunkInfos().get(); + chassert(token_info); + chassert(token_info->tokenInitialized()); + token_info->setMaterializeViewID(mv_id); +} + +void SetMaterializeViewBlockNumberTransform::transform(Chunk & chunk) +{ + auto token_info = chunk.getChunkInfos().get(); + chassert(token_info); + chassert(token_info->tokenInitialized()); + token_info->setMaterializeViewBlockNumber(block_number++); +} + +void ResetTokenTransform::transform(Chunk & chunk) +{ + auto token_info = chunk.getChunkInfos().get(); + chassert(token_info); + token_info->reset(); +} + +} +} diff --git a/src/Processors/Transforms/NumberBlocksTransform.h b/src/Processors/Transforms/NumberBlocksTransform.h index 6586f015d3e..46b62029c21 100644 --- a/src/Processors/Transforms/NumberBlocksTransform.h +++ b/src/Processors/Transforms/NumberBlocksTransform.h @@ -2,10 +2,9 @@ #include #include -#include -#include -#include +#include + namespace ErrorCodes { @@ -14,220 +13,6 @@ namespace ErrorCodes namespace DB { - struct SerialBlockNumberInfo : public ChunkInfoCloneable - { - SerialBlockNumberInfo(const SerialBlockNumberInfo & other) = default; - explicit SerialBlockNumberInfo(size_t block_number_) - : block_number(block_number_) - { - } - - size_t block_number = 0; - }; - - - class NumberBlocksTransform : public ISimpleTransform - { - public: - explicit NumberBlocksTransform(const Block & header) - : ISimpleTransform(header, header, true) - { - } - - String getName() const override { return "NumberBlocksTransform"; } - - void transform(Chunk & chunk) override - { - chunk.getChunkInfos().add(std::make_shared(block_number++)); - } - - private: - size_t block_number = 0; - }; - - - class DedupTokenInfo : public ChunkInfoCloneable - { - public: - DedupTokenInfo() = default; - DedupTokenInfo(const DedupTokenInfo & other) = default; - explicit DedupTokenInfo(String first_part) - { - addTokenPart(std::move(first_part)); - } - - String getToken() const - { - String result; - result.reserve(getTotalSize()); - - for (const auto & part : token_parts) - { - result.append(part); - } - - return result; - } - - bool empty() const - { - return token_parts.empty(); - } - - void addTokenPart(String part) - { - if (!part.empty()) - token_parts.push_back(std::move(part)); - } - - private: - size_t getTotalSize() const - { - size_t size = 0; - for (const auto & part : token_parts) - size += part.size(); - return size; - } - - std::vector token_parts; - }; - - class AddUserDeduplicationTokenTransform : public ISimpleTransform - { - public: - AddUserDeduplicationTokenTransform(String token_, const Block & header_) - : ISimpleTransform(header_, header_, true) - , token(token_) - { - } - - String getName() const override { return "AddUserDeduplicationTokenTransform"; } - - void transform(Chunk & chunk) override - { - chunk.getChunkInfos().add(std::make_shared(token)); - } - - private: - String token; - }; - - - class CheckInsertDeduplicationTokenTransform : public ISimpleTransform - { - public: - CheckInsertDeduplicationTokenTransform(String debug_, bool must_be_present_, const Block & header_) - : ISimpleTransform(header_, header_, true) - , debug(debug_) - , must_be_present(must_be_present_) - { - } - - String getName() const override { return "CheckInsertDeduplicationTokenTransform"; } - - void transform(Chunk & chunk) override - { - if (!must_be_present) - return; - - auto token_info = chunk.getChunkInfos().get(); - if (!token_info) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk has to have DedupTokenInfo as ChunkInfo, {}", debug); - - LOG_DEBUG(getLogger("CheckInsertDeduplicationTokenTransform"), - "{}, token: {}", - debug, token_info->getToken()); - } - - private: - String debug; - bool must_be_present = false; - }; - - - class ExtendDeduplicationWithBlockNumberFromInfoTokenTransform : public ISimpleTransform - { - public: - explicit ExtendDeduplicationWithBlockNumberFromInfoTokenTransform(const Block & header_) - : ISimpleTransform(header_, header_, true) - { - } - - String getName() const override { return "ExtendDeduplicationWithBlockNumberFromInfoTokenTransform"; } - - void transform(Chunk & chunk) override - { - auto token_info = chunk.getChunkInfos().get(); - if (!token_info) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk has to have DedupTokenInfo as ChunkInfo, recs {}", chunk.getChunkInfos().size()); - - auto block_number_info = chunk.getChunkInfos().get(); - if (!block_number_info) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk has to have SerialBlockNumberInfo as ChunkInfo"); - - token_info->addTokenPart(fmt::format(":block-{}", block_number_info->block_number)); - - LOG_DEBUG(getLogger("ExtendDeduplicationWithBlockNumberFromInfoTokenTransform"), - "updated with {}, result: {}", - fmt::format(":block-{}", block_number_info->block_number), token_info->getToken()); - } - }; - - class ExtendDeduplicationWithBlockNumberTokenTransform : public ISimpleTransform - { - public: - explicit ExtendDeduplicationWithBlockNumberTokenTransform(const Block & header_) - : ISimpleTransform(header_, header_, true) - { - } - - String getName() const override { return "ExtendDeduplicationWithBlockNumberTokenTransform"; } - - void transform(Chunk & chunk) override - { - auto token_info = chunk.getChunkInfos().get(); - if (!token_info) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk has to have DedupTokenInfo as ChunkInfo"); - - auto x = block_number++; - token_info->addTokenPart(fmt::format(":block-{}", x)); - - LOG_DEBUG(getLogger("ExtendDeduplicationWithBlockNumberTokenTransform"), - "updated with {}, result: {}", - fmt::format(":block-{}", x), token_info->getToken()); - } - private: - size_t block_number = 0; - }; - - class ExtendDeduplicationWithTokenPartTransform : public ISimpleTransform - { - public: - ExtendDeduplicationWithTokenPartTransform(String token_part_, const Block & header_) - : ISimpleTransform(header_, header_, true) - , token_part(token_part_) - { - } - - String getName() const override { return "ExtendDeduplicationWithBlockNumberTokenTransform"; } - - void transform(Chunk & chunk) override - { - auto token_info = chunk.getChunkInfos().get(); - if (!token_info) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk has to have DedupTokenInfo as ChunkInfo, try to add token part {}", token_part); - - token_info->addTokenPart(fmt::format("{}", token_part)); - - LOG_DEBUG(getLogger("ExtendDeduplicationWithTokenPartTransform"), - "updated with {}, result: {}", - token_part, token_info->getToken()); - } - - private: - String token_part; - }; - class RestoreChunkInfosTransform : public ISimpleTransform { public: @@ -248,4 +33,176 @@ namespace DB Chunk::ChunkInfoCollection chunk_infos; }; + +namespace DeduplicationToken +{ + class TokenInfo : public ChunkInfoCloneable + { + public: + TokenInfo() = default; + TokenInfo(const TokenInfo & other) = default; + + String getToken(bool enable_assert = true) const; + + bool empty() const { return parts.empty(); } + bool tokenInitialized() const { return stage != INITIAL && stage != SOURCE_BLOCK_NUMBER; } + + void setInitialToken(String part); + void setUserToken(const String & token); + void setSourceBlockNumber(size_t sbn); + void setMaterializeViewID(const String & id); + void setMaterializeViewBlockNumber(size_t mvbn); + void reset(); + + private: + void addTokenPart(String part); + size_t getTotalSize() const; + + enum BuildingStage + { + INITIAL, + SOURCE_BLOCK_NUMBER, + MATERIALIZE_VIEW_ID, + MATERIALIZE_VIEW_BLOCK_NUMBER, + }; + + BuildingStage stage = INITIAL; + std::vector parts; + }; + + + class CheckTokenTransform : public ISimpleTransform + { + public: + CheckTokenTransform(String debug_, bool must_be_present_, const Block & header_) + : ISimpleTransform(header_, header_, true) + , debug(debug_) + , must_be_present(must_be_present_) + { + } + + String getName() const override { return "DeduplicationToken::CheckTokenTransform"; } + + void transform(Chunk & chunk) override; + + private: + String debug; + bool must_be_present = false; + }; + + + class AddTokenInfoTransform : public ISimpleTransform + { + public: + explicit AddTokenInfoTransform(const Block & header_) + : ISimpleTransform(header_, header_, true) + { + } + + String getName() const override { return "DeduplicationToken::AddTokenInfoTransform"; } + + void transform(Chunk & chunk) override + { + chunk.getChunkInfos().add(std::make_shared()); + } + }; + + + class SetInitialTokenTransform : public ISimpleTransform + { + public: + explicit SetInitialTokenTransform(const Block & header_) + : ISimpleTransform(header_, header_, true) + { + } + + String getName() const override { return "DeduplicationToken::SetInitialTokenTransform"; } + + void transform(Chunk & chunk) override; + }; + + class ResetTokenTransform : public ISimpleTransform + { + public: + explicit ResetTokenTransform(const Block & header_) + : ISimpleTransform(header_, header_, true) + { + } + + String getName() const override { return "DeduplicationToken::ResetTokenTransform"; } + + void transform(Chunk & chunk) override; + }; + + + class SetUserTokenTransform : public ISimpleTransform + { + public: + SetUserTokenTransform(String user_token_, const Block & header_) + : ISimpleTransform(header_, header_, true) + , user_token(std::move(user_token_)) + { + } + + String getName() const override { return "DeduplicationToken::SetUserTokenTransform"; } + + void transform(Chunk & chunk) override; + + private: + String user_token; + }; + + + class SetSourceBlockNumberTransform : public ISimpleTransform + { + public: + explicit SetSourceBlockNumberTransform(const Block & header_) + : ISimpleTransform(header_, header_, true) + { + } + + String getName() const override { return "DeduplicationToken::SetSourceBlockNumberTransform"; } + + void transform(Chunk & chunk) override; + + private: + size_t block_number; + }; + + + class SetMaterializeViewIDTransform : public ISimpleTransform + { + public: + SetMaterializeViewIDTransform(String mv_id_, const Block & header_) + : ISimpleTransform(header_, header_, true) + , mv_id(std::move(mv_id_)) + { + } + + String getName() const override { return "DeduplicationToken::SetMaterializeViewIDTransform"; } + + void transform(Chunk & chunk) override; + + private: + String mv_id; + }; + + + class SetMaterializeViewBlockNumberTransform : public ISimpleTransform + { + public: + explicit SetMaterializeViewBlockNumberTransform(const Block & header_) + : ISimpleTransform(header_, header_, true) + { + } + + String getName() const override { return "DeduplicationToken::SetMaterializeViewBlockNumberTransform"; } + + void transform(Chunk & chunk) override; + + private: + size_t block_number; + }; + +} } diff --git a/src/Processors/Transforms/SquashingChunksTransform.cpp b/src/Processors/Transforms/SquashingChunksTransform.cpp index 7464cb79ba6..1a29b8d8a2d 100644 --- a/src/Processors/Transforms/SquashingChunksTransform.cpp +++ b/src/Processors/Transforms/SquashingChunksTransform.cpp @@ -79,7 +79,7 @@ SimpleSquashingChunksTransform::SimpleSquashingChunksTransform( void SimpleSquashingChunksTransform::transform(Chunk & chunk) { LOG_DEBUG(getLogger("SimpleSquashingChunksTransform"), - "transform {}", chunk.getNumRows()); + "transform {}, finished {}", chunk.getNumRows(), finished); if (!finished) { diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp index ccecfcf3333..0c1893e0f37 100644 --- a/src/Processors/Transforms/buildPushingToViewsChain.cpp +++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp @@ -108,7 +108,7 @@ private: class ExecutingInnerQueryFromViewTransform final : public ExceptionKeepingTransform { public: - ExecutingInnerQueryFromViewTransform(const Block & header, ViewRuntimeData & view_, ViewsDataPtr views_data_); + ExecutingInnerQueryFromViewTransform(const Block & header, ViewRuntimeData & view_, ViewsDataPtr views_data_, bool disable_deduplication_for_children_); String getName() const override { return "ExecutingInnerQueryFromView"; } @@ -119,6 +119,7 @@ protected: private: ViewsDataPtr views_data; ViewRuntimeData & view; + bool disable_deduplication_for_children; struct State { @@ -219,6 +220,11 @@ std::optional generateViewChain( const auto & insert_settings = insert_context->getSettingsRef(); + if (disable_deduplication_for_children) + { + insert_context->setSetting("insert_deduplicate", Field{false}); + } + // Processing of blocks for MVs is done block by block, and there will // be no parallel reading after (plus it is not a costless operation) select_context->setSetting("parallelize_output_from_storages", Field{false}); @@ -330,16 +336,6 @@ std::optional generateViewChain( bool check_access = !materialized_view->hasInnerTable() && materialized_view->getInMemoryMetadataPtr()->sql_security_type; out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access); - out.addSource(std::make_shared("Before inner chain", !disable_deduplication_for_children, out.getInputHeader())); - - if (!disable_deduplication_for_children) - { - String addition_part = view_id.hasUUID() ? toString(view_id.uuid) : view_id.getFullNameNotQuoted(); - out.addSource(std::make_shared(fmt::format(":mv-{}", addition_part), out.getInputHeader())); - } - - out.addSource(std::make_shared("Before extend token", !disable_deduplication_for_children, out.getInputHeader())); - if (interpreter.shouldAddSquashingFroStorage(inner_table)) { bool table_prefers_large_blocks = inner_table->prefersLargeBlocks(); @@ -351,7 +347,7 @@ std::optional generateViewChain( table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL)); } - out.addSource(std::make_shared("Before squashing", !disable_deduplication_for_children, out.getInputHeader())); + out.addSource(std::make_shared("Before squashing", !disable_deduplication_for_children, out.getInputHeader())); auto counting = std::make_shared(out.getInputHeader(), current_thread, insert_context->getQuota()); counting->setProcessListElement(insert_context->getProcessListElement()); @@ -394,23 +390,15 @@ std::optional generateViewChain( if (type == QueryViewsLogElement::ViewType::MATERIALIZED) { - out.addSource(std::make_shared("Right after Inner query", !disable_deduplication_for_children, out.getInputHeader())); - - // if (!disable_deduplication_for_children) - // { - // // out.addSource(std::make_shared(out.getInputHeader())); - // // out.addSource(std::make_shared(out.getInputHeader())); - - // out.addSource(std::make_shared(out.getInputHeader())); - // } + out.addSource(std::make_shared("Right after Inner query", !disable_deduplication_for_children, out.getInputHeader())); auto executing_inner_query = std::make_shared( - storage_header, views_data->views.back(), views_data); + storage_header, views_data->views.back(), views_data, disable_deduplication_for_children); executing_inner_query->setRuntimeData(view_thread_status, view_counter_ms); out.addSource(std::move(executing_inner_query)); - out.addSource(std::make_shared("Right before Inner query", !disable_deduplication_for_children, out.getInputHeader())); + out.addSource(std::make_shared("Right before Inner query", !disable_deduplication_for_children, out.getInputHeader())); } return out; @@ -451,8 +439,6 @@ Chain buildPushingToViewsChain( */ result_chain.addTableLock(storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout)); - /// If the "root" table deduplicates blocks, there are no need to make deduplication for children - /// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks bool disable_deduplication_for_children = false; if (!context->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views) disable_deduplication_for_children = !no_destination && storage->supportsDeduplication(); @@ -563,6 +549,10 @@ Chain buildPushingToViewsChain( sink->setRuntimeData(thread_status, elapsed_counter_ms); result_chain.addSource(std::move(sink)); } + else + { + result_chain.addSource(std::make_shared(result_chain.getInputHeader())); + } if (result_chain.empty()) result_chain.addSink(std::make_shared(storage_header)); @@ -578,7 +568,7 @@ Chain buildPushingToViewsChain( return result_chain; } -static QueryPipeline process(Block block, ViewRuntimeData & view, const ViewsData & views_data, Chunk::ChunkInfoCollection chunk_infos) +static QueryPipeline process(Block block, ViewRuntimeData & view, const ViewsData & views_data, Chunk::ChunkInfoCollection chunk_infos, bool disable_deduplication_for_children) { const auto & context = views_data.context; @@ -625,9 +615,18 @@ static QueryPipeline process(Block block, ViewRuntimeData & view, const ViewsDat pipeline.getHeader(), std::make_shared(std::move(converting)))); - //pipeline.addTransform(std::make_shared(pipeline.getHeader())); pipeline.addTransform(std::make_shared(std::move(chunk_infos), pipeline.getHeader())); - pipeline.addTransform(std::make_shared(pipeline.getHeader())); + + if (!disable_deduplication_for_children) + { + String materialize_view_id = view.table_id.hasUUID() ? toString(view.table_id.uuid) : view.table_id.getFullNameNotQuoted(); + pipeline.addTransform(std::make_shared(std::move(materialize_view_id), pipeline.getHeader())); + pipeline.addTransform(std::make_shared(pipeline.getHeader())); + } + else + { + pipeline.addTransform(std::make_shared(pipeline.getHeader())); + } return QueryPipelineBuilder::getPipeline(std::move(pipeline)); } @@ -720,17 +719,19 @@ IProcessor::Status CopyingDataToViewsTransform::prepare() ExecutingInnerQueryFromViewTransform::ExecutingInnerQueryFromViewTransform( const Block & header, ViewRuntimeData & view_, - std::shared_ptr views_data_) + std::shared_ptr views_data_, + bool disable_deduplication_for_children_) : ExceptionKeepingTransform(header, view_.sample_block) , views_data(std::move(views_data_)) , view(view_) + , disable_deduplication_for_children(disable_deduplication_for_children_) { } void ExecutingInnerQueryFromViewTransform::onConsume(Chunk chunk) { auto block = getInputPort().getHeader().cloneWithColumns(chunk.getColumns()); - state.emplace(process(block, view, *views_data, chunk.getChunkInfos())); + state.emplace(process(block, view, *views_data, chunk.getChunkInfos(), disable_deduplication_for_children)); } diff --git a/src/Storages/FileLog/StorageFileLog.cpp b/src/Storages/FileLog/StorageFileLog.cpp index 6ca4ec6e079..b86845d48e0 100644 --- a/src/Storages/FileLog/StorageFileLog.cpp +++ b/src/Storages/FileLog/StorageFileLog.cpp @@ -740,7 +740,13 @@ bool StorageFileLog::streamToViews() auto new_context = Context::createCopy(getContext()); - InterpreterInsertQuery interpreter(insert, new_context, false, true, true, false); + InterpreterInsertQuery interpreter( + insert, + new_context, + false, + true, + true, + false); auto block_io = interpreter.execute(); /// Each stream responsible for closing it's files and store meta diff --git a/src/Storages/MergeTree/MergeTreeSink.cpp b/src/Storages/MergeTree/MergeTreeSink.cpp index 2d29f87c556..4b0fa94e183 100644 --- a/src/Storages/MergeTree/MergeTreeSink.cpp +++ b/src/Storages/MergeTree/MergeTreeSink.cpp @@ -90,16 +90,20 @@ void MergeTreeSink::consume(Chunk & chunk) bool support_parallel_write = false; String block_dedup_token; - std::shared_ptr dedub_token_info_for_children = nullptr; + auto token_info = chunk.getChunkInfos().get(); if (storage.getDeduplicationLog()) { - auto token_info = chunk.getChunkInfos().get(); - if (!token_info && !context->getSettingsRef().insert_deduplication_token.value.empty()) + if (!token_info) throw Exception(ErrorCodes::LOGICAL_ERROR, - "DedupTokenInfo is expected for consumed chunk in MergeTreeSink for table: {}", + "DedupTokenBuilder is expected for consumed chunk in MergeTreeSink for table: {}", storage.getStorageID().getNameForLogs()); - if (token_info) + if (!token_info->tokenInitialized() && !context->getSettingsRef().insert_deduplication_token.value.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "DedupTokenBuilder has to be initialized with user token for table: {}", + storage.getStorageID().getNameForLogs()); + + if (token_info->tokenInitialized()) { block_dedup_token = token_info->getToken(); @@ -109,9 +113,6 @@ void MergeTreeSink::consume(Chunk & chunk) } else { - dedub_token_info_for_children = std::make_shared(); - chunk.getChunkInfos().add(dedub_token_info_for_children); - LOG_DEBUG(storage.log, "dedup token from hash is calculated"); } @@ -141,10 +142,10 @@ void MergeTreeSink::consume(Chunk & chunk) if (!temp_part.part) continue; - if (dedub_token_info_for_children) + if (!token_info->tokenInitialized()) { chassert(temp_part.part); - dedub_token_info_for_children->addTokenPart(":block_hash-" + temp_part.part->getPartBlockIDHash()); + token_info->setInitialToken(temp_part.part->getPartBlockIDHash()); } if (!support_parallel_write && temp_part.part->getDataPartStorage().supportParallelWrite()) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index e855bb7d969..b03f3f88611 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -294,17 +294,21 @@ void ReplicatedMergeTreeSinkImpl::consume(Chunk & chunk) } String block_dedup_token; - std::shared_ptr dedub_token_info_for_children = nullptr; + auto token_info = chunk.getChunkInfos().get(); if constexpr (!async_insert) { - auto token_info = chunk.getChunkInfos().get(); - if (!token_info && !context->getSettingsRef().insert_deduplication_token.value.empty()) + if (!token_info) throw Exception(ErrorCodes::LOGICAL_ERROR, - "DedupTokenInfo is expected for consumed chunk in MergeTreeSink for table: {}", + "DedupTokenBuilder is expected for consumed chunk in ReplicatedMergeTreeSink for table: {}", + storage.getStorageID().getNameForLogs()); + + if (!token_info->tokenInitialized() && !context->getSettingsRef().insert_deduplication_token.value.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "DedupTokenBuilder has to be initialized with user token for table: {}", storage.getStorageID().getNameForLogs()); - if (token_info) + if (token_info->tokenInitialized()) { /// multiple blocks can be inserted within the same insert query /// an ordinal number is added to dedup token to generate a distinctive block id for each block @@ -316,8 +320,6 @@ void ReplicatedMergeTreeSinkImpl::consume(Chunk & chunk) } else { - dedub_token_info_for_children = std::make_shared(); - chunk.getChunkInfos().add(dedub_token_info_for_children); LOG_DEBUG(storage.log, "dedup token from hash is calculated"); } @@ -386,10 +388,10 @@ void ReplicatedMergeTreeSinkImpl::consume(Chunk & chunk) LOG_DEBUG(log, "Wrote block with {} rows{}", current_block.block.rows(), quorumLogMessage(replicas_num)); } - if (dedub_token_info_for_children) + if (!token_info->tokenInitialized()) { chassert(temp_part.part); - dedub_token_info_for_children->addTokenPart(":block_hash-" + temp_part.part->getPartBlockIDHash()); + token_info->setInitialToken(temp_part.part->getPartBlockIDHash()); } } @@ -444,8 +446,8 @@ void ReplicatedMergeTreeSinkImpl::consume(Chunk & chunk) /// value for `last_block_is_duplicate`, which is possible only after the part is committed. /// Othervide we can delay commit. /// TODO: we can also delay commit if there is no MVs. - if (!settings.deduplicate_blocks_in_dependent_materialized_views) - finishDelayedChunk(zookeeper); + // if (!settings.deduplicate_blocks_in_dependent_materialized_views) + // finishDelayedChunk(zookeeper); ++num_blocks_processed; } @@ -456,8 +458,6 @@ void ReplicatedMergeTreeSinkImpl::finishDelayedChunk(const ZooKeeperWithF if (!delayed_chunk) return; - last_block_is_duplicate = false; - for (auto & partition : delayed_chunk->partitions) { ProfileEventsScope scoped_attach(&partition.part_counters); @@ -470,8 +470,6 @@ void ReplicatedMergeTreeSinkImpl::finishDelayedChunk(const ZooKeeperWithF { bool deduplicated = commitPart(zookeeper, part, partition.block_id, delayed_chunk->replicas_num).second; - last_block_is_duplicate = last_block_is_duplicate || deduplicated; - /// Set a special error code if the block is duplicate int error = (deduplicate && deduplicated) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0; auto counters_snapshot = std::make_shared(partition.part_counters.getPartiallyAtomicSnapshot()); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.h b/src/Storages/MergeTree/ReplicatedMergeTreeSink.h index e460804d7f1..7d025361717 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.h @@ -59,16 +59,6 @@ public: /// For ATTACHing existing data on filesystem. bool writeExistingPart(MergeTreeData::MutableDataPartPtr & part); - /// For proper deduplication in MaterializedViews - bool lastBlockIsDuplicate() const override - { - /// If MV is responsible for deduplication, block is not considered duplicating. - if (context->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views) - return false; - - return last_block_is_duplicate; - } - struct DelayedChunk; private: std::vector detectConflictsInAsyncBlockIDs(const std::vector & ids); @@ -126,7 +116,6 @@ private: bool allow_attach_while_readonly = false; bool quorum_parallel = false; const bool deduplicate = true; - bool last_block_is_duplicate = false; UInt64 num_blocks_processed = 0; LoggerPtr log; diff --git a/tests/queries/0_stateless/02912_ingestion_mv_deduplication.reference b/tests/queries/0_stateless/02912_ingestion_mv_deduplication.reference index 335b55f05c8..ae82b9c0463 100644 --- a/tests/queries/0_stateless/02912_ingestion_mv_deduplication.reference +++ b/tests/queries/0_stateless/02912_ingestion_mv_deduplication.reference @@ -10,7 +10,8 @@ 2022-09-01 12:23:34 42 2023-09-01 12:23:34 42 -- MV -2022-09-01 12:00:00 42 +2022-09-01 12:00:00 84 +2023-09-01 12:00:00 42 -- Original issue with deduplicate_blocks_in_dependent_materialized_views = 1 AND max_insert_delayed_streams_for_parallel_write > 1 -- Landing 2022-09-01 12:23:34 42 diff --git a/tests/queries/0_stateless/02912_ingestion_mv_deduplication.sql b/tests/queries/0_stateless/02912_ingestion_mv_deduplication.sql index f206f0d7775..06fe156500d 100644 --- a/tests/queries/0_stateless/02912_ingestion_mv_deduplication.sql +++ b/tests/queries/0_stateless/02912_ingestion_mv_deduplication.sql @@ -56,6 +56,7 @@ SELECT '-- Original issue with deduplicate_blocks_in_dependent_materialized_view - 2nd insert gets first block 20220901 deduplicated and second one inserted in landing table - 2nd insert is not inserting anything in mv table due to a bug computing blocks to be discarded + Now it is fixed. */ SET deduplicate_blocks_in_dependent_materialized_views = 0, max_insert_delayed_streams_for_parallel_write = 1000; diff --git a/tests/queries/0_stateless/03008_deduplication_insert_several_blocks.reference b/tests/queries/0_stateless/03008_deduplication_insert_several_blocks.reference index 9b4738ce805..641735d1bb6 100644 --- a/tests/queries/0_stateless/03008_deduplication_insert_several_blocks.reference +++ b/tests/queries/0_stateless/03008_deduplication_insert_several_blocks.reference @@ -121,47 +121,93 @@ OK Test case 8: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=True table_a_b -count 1 +count 10 table_when_b_even -count 1 -EXPECTED_TO_FAIL +count 5 +0 +0 +table_a_b +count 10 +table_when_b_even +count 5 +0 +0 +FIXED Test case 9: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_a_b -count 1 +count 10 table_when_b_even -count 1 -EXPECTED_TO_FAIL +count 10 +0 +0 +table_a_b +count 10 +table_when_b_even +count 10 +0 +0 +FIXED Test case 10: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=True table_a_b -count 1 +count 10 table_when_b_even count 5 -EXPECTED_TO_FAIL +0 +0 +table_a_b +count 10 +table_when_b_even +count 10 +0 +0 +FIXED Test case 11: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=False table_a_b -count 1 +count 10 table_when_b_even count 10 -EXPECTED_TO_FAIL +0 +0 +table_a_b +count 10 +table_when_b_even +count 20 +0 +0 +FIXED Test case 12: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=True table_a_b count 10 table_when_b_even -count 1 +count 5 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 20 +table_when_b_even +count 5 +0 +0 +FIXED Test case 13: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False table_a_b count 10 table_when_b_even -count 1 +count 10 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 20 +table_when_b_even +count 10 +0 +0 +FIXED Test case 14: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=False insert_unique_blocks=True table_a_b @@ -555,47 +601,93 @@ OK Test case 40: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=True table_a_b -count 1 +count 10 table_when_b_even -count 1 -EXPECTED_TO_FAIL +count 5 +0 +0 +table_a_b +count 10 +table_when_b_even +count 5 +0 +0 +FIXED Test case 41: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_a_b -count 1 +count 10 table_when_b_even -count 1 -EXPECTED_TO_FAIL +count 10 +0 +0 +table_a_b +count 10 +table_when_b_even +count 10 +0 +0 +FIXED Test case 42: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=True table_a_b -count 1 +count 10 table_when_b_even count 5 -EXPECTED_TO_FAIL +0 +0 +table_a_b +count 10 +table_when_b_even +count 10 +0 +0 +FIXED Test case 43: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=False table_a_b -count 1 +count 10 table_when_b_even count 10 -EXPECTED_TO_FAIL +0 +0 +table_a_b +count 10 +table_when_b_even +count 20 +0 +0 +FIXED Test case 44: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=True table_a_b count 10 table_when_b_even -count 1 +count 5 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 20 +table_when_b_even +count 5 +0 +0 +FIXED Test case 45: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False table_a_b count 10 table_when_b_even -count 1 +count 10 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 20 +table_when_b_even +count 10 +0 +0 +FIXED Test case 46: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=False insert_unique_blocks=True table_a_b diff --git a/tests/queries/0_stateless/03008_deduplication_mv_generates_several_blocks.reference b/tests/queries/0_stateless/03008_deduplication_mv_generates_several_blocks.reference index 4411bdecea8..06f30793670 100644 --- a/tests/queries/0_stateless/03008_deduplication_mv_generates_several_blocks.reference +++ b/tests/queries/0_stateless/03008_deduplication_mv_generates_several_blocks.reference @@ -121,47 +121,93 @@ OK Test case 8: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=True table_a_b -count 1 +count 5 table_when_b_even_and_joined -count 10 -EXPECTED_TO_FAIL +count 47 +0 +0 +table_a_b +count 5 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 9: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_a_b -count 1 +count 5 table_when_b_even_and_joined -count 9 -EXPECTED_TO_FAIL +count 45 +0 +0 +table_a_b +count 5 +table_when_b_even_and_joined +count 45 +0 +0 +FIXED Test case 10: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=True table_a_b -count 1 +count 5 table_when_b_even_and_joined count 47 -EXPECTED_TO_FAIL +0 +0 +table_a_b +count 5 +table_when_b_even_and_joined +count 94 +0 +0 +FIXED Test case 11: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=False table_a_b -count 1 +count 5 table_when_b_even_and_joined count 45 -EXPECTED_TO_FAIL +0 +0 +table_a_b +count 5 +table_when_b_even_and_joined +count 90 +0 +0 +FIXED Test case 12: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=True table_a_b count 5 table_when_b_even_and_joined -count 10 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 10 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 13: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False table_a_b count 5 table_when_b_even_and_joined -count 9 +count 45 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 10 +table_when_b_even_and_joined +count 45 +0 +0 +FIXED Test case 14: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=False insert_unique_blocks=True table_a_b @@ -197,9 +243,16 @@ Test case 16: insert_method=InsertSelect engine=MergeTree use_insert_token=False table_a_b count 5 table_when_b_even_and_joined -count 14 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 5 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 17: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_a_b @@ -250,9 +303,16 @@ Test case 20: insert_method=InsertSelect engine=MergeTree use_insert_token=False table_a_b count 5 table_when_b_even_and_joined -count 14 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 10 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 21: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False table_a_b @@ -303,9 +363,16 @@ Test case 24: insert_method=InsertSelect engine=MergeTree use_insert_token=False table_a_b count 5 table_when_b_even_and_joined -count 14 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 5 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 25: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_a_b @@ -356,9 +423,16 @@ Test case 28: insert_method=InsertSelect engine=MergeTree use_insert_token=False table_a_b count 5 table_when_b_even_and_joined -count 14 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 10 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 29: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False table_a_b @@ -527,47 +601,93 @@ OK Test case 40: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=True table_a_b -count 1 +count 5 table_when_b_even_and_joined -count 10 -EXPECTED_TO_FAIL +count 47 +0 +0 +table_a_b +count 5 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 41: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_a_b -count 1 +count 5 table_when_b_even_and_joined -count 9 -EXPECTED_TO_FAIL +count 45 +0 +0 +table_a_b +count 5 +table_when_b_even_and_joined +count 45 +0 +0 +FIXED Test case 42: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=True table_a_b -count 1 +count 5 table_when_b_even_and_joined count 47 -EXPECTED_TO_FAIL +0 +0 +table_a_b +count 5 +table_when_b_even_and_joined +count 94 +0 +0 +FIXED Test case 43: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=False table_a_b -count 1 +count 5 table_when_b_even_and_joined count 45 -EXPECTED_TO_FAIL +0 +0 +table_a_b +count 5 +table_when_b_even_and_joined +count 90 +0 +0 +FIXED Test case 44: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=True table_a_b count 5 table_when_b_even_and_joined -count 10 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 10 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 45: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False table_a_b count 5 table_when_b_even_and_joined -count 9 +count 45 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 10 +table_when_b_even_and_joined +count 45 +0 +0 +FIXED Test case 46: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=False insert_unique_blocks=True table_a_b @@ -603,9 +723,16 @@ Test case 48: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_t table_a_b count 5 table_when_b_even_and_joined -count 14 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 5 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 49: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_a_b @@ -656,9 +783,16 @@ Test case 52: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_t table_a_b count 5 table_when_b_even_and_joined -count 14 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 10 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 53: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False table_a_b @@ -709,9 +843,16 @@ Test case 56: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_t table_a_b count 5 table_when_b_even_and_joined -count 14 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 5 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 57: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_a_b @@ -762,9 +903,16 @@ Test case 60: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_t table_a_b count 5 table_when_b_even_and_joined -count 14 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 10 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 61: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False table_a_b @@ -1055,9 +1203,16 @@ Test case 80: insert_method=InsertValues engine=MergeTree use_insert_token=False table_a_b count 5 table_when_b_even_and_joined -count 14 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 5 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 81: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_a_b @@ -1108,9 +1263,16 @@ Test case 84: insert_method=InsertValues engine=MergeTree use_insert_token=False table_a_b count 5 table_when_b_even_and_joined -count 14 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 10 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 85: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False table_a_b @@ -1161,9 +1323,16 @@ Test case 88: insert_method=InsertValues engine=MergeTree use_insert_token=False table_a_b count 5 table_when_b_even_and_joined -count 14 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 5 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 89: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_a_b @@ -1214,9 +1383,16 @@ Test case 92: insert_method=InsertValues engine=MergeTree use_insert_token=False table_a_b count 5 table_when_b_even_and_joined -count 14 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 10 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 93: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False table_a_b @@ -1507,9 +1683,16 @@ Test case 112: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_ table_a_b count 5 table_when_b_even_and_joined -count 14 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 5 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 113: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_a_b @@ -1560,9 +1743,16 @@ Test case 116: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_ table_a_b count 5 table_when_b_even_and_joined -count 14 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 10 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 117: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False table_a_b @@ -1613,9 +1803,16 @@ Test case 120: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_ table_a_b count 5 table_when_b_even_and_joined -count 14 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 5 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 121: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_a_b @@ -1666,9 +1863,16 @@ Test case 124: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_ table_a_b count 5 table_when_b_even_and_joined -count 14 +count 47 0 -EXPECTED_TO_FAIL +0 +table_a_b +count 10 +table_when_b_even_and_joined +count 47 +0 +0 +FIXED Test case 125: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False table_a_b diff --git a/tests/queries/0_stateless/03008_deduplication_several_mv_into_one_table.reference b/tests/queries/0_stateless/03008_deduplication_several_mv_into_one_table.reference index a56f7deb744..4d517948a25 100644 --- a/tests/queries/0_stateless/03008_deduplication_several_mv_into_one_table.reference +++ b/tests/queries/0_stateless/03008_deduplication_several_mv_into_one_table.reference @@ -88,36 +88,70 @@ table_dst count 32 OK Test case 8: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=True -table_src count 1 -table_dst count 2 -EXPECTED_TO_FAIL +table_src count 8 +table_dst count 6 +0 +0 +table_src count 8 +table_dst count 6 +0 +0 +FIXED Test case 9: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False -table_src count 1 -table_dst count 2 -EXPECTED_TO_FAIL +table_src count 8 +table_dst count 16 +0 +0 +table_src count 8 +table_dst count 16 +0 +0 +FIXED Test case 10: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=True -table_src count 1 +table_src count 8 table_dst count 6 -EXPECTED_TO_FAIL +0 +0 +table_src count 8 +table_dst count 12 +0 +0 +FIXED Test case 11: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=False -table_src count 1 +table_src count 8 table_dst count 16 -EXPECTED_TO_FAIL +0 +0 +table_src count 8 +table_dst count 32 +0 +0 +FIXED Test case 12: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 2 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +table_src count 16 +table_dst count 6 +0 +0 +FIXED Test case 13: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False table_src count 8 -table_dst count 2 +table_dst count 16 0 -EXPECTED_TO_FAIL +0 +table_src count 16 +table_dst count 16 +0 +0 +FIXED Test case 14: insert_method=InsertSelect engine=MergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=False insert_unique_blocks=True table_src count 8 @@ -143,19 +177,25 @@ OK Test case 16: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 4 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +table_src count 8 +table_dst count 6 +0 +0 +FIXED Test case 17: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_src count 1 -table_dst count 1 +table_dst count 2 0 0 table_src count 1 -table_dst count 1 +table_dst count 2 0 -EXPECTED_TO_FAIL +0 +FIXED Test case 18: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=True table_src count 8 @@ -181,19 +221,25 @@ OK Test case 20: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 4 -0 -EXPECTED_TO_FAIL - -Test case 21: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False -table_src count 8 -table_dst count 1 +table_dst count 6 0 0 table_src count 16 -table_dst count 1 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +FIXED + +Test case 21: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False +table_src count 8 +table_dst count 2 +0 +0 +table_src count 16 +table_dst count 2 +0 +0 +FIXED Test case 22: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=False insert_unique_blocks=True table_src count 8 @@ -219,19 +265,25 @@ OK Test case 24: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 4 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +table_src count 8 +table_dst count 6 +0 +0 +FIXED Test case 25: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_src count 1 -table_dst count 1 +table_dst count 2 0 0 table_src count 1 -table_dst count 1 +table_dst count 2 0 -EXPECTED_TO_FAIL +0 +FIXED Test case 26: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=True table_src count 8 @@ -257,19 +309,25 @@ OK Test case 28: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 4 -0 -EXPECTED_TO_FAIL - -Test case 29: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False -table_src count 8 -table_dst count 1 +table_dst count 6 0 0 table_src count 16 -table_dst count 1 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +FIXED + +Test case 29: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False +table_src count 8 +table_dst count 2 +0 +0 +table_src count 16 +table_dst count 2 +0 +0 +FIXED Test case 30: insert_method=InsertSelect engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=False insert_unique_blocks=True table_src count 8 @@ -382,36 +440,70 @@ table_dst count 32 OK Test case 40: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=True -table_src count 1 -table_dst count 2 -EXPECTED_TO_FAIL +table_src count 8 +table_dst count 6 +0 +0 +table_src count 8 +table_dst count 6 +0 +0 +FIXED Test case 41: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False -table_src count 1 -table_dst count 2 -EXPECTED_TO_FAIL +table_src count 8 +table_dst count 16 +0 +0 +table_src count 8 +table_dst count 16 +0 +0 +FIXED Test case 42: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=True -table_src count 1 +table_src count 8 table_dst count 6 -EXPECTED_TO_FAIL +0 +0 +table_src count 8 +table_dst count 12 +0 +0 +FIXED Test case 43: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=False -table_src count 1 +table_src count 8 table_dst count 16 -EXPECTED_TO_FAIL +0 +0 +table_src count 8 +table_dst count 32 +0 +0 +FIXED Test case 44: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 2 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +table_src count 16 +table_dst count 6 +0 +0 +FIXED Test case 45: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False table_src count 8 -table_dst count 2 +table_dst count 16 0 -EXPECTED_TO_FAIL +0 +table_src count 16 +table_dst count 16 +0 +0 +FIXED Test case 46: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=True single_thread=False deduplicate_src_table=False deduplicate_dst_table=False insert_unique_blocks=True table_src count 8 @@ -437,19 +529,25 @@ OK Test case 48: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 4 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +table_src count 8 +table_dst count 6 +0 +0 +FIXED Test case 49: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_src count 1 -table_dst count 1 +table_dst count 2 0 0 table_src count 1 -table_dst count 1 +table_dst count 2 0 -EXPECTED_TO_FAIL +0 +FIXED Test case 50: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=True table_src count 8 @@ -475,19 +573,25 @@ OK Test case 52: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 4 -0 -EXPECTED_TO_FAIL - -Test case 53: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False -table_src count 8 -table_dst count 1 +table_dst count 6 0 0 table_src count 16 -table_dst count 1 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +FIXED + +Test case 53: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False +table_src count 8 +table_dst count 2 +0 +0 +table_src count 16 +table_dst count 2 +0 +0 +FIXED Test case 54: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=False insert_unique_blocks=True table_src count 8 @@ -513,19 +617,25 @@ OK Test case 56: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 4 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +table_src count 8 +table_dst count 6 +0 +0 +FIXED Test case 57: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_src count 1 -table_dst count 1 +table_dst count 2 0 0 table_src count 1 -table_dst count 1 +table_dst count 2 0 -EXPECTED_TO_FAIL +0 +FIXED Test case 58: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=True table_src count 8 @@ -551,19 +661,25 @@ OK Test case 60: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 4 -0 -EXPECTED_TO_FAIL - -Test case 61: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False -table_src count 8 -table_dst count 1 +table_dst count 6 0 0 table_src count 16 -table_dst count 1 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +FIXED + +Test case 61: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False +table_src count 8 +table_dst count 2 +0 +0 +table_src count 16 +table_dst count 2 +0 +0 +FIXED Test case 62: insert_method=InsertSelect engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=False insert_unique_blocks=True table_src count 8 @@ -765,19 +881,25 @@ OK Test case 80: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 4 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +table_src count 8 +table_dst count 6 +0 +0 +FIXED Test case 81: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_src count 1 -table_dst count 1 +table_dst count 2 0 0 table_src count 1 -table_dst count 1 +table_dst count 2 0 -EXPECTED_TO_FAIL +0 +FIXED Test case 82: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=True table_src count 8 @@ -803,19 +925,25 @@ OK Test case 84: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 4 -0 -EXPECTED_TO_FAIL - -Test case 85: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False -table_src count 8 -table_dst count 1 +table_dst count 6 0 0 table_src count 16 -table_dst count 1 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +FIXED + +Test case 85: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False +table_src count 8 +table_dst count 2 +0 +0 +table_src count 16 +table_dst count 2 +0 +0 +FIXED Test case 86: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=False insert_unique_blocks=True table_src count 8 @@ -841,19 +969,25 @@ OK Test case 88: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 4 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +table_src count 8 +table_dst count 6 +0 +0 +FIXED Test case 89: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_src count 1 -table_dst count 1 +table_dst count 2 0 0 table_src count 1 -table_dst count 1 +table_dst count 2 0 -EXPECTED_TO_FAIL +0 +FIXED Test case 90: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=True table_src count 8 @@ -879,19 +1013,25 @@ OK Test case 92: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 4 -0 -EXPECTED_TO_FAIL - -Test case 93: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False -table_src count 8 -table_dst count 1 +table_dst count 6 0 0 table_src count 16 -table_dst count 1 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +FIXED + +Test case 93: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False +table_src count 8 +table_dst count 2 +0 +0 +table_src count 16 +table_dst count 2 +0 +0 +FIXED Test case 94: insert_method=InsertValues engine=MergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=False insert_unique_blocks=True table_src count 8 @@ -1093,19 +1233,25 @@ OK Test case 112: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 4 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +table_src count 8 +table_dst count 6 +0 +0 +FIXED Test case 113: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_src count 1 -table_dst count 1 +table_dst count 2 0 0 table_src count 1 -table_dst count 1 +table_dst count 2 0 -EXPECTED_TO_FAIL +0 +FIXED Test case 114: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=True table_src count 8 @@ -1131,19 +1277,25 @@ OK Test case 116: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 4 -0 -EXPECTED_TO_FAIL - -Test case 117: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False -table_src count 8 -table_dst count 1 +table_dst count 6 0 0 table_src count 16 -table_dst count 1 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +FIXED + +Test case 117: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False +table_src count 8 +table_dst count 2 +0 +0 +table_src count 16 +table_dst count 2 +0 +0 +FIXED Test case 118: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=True deduplicate_src_table=False deduplicate_dst_table=False insert_unique_blocks=True table_src count 8 @@ -1169,19 +1321,25 @@ OK Test case 120: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 4 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +table_src count 8 +table_dst count 6 +0 +0 +FIXED Test case 121: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=True deduplicate_dst_table=True insert_unique_blocks=False table_src count 1 -table_dst count 1 +table_dst count 2 0 0 table_src count 1 -table_dst count 1 +table_dst count 2 0 -EXPECTED_TO_FAIL +0 +FIXED Test case 122: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=True deduplicate_dst_table=False insert_unique_blocks=True table_src count 8 @@ -1207,19 +1365,25 @@ OK Test case 124: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=True table_src count 8 -table_dst count 4 -0 -EXPECTED_TO_FAIL - -Test case 125: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False -table_src count 8 -table_dst count 1 +table_dst count 6 0 0 table_src count 16 -table_dst count 1 +table_dst count 6 0 -EXPECTED_TO_FAIL +0 +FIXED + +Test case 125: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=True insert_unique_blocks=False +table_src count 8 +table_dst count 2 +0 +0 +table_src count 16 +table_dst count 2 +0 +0 +FIXED Test case 126: insert_method=InsertValues engine=ReplicatedMergeTree use_insert_token=False single_thread=False deduplicate_src_table=False deduplicate_dst_table=False insert_unique_blocks=True table_src count 8