diff --git a/src/Processors/Transforms/ApplySquashingTransform.h b/src/Processors/Transforms/ApplySquashingTransform.h index 94b890198d4..49a6581e685 100644 --- a/src/Processors/Transforms/ApplySquashingTransform.h +++ b/src/Processors/Transforms/ApplySquashingTransform.h @@ -32,7 +32,7 @@ public: protected: void onConsume(Chunk chunk) override { - cur_chunk = DB::Squashing::squash(std::move(chunk)); + cur_chunk = Squashing::squash(std::move(chunk)); } GenerateResult onGenerate() override diff --git a/src/Processors/Transforms/DeduplicationTokenTransforms.cpp b/src/Processors/Transforms/DeduplicationTokenTransforms.cpp index 374a6495f79..f50e69e730f 100644 --- a/src/Processors/Transforms/DeduplicationTokenTransforms.cpp +++ b/src/Processors/Transforms/DeduplicationTokenTransforms.cpp @@ -56,7 +56,7 @@ String TokenInfo::debugToken() const void TokenInfo::addChunkHash(String part) { - if (stage == UNDEFINED) + if (stage == UNDEFINED && empty()) stage = DEFINE_SOURCE_WITH_HASHES; if (stage != DEFINE_SOURCE_WITH_HASHES) @@ -65,7 +65,7 @@ void TokenInfo::addChunkHash(String part) addTokenPart(std::move(part)); } -void TokenInfo::defineSourceWithChunkHashes() +void TokenInfo::finishChunkHashes() { if (stage == UNDEFINED && empty()) stage = DEFINE_SOURCE_WITH_HASHES; @@ -78,7 +78,7 @@ void TokenInfo::defineSourceWithChunkHashes() void TokenInfo::setUserToken(const String & token) { - if (stage == UNDEFINED) + if (stage == UNDEFINED && empty()) stage = DEFINE_SOURCE_USER_TOKEN; if (stage != DEFINE_SOURCE_USER_TOKEN) @@ -87,7 +87,7 @@ void TokenInfo::setUserToken(const String & token) addTokenPart(fmt::format("user-token-{}", token)); } -void TokenInfo::defineSourceWithUserToken(size_t block_number) +void TokenInfo::setSourceWithUserToken(size_t block_number) { if (stage != DEFINE_SOURCE_USER_TOKEN) throw Exception(ErrorCodes::LOGICAL_ERROR, "token is in wrong stage {}, token {}", stage, debugToken()); @@ -108,7 +108,7 @@ void TokenInfo::setViewID(const String & id) 
addTokenPart(fmt::format("view-id-{}", id)); } -void TokenInfo::defineViewID(size_t block_number) +void TokenInfo::setViewBlockNumber(size_t block_number) { if (stage != DEFINE_VIEW) throw Exception(ErrorCodes::LOGICAL_ERROR, "token is in wrong stage {}, token {}", stage, debugToken()); @@ -138,6 +138,7 @@ size_t TokenInfo::getTotalSize() const for (const auto & part : parts) size += part.size(); + // we reserve more size here to be able to add a delimiter between parts. return size + parts.size() - 1; } @@ -149,17 +150,11 @@ void CheckTokenTransform::transform(Chunk & chunk) if (!token_info) throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk has to have DedupTokenInfo as ChunkInfo, {}", debug); - if (!must_be_present) - { - LOG_DEBUG(log, "{}, no token required, token {}", debug, token_info->debugToken()); - return; - } - LOG_DEBUG(log, "debug: {}, token: {}", debug, token_info->debugToken()); } #endif -String SetInitialTokenTransform::getChunkHash(const Chunk & chunk) +String DefineSourceWithChunkHashesTransform::getChunkHash(const Chunk & chunk) { SipHash hash; for (const auto & colunm : chunk.getColumns()) @@ -170,20 +165,20 @@ String SetInitialTokenTransform::getChunkHash(const Chunk & chunk) } -void SetInitialTokenTransform::transform(Chunk & chunk) +void DefineSourceWithChunkHashesTransform::transform(Chunk & chunk) { auto token_info = chunk.getChunkInfos().get(); if (!token_info) throw Exception( ErrorCodes::LOGICAL_ERROR, - "TokenInfo is expected for consumed chunk in SetInitialTokenTransform"); + "TokenInfo is expected for consumed chunk in DefineSourceWithChunkHashesTransform"); if (token_info->isDefined()) return; token_info->addChunkHash(getChunkHash(chunk)); - token_info->defineSourceWithChunkHashes(); + token_info->finishChunkHashes(); } void SetUserTokenTransform::transform(Chunk & chunk) @@ -203,7 +198,7 @@ void SetSourceBlockNumberTransform::transform(Chunk & chunk) throw Exception( ErrorCodes::LOGICAL_ERROR, "TokenInfo is expected for consumed 
chunk in SetSourceBlockNumberTransform"); - token_info->defineSourceWithUserToken(block_number++); + token_info->setSourceWithUserToken(block_number++); } void SetViewIDTransform::transform(Chunk & chunk) @@ -223,7 +218,7 @@ void SetViewBlockNumberTransform::transform(Chunk & chunk) throw Exception( ErrorCodes::LOGICAL_ERROR, "TokenInfo is expected for consumed chunk in SetViewBlockNumberTransform"); - token_info->defineViewID(block_number++); + token_info->setViewBlockNumber(block_number++); } void ResetTokenTransform::transform(Chunk & chunk) diff --git a/src/Processors/Transforms/DeduplicationTokenTransforms.h b/src/Processors/Transforms/DeduplicationTokenTransforms.h index 9d087536a38..79d168d1000 100644 --- a/src/Processors/Transforms/DeduplicationTokenTransforms.h +++ b/src/Processors/Transforms/DeduplicationTokenTransforms.h @@ -42,13 +42,13 @@ namespace DeduplicationToken bool isDefined() const { return stage == DEFINED; } void addChunkHash(String part); - void defineSourceWithChunkHashes(); + void finishChunkHashes(); void setUserToken(const String & token); - void defineSourceWithUserToken(size_t block_number); + void setSourceWithUserToken(size_t block_number); void setViewID(const String & id); - void defineViewID(size_t block_number); + void setViewBlockNumber(size_t block_number); void reset(); @@ -98,10 +98,9 @@ namespace DeduplicationToken class CheckTokenTransform : public ISimpleTransform { public: - CheckTokenTransform(String debug_, bool must_be_present_, const Block & header_) + CheckTokenTransform(String debug_, const Block & header_) : ISimpleTransform(header_, header_, true) , debug(std::move(debug_)) - , must_be_present(must_be_present_) { } @@ -112,7 +111,6 @@ namespace DeduplicationToken private: String debug; LoggerPtr log = getLogger("CheckInsertDeduplicationTokenTransform"); - bool must_be_present = false; }; #endif @@ -134,16 +132,19 @@ namespace DeduplicationToken }; - class SetInitialTokenTransform : public ISimpleTransform + class 
DefineSourceWithChunkHashesTransform : public ISimpleTransform { public: - explicit SetInitialTokenTransform(const Block & header_) + explicit DefineSourceWithChunkHashesTransform(const Block & header_) : ISimpleTransform(header_, header_, true) { } - String getName() const override { return "DeduplicationToken::SetInitialTokenTransform"; } + String getName() const override { return "DeduplicationToken::DefineSourceWithChunkHashesTransform"; } + // Usually MergeTreeSink/ReplicatedMergeTreeSink calls addChunkHash for the deduplication token with hashes from the parts. + // But if there is a table with a different engine, we still need to define the source of the data in the deduplication token. + // We use this transform to define the source as a hash of the entire block in the deduplication token. void transform(Chunk & chunk) override; static String getChunkHash(const Chunk & chunk); diff --git a/src/Processors/Transforms/SquashingTransform.cpp b/src/Processors/Transforms/SquashingTransform.cpp index e457a262681..1fb4433240a 100644 --- a/src/Processors/Transforms/SquashingTransform.cpp +++ b/src/Processors/Transforms/SquashingTransform.cpp @@ -18,7 +18,7 @@ SquashingTransform::SquashingTransform( void SquashingTransform::onConsume(Chunk chunk) { - cur_chunk = DB::Squashing::squash(squashing.add(std::move(chunk))); + cur_chunk = Squashing::squash(squashing.add(std::move(chunk))); } SquashingTransform::GenerateResult SquashingTransform::onGenerate() @@ -31,7 +31,7 @@ SquashingTransform::GenerateResult SquashingTransform::onGenerate() void SquashingTransform::onFinish() { - finish_chunk = DB::Squashing::squash(squashing.flush()); + finish_chunk = Squashing::squash(squashing.flush()); } void SquashingTransform::work() @@ -63,14 +63,14 @@ void SimpleSquashingTransform::transform(Chunk & chunk) { if (!finished) { - chunk = DB::Squashing::squash(squashing.add(std::move(chunk))); + chunk = Squashing::squash(squashing.add(std::move(chunk))); } else { if (chunk.hasRows()) throw 
Exception(ErrorCodes::LOGICAL_ERROR, "Chunk expected to be empty, otherwise it will be lost"); - chunk = DB::Squashing::squash(squashing.flush()); + chunk = Squashing::squash(squashing.flush()); } } diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp index 713ab25600f..8d38396ecd5 100644 --- a/src/Processors/Transforms/buildPushingToViewsChain.cpp +++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp @@ -357,7 +357,7 @@ std::optional generateViewChain( } #ifdef ABORT_ON_LOGICAL_ERROR - out.addSource(std::make_shared("Before squashing", !disable_deduplication_for_children, out.getInputHeader())); + out.addSource(std::make_shared("Before squashing", out.getInputHeader())); #endif auto counting = std::make_shared(out.getInputHeader(), current_thread, insert_context->getQuota()); @@ -403,7 +403,7 @@ std::optional generateViewChain( if (type == QueryViewsLogElement::ViewType::MATERIALIZED) { #ifdef ABORT_ON_LOGICAL_ERROR - out.addSource(std::make_shared("Right after Inner query", !disable_deduplication_for_children, out.getInputHeader())); + out.addSource(std::make_shared("Right after Inner query", out.getInputHeader())); #endif auto executing_inner_query = std::make_shared( @@ -413,7 +413,7 @@ std::optional generateViewChain( out.addSource(std::move(executing_inner_query)); #ifdef ABORT_ON_LOGICAL_ERROR - out.addSource(std::make_shared("Right before Inner query", !disable_deduplication_for_children, out.getInputHeader())); + out.addSource(std::make_shared("Right before Inner query", out.getInputHeader())); #endif } @@ -547,7 +547,7 @@ Chain buildPushingToViewsChain( sink->setRuntimeData(thread_status, elapsed_counter_ms); result_chain.addSource(std::move(sink)); - result_chain.addSource(std::make_shared(result_chain.getInputHeader())); + result_chain.addSource(std::make_shared(result_chain.getInputHeader())); } else if (auto * window_view = dynamic_cast(storage.get())) { @@ -555,7 +555,7 @@ 
Chain buildPushingToViewsChain( sink->setRuntimeData(thread_status, elapsed_counter_ms); result_chain.addSource(std::move(sink)); - result_chain.addSource(std::make_shared(result_chain.getInputHeader())); + result_chain.addSource(std::make_shared(result_chain.getInputHeader())); } else if (dynamic_cast(storage.get())) { @@ -564,7 +564,7 @@ Chain buildPushingToViewsChain( sink->setRuntimeData(thread_status, elapsed_counter_ms); result_chain.addSource(std::move(sink)); - result_chain.addSource(std::make_shared(result_chain.getInputHeader())); + result_chain.addSource(std::make_shared(result_chain.getInputHeader())); } /// Do not push to destination table if the flag is set else if (!no_destination) @@ -573,13 +573,13 @@ Chain buildPushingToViewsChain( metadata_snapshot->check(sink->getHeader().getColumnsWithTypeAndName()); sink->setRuntimeData(thread_status, elapsed_counter_ms); - result_chain.addSource(std::make_shared(sink->getHeader())); + result_chain.addSource(std::make_shared(sink->getHeader())); result_chain.addSource(std::move(sink)); } else { - result_chain.addSource(std::make_shared(storage_header)); + result_chain.addSource(std::make_shared(storage_header)); } if (result_chain.empty()) diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index a705ae2e013..ee38b7242b1 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -889,7 +889,7 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro while (readDataNext()) { squashing.setHeader(state.block_for_insert.cloneEmpty()); - auto result_chunk = DB::Squashing::squash(squashing.add({state.block_for_insert.getColumns(), state.block_for_insert.rows()})); + auto result_chunk = Squashing::squash(squashing.add({state.block_for_insert.getColumns(), state.block_for_insert.rows()})); if (result_chunk) { auto result = squashing.getHeader().cloneWithColumns(result_chunk.detachColumns()); @@ -901,7 +901,7 @@ AsynchronousInsertQueue::PushResult 
TCPHandler::processAsyncInsertQuery(Asynchro } } - Chunk result_chunk = DB::Squashing::squash(squashing.flush()); + Chunk result_chunk = Squashing::squash(squashing.flush()); if (!result_chunk) { return insert_queue.pushQueryWithBlock(state.parsed_query, squashing.getHeader(), query_context); diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 63858ce601d..429fd8b67c5 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -2322,12 +2322,11 @@ String IMergeTreeDataPart::getUniqueId() const return getDataPartStorage().getUniqueId(); } -String IMergeTreeDataPart::getPartBlockIDHash() const +UInt128 IMergeTreeDataPart::getPartBlockIDHash() const { SipHash hash; checksums.computeTotalChecksumDataOnly(hash); - const auto hash_value = hash.get128(); - return toString(hash_value.items[0]) + "_" + toString(hash_value.items[1]); + return hash.get128(); } String IMergeTreeDataPart::getZeroLevelPartBlockID(std::string_view token) const @@ -2336,7 +2335,10 @@ String IMergeTreeDataPart::getZeroLevelPartBlockID(std::string_view token) const throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get block id for non zero level part {}", name); if (token.empty()) - return info.partition_id + "_" + getPartBlockIDHash(); + { + const auto hash_value = getPartBlockIDHash(); + return info.partition_id + "_" + toString(hash_value.items[0]) + "_" + toString(hash_value.items[1]); + } SipHash hash; hash.update(token.data(), token.size()); diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 204dfdaad0a..dbb1df3cfe8 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -210,7 +210,7 @@ public: /// Compute part block id for zero level part. Otherwise throws an exception. 
/// If token is not empty, block id is calculated based on it instead of block data - String getPartBlockIDHash() const; + UInt128 getPartBlockIDHash() const; String getZeroLevelPartBlockID(std::string_view token) const; void setName(const String & new_name); diff --git a/src/Storages/MergeTree/MergeTreeSink.cpp b/src/Storages/MergeTree/MergeTreeSink.cpp index 7bc04c05a1c..4a1163d2317 100644 --- a/src/Storages/MergeTree/MergeTreeSink.cpp +++ b/src/Storages/MergeTree/MergeTreeSink.cpp @@ -126,7 +126,8 @@ void MergeTreeSink::consume(Chunk & chunk) if (!token_info->isDefined()) { chassert(temp_part.part); - token_info->addChunkHash(temp_part.part->getPartBlockIDHash()); + const auto hash_value = temp_part.part->getPartBlockIDHash(); + token_info->addChunkHash(toString(hash_value.items[0]) + "_" + toString(hash_value.items[1])); } if (!support_parallel_write && temp_part.part->getDataPartStorage().supportParallelWrite()) @@ -167,7 +168,7 @@ void MergeTreeSink::consume(Chunk & chunk) if (!token_info->isDefined()) { - token_info->defineSourceWithChunkHashes(); + token_info->finishChunkHashes(); } finishDelayedChunk(); @@ -206,7 +207,6 @@ void MergeTreeSink::finishDelayedChunk() if (settings.insert_deduplicate && deduplication_log) { const String block_id = part->getZeroLevelPartBlockID(partition.block_dedup_token); - auto res = deduplication_log->addPart(block_id, part->info); if (!res.second) { diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 5da36b6ee3b..3dbcb5e5bda 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1317,7 +1317,7 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() Block block_to_squash = projection.calculate(cur_block, ctx->context); projection_squashes[i].setHeader(block_to_squash.cloneEmpty()); - Chunk squashed_chunk = DB::Squashing::squash(projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()})); + Chunk 
squashed_chunk = Squashing::squash(projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()})); if (squashed_chunk) { auto result = projection_squashes[i].getHeader().cloneWithColumns(squashed_chunk.detachColumns()); @@ -1341,7 +1341,7 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() { const auto & projection = *ctx->projections_to_build[i]; auto & projection_squash_plan = projection_squashes[i]; - auto squashed_chunk = DB::Squashing::squash(projection_squash_plan.flush()); + auto squashed_chunk = Squashing::squash(projection_squash_plan.flush()); if (squashed_chunk) { auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns()); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 228b5c596ab..3677f5b02ab 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -374,7 +374,8 @@ void ReplicatedMergeTreeSinkImpl::consume(Chunk & chunk) if (!token_info->isDefined()) { chassert(temp_part.part); - token_info->addChunkHash(temp_part.part->getPartBlockIDHash()); + const auto hash_value = temp_part.part->getPartBlockIDHash(); + token_info->addChunkHash(toString(hash_value.items[0]) + "_" + toString(hash_value.items[1])); } } @@ -423,7 +424,7 @@ void ReplicatedMergeTreeSinkImpl::consume(Chunk & chunk) if (!token_info->isDefined()) { - token_info->defineSourceWithChunkHashes(); + token_info->finishChunkHashes(); } finishDelayedChunk(zookeeper); diff --git a/src/Storages/WindowView/StorageWindowView.cpp b/src/Storages/WindowView/StorageWindowView.cpp index ccb6259da00..e36247103c7 100644 --- a/src/Storages/WindowView/StorageWindowView.cpp +++ b/src/Storages/WindowView/StorageWindowView.cpp @@ -1558,7 +1558,7 @@ void StorageWindowView::writeIntoWindowView( #ifdef ABORT_ON_LOGICAL_ERROR builder.addSimpleTransform([&](const Block & stream_header) { - return 
std::make_shared("StorageWindowView: Afrer tmp table before squashing", true, stream_header); + return std::make_shared("StorageWindowView: Afrer tmp table before squashing", stream_header); }); #endif @@ -1604,7 +1604,7 @@ void StorageWindowView::writeIntoWindowView( #ifdef ABORT_ON_LOGICAL_ERROR builder.addSimpleTransform([&](const Block & stream_header) { - return std::make_shared("StorageWindowView: Afrer WatermarkTransform", true, stream_header); + return std::make_shared("StorageWindowView: Afrer WatermarkTransform", stream_header); }); #endif @@ -1630,7 +1630,7 @@ void StorageWindowView::writeIntoWindowView( #ifdef ABORT_ON_LOGICAL_ERROR builder.addSimpleTransform([&](const Block & stream_header) { - return std::make_shared("StorageWindowView: Before out", true, stream_header); + return std::make_shared("StorageWindowView: Before out", stream_header); }); #endif