work with review

This commit is contained in:
Sema Checherinda 2024-07-02 15:16:57 +02:00
parent 05dbb241f5
commit 06e235024f
12 changed files with 55 additions and 56 deletions

View File

@ -32,7 +32,7 @@ public:
protected:
void onConsume(Chunk chunk) override
{
cur_chunk = DB::Squashing::squash(std::move(chunk));
cur_chunk = Squashing::squash(std::move(chunk));
}
GenerateResult onGenerate() override

View File

@ -56,7 +56,7 @@ String TokenInfo::debugToken() const
void TokenInfo::addChunkHash(String part)
{
if (stage == UNDEFINED)
if (stage == UNDEFINED && empty())
stage = DEFINE_SOURCE_WITH_HASHES;
if (stage != DEFINE_SOURCE_WITH_HASHES)
@ -65,7 +65,7 @@ void TokenInfo::addChunkHash(String part)
addTokenPart(std::move(part));
}
void TokenInfo::defineSourceWithChunkHashes()
void TokenInfo::finishChunkHashes()
{
if (stage == UNDEFINED && empty())
stage = DEFINE_SOURCE_WITH_HASHES;
@ -78,7 +78,7 @@ void TokenInfo::defineSourceWithChunkHashes()
void TokenInfo::setUserToken(const String & token)
{
if (stage == UNDEFINED)
if (stage == UNDEFINED && empty())
stage = DEFINE_SOURCE_USER_TOKEN;
if (stage != DEFINE_SOURCE_USER_TOKEN)
@ -87,7 +87,7 @@ void TokenInfo::setUserToken(const String & token)
addTokenPart(fmt::format("user-token-{}", token));
}
void TokenInfo::defineSourceWithUserToken(size_t block_number)
void TokenInfo::setSourceWithUserToken(size_t block_number)
{
if (stage != DEFINE_SOURCE_USER_TOKEN)
throw Exception(ErrorCodes::LOGICAL_ERROR, "token is in wrong stage {}, token {}", stage, debugToken());
@ -108,7 +108,7 @@ void TokenInfo::setViewID(const String & id)
addTokenPart(fmt::format("view-id-{}", id));
}
void TokenInfo::defineViewID(size_t block_number)
void TokenInfo::setViewBlockNumber(size_t block_number)
{
if (stage != DEFINE_VIEW)
throw Exception(ErrorCodes::LOGICAL_ERROR, "token is in wrong stage {}, token {}", stage, debugToken());
@ -138,6 +138,7 @@ size_t TokenInfo::getTotalSize() const
for (const auto & part : parts)
size += part.size();
// We reserve extra size here to be able to add a delimiter between parts.
return size + parts.size() - 1;
}
@ -149,17 +150,11 @@ void CheckTokenTransform::transform(Chunk & chunk)
if (!token_info)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk has to have DedupTokenInfo as ChunkInfo, {}", debug);
if (!must_be_present)
{
LOG_DEBUG(log, "{}, no token required, token {}", debug, token_info->debugToken());
return;
}
LOG_DEBUG(log, "debug: {}, token: {}", debug, token_info->debugToken());
}
#endif
String SetInitialTokenTransform::getChunkHash(const Chunk & chunk)
String DefineSourceWithChunkHashesTransform::getChunkHash(const Chunk & chunk)
{
SipHash hash;
for (const auto & colunm : chunk.getColumns())
@ -170,20 +165,20 @@ String SetInitialTokenTransform::getChunkHash(const Chunk & chunk)
}
void SetInitialTokenTransform::transform(Chunk & chunk)
void DefineSourceWithChunkHashesTransform::transform(Chunk & chunk)
{
auto token_info = chunk.getChunkInfos().get<TokenInfo>();
if (!token_info)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"TokenInfo is expected for consumed chunk in SetInitialTokenTransform");
"TokenInfo is expected for consumed chunk in DefineSourceWithChunkHashesTransform");
if (token_info->isDefined())
return;
token_info->addChunkHash(getChunkHash(chunk));
token_info->defineSourceWithChunkHashes();
token_info->finishChunkHashes();
}
void SetUserTokenTransform::transform(Chunk & chunk)
@ -203,7 +198,7 @@ void SetSourceBlockNumberTransform::transform(Chunk & chunk)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"TokenInfo is expected for consumed chunk in SetSourceBlockNumberTransform");
token_info->defineSourceWithUserToken(block_number++);
token_info->setSourceWithUserToken(block_number++);
}
void SetViewIDTransform::transform(Chunk & chunk)
@ -223,7 +218,7 @@ void SetViewBlockNumberTransform::transform(Chunk & chunk)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"TokenInfo is expected for consumed chunk in SetViewBlockNumberTransform");
token_info->defineViewID(block_number++);
token_info->setViewBlockNumber(block_number++);
}
void ResetTokenTransform::transform(Chunk & chunk)

