Add time limit to SourceWithProgress.

This commit is contained in:
Nikolai Kochetov 2020-01-27 19:58:25 +03:00
parent 45741eb03a
commit 43455d79b4
7 changed files with 43 additions and 65 deletions

View File

@ -17,6 +17,8 @@ namespace DB
namespace ErrorCodes
{
extern const int TOO_SLOW;
extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
}
static void limitProgressingSpeed(size_t total_progress_size, size_t max_speed_in_seconds, UInt64 total_elapsed_microseconds)
@ -88,4 +90,29 @@ void ExecutionSpeedLimits::throttle(
}
}
static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
{
switch (mode)
{
case OverflowMode::THROW:
throw Exception(message, code);
case OverflowMode::BREAK:
return false;
default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}
bool ExecutionSpeedLimits::checkTimeLimit(UInt64 elapsed_ns, OverflowMode overflow_mode)
{
if (max_execution_time != 0
&& elapsed_ns > static_cast<UInt64>(max_execution_time.totalMicroseconds()) * 1000)
return handleOverflowMode(overflow_mode,
"Timeout exceeded: elapsed " + toString(static_cast<double>(elapsed_ns) / 1000000000ULL)
+ " seconds, maximum: " + toString(max_execution_time.totalMicroseconds() / 1000000.0),
ErrorCodes::TIMEOUT_EXCEEDED);
return true;
}
}

View File

@ -2,6 +2,7 @@
#include <Poco/Timespan.h>
#include <Core/Types.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{
@ -23,6 +24,8 @@ public:
/// Pause execution in case if speed limits were exceeded.
void throttle(size_t read_rows, size_t read_bytes, size_t total_rows_to_read, UInt64 total_elapsed_microseconds);
bool checkTimeLimit(UInt64 elapsed_ns, OverflowMode overflow_mode);
};
}

View File

@ -54,7 +54,7 @@ Block IBlockInputStream::read()
if (isCancelledOrThrowIfKilled())
return res;
if (!checkTimeLimit())
if (!limits.speed_limits.checkTimeLimit(info.total_stopwatch.elapsed(), limits.timeout_overflow_mode))
limit_exceeded_need_break = true;
if (!limit_exceeded_need_break)
@ -203,33 +203,6 @@ void IBlockInputStream::updateExtremes(Block & block)
}
static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
{
switch (mode)
{
case OverflowMode::THROW:
throw Exception(message, code);
case OverflowMode::BREAK:
return false;
default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}
bool IBlockInputStream::checkTimeLimit()
{
if (limits.speed_limits.max_execution_time != 0
&& info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.speed_limits.max_execution_time.totalMicroseconds()) * 1000)
return handleOverflowMode(limits.timeout_overflow_mode,
"Timeout exceeded: elapsed " + toString(info.total_stopwatch.elapsedSeconds())
+ " seconds, maximum: " + toString(limits.speed_limits.max_execution_time.totalMicroseconds() / 1000000.0),
ErrorCodes::TIMEOUT_EXCEEDED);
return true;
}
void IBlockInputStream::checkQuota(Block & block)
{
switch (limits.mode)

View File

@ -10,6 +10,15 @@ namespace ErrorCodes
{
extern const int TOO_MANY_ROWS;
extern const int TOO_MANY_BYTES;
extern const int TIMEOUT_EXCEEDED;
}
void SourceWithProgress::work()
{
if (!limits.speed_limits.checkTimeLimit(total_stopwatch.elapsed(), limits.timeout_overflow_mode))
cancel();
else
ISourceWithProgress::work();
}
/// Aggregated copy-paste from IBlockInputStream::progressImpl.

View File

@ -58,6 +58,8 @@ protected:
/// Call this method to provide information about progress.
void progress(const Progress & value);
void work() override;
private:
LocalLimits limits;
std::shared_ptr<QuotaContext> quota;

View File

@ -17,20 +17,6 @@ namespace ErrorCodes
}
static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
{
switch (mode)
{
case OverflowMode::THROW:
throw Exception(message, code);
case OverflowMode::BREAK:
return false;
default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}
void ProcessorProfileInfo::update(const Chunk & block)
{
++blocks;
@ -44,13 +30,6 @@ LimitsCheckingTransform::LimitsCheckingTransform(const Block & header_, LocalLim
{
}
//LimitsCheckingTransform::LimitsCheckingTransform(const Block & header, LocalLimits limits, QueryStatus * process_list_elem)
// : ISimpleTransform(header, header, false)
// , limits(std::move(limits))
// , process_list_elem(process_list_elem)
//{
//}
void LimitsCheckingTransform::transform(Chunk & chunk)
{
if (!info.started)
@ -59,7 +38,7 @@ void LimitsCheckingTransform::transform(Chunk & chunk)
info.started = true;
}
if (!checkTimeLimit())
if (!limits.speed_limits.checkTimeLimit(info.total_stopwatch.elapsed(), limits.timeout_overflow_mode))
{
stopReading();
return;
@ -78,18 +57,6 @@ void LimitsCheckingTransform::transform(Chunk & chunk)
}
}
bool LimitsCheckingTransform::checkTimeLimit()
{
if (limits.speed_limits.max_execution_time != 0
&& info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.speed_limits.max_execution_time.totalMicroseconds()) * 1000)
return handleOverflowMode(limits.timeout_overflow_mode,
"Timeout exceeded: elapsed " + toString(info.total_stopwatch.elapsedSeconds())
+ " seconds, maximum: " + toString(limits.speed_limits.max_execution_time.totalMicroseconds() / 1000000.0),
ErrorCodes::TIMEOUT_EXCEEDED);
return true;
}
void LimitsCheckingTransform::checkQuota(Chunk & chunk)
{
switch (limits.mode)

View File

@ -29,10 +29,7 @@ public:
using LocalLimits = IBlockInputStream::LocalLimits;
using LimitsMode = IBlockInputStream::LimitsMode;
/// LIMITS_CURRENT
LimitsCheckingTransform(const Block & header_, LocalLimits limits_);
/// LIMITS_TOTAL
/// LimitsCheckingTransform(const Block & header, LocalLimits limits, QueryStatus * process_list_elem);
String getName() const override { return "LimitsCheckingTransform"; }