Fix more tests.

This commit is contained in:
Nikolai Kochetov 2022-05-30 13:10:30 +00:00
parent 5b4658aa5e
commit 5ef51ed27b
14 changed files with 161 additions and 179 deletions

View File

@ -274,7 +274,7 @@ public:
Chunk generate() override Chunk generate() override
{ {
Chunk chunk; Chunk chunk;
while (!executor.pull(chunk)) while (executor.pull(chunk))
{ {
if (chunk) if (chunk)
return chunk; return chunk;

View File

@ -50,6 +50,7 @@ ISource::Status ISource::prepare()
void ISource::progress(size_t read_rows, size_t read_bytes) void ISource::progress(size_t read_rows, size_t read_bytes)
{ {
//std::cerr << "========= Progress " << read_rows << " from " << getName() << std::endl << StackTrace().toString() << std::endl;
read_progress_was_set = true; read_progress_was_set = true;
read_progress.read_rows += read_rows; read_progress.read_rows += read_rows;
read_progress.read_bytes += read_bytes; read_progress.read_bytes += read_bytes;

View File

@ -80,8 +80,6 @@ void ExpressionStep::describeActions(FormatSettings & settings) const
String prefix(settings.offset, ' '); String prefix(settings.offset, ' ');
bool first = true; bool first = true;
std::cerr << actions_dag->dumpDAG() << std::endl;
auto expression = std::make_shared<ExpressionActions>(actions_dag); auto expression = std::make_shared<ExpressionActions>(actions_dag);
for (const auto & action : expression->getActions()) for (const auto & action : expression->getActions())
{ {

View File

@ -32,14 +32,13 @@ void ReadProgressCallback::setProcessListElement(QueryStatus * elem)
/// ///
/// NOTE: This can be done only if progress callback already set, since /// NOTE: This can be done only if progress callback already set, since
/// otherwise total_rows_approx will lost. /// otherwise total_rows_approx will lost.
if (total_rows_approx != 0 && progress_callback) size_t rows_approx = 0;
if (progress_callback && (rows_approx = total_rows_approx.exchange(0)) != 0)
{ {
Progress total_rows_progress = {0, 0, total_rows_approx}; Progress total_rows_progress = {0, 0, rows_approx};
progress_callback(total_rows_progress); progress_callback(total_rows_progress);
process_list_elem->updateProgressIn(total_rows_progress); process_list_elem->updateProgressIn(total_rows_progress);
total_rows_approx = 0;
} }
} }
@ -48,22 +47,21 @@ bool ReadProgressCallback::onProgress(uint64_t read_rows, uint64_t read_bytes)
if (!limits.speed_limits.checkTimeLimit(total_stopwatch, limits.timeout_overflow_mode)) if (!limits.speed_limits.checkTimeLimit(total_stopwatch, limits.timeout_overflow_mode))
return false; return false;
if (total_rows_approx != 0) size_t rows_approx = 0;
if ((rows_approx = total_rows_approx.exchange(0)) != 0)
{ {
Progress total_rows_progress = {0, 0, total_rows_approx}; Progress total_rows_progress = {0, 0, rows_approx};
if (progress_callback) if (progress_callback)
progress_callback(total_rows_progress); progress_callback(total_rows_progress);
if (process_list_elem) if (process_list_elem)
process_list_elem->updateProgressIn(total_rows_progress); process_list_elem->updateProgressIn(total_rows_progress);
total_rows_approx = 0;
} }
Progress value {read_rows, read_bytes}; Progress value {read_rows, read_bytes};
if (progress_callback) if (progress_callback && (read_rows || read_bytes))
progress_callback(value); progress_callback(value);
if (process_list_elem) if (process_list_elem)
@ -108,12 +106,15 @@ bool ReadProgressCallback::onProgress(uint64_t read_rows, uint64_t read_bytes)
constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds
UInt64 total_elapsed_microseconds = total_stopwatch.elapsedMicroseconds(); UInt64 total_elapsed_microseconds = total_stopwatch.elapsedMicroseconds();
std::lock_guard lock(last_profile_events_update_time_mutex);
{
if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds) if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds)
{ {
/// TODO: Should be done in PipelineExecutor. /// TODO: Should be done in PipelineExecutor.
CurrentThread::updatePerformanceCounters(); CurrentThread::updatePerformanceCounters();
last_profile_events_update_time = total_elapsed_microseconds; last_profile_events_update_time = total_elapsed_microseconds;
} }
}
/// TODO: Should be done in PipelineExecutor. /// TODO: Should be done in PipelineExecutor.
limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds); limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds);

View File

@ -2,6 +2,7 @@
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <QueryPipeline/StreamLocalLimits.h> #include <QueryPipeline/StreamLocalLimits.h>
#include <IO/Progress.h> #include <IO/Progress.h>
#include <mutex>
namespace DB namespace DB
{ {
@ -33,11 +34,12 @@ private:
QueryStatus * process_list_elem = nullptr; QueryStatus * process_list_elem = nullptr;
/// The approximate total number of rows to read. For progress bar. /// The approximate total number of rows to read. For progress bar.
size_t total_rows_approx = 0; std::atomic_size_t total_rows_approx = 0;
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; /// Time with waiting time. Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; /// Time with waiting time.
/// According to total_stopwatch in microseconds. /// According to total_stopwatch in microseconds.
UInt64 last_profile_events_update_time = 0; UInt64 last_profile_events_update_time = 0;
std::mutex last_profile_events_update_time_mutex;
bool update_profile_events = true; bool update_profile_events = true;
}; };

View File

@ -1,6 +1,4 @@
Expression ((Projection + Before ORDER BY)) Expression ((Projection + Before ORDER BY))
Header: x UInt8 Header: x UInt8
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Header: dummy UInt8
ReadFromStorage (SystemOne) ReadFromStorage (SystemOne)
Header: dummy UInt8 Header: dummy UInt8

View File

@ -7,7 +7,6 @@ ExpressionTransform × 3
AggregatingInOrderTransform × 3 AggregatingInOrderTransform × 3
(Expression) (Expression)
ExpressionTransform × 3 ExpressionTransform × 3
(SettingQuotaAndLimits)
(ReadFromMergeTree) (ReadFromMergeTree)
ExpressionTransform × 4 ExpressionTransform × 4
MergeTreeInOrder 0 → 1 MergeTreeInOrder 0 → 1

View File

@ -85,8 +85,6 @@
} }
] ]
} }
]
}
} }
] ]
----------------- -----------------

