Fix metrics and progress.

This commit is contained in:
Nikolai Kochetov 2022-11-16 14:10:56 +00:00
parent 1b1d23c100
commit 17b75d3f5c
2 changed files with 49 additions and 32 deletions

View File

@ -6,6 +6,7 @@
#include <Storages/MergeTree/RequestResponse.h>
#include <Columns/FilterDescription.h>
#include <Common/typeid_cast.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeUUID.h>
@ -226,7 +227,7 @@ struct MergeTreeBaseSelectProcessor::AsyncReadingState
/// * background thread changes status InProgress -> IsFinished
/// * (status == InProgress) => (MergeTreeBaseSelectProcessor is alive)
void setResult(std::optional<ChunkWithProgress> chunk_)
void setResult(ChunkWithProgress chunk_)
{
chunk = std::move(chunk_);
control->finish();
@ -244,7 +245,12 @@ struct MergeTreeBaseSelectProcessor::AsyncReadingState
return control;
}
std::optional<ChunkWithProgress> getResult()
void schedule(ThreadPool::Job job)
{
callback_runner(std::move(job), 0);
}
ChunkWithProgress getResult()
{
control->event.read();
control->stage = Stage::NotStarted;
@ -261,6 +267,7 @@ struct MergeTreeBaseSelectProcessor::AsyncReadingState
AsyncReadingState()
{
control = std::make_shared<Control>();
callback_runner = threadPoolCallbackRunner<void>(IOThreadPool::get(), "MergeTreeRead");
}
~AsyncReadingState()
@ -283,8 +290,9 @@ struct MergeTreeBaseSelectProcessor::AsyncReadingState
}
private:
ThreadPoolCallbackRunner<void> callback_runner;
std::shared_ptr<Control> control;
std::optional<ChunkWithProgress> chunk;
ChunkWithProgress chunk;
std::exception_ptr exception;
};
@ -308,34 +316,28 @@ ISource::Status MergeTreeBaseSelectProcessor::prepare()
}
std::optional<Chunk> MergeTreeBaseSelectProcessor::reportProgress(ChunkWithProgress chunk)
{
if (chunk.num_read_rows || chunk.num_read_bytes)
progress(chunk.num_read_rows, chunk.num_read_bytes);
if (chunk.chunk.hasRows())
return std::move(chunk.chunk);
return {};
}
std::optional<Chunk> MergeTreeBaseSelectProcessor::tryGenerate()
{
if (!reader_settings.use_asynchronous_read_from_pool)
{
auto chunk = read();
if (chunk)
{
progress(chunk->num_read_rows, chunk->num_read_bytes);
return std::move(chunk->chunk);
}
return {};
}
return reportProgress(read());
if (!async_reading_state)
async_reading_state = std::make_unique<AsyncReadingState>();
if (async_reading_state->getStage() == AsyncReadingState::Stage::IsFinished)
{
auto chunk = async_reading_state->getResult();
if (chunk)
{
progress(chunk->num_read_rows, chunk->num_read_bytes);
return std::move(chunk->chunk);
}
return {};
}
return reportProgress(async_reading_state->getResult());
assert(async_reading_state->getStage() == AsyncReadingState::Stage::NotStarted);
@ -354,7 +356,7 @@ std::optional<Chunk> MergeTreeBaseSelectProcessor::tryGenerate()
}
};
IOThreadPool::get().scheduleOrThrowOnError(std::move(job));
async_reading_state->schedule(std::move(job));
return Chunk();
}
@ -366,20 +368,23 @@ int MergeTreeBaseSelectProcessor::schedule()
}
std::optional<MergeTreeBaseSelectProcessor::ChunkWithProgress> MergeTreeBaseSelectProcessor::read()
MergeTreeBaseSelectProcessor::ChunkWithProgress MergeTreeBaseSelectProcessor::read()
{
size_t num_read_rows = 0;
size_t num_read_bytes = 0;
while (!isCancelled())
{
try
{
if ((!task || task->isFinished()) && !getNewTask())
return {};
break;
}
catch (const Exception & e)
{
/// See MergeTreeBaseSelectProcessor::getTaskFromBuffer()
if (e.code() == ErrorCodes::QUERY_WAS_CANCELLED)
return {};
break;
throw;
}
@ -399,14 +404,24 @@ std::optional<MergeTreeBaseSelectProcessor::ChunkWithProgress> MergeTreeBaseSele
ordered_columns.push_back(res.block.getByName(name).column);
}
/// Account a progress from previous empty chunks.
res.num_read_rows += num_read_rows;
res.num_read_bytes += num_read_bytes;
num_read_rows = num_read_bytes = 0;
return ChunkWithProgress{
.chunk = Chunk(ordered_columns, res.row_count),
.num_read_rows = res.num_read_rows,
.num_read_bytes = res.num_read_bytes};
}
else
{
num_read_rows += res.num_read_rows;
num_read_bytes += res.num_read_bytes;
}
}
return {};
return {Chunk(), num_read_rows, num_read_bytes};
}
void MergeTreeBaseSelectProcessor::initializeMergeTreeReadersForPart(
@ -581,11 +596,12 @@ MergeTreeBaseSelectProcessor::BlockAndRowCount MergeTreeBaseSelectProcessor::rea
task->size_predictor->update(sample_block, read_result.columns, read_result.num_rows);
}
if (read_result.num_rows == 0)
return {};
Block block;
if (read_result.num_rows != 0)
block = sample_block.cloneWithColumns(read_result.columns);
BlockAndRowCount res = {
.block = sample_block.cloneWithColumns(read_result.columns),
.block = std::move(block),
.row_count = read_result.num_rows,
.num_read_rows = num_read_rows,
.num_read_bytes = num_read_bytes };

View File

@ -77,8 +77,9 @@ protected:
size_t num_read_bytes = 0;
};
std::optional<Chunk> reportProgress(ChunkWithProgress chunk);
std::optional<Chunk> tryGenerate() final;
std::optional<ChunkWithProgress> read();
ChunkWithProgress read();
int schedule() override;