Merge pull request #32271 from Algunenano/kill_scalar_github

Be able to KILL scalar queries
This commit is contained in:
Nikolai Kochetov 2021-12-29 16:00:59 +03:00 committed by GitHub
commit c715204e18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 339 additions and 129 deletions

View File

@ -169,7 +169,7 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
processors.push_back(std::move(sink));
processors.push_back(std::move(exception_handling));
auto executor = std::make_shared<PipelineExecutor>(processors);
auto executor = std::make_shared<PipelineExecutor>(processors, getContext()->getProcessListElement());
executor->execute(/*num_threads = */ 1);
/// We are ready to receive the next file, for this we clear all the information received

View File

@ -1,17 +1,18 @@
#include <Formats/FormatFactory.h>
#include <algorithm>
#include <Common/Exception.h>
#include <Interpreters/Context.h>
#include <Core/Settings.h>
#include <Formats/FormatSettings.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
#include <Processors/Formats/Impl/MySQLOutputFormat.h>
#include <Processors/Formats/Impl/ParallelParsingInputFormat.h>
#include <Processors/Formats/Impl/ParallelFormattingOutputFormat.h>
#include <Processors/Formats/Impl/ParallelParsingInputFormat.h>
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
#include <Poco/URI.h>
#include <Common/Exception.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadHelpers.h>
@ -235,6 +236,18 @@ InputFormatPtr FormatFactory::getInputFormat(
return format;
}
static void addExistingProgressToOutputFormat(OutputFormatPtr format, ContextPtr context)
{
auto element_id = context->getProcessListElement();
if (element_id)
{
/// While preparing the query there might have been progress (for example in subscalar subqueries) so add it here
auto current_progress = element_id->getProgressIn();
Progress read_progress{current_progress.read_rows, current_progress.read_bytes, current_progress.total_rows_to_read};
format->onProgress(read_progress);
}
}
OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(
const String & name,
WriteBuffer & buf,
@ -263,7 +276,9 @@ OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(
if (context->hasQueryContext() && settings.log_queries)
context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Format, name);
return std::make_shared<ParallelFormattingOutputFormat>(builder);
auto format = std::make_shared<ParallelFormattingOutputFormat>(builder);
addExistingProgressToOutputFormat(format, context);
return format;
}
return getOutputFormat(name, buf, sample, context, callback, _format_settings);
@ -303,6 +318,8 @@ OutputFormatPtr FormatFactory::getOutputFormat(
if (auto * mysql = typeid_cast<MySQLOutputFormat *>(format.get()))
mysql->setContext(context);
addExistingProgressToOutputFormat(format, context);
return format;
}

View File