View File

@ -1,6 +1,5 @@
(Expression) (Expression)
ExpressionTransform ExpressionTransform
(SettingQuotaAndLimits)
(ReadFromMergeTree) (ReadFromMergeTree)
ExpressionTransform ExpressionTransform
ReplacingSorted 2 → 1 ReplacingSorted 2 → 1
@ -15,7 +14,6 @@ ExpressionTransform
6 6 6 6
(Expression) (Expression)
ExpressionTransform × 2 ExpressionTransform × 2
(SettingQuotaAndLimits)
(ReadFromMergeTree) (ReadFromMergeTree)
ExpressionTransform × 2 ExpressionTransform × 2
ReplacingSorted × 2 2 → 1 ReplacingSorted × 2 2 → 1

View File

@ -13,7 +13,6 @@ ExpressionTransform
Copy 1 → 2 Copy 1 → 2
(Expression) (Expression)
ExpressionTransform ExpressionTransform
(SettingQuotaAndLimits)
(ReadFromStorage) (ReadFromStorage)
Memory 0 → 1 Memory 0 → 1
1 0 1 4500 1 0 1 4500
@ -106,7 +105,6 @@ ExpressionTransform
Copy × 3 1 → 2 Copy × 3 1 → 2
(Expression) (Expression)
ExpressionTransform × 3 ExpressionTransform × 3
(SettingQuotaAndLimits)
(ReadFromStorage) (ReadFromStorage)
NumbersMt × 3 0 → 1 NumbersMt × 3 0 → 1
4999500000 10000 4999500000 10000

