mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 11:32:03 +00:00
rm debug printing
This commit is contained in:
parent
bdcf3a0739
commit
24bf946c00
@ -626,7 +626,6 @@ class IColumn;
|
|||||||
M(Bool, enable_early_constant_folding, true, "Enable query optimization where we analyze function and subqueries results and rewrite query if there are constants there", 0) \
|
M(Bool, enable_early_constant_folding, true, "Enable query optimization where we analyze function and subqueries results and rewrite query if there are constants there", 0) \
|
||||||
M(Bool, deduplicate_blocks_in_dependent_materialized_views, false, "Should deduplicate blocks for materialized views if the block is not a duplicate for the table. Use true to always deduplicate in dependent tables.", 0) \
|
M(Bool, deduplicate_blocks_in_dependent_materialized_views, false, "Should deduplicate blocks for materialized views if the block is not a duplicate for the table. Use true to always deduplicate in dependent tables.", 0) \
|
||||||
M(Bool, throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert, true, "Throw exception on INSERT query when the setting `deduplicate_blocks_in_dependent_materialized_views` is enabled along with `async_insert`. It guarantees correctness, because these features can't work together.", 0) \
|
M(Bool, throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert, true, "Throw exception on INSERT query when the setting `deduplicate_blocks_in_dependent_materialized_views` is enabled along with `async_insert`. It guarantees correctness, because these features can't work together.", 0) \
|
||||||
M(Bool, update_insert_deduplication_token_in_dependent_materialized_views, false, "Should update insert deduplication token with table identifier during insert in dependent materialized views.", 0) \
|
|
||||||
M(Bool, materialized_views_ignore_errors, false, "Allows to ignore errors for MATERIALIZED VIEW, and deliver original block to the table regardless of MVs", 0) \
|
M(Bool, materialized_views_ignore_errors, false, "Allows to ignore errors for MATERIALIZED VIEW, and deliver original block to the table regardless of MVs", 0) \
|
||||||
M(Bool, ignore_materialized_views_with_dropped_target_table, false, "Ignore MVs with dropped target table during pushing to views", 0) \
|
M(Bool, ignore_materialized_views_with_dropped_target_table, false, "Ignore MVs with dropped target table during pushing to views", 0) \
|
||||||
M(Bool, allow_experimental_refreshable_materialized_view, false, "Allow refreshable materialized views (CREATE MATERIALIZED VIEW <name> REFRESH ...).", 0) \
|
M(Bool, allow_experimental_refreshable_materialized_view, false, "Allow refreshable materialized views (CREATE MATERIALIZED VIEW <name> REFRESH ...).", 0) \
|
||||||
|
@ -309,9 +309,6 @@ Chain InterpreterInsertQuery::buildSink(
|
|||||||
ThreadGroupPtr running_group,
|
ThreadGroupPtr running_group,
|
||||||
std::atomic_uint64_t * elapsed_counter_ms)
|
std::atomic_uint64_t * elapsed_counter_ms)
|
||||||
{
|
{
|
||||||
// LOG_DEBUG(getLogger("InsertQuery"),
|
|
||||||
// "called InterpreterInsertQuery::buildSink() engine {} table name {}.{}", table->getName(), table->getStorageID().database_name, table->getStorageID().table_name);
|
|
||||||
|
|
||||||
ThreadStatus * thread_status = current_thread;
|
ThreadStatus * thread_status = current_thread;
|
||||||
|
|
||||||
if (!thread_status_holder)
|
if (!thread_status_holder)
|
||||||
@ -413,10 +410,6 @@ std::pair<std::vector<Chain>, std::vector<Chain>> InterpreterInsertQuery::buildP
|
|||||||
|
|
||||||
for (size_t i = 0; i < sink_streams; ++i)
|
for (size_t i = 0; i < sink_streams; ++i)
|
||||||
{
|
{
|
||||||
// LOG_DEBUG(getLogger("InsertQuery"),
|
|
||||||
// "call buildSink sink_streams table name {}.{}, stream {}/{}",
|
|
||||||
// table->getStorageID().database_name, table->getStorageID().table_name, i, sink_streams);
|
|
||||||
|
|
||||||
auto out = buildSink(table, metadata_snapshot, /* thread_status_holder= */ nullptr,
|
auto out = buildSink(table, metadata_snapshot, /* thread_status_holder= */ nullptr,
|
||||||
running_group, /* elapsed_counter_ms= */ nullptr);
|
running_group, /* elapsed_counter_ms= */ nullptr);
|
||||||
|
|
||||||
@ -425,10 +418,6 @@ std::pair<std::vector<Chain>, std::vector<Chain>> InterpreterInsertQuery::buildP
|
|||||||
|
|
||||||
for (size_t i = 0; i < presink_streams; ++i)
|
for (size_t i = 0; i < presink_streams; ++i)
|
||||||
{
|
{
|
||||||
// LOG_DEBUG(getLogger("InsertQuery"),
|
|
||||||
// "call buildSink presink_streams table name {}.{}, stream {}/{}",
|
|
||||||
// table->getStorageID().database_name, table->getStorageID().table_name, i, presink_streams);
|
|
||||||
|
|
||||||
auto out = buildPreSinkChain(sink_chains[0].getInputHeader(), table, metadata_snapshot, query_sample_block);
|
auto out = buildPreSinkChain(sink_chains[0].getInputHeader(), table, metadata_snapshot, query_sample_block);
|
||||||
presink_chains.emplace_back(std::move(out));
|
presink_chains.emplace_back(std::move(out));
|
||||||
}
|
}
|
||||||
@ -462,9 +451,6 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery &
|
|||||||
|
|
||||||
ContextPtr select_context = getContext();
|
ContextPtr select_context = getContext();
|
||||||
|
|
||||||
// LOG_DEBUG(getLogger("InsertQuery"),
|
|
||||||
// "execute() is_trivial_insert_select {} prefersLargeBlocks={} max_insert_threads {}", is_trivial_insert_select, table->prefersLargeBlocks(), settings.max_insert_threads);
|
|
||||||
|
|
||||||
if (is_trivial_insert_select)
|
if (is_trivial_insert_select)
|
||||||
{
|
{
|
||||||
/** When doing trivial INSERT INTO ... SELECT ... FROM table,
|
/** When doing trivial INSERT INTO ... SELECT ... FROM table,
|
||||||
@ -511,11 +497,6 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery &
|
|||||||
|
|
||||||
pipeline.dropTotalsAndExtremes();
|
pipeline.dropTotalsAndExtremes();
|
||||||
|
|
||||||
// LOG_DEBUG(getLogger("InsertQuery"),
|
|
||||||
// "adding transforms, pipline size {}, threads {}, max_insert_threads {}",
|
|
||||||
// pipeline.getNumStreams(), pipeline.getNumThreads(), settings.max_insert_threads);
|
|
||||||
|
|
||||||
|
|
||||||
/// Allow to insert Nullable into non-Nullable columns, NULL values will be added as defaults values.
|
/// Allow to insert Nullable into non-Nullable columns, NULL values will be added as defaults values.
|
||||||
if (getContext()->getSettingsRef().insert_null_as_default)
|
if (getContext()->getSettingsRef().insert_null_as_default)
|
||||||
{
|
{
|
||||||
@ -743,14 +724,6 @@ BlockIO InterpreterInsertQuery::execute()
|
|||||||
StoragePtr table = getTable(query);
|
StoragePtr table = getTable(query);
|
||||||
checkStorageSupportsTransactionsIfNeeded(table, getContext());
|
checkStorageSupportsTransactionsIfNeeded(table, getContext());
|
||||||
|
|
||||||
// bool is_table_dist = false;
|
|
||||||
// if (auto * dist_storage = dynamic_cast<StorageDistributed *>(table.get()))
|
|
||||||
// {
|
|
||||||
// is_table_dist = true;
|
|
||||||
// // LOG_DEBUG(getLogger("InsertQuery"),
|
|
||||||
// // "dist_storage engine {} table name {}.{}", dist_storage->getName(), dist_storage->getStorageID().database_name, dist_storage->getStorageID().table_name);
|
|
||||||
// }
|
|
||||||
|
|
||||||
if (query.partition_by && !table->supportsPartitionBy())
|
if (query.partition_by && !table->supportsPartitionBy())
|
||||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "PARTITION BY clause is not supported by storage");
|
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "PARTITION BY clause is not supported by storage");
|
||||||
|
|
||||||
@ -780,24 +753,20 @@ BlockIO InterpreterInsertQuery::execute()
|
|||||||
auto distributed = table->distributedWrite(query, getContext());
|
auto distributed = table->distributedWrite(query, getContext());
|
||||||
if (distributed)
|
if (distributed)
|
||||||
{
|
{
|
||||||
// LOG_DEBUG(getLogger("InsertQuery"),"as dist pipeline, is_table_dist {}", is_table_dist);
|
|
||||||
res.pipeline = std::move(*distributed);
|
res.pipeline = std::move(*distributed);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// LOG_DEBUG(getLogger("InsertQuery"),"as insert select after dist, is_table_dist {}", is_table_dist);
|
|
||||||
res.pipeline = buildInsertSelectPipeline(query, table);
|
res.pipeline = buildInsertSelectPipeline(query, table);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// LOG_DEBUG(getLogger("InsertQuery"),"as insert select, is_table_dist {}", is_table_dist);
|
|
||||||
res.pipeline = buildInsertSelectPipeline(query, table);
|
res.pipeline = buildInsertSelectPipeline(query, table);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// LOG_DEBUG(getLogger("InsertQuery"),"as just insert, is_table_dist {}", is_table_dist);
|
|
||||||
res.pipeline = buildInsertPipeline(query, table);
|
res.pipeline = buildInsertPipeline(query, table);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,11 +72,6 @@ void SquashingTransform::append(Block && input_block)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// LOG_DEBUG(getLogger("SquashingTransform"),
|
|
||||||
// "input_block rows {}, size {}, columns {}, accumulated_block rows {}, size {}, columns {}, ",
|
|
||||||
// input_block.rows(), input_block.bytes(), input_block.columns(),
|
|
||||||
// accumulated_block.rows(), accumulated_block.bytes(), accumulated_block.columns());
|
|
||||||
|
|
||||||
assert(blocksHaveEqualStructure(input_block, accumulated_block));
|
assert(blocksHaveEqualStructure(input_block, accumulated_block));
|
||||||
|
|
||||||
try
|
try
|
||||||
@ -86,13 +81,6 @@ void SquashingTransform::append(Block && input_block)
|
|||||||
const auto source_column = std::move(input_block.getByPosition(i).column);
|
const auto source_column = std::move(input_block.getByPosition(i).column);
|
||||||
auto acc_column = std::move(accumulated_block.getByPosition(i).column);
|
auto acc_column = std::move(accumulated_block.getByPosition(i).column);
|
||||||
|
|
||||||
// LOG_DEBUG(getLogger("SquashingTransform"),
|
|
||||||
// "column {} {}, acc rows {}, size {}, allocated {}, input rows {} size {} allocated {}",
|
|
||||||
// i, source_column->getName(),
|
|
||||||
// acc_column->size(), acc_column->byteSize(), acc_column->allocatedBytes(),
|
|
||||||
// source_column->size(), source_column->byteSize(), source_column->allocatedBytes());
|
|
||||||
|
|
||||||
|
|
||||||
auto mutable_column = IColumn::mutate(std::move(acc_column));
|
auto mutable_column = IColumn::mutate(std::move(acc_column));
|
||||||
mutable_column->insertRangeFrom(*source_column, 0, source_column->size());
|
mutable_column->insertRangeFrom(*source_column, 0, source_column->size());
|
||||||
accumulated_block.getByPosition(i).column = std::move(mutable_column);
|
accumulated_block.getByPosition(i).column = std::move(mutable_column);
|
||||||
|
@ -20,11 +20,6 @@ namespace ErrorCodes
|
|||||||
|
|
||||||
void RestoreChunkInfosTransform::transform(Chunk & chunk)
|
void RestoreChunkInfosTransform::transform(Chunk & chunk)
|
||||||
{
|
{
|
||||||
LOG_TRACE(getLogger("RestoreChunkInfosTransform"), "chunk infos before: {}:{}, append: {}:{}, chunk has rows {}",
|
|
||||||
chunk.getChunkInfos().size(), chunk.getChunkInfos().debug(),
|
|
||||||
chunk_infos.size(), chunk_infos.debug(),
|
|
||||||
chunk.getNumRows());
|
|
||||||
|
|
||||||
chunk.getChunkInfos().append(chunk_infos.clone());
|
chunk.getChunkInfos().append(chunk_infos.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -76,9 +71,6 @@ void TokenInfo::setSourceBlockNumber(size_t sbn)
|
|||||||
|
|
||||||
void TokenInfo::setViewID(const String & id)
|
void TokenInfo::setViewID(const String & id)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(getLogger("TokenInfo"),
|
|
||||||
"token: {}, stage: {}, view id: {}",
|
|
||||||
getToken(false), stage, id);
|
|
||||||
chassert(stage == VIEW_ID);
|
chassert(stage == VIEW_ID);
|
||||||
addTokenPart(fmt::format("view-id-{}", id));
|
addTokenPart(fmt::format("view-id-{}", id));
|
||||||
stage = VIEW_BLOCK_NUMBER;
|
stage = VIEW_BLOCK_NUMBER;
|
||||||
@ -146,8 +138,6 @@ void SetInitialTokenTransform::transform(Chunk & chunk)
|
|||||||
{
|
{
|
||||||
auto token_info = chunk.getChunkInfos().get<TokenInfo>();
|
auto token_info = chunk.getChunkInfos().get<TokenInfo>();
|
||||||
|
|
||||||
LOG_DEBUG(getLogger("SetInitialTokenTransform"), "has token_info {}", bool(token_info));
|
|
||||||
|
|
||||||
if (!token_info)
|
if (!token_info)
|
||||||
throw Exception(
|
throw Exception(
|
||||||
ErrorCodes::LOGICAL_ERROR,
|
ErrorCodes::LOGICAL_ERROR,
|
||||||
@ -208,7 +198,6 @@ void ResetTokenTransform::transform(Chunk & chunk)
|
|||||||
ErrorCodes::LOGICAL_ERROR,
|
ErrorCodes::LOGICAL_ERROR,
|
||||||
"TokenInfo is expected for consumed chunk in ResetTokenTransform");
|
"TokenInfo is expected for consumed chunk in ResetTokenTransform");
|
||||||
|
|
||||||
LOG_DEBUG(getLogger("ResetTokenTransform"), "token_info was {}", token_info->getToken(false));
|
|
||||||
token_info->reset();
|
token_info->reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,8 +1,6 @@
|
|||||||
#include <Processors/Transforms/ExpressionTransform.h>
|
#include <Processors/Transforms/ExpressionTransform.h>
|
||||||
#include <Interpreters/ExpressionActions.h>
|
#include <Interpreters/ExpressionActions.h>
|
||||||
|
|
||||||
#include <Common/logger_useful.h>
|
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
@ -17,9 +17,6 @@ SquashingChunksTransform::SquashingChunksTransform(
|
|||||||
|
|
||||||
void SquashingChunksTransform::onConsume(Chunk chunk)
|
void SquashingChunksTransform::onConsume(Chunk chunk)
|
||||||
{
|
{
|
||||||
// LOG_DEBUG(getLogger("SquashingChunksTransform"),
|
|
||||||
// "onConsume {}", chunk.getNumRows());
|
|
||||||
|
|
||||||
auto result = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
|
auto result = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
|
||||||
cur_chunk = Chunk(result.block.getColumns(), result.block.rows());
|
cur_chunk = Chunk(result.block.getColumns(), result.block.rows());
|
||||||
|
|
||||||
@ -36,11 +33,6 @@ void SquashingChunksTransform::onConsume(Chunk chunk)
|
|||||||
cur_chunk.setChunkInfos(chunk.getChunkInfos());
|
cur_chunk.setChunkInfos(chunk.getChunkInfos());
|
||||||
cur_chunkinfos = {};
|
cur_chunkinfos = {};
|
||||||
}
|
}
|
||||||
|
|
||||||
// LOG_DEBUG(getLogger("SquashingChunksTransform"),
|
|
||||||
// "got result rows {}, size {}, columns {}, infos: {}/{}",
|
|
||||||
// cur_chunk.getNumRows(), cur_chunk.bytes(), cur_chunk.getNumColumns(),
|
|
||||||
// cur_chunk.getChunkInfos().size(), cur_chunk.getChunkInfos().debug());
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -90,11 +82,6 @@ SimpleSquashingChunksTransform::SimpleSquashingChunksTransform(
|
|||||||
|
|
||||||
void SimpleSquashingChunksTransform::consume(Chunk chunk)
|
void SimpleSquashingChunksTransform::consume(Chunk chunk)
|
||||||
{
|
{
|
||||||
// LOG_DEBUG(getLogger("SimpleSquashingChunksTransform"),
|
|
||||||
// "transform rows {}, size {}, columns {}, infos: {}/{}",
|
|
||||||
// chunk.getNumRows(), chunk.bytes(), chunk.getNumColumns(),
|
|
||||||
// chunk.getChunkInfos().size(), chunk.getChunkInfos().debug());
|
|
||||||
|
|
||||||
auto result = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
|
auto result = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
|
||||||
|
|
||||||
if (result.block)
|
if (result.block)
|
||||||
@ -110,11 +97,6 @@ void SimpleSquashingChunksTransform::consume(Chunk chunk)
|
|||||||
squashed_chunk.setChunkInfos(chunk.getChunkInfos());
|
squashed_chunk.setChunkInfos(chunk.getChunkInfos());
|
||||||
squashed_info = {};
|
squashed_info = {};
|
||||||
}
|
}
|
||||||
|
|
||||||
// LOG_DEBUG(getLogger("SimpleSquashingChunksTransform"),
|
|
||||||
// "got result rows {}, size {}, columns {}, infos: {}/{}",
|
|
||||||
// squashed_chunk.getNumRows(), squashed_chunk.bytes(), squashed_chunk.getNumColumns(),
|
|
||||||
// squashed_chunk.getChunkInfos().size(), squashed_chunk.getChunkInfos().debug());
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -50,8 +50,6 @@ MergeTreeSink::MergeTreeSink(
|
|||||||
, context(context_)
|
, context(context_)
|
||||||
, storage_snapshot(storage.getStorageSnapshotWithoutData(metadata_snapshot, context_))
|
, storage_snapshot(storage.getStorageSnapshotWithoutData(metadata_snapshot, context_))
|
||||||
{
|
{
|
||||||
LOG_INFO(storage.log, "MergeTreeSink() called for {}.{}",
|
|
||||||
storage_.getStorageID().database_name, storage_.getStorageID().getTableName());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void MergeTreeSink::onStart()
|
void MergeTreeSink::onStart()
|
||||||
@ -68,10 +66,6 @@ void MergeTreeSink::onFinish()
|
|||||||
|
|
||||||
void MergeTreeSink::consume(Chunk & chunk)
|
void MergeTreeSink::consume(Chunk & chunk)
|
||||||
{
|
{
|
||||||
LOG_INFO(storage.log, "consume() called num_blocks_processed {}, chunks: rows {} columns {} bytes {}",
|
|
||||||
num_blocks_processed,
|
|
||||||
chunk.getNumRows(), chunk.getNumColumns(), chunk.bytes());
|
|
||||||
|
|
||||||
if (num_blocks_processed > 0)
|
if (num_blocks_processed > 0)
|
||||||
storage.delayInsertOrThrowIfNeeded(nullptr, context, false);
|
storage.delayInsertOrThrowIfNeeded(nullptr, context, false);
|
||||||
|
|
||||||
@ -81,8 +75,6 @@ void MergeTreeSink::consume(Chunk & chunk)
|
|||||||
|
|
||||||
auto part_blocks = MergeTreeDataWriter::splitBlockIntoParts(std::move(block), max_parts_per_block, metadata_snapshot, context);
|
auto part_blocks = MergeTreeDataWriter::splitBlockIntoParts(std::move(block), max_parts_per_block, metadata_snapshot, context);
|
||||||
|
|
||||||
LOG_INFO(storage.log, "consume() called part_blocks.count {}", part_blocks.size());
|
|
||||||
|
|
||||||
using DelayedPartitions = std::vector<MergeTreeSink::DelayedChunk::Partition>;
|
using DelayedPartitions = std::vector<MergeTreeSink::DelayedChunk::Partition>;
|
||||||
DelayedPartitions partitions;
|
DelayedPartitions partitions;
|
||||||
|
|
||||||
@ -106,18 +98,7 @@ void MergeTreeSink::consume(Chunk & chunk)
|
|||||||
context->getSettingsRef().insert_deduplication_token.value);
|
context->getSettingsRef().insert_deduplication_token.value);
|
||||||
|
|
||||||
if (token_info->tokenInitialized())
|
if (token_info->tokenInitialized())
|
||||||
{
|
|
||||||
block_dedup_token = token_info->getToken();
|
block_dedup_token = token_info->getToken();
|
||||||
|
|
||||||
LOG_DEBUG(storage.log,
|
|
||||||
"dedup token from insert deduplication token in chunk: {}",
|
|
||||||
block_dedup_token);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
LOG_DEBUG(storage.log,
|
|
||||||
"dedup token from hash is calculated");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (auto & current_block : part_blocks)
|
for (auto & current_block : part_blocks)
|
||||||
@ -162,13 +143,6 @@ void MergeTreeSink::consume(Chunk & chunk)
|
|||||||
else
|
else
|
||||||
max_insert_delayed_streams_for_parallel_write = 0;
|
max_insert_delayed_streams_for_parallel_write = 0;
|
||||||
|
|
||||||
LOG_INFO(storage.log, "consume() called for {}.{} "
|
|
||||||
"streams {} + {} -> {}, "
|
|
||||||
"max {} support_parallel_write {}",
|
|
||||||
storage.getStorageID().database_name, storage.getStorageID().getTableName(),
|
|
||||||
streams, temp_part.streams.size(), streams + temp_part.streams.size(),
|
|
||||||
max_insert_delayed_streams_for_parallel_write, support_parallel_write);
|
|
||||||
|
|
||||||
/// In case of too much columns/parts in block, flush explicitly.
|
/// In case of too much columns/parts in block, flush explicitly.
|
||||||
streams += temp_part.streams.size();
|
streams += temp_part.streams.size();
|
||||||
|
|
||||||
@ -211,12 +185,8 @@ void MergeTreeSink::finishDelayedChunk()
|
|||||||
if (!delayed_chunk)
|
if (!delayed_chunk)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
LOG_INFO(storage.log, "finishDelayedChunk() called partitions count {}", delayed_chunk->partitions.size());
|
|
||||||
|
|
||||||
for (auto & partition : delayed_chunk->partitions)
|
for (auto & partition : delayed_chunk->partitions)
|
||||||
{
|
{
|
||||||
LOG_INFO(storage.log, "finishDelayedChunk() part name {} dedup_token {}", partition.temp_part.part->name, partition.block_dedup_token);
|
|
||||||
|
|
||||||
ProfileEventsScope scoped_attach(&partition.part_counters);
|
ProfileEventsScope scoped_attach(&partition.part_counters);
|
||||||
|
|
||||||
partition.temp_part.finalize();
|
partition.temp_part.finalize();
|
||||||
@ -234,14 +204,10 @@ void MergeTreeSink::finishDelayedChunk()
|
|||||||
|
|
||||||
auto * deduplication_log = storage.getDeduplicationLog();
|
auto * deduplication_log = storage.getDeduplicationLog();
|
||||||
|
|
||||||
LOG_INFO(storage.log, "finishDelayedChunk() has dedup log {}", bool(deduplication_log));
|
|
||||||
|
|
||||||
if (deduplication_log)
|
if (deduplication_log)
|
||||||
{
|
{
|
||||||
const String block_id = part->getZeroLevelPartBlockID(partition.block_dedup_token);
|
const String block_id = part->getZeroLevelPartBlockID(partition.block_dedup_token);
|
||||||
|
|
||||||
LOG_INFO(storage.log, "finishDelayedChunk() block_dedup_token={}, block_id={}", partition.block_dedup_token, block_id);
|
|
||||||
|
|
||||||
auto res = deduplication_log->addPart(block_id, part->info);
|
auto res = deduplication_log->addPart(block_id, part->info);
|
||||||
if (!res.second)
|
if (!res.second)
|
||||||
{
|
{
|
||||||
|
@ -336,9 +336,6 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
|
|||||||
|
|
||||||
void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
|
void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(getLogger("MergedBlockOutputStream()"), "writeImpl block rows {} size {} getPartDirectory {}",
|
|
||||||
block.rows(), block.bytes(), data_part_storage->getPartDirectory());
|
|
||||||
|
|
||||||
block.checkNumberOfRows();
|
block.checkNumberOfRows();
|
||||||
size_t rows = block.rows();
|
size_t rows = block.rows();
|
||||||
if (!rows)
|
if (!rows)
|
||||||
|
@ -311,20 +311,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk & chunk)
|
|||||||
context->getSettingsRef().insert_deduplication_token.value);
|
context->getSettingsRef().insert_deduplication_token.value);
|
||||||
|
|
||||||
if (token_info->tokenInitialized())
|
if (token_info->tokenInitialized())
|
||||||
{
|
|
||||||
/// multiple blocks can be inserted within the same insert query
|
|
||||||
/// an ordinal number is added to dedup token to generate a distinctive block id for each block
|
|
||||||
block_dedup_token = token_info->getToken();
|
block_dedup_token = token_info->getToken();
|
||||||
|
|
||||||
LOG_DEBUG(storage.log,
|
|
||||||
"dedup token from insert deduplication token in chunk: {}",
|
|
||||||
block_dedup_token);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
LOG_DEBUG(storage.log,
|
|
||||||
"dedup token from hash is calculated");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
auto part_blocks = MergeTreeDataWriter::splitBlockIntoParts(std::move(block), max_parts_per_block, metadata_snapshot, context, async_insert_info);
|
auto part_blocks = MergeTreeDataWriter::splitBlockIntoParts(std::move(block), max_parts_per_block, metadata_snapshot, context, async_insert_info);
|
||||||
|
@ -1415,11 +1415,6 @@ void StorageWindowView::eventTimeParser(const ASTCreateQuery & query)
|
|||||||
void StorageWindowView::writeIntoWindowView(
|
void StorageWindowView::writeIntoWindowView(
|
||||||
StorageWindowView & window_view, Block && block, Chunk::ChunkInfoCollection && chunk_infos, ContextPtr local_context)
|
StorageWindowView & window_view, Block && block, Chunk::ChunkInfoCollection && chunk_infos, ContextPtr local_context)
|
||||||
{
|
{
|
||||||
LOG_TRACE(getLogger("StorageWindowView"), "writeIntoWindowView: rows {}, infos {} with {}, window column {}",
|
|
||||||
block.rows(),
|
|
||||||
chunk_infos.size(), chunk_infos.debug(),
|
|
||||||
window_view.timestamp_column_name);
|
|
||||||
|
|
||||||
window_view.throwIfWindowViewIsDisabled(local_context);
|
window_view.throwIfWindowViewIsDisabled(local_context);
|
||||||
while (window_view.modifying_query)
|
while (window_view.modifying_query)
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||||
@ -1464,9 +1459,6 @@ void StorageWindowView::writeIntoWindowView(
|
|||||||
lateness_bound = t_max_fired_watermark;
|
lateness_bound = t_max_fired_watermark;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_TRACE(getLogger("StorageWindowView"), "writeIntoWindowView: lateness_bound {}, window_view.is_proctime {}",
|
|
||||||
lateness_bound, window_view.is_proctime);
|
|
||||||
|
|
||||||
if (lateness_bound > 0) /// Add filter, which leaves rows with timestamp >= lateness_bound
|
if (lateness_bound > 0) /// Add filter, which leaves rows with timestamp >= lateness_bound
|
||||||
{
|
{
|
||||||
auto filter_function = makeASTFunction(
|
auto filter_function = makeASTFunction(
|
||||||
@ -1583,9 +1575,6 @@ void StorageWindowView::writeIntoWindowView(
|
|||||||
|
|
||||||
if (block_max_timestamp)
|
if (block_max_timestamp)
|
||||||
window_view.updateMaxTimestamp(block_max_timestamp);
|
window_view.updateMaxTimestamp(block_max_timestamp);
|
||||||
|
|
||||||
LOG_TRACE(getLogger("StorageWindowView"), "writeIntoWindowView: block_max_timestamp {}",
|
|
||||||
block_max_timestamp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
UInt32 lateness_upper_bound = 0;
|
UInt32 lateness_upper_bound = 0;
|
||||||
|
Loading…
Reference in New Issue
Block a user