mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Fix more tests.
This commit is contained in:
parent
5b4658aa5e
commit
5ef51ed27b
@ -274,7 +274,7 @@ public:
|
||||
Chunk generate() override
|
||||
{
|
||||
Chunk chunk;
|
||||
while (!executor.pull(chunk))
|
||||
while (executor.pull(chunk))
|
||||
{
|
||||
if (chunk)
|
||||
return chunk;
|
||||
|
@ -50,6 +50,7 @@ ISource::Status ISource::prepare()
|
||||
|
||||
void ISource::progress(size_t read_rows, size_t read_bytes)
|
||||
{
|
||||
//std::cerr << "========= Progress " << read_rows << " from " << getName() << std::endl << StackTrace().toString() << std::endl;
|
||||
read_progress_was_set = true;
|
||||
read_progress.read_rows += read_rows;
|
||||
read_progress.read_bytes += read_bytes;
|
||||
|
@ -80,8 +80,6 @@ void ExpressionStep::describeActions(FormatSettings & settings) const
|
||||
String prefix(settings.offset, ' ');
|
||||
bool first = true;
|
||||
|
||||
std::cerr << actions_dag->dumpDAG() << std::endl;
|
||||
|
||||
auto expression = std::make_shared<ExpressionActions>(actions_dag);
|
||||
for (const auto & action : expression->getActions())
|
||||
{
|
||||
|
@ -32,14 +32,13 @@ void ReadProgressCallback::setProcessListElement(QueryStatus * elem)
|
||||
///
|
||||
/// NOTE: This can be done only if progress callback already set, since
|
||||
/// otherwise total_rows_approx will lost.
|
||||
if (total_rows_approx != 0 && progress_callback)
|
||||
size_t rows_approx = 0;
|
||||
if (progress_callback && (rows_approx = total_rows_approx.exchange(0)) != 0)
|
||||
{
|
||||
Progress total_rows_progress = {0, 0, total_rows_approx};
|
||||
Progress total_rows_progress = {0, 0, rows_approx};
|
||||
|
||||
progress_callback(total_rows_progress);
|
||||
process_list_elem->updateProgressIn(total_rows_progress);
|
||||
|
||||
total_rows_approx = 0;
|
||||
}
|
||||
}
|
||||
|
||||
@ -48,22 +47,21 @@ bool ReadProgressCallback::onProgress(uint64_t read_rows, uint64_t read_bytes)
|
||||
if (!limits.speed_limits.checkTimeLimit(total_stopwatch, limits.timeout_overflow_mode))
|
||||
return false;
|
||||
|
||||
if (total_rows_approx != 0)
|
||||
size_t rows_approx = 0;
|
||||
if ((rows_approx = total_rows_approx.exchange(0)) != 0)
|
||||
{
|
||||
Progress total_rows_progress = {0, 0, total_rows_approx};
|
||||
Progress total_rows_progress = {0, 0, rows_approx};
|
||||
|
||||
if (progress_callback)
|
||||
progress_callback(total_rows_progress);
|
||||
|
||||
if (process_list_elem)
|
||||
process_list_elem->updateProgressIn(total_rows_progress);
|
||||
|
||||
total_rows_approx = 0;
|
||||
}
|
||||
|
||||
Progress value {read_rows, read_bytes};
|
||||
|
||||
if (progress_callback)
|
||||
if (progress_callback && (read_rows || read_bytes))
|
||||
progress_callback(value);
|
||||
|
||||
if (process_list_elem)
|
||||
@ -108,12 +106,15 @@ bool ReadProgressCallback::onProgress(uint64_t read_rows, uint64_t read_bytes)
|
||||
constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds
|
||||
UInt64 total_elapsed_microseconds = total_stopwatch.elapsedMicroseconds();
|
||||
|
||||
std::lock_guard lock(last_profile_events_update_time_mutex);
|
||||
{
|
||||
if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds)
|
||||
{
|
||||
/// TODO: Should be done in PipelineExecutor.
|
||||
CurrentThread::updatePerformanceCounters();
|
||||
last_profile_events_update_time = total_elapsed_microseconds;
|
||||
}
|
||||
}
|
||||
|
||||
/// TODO: Should be done in PipelineExecutor.
|
||||
limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds);
|
||||
|
@ -2,6 +2,7 @@
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <QueryPipeline/StreamLocalLimits.h>
|
||||
#include <IO/Progress.h>
|
||||
#include <mutex>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -33,11 +34,12 @@ private:
|
||||
QueryStatus * process_list_elem = nullptr;
|
||||
|
||||
/// The approximate total number of rows to read. For progress bar.
|
||||
size_t total_rows_approx = 0;
|
||||
std::atomic_size_t total_rows_approx = 0;
|
||||
|
||||
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; /// Time with waiting time.
|
||||
/// According to total_stopwatch in microseconds.
|
||||
UInt64 last_profile_events_update_time = 0;
|
||||
std::mutex last_profile_events_update_time_mutex;
|
||||
|
||||
bool update_profile_events = true;
|
||||
};
|
||||
|
@ -1,6 +1,4 @@
|
||||
Expression ((Projection + Before ORDER BY))
|
||||
Header: x UInt8
|
||||
SettingQuotaAndLimits (Set limits and quota after reading from storage)
|
||||
Header: dummy UInt8
|
||||
ReadFromStorage (SystemOne)
|
||||
Header: dummy UInt8
|
||||
|
@ -7,7 +7,6 @@ ExpressionTransform × 3
|
||||
AggregatingInOrderTransform × 3
|
||||
(Expression)
|
||||
ExpressionTransform × 3
|
||||
(SettingQuotaAndLimits)
|
||||
(ReadFromMergeTree)
|
||||
ExpressionTransform × 4
|
||||
MergeTreeInOrder 0 → 1
|
||||
|
@ -85,8 +85,6 @@
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
-----------------
|
||||
|
@ -1,6 +1,5 @@
|
||||
(Expression)
|
||||
ExpressionTransform
|
||||
(SettingQuotaAndLimits)
|
||||
(ReadFromMergeTree)
|
||||
ExpressionTransform
|
||||
ReplacingSorted 2 → 1
|
||||
@ -15,7 +14,6 @@ ExpressionTransform
|
||||
6 6
|
||||
(Expression)
|
||||
ExpressionTransform × 2
|
||||
(SettingQuotaAndLimits)
|
||||
(ReadFromMergeTree)
|
||||
ExpressionTransform × 2
|
||||
ReplacingSorted × 2 2 → 1
|
||||
|
@ -13,7 +13,6 @@ ExpressionTransform
|
||||
Copy 1 → 2
|
||||
(Expression)
|
||||
ExpressionTransform
|
||||
(SettingQuotaAndLimits)
|
||||
(ReadFromStorage)
|
||||
Memory 0 → 1
|
||||
1 0 1 4500
|
||||
@ -106,7 +105,6 @@ ExpressionTransform
|
||||
Copy × 3 1 → 2
|
||||
(Expression)
|
||||
ExpressionTransform × 3
|
||||
(SettingQuotaAndLimits)
|
||||
(ReadFromStorage)
|
||||
NumbersMt × 3 0 → 1
|
||||
4999500000 10000
|
||||
|
@ -3,26 +3,18 @@ explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) orde
|
||||
Expression (Projection)
|
||||
Limit (preliminary LIMIT (without OFFSET))
|
||||
Sorting (Merge sorted streams after aggregation stage for ORDER BY)
|
||||
SettingQuotaAndLimits (Set limits and quota after reading from storage)
|
||||
Union
|
||||
Sorting (Sorting for ORDER BY)
|
||||
Expression (Before ORDER BY)
|
||||
SettingQuotaAndLimits (Set limits and quota after reading from storage)
|
||||
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
|
||||
SettingQuotaAndLimits (Set limits and quota after reading from storage)
|
||||
Expression ((Before ORDER BY + (Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))))
|
||||
ReadFromStorage (SystemNumbers)
|
||||
ReadFromRemote (Read from remote replica)
|
||||
explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) order by number limit 10 settings distributed_push_down_limit=1;
|
||||
Expression (Projection)
|
||||
Limit (preliminary LIMIT (without OFFSET))
|
||||
Sorting (Merge sorted streams after aggregation stage for ORDER BY)
|
||||
SettingQuotaAndLimits (Set limits and quota after reading from storage)
|
||||
Union
|
||||
Limit (preliminary LIMIT (with OFFSET))
|
||||
Sorting (Sorting for ORDER BY)
|
||||
Expression (Before ORDER BY)
|
||||
SettingQuotaAndLimits (Set limits and quota after reading from storage)
|
||||
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
|
||||
SettingQuotaAndLimits (Set limits and quota after reading from storage)
|
||||
Expression ((Before ORDER BY + (Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))))
|
||||
ReadFromStorage (SystemNumbers)
|
||||
ReadFromRemote (Read from remote replica)
|
||||
|
@ -1,3 +1,3 @@
|
||||
set receive_timeout = '10', receive_data_timeout_ms = '10000', extremes = '1', allow_suspicious_low_cardinality_types = '1', force_primary_key = '1', join_use_nulls = '1', max_rows_to_read = '1', join_algorithm = 'partial_merge';
|
||||
set receive_timeout = '10', receive_data_timeout_ms = '10000', extremes = '1', allow_suspicious_low_cardinality_types = '1', force_primary_key = '1', join_use_nulls = '1', max_rows_to_read = '2', join_algorithm = 'partial_merge';
|
||||
|
||||
SELECT * FROM (SELECT dummy AS val FROM system.one) AS s1 ANY LEFT JOIN (SELECT toLowCardinality(dummy) AS rval FROM system.one) AS s2 ON (val + 9223372036854775806) = (rval * 1);
|
||||
|
@ -2,7 +2,6 @@
|
||||
EXPLAIN PIPELINE SELECT sleep(1);
|
||||
(Expression)
|
||||
ExpressionTransform
|
||||
(SettingQuotaAndLimits)
|
||||
(ReadFromStorage)
|
||||
SourceFromSingleChunk 0 → 1
|
||||
SELECT sleep(1) SETTINGS log_processors_profiles=true, log_queries=1, log_queries_min_type='QUERY_FINISH';
|
||||
|
@ -4,7 +4,6 @@ ExpressionTransform
|
||||
JoiningTransform 2 → 1
|
||||
(Expression)
|
||||
ExpressionTransform
|
||||
(SettingQuotaAndLimits)
|
||||
(Limit)
|
||||
Limit
|
||||
(ReadFromStorage)
|
||||
@ -12,7 +11,6 @@ ExpressionTransform
|
||||
(Expression)
|
||||
FillingRightJoinSide
|
||||
ExpressionTransform
|
||||
(SettingQuotaAndLimits)
|
||||
(Limit)
|
||||
Limit
|
||||
(ReadFromStorage)
|
||||
|
Loading…
Reference in New Issue
Block a user