@ -411,9 +411,8 @@ QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_even
res.read_bytes = progress_in.read_bytes;
res.total_rows = progress_in.total_rows_to_read;
/// TODO: Use written_rows and written_bytes when real time progress is implemented
res.written_rows = progress_out.read_rows;
res.written_bytes = progress_out.read_bytes;
res.written_rows = progress_out.written_rows;
res.written_bytes = progress_out.written_bytes;
if (thread_group)
{

View File

@ -94,7 +94,7 @@ protected:
ExecutionSpeedLimits limits;
OverflowMode overflow_mode;
QueryPriorities::Handle priority_handle;
QueryPriorities::Handle priority_handle = nullptr;
std::atomic<bool> is_killed { false };

View File

@ -24,12 +24,6 @@
# include <sys/resource.h>
#endif
namespace ProfileEvents
{
extern const Event InsertedRows;
extern const Event InsertedBytes;
}
/// Implement some methods of ThreadStatus and CurrentThread here to avoid extra linking dependencies in clickhouse_common_io
/// TODO It doesn't make sense.
@ -447,9 +441,8 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log, const String
elem.read_rows = progress_in.read_rows.load(std::memory_order_relaxed);
elem.read_bytes = progress_in.read_bytes.load(std::memory_order_relaxed);
/// TODO: Use written_rows and written_bytes when run time progress is implemented
elem.written_rows = progress_out.read_rows.load(std::memory_order_relaxed);
elem.written_bytes = progress_out.read_bytes.load(std::memory_order_relaxed);
elem.written_rows = progress_out.written_rows.load(std::memory_order_relaxed);
elem.written_bytes = progress_out.written_bytes.load(std::memory_order_relaxed);
elem.memory_usage = memory_tracker.get();
elem.peak_memory_usage = memory_tracker.getPeak();
@ -520,8 +513,8 @@ void ThreadStatus::logToQueryViewsLog(const ViewRuntimeData & vinfo)
auto events = std::make_shared<ProfileEvents::Counters::Snapshot>(performance_counters.getPartiallyAtomicSnapshot());
element.read_rows = progress_in.read_rows.load(std::memory_order_relaxed);
element.read_bytes = progress_in.read_bytes.load(std::memory_order_relaxed);
element.written_rows = (*events)[ProfileEvents::InsertedRows];
element.written_bytes = (*events)[ProfileEvents::InsertedBytes];
element.written_rows = progress_out.written_rows.load(std::memory_order_relaxed);
element.written_bytes = progress_out.written_bytes.load(std::memory_order_relaxed);
element.peak_memory_usage = memory_tracker.getPeak() > 0 ? memory_tracker.getPeak() : 0;
if (query_context_ptr->getSettingsRef().log_profile_events != 0)
{

View File

@ -841,8 +841,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
else /// will be used only for ordinary INSERT queries
{
auto progress_out = process_list_elem->getProgressOut();
elem.result_rows = progress_out.read_rows;
elem.result_bytes = progress_out.read_bytes;
elem.result_rows = progress_out.written_rows;
elem.result_bytes = progress_out.written_rows;
}
if (elem.read_rows != 0)

View File

@ -26,7 +26,7 @@ public:
/// During pipeline execution new processors can appear. They will be added to existing set.
///
/// Explicit graph representation is built in constructor. Throws if graph is not correct.
explicit PipelineExecutor(Processors & processors, QueryStatus * elem = nullptr);
explicit PipelineExecutor(Processors & processors, QueryStatus * elem);
~PipelineExecutor();
/// Execute pipeline in multiple threads. Must be called once.

View File

@ -6,16 +6,13 @@
namespace DB
{
BuildQueryPipelineSettings BuildQueryPipelineSettings::fromSettings(const Settings & from)
BuildQueryPipelineSettings BuildQueryPipelineSettings::fromContext(ContextPtr from)
{
BuildQueryPipelineSettings settings;
settings.actions_settings = ExpressionActionsSettings::fromSettings(from, CompileExpressions::yes);
settings.actions_settings = ExpressionActionsSettings::fromSettings(from->getSettingsRef(), CompileExpressions::yes);
settings.process_list_element = from->getProcessListElement();
settings.progress_callback = from->getProgressCallback();
return settings;
}
BuildQueryPipelineSettings BuildQueryPipelineSettings::fromContext(ContextPtr from)
{
return fromSettings(from->getSettingsRef());
}
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <IO/Progress.h>
#include <Interpreters/ExpressionActionsSettings.h>
#include <cstddef>
@ -8,14 +9,15 @@ namespace DB
{
struct Settings;
class QueryStatus;
struct BuildQueryPipelineSettings
{
ExpressionActionsSettings actions_settings;
QueryStatus * process_list_element = nullptr;
ProgressCallback progress_callback = nullptr;
const ExpressionActionsSettings & getActionsSettings() const { return actions_settings; }
static BuildQueryPipelineSettings fromSettings(const Settings & from);
static BuildQueryPipelineSettings fromContext(ContextPtr from);
};

View File

@ -180,6 +180,9 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
for (auto & context : interpreter_context)
last_pipeline->addInterpreterContext(std::move(context));
last_pipeline->setProgressCallback(build_pipeline_settings.progress_callback);
last_pipeline->setProcessListElement(build_pipeline_settings.process_list_element);
return last_pipeline;
}

View File

@ -26,6 +26,8 @@ SourceWithProgress::SourceWithProgress(Block header, bool enable_auto_progress)
void SourceWithProgress::setProcessListElement(QueryStatus * elem)
{
process_list_elem = elem;
if (!elem)
return;
/// Update total_rows_approx as soon as possible.
///

View File

@ -18,20 +18,21 @@ namespace DB
void CountingTransform::onConsume(Chunk chunk)
{
Progress local_progress(chunk.getNumRows(), chunk.bytes(), 0);
Progress local_progress{WriteProgress(chunk.getNumRows(), chunk.bytes())};
progress.incrementPiecewiseAtomically(local_progress);
//std::cerr << "============ counting adding progress for " << static_cast<const void *>(thread_status) << ' ' << chunk.getNumRows() << " rows\n";
if (thread_status)
{
thread_status->performance_counters.increment(ProfileEvents::InsertedRows, local_progress.read_rows);
thread_status->performance_counters.increment(ProfileEvents::InsertedBytes, local_progress.read_bytes);
thread_status->performance_counters.increment(ProfileEvents::InsertedRows, local_progress.written_rows);
thread_status->performance_counters.increment(ProfileEvents::InsertedBytes, local_progress.written_bytes);
thread_status->progress_out.incrementPiecewiseAtomically(local_progress);
}
else
{
ProfileEvents::increment(ProfileEvents::InsertedRows, local_progress.read_rows);
ProfileEvents::increment(ProfileEvents::InsertedBytes, local_progress.read_bytes);
ProfileEvents::increment(ProfileEvents::InsertedRows, local_progress.written_rows);
ProfileEvents::increment(ProfileEvents::InsertedBytes, local_progress.written_bytes);
}
if (process_elem)

View File

@ -3,6 +3,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/ProcessList.h>
#include <Parsers/ASTInsertQuery.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
@ -14,6 +15,7 @@
#include <Storages/StorageValues.h>
#include <Common/CurrentThread.h>
#include <Common/MemoryTracker.h>
#include <Common/ProfileEvents.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/ThreadStatus.h>
#include <Common/checkStackSize.h>
@ -23,6 +25,12 @@
#include <atomic>
#include <chrono>
namespace ProfileEvents
{
extern const Event SelectedBytes;
extern const Event SelectedRows;
}
namespace DB
{
@ -451,13 +459,6 @@ static QueryPipeline process(Block block, ViewRuntimeData & view, const ViewsDat
pipeline.getHeader(),
std::make_shared<ExpressionActions>(std::move(converting))));
pipeline.setProgressCallback([context](const Progress & progress)
{
CurrentThread::updateProgressIn(progress);
if (auto callback = context->getProgressCallback())
callback(progress);
});
return QueryPipelineBuilder::getPipeline(std::move(pipeline));
}
@ -595,7 +596,11 @@ void PushingToLiveViewSink::consume(Chunk chunk)
{
Progress local_progress(chunk.getNumRows(), chunk.bytes(), 0);
StorageLiveView::writeIntoLiveView(live_view, getHeader().cloneWithColumns(chunk.detachColumns()), context);
CurrentThread::updateProgressIn(local_progress);
auto process = context->getProcessListElement();
if (process)
process->updateProgressIn(local_progress);
ProfileEvents::increment(ProfileEvents::SelectedRows, local_progress.read_rows);
ProfileEvents::increment(ProfileEvents::SelectedBytes, local_progress.read_bytes);
}
@ -614,7 +619,11 @@ void PushingToWindowViewSink::consume(Chunk chunk)
Progress local_progress(chunk.getNumRows(), chunk.bytes(), 0);
StorageWindowView::writeIntoWindowView(
window_view, getHeader().cloneWithColumns(chunk.detachColumns()), context);
CurrentThread::updateProgressIn(local_progress);
auto process = context->getProcessListElement();
if (process)
process->updateProgressIn(local_progress);
ProfileEvents::increment(ProfileEvents::SelectedRows, local_progress.read_rows);
ProfileEvents::increment(ProfileEvents::SelectedBytes, local_progress.read_bytes);
}

View File

@ -27,7 +27,8 @@ TEST(Processors, PortsConnected)
processors.emplace_back(std::move(source));
processors.emplace_back(std::move(sink));
PipelineExecutor executor(processors);
QueryStatus * element = nullptr;
PipelineExecutor executor(processors, element);
executor.execute(1);
}
@ -51,7 +52,8 @@ TEST(Processors, PortsNotConnected)
try
{
PipelineExecutor executor(processors);
QueryStatus * element = nullptr;
PipelineExecutor executor(processors, element);
executor.execute(1);
ASSERT_TRUE(false) << "Should have thrown.";
}

View File

@ -560,6 +560,7 @@ QueryPipeline QueryPipelineBuilder::getPipeline(QueryPipelineBuilder builder)
{
QueryPipeline res(std::move(builder.pipe));
res.setNumThreads(builder.getNumThreads());
res.setProcessListElement(builder.process_list_element);
return res;
}

File diff suppressed because one or more lines are too long

View File

@ -14,10 +14,10 @@ SELECT COUNT() FROM single_column_bloom_filter WHERE i32 IN (1, 2) SETTINGS max_
SELECT COUNT() FROM single_column_bloom_filter WHERE (i32, i32) IN ((1, 2), (2, 3)) SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM single_column_bloom_filter WHERE (i32, i64) IN ((1, 1), (2, 2)) SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM single_column_bloom_filter WHERE (i64, (i64, i32)) IN ((1, (1, 1)), (2, (2, 2))) SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM single_column_bloom_filter WHERE i32 IN (SELECT arrayJoin([toInt32(1), toInt32(2)])) SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM single_column_bloom_filter WHERE (i32, i32) IN (SELECT arrayJoin([(toInt32(1), toInt32(2)), (toInt32(2), toInt32(3))])) SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM single_column_bloom_filter WHERE (i32, i64) IN (SELECT arrayJoin([(toInt32(1), toUInt64(1)), (toInt32(2), toUInt64(2))])) SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM single_column_bloom_filter WHERE (i64, (i64, i32)) IN (SELECT arrayJoin([(toUInt64(1), (toUInt64(1), toInt32(1))), (toUInt64(2), (toUInt64(2), toInt32(2)))])) SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM single_column_bloom_filter WHERE i32 IN (SELECT arrayJoin([toInt32(1), toInt32(2)])) SETTINGS max_rows_to_read = 7;
SELECT COUNT() FROM single_column_bloom_filter WHERE (i32, i32) IN (SELECT arrayJoin([(toInt32(1), toInt32(2)), (toInt32(2), toInt32(3))])) SETTINGS max_rows_to_read = 7;
SELECT COUNT() FROM single_column_bloom_filter WHERE (i32, i64) IN (SELECT arrayJoin([(toInt32(1), toUInt64(1)), (toInt32(2), toUInt64(2))])) SETTINGS max_rows_to_read = 7;
SELECT COUNT() FROM single_column_bloom_filter WHERE (i64, (i64, i32)) IN (SELECT arrayJoin([(toUInt64(1), (toUInt64(1), toInt32(1))), (toUInt64(2), (toUInt64(2), toInt32(2)))])) SETTINGS max_rows_to_read = 7;
WITH (1, 2) AS liter_prepared_set SELECT COUNT() FROM single_column_bloom_filter WHERE i32 IN liter_prepared_set SETTINGS max_rows_to_read = 6;
WITH ((1, 2), (2, 3)) AS liter_prepared_set SELECT COUNT() FROM single_column_bloom_filter WHERE (i32, i32) IN liter_prepared_set SETTINGS max_rows_to_read = 6;
WITH ((1, 1), (2, 2)) AS liter_prepared_set SELECT COUNT() FROM single_column_bloom_filter WHERE (i32, i64) IN liter_prepared_set SETTINGS max_rows_to_read = 6;

View File

@ -89,8 +89,11 @@ INSERT INTO checkouts SELECT number as id, '2000-01-01 10:00:00' from numbers(50
-- by this time we should have 3 parts for target_table because of prev inserts
-- and we plan to make two more inserts. With index_granularity=128 and max id=1000
-- we expect to read not more than:
-- 1000 rows read from numbers(1000) in the INSERT itself
-- 1000 rows in the `IN (SELECT id FROM table)` in the mat views
-- (1000/128) marks per part * (3 + 2) parts * 128 granularity = 5120 rows
set max_rows_to_read = 5120;
-- Total: 7120
set max_rows_to_read = 7120;
INSERT INTO logins SELECT number as id, '2000-01-01 11:00:00' from numbers(1000);
INSERT INTO checkouts SELECT number as id, '2000-01-01 11:10:00' from numbers(1000);
@ -98,8 +101,8 @@ INSERT INTO checkouts SELECT number as id, '2000-01-01 11:10:00' from numbers(10
-- by this time we should have 5 parts for target_table because of prev inserts
-- and we plan to make two more inserts. With index_granularity=128 and max id=1
-- we expect to read not more than:
-- 1 mark per part * (5 + 2) parts * 128 granularity = 896 rows
set max_rows_to_read = 896;
-- 1 mark per part * (5 + 2) parts * 128 granularity + 1 (numbers(1)) = 897 rows
set max_rows_to_read = 897;
INSERT INTO logins SELECT number+2 as id, '2001-01-01 11:10:01' from numbers(1);
INSERT INTO checkouts SELECT number+2 as id, '2001-01-01 11:10:02' from numbers(1);

View File

@ -34,7 +34,7 @@ INSERT INTO 01504_test_memory SELECT number % 77 AS k, SUM(number) AS value, (1,
SELECT A.a = B.a, A.b = B.b, A.c = B.c, A.d = B.d, A.e = B.e FROM ( SELECT 0 AS a, groupBitmapMerge(bm) AS b , SUM(k) AS c, SUM(value) AS d, SUM(dummy.1) AS e FROM 01504_test) A ANY LEFT JOIN (SELECT 0 AS a, groupBitmapMerge(bm) AS b , SUM(k) AS c, SUM(value) AS d, SUM(dummy.1) AS e FROM 01504_test_memory) B USING a ORDER BY a;
CREATE TEMPORARY TABLE keys AS SELECT * FROM numbers(1000);
CREATE TEMPORARY TABLE keys AS SELECT * FROM system.numbers LIMIT 1 OFFSET 4;
SET max_rows_to_read = 2;
SELECT dummy == (1,1.2) FROM 01504_test WHERE k IN (1, 3) OR k IN (1) OR k IN (3, 1) OR k IN [1] OR k IN [1, 3] ;

View File

@ -3,7 +3,7 @@ drop table if exists insub;
create table insub (i int, j int) engine MergeTree order by i settings index_granularity = 1;
insert into insub select number a, a + 2 from numbers(10);
SET max_rows_to_read = 2;
SET max_rows_to_read = 12; -- 10 from numbers + 2 from table
select * from insub where i in (select toInt32(3) from numbers(10));
drop table if exists insub;

View File

@ -8,10 +8,12 @@ create table xp_d as xp engine Distributed(test_shard_localhost, currentDatabase
insert into xp select number, number + 2 from numbers(10);
set max_rows_to_read = 2;
set max_rows_to_read = 4; -- 2 from numbers, 2 from tables
select * from xp where i in (select * from numbers(2));
select * from xp where i global in (select * from numbers(2));
select * from xp_d where i in (select * from numbers(2));
set max_rows_to_read = 6; -- 2 from numbers, 2 from GLOBAL temp table (pushed from numbers), 2 from local xp
select * from xp_d where i global in (select * from numbers(2));
drop table if exists xp;

View File

@ -14,6 +14,14 @@
1 3
0 2
1 3
0 2
1 3
0 2
1 3
0 2
1 3
0 2
1 3
\N 100
\N 100
\N 100

View File

@ -12,17 +12,29 @@ insert into xp select null, 100;
optimize table xp final;
set max_rows_to_read = 2;
select * from xp where i in [0, 1];
select * from xp where i global in [0, 1];
select * from xp_d where i in [0, 1];
select * from xp_d where i global in [0, 1];
set max_rows_to_read = 4; -- 2 in the subquery, 2 in the query itself
select * from xp where i in (select * from numbers(2));
select * from xp where i global in (select * from numbers(2));
select * from xp_d where i in (select * from numbers(2));
set max_rows_to_read = 6; -- 2 subquery, 2 from global temp table (GLOBAL IN), 2 from local xp table
select * from xp_d where i global in (select * from numbers(2));
set transform_null_in = 1;
set max_rows_to_read = 4; -- 2 in the subquery, 2 in the query itself
select * from xp where i in (select * from numbers(2));
select * from xp where i global in (select * from numbers(2));
select * from xp_d where i in (select * from numbers(2));
set max_rows_to_read = 6; -- 2 subquery, 2 from global temp table (GLOBAL IN), 2 from local xp table
select * from xp_d where i global in (select * from numbers(2));
set max_rows_to_read = 0; -- No rows should be read
select * from xp where i in (null);
select * from xp where i global in (null);
select * from xp_d where i in (null);

View File

@ -8,12 +8,12 @@ set max_rows_to_read = 3;
select * from x where _partition_id = partitionId(1);
set max_rows_to_read = 4; -- one row for subquery
set max_rows_to_read = 5; -- one row for subquery + subquery
select * from x where _partition_id in (select partitionId(number + 1) from numbers(1));
-- trivial count optimization test
set max_rows_to_read = 1; -- one row for subquery
set max_rows_to_read = 2; -- one row for subquery + subquery itself
select count() from x where _partition_id in (select partitionId(number + 1) from numbers(1));
drop table x;

View File

@ -1,70 +1,94 @@
Row 1:
──────
stage: Query log rows
read_rows: 100
written_rows: 201
databases: ['_table_function','default']
tables: ['_table_function.numbers','default.table_a','default.table_b','default.table_b_live_view','default.table_c']
views: ['default.matview_a_to_b','default.matview_b_to_c','default.table_b_live_view']
sleep_calls: 200
sleep_us: 298
stage: Query log rows
read_rows: 400
written_rows: 201
databases: ['_table_function','default']
tables: ['_table_function.numbers','default.table_a','default.table_b','default.table_b_live_view','default.table_c']
views: ['default.matview_a_to_b','default.matview_b_to_c','default.table_b_live_view']
sleep_calls: 200
sleep_us: 298
profile_select_rows: 400
profile_select_bytes: 5200
profile_insert_rows: 201
profile_insert_bytes: 2808
Row 1:
──────
stage: Depending views
view_name: default.matview_a_to_b
view_type: Materialized
status: QueryFinish
view_target: default.table_b
view_query: SELECT toFloat64(a) AS a, b + sleepEachRow(0.000001) AS count FROM default.table_a
read_rows: 100
written_rows: 100
sleep_calls: 100
sleep_us: 99
stage: Depending views
view_name: default.matview_a_to_b
view_type: Materialized
status: QueryFinish
view_target: default.table_b
view_query: SELECT toFloat64(a) AS a, b + sleepEachRow(0.000001) AS count FROM default.table_a
read_rows: 100
written_rows: 100
sleep_calls: 100
sleep_us: 99
profile_select_rows: 100
profile_select_bytes: 2000
profile_insert_rows: 100
profile_insert_bytes: 800
Row 2:
──────
stage: Depending views
view_name: default.matview_b_to_c
view_type: Materialized
status: QueryFinish
view_target: default.table_c
view_query: SELECT sum(a + sleepEachRow(0.000002)) AS a FROM default.table_b
read_rows: 100
written_rows: 1
sleep_calls: 100
sleep_us: 199
stage: Depending views
view_name: default.matview_b_to_c
view_type: Materialized
status: QueryFinish
view_target: default.table_c
view_query: SELECT sum(a + sleepEachRow(0.000002)) AS a FROM default.table_b
read_rows: 100
written_rows: 1
sleep_calls: 100
sleep_us: 199
profile_select_rows: 100
profile_select_bytes: 800
profile_insert_rows: 1
profile_insert_bytes: 8
Row 3:
──────
stage: Depending views
view_name: default.table_b_live_view
view_type: Live
status: QueryFinish
view_target: default.table_b_live_view
view_query: SELECT sum(a + b) FROM default.table_b
read_rows: 100
written_rows: 0
sleep_calls: 0
sleep_us: 0
stage: Depending views
view_name: default.table_b_live_view
view_type: Live
status: QueryFinish
view_target: default.table_b_live_view
view_query: SELECT sum(a + b) FROM default.table_b
read_rows: 100
written_rows: 0
sleep_calls: 0
sleep_us: 0
profile_select_rows: 100
profile_select_bytes: 1600
profile_insert_rows: 0
profile_insert_bytes: 0
Row 1:
──────
stage: Query log rows 2
read_rows: 50
written_rows: 100
databases: ['_table_function','default']
tables: ['_table_function.numbers','default.table_d','default.table_e','default.table_f']
views: ['default.matview_join_d_e']
sleep_calls: 50
sleep_us: 150
stage: Query log rows 2
read_rows: 100
written_rows: 100
databases: ['_table_function','default']
tables: ['_table_function.numbers','default.table_d','default.table_e','default.table_f']
views: ['default.matview_join_d_e']
sleep_calls: 50
sleep_us: 150
profile_select_rows: 100
profile_select_bytes: 800
profile_insert_rows: 100
profile_insert_bytes: 1600
Row 1:
──────
stage: Depending views 2
view_name: default.matview_join_d_e
view_type: Materialized
status: QueryFinish
view_target: default.table_f
view_query: SELECT table_d.a AS a, table_e.count + sleepEachRow(0.000003) AS count FROM default.table_d LEFT JOIN default.table_e ON table_d.a = table_e.a
read_rows: 50
written_rows: 50
sleep_calls: 50
sleep_us: 150
stage: Depending views 2
view_name: default.matview_join_d_e
view_type: Materialized
status: QueryFinish
view_target: default.table_f
view_query: SELECT table_d.a AS a, table_e.count + sleepEachRow(0.000003) AS count FROM default.table_d LEFT JOIN default.table_e ON table_d.a = table_e.a
read_rows: 50
written_rows: 50
sleep_calls: 50
sleep_us: 150
profile_select_rows: 50
profile_select_bytes: 400
profile_insert_rows: 50
profile_insert_bytes: 800

View File

@ -45,7 +45,11 @@ SELECT
arraySort(tables) as tables,
arraySort(views) as views,
ProfileEvents['SleepFunctionCalls'] as sleep_calls,
ProfileEvents['SleepFunctionMicroseconds'] as sleep_us
ProfileEvents['SleepFunctionMicroseconds'] as sleep_us,
ProfileEvents['SelectedRows'] as profile_select_rows,
ProfileEvents['SelectedBytes'] as profile_select_bytes,
ProfileEvents['InsertedRows'] as profile_insert_rows,
ProfileEvents['InsertedBytes'] as profile_insert_bytes
FROM system.query_log
WHERE query like '-- INSERT 1%INSERT INTO table_a%'
AND current_database = currentDatabase()
@ -62,7 +66,11 @@ SELECT
read_rows,
written_rows,
ProfileEvents['SleepFunctionCalls'] as sleep_calls,
ProfileEvents['SleepFunctionMicroseconds'] as sleep_us
ProfileEvents['SleepFunctionMicroseconds'] as sleep_us,
ProfileEvents['SelectedRows'] as profile_select_rows,
ProfileEvents['SelectedBytes'] as profile_select_bytes,
ProfileEvents['InsertedRows'] as profile_insert_rows,
ProfileEvents['InsertedBytes'] as profile_insert_bytes
FROM system.query_views_log
WHERE initial_query_id =
(
@ -85,7 +93,11 @@ SELECT
arraySort(tables) as tables,
arraySort(views) as views,
ProfileEvents['SleepFunctionCalls'] as sleep_calls,
ProfileEvents['SleepFunctionMicroseconds'] as sleep_us
ProfileEvents['SleepFunctionMicroseconds'] as sleep_us,
ProfileEvents['SelectedRows'] as profile_select_rows,
ProfileEvents['SelectedBytes'] as profile_select_bytes,
ProfileEvents['InsertedRows'] as profile_insert_rows,
ProfileEvents['InsertedBytes'] as profile_insert_bytes
FROM system.query_log
WHERE query like '-- INSERT 2%INSERT INTO table_d%'
AND current_database = currentDatabase()
@ -102,7 +114,11 @@ SELECT
read_rows,
written_rows,
ProfileEvents['SleepFunctionCalls'] as sleep_calls,
ProfileEvents['SleepFunctionMicroseconds'] as sleep_us
ProfileEvents['SleepFunctionMicroseconds'] as sleep_us,
ProfileEvents['SelectedRows'] as profile_select_rows,
ProfileEvents['SelectedBytes'] as profile_select_bytes,
ProfileEvents['InsertedRows'] as profile_insert_rows,
ProfileEvents['InsertedBytes'] as profile_insert_bytes
FROM system.query_views_log
WHERE initial_query_id =
(

View File

@ -18,7 +18,7 @@ written_bytes: 4000000
select read_rows, read_bytes, written_rows, written_bytes from system.query_log where type = 'QueryFinish' and query_kind = 'Insert' and current_database = currentDatabase() format Vertical;
Row 1:
──────
read_rows: 1000000
read_bytes: 8000000
read_rows: 3000000
read_bytes: 16000000
written_rows: 3000000
written_bytes: 12000000

View File

@ -0,0 +1,2 @@
finished default_TEST02132KILL_QUERY1 default select (SELECT max(number) from system.numbers) + 1;
finished default_TEST02132KILL_QUERY2 default SELECT (SELECT number FROM system.numbers WHERE number = 1000000000000);

View File

@ -0,0 +1,22 @@
#!/usr/bin/env bash
# Ref: https://github.com/ClickHouse/ClickHouse/issues/1576
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function wait_for_query_to_start()
{
while [[ $($CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL" -d "SELECT count() FROM system.processes WHERE query_id = '$1'") == 0 ]]; do sleep 0.1; done
}
QUERY_1_ID="${CLICKHOUSE_DATABASE}_TEST02132KILL_QUERY1"
(${CLICKHOUSE_CLIENT} --query_id="${QUERY_1_ID}" --query='select (SELECT max(number) from system.numbers) + 1;' 2>&1 | grep -q "Code: 394." || echo 'FAIL') &
wait_for_query_to_start "${QUERY_1_ID}"
${CLICKHOUSE_CLIENT} --query="KILL QUERY WHERE query_id='${QUERY_1_ID}' SYNC"
QUERY_2_ID="${CLICKHOUSE_DATABASE}_TEST02132KILL_QUERY2"
(${CLICKHOUSE_CLIENT} --query_id="${QUERY_2_ID}" --query='SELECT (SELECT number FROM system.numbers WHERE number = 1000000000000);' 2>&1 | grep -q "Code: 394." || echo 'FAIL') &
wait_for_query_to_start "${QUERY_2_ID}"
${CLICKHOUSE_CLIENT} --query="KILL QUERY WHERE query_id='${QUERY_2_ID}' SYNC"
wait

View File

@ -0,0 +1,6 @@
< X-ClickHouse-Progress: {"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"100000"}
< X-ClickHouse-Progress: {"read_rows":"65505","read_bytes":"524040","written_rows":"0","written_bytes":"0","total_rows_to_read":"100000"}
< X-ClickHouse-Progress: {"read_rows":"131010","read_bytes":"1048080","written_rows":"0","written_bytes":"0","total_rows_to_read":"100000"}
< X-ClickHouse-Progress: {"read_rows":"131011","read_bytes":"1048081","written_rows":"0","written_bytes":"0","total_rows_to_read":"100000"}
< X-ClickHouse-Progress: {"read_rows":"131011","read_bytes":"1048081","written_rows":"0","written_bytes":"0","total_rows_to_read":"100000"}
< X-ClickHouse-Summary: {"read_rows":"131011","read_bytes":"1048081","written_rows":"0","written_bytes":"0","total_rows_to_read":"100000"}

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# Ref: https://github.com/ClickHouse/ClickHouse/issues/1576
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CURL -sS "${CLICKHOUSE_URL}&wait_end_of_query=1&send_progress_in_http_headers=1&http_headers_progress_interval_ms=0" -d "SELECT (SELECT max(number), count(number) FROM numbers(100000));" -v 2>&1 | grep -E "X-ClickHouse-Summary|X-ClickHouse-Progress"

View File

@ -0,0 +1,50 @@
#1
{
"meta":
[
{
"name": "count()",
"type": "UInt64"
}
],
"data":
[
{
"count()": "100"
}
],
"rows": 1,
"rows_before_limit_at_least": 100,
"statistics":
{
"rows_read": 100,
"bytes_read": 800
}
}
#2
{
"meta":
[
{
"type": "Tuple(UInt64, UInt64)"
}
],
"data":
[
{
}
],
"rows": 1,
"statistics":
{
"rows_read": 131011,
"bytes_read": 1048081
}
}

View File

@ -0,0 +1,10 @@
#!/usr/bin/env bash
# Ref: https://github.com/ClickHouse/ClickHouse/issues/1576
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
echo "#1"
${CLICKHOUSE_CLIENT} --query='SELECT count() FROM numbers(100) FORMAT JSON;' | grep -a -v "elapsed"
echo "#2"
${CLICKHOUSE_CLIENT} --query='SELECT (SELECT max(number), count(number) FROM numbers(100000) as n) FORMAT JSON;' | grep -a -v "elapsed" | grep -v "_subquery"

View File

@ -0,0 +1,9 @@
#02136_scalar_subquery_1 999
#02136_scalar_subquery_2 999 0
#02136_scalar_subquery_3 999 999
#02136_scalar_subquery_4 999
#02136_scalar_subquery_4 999
1001 SELECT \'#02136_scalar_subquery_1\', (SELECT max(number) FROM numbers(1000)) as n;
2001 SELECT \'#02136_scalar_subquery_2\', (SELECT max(number) FROM numbers(1000)) as n, (SELECT min(number) FROM numbers(1000)) as n2;
1001 SELECT \'#02136_scalar_subquery_3\', (SELECT max(number) FROM numbers(1000)) as n, (SELECT max(number) FROM numbers(1000)) as n2;
1002 SELECT \'#02136_scalar_subquery_4\', (SELECT max(number) FROM numbers(1000)) as n FROM system.numbers LIMIT 2;

View File

@ -0,0 +1,13 @@
SELECT '#02136_scalar_subquery_1', (SELECT max(number) FROM numbers(1000)) as n;
SELECT '#02136_scalar_subquery_2', (SELECT max(number) FROM numbers(1000)) as n, (SELECT min(number) FROM numbers(1000)) as n2;
SELECT '#02136_scalar_subquery_3', (SELECT max(number) FROM numbers(1000)) as n, (SELECT max(number) FROM numbers(1000)) as n2; -- Cached
SELECT '#02136_scalar_subquery_4', (SELECT max(number) FROM numbers(1000)) as n FROM system.numbers LIMIT 2; -- Cached
SYSTEM FLUSH LOGS;
SELECT read_rows, query FROM system.query_log
WHERE
event_date > yesterday()
AND type = 'QueryFinish'
AND current_database == currentDatabase()
AND query LIKE 'SELECT ''#02136_scalar_subquery_%'
ORDER BY query ASC;