Address review comments

This commit is contained in:
Sema Checherinda 2024-07-03 14:05:06 +02:00
parent c4207e9a6e
commit 913e97b1a5
3 changed files with 9 additions and 5 deletions

View File

@ -674,7 +674,7 @@ QueryPipeline InterpreterInsertQuery::buildInsertPipeline(ASTInsertQuery & query
{
auto [presink_chains, sink_chains] = buildPreAndSinkChains(
1, 1,
/* presink_streams */1, /* sink_streams */1,
table, metadata_snapshot, query_sample_block);
chain = std::move(presink_chains.front());

View File

@ -95,6 +95,8 @@ void MergeTreeSink::consume(Chunk & chunk)
"TokenInfo is expected for consumed chunk in MergeTreeSink for table: {}",
storage.getStorageID().getNameForLogs());
const bool need_to_define_dedup_token = !token_info->isDefined();
String block_dedup_token;
if (token_info->isDefined())
block_dedup_token = token_info->getToken();
@ -123,7 +125,7 @@ void MergeTreeSink::consume(Chunk & chunk)
if (!temp_part.part)
continue;
if (!token_info->isDefined())
if (need_to_define_dedup_token)
{
chassert(temp_part.part);
const auto hash_value = temp_part.part->getPartBlockIDHash();
@ -166,7 +168,7 @@ void MergeTreeSink::consume(Chunk & chunk)
});
}
if (!token_info->isDefined())
if (need_to_define_dedup_token)
{
token_info->finishChunkHashes();
}

View File

@ -302,6 +302,8 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk & chunk)
"TokenInfo is expected for consumed chunk in ReplicatedMergeTreeSink for table: {}",
storage.getStorageID().getNameForLogs());
const bool need_to_define_dedup_token = !token_info->isDefined();
if (token_info->isDefined())
block_dedup_token = token_info->getToken();
@ -368,7 +370,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk & chunk)
LOG_DEBUG(log, "Wrote block with {} rows{}", current_block.block.rows(), quorumLogMessage(replicas_num));
}
if (!token_info->isDefined())
if (need_to_define_dedup_token)
{
chassert(temp_part.part);
const auto hash_value = temp_part.part->getPartBlockIDHash();
@ -419,7 +421,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk & chunk)
));
}
if (!token_info->isDefined())
if (need_to_define_dedup_token)
{
token_info->finishChunkHashes();
}