Progress for MergeTreeSelectProcessor.

This commit is contained in:
Nikolai Kochetov 2019-10-04 18:40:05 +03:00
parent 627d48c19a
commit 23069ca6d0
15 changed files with 234 additions and 37 deletions

View File

@ -139,7 +139,7 @@ public:
* The function takes the number of rows in the last block, the number of bytes in the last block.
* Note that the callback can be called from different threads.
*/
void setProgressCallback(const ProgressCallback & callback);
virtual void setProgressCallback(const ProgressCallback & callback);
/** In this method:
@ -164,11 +164,11 @@ public:
* Based on this information, the quota and some restrictions will be checked.
* This information will also be available in the SHOW PROCESSLIST request.
*/
void setProcessListElement(QueryStatus * elem);
virtual void setProcessListElement(QueryStatus * elem);
/** Set the approximate total number of rows to read.
*/
void addTotalRowsApprox(size_t value) { total_rows_approx += value; }
virtual void addTotalRowsApprox(size_t value) { total_rows_approx += value; }
/** Ask to abort the receipt of data as soon as possible.
@ -209,7 +209,7 @@ public:
};
/** Set limitations that checked on each block. */
void setLimits(const LocalLimits & limits_)
virtual void setLimits(const LocalLimits & limits_)
{
limits = limits_;
}
@ -222,7 +222,7 @@ public:
/** Set the quota. If you set a quota on the amount of raw data,
* then you should also set mode = LIMITS_TOTAL to LocalLimits with setLimits.
*/
void setQuota(QuotaForIntervals & quota_)
virtual void setQuota(QuotaForIntervals & quota_)
{
quota = &quota_;
}

View File

@ -28,6 +28,8 @@ namespace ErrorCodes
extern const int TOO_MANY_SIMULTANEOUS_QUERIES;
extern const int QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING;
extern const int LOGICAL_ERROR;
extern const int TOO_MANY_ROWS;
extern const int TOO_MANY_BYTES;
}

View File

@ -1,4 +1,5 @@
#include <Processors/Executors/TreeExecutor.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <stack>
namespace DB
@ -13,7 +14,7 @@ static void checkProcessorHasSingleOutput(IProcessor * processor)
ErrorCodes::LOGICAL_ERROR);
}
static void validateTree(const Processors & processors, IProcessor * root)
static void validateTree(const Processors & processors, IProcessor * root, std::vector<ISourceWithProgress *> & sources)
{
std::unordered_map<IProcessor *, size_t> index;
@ -56,6 +57,13 @@ static void validateTree(const Processors & processors, IProcessor * root)
auto & children = node->getInputs();
for (auto & child : children)
stack.push(&child.getOutputPort().getProcessor());
/// Fill sources array.
if (children.empty())
{
if (auto * source = dynamic_cast<ISourceWithProgress *>(node))
sources.push_back(source);
}
}
for (size_t i = 0; i < is_visited.size(); ++i)
@ -71,7 +79,7 @@ void TreeExecutor::init()
root = processors.back().get();
validateTree(processors, root);
validateTree(processors, root, sources_with_progress);
port = std::make_unique<InputPort>(getHeader(), root);
connect(root->getOutputs().front(), *port);
@ -170,4 +178,35 @@ Block TreeExecutor::readImpl()
}
}
void TreeExecutor::setProgressCallback(const ProgressCallback & callback)
{
for (auto & source : sources_with_progress)
source->setProgressCallback(callback);
}
void TreeExecutor::setProcessListElement(QueryStatus * elem)
{
for (auto & source : sources_with_progress)
source->setProcessListElement(elem);
}
void TreeExecutor::setLimits(const IBlockInputStream::LocalLimits & limits_)
{
for (auto & source : sources_with_progress)
source->setLimits(limits_);
}
void TreeExecutor::setQuota(QuotaForIntervals & quota_)
{
for (auto & source : sources_with_progress)
source->setQuota(quota_);
}
void TreeExecutor::addTotalRowsApprox(size_t value)
{
/// Add only for one source.
if (!sources_with_progress.empty())
sources_with_progress.front()->addTotalRowsApprox(value);
}
}

View File

@ -5,6 +5,8 @@
namespace DB
{
class ISourceWithProgress;
class TreeExecutor : public IBlockInputStream
{
public:
@ -13,6 +15,14 @@ public:
String getName() const override { return root->getName(); }
Block getHeader() const override { return root->getOutputs().front().getHeader(); }
/// This methods does not affect TreeExecutor as IBlockInputStream itself.
/// They just passed to all SourceWithProgress processors.
void setProgressCallback(const ProgressCallback & callback) final;
void setProcessListElement(QueryStatus * elem) final;
void setLimits(const LocalLimits & limits_) final;
void setQuota(QuotaForIntervals & quota_) final;
void addTotalRowsApprox(size_t value) final;
protected:
Block readImpl() override;
@ -21,6 +31,9 @@ private:
IProcessor * root = nullptr;
std::unique_ptr<InputPort> port;
/// Remember sources that support progress.
std::vector<ISourceWithProgress *> sources_with_progress;
void init();
void execute();
};

View File

@ -7,7 +7,7 @@ namespace DB
{
SourceFromInputStream::SourceFromInputStream(BlockInputStreamPtr stream_, bool force_add_aggregating_info_)
: ISource(stream_->getHeader())
: ISourceWithProgress(stream_->getHeader())
, force_add_aggregating_info(force_add_aggregating_info_)
, stream(std::move(stream_))
{

View File

@ -1,5 +1,5 @@
#pragma once
#include <Processors/ISource.h>
#include <Processors/Sources/SourceWithProgress.h>
namespace DB
{
@ -7,7 +7,7 @@ namespace DB
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
class SourceFromInputStream : public ISource
class SourceFromInputStream : public ISourceWithProgress
{
public:
explicit SourceFromInputStream(BlockInputStreamPtr stream_, bool force_add_aggregating_info_ = false);
@ -22,6 +22,13 @@ public:
void addTotalsPort();
/// Implementation for methods from ISourceWithProgress.
void setLimits(const LocalLimits & limits_) final { stream->setLimits(limits_); }
void setQuota(QuotaForIntervals & quota_) final { stream->setQuota(quota_); }
void setProcessListElement(QueryStatus * elem) final { stream->setProcessListElement(elem); }
void setProgressCallback(const ProgressCallback & callback) final { stream->setProgressCallback(callback); }
void addTotalRowsApprox(size_t value) final { stream->addTotalRowsApprox(value); }
private:
bool has_aggregate_functions = false;
bool force_add_aggregating_info;

View File

@ -0,0 +1,69 @@
#include <Processors/Sources/SourceWithProgress.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/Quota.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_MANY_ROWS;
extern const int TOO_MANY_BYTES;
}
void SourceWithProgress::progress(const Progress & value)
{
if (total_rows_approx != 0 && process_list_elem)
{
process_list_elem->updateProgressIn({0, 0, total_rows_approx});
total_rows_approx = 0;
}
if (progress_callback)
progress_callback(value);
if (process_list_elem)
{
if (!process_list_elem->updateProgressIn(value))
cancel();
/// The total amount of data processed or intended for processing in all leaf sources, possibly on remote servers.
ProgressValues progress = process_list_elem->getProgressIn();
size_t total_rows_estimate = std::max(progress.read_rows, progress.total_rows_to_read);
/// Check the restrictions on the amount of data to read, the speed of the query, the quota on the amount of data to read.
/// NOTE: Maybe it makes sense to have them checked directly in ProcessList?
if (limits.mode == LimitsMode::LIMITS_TOTAL)
{
if (!limits.size_limits.check(total_rows_estimate, progress.read_bytes, "rows to read",
ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES))
cancel();
}
size_t total_rows = progress.total_rows_to_read;
constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds
UInt64 total_elapsed_microseconds = total_stopwatch.elapsedMicroseconds();
if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds)
{
/// Should be done in PipelineExecutor.
/// It is here for compatibility with IBlockInputsStream.
CurrentThread::updatePerformanceCounters();
last_profile_events_update_time = total_elapsed_microseconds;
}
/// Should be done in PipelineExecutor.
/// It is here for compatibility with IBlockInputsStream.
limits.speed_limit.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds);
if (quota != nullptr && limits.mode == LimitsMode::LIMITS_TOTAL)
{
quota->checkAndAddReadRowsBytes(time(nullptr), value.read_rows, value.read_bytes);
}
}
}
}

View File

@ -0,0 +1,75 @@
#pragma once
#include <Processors/ISource.h>
#include <DataStreams/IBlockInputStream.h>
#include <Common/Stopwatch.h>
namespace DB
{
/// Adds progress to ISource.
/// This class takes care of limits, quotas, callback on progress and updating performance counters for current thread.
class ISourceWithProgress : public ISource
{
public:
using ISource::ISource;
using LocalLimits = IBlockInputStream::LocalLimits;
using LimitsMode = IBlockInputStream::LimitsMode;
/// Set limitations that checked on each chunk.
virtual void setLimits(const LocalLimits & limits_) = 0;
/// Set the quota. If you set a quota on the amount of raw data,
/// then you should also set mode = LIMITS_TOTAL to LocalLimits with setLimits.
virtual void setQuota(QuotaForIntervals & quota_) = 0;
/// Set the pointer to the process list item.
/// General information about the resources spent on the request will be written into it.
/// Based on this information, the quota and some restrictions will be checked.
/// This information will also be available in the SHOW PROCESSLIST request.
virtual void setProcessListElement(QueryStatus * elem) = 0;
/// Set the execution progress bar callback.
/// It is called after each chunk.
/// The function takes the number of rows in the last chunk, the number of bytes in the last chunk.
/// Note that the callback can be called from different threads.
virtual void setProgressCallback(const ProgressCallback & callback) = 0;
/// Set the approximate total number of rows to read.
virtual void addTotalRowsApprox(size_t value) = 0;
};
/// Implementation for ISourceWithProgress
class SourceWithProgress : public ISourceWithProgress
{
public:
using ISourceWithProgress::ISourceWithProgress;
using LocalLimits = IBlockInputStream::LocalLimits;
using LimitsMode = IBlockInputStream::LimitsMode;
void setLimits(const LocalLimits & limits_) final { limits = limits_; }
void setQuota(QuotaForIntervals & quota_) final { quota = &quota_; }
void setProcessListElement(QueryStatus * elem) final { process_list_elem = elem; }
void setProgressCallback(const ProgressCallback & callback) final { progress_callback = callback; }
void addTotalRowsApprox(size_t value) final { total_rows_approx += value; }
protected:
/// Call this method to provide information about progress.
void progress(const Progress & value);
private:
LocalLimits limits;
QuotaForIntervals * quota = nullptr;
ProgressCallback progress_callback;
QueryStatus * process_list_elem = nullptr;
/// The approximate total number of rows to read. For progress bar.
size_t total_rows_approx = 0;
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; /// Time with waiting time.
/// According to total_stopwatch in microseconds.
UInt64 last_profile_events_update_time = 0;
};
}

View File

@ -3,9 +3,7 @@
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Columns/FilterDescription.h>
#include <Columns/ColumnArray.h>
#include <Common/typeid_cast.h>
#include <ext/range.h>
#include <DataTypes/DataTypeNothing.h>
@ -32,7 +30,7 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
bool save_marks_in_cache_,
const Names & virt_column_names_)
:
ISource(getHeader(std::move(header), prewhere_info_, virt_column_names_)),
SourceWithProgress(getHeader(std::move(header), prewhere_info_, virt_column_names_)),
storage(storage_),
prewhere_info(prewhere_info_),
max_block_size_rows(max_block_size_rows_),
@ -176,7 +174,7 @@ Chunk MergeTreeBaseSelectProcessor::readFromPartImpl()
UInt64 num_filtered_rows = read_result.numReadRows() - read_result.num_rows;
/// TODO: progressImpl({ read_result.numReadRows(), read_result.numBytesRead() });
progress({ read_result.numReadRows(), read_result.numBytesRead() });
if (task->size_predictor)
{

View File

@ -5,7 +5,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/SelectQueryInfo.h>
#include <Processors/ISource.h>
#include <Processors/Sources/SourceWithProgress.h>
namespace DB
{
@ -16,7 +16,7 @@ class MarkCache;
/// Base class for MergeTreeThreadSelectBlockInputStream and MergeTreeSelectBlockInputStream
class MergeTreeBaseSelectProcessor : public ISource
class MergeTreeBaseSelectProcessor : public SourceWithProgress
{
public:
MergeTreeBaseSelectProcessor(
@ -39,7 +39,7 @@ public:
protected:
Chunk generate() final;
/// Creates new this->task, and initilizes readers
/// Creates new this->task, and initializes readers.
virtual bool getNewTask() = 0;
virtual Chunk readFromPart();
@ -52,8 +52,6 @@ protected:
void initializeRangeReaders(MergeTreeReadTask & task);
size_t estimateNumRows(MergeTreeReadTask & current_task, MergeTreeRangeReader & current_reader);
protected:
const MergeTreeData & storage;

View File

@ -749,17 +749,18 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
for (size_t i = 0; i < num_streams; ++i)
{
res.push_back({std::make_shared<MergeTreeThreadSelectBlockInputProcessor>(
auto source = std::make_shared<MergeTreeThreadSelectBlockInputProcessor>(
i, pool, min_marks_for_concurrent_read, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, data, use_uncompressed_cache,
query_info.prewhere_info, settings, virt_columns)});
query_info.prewhere_info, settings, virt_columns);
if (i == 0)
{
/// Set the approximate number of rows for the first source only
/// TODO
/// res.front()->addTotalRowsApprox(total_rows);
source->addTotalRowsApprox(total_rows);
}
res.push_back({std::move(source)});
}
}
else if (sum_marks > 0)

View File

@ -1,7 +1,6 @@
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Core/Defines.h>
namespace DB
@ -39,7 +38,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
size_t preferred_block_size_bytes_,
size_t preferred_max_column_in_block_size_bytes_,
Names required_columns_,
const MarkRanges & mark_ranges_,
MarkRanges mark_ranges_,
bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
bool check_columns,
@ -55,10 +54,10 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
storage_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_,
max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names_},
required_columns{required_columns_},
required_columns{std::move(required_columns_)},
data_part{owned_data_part_},
part_columns_lock(data_part->columns_lock),
all_mark_ranges(mark_ranges_),
all_mark_ranges(std::move(mark_ranges_)),
part_index_in_query(part_index_in_query_),
path(data_part->getFullPath())
{
@ -76,8 +75,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
: "")
<< " rows starting from " << data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));
/// TODO
/// addTotalRowsApprox(total_rows);
addTotalRowsApprox(total_rows);
ordered_names = header_without_virtual_columns.getNames();

View File

@ -23,7 +23,7 @@ public:
size_t preferred_block_size_bytes,
size_t preferred_max_column_in_block_size_bytes,
Names column_names,
const MarkRanges & mark_ranges,
MarkRanges mark_ranges,
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
bool check_columns,

View File

@ -1,7 +1,6 @@
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Core/Defines.h>
namespace DB
@ -39,7 +38,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
size_t preferred_block_size_bytes_,
size_t preferred_max_column_in_block_size_bytes_,
Names required_columns_,
const MarkRanges & mark_ranges_,
MarkRanges mark_ranges_,
bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
bool check_columns_,
@ -58,7 +57,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
required_columns{std::move(required_columns_)},
data_part{owned_data_part_},
part_columns_lock(data_part->columns_lock),
all_mark_ranges(mark_ranges_),
all_mark_ranges(std::move(mark_ranges_)),
part_index_in_query(part_index_in_query_),
check_columns(check_columns_),
path(data_part->getFullPath())
@ -77,9 +76,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
: "")
<< " rows starting from " << data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));
/// TODO
/// addTotalRowsApprox(total_rows);
addTotalRowsApprox(total_rows);
ordered_names = header_without_virtual_columns.getNames();
}

View File

@ -23,7 +23,7 @@ public:
size_t preferred_block_size_bytes,
size_t preferred_max_column_in_block_size_bytes,
Names column_names_,
const MarkRanges & mark_ranges,
MarkRanges mark_ranges,
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
bool check_columns,