View File

@ -42,13 +42,13 @@ namespace DeduplicationToken
bool isDefined() const { return stage == DEFINED; }
void addChunkHash(String part);
void defineSourceWithChunkHashes();
void finishChunkHashes();
void setUserToken(const String & token);
void defineSourceWithUserToken(size_t block_number);
void setSourceWithUserToken(size_t block_number);
void setViewID(const String & id);
void defineViewID(size_t block_number);
void setViewBlockNumber(size_t block_number);
void reset();
@ -98,10 +98,9 @@ namespace DeduplicationToken
class CheckTokenTransform : public ISimpleTransform
{
public:
CheckTokenTransform(String debug_, bool must_be_present_, const Block & header_)
CheckTokenTransform(String debug_, const Block & header_)
: ISimpleTransform(header_, header_, true)
, debug(std::move(debug_))
, must_be_present(must_be_present_)
{
}
@ -112,7 +111,6 @@ namespace DeduplicationToken
private:
String debug;
LoggerPtr log = getLogger("CheckInsertDeduplicationTokenTransform");
bool must_be_present = false;
};
#endif
@ -134,16 +132,19 @@ namespace DeduplicationToken
};
class SetInitialTokenTransform : public ISimpleTransform
class DefineSourceWithChunkHashesTransform : public ISimpleTransform
{
public:
explicit SetInitialTokenTransform(const Block & header_)
explicit DefineSourceWithChunkHashesTransform(const Block & header_)
: ISimpleTransform(header_, header_, true)
{
}
String getName() const override { return "DeduplicationToken::SetInitialTokenTransform"; }
String getName() const override { return "DeduplicationToken::DefineSourceWithChunkHashesTransform"; }
// Usually MergeTreeSink/ReplicatedMergeTreeSink calls addChunkHash for the deduplication token with hashes from the parts.
// But if there is a table with a different engine, we still need to define the source of the data in the deduplication token.
// We use this transform to define the source as a hash of the entire block in the deduplication token.
void transform(Chunk & chunk) override;
static String getChunkHash(const Chunk & chunk);

View File

@ -18,7 +18,7 @@ SquashingTransform::SquashingTransform(
void SquashingTransform::onConsume(Chunk chunk)
{
cur_chunk = DB::Squashing::squash(squashing.add(std::move(chunk)));
cur_chunk = Squashing::squash(squashing.add(std::move(chunk)));
}
SquashingTransform::GenerateResult SquashingTransform::onGenerate()
@ -31,7 +31,7 @@ SquashingTransform::GenerateResult SquashingTransform::onGenerate()
void SquashingTransform::onFinish()
{
finish_chunk = DB::Squashing::squash(squashing.flush());
finish_chunk = Squashing::squash(squashing.flush());
}
void SquashingTransform::work()
@ -63,14 +63,14 @@ void SimpleSquashingTransform::transform(Chunk & chunk)
{
if (!finished)
{
chunk = DB::Squashing::squash(squashing.add(std::move(chunk)));
chunk = Squashing::squash(squashing.add(std::move(chunk)));
}
else
{
if (chunk.hasRows())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk expected to be empty, otherwise it will be lost");
chunk = DB::Squashing::squash(squashing.flush());
chunk = Squashing::squash(squashing.flush());
}
}

View File

@ -357,7 +357,7 @@ std::optional<Chain> generateViewChain(
}
#ifdef ABORT_ON_LOGICAL_ERROR
out.addSource(std::make_shared<DeduplicationToken::CheckTokenTransform>("Before squashing", !disable_deduplication_for_children, out.getInputHeader()));
out.addSource(std::make_shared<DeduplicationToken::CheckTokenTransform>("Before squashing", out.getInputHeader()));
#endif
auto counting = std::make_shared<CountingTransform>(out.getInputHeader(), current_thread, insert_context->getQuota());
@ -403,7 +403,7 @@ std::optional<Chain> generateViewChain(
if (type == QueryViewsLogElement::ViewType::MATERIALIZED)
{
#ifdef ABORT_ON_LOGICAL_ERROR
out.addSource(std::make_shared<DeduplicationToken::CheckTokenTransform>("Right after Inner query", !disable_deduplication_for_children, out.getInputHeader()));
out.addSource(std::make_shared<DeduplicationToken::CheckTokenTransform>("Right after Inner query", out.getInputHeader()));
#endif
auto executing_inner_query = std::make_shared<ExecutingInnerQueryFromViewTransform>(
@ -413,7 +413,7 @@ std::optional<Chain> generateViewChain(
out.addSource(std::move(executing_inner_query));
#ifdef ABORT_ON_LOGICAL_ERROR
out.addSource(std::make_shared<DeduplicationToken::CheckTokenTransform>("Right before Inner query", !disable_deduplication_for_children, out.getInputHeader()));
out.addSource(std::make_shared<DeduplicationToken::CheckTokenTransform>("Right before Inner query", out.getInputHeader()));
#endif
}
@ -547,7 +547,7 @@ Chain buildPushingToViewsChain(
sink->setRuntimeData(thread_status, elapsed_counter_ms);
result_chain.addSource(std::move(sink));
result_chain.addSource(std::make_shared<DeduplicationToken::SetInitialTokenTransform>(result_chain.getInputHeader()));
result_chain.addSource(std::make_shared<DeduplicationToken::DefineSourceWithChunkHashesTransform>(result_chain.getInputHeader()));
}
else if (auto * window_view = dynamic_cast<StorageWindowView *>(storage.get()))
{
@ -555,7 +555,7 @@ Chain buildPushingToViewsChain(
sink->setRuntimeData(thread_status, elapsed_counter_ms);
result_chain.addSource(std::move(sink));
result_chain.addSource(std::make_shared<DeduplicationToken::SetInitialTokenTransform>(result_chain.getInputHeader()));
result_chain.addSource(std::make_shared<DeduplicationToken::DefineSourceWithChunkHashesTransform>(result_chain.getInputHeader()));
}
else if (dynamic_cast<StorageMaterializedView *>(storage.get()))
{
@ -564,7 +564,7 @@ Chain buildPushingToViewsChain(
sink->setRuntimeData(thread_status, elapsed_counter_ms);
result_chain.addSource(std::move(sink));
result_chain.addSource(std::make_shared<DeduplicationToken::SetInitialTokenTransform>(result_chain.getInputHeader()));
result_chain.addSource(std::make_shared<DeduplicationToken::DefineSourceWithChunkHashesTransform>(result_chain.getInputHeader()));
}
/// Do not push to destination table if the flag is set
else if (!no_destination)
@ -573,13 +573,13 @@ Chain buildPushingToViewsChain(
metadata_snapshot->check(sink->getHeader().getColumnsWithTypeAndName());
sink->setRuntimeData(thread_status, elapsed_counter_ms);
result_chain.addSource(std::make_shared<DeduplicationToken::SetInitialTokenTransform>(sink->getHeader()));
result_chain.addSource(std::make_shared<DeduplicationToken::DefineSourceWithChunkHashesTransform>(sink->getHeader()));
result_chain.addSource(std::move(sink));
}
else
{
result_chain.addSource(std::make_shared<DeduplicationToken::SetInitialTokenTransform>(storage_header));
result_chain.addSource(std::make_shared<DeduplicationToken::DefineSourceWithChunkHashesTransform>(storage_header));
}
if (result_chain.empty())

View File

@ -889,7 +889,7 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro
while (readDataNext())
{
squashing.setHeader(state.block_for_insert.cloneEmpty());
auto result_chunk = DB::Squashing::squash(squashing.add({state.block_for_insert.getColumns(), state.block_for_insert.rows()}));
auto result_chunk = Squashing::squash(squashing.add({state.block_for_insert.getColumns(), state.block_for_insert.rows()}));
if (result_chunk)
{
auto result = squashing.getHeader().cloneWithColumns(result_chunk.detachColumns());
@ -901,7 +901,7 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro
}
}
Chunk result_chunk = DB::Squashing::squash(squashing.flush());
Chunk result_chunk = Squashing::squash(squashing.flush());
if (!result_chunk)
{
return insert_queue.pushQueryWithBlock(state.parsed_query, squashing.getHeader(), query_context);

View File

@ -2322,12 +2322,11 @@ String IMergeTreeDataPart::getUniqueId() const
return getDataPartStorage().getUniqueId();
}
String IMergeTreeDataPart::getPartBlockIDHash() const
UInt128 IMergeTreeDataPart::getPartBlockIDHash() const
{
SipHash hash;
checksums.computeTotalChecksumDataOnly(hash);
const auto hash_value = hash.get128();
return toString(hash_value.items[0]) + "_" + toString(hash_value.items[1]);
return hash.get128();
}
String IMergeTreeDataPart::getZeroLevelPartBlockID(std::string_view token) const
@ -2336,7 +2335,10 @@ String IMergeTreeDataPart::getZeroLevelPartBlockID(std::string_view token) const
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get block id for non zero level part {}", name);
if (token.empty())
return info.partition_id + "_" + getPartBlockIDHash();
{
const auto hash_value = getPartBlockIDHash();
return info.partition_id + "_" + toString(hash_value.items[0]) + "_" + toString(hash_value.items[1]);
}
SipHash hash;
hash.update(token.data(), token.size());

View File

@ -210,7 +210,7 @@ public:
/// Compute part block id for zero level part. Otherwise throws an exception.
/// If token is not empty, block id is calculated based on it instead of block data
String getPartBlockIDHash() const;
UInt128 getPartBlockIDHash() const;
String getZeroLevelPartBlockID(std::string_view token) const;
void setName(const String & new_name);

View File

@ -126,7 +126,8 @@ void MergeTreeSink::consume(Chunk & chunk)
if (!token_info->isDefined())
{
chassert(temp_part.part);
token_info->addChunkHash(temp_part.part->getPartBlockIDHash());
const auto hash_value = temp_part.part->getPartBlockIDHash();
token_info->addChunkHash(toString(hash_value.items[0]) + "_" + toString(hash_value.items[1]));
}
if (!support_parallel_write && temp_part.part->getDataPartStorage().supportParallelWrite())
@ -167,7 +168,7 @@ void MergeTreeSink::consume(Chunk & chunk)
if (!token_info->isDefined())
{
token_info->defineSourceWithChunkHashes();
token_info->finishChunkHashes();
}
finishDelayedChunk();
@ -206,7 +207,6 @@ void MergeTreeSink::finishDelayedChunk()
if (settings.insert_deduplicate && deduplication_log)
{
const String block_id = part->getZeroLevelPartBlockID(partition.block_dedup_token);
auto res = deduplication_log->addPart(block_id, part->info);
if (!res.second)
{

View File

@ -1317,7 +1317,7 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
Block block_to_squash = projection.calculate(cur_block, ctx->context);
projection_squashes[i].setHeader(block_to_squash.cloneEmpty());
Chunk squashed_chunk = DB::Squashing::squash(projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()}));
Chunk squashed_chunk = Squashing::squash(projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()}));
if (squashed_chunk)
{
auto result = projection_squashes[i].getHeader().cloneWithColumns(squashed_chunk.detachColumns());
@ -1341,7 +1341,7 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
{
const auto & projection = *ctx->projections_to_build[i];
auto & projection_squash_plan = projection_squashes[i];
auto squashed_chunk = DB::Squashing::squash(projection_squash_plan.flush());
auto squashed_chunk = Squashing::squash(projection_squash_plan.flush());
if (squashed_chunk)
{
auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns());

View File

@ -374,7 +374,8 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk & chunk)
if (!token_info->isDefined())
{
chassert(temp_part.part);
token_info->addChunkHash(temp_part.part->getPartBlockIDHash());
const auto hash_value = temp_part.part->getPartBlockIDHash();
token_info->addChunkHash(toString(hash_value.items[0]) + "_" + toString(hash_value.items[1]));
}
}
@ -423,7 +424,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk & chunk)
if (!token_info->isDefined())
{
token_info->defineSourceWithChunkHashes();
token_info->finishChunkHashes();
}
finishDelayedChunk(zookeeper);

View File

@ -1558,7 +1558,7 @@ void StorageWindowView::writeIntoWindowView(
#ifdef ABORT_ON_LOGICAL_ERROR
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<DeduplicationToken::CheckTokenTransform>("StorageWindowView: Afrer tmp table before squashing", true, stream_header);
return std::make_shared<DeduplicationToken::CheckTokenTransform>("StorageWindowView: Afrer tmp table before squashing", stream_header);
});
#endif
@ -1604,7 +1604,7 @@ void StorageWindowView::writeIntoWindowView(
#ifdef ABORT_ON_LOGICAL_ERROR
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<DeduplicationToken::CheckTokenTransform>("StorageWindowView: Afrer WatermarkTransform", true, stream_header);
return std::make_shared<DeduplicationToken::CheckTokenTransform>("StorageWindowView: Afrer WatermarkTransform", stream_header);
});
#endif
@ -1630,7 +1630,7 @@ void StorageWindowView::writeIntoWindowView(
#ifdef ABORT_ON_LOGICAL_ERROR
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<DeduplicationToken::CheckTokenTransform>("StorageWindowView: Before out", true, stream_header);
return std::make_shared<DeduplicationToken::CheckTokenTransform>("StorageWindowView: Before out", stream_header);
});
#endif