diff --git a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp index 0024246b56c..a6c81f794d4 100644 --- a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp @@ -226,7 +226,7 @@ struct MergeTreeBaseSelectProcessor::AsyncReadingState /// * background thread changes status InProgress -> IsFinished /// * (status == InProgress) => (MergeTreeBaseSelectProcessor is alive) - void setResult(std::optional chunk_) + void setResult(std::optional chunk_) { chunk = std::move(chunk_); control->finish(); @@ -244,7 +244,7 @@ struct MergeTreeBaseSelectProcessor::AsyncReadingState return control; } - std::optional getResult() + std::optional getResult() { control->event.read(); control->stage = Stage::NotStarted; @@ -283,9 +283,9 @@ struct MergeTreeBaseSelectProcessor::AsyncReadingState } private: - std::optional chunk; - std::exception_ptr exception; std::shared_ptr control; + std::optional chunk; + std::exception_ptr exception; }; @@ -311,13 +311,31 @@ ISource::Status MergeTreeBaseSelectProcessor::prepare() std::optional MergeTreeBaseSelectProcessor::tryGenerate() { if (!reader_settings.use_asynchronous_read_from_pool) - return read(); + { + auto chunk = read(); + if (chunk) + { + progress(chunk->num_read_rows, chunk->num_read_bytes); + return std::move(chunk->chunk); + } + + return {}; + } if (!async_reading_state) async_reading_state = std::make_unique(); if (async_reading_state->getStage() == AsyncReadingState::Stage::IsFinished) - return async_reading_state->getResult(); + { + auto chunk = async_reading_state->getResult(); + if (chunk) + { + progress(chunk->num_read_rows, chunk->num_read_bytes); + return std::move(chunk->chunk); + } + + return {}; + } assert(async_reading_state->getStage() == AsyncReadingState::Stage::NotStarted); @@ -348,7 +366,7 @@ int MergeTreeBaseSelectProcessor::schedule() } -std::optional MergeTreeBaseSelectProcessor::read() +std::optional MergeTreeBaseSelectProcessor::read() { while (!isCancelled()) { @@ -381,7 +399,10 @@ std::optional MergeTreeBaseSelectProcessor::read() ordered_columns.push_back(res.block.getByName(name).column); } - return Chunk(ordered_columns, res.row_count); + return ChunkWithProgress{ + .chunk = Chunk(ordered_columns, res.row_count), + .num_read_rows = res.num_read_rows, + .num_read_bytes = res.num_read_bytes}; } } @@ -549,7 +570,8 @@ MergeTreeBaseSelectProcessor::BlockAndRowCount MergeTreeBaseSelectProcessor::rea UInt64 num_filtered_rows = read_result.numReadRows() - read_result.num_rows; - progress(read_result.numReadRows(), read_result.numBytesRead()); + size_t num_read_rows = read_result.numReadRows(); + size_t num_read_bytes = read_result.numBytesRead(); if (task->size_predictor) { @@ -562,7 +584,11 @@ MergeTreeBaseSelectProcessor::BlockAndRowCount MergeTreeBaseSelectProcessor::rea if (read_result.num_rows == 0) return {}; - BlockAndRowCount res = { sample_block.cloneWithColumns(read_result.columns), read_result.num_rows }; + BlockAndRowCount res = { + .block = sample_block.cloneWithColumns(read_result.columns), + .row_count = read_result.num_rows, + .num_read_rows = num_read_rows, + .num_read_bytes = num_read_bytes }; return res; } diff --git a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h index 94808bd2e9e..9f20b0f13af 100644 --- a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h @@ -66,10 +66,19 @@ protected: { Block block; size_t row_count = 0; + size_t num_read_rows = 0; + size_t num_read_bytes = 0; + }; + + struct ChunkWithProgress + { + Chunk chunk; + size_t num_read_rows = 0; + size_t num_read_bytes = 0; }; std::optional tryGenerate() final; - std::optional read(); + std::optional read(); int schedule() override;