View File

@ -3,26 +3,18 @@ explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) orde
Expression (Projection) Expression (Projection)
Limit (preliminary LIMIT (without OFFSET)) Limit (preliminary LIMIT (without OFFSET))
Sorting (Merge sorted streams after aggregation stage for ORDER BY) Sorting (Merge sorted streams after aggregation stage for ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union Union
Sorting (Sorting for ORDER BY) Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY) Expression ((Before ORDER BY + (Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers) ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica) ReadFromRemote (Read from remote replica)
explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) order by number limit 10 settings distributed_push_down_limit=1; explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) order by number limit 10 settings distributed_push_down_limit=1;
Expression (Projection) Expression (Projection)
Limit (preliminary LIMIT (without OFFSET)) Limit (preliminary LIMIT (without OFFSET))
Sorting (Merge sorted streams after aggregation stage for ORDER BY) Sorting (Merge sorted streams after aggregation stage for ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union Union
Limit (preliminary LIMIT (with OFFSET)) Limit (preliminary LIMIT (with OFFSET))
Sorting (Sorting for ORDER BY) Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY) Expression ((Before ORDER BY + (Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers) ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica) ReadFromRemote (Read from remote replica)

View File

@ -1,3 +1,3 @@
set receive_timeout = '10', receive_data_timeout_ms = '10000', extremes = '1', allow_suspicious_low_cardinality_types = '1', force_primary_key = '1', join_use_nulls = '1', max_rows_to_read = '1', join_algorithm = 'partial_merge'; set receive_timeout = '10', receive_data_timeout_ms = '10000', extremes = '1', allow_suspicious_low_cardinality_types = '1', force_primary_key = '1', join_use_nulls = '1', max_rows_to_read = '2', join_algorithm = 'partial_merge';
SELECT * FROM (SELECT dummy AS val FROM system.one) AS s1 ANY LEFT JOIN (SELECT toLowCardinality(dummy) AS rval FROM system.one) AS s2 ON (val + 9223372036854775806) = (rval * 1); SELECT * FROM (SELECT dummy AS val FROM system.one) AS s1 ANY LEFT JOIN (SELECT toLowCardinality(dummy) AS rval FROM system.one) AS s2 ON (val + 9223372036854775806) = (rval * 1);

View File

@ -2,7 +2,6 @@
EXPLAIN PIPELINE SELECT sleep(1); EXPLAIN PIPELINE SELECT sleep(1);
(Expression) (Expression)
ExpressionTransform ExpressionTransform
(SettingQuotaAndLimits)
(ReadFromStorage) (ReadFromStorage)
SourceFromSingleChunk 0 → 1 SourceFromSingleChunk 0 → 1
SELECT sleep(1) SETTINGS log_processors_profiles=true, log_queries=1, log_queries_min_type='QUERY_FINISH'; SELECT sleep(1) SETTINGS log_processors_profiles=true, log_queries=1, log_queries_min_type='QUERY_FINISH';

View File

@ -4,7 +4,6 @@ ExpressionTransform
JoiningTransform 2 → 1 JoiningTransform 2 → 1
(Expression) (Expression)
ExpressionTransform ExpressionTransform
(SettingQuotaAndLimits)
(Limit) (Limit)
Limit Limit
(ReadFromStorage) (ReadFromStorage)
@ -12,7 +11,6 @@ ExpressionTransform
(Expression) (Expression)
FillingRightJoinSide FillingRightJoinSide
ExpressionTransform ExpressionTransform
(SettingQuotaAndLimits)
(Limit) (Limit)
Limit Limit
(ReadFromStorage) (ReadFromStorage)