Fix more tests.

This commit is contained in:
Nikolai Kochetov 2022-05-30 13:10:30 +00:00
parent 5b4658aa5e
commit 5ef51ed27b
14 changed files with 161 additions and 179 deletions

View File

@ -274,7 +274,7 @@ public:
Chunk generate() override
{
Chunk chunk;
while (!executor.pull(chunk))
while (executor.pull(chunk))
{
if (chunk)
return chunk;

View File

@ -50,6 +50,7 @@ ISource::Status ISource::prepare()
void ISource::progress(size_t read_rows, size_t read_bytes)
{
//std::cerr << "========= Progress " << read_rows << " from " << getName() << std::endl << StackTrace().toString() << std::endl;
read_progress_was_set = true;
read_progress.read_rows += read_rows;
read_progress.read_bytes += read_bytes;

View File

@ -80,8 +80,6 @@ void ExpressionStep::describeActions(FormatSettings & settings) const
String prefix(settings.offset, ' ');
bool first = true;
std::cerr << actions_dag->dumpDAG() << std::endl;
auto expression = std::make_shared<ExpressionActions>(actions_dag);
for (const auto & action : expression->getActions())
{

View File

@ -32,14 +32,13 @@ void ReadProgressCallback::setProcessListElement(QueryStatus * elem)
///
/// NOTE: This can be done only if progress callback already set, since
/// otherwise total_rows_approx will lost.
if (total_rows_approx != 0 && progress_callback)
size_t rows_approx = 0;
if (progress_callback && (rows_approx = total_rows_approx.exchange(0)) != 0)
{
Progress total_rows_progress = {0, 0, total_rows_approx};
Progress total_rows_progress = {0, 0, rows_approx};
progress_callback(total_rows_progress);
process_list_elem->updateProgressIn(total_rows_progress);
total_rows_approx = 0;
}
}
@ -48,22 +47,21 @@ bool ReadProgressCallback::onProgress(uint64_t read_rows, uint64_t read_bytes)
if (!limits.speed_limits.checkTimeLimit(total_stopwatch, limits.timeout_overflow_mode))
return false;
if (total_rows_approx != 0)
size_t rows_approx = 0;
if ((rows_approx = total_rows_approx.exchange(0)) != 0)
{
Progress total_rows_progress = {0, 0, total_rows_approx};
Progress total_rows_progress = {0, 0, rows_approx};
if (progress_callback)
progress_callback(total_rows_progress);
if (process_list_elem)
process_list_elem->updateProgressIn(total_rows_progress);
total_rows_approx = 0;
}
Progress value {read_rows, read_bytes};
if (progress_callback)
if (progress_callback && (read_rows || read_bytes))
progress_callback(value);
if (process_list_elem)
@ -108,12 +106,15 @@ bool ReadProgressCallback::onProgress(uint64_t read_rows, uint64_t read_bytes)
constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds
UInt64 total_elapsed_microseconds = total_stopwatch.elapsedMicroseconds();
std::lock_guard lock(last_profile_events_update_time_mutex);
{
if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds)
{
/// TODO: Should be done in PipelineExecutor.
CurrentThread::updatePerformanceCounters();
last_profile_events_update_time = total_elapsed_microseconds;
}
}
/// TODO: Should be done in PipelineExecutor.
limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds);

View File

@ -2,6 +2,7 @@
#include <Common/Stopwatch.h>
#include <QueryPipeline/StreamLocalLimits.h>
#include <IO/Progress.h>
#include <mutex>
namespace DB
{
@ -33,11 +34,12 @@ private:
QueryStatus * process_list_elem = nullptr;
/// The approximate total number of rows to read. For progress bar.
size_t total_rows_approx = 0;
std::atomic_size_t total_rows_approx = 0;
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; /// Time with waiting time.
/// According to total_stopwatch in microseconds.
UInt64 last_profile_events_update_time = 0;
std::mutex last_profile_events_update_time_mutex;
bool update_profile_events = true;
};

View File

@ -1,6 +1,4 @@
Expression ((Projection + Before ORDER BY))
Header: x UInt8
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Header: dummy UInt8
ReadFromStorage (SystemOne)
Header: dummy UInt8

View File

@ -7,7 +7,6 @@ ExpressionTransform × 3
AggregatingInOrderTransform × 3
(Expression)
ExpressionTransform × 3
(SettingQuotaAndLimits)
(ReadFromMergeTree)
ExpressionTransform × 4
MergeTreeInOrder 0 → 1

View File

@ -85,8 +85,6 @@
}
]
}
]
}
}
]
-----------------

View File

@ -1,6 +1,5 @@
(Expression)
ExpressionTransform
(SettingQuotaAndLimits)
(ReadFromMergeTree)
ExpressionTransform
ReplacingSorted 2 → 1
@ -15,7 +14,6 @@ ExpressionTransform
6 6
(Expression)
ExpressionTransform × 2
(SettingQuotaAndLimits)
(ReadFromMergeTree)
ExpressionTransform × 2
ReplacingSorted × 2 2 → 1

View File

@ -13,7 +13,6 @@ ExpressionTransform
Copy 1 → 2
(Expression)
ExpressionTransform
(SettingQuotaAndLimits)
(ReadFromStorage)
Memory 0 → 1
1 0 1 4500
@ -106,7 +105,6 @@ ExpressionTransform
Copy × 3 1 → 2
(Expression)
ExpressionTransform × 3
(SettingQuotaAndLimits)
(ReadFromStorage)
NumbersMt × 3 0 → 1
4999500000 10000

View File

@ -3,26 +3,18 @@ explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) orde
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))
Sorting (Merge sorted streams after aggregation stage for ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Before ORDER BY + (Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))))
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) order by number limit 10 settings distributed_push_down_limit=1;
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))
Sorting (Merge sorted streams after aggregation stage for ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
Limit (preliminary LIMIT (with OFFSET))
Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Before ORDER BY + (Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))))
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)

View File

@ -1,3 +1,3 @@
set receive_timeout = '10', receive_data_timeout_ms = '10000', extremes = '1', allow_suspicious_low_cardinality_types = '1', force_primary_key = '1', join_use_nulls = '1', max_rows_to_read = '1', join_algorithm = 'partial_merge';
set receive_timeout = '10', receive_data_timeout_ms = '10000', extremes = '1', allow_suspicious_low_cardinality_types = '1', force_primary_key = '1', join_use_nulls = '1', max_rows_to_read = '2', join_algorithm = 'partial_merge';
SELECT * FROM (SELECT dummy AS val FROM system.one) AS s1 ANY LEFT JOIN (SELECT toLowCardinality(dummy) AS rval FROM system.one) AS s2 ON (val + 9223372036854775806) = (rval * 1);

View File

@ -2,7 +2,6 @@
EXPLAIN PIPELINE SELECT sleep(1);
(Expression)
ExpressionTransform
(SettingQuotaAndLimits)
(ReadFromStorage)
SourceFromSingleChunk 0 → 1
SELECT sleep(1) SETTINGS log_processors_profiles=true, log_queries=1, log_queries_min_type='QUERY_FINISH';

View File

@ -4,7 +4,6 @@ ExpressionTransform
JoiningTransform 2 → 1
(Expression)
ExpressionTransform
(SettingQuotaAndLimits)
(Limit)
Limit
(ReadFromStorage)
@ -12,7 +11,6 @@ ExpressionTransform
(Expression)
FillingRightJoinSide
ExpressionTransform
(SettingQuotaAndLimits)
(Limit)
Limit
(ReadFromStorage)