From a687c41a8c695e516b9d6e4a47adc9313789b357 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 3 Apr 2018 21:37:35 +0300 Subject: [PATCH 01/34] mayBenefitFromIndexForIn returns true if at least one tuple element is in pk [#CLICKHOUSE-3680] --- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 5b5a6b87c8e..f27faee404d 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -2195,12 +2195,11 @@ bool MergeTreeData::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) con if (left_in_operand_tuple && left_in_operand_tuple->name == "tuple") { for (const auto & item : left_in_operand_tuple->arguments->children) - if (!isPrimaryKeyColumnPossiblyWrappedInFunctions(item)) - /// The tuple itself may be part of the primary key, so check that as a last resort. - return isPrimaryKeyColumnPossiblyWrappedInFunctions(left_in_operand); + if (isPrimaryKeyColumnPossiblyWrappedInFunctions(item)) + return true; - /// tuple() is invalid but can still be found here since this method may be called before the arguments are validated. - return !left_in_operand_tuple->arguments->children.empty(); + /// The tuple itself may be part of the primary key, so check that as a last resort. + return isPrimaryKeyColumnPossiblyWrappedInFunctions(left_in_operand); } else { From 33774e93faf96be09dd34c3939946eb325d526c2 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 3 Apr 2018 22:16:24 +0300 Subject: [PATCH 02/34] added aggregated_columns to mayBenefitFromIndexForIn [#CLICKHOUSE-3680] --- dbms/src/Interpreters/ExpressionAnalyzer.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index 500e4cd775d..2b84a413778 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -1519,7 +1519,9 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node, const Block & } else { - ExpressionActionsPtr temp_actions = std::make_shared(source_columns, settings); + NamesAndTypesList temp_columns = source_columns; + temp_columns.insert(temp_columns.end(), aggregated_columns.begin(), aggregated_columns.end()); + ExpressionActionsPtr temp_actions = std::make_shared(temp_columns, settings); getRootActions(func->arguments->children.at(0), true, false, temp_actions); Block sample_block_with_calculated_columns = temp_actions->getSampleBlock(); From f7b39b8df27463b326693ce7d593aa876669d135 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 3 Apr 2018 22:30:39 +0300 Subject: [PATCH 03/34] added test [#CLICKHOUSE-3680] --- .../0_stateless/00612_pk_in_tuple.reference | 13 +++++++++++++ .../queries/0_stateless/00612_pk_in_tuple.sql | 16 ++++++++++++++++ 2 files changed, 29 insertions(+) create mode 100644 dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference create mode 100644 dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql diff --git a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference new file mode 100644 index 00000000000..de69348862a --- /dev/null +++ b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference @@ -0,0 +1,13 @@ +all +1 [1] +2 [2] +key, arrayJoin(arr) in (1, 1) +1 1 +key, arrayJoin(arr) in ((1, 1), (2, 2)) +1 1 +2 2 +(key, left array join arr) in (1, 1) +1 +(key, left array join arr) in ((1, 1), (2, 2)) +1 +2 diff --git a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql new file mode 100644 index 00000000000..f6e34909cae --- /dev/null +++ b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql @@ -0,0 +1,16 @@ +create database if not exists test; +drop table if exists test.tab; +create table test.tab (key UInt64, arr Array(UInt64)) Engine = MergeTree order by key; +insert into test.tab values (1, [1]); +insert into test.tab values (2, [2]); +select 'all'; +select * from test.tab order by key; +select 'key, arrayJoin(arr) in (1, 1)'; +select key, arrayJoin(arr) as val from test.tab where (key, val) in (1, 1); +select 'key, arrayJoin(arr) in ((1, 1), (2, 2))'; +select key, arrayJoin(arr) as val from test.tab where (key, val) in ((1, 1), (2, 2)) order by key; +select '(key, left array join arr) in (1, 1)'; +select key from test.tab left array join arr as val where (key, val) in (1, 1); +select '(key, left array join arr) in ((1, 1), (2, 2))'; +select key from test.tab left array join arr as val where (key, val) in ((1, 1), (2, 2)) order by key; + From 23baae447d18493524077e7194b052220558a6c4 Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Wed, 4 Apr 2018 00:16:58 +0300 Subject: [PATCH 04/34] Update MergeTreeData.cpp --- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index f27faee404d..19daa71d3e3 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -2189,8 +2189,8 @@ bool MergeTreeData::isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr & bool MergeTreeData::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) const { - /// Make sure that the left side of the IN operator is part of the primary key. - /// If there is a tuple on the left side of the IN operator, each item of the tuple must be part of the primary key. + /// Make sure that the left side of the IN operator contain part of the primary key. + /// If there is a tuple on the left side of the IN operator, at least one item of the tuple must be part of the primary key (probably wrapped by a chain of some acceptable functions). const ASTFunction * left_in_operand_tuple = typeid_cast(left_in_operand.get()); if (left_in_operand_tuple && left_in_operand_tuple->name == "tuple") { From eb4171bc0e497717ad00206229f54d4aaa02b274 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 4 Apr 2018 17:36:28 +0300 Subject: [PATCH 05/34] added columns from array join to aggregated_columns [#CLICKHOUSE-3680] --- dbms/src/Interpreters/ExpressionAnalyzer.cpp | 21 ++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index 2b84a413778..7a87013e6b1 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -508,11 +508,14 @@ void ExpressionAnalyzer::analyzeAggregation() has_aggregation = true; ExpressionActionsPtr temp_actions = std::make_shared(source_columns, settings); + std::unordered_map aggregated_columns_map; if (select_query && select_query->array_join_expression_list()) { getRootActions(select_query->array_join_expression_list(), true, false, temp_actions); addMultipleArrayJoinAction(temp_actions); + for (const auto & name_and_type : temp_actions->getSampleBlock().getNamesAndTypesList()) + aggregated_columns_map.try_emplace(name_and_type.name, name_and_type.type); } if (select_query) @@ -536,6 +539,7 @@ void ExpressionAnalyzer::analyzeAggregation() /// Find out aggregation keys. if (select_query->group_expression_list) { + bool added_aggregation = false; NameSet unique_keys; ASTs & group_asts = select_query->group_expression_list->children; for (ssize_t i = 0; i < ssize_t(group_asts.size()); ++i) @@ -573,30 +577,31 @@ void ExpressionAnalyzer::analyzeAggregation() if (!unique_keys.count(key.name)) { unique_keys.insert(key.name); - aggregation_keys.push_back(key); + + aggregated_columns_map.try_emplace(key.name, key.type); + added_aggregation = true; /// Key is no longer needed, therefore we can save a little by moving it. - aggregated_columns.push_back(std::move(key)); + aggregation_keys.push_back(std::move(key)); } } if (group_asts.empty()) { select_query->group_expression_list = nullptr; - has_aggregation = select_query->having_expression || aggregate_descriptions.size(); + has_aggregation = select_query->having_expression || added_aggregation; } } for (size_t i = 0; i < aggregate_descriptions.size(); ++i) { AggregateDescription & desc = aggregate_descriptions[i]; - aggregated_columns.emplace_back(desc.column_name, desc.function->getReturnType()); + aggregated_columns_map.try_emplace(desc.column_name, desc.function->getReturnType()); } } - else - { - aggregated_columns = temp_actions->getSampleBlock().getNamesAndTypesList(); - } + + for (const auto & pair : aggregated_columns_map) + aggregated_columns.emplace_back(pair.first, pair.second); } From 3d19b0e5d31bb7d7943d513f5c2be077afa55c89 Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Wed, 4 Apr 2018 21:04:20 +0300 Subject: [PATCH 06/34] Update CHANGELOG.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fdb7f5303b1..1d3f9812214 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -# ClickHouse 1.1.54370 Release Candidate, 2018-03-16 +# ClickHouse release 1.1.54370, 2018-03-16 ## New features: From 49a481ed0bc090a1ee43622b1f6f46a8c61b346c Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Wed, 4 Apr 2018 21:04:42 +0300 Subject: [PATCH 07/34] Update CHANGELOG_RU.md --- CHANGELOG_RU.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG_RU.md b/CHANGELOG_RU.md index 4e0b481e718..56d4bcd1cb7 100644 --- a/CHANGELOG_RU.md +++ b/CHANGELOG_RU.md @@ -1,4 +1,4 @@ -# ClickHouse 1.1.54370 Release Candidate, 2018-03-16 +# ClickHouse release 1.1.54370, 2018-03-16 ## Новые возможности: From 248fe37cb8e2cda7b97fa85d459dcd14174da677 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 4 Apr 2018 21:56:30 +0300 Subject: [PATCH 08/34] added array_join_columns [#CLICKHOUSE-3680] --- dbms/src/Interpreters/ExpressionAnalyzer.cpp | 24 ++++++++------------ dbms/src/Interpreters/ExpressionAnalyzer.h | 2 ++ 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index 7a87013e6b1..8705a1a5b4c 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -508,14 +508,12 @@ void ExpressionAnalyzer::analyzeAggregation() has_aggregation = true; ExpressionActionsPtr temp_actions = std::make_shared(source_columns, settings); - std::unordered_map aggregated_columns_map; if (select_query && select_query->array_join_expression_list()) { getRootActions(select_query->array_join_expression_list(), true, false, temp_actions); addMultipleArrayJoinAction(temp_actions); - for (const auto & name_and_type : temp_actions->getSampleBlock().getNamesAndTypesList()) - aggregated_columns_map.try_emplace(name_and_type.name, name_and_type.type); + array_join_columns = temp_actions->getSampleBlock().getNamesAndTypesList(); } if (select_query) @@ -539,7 +537,6 @@ void ExpressionAnalyzer::analyzeAggregation() /// Find out aggregation keys. if (select_query->group_expression_list) { - bool added_aggregation = false; NameSet unique_keys; ASTs & group_asts = select_query->group_expression_list->children; for (ssize_t i = 0; i < ssize_t(group_asts.size()); ++i) @@ -577,31 +574,30 @@ void ExpressionAnalyzer::analyzeAggregation() if (!unique_keys.count(key.name)) { unique_keys.insert(key.name); - - aggregated_columns_map.try_emplace(key.name, key.type); - added_aggregation = true; + aggregation_keys.push_back(key); /// Key is no longer needed, therefore we can save a little by moving it. - aggregation_keys.push_back(std::move(key)); + aggregated_columns.push_back(std::move(key)); } } if (group_asts.empty()) { select_query->group_expression_list = nullptr; - has_aggregation = select_query->having_expression || added_aggregation; + has_aggregation = select_query->having_expression || aggregate_descriptions.size(); } } for (size_t i = 0; i < aggregate_descriptions.size(); ++i) { AggregateDescription & desc = aggregate_descriptions[i]; - aggregated_columns_map.try_emplace(desc.column_name, desc.function->getReturnType()); + aggregated_columns.emplace_back(desc.column_name, desc.function->getReturnType()); } } - - for (const auto & pair : aggregated_columns_map) - aggregated_columns.emplace_back(pair.first, pair.second); + else + { + aggregated_columns = temp_actions->getSampleBlock().getNamesAndTypesList(); + } } @@ -1525,7 +1521,7 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node, const Block & else { NamesAndTypesList temp_columns = source_columns; - temp_columns.insert(temp_columns.end(), aggregated_columns.begin(), aggregated_columns.end()); + temp_columns.insert(temp_columns.end(), array_join_columns.begin(), array_join_columns.end()); ExpressionActionsPtr temp_actions = std::make_shared(temp_columns, settings); getRootActions(func->arguments->children.at(0), true, false, temp_actions); diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.h b/dbms/src/Interpreters/ExpressionAnalyzer.h index 3422506c144..e50ae568ad0 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.h +++ b/dbms/src/Interpreters/ExpressionAnalyzer.h @@ -159,6 +159,8 @@ private: /// Columns after ARRAY JOIN, JOIN, and/or aggregation. NamesAndTypesList aggregated_columns; + NamesAndTypesList array_join_columns; + /// The main table in FROM clause, if exists. StoragePtr storage; From dddbe14b4bbee288a3dde797bf24d7fd26d629f5 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 4 Apr 2018 22:26:14 +0300 Subject: [PATCH 09/34] added columns_added_by_join into in with pk [#CLICKHOUSE-3680] --- dbms/src/Interpreters/ExpressionAnalyzer.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index 8705a1a5b4c..757aeecffa0 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -1522,6 +1522,7 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node, const Block & { NamesAndTypesList temp_columns = source_columns; temp_columns.insert(temp_columns.end(), array_join_columns.begin(), array_join_columns.end()); + temp_columns.insert(temp_columns.end(), columns_added_by_join.begin(), columns_added_by_join.end()); ExpressionActionsPtr temp_actions = std::make_shared(temp_columns, settings); getRootActions(func->arguments->children.at(0), true, false, temp_actions); From 1ad3010c4a3e4d79c6c7cfeb47e73d77a5e289f7 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 4 Apr 2018 22:36:22 +0300 Subject: [PATCH 10/34] updated test [#CLICKHOUSE-3680] --- .../0_stateless/00612_pk_in_tuple.reference | 23 ++++++++++++++++++ .../queries/0_stateless/00612_pk_in_tuple.sql | 24 +++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference index de69348862a..0d430f5263d 100644 --- a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference +++ b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference @@ -11,3 +11,26 @@ key, arrayJoin(arr) in ((1, 1), (2, 2)) (key, left array join arr) in ((1, 1), (2, 2)) 1 2 +all +1 [1] +2 [2] +key, arrayJoin(n.x) in (1, 1) +1 1 +key, arrayJoin(n.x) in ((1, 1), (2, 2)) +1 1 +2 2 +(key, left array join n.x) in (1, 1) +1 +(key, left array join n.x) in ((1, 1), (2, 2)) +1 +2 +max(key) from tab where (key, left array join n.x) in (1, 1) +1 +1 +max(key) from tab where (key, left array join n.x) in ((1, 1), (2, 2)) +2 +2 +max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in (1, 1) +1 +max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in ((1, 1), (2, 2)) +2 diff --git a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql index f6e34909cae..3783bcb3f2f 100644 --- a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql +++ b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql @@ -14,3 +14,27 @@ select key from test.tab left array join arr as val where (key, val) in (1, 1); select '(key, left array join arr) in ((1, 1), (2, 2))'; select key from test.tab left array join arr as val where (key, val) in ((1, 1), (2, 2)) order by key; +drop table if exists test.tab; +create table test.tab (key UInt64, n Nested(x UInt64)) Engine = MergeTree order by key; +insert into test.tab values (1, [1]); +insert into test.tab values (2, [2]); +select 'all'; +select * from test.tab order by key; +select 'key, arrayJoin(n.x) in (1, 1)'; +select key, arrayJoin(n.x) as val from test.tab where (key, val) in (1, 1); +select 'key, arrayJoin(n.x) in ((1, 1), (2, 2))'; +select key, arrayJoin(n.x) as val from test.tab where (key, val) in ((1, 1), (2, 2)) order by key; +select '(key, left array join n.x) in (1, 1)'; +select key from test.tab left array join n.x as val where (key, val) in (1, 1); +select '(key, left array join n.x) in ((1, 1), (2, 2))'; +select key from test.tab left array join n.x as val where (key, val) in ((1, 1), (2, 2)) order by key; +select 'max(key) from tab where (key, left array join n.x) in (1, 1)'; +select max(key) from test.tab left array join `n.x` as val where (key, val) in ((1, 1)); +select max(key) from test.tab left array join n as val where (key, val.x) in (1, 1); +select 'max(key) from tab where (key, left array join n.x) in ((1, 1), (2, 2))'; +select max(key) from test.tab left array join `n.x` as val where (key, val) in ((1, 1), (2, 2)); +select max(key) from test.tab left array join n as val where (key, val.x) in ((1, 1), (2, 2)); +select 'max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in (1, 1)'; +select max(key) from test.tab any left join (select key, arrayJoin(n.x) as val from test.tab) using key where (key, val) in (1, 1); +select 'max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in ((1, 1), (2, 2))'; +select max(key) from test.tab any left join (select key, arrayJoin(n.x) as val from test.tab) using key where (key, val) in ((1, 1), (2, 2)); From 7d0602c279a050a2c38aa28c96771e8ef1c550cd Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 4 Apr 2018 17:36:28 +0300 Subject: [PATCH 11/34] added columns from array join to aggregated_columns [#CLICKHOUSE-3680] --- dbms/src/Interpreters/ExpressionAnalyzer.cpp | 21 ++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index 2b84a413778..7a87013e6b1 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -508,11 +508,14 @@ void ExpressionAnalyzer::analyzeAggregation() has_aggregation = true; ExpressionActionsPtr temp_actions = std::make_shared(source_columns, settings); + std::unordered_map aggregated_columns_map; if (select_query && select_query->array_join_expression_list()) { getRootActions(select_query->array_join_expression_list(), true, false, temp_actions); addMultipleArrayJoinAction(temp_actions); + for (const auto & name_and_type : temp_actions->getSampleBlock().getNamesAndTypesList()) + aggregated_columns_map.try_emplace(name_and_type.name, name_and_type.type); } if (select_query) @@ -536,6 +539,7 @@ void ExpressionAnalyzer::analyzeAggregation() /// Find out aggregation keys. if (select_query->group_expression_list) { + bool added_aggregation = false; NameSet unique_keys; ASTs & group_asts = select_query->group_expression_list->children; for (ssize_t i = 0; i < ssize_t(group_asts.size()); ++i) @@ -573,30 +577,31 @@ void ExpressionAnalyzer::analyzeAggregation() if (!unique_keys.count(key.name)) { unique_keys.insert(key.name); - aggregation_keys.push_back(key); + + aggregated_columns_map.try_emplace(key.name, key.type); + added_aggregation = true; /// Key is no longer needed, therefore we can save a little by moving it. - aggregated_columns.push_back(std::move(key)); + aggregation_keys.push_back(std::move(key)); } } if (group_asts.empty()) { select_query->group_expression_list = nullptr; - has_aggregation = select_query->having_expression || aggregate_descriptions.size(); + has_aggregation = select_query->having_expression || added_aggregation; } } for (size_t i = 0; i < aggregate_descriptions.size(); ++i) { AggregateDescription & desc = aggregate_descriptions[i]; - aggregated_columns.emplace_back(desc.column_name, desc.function->getReturnType()); + aggregated_columns_map.try_emplace(desc.column_name, desc.function->getReturnType()); } } - else - { - aggregated_columns = temp_actions->getSampleBlock().getNamesAndTypesList(); - } + + for (const auto & pair : aggregated_columns_map) + aggregated_columns.emplace_back(pair.first, pair.second); } From fde23f797525e17e397eb97a1604f92b4692030c Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 4 Apr 2018 21:56:30 +0300 Subject: [PATCH 12/34] added array_join_columns [#CLICKHOUSE-3680] --- dbms/src/Interpreters/ExpressionAnalyzer.cpp | 24 ++++++++------------ dbms/src/Interpreters/ExpressionAnalyzer.h | 2 ++ 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index 7a87013e6b1..8705a1a5b4c 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -508,14 +508,12 @@ void ExpressionAnalyzer::analyzeAggregation() has_aggregation = true; ExpressionActionsPtr temp_actions = std::make_shared(source_columns, settings); - std::unordered_map aggregated_columns_map; if (select_query && select_query->array_join_expression_list()) { getRootActions(select_query->array_join_expression_list(), true, false, temp_actions); addMultipleArrayJoinAction(temp_actions); - for (const auto & name_and_type : temp_actions->getSampleBlock().getNamesAndTypesList()) - aggregated_columns_map.try_emplace(name_and_type.name, name_and_type.type); + array_join_columns = temp_actions->getSampleBlock().getNamesAndTypesList(); } if (select_query) @@ -539,7 +537,6 @@ void ExpressionAnalyzer::analyzeAggregation() /// Find out aggregation keys. if (select_query->group_expression_list) { - bool added_aggregation = false; NameSet unique_keys; ASTs & group_asts = select_query->group_expression_list->children; for (ssize_t i = 0; i < ssize_t(group_asts.size()); ++i) @@ -577,31 +574,30 @@ void ExpressionAnalyzer::analyzeAggregation() if (!unique_keys.count(key.name)) { unique_keys.insert(key.name); - - aggregated_columns_map.try_emplace(key.name, key.type); - added_aggregation = true; + aggregation_keys.push_back(key); /// Key is no longer needed, therefore we can save a little by moving it. - aggregation_keys.push_back(std::move(key)); + aggregated_columns.push_back(std::move(key)); } } if (group_asts.empty()) { select_query->group_expression_list = nullptr; - has_aggregation = select_query->having_expression || added_aggregation; + has_aggregation = select_query->having_expression || aggregate_descriptions.size(); } } for (size_t i = 0; i < aggregate_descriptions.size(); ++i) { AggregateDescription & desc = aggregate_descriptions[i]; - aggregated_columns_map.try_emplace(desc.column_name, desc.function->getReturnType()); + aggregated_columns.emplace_back(desc.column_name, desc.function->getReturnType()); } } - - for (const auto & pair : aggregated_columns_map) - aggregated_columns.emplace_back(pair.first, pair.second); + else + { + aggregated_columns = temp_actions->getSampleBlock().getNamesAndTypesList(); + } } @@ -1525,7 +1521,7 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node, const Block & else { NamesAndTypesList temp_columns = source_columns; - temp_columns.insert(temp_columns.end(), aggregated_columns.begin(), aggregated_columns.end()); + temp_columns.insert(temp_columns.end(), array_join_columns.begin(), array_join_columns.end()); ExpressionActionsPtr temp_actions = std::make_shared(temp_columns, settings); getRootActions(func->arguments->children.at(0), true, false, temp_actions); diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.h b/dbms/src/Interpreters/ExpressionAnalyzer.h index 3422506c144..e50ae568ad0 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.h +++ b/dbms/src/Interpreters/ExpressionAnalyzer.h @@ -159,6 +159,8 @@ private: /// Columns after ARRAY JOIN, JOIN, and/or aggregation. NamesAndTypesList aggregated_columns; + NamesAndTypesList array_join_columns; + /// The main table in FROM clause, if exists. StoragePtr storage; From dc22b881a778c2378039e551b2a7c02f0c516942 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 4 Apr 2018 22:26:14 +0300 Subject: [PATCH 13/34] added columns_added_by_join into in with pk [#CLICKHOUSE-3680] --- dbms/src/Interpreters/ExpressionAnalyzer.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index 8705a1a5b4c..757aeecffa0 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -1522,6 +1522,7 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node, const Block & { NamesAndTypesList temp_columns = source_columns; temp_columns.insert(temp_columns.end(), array_join_columns.begin(), array_join_columns.end()); + temp_columns.insert(temp_columns.end(), columns_added_by_join.begin(), columns_added_by_join.end()); ExpressionActionsPtr temp_actions = std::make_shared(temp_columns, settings); getRootActions(func->arguments->children.at(0), true, false, temp_actions); From f5ec675f4a971581c6c719b1ef33a1f86d84abe0 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 4 Apr 2018 22:36:22 +0300 Subject: [PATCH 14/34] updated test [#CLICKHOUSE-3680] --- .../0_stateless/00612_pk_in_tuple.reference | 23 ++++++++++++++++++ .../queries/0_stateless/00612_pk_in_tuple.sql | 24 +++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference index de69348862a..0d430f5263d 100644 --- a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference +++ b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference @@ -11,3 +11,26 @@ key, arrayJoin(arr) in ((1, 1), (2, 2)) (key, left array join arr) in ((1, 1), (2, 2)) 1 2 +all +1 [1] +2 [2] +key, arrayJoin(n.x) in (1, 1) +1 1 +key, arrayJoin(n.x) in ((1, 1), (2, 2)) +1 1 +2 2 +(key, left array join n.x) in (1, 1) +1 +(key, left array join n.x) in ((1, 1), (2, 2)) +1 +2 +max(key) from tab where (key, left array join n.x) in (1, 1) +1 +1 +max(key) from tab where (key, left array join n.x) in ((1, 1), (2, 2)) +2 +2 +max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in (1, 1) +1 +max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in ((1, 1), (2, 2)) +2 diff --git a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql index f6e34909cae..3783bcb3f2f 100644 --- a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql +++ b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql @@ -14,3 +14,27 @@ select key from test.tab left array join arr as val where (key, val) in (1, 1); select '(key, left array join arr) in ((1, 1), (2, 2))'; select key from test.tab left array join arr as val where (key, val) in ((1, 1), (2, 2)) order by key; +drop table if exists test.tab; +create table test.tab (key UInt64, n Nested(x UInt64)) Engine = MergeTree order by key; +insert into test.tab values (1, [1]); +insert into test.tab values (2, [2]); +select 'all'; +select * from test.tab order by key; +select 'key, arrayJoin(n.x) in (1, 1)'; +select key, arrayJoin(n.x) as val from test.tab where (key, val) in (1, 1); +select 'key, arrayJoin(n.x) in ((1, 1), (2, 2))'; +select key, arrayJoin(n.x) as val from test.tab where (key, val) in ((1, 1), (2, 2)) order by key; +select '(key, left array join n.x) in (1, 1)'; +select key from test.tab left array join n.x as val where (key, val) in (1, 1); +select '(key, left array join n.x) in ((1, 1), (2, 2))'; +select key from test.tab left array join n.x as val where (key, val) in ((1, 1), (2, 2)) order by key; +select 'max(key) from tab where (key, left array join n.x) in (1, 1)'; +select max(key) from test.tab left array join `n.x` as val where (key, val) in ((1, 1)); +select max(key) from test.tab left array join n as val where (key, val.x) in (1, 1); +select 'max(key) from tab where (key, left array join n.x) in ((1, 1), (2, 2))'; +select max(key) from test.tab left array join `n.x` as val where (key, val) in ((1, 1), (2, 2)); +select max(key) from test.tab left array join n as val where (key, val.x) in ((1, 1), (2, 2)); +select 'max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in (1, 1)'; +select max(key) from test.tab any left join (select key, arrayJoin(n.x) as val from test.tab) using key where (key, val) in (1, 1); +select 'max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in ((1, 1), (2, 2))'; +select max(key) from test.tab any left join (select key, arrayJoin(n.x) as val from test.tab) using key where (key, val) in ((1, 1), (2, 2)); From ce8de108ed45dd0abe827ae640380e08ebcef611 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Wed, 4 Apr 2018 15:11:38 +0300 Subject: [PATCH 15/34] Better timeouts handling. [#CLICKHOUSE-2] --- dbms/CMakeLists.txt | 2 ++ dbms/src/Client/Connection.cpp | 4 +-- dbms/src/Client/TimeoutSetter.h | 11 ++++--- .../DataStreams/RemoteBlockOutputStream.cpp | 22 ++++++++++++- dbms/src/Server/TCPHandler.cpp | 31 +++++++++++++++---- 5 files changed, 56 insertions(+), 14 deletions(-) diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index e45a53427cb..8c823db478d 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -167,6 +167,8 @@ if (Poco_SQLODBC_FOUND) target_link_libraries (dbms ${Poco_SQLODBC_LIBRARY} ${Poco_SQL_LIBRARY}) target_include_directories (dbms PRIVATE ${ODBC_INCLUDE_DIRECTORIES} ${Poco_SQLODBC_INCLUDE_DIRS} PUBLIC ${Poco_SQL_INCLUDE_DIRS}) endif() +target_include_directories (clickhouse_common_io PRIVATE ${ClickHouse_SOURCE_DIR}/contrib/poco/Data/include) +target_include_directories (dbms PRIVATE ${ClickHouse_SOURCE_DIR}/contrib/poco/Data/include) if (Poco_DataODBC_FOUND) target_link_libraries (clickhouse_common_io ${Poco_Data_LIBRARY}) diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index 42cdfd59ffc..8c6d1d0e583 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -234,7 +234,7 @@ bool Connection::ping() { // LOG_TRACE(log_wrapper.get(), "Ping"); - TimeoutSetter timeout_setter(*socket, sync_request_timeout); + TimeoutSetter timeout_setter(*socket, sync_request_timeout, true); try { UInt64 pong = 0; @@ -274,7 +274,7 @@ TablesStatusResponse Connection::getTablesStatus(const TablesStatusRequest & req if (!connected) connect(); - TimeoutSetter timeout_setter(*socket, sync_request_timeout); + TimeoutSetter timeout_setter(*socket, sync_request_timeout, true); writeVarUInt(Protocol::Client::TablesStatusRequest, *out); request.write(*out, server_revision); diff --git a/dbms/src/Client/TimeoutSetter.h b/dbms/src/Client/TimeoutSetter.h index 0e908c1bdac..693ba88e5f3 100644 --- a/dbms/src/Client/TimeoutSetter.h +++ b/dbms/src/Client/TimeoutSetter.h @@ -11,21 +11,22 @@ namespace DB /// Timeouts could be only decreased struct TimeoutSetter { - TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & send_timeout_, const Poco::Timespan & recieve_timeout_) + TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & send_timeout_, const Poco::Timespan & recieve_timeout_, + bool limit_max_timeout = false) : socket(socket_), send_timeout(send_timeout_), recieve_timeout(recieve_timeout_) { old_send_timeout = socket.getSendTimeout(); old_receive_timeout = socket.getReceiveTimeout(); - if (old_send_timeout > send_timeout) + if (!limit_max_timeout || old_send_timeout > send_timeout) socket.setSendTimeout(send_timeout); - if (old_receive_timeout > recieve_timeout) + if (!limit_max_timeout || old_receive_timeout > recieve_timeout) socket.setReceiveTimeout(recieve_timeout); } - TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_) - : TimeoutSetter(socket_, timeout_, timeout_) {} + TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_, bool limit_max_timeout = false) + : TimeoutSetter(socket_, timeout_, timeout_, limit_max_timeout) {} ~TimeoutSetter() { diff --git a/dbms/src/DataStreams/RemoteBlockOutputStream.cpp b/dbms/src/DataStreams/RemoteBlockOutputStream.cpp index 7464e94e4be..51fb62ef5a1 100644 --- a/dbms/src/DataStreams/RemoteBlockOutputStream.cpp +++ b/dbms/src/DataStreams/RemoteBlockOutputStream.cpp @@ -47,7 +47,27 @@ RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_, const void RemoteBlockOutputStream::write(const Block & block) { assertBlocksHaveEqualStructure(block, header, "RemoteBlockOutputStream"); - connection.sendData(block); + + try + { + connection.sendData(block); + } + catch (const NetException & e) + { + /// Try to get more detailed exception from server + if (connection.poll(0)) + { + Connection::Packet packet = connection.receivePacket(); + + if (Protocol::Server::Exception == packet.type) + { + packet.exception->rethrow(); + return; + } + } + + throw; + } } diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index 72caa5ffe53..53ca6c8699f 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -134,6 +134,7 @@ void TCPHandler::runImpl() * The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet. */ std::unique_ptr exception; + bool network_error = false; try { @@ -183,6 +184,10 @@ void TCPHandler::runImpl() if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT) throw; + + /// If a timeout occurred, try to inform client about it and close the session + if (e.code() == ErrorCodes::SOCKET_TIMEOUT) + network_error = true; } catch (const Poco::Net::NetException & e) { @@ -211,8 +216,6 @@ void TCPHandler::runImpl() exception = std::make_unique("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION); } - bool network_error = false; - try { if (exception) @@ -251,6 +254,14 @@ void TCPHandler::runImpl() void TCPHandler::readData(const Settings & global_settings) { + auto receive_timeout = query_context.getSettingsRef().receive_timeout.value; + + /// Poll interval should not be greater than receive_timeout + size_t default_poll_interval = global_settings.poll_interval.value * 1000000; + size_t current_poll_interval = static_cast(receive_timeout.totalMicroseconds()); + constexpr size_t min_poll_interval = 5000; // 5 ms + size_t poll_interval = std::max(min_poll_interval, std::min(default_poll_interval, current_poll_interval)); + while (1) { Stopwatch watch(CLOCK_MONOTONIC_COARSE); @@ -258,7 +269,7 @@ void TCPHandler::readData(const Settings & global_settings) /// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down. while (1) { - if (static_cast(*in).poll(global_settings.poll_interval * 1000000)) + if (static_cast(*in).poll(poll_interval)) break; /// Do we need to shut down? @@ -269,8 +280,16 @@ void TCPHandler::readData(const Settings & global_settings) * If we periodically poll, the receive_timeout of the socket itself does not work. * Therefore, an additional check is added. */ - if (watch.elapsedSeconds() > global_settings.receive_timeout.totalSeconds()) - throw Exception("Timeout exceeded while receiving data from client", ErrorCodes::SOCKET_TIMEOUT); + double elapsed = watch.elapsedSeconds(); + if (elapsed > receive_timeout.totalSeconds()) + { + std::stringstream ss; + ss << "Timeout exceeded while receiving data from client."; + ss << " Waited for " << static_cast(elapsed) << " seconds,"; + ss << " timeout is " << receive_timeout.totalSeconds() << " seconds."; + + throw Exception(ss.str(), ErrorCodes::SOCKET_TIMEOUT); + } } /// If client disconnected. @@ -560,7 +579,7 @@ bool TCPHandler::receivePacket() return false; default: - throw Exception("Unknown packet from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT); + throw Exception("Unknown packet " + toString(packet_type) + " from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT); } } From a7a924c03c2a7f347209d796c608410b321cbd39 Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Wed, 4 Apr 2018 22:44:28 +0300 Subject: [PATCH 16/34] Update TimeoutSetter.h --- dbms/src/Client/TimeoutSetter.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Client/TimeoutSetter.h b/dbms/src/Client/TimeoutSetter.h index 693ba88e5f3..30ce28e889c 100644 --- a/dbms/src/Client/TimeoutSetter.h +++ b/dbms/src/Client/TimeoutSetter.h @@ -8,7 +8,7 @@ namespace DB { /// Temporarily overrides socket send/recieve timeouts and reset them back into destructor -/// Timeouts could be only decreased +/// If "limit_max_timeout" is true, timeouts could be only decreased (maxed by previous value). struct TimeoutSetter { TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & send_timeout_, const Poco::Timespan & recieve_timeout_, From b96039bafe002ec11501c7ca3532510d6ffe4fa9 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 4 Apr 2018 23:37:28 +0300 Subject: [PATCH 17/34] added check for partition column in mayBenefitFromIndexForIn #2170 --- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 13 ++++++++----- dbms/src/Storages/MergeTree/MergeTreeData.h | 2 +- dbms/src/Storages/MergeTree/PKCondition.cpp | 4 ++++ 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 19daa71d3e3..fd78090c9ec 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -2172,7 +2172,7 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit() return total_covered_parts; } -bool MergeTreeData::isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const +bool MergeTreeData::isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const { String column_name = node->getColumnName(); @@ -2180,9 +2180,12 @@ bool MergeTreeData::isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr & if (column_name == column.column_name) return true; + if (partition_expr_ast && partition_expr_ast->children.at(0)->getColumnName() == column_name) + return true; + if (const ASTFunction * func = typeid_cast(node.get())) if (func->arguments->children.size() == 1) - return isPrimaryKeyColumnPossiblyWrappedInFunctions(func->arguments->children.front()); + return isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(func->arguments->children.front()); return false; } @@ -2195,15 +2198,15 @@ bool MergeTreeData::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) con if (left_in_operand_tuple && left_in_operand_tuple->name == "tuple") { for (const auto & item : left_in_operand_tuple->arguments->children) - if (isPrimaryKeyColumnPossiblyWrappedInFunctions(item)) + if (isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(item)) return true; /// The tuple itself may be part of the primary key, so check that as a last resort. - return isPrimaryKeyColumnPossiblyWrappedInFunctions(left_in_operand); + return isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(left_in_operand); } else { - return isPrimaryKeyColumnPossiblyWrappedInFunctions(left_in_operand); + return isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(left_in_operand); } } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index f412419459c..d0b47b095d3 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -652,7 +652,7 @@ private: std::lock_guard & data_parts_lock) const; /// Checks whether the column is in the primary key, possibly wrapped in a chain of functions with single argument. - bool isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr &node) const; + bool isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const; }; } diff --git a/dbms/src/Storages/MergeTree/PKCondition.cpp b/dbms/src/Storages/MergeTree/PKCondition.cpp index 1d80ea38a87..11bffdace85 100644 --- a/dbms/src/Storages/MergeTree/PKCondition.cpp +++ b/dbms/src/Storages/MergeTree/PKCondition.cpp @@ -1006,6 +1006,10 @@ bool PKCondition::mayBeTrueInRangeImpl(const std::vector & key_ranges, co rpn_stack.back() = !rpn_stack.back(); } } + else + { + throw Exception("Set for IN is not created yet!", ErrorCodes::LOGICAL_ERROR); + } } else if (element.function == RPNElement::FUNCTION_NOT) { From 8f4b54d65bc28170eda5fdd1782923f52e4e14eb Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 5 Apr 2018 00:02:53 +0300 Subject: [PATCH 18/34] updated test #2170 --- dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference | 1 + dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference index 0d430f5263d..351e10ca3ff 100644 --- a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference +++ b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference @@ -34,3 +34,4 @@ max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) usi 1 max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in ((1, 1), (2, 2)) 2 +1 diff --git a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql index 3783bcb3f2f..54a38c5fcca 100644 --- a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql +++ b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql @@ -38,3 +38,8 @@ select 'max(key) from tab any left join (select key, arrayJoin(n.x) as val from select max(key) from test.tab any left join (select key, arrayJoin(n.x) as val from test.tab) using key where (key, val) in (1, 1); select 'max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in ((1, 1), (2, 2))'; select max(key) from test.tab any left join (select key, arrayJoin(n.x) as val from test.tab) using key where (key, val) in ((1, 1), (2, 2)); + +drop table if exists test.tab; +CREATE TABLE test.tab (key1 Int32, id1 Int64, c1 Int64) ENGINE = MergeTree PARTITION BY id1 ORDER BY (key1) ; +insert into test.tab values ( -1, 1, 0 ); +SELECT count(*) FROM test.tab PREWHERE id1 IN (1); From 669b0d4bb0237e55e4ad097647084dc992f94fe4 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Apr 2018 00:05:11 +0300 Subject: [PATCH 19/34] Fixed highlight for UNION ALL [#CLICKHOUSE-3689] --- dbms/src/Parsers/ASTSelectWithUnionQuery.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dbms/src/Parsers/ASTSelectWithUnionQuery.cpp b/dbms/src/Parsers/ASTSelectWithUnionQuery.cpp index 041f9bce8d5..82b7dcf2cd5 100644 --- a/dbms/src/Parsers/ASTSelectWithUnionQuery.cpp +++ b/dbms/src/Parsers/ASTSelectWithUnionQuery.cpp @@ -26,7 +26,10 @@ void ASTSelectWithUnionQuery::formatQueryImpl(const FormatSettings & settings, F for (ASTs::const_iterator it = list_of_selects->children.begin(); it != list_of_selects->children.end(); ++it) { if (it != list_of_selects->children.begin()) - settings.ostr << settings.nl_or_ws << indent_str << hilite_keyword << "UNION ALL" << hilite_none << settings.nl_or_ws; + settings.ostr + << settings.nl_or_ws << indent_str << (settings.hilite ? hilite_keyword : "") + << "UNION ALL" << (settings.hilite ? hilite_keyword : "") + << settings.nl_or_ws; (*it)->formatImpl(settings, state, frame); } From ddc4e1e3880eec31fd81316dfc2bd7440cb8dc8a Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Apr 2018 02:33:21 +0300 Subject: [PATCH 20/34] ZooKeeper: Better connection loop [#CLICKHOUSE-2] --- dbms/src/Common/ZooKeeper/ZooKeeper.cpp | 3 +++ dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp | 24 ++++++++++++--------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp index 800bbc04a73..65e9f2d5084 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp @@ -49,6 +49,9 @@ void ZooKeeper::init(const std::string & hosts_, const std::string & identity_, session_timeout_ms = session_timeout_ms_; chroot = chroot_; + if (hosts.empty()) + throw KeeperException("No addresses passed to ZooKeeper constructor.", ZooKeeperImpl::ZooKeeper::ZBADARGUMENTS); + std::vector addresses_strings; boost::split(addresses_strings, hosts, boost::is_any_of(",")); ZooKeeperImpl::ZooKeeper::Addresses addresses; diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 0895e58c827..6e1b4016d93 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -532,9 +532,6 @@ ZooKeeper::ZooKeeper( connect(addresses, connection_timeout); - sendHandshake(); - receiveHandshake(); - if (!auth_scheme.empty()) sendAuth(auth_scheme, auth_data); @@ -549,6 +546,9 @@ void ZooKeeper::connect( const Addresses & addresses, Poco::Timespan connection_timeout) { + if (addresses.empty()) + throw Exception("No addresses passed to ZooKeeperImpl constructor", ZBADARGUMENTS); + static constexpr size_t num_tries = 3; bool connected = false; @@ -560,6 +560,17 @@ void ZooKeeper::connect( try { socket.connect(address, connection_timeout); + + socket.setReceiveTimeout(operation_timeout); + socket.setSendTimeout(operation_timeout); + socket.setNoDelay(true); + + in.emplace(socket); + out.emplace(socket); + + sendHandshake(); + receiveHandshake(); + connected = true; break; } @@ -594,13 +605,6 @@ void ZooKeeper::connect( out << fail_reasons.str(); throw Exception(out.str(), ZCONNECTIONLOSS); } - - socket.setReceiveTimeout(operation_timeout); - socket.setSendTimeout(operation_timeout); - socket.setNoDelay(true); - - in.emplace(socket); - out.emplace(socket); } From 5a525605bedb9015e5c22cbc749ce44e99a677e9 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Apr 2018 02:36:58 +0300 Subject: [PATCH 21/34] ZooKeeper: thread names [#CLICKHOUSE-2] --- dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp | 5 +++++ dbms/src/Server/config.d/zookeeper.xml | 8 ++++++++ 2 files changed, 13 insertions(+) diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 6e1b4016d93..54096511bca 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -689,6 +690,8 @@ void ZooKeeper::sendAuth(const String & scheme, const String & data) void ZooKeeper::sendThread() { + setThreadName("ZooKeeperSend"); + auto prev_heartbeat_time = clock::now(); try @@ -744,6 +747,8 @@ void ZooKeeper::sendThread() void ZooKeeper::receiveThread() { + setThreadName("ZooKeeperRecv"); + try { Int64 waited = 0; diff --git a/dbms/src/Server/config.d/zookeeper.xml b/dbms/src/Server/config.d/zookeeper.xml index d390a935107..095f4be78c1 100644 --- a/dbms/src/Server/config.d/zookeeper.xml +++ b/dbms/src/Server/config.d/zookeeper.xml @@ -4,5 +4,13 @@ localhost 2181 + + yandex.ru + 2181 + + + 111.0.1.2 + 2181 + From e4fe1ef4b3ce31af8f261c8b18aa01259880059a Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Apr 2018 02:43:11 +0300 Subject: [PATCH 22/34] ZooKeeper: Better connection loop [#CLICKHOUSE-2] --- dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 54096511bca..9d420162f8b 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -560,6 +560,7 @@ void ZooKeeper::connect( { try { + socket = Poco::Net::StreamSocket(); /// Reset the state of previous attempt. socket.connect(address, connection_timeout); socket.setReceiveTimeout(operation_timeout); @@ -577,7 +578,7 @@ void ZooKeeper::connect( } catch (const Poco::Net::NetException & e) { - fail_reasons << "\n" << getCurrentExceptionMessage(false); + fail_reasons << "\n" << getCurrentExceptionMessage(false) << ", " << address.toString(); } catch (const Poco::TimeoutException & e) { @@ -603,7 +604,7 @@ void ZooKeeper::connect( out << address.toString(); } - out << fail_reasons.str(); + out << fail_reasons.str() << "\n"; throw Exception(out.str(), ZCONNECTIONLOSS); } } From 8faea5437ae337f7cf23cae3f237586e8f7ec978 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Apr 2018 02:59:37 +0300 Subject: [PATCH 23/34] ZooKeeper: Better metrics about active sessions [#CLICKHOUSE-2] --- dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp | 2 ++ dbms/src/Common/ZooKeeper/ZooKeeperImpl.h | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 9d420162f8b..b523f01690a 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -980,6 +980,8 @@ void ZooKeeper::finalize(bool error_send, bool error_receive) bool expired_prev = false; if (expired.compare_exchange_strong(expired_prev, true)) { + active_session_metric_increment.destroy(); + try { if (!error_send) diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h index b3d8a057c49..6368e2ed99d 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -571,7 +571,7 @@ private: template void read(T &); - CurrentMetrics::Increment metric_increment{CurrentMetrics::ZooKeeperSession}; + CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession}; }; }; From b8df381a9703eaf07c48d9a5437e19961edfba8b Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Apr 2018 03:02:09 +0300 Subject: [PATCH 24/34] ZooKeeper: changed operation timeout to 10s because some users run ZooKeeper on servers with HDD and high background load [#CLICKHOUSE-2] --- dbms/src/Common/ZooKeeper/ZooKeeper.cpp | 2 +- dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp index 65e9f2d5084..ef0f6109b52 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp @@ -12,7 +12,7 @@ #include #define ZOOKEEPER_CONNECTION_TIMEOUT_MS 1000 -#define ZOOKEEPER_OPERATION_TIMEOUT_MS 1000 +#define ZOOKEEPER_OPERATION_TIMEOUT_MS 10000 namespace DB diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp index b523f01690a..079df186c17 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -506,7 +506,7 @@ ZooKeeper::ZooKeeper( Poco::Timespan operation_timeout) : root_path(root_path_), session_timeout(session_timeout), - operation_timeout(operation_timeout) + operation_timeout(std::min(operation_timeout, session_timeout)) { if (!root_path.empty()) { From 0c519c763b61fdba4837aea22799d2fa4786f0aa Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Apr 2018 03:44:58 +0300 Subject: [PATCH 25/34] ZooKeeper: added comment [#CLICKHOUSE-2] --- dbms/src/Common/ZooKeeper/ZooKeeperImpl.h | 46 +++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h index 6368e2ed99d..f519e9e232d 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -24,6 +24,52 @@ #include +/** ZooKeeper C++ library, a replacement for libzookeeper. + * + * Motivation. + * + * libzookeeper has many bugs: + * - segfaults: for example, if zookeeper connection was interrupted while reading result of multi response; + * - memory corruption: for example, as a result of double free inside libzookeeper; + * - no timeouts for synchronous operations: they may stuck forever under simple Jepsen-like tests; + * - logical errors: for example, chroot prefix is not removed from the results of multi responses. + * - data races; + * + * The code of libzookeeper is over complicated: + * - memory ownership is unclear and bugs are very difficult to track and fix. + * - extremely creepy code for implementation of "chroot" feature. + * + * As of 2018, there are no active maintainers of libzookeeper: + * - bugs in JIRA are fixed only occasionaly with ad-hoc patches by library users. + * + * libzookeeper is a classical example of bad code written in C. + * + * In Go, Python and Rust programming languages, + * there are separate libraries for ZooKeeper, not based on libzookeeper. + * Motivation is almost the same. Example: + * https://github.com/python-zk/kazoo/blob/master/docs/implementation.rst + * + * About "session restore" feature. + * + * libzookeeper has the feature of session restore. Client receives session id and session token from the server, + * and when connection is lost, it can quickly reconnect to any server with the same session id and token, + * to continue with existing session. + * libzookeeper performs this reconnection automatically. + * + * This feature is proven to be harmful. + * For example, it makes very difficult to correctly remove ephemeral nodes. + * This may lead to weird bugs in application code. + * For example, our developers have found that type of bugs in Curator Java library. + * + * On the other side, session restore feature has no advantages, + * because every application should be able to establish new session and reinitialize internal state, + * when the session is lost and cannot be restored. + * + * This library never restores the session. In case of any error, the session is considered as expired + * and you should create a new instance of ZooKeeperImpl object and reinitialize the application state. + */ + + namespace CurrentMetrics { extern const Metric ZooKeeperSession; From cde5d315f1d6bcdd00a4ed2cbe7009927a5b7279 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Apr 2018 03:46:26 +0300 Subject: [PATCH 26/34] ZooKeeper: added comment [#CLICKHOUSE-2] --- dbms/src/Common/ZooKeeper/ZooKeeperImpl.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h index f519e9e232d..94784c20fe6 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -66,7 +66,7 @@ * when the session is lost and cannot be restored. * * This library never restores the session. In case of any error, the session is considered as expired - * and you should create a new instance of ZooKeeperImpl object and reinitialize the application state. + * and you should create a new instance of ZooKeeper object and reinitialize the application state. */ @@ -107,7 +107,7 @@ public: * - you provide callbacks for your commands; callbacks are invoked in internal thread and must be cheap: * for example, just signal a condvar / fulfull a promise. * - you also may provide callbacks for watches; they are also invoked in internal thread and must be cheap. - * - whenever you receive SessionExpired exception of method isValid returns false, + * - whenever you receive exception with ZSESSIONEXPIRED code or method isExpired returns true, * the ZooKeeper instance is no longer usable - you may only destroy it and probably create another. * - whenever session is expired or ZooKeeper instance is destroying, all callbacks are notified with special event. * - data for callbacks must be alive when ZooKeeper instance is alive. From 0dbc8aa1a553e52b3043afa9a49f4d1a446b3f61 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Apr 2018 03:53:27 +0300 Subject: [PATCH 27/34] ZooKeeper: added comment [#CLICKHOUSE-2] --- dbms/src/Common/ZooKeeper/ZooKeeperImpl.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h index 94784c20fe6..75ba32e6388 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -67,6 +67,8 @@ * * This library never restores the session. In case of any error, the session is considered as expired * and you should create a new instance of ZooKeeper object and reinitialize the application state. + * + * This library is not intended to be CPU efficient. Hundreds of thousands operations per second is usually enough. */ From f734a4523a10b6d6f3abbb9269c014a2bef67579 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Apr 2018 05:19:32 +0300 Subject: [PATCH 28/34] Fixed bad test [#CLICKHOUSE-2] --- .../0_stateless/00534_long_functions_bad_arguments8.reference | 2 ++ .../0_stateless/00534_long_functions_bad_arguments9.reference | 2 ++ 2 files changed, 4 insertions(+) diff --git a/dbms/tests/queries/0_stateless/00534_long_functions_bad_arguments8.reference b/dbms/tests/queries/0_stateless/00534_long_functions_bad_arguments8.reference index 7193c3d3f3d..0d72c2d7fe0 100644 --- a/dbms/tests/queries/0_stateless/00534_long_functions_bad_arguments8.reference +++ b/dbms/tests/queries/0_stateless/00534_long_functions_bad_arguments8.reference @@ -1 +1,3 @@ Still alive +Still alive +Still alive diff --git a/dbms/tests/queries/0_stateless/00534_long_functions_bad_arguments9.reference b/dbms/tests/queries/0_stateless/00534_long_functions_bad_arguments9.reference index 7193c3d3f3d..0d72c2d7fe0 100644 --- a/dbms/tests/queries/0_stateless/00534_long_functions_bad_arguments9.reference +++ b/dbms/tests/queries/0_stateless/00534_long_functions_bad_arguments9.reference @@ -1 +1,3 @@ Still alive +Still alive +Still alive From dec4094e5b7dea720dc19f760b4048dde608ef22 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Apr 2018 05:31:17 +0300 Subject: [PATCH 29/34] Removed test case that test for behaviour that was correct accidentially [#CLICKHOUSE-2] --- ...de_query.reference => 00167_settings_inside_query.reference} | 2 -- ...ettings_inside_query.sql => 00167_settings_inside_query.sql} | 1 - 2 files changed, 3 deletions(-) rename dbms/tests/queries/0_stateless/{00167_shard_settings_inside_query.reference => 00167_settings_inside_query.reference} (57%) rename dbms/tests/queries/0_stateless/{00167_shard_settings_inside_query.sql => 00167_settings_inside_query.sql} (57%) diff --git a/dbms/tests/queries/0_stateless/00167_shard_settings_inside_query.reference b/dbms/tests/queries/0_stateless/00167_settings_inside_query.reference similarity index 57% rename from dbms/tests/queries/0_stateless/00167_shard_settings_inside_query.reference rename to dbms/tests/queries/0_stateless/00167_settings_inside_query.reference index cd62bbbf596..3e67fe1ac4f 100644 --- a/dbms/tests/queries/0_stateless/00167_shard_settings_inside_query.reference +++ b/dbms/tests/queries/0_stateless/00167_settings_inside_query.reference @@ -1,4 +1,2 @@ 123 123 -61 -62 diff --git a/dbms/tests/queries/0_stateless/00167_shard_settings_inside_query.sql b/dbms/tests/queries/0_stateless/00167_settings_inside_query.sql similarity index 57% rename from dbms/tests/queries/0_stateless/00167_shard_settings_inside_query.sql rename to dbms/tests/queries/0_stateless/00167_settings_inside_query.sql index 6a892987e39..987a9475b8d 100644 --- a/dbms/tests/queries/0_stateless/00167_shard_settings_inside_query.sql +++ b/dbms/tests/queries/0_stateless/00167_settings_inside_query.sql @@ -1,3 +1,2 @@ SELECT min(number) FROM system.numbers WHERE toUInt64(number % 1000) IN (SELECT DISTINCT blockSize() FROM system.numbers SETTINGS max_block_size = 123, max_rows_to_read = 1000, read_overflow_mode = 'break') SETTINGS max_rows_to_read = 1000000, read_overflow_mode = 'break'; SELECT * FROM (SELECT DISTINCT blockSize() AS x FROM system.numbers SETTINGS max_block_size = 123, max_rows_to_read = 1000, read_overflow_mode = 'break'); -SELECT x FROM (SELECT DISTINCT blockSize() AS x FROM remote('127.0.0.{2,3}', system.numbers) WHERE number IN (SELECT number * 2 FROM system.numbers SETTINGS max_rows_to_read = 10000, read_overflow_mode = 'break') SETTINGS max_block_size = 123, max_rows_to_read = 1000, read_overflow_mode = 'break') ORDER BY x; From d5c5a340498436a240d4c5caa76ede36bf04db4f Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Apr 2018 05:55:21 +0300 Subject: [PATCH 30/34] Miscellaneous [#CLICKHOUSE-2] --- dbms/src/Common/ZooKeeper/ZooKeeper.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp index ef0f6109b52..c3696d54ffa 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp @@ -323,7 +323,7 @@ bool ZooKeeper::exists(const std::string & path, Stat * stat, const EventPtr & w return existsWatch(path, stat, callbackForEvent(watch)); } -bool ZooKeeper::existsWatch(const std::string & path, Stat * stat, const WatchCallback & watch_callback) +bool ZooKeeper::existsWatch(const std::string & path, Stat * stat, WatchCallback watch_callback) { int32_t code = existsImpl(path, stat, watch_callback); @@ -372,7 +372,7 @@ bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat, return tryGetWatch(path, res, stat, callbackForEvent(watch), return_code); } -bool ZooKeeper::tryGetWatch(const std::string & path, std::string & res, Stat * stat, const WatchCallback & watch_callback, int * return_code) +bool ZooKeeper::tryGetWatch(const std::string & path, std::string & res, Stat * stat, WatchCallback watch_callback, int * return_code) { int32_t code = getImpl(path, res, stat, watch_callback); From 403a2c62a243bad4b3fcba00e436220a5f1193b1 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Apr 2018 05:56:11 +0300 Subject: [PATCH 31/34] Miscellaneous [#CLICKHOUSE-2] --- dbms/src/Common/ZooKeeper/ZooKeeper.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Common/ZooKeeper/ZooKeeper.h b/dbms/src/Common/ZooKeeper/ZooKeeper.h index 25b6f4a993a..e91bb20d877 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeper.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeper.h @@ -110,7 +110,7 @@ public: int32_t tryRemove(const std::string & path, int32_t version = -1); bool exists(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr); - bool existsWatch(const std::string & path, Stat * stat, const WatchCallback & watch_callback); + bool existsWatch(const std::string & path, Stat * stat, WatchCallback watch_callback); std::string get(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr); @@ -118,7 +118,7 @@ public: /// * The node doesn't exist. Returns false in this case. bool tryGet(const std::string & path, std::string & res, Stat * stat = nullptr, const EventPtr & watch = nullptr, int * code = nullptr); - bool tryGetWatch(const std::string & path, std::string & res, Stat * stat, const WatchCallback & watch_callback, int * code = nullptr); + bool tryGetWatch(const std::string & path, std::string & res, Stat * stat, WatchCallback watch_callback, int * code = nullptr); void set(const std::string & path, const std::string & data, int32_t version = -1, Stat * stat = nullptr); From e00e81c3e94c8a9012bb276f1f8857b193b097df Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Apr 2018 07:06:23 +0300 Subject: [PATCH 32/34] ZooKeeper: fixed error [#CLICKHOUSE-2] --- dbms/src/Common/ZooKeeper/ZooKeeper.cpp | 46 ++++++++++++++------- dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp | 2 +- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp index c3696d54ffa..bccfd16b61c 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp @@ -530,39 +530,53 @@ void ZooKeeper::tryRemoveRecursive(const std::string & path) } -void ZooKeeper::waitForDisappear(const std::string & path) +namespace { - while (true) + struct WaitForDisappearState { int32_t code = 0; int32_t event_type = 0; Poco::Event event; + }; + using WaitForDisappearStatePtr = std::shared_ptr; +} - auto callback = [&](const ZooKeeperImpl::ZooKeeper::ExistsResponse & response) +void ZooKeeper::waitForDisappear(const std::string & path) +{ + WaitForDisappearStatePtr state = std::make_shared(); + + while (true) + { + auto callback = [state](const ZooKeeperImpl::ZooKeeper::ExistsResponse & response) { - code = response.error; - if (code) - event.set(); + state->code = response.error; + if (state->code) + state->event.set(); }; - auto watch = [&](const ZooKeeperImpl::ZooKeeper::WatchResponse & response) + auto watch = [state](const ZooKeeperImpl::ZooKeeper::WatchResponse & response) { - code = response.error; - if (!code) - event_type = response.type; - event.set(); + if (!state->code) + { + state->code = response.error; + if (!state->code) + state->event_type = response.type; + state->event.set(); + } }; + /// NOTE: if the node doesn't exist, the watch will leak. + impl->exists(path, callback, watch); - event.wait(); + state->event.wait(); - if (code == ZooKeeperImpl::ZooKeeper::ZNONODE) + if (state->code == ZooKeeperImpl::ZooKeeper::ZNONODE) return; - if (code) - throw KeeperException(code, path); + if (state->code) + throw KeeperException(state->code, path); - if (event_type == ZooKeeperImpl::ZooKeeper::DELETED) + if (state->event_type == ZooKeeperImpl::ZooKeeper::DELETED) return; } } diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 079df186c17..cbd229c0b81 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -1021,7 +1021,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive) WatchResponse response; response.type = SESSION; response.state = EXPIRED_SESSION; - response.error = ZCONNECTIONLOSS; + response.error = ZSESSIONEXPIRED; for (auto & callback : path_watches.second) if (callback) From d3408d45a8a324bd2f8dace1b8b8418d8c1f2510 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Apr 2018 07:31:04 +0300 Subject: [PATCH 33/34] ZooKeeper: fixed error [#CLICKHOUSE-2] --- dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp | 61 +++++++++++++-------- dbms/src/Common/ZooKeeper/ZooKeeperImpl.h | 6 +- 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp index cbd229c0b81..67160cc18c8 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -711,12 +711,26 @@ void ZooKeeper::sendThread() std::chrono::duration_cast(next_heartbeat_time - now).count(), operation_timeout.totalMilliseconds()); - RequestPtr request; - if (requests.tryPop(request, max_wait)) + RequestInfo info; + if (requests_queue.tryPop(info, max_wait)) { - request->write(*out); + { + CurrentMetrics::add(CurrentMetrics::ZooKeeperRequest); + std::lock_guard lock(operations_mutex); + operations[info.request->xid] = info; + } - if (request->xid == close_xid) + if (info.watch) + { + info.request->has_watch = true; + CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch); + std::lock_guard lock(watches_mutex); + watches[info.request->getPath()].emplace_back(std::move(info.watch)); + } + + info.request->write(*out); + + if (info.request->xid == close_xid) break; } } @@ -740,9 +754,24 @@ void ZooKeeper::sendThread() } /// Drain queue - RequestPtr request; - while (requests.tryPop(request)) - ; + RequestInfo info; + while (requests_queue.tryPop(info)) + { + if (info.callback) + { + ResponsePtr response = info.request->makeResponse(); + response->error = ZSESSIONEXPIRED; + info.callback(*response); + } + if (info.watch) + { + WatchResponse response; + response.type = SESSION; + response.state = EXPIRED_SESSION; + response.error = ZSESSIONEXPIRED; + info.watch(response); + } + } } @@ -1004,7 +1033,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive) { RequestInfo & request_info = op.second; ResponsePtr response = request_info.request->makeResponse(); - response->error = ZCONNECTIONLOSS; + response->error = ZSESSIONEXPIRED; if (request_info.callback) request_info.callback(*response); } @@ -1256,21 +1285,7 @@ void ZooKeeper::pushRequest(RequestInfo && info) ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions); - { - CurrentMetrics::add(CurrentMetrics::ZooKeeperRequest); - std::lock_guard lock(operations_mutex); - operations[info.request->xid] = info; - } - - if (info.watch) - { - info.request->has_watch = true; - CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch); - std::lock_guard lock(watches_mutex); - watches[info.request->getPath()].emplace_back(std::move(info.watch)); - } - - if (!requests.tryPush(info.request, operation_timeout.totalMilliseconds())) + if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds())) throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT); } diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h index 75ba32e6388..46359a1baeb 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -439,6 +439,8 @@ public: using CheckCallback = std::function; using MultiCallback = std::function; + /// If the method will throw exception, callbacks won't be called. + /// After the method is executed successfully, you must wait for callbacks. void create( const String & path, @@ -573,9 +575,9 @@ private: clock::time_point time; }; - using RequestsQueue = ConcurrentBoundedQueue; + using RequestsQueue = ConcurrentBoundedQueue; - RequestsQueue requests{1}; + RequestsQueue requests_queue{1}; void pushRequest(RequestInfo && request); using Operations = std::map; From 36c940989968572ff070b5ff4e466ff6fcb819b4 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Apr 2018 07:42:20 +0300 Subject: [PATCH 34/34] Fixed error [#CLICKHOUSE-2] --- .../IProfilingBlockInputStream.cpp | 15 +++++++++++++++ .../DataStreams/IProfilingBlockInputStream.h | 19 +++---------------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp index f7558dbf3eb..8cb570bbf62 100644 --- a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp +++ b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp @@ -339,6 +339,21 @@ void IProfilingBlockInputStream::cancel(bool kill) } +bool IProfilingBlockInputStream::isCancelled() const +{ + return is_cancelled; +} + +bool IProfilingBlockInputStream::isCancelledOrThrowIfKilled() const +{ + if (!is_cancelled) + return false; + if (is_killed) + throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED); + return true; +} + + void IProfilingBlockInputStream::setProgressCallback(const ProgressCallback & callback) { progress_callback = callback; diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.h b/dbms/src/DataStreams/IProfilingBlockInputStream.h index 0bed471f245..a9601d5c265 100644 --- a/dbms/src/DataStreams/IProfilingBlockInputStream.h +++ b/dbms/src/DataStreams/IProfilingBlockInputStream.h @@ -119,21 +119,8 @@ public: */ virtual void cancel(bool kill); - /** Do you want to abort the receipt of data. - */ - bool isCancelled() const - { - return is_cancelled.load(std::memory_order_seq_cst); - } - - bool isCancelledOrThrowIfKilled() const - { - if (!isCancelled()) - return false; - if (is_killed) - throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED); - return true; - } + bool isCancelled() const; + bool isCancelledOrThrowIfKilled() const; /** What limitations and quotas should be checked. * LIMITS_CURRENT - checks amount of data read by current stream only (BlockStreamProfileInfo is used for check). @@ -189,7 +176,7 @@ public: protected: BlockStreamProfileInfo info; std::atomic is_cancelled{false}; - bool is_killed{false}; + std::atomic is_killed{false}; ProgressCallback progress_callback; ProcessListElement * process_list_elem = nullptr;