Clumsy attempt to fix progress.

This commit is contained in:
Nikolai Kochetov 2022-11-15 21:23:18 +00:00
parent 10f449c6c1
commit 1b1d23c100
2 changed files with 46 additions and 11 deletions

View File

@ -226,7 +226,7 @@ struct MergeTreeBaseSelectProcessor::AsyncReadingState
/// * background thread changes status InProgress -> IsFinished
/// * (status == InProgress) => (MergeTreeBaseSelectProcessor is alive)
void setResult(std::optional<Chunk> chunk_)
void setResult(std::optional<ChunkWithProgress> chunk_)
{
chunk = std::move(chunk_);
control->finish();
@ -244,7 +244,7 @@ struct MergeTreeBaseSelectProcessor::AsyncReadingState
return control;
}
std::optional<Chunk> getResult()
std::optional<ChunkWithProgress> getResult()
{
control->event.read();
control->stage = Stage::NotStarted;
@ -283,9 +283,9 @@ struct MergeTreeBaseSelectProcessor::AsyncReadingState
}
private:
std::optional<Chunk> chunk;
std::exception_ptr exception;
std::shared_ptr<Control> control;
std::optional<ChunkWithProgress> chunk;
std::exception_ptr exception;
};
@ -311,13 +311,31 @@ ISource::Status MergeTreeBaseSelectProcessor::prepare()
std::optional<Chunk> MergeTreeBaseSelectProcessor::tryGenerate()
{
if (!reader_settings.use_asynchronous_read_from_pool)
return read();
{
auto chunk = read();
if (chunk)
{
progress(chunk->num_read_rows, chunk->num_read_bytes);
return std::move(chunk->chunk);
}
return {};
}
if (!async_reading_state)
async_reading_state = std::make_unique<AsyncReadingState>();
if (async_reading_state->getStage() == AsyncReadingState::Stage::IsFinished)
return async_reading_state->getResult();
{
auto chunk = async_reading_state->getResult();
if (chunk)
{
progress(chunk->num_read_rows, chunk->num_read_bytes);
return std::move(chunk->chunk);
}
return {};
}
assert(async_reading_state->getStage() == AsyncReadingState::Stage::NotStarted);
@ -348,7 +366,7 @@ int MergeTreeBaseSelectProcessor::schedule()
}
std::optional<Chunk> MergeTreeBaseSelectProcessor::read()
std::optional<MergeTreeBaseSelectProcessor::ChunkWithProgress> MergeTreeBaseSelectProcessor::read()
{
while (!isCancelled())
{
@ -381,7 +399,10 @@ std::optional<Chunk> MergeTreeBaseSelectProcessor::read()
ordered_columns.push_back(res.block.getByName(name).column);
}
return Chunk(ordered_columns, res.row_count);
return ChunkWithProgress{
.chunk = Chunk(ordered_columns, res.row_count),
.num_read_rows = res.num_read_rows,
.num_read_bytes = res.num_read_bytes};
}
}
@ -549,7 +570,8 @@ MergeTreeBaseSelectProcessor::BlockAndRowCount MergeTreeBaseSelectProcessor::rea
UInt64 num_filtered_rows = read_result.numReadRows() - read_result.num_rows;
progress(read_result.numReadRows(), read_result.numBytesRead());
size_t num_read_rows = read_result.numReadRows();
size_t num_read_bytes = read_result.numBytesRead();
if (task->size_predictor)
{
@ -562,7 +584,11 @@ MergeTreeBaseSelectProcessor::BlockAndRowCount MergeTreeBaseSelectProcessor::rea
if (read_result.num_rows == 0)
return {};
BlockAndRowCount res = { sample_block.cloneWithColumns(read_result.columns), read_result.num_rows };
BlockAndRowCount res = {
.block = sample_block.cloneWithColumns(read_result.columns),
.row_count = read_result.num_rows,
.num_read_rows = num_read_rows,
.num_read_bytes = num_read_bytes };
return res;
}

View File

@ -66,10 +66,19 @@ protected:
{
Block block;
size_t row_count = 0;
size_t num_read_rows = 0;
size_t num_read_bytes = 0;
};
struct ChunkWithProgress
{
Chunk chunk;
size_t num_read_rows = 0;
size_t num_read_bytes = 0;
};
std::optional<Chunk> tryGenerate() final;
std::optional<Chunk> read();
std::optional<ChunkWithProgress> read();
int schedule() override;