From 43455d79b4765749b444a856e5bdf43514efb0e3 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 27 Jan 2020 19:58:25 +0300 Subject: [PATCH] Add time limit to SourceWithProgress. --- dbms/src/DataStreams/ExecutionSpeedLimits.cpp | 27 ++++++++++++++ dbms/src/DataStreams/ExecutionSpeedLimits.h | 3 ++ dbms/src/DataStreams/IBlockInputStream.cpp | 29 +-------------- .../Processors/Sources/SourceWithProgress.cpp | 9 +++++ .../Processors/Sources/SourceWithProgress.h | 2 ++ .../Transforms/LimitsCheckingTransform.cpp | 35 +------------------ .../Transforms/LimitsCheckingTransform.h | 3 -- 7 files changed, 43 insertions(+), 65 deletions(-) diff --git a/dbms/src/DataStreams/ExecutionSpeedLimits.cpp b/dbms/src/DataStreams/ExecutionSpeedLimits.cpp index 532c693bd47..ca79138655a 100644 --- a/dbms/src/DataStreams/ExecutionSpeedLimits.cpp +++ b/dbms/src/DataStreams/ExecutionSpeedLimits.cpp @@ -17,6 +17,8 @@ namespace DB namespace ErrorCodes { extern const int TOO_SLOW; + extern const int LOGICAL_ERROR; + extern const int TIMEOUT_EXCEEDED; } static void limitProgressingSpeed(size_t total_progress_size, size_t max_speed_in_seconds, UInt64 total_elapsed_microseconds) @@ -88,4 +90,29 @@ void ExecutionSpeedLimits::throttle( } } +static bool handleOverflowMode(OverflowMode mode, const String & message, int code) +{ + switch (mode) + { + case OverflowMode::THROW: + throw Exception(message, code); + case OverflowMode::BREAK: + return false; + default: + throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR); + } +} + +bool ExecutionSpeedLimits::checkTimeLimit(UInt64 elapsed_ns, OverflowMode overflow_mode) +{ + if (max_execution_time != 0 + && elapsed_ns > static_cast(max_execution_time.totalMicroseconds()) * 1000) + return handleOverflowMode(overflow_mode, + "Timeout exceeded: elapsed " + toString(static_cast(elapsed_ns) / 1000000000ULL) + + " seconds, maximum: " + toString(max_execution_time.totalMicroseconds() / 1000000.0), + ErrorCodes::TIMEOUT_EXCEEDED); + + return true; +} + } diff --git a/dbms/src/DataStreams/ExecutionSpeedLimits.h b/dbms/src/DataStreams/ExecutionSpeedLimits.h index a067fc86000..539a5b0108b 100644 --- a/dbms/src/DataStreams/ExecutionSpeedLimits.h +++ b/dbms/src/DataStreams/ExecutionSpeedLimits.h @@ -2,6 +2,7 @@ #include #include +#include namespace DB { @@ -23,6 +24,8 @@ public: /// Pause execution in case if speed limits were exceeded. void throttle(size_t read_rows, size_t read_bytes, size_t total_rows_to_read, UInt64 total_elapsed_microseconds); + + bool checkTimeLimit(UInt64 elapsed_ns, OverflowMode overflow_mode); }; } diff --git a/dbms/src/DataStreams/IBlockInputStream.cpp b/dbms/src/DataStreams/IBlockInputStream.cpp index df81f26f665..db25c082626 100644 --- a/dbms/src/DataStreams/IBlockInputStream.cpp +++ b/dbms/src/DataStreams/IBlockInputStream.cpp @@ -54,7 +54,7 @@ Block IBlockInputStream::read() if (isCancelledOrThrowIfKilled()) return res; - if (!checkTimeLimit()) + if (!limits.speed_limits.checkTimeLimit(info.total_stopwatch.elapsed(), limits.timeout_overflow_mode)) limit_exceeded_need_break = true; if (!limit_exceeded_need_break) @@ -203,33 +203,6 @@ void IBlockInputStream::updateExtremes(Block & block) } -static bool handleOverflowMode(OverflowMode mode, const String & message, int code) -{ - switch (mode) - { - case OverflowMode::THROW: - throw Exception(message, code); - case OverflowMode::BREAK: - return false; - default: - throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR); - } -} - - -bool IBlockInputStream::checkTimeLimit() -{ - if (limits.speed_limits.max_execution_time != 0 - && info.total_stopwatch.elapsed() > static_cast(limits.speed_limits.max_execution_time.totalMicroseconds()) * 1000) - return handleOverflowMode(limits.timeout_overflow_mode, - "Timeout exceeded: elapsed " + toString(info.total_stopwatch.elapsedSeconds()) - + " seconds, maximum: " + toString(limits.speed_limits.max_execution_time.totalMicroseconds() / 1000000.0), - ErrorCodes::TIMEOUT_EXCEEDED); - - return true; -} - - void IBlockInputStream::checkQuota(Block & block) { switch (limits.mode) diff --git a/dbms/src/Processors/Sources/SourceWithProgress.cpp b/dbms/src/Processors/Sources/SourceWithProgress.cpp index fac2a53ea54..e7964ef81f3 100644 --- a/dbms/src/Processors/Sources/SourceWithProgress.cpp +++ b/dbms/src/Processors/Sources/SourceWithProgress.cpp @@ -10,6 +10,15 @@ namespace ErrorCodes { extern const int TOO_MANY_ROWS; extern const int TOO_MANY_BYTES; + extern const int TIMEOUT_EXCEEDED; +} + +void SourceWithProgress::work() +{ + if (!limits.speed_limits.checkTimeLimit(total_stopwatch.elapsed(), limits.timeout_overflow_mode)) + cancel(); + else + ISourceWithProgress::work(); } /// Aggregated copy-paste from IBlockInputStream::progressImpl. diff --git a/dbms/src/Processors/Sources/SourceWithProgress.h b/dbms/src/Processors/Sources/SourceWithProgress.h index 59e8c6afa20..abc96cbbe2f 100644 --- a/dbms/src/Processors/Sources/SourceWithProgress.h +++ b/dbms/src/Processors/Sources/SourceWithProgress.h @@ -58,6 +58,8 @@ protected: /// Call this method to provide information about progress. void progress(const Progress & value); + void work() override; + private: LocalLimits limits; std::shared_ptr quota; diff --git a/dbms/src/Processors/Transforms/LimitsCheckingTransform.cpp b/dbms/src/Processors/Transforms/LimitsCheckingTransform.cpp index 1f621439048..57641a54b70 100644 --- a/dbms/src/Processors/Transforms/LimitsCheckingTransform.cpp +++ b/dbms/src/Processors/Transforms/LimitsCheckingTransform.cpp @@ -17,20 +17,6 @@ namespace ErrorCodes } -static bool handleOverflowMode(OverflowMode mode, const String & message, int code) -{ - switch (mode) - { - case OverflowMode::THROW: - throw Exception(message, code); - case OverflowMode::BREAK: - return false; - default: - throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR); - } -} - - void ProcessorProfileInfo::update(const Chunk & block) { ++blocks; @@ -44,13 +30,6 @@ LimitsCheckingTransform::LimitsCheckingTransform(const Block & header_, LocalLim { } -//LimitsCheckingTransform::LimitsCheckingTransform(const Block & header, LocalLimits limits, QueryStatus * process_list_elem) -// : ISimpleTransform(header, header, false) -// , limits(std::move(limits)) -// , process_list_elem(process_list_elem) -//{ -//} - void LimitsCheckingTransform::transform(Chunk & chunk) { if (!info.started) @@ -59,7 +38,7 @@ void LimitsCheckingTransform::transform(Chunk & chunk) info.started = true; } - if (!checkTimeLimit()) + if (!limits.speed_limits.checkTimeLimit(info.total_stopwatch.elapsed(), limits.timeout_overflow_mode)) { stopReading(); return; @@ -78,18 +57,6 @@ void LimitsCheckingTransform::transform(Chunk & chunk) } } -bool LimitsCheckingTransform::checkTimeLimit() -{ - if (limits.speed_limits.max_execution_time != 0 - && info.total_stopwatch.elapsed() > static_cast(limits.speed_limits.max_execution_time.totalMicroseconds()) * 1000) - return handleOverflowMode(limits.timeout_overflow_mode, - "Timeout exceeded: elapsed " + toString(info.total_stopwatch.elapsedSeconds()) - + " seconds, maximum: " + toString(limits.speed_limits.max_execution_time.totalMicroseconds() / 1000000.0), - ErrorCodes::TIMEOUT_EXCEEDED); - - return true; -} - void LimitsCheckingTransform::checkQuota(Chunk & chunk) { switch (limits.mode) diff --git a/dbms/src/Processors/Transforms/LimitsCheckingTransform.h b/dbms/src/Processors/Transforms/LimitsCheckingTransform.h index 9410301030a..8746563ac78 100644 --- a/dbms/src/Processors/Transforms/LimitsCheckingTransform.h +++ b/dbms/src/Processors/Transforms/LimitsCheckingTransform.h @@ -29,10 +29,7 @@ public: using LocalLimits = IBlockInputStream::LocalLimits; using LimitsMode = IBlockInputStream::LimitsMode; - /// LIMITS_CURRENT LimitsCheckingTransform(const Block & header_, LocalLimits limits_); - /// LIMITS_TOTAL - /// LimitsCheckingTransform(const Block & header, LocalLimits limits, QueryStatus * process_list_elem); String getName() const override { return "LimitsCheckingTransform"; }