From a1cdb0049d34d6bf6b4f0058a2906c5084c94657 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 9 Jan 2019 13:47:22 +0300 Subject: [PATCH 1/3] Add test for user exception check --- .../__init__.py | 0 .../configs/remote_servers.xml | 16 ++++++ .../configs/user_restrictions.xml | 38 +++++++++++++ .../test.py | 54 +++++++++++++++++++ 4 files changed, 108 insertions(+) create mode 100644 dbms/tests/integration/test_concurrent_queries_for_user_restriction/__init__.py create mode 100644 dbms/tests/integration/test_concurrent_queries_for_user_restriction/configs/remote_servers.xml create mode 100644 dbms/tests/integration/test_concurrent_queries_for_user_restriction/configs/user_restrictions.xml create mode 100644 dbms/tests/integration/test_concurrent_queries_for_user_restriction/test.py diff --git a/dbms/tests/integration/test_concurrent_queries_for_user_restriction/__init__.py b/dbms/tests/integration/test_concurrent_queries_for_user_restriction/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/integration/test_concurrent_queries_for_user_restriction/configs/remote_servers.xml b/dbms/tests/integration/test_concurrent_queries_for_user_restriction/configs/remote_servers.xml new file mode 100644 index 00000000000..3593cbd7f36 --- /dev/null +++ b/dbms/tests/integration/test_concurrent_queries_for_user_restriction/configs/remote_servers.xml @@ -0,0 +1,16 @@ + + + + + + node1 + 9000 + + + node2 + 9000 + + + + + diff --git a/dbms/tests/integration/test_concurrent_queries_for_user_restriction/configs/user_restrictions.xml b/dbms/tests/integration/test_concurrent_queries_for_user_restriction/configs/user_restrictions.xml new file mode 100644 index 00000000000..bd91f1d495c --- /dev/null +++ b/dbms/tests/integration/test_concurrent_queries_for_user_restriction/configs/user_restrictions.xml @@ -0,0 +1,38 @@ + + + + 10000000000 + 0 + random + + + 10000000000 + 0 + random + 2 + + + + + + + ::/0 + + default + default + + + + + ::/0 + + good + default + + + + + + + + diff --git a/dbms/tests/integration/test_concurrent_queries_for_user_restriction/test.py b/dbms/tests/integration/test_concurrent_queries_for_user_restriction/test.py new file mode 100644 index 00000000000..665f0877586 --- /dev/null +++ b/dbms/tests/integration/test_concurrent_queries_for_user_restriction/test.py @@ -0,0 +1,54 @@ +import time + +import pytest + +from multiprocessing.dummy import Pool +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +node1 = cluster.add_instance('node1', user_configs=['configs/user_restrictions.xml'], main_configs=['configs/remote_servers.xml']) +node2 = cluster.add_instance('node2', user_configs=['configs/user_restrictions.xml'], main_configs=['configs/remote_servers.xml']) + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + for num, node in enumerate([node1, node2]): + node.query("create table real_tbl (ID UInt64, Value String) ENGINE = MergeTree() order by tuple()") + node.query("insert into real_tbl values(0, '0000'), (1, '1111')") + node.query("create table distr_tbl (ID UInt64, Value String) ENGINE Distributed(test_cluster, default, real_tbl)") + + node1.query("create table nums (number UInt64) ENGINE = MergeTree() order by tuple()") + node1.query("insert into nums values(0),(1)") + + yield cluster + finally: + cluster.shutdown() + +def num_getter(num): + if num % 2 == 0: + return node1 + else: + return node2 + +@pytest.mark.parametrize("node_getter", [ + (lambda _: node1), + (lambda _: node2), + (num_getter), +]) +def test_exception_message(started_cluster, node_getter): + assert node1.query("select ID from distr_tbl order by ID") == "0\n1\n" + assert node1.query("select number from nums order by number") == "0\n1\n" + try: + p = Pool(10) + def query(num): + node = node_getter(num) + node.query( + "select sleep(2) from distr_tbl where ID GLOBAL IN (select number from remote('node1', 'default', 'nums'))", + user='good') + + p.map(query, xrange(3)) + except Exception as ex: + assert 'Too many simultaneous queries for user good.' in ex.message + print ex.message From b8efafd400d37c0be10706c6485997c51a78386e Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 9 Jan 2019 15:21:04 +0300 Subject: [PATCH 2/3] Fix bug with wrong user restrictions in remote table func --- .../ClusterProxy/executeQuery.cpp | 21 +++++++---- .../Interpreters/ClusterProxy/executeQuery.h | 4 +++ .../Storages/getStructureOfRemoteTable.cpp | 6 +++- .../configs/remote_servers.xml | 16 --------- .../test.py | 35 ++++++++----------- 5 files changed, 37 insertions(+), 45 deletions(-) delete mode 100644 dbms/tests/integration/test_concurrent_queries_for_user_restriction/configs/remote_servers.xml diff --git a/dbms/src/Interpreters/ClusterProxy/executeQuery.cpp b/dbms/src/Interpreters/ClusterProxy/executeQuery.cpp index 27b7d8338af..4b9aa713f07 100644 --- a/dbms/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/dbms/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -14,14 +14,8 @@ namespace DB namespace ClusterProxy { -BlockInputStreams executeQuery( - IStreamFactory & stream_factory, const ClusterPtr & cluster, - const ASTPtr & query_ast, const Context & context, const Settings & settings) +Context removeUserRestrictionsFromSettings(const Context & context, const Settings & settings) { - BlockInputStreams res; - - const std::string query = queryToString(query_ast); - Settings new_settings = settings; new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.max_execution_time); @@ -39,6 +33,19 @@ BlockInputStreams executeQuery( Context new_context(context); new_context.setSettings(new_settings); + return new_context; +} + +BlockInputStreams executeQuery( + IStreamFactory & stream_factory, const ClusterPtr & cluster, + const ASTPtr & query_ast, const Context & context, const Settings & settings) +{ + BlockInputStreams res; + + const std::string query = queryToString(query_ast); + + Context new_context = removeUserRestrictionsFromSettings(context, settings); + ThrottlerPtr user_level_throttler; if (auto process_list_element = context.getProcessListElement()) user_level_throttler = process_list_element->getUserNetworkThrottler(); diff --git a/dbms/src/Interpreters/ClusterProxy/executeQuery.h b/dbms/src/Interpreters/ClusterProxy/executeQuery.h index b12fc2b4646..5c07c287954 100644 --- a/dbms/src/Interpreters/ClusterProxy/executeQuery.h +++ b/dbms/src/Interpreters/ClusterProxy/executeQuery.h @@ -16,6 +16,10 @@ namespace ClusterProxy class IStreamFactory; +/// removes different restrictions (like max_concurrent_queries_for_user, max_memory_usage_for_user, etc.) +/// from settings and creates new context with them +Context removeUserRestrictionsFromSettings(const Context & context, const Settings & settings); + /// Execute a distributed query, creating a vector of BlockInputStreams, from which the result can be read. /// `stream_factory` object encapsulates the logic of creating streams for a different type of query /// (currently SELECT, DESCRIBE). diff --git a/dbms/src/Storages/getStructureOfRemoteTable.cpp b/dbms/src/Storages/getStructureOfRemoteTable.cpp index 174ec49a4f1..1e5b37d62d0 100644 --- a/dbms/src/Storages/getStructureOfRemoteTable.cpp +++ b/dbms/src/Storages/getStructureOfRemoteTable.cpp @@ -1,6 +1,7 @@ #include "getStructureOfRemoteTable.h" #include #include +#include #include #include #include @@ -54,7 +55,10 @@ ColumnsDescription getStructureOfRemoteTable( ColumnsDescription res; - auto input = std::make_shared(shard_info.pool, query, InterpreterDescribeQuery::getSampleBlock(), context); + + auto new_context = ClusterProxy::removeUserRestrictionsFromSettings(context, context.getSettingsRef()); + /// Execute remote query without restrictions (because it's not real user query, but part of implementation) + auto input = std::make_shared(shard_info.pool, query, InterpreterDescribeQuery::getSampleBlock(), new_context); input->setPoolMode(PoolMode::GET_ONE); if (!table_func_ptr) input->setMainTable(QualifiedTableName{database, table}); diff --git a/dbms/tests/integration/test_concurrent_queries_for_user_restriction/configs/remote_servers.xml b/dbms/tests/integration/test_concurrent_queries_for_user_restriction/configs/remote_servers.xml deleted file mode 100644 index 3593cbd7f36..00000000000 --- a/dbms/tests/integration/test_concurrent_queries_for_user_restriction/configs/remote_servers.xml +++ /dev/null @@ -1,16 +0,0 @@ - - - - - - node1 - 9000 - - - node2 - 9000 - - - - - diff --git a/dbms/tests/integration/test_concurrent_queries_for_user_restriction/test.py b/dbms/tests/integration/test_concurrent_queries_for_user_restriction/test.py index 665f0877586..26a42637063 100644 --- a/dbms/tests/integration/test_concurrent_queries_for_user_restriction/test.py +++ b/dbms/tests/integration/test_concurrent_queries_for_user_restriction/test.py @@ -22,33 +22,26 @@ def started_cluster(): node1.query("create table nums (number UInt64) ENGINE = MergeTree() order by tuple()") node1.query("insert into nums values(0),(1)") + node2.query("create table nums (number UInt64) ENGINE = MergeTree() order by tuple()") + node2.query("insert into nums values(0),(1)") + yield cluster finally: cluster.shutdown() -def num_getter(num): - if num % 2 == 0: - return node1 - else: - return node2 - -@pytest.mark.parametrize("node_getter", [ - (lambda _: node1), - (lambda _: node2), - (num_getter), -]) -def test_exception_message(started_cluster, node_getter): +def test_exception_message(started_cluster): assert node1.query("select ID from distr_tbl order by ID") == "0\n1\n" assert node1.query("select number from nums order by number") == "0\n1\n" - try: - p = Pool(10) - def query(num): - node = node_getter(num) - node.query( - "select sleep(2) from distr_tbl where ID GLOBAL IN (select number from remote('node1', 'default', 'nums'))", - user='good') - p.map(query, xrange(3)) + def node_busy(_): + for i in xrange(10): + node1.query("select sleep(2)", user='default') + + busy_pool = Pool(3) + busy_pool.map_async(node_busy, xrange(3)) + time.sleep(1) # wait a little until polling start + try: + assert node2.query("select number from remote('node1', 'default', 'nums')", user='good') == "0\n1\n" except Exception as ex: - assert 'Too many simultaneous queries for user good.' in ex.message print ex.message + assert False, "Exception thrown while max_concurrent_queries_for_user is not exceeded" From f00b7ba08a3699fd299440c7b7a91adeb57d4d0d Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 9 Jan 2019 15:23:41 +0300 Subject: [PATCH 3/3] Simplify test --- .../test.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/dbms/tests/integration/test_concurrent_queries_for_user_restriction/test.py b/dbms/tests/integration/test_concurrent_queries_for_user_restriction/test.py index 26a42637063..4b7cc87c15a 100644 --- a/dbms/tests/integration/test_concurrent_queries_for_user_restriction/test.py +++ b/dbms/tests/integration/test_concurrent_queries_for_user_restriction/test.py @@ -7,30 +7,21 @@ from helpers.cluster import ClickHouseCluster cluster = ClickHouseCluster(__file__) -node1 = cluster.add_instance('node1', user_configs=['configs/user_restrictions.xml'], main_configs=['configs/remote_servers.xml']) -node2 = cluster.add_instance('node2', user_configs=['configs/user_restrictions.xml'], main_configs=['configs/remote_servers.xml']) +node1 = cluster.add_instance('node1', user_configs=['configs/user_restrictions.xml']) +node2 = cluster.add_instance('node2', user_configs=['configs/user_restrictions.xml']) @pytest.fixture(scope="module") def started_cluster(): try: cluster.start() - for num, node in enumerate([node1, node2]): - node.query("create table real_tbl (ID UInt64, Value String) ENGINE = MergeTree() order by tuple()") - node.query("insert into real_tbl values(0, '0000'), (1, '1111')") - node.query("create table distr_tbl (ID UInt64, Value String) ENGINE Distributed(test_cluster, default, real_tbl)") - node1.query("create table nums (number UInt64) ENGINE = MergeTree() order by tuple()") node1.query("insert into nums values(0),(1)") - node2.query("create table nums (number UInt64) ENGINE = MergeTree() order by tuple()") - node2.query("insert into nums values(0),(1)") - yield cluster finally: cluster.shutdown() def test_exception_message(started_cluster): - assert node1.query("select ID from distr_tbl order by ID") == "0\n1\n" assert node1.query("select number from nums order by number") == "0\n1\n" def node_busy(_): @@ -39,7 +30,7 @@ def test_exception_message(started_cluster): busy_pool = Pool(3) busy_pool.map_async(node_busy, xrange(3)) - time.sleep(1) # wait a little until polling start + time.sleep(1) # wait a little until polling starts try: assert node2.query("select number from remote('node1', 'default', 'nums')", user='good') == "0\n1\n" except Exception as ex: