Added more comments.

Nikolai Kochetov 2019-10-10 17:16:15 +03:00
parent 378052743d
commit ef14df4632
14 changed files with 52 additions and 24 deletions

View File

@ -20,6 +20,7 @@ public:
/// Verify that the speed is not too low after the specified time has elapsed.
Poco::Timespan timeout_before_checking_execution_speed = 0;
/// Pause execution if speed limits are exceeded.
void throttle(size_t read_rows, size_t read_bytes, size_t total_rows, UInt64 total_elapsed_microseconds);
};
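For orientation, a minimal self-contained sketch of the kind of check a throttle() along these lines performs. This is an illustration under assumptions (the function name, parameter list and plain-exception error handling are invented here), not the actual ExecutionSpeedLimits implementation.

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <thread>

/// Illustration only: roughly what a per-query speed throttle can do once the grace period has passed.
void throttle_sketch(size_t read_rows, uint64_t total_elapsed_microseconds,
                     uint64_t min_rows_per_second, uint64_t max_rows_per_second,
                     uint64_t timeout_before_checking_speed_us)
{
    /// Too early to judge the average speed.
    if (total_elapsed_microseconds == 0 || total_elapsed_microseconds < timeout_before_checking_speed_us)
        return;

    const double elapsed_seconds = total_elapsed_microseconds / 1e6;
    const double rows_per_second = read_rows / elapsed_seconds;

    /// Verify that the speed is not too low after the specified time has elapsed.
    if (min_rows_per_second && rows_per_second < min_rows_per_second)
        throw std::runtime_error("Query is executing too slowly");

    /// Pause execution if speed limits are exceeded: sleep until the average speed drops back under the cap.
    if (max_rows_per_second && rows_per_second > max_rows_per_second)
    {
        const double desired_seconds = static_cast<double>(read_rows) / max_rows_per_second;
        if (desired_seconds > elapsed_seconds)
            std::this_thread::sleep_for(std::chrono::duration<double>(desired_seconds - elapsed_seconds));
    }
}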

View File

@ -219,11 +219,11 @@ static bool handleOverflowMode(OverflowMode mode, const String & message, int co
bool IBlockInputStream::checkTimeLimit()
{
- if (limits.speed_limit.max_execution_time != 0
- && info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.speed_limit.max_execution_time.totalMicroseconds()) * 1000)
+ if (limits.speed_limits.max_execution_time != 0
+ && info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.speed_limits.max_execution_time.totalMicroseconds()) * 1000)
return handleOverflowMode(limits.timeout_overflow_mode,
"Timeout exceeded: elapsed " + toString(info.total_stopwatch.elapsedSeconds())
+ " seconds, maximum: " + toString(limits.speed_limit.max_execution_time.totalMicroseconds() / 1000000.0),
+ " seconds, maximum: " + toString(limits.speed_limits.max_execution_time.totalMicroseconds() / 1000000.0),
ErrorCodes::TIMEOUT_EXCEEDED);
return true;
@ -289,7 +289,7 @@ void IBlockInputStream::progressImpl(const Progress & value)
last_profile_events_update_time = total_elapsed_microseconds;
}
- limits.speed_limit.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds);
+ limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds);
if (quota != nullptr && limits.mode == LIMITS_TOTAL)
{

View File

@ -202,7 +202,7 @@ public:
SizeLimits size_limits;
- ExecutionSpeedLimits speed_limit;
+ ExecutionSpeedLimits speed_limits;
OverflowMode timeout_overflow_mode = OverflowMode::THROW;
};

View File

@ -1587,7 +1587,7 @@ void InterpreterSelectQuery::executeFetchColumns(
IBlockInputStream::LocalLimits limits;
limits.mode = IBlockInputStream::LIMITS_TOTAL;
limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode);
- limits.speed_limit.max_execution_time = settings.max_execution_time;
+ limits.speed_limits.max_execution_time = settings.max_execution_time;
limits.timeout_overflow_mode = settings.timeout_overflow_mode;
/** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers,
@ -1599,11 +1599,11 @@ void InterpreterSelectQuery::executeFetchColumns(
*/
if (options.to_stage == QueryProcessingStage::Complete)
{
- limits.speed_limit.min_execution_speed = settings.min_execution_speed;
- limits.speed_limit.max_execution_speed = settings.max_execution_speed;
- limits.speed_limit.min_execution_speed_bytes = settings.min_execution_speed_bytes;
- limits.speed_limit.max_execution_speed_bytes = settings.max_execution_speed_bytes;
- limits.speed_limit.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed;
+ limits.speed_limits.min_execution_speed = settings.min_execution_speed;
+ limits.speed_limits.max_execution_speed = settings.max_execution_speed;
+ limits.speed_limits.min_execution_speed_bytes = settings.min_execution_speed_bytes;
+ limits.speed_limits.max_execution_speed_bytes = settings.max_execution_speed_bytes;
+ limits.speed_limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed;
}
QuotaForIntervals & quota = context.getQuota();

View File

@ -14,6 +14,8 @@ static void checkProcessorHasSingleOutput(IProcessor * processor)
ErrorCodes::LOGICAL_ERROR);
}
+ /// Check tree invariants (described in TreeExecutor.h).
+ /// Collect sources with progress.
static void validateTree(const Processors & processors, IProcessor * root, std::vector<ISourceWithProgress *> & sources)
{
std::unordered_map<IProcessor *, size_t> index;

View File

@ -7,9 +7,17 @@ namespace DB
class ISourceWithProgress;
+ /// It's a wrapper that turns a tree-shaped pipeline of processors into a block input stream.
+ /// Executes all processors in a single thread, via in-order tree traversal.
+ /// Also supports progress and quotas.
class TreeExecutor : public IBlockInputStream
{
public:
+ /// The last processor in the list must be the tree root.
+ /// It is checked that
+ /// * processors form a tree
+ /// * all processors are reachable from the root
+ /// * there are no other connected processors
explicit TreeExecutor(Processors processors_) : processors(std::move(processors_)) { init(); }
String getName() const override { return root->getName(); }
@ -35,6 +43,7 @@ private:
std::vector<ISourceWithProgress *> sources_with_progress;
void init();
+ /// Execute the tree step by step until the root returns the next chunk or execution is finished.
void execute();
};
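A hedged usage sketch of the wrapper described above: processors are handed over as a flat list with the root last, then consumed through the usual IBlockInputStream interface. SomeSource, SomeTransform, header and doSomethingWith are placeholders (not real ClickHouse names), and the exact port accessors are assumptions; connect() and the TreeExecutor constructor contract follow the header above.

/// Sketch under assumptions, not taken from the repository.
Processors processors;

auto source = std::make_shared<SomeSource>(header);        /// a leaf of the tree
auto transform = std::make_shared<SomeTransform>(header);  /// will become the root

connect(source->getPort(), transform->getInputPort());     /// accessor names assumed

processors.emplace_back(source);
processors.emplace_back(transform);   /// the last processor must be the tree root

TreeExecutor executor(std::move(processors));
while (Block block = executor.read()) /// each read() is an in-order tree traversal; progress and quotas are handled inside
    doSomethingWith(block);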

View File

@ -7,6 +7,7 @@ namespace DB
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
+ /// Wrapper for IBlockInputStream which implements ISourceWithProgress.
class SourceFromInputStream : public ISourceWithProgress
{
public:

View File

@ -12,6 +12,8 @@ namespace ErrorCodes
extern const int TOO_MANY_BYTES;
}
+ /// Aggregated copy-paste from IBlockInputStream::progressImpl.
+ /// Most of this should be done in PipelineExecutor, outside of this class. For now it's done here for compatibility with IBlockInputStream.
void SourceWithProgress::progress(const Progress & value)
{
if (total_rows_approx != 0)
@ -35,13 +37,17 @@ void SourceWithProgress::progress(const Progress & value)
if (!process_list_elem->updateProgressIn(value))
cancel();
- /// The total amount of data processed or intended for processing in all leaf sources, possibly on remote servers.
+ /// The total amount of data processed or intended for processing in all sources, possibly on remote servers.
ProgressValues progress = process_list_elem->getProgressIn();
size_t total_rows_estimate = std::max(progress.read_rows, progress.total_rows_to_read);
- /// Check the restrictions on the amount of data to read, the speed of the query, the quota on the amount of data to read.
+ /// Check the restrictions on the
+ /// * amount of data to read
+ /// * speed of the query
+ /// * quota on the amount of data to read
/// NOTE: Maybe it makes sense to have them checked directly in ProcessList?
if (limits.mode == LimitsMode::LIMITS_TOTAL)
{
if (!limits.size_limits.check(total_rows_estimate, progress.read_bytes, "rows to read",
@ -64,7 +70,7 @@ void SourceWithProgress::progress(const Progress & value)
/// Should be done in PipelineExecutor.
/// It is here for compatibility with IBlockInputStream.
- limits.speed_limit.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds);
+ limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds);
if (quota != nullptr && limits.mode == LimitsMode::LIMITS_TOTAL)
{

View File

@ -80,11 +80,11 @@ void LimitsCheckingTransform::transform(Chunk & chunk)
bool LimitsCheckingTransform::checkTimeLimit()
{
- if (limits.speed_limit.max_execution_time != 0
- && info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.speed_limit.max_execution_time.totalMicroseconds()) * 1000)
+ if (limits.speed_limits.max_execution_time != 0
+ && info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.speed_limits.max_execution_time.totalMicroseconds()) * 1000)
return handleOverflowMode(limits.timeout_overflow_mode,
"Timeout exceeded: elapsed " + toString(info.total_stopwatch.elapsedSeconds())
+ " seconds, maximum: " + toString(limits.speed_limit.max_execution_time.totalMicroseconds() / 1000000.0),
+ " seconds, maximum: " + toString(limits.speed_limits.max_execution_time.totalMicroseconds() / 1000000.0),
ErrorCodes::TIMEOUT_EXCEEDED);
return true;

View File

@ -4,6 +4,7 @@
namespace DB
{
+ /// Reverse rows in chunk.
class ReverseTransform : public ISimpleTransform
{
public:

View File

@ -238,6 +238,8 @@ public:
* if the storage can return a different number of streams.
*
* It is guaranteed that the structure of the table will not change over the lifetime of the returned streams (that is, there will not be ALTER, RENAME and DROP).
+ *
+ * Default implementation calls `readWithProcessors` and wraps the result into TreeExecutor.
*/
virtual BlockInputStreams read(
const Names & /*column_names*/,
@ -247,6 +249,8 @@ public:
size_t /*max_block_size*/,
unsigned /*num_streams*/);
+ /** The same as read, but returns processors.
+ */
virtual Pipes readWithProcessors(
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,

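A hedged sketch of what that default read() could look like: forward to readWithProcessors and wrap each resulting pipe into a TreeExecutor. The argument list follows the signatures above, but the way processors are extracted from a pipe (detachProcessors() here) is an assumption; the real accessor may differ.

/// Sketch only, not the actual implementation.
BlockInputStreams IStorage::read(
    const Names & column_names,
    const SelectQueryInfo & query_info,
    const Context & context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    unsigned num_streams)
{
    BlockInputStreams streams;

    for (auto & pipe : readWithProcessors(column_names, query_info, context, processed_stage, max_block_size, num_streams))
    {
        /// Assumed accessor: hand the pipe's processors to TreeExecutor, which drives them as a block input stream.
        streams.emplace_back(std::make_shared<TreeExecutor>(pipe.detachProcessors()));
    }

    return streams;
}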
View File

@ -388,7 +388,7 @@ bool StorageKafka::streamToViews()
// Limit read batch to maximum block size to allow DDL
IBlockInputStream::LocalLimits limits;
- limits.speed_limit.max_execution_time = settings.stream_flush_interval_ms;
+ limits.speed_limits.max_execution_time = settings.stream_flush_interval_ms;
limits.timeout_overflow_mode = OverflowMode::BREAK;
stream->setLimits(limits);
}

View File

@ -15,7 +15,7 @@ class UncompressedCache;
class MarkCache;
- /// Base class for MergeTreeThreadSelectBlockInputStream and MergeTreeSelectBlockInputStream
+ /// Base class for MergeTreeThreadSelectProcessor and MergeTreeSelectProcessor
class MergeTreeBaseSelectProcessor : public SourceWithProgress
{
public:
@ -46,8 +46,10 @@ protected:
Chunk readFromPartImpl();
+ /// Two versions for header and chunk.
static void injectVirtualColumns(Block & block, MergeTreeReadTask * task, const Names & virtual_columns);
static void injectVirtualColumns(Chunk & chunk, MergeTreeReadTask * task, const Names & virtual_columns);
static Block getHeader(Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns);
void initializeRangeReaders(MergeTreeReadTask & task);
@ -68,6 +70,7 @@ protected:
bool save_marks_in_cache;
Names virt_column_names;
+ /// This header is used for chunks from readFromPart().
Block header_without_virtual_columns;
std::unique_ptr<MergeTreeReadTask> task;

View File

@ -20,11 +20,12 @@ MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcess
const Settings & settings,
const Names & virt_column_names_)
:
- MergeTreeBaseSelectProcessor{pool_->getHeader(), storage_, prewhere_info_, max_block_size_rows_,
- preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, settings.min_bytes_to_use_direct_io,
- settings.max_read_buffer_size, use_uncompressed_cache_, true, virt_column_names_},
- thread{thread_},
- pool{pool_}
+ MergeTreeBaseSelectProcessor{pool_->getHeader(), storage_, prewhere_info_, max_block_size_rows_,
+ preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
+ settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size,
+ use_uncompressed_cache_, true, virt_column_names_},
+ thread{thread_},
+ pool{pool_}
{
/// round min_marks_to_read up to nearest multiple of block_size expressed in marks
/// If granularity is adaptive it doesn't make sense