mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 11:02:08 +00:00
Merge pull request #4009 from yandex/test_for_remote_global_in_user
Fix bug with wrong user in remote table function.
This commit is contained in:
commit
08f4e792b1
@ -14,14 +14,8 @@ namespace DB
|
|||||||
namespace ClusterProxy
|
namespace ClusterProxy
|
||||||
{
|
{
|
||||||
|
|
||||||
BlockInputStreams executeQuery(
|
Context removeUserRestrictionsFromSettings(const Context & context, const Settings & settings)
|
||||||
IStreamFactory & stream_factory, const ClusterPtr & cluster,
|
|
||||||
const ASTPtr & query_ast, const Context & context, const Settings & settings)
|
|
||||||
{
|
{
|
||||||
BlockInputStreams res;
|
|
||||||
|
|
||||||
const std::string query = queryToString(query_ast);
|
|
||||||
|
|
||||||
Settings new_settings = settings;
|
Settings new_settings = settings;
|
||||||
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.max_execution_time);
|
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.max_execution_time);
|
||||||
|
|
||||||
@ -39,6 +33,19 @@ BlockInputStreams executeQuery(
|
|||||||
Context new_context(context);
|
Context new_context(context);
|
||||||
new_context.setSettings(new_settings);
|
new_context.setSettings(new_settings);
|
||||||
|
|
||||||
|
return new_context;
|
||||||
|
}
|
||||||
|
|
||||||
|
BlockInputStreams executeQuery(
|
||||||
|
IStreamFactory & stream_factory, const ClusterPtr & cluster,
|
||||||
|
const ASTPtr & query_ast, const Context & context, const Settings & settings)
|
||||||
|
{
|
||||||
|
BlockInputStreams res;
|
||||||
|
|
||||||
|
const std::string query = queryToString(query_ast);
|
||||||
|
|
||||||
|
Context new_context = removeUserRestrictionsFromSettings(context, settings);
|
||||||
|
|
||||||
ThrottlerPtr user_level_throttler;
|
ThrottlerPtr user_level_throttler;
|
||||||
if (auto process_list_element = context.getProcessListElement())
|
if (auto process_list_element = context.getProcessListElement())
|
||||||
user_level_throttler = process_list_element->getUserNetworkThrottler();
|
user_level_throttler = process_list_element->getUserNetworkThrottler();
|
||||||
|
@ -16,6 +16,10 @@ namespace ClusterProxy
|
|||||||
|
|
||||||
class IStreamFactory;
|
class IStreamFactory;
|
||||||
|
|
||||||
|
/// removes different restrictions (like max_concurrent_queries_for_user, max_memory_usage_for_user, etc.)
|
||||||
|
/// from settings and creates new context with them
|
||||||
|
Context removeUserRestrictionsFromSettings(const Context & context, const Settings & settings);
|
||||||
|
|
||||||
/// Execute a distributed query, creating a vector of BlockInputStreams, from which the result can be read.
|
/// Execute a distributed query, creating a vector of BlockInputStreams, from which the result can be read.
|
||||||
/// `stream_factory` object encapsulates the logic of creating streams for a different type of query
|
/// `stream_factory` object encapsulates the logic of creating streams for a different type of query
|
||||||
/// (currently SELECT, DESCRIBE).
|
/// (currently SELECT, DESCRIBE).
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
#include "getStructureOfRemoteTable.h"
|
#include "getStructureOfRemoteTable.h"
|
||||||
#include <Interpreters/Cluster.h>
|
#include <Interpreters/Cluster.h>
|
||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
|
#include <Interpreters/ClusterProxy/executeQuery.h>
|
||||||
#include <Interpreters/InterpreterDescribeQuery.h>
|
#include <Interpreters/InterpreterDescribeQuery.h>
|
||||||
#include <DataStreams/RemoteBlockInputStream.h>
|
#include <DataStreams/RemoteBlockInputStream.h>
|
||||||
#include <DataTypes/DataTypeFactory.h>
|
#include <DataTypes/DataTypeFactory.h>
|
||||||
@ -54,7 +55,10 @@ ColumnsDescription getStructureOfRemoteTable(
|
|||||||
|
|
||||||
ColumnsDescription res;
|
ColumnsDescription res;
|
||||||
|
|
||||||
auto input = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, InterpreterDescribeQuery::getSampleBlock(), context);
|
|
||||||
|
auto new_context = ClusterProxy::removeUserRestrictionsFromSettings(context, context.getSettingsRef());
|
||||||
|
/// Execute remote query without restrictions (because it's not real user query, but part of implementation)
|
||||||
|
auto input = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, InterpreterDescribeQuery::getSampleBlock(), new_context);
|
||||||
input->setPoolMode(PoolMode::GET_ONE);
|
input->setPoolMode(PoolMode::GET_ONE);
|
||||||
if (!table_func_ptr)
|
if (!table_func_ptr)
|
||||||
input->setMainTable(QualifiedTableName{database, table});
|
input->setMainTable(QualifiedTableName{database, table});
|
||||||
|
@ -0,0 +1,38 @@
|
|||||||
|
<yandex>
|
||||||
|
<profiles>
|
||||||
|
<default>
|
||||||
|
<max_memory_usage>10000000000</max_memory_usage>
|
||||||
|
<use_uncompressed_cache>0</use_uncompressed_cache>
|
||||||
|
<load_balancing>random</load_balancing>
|
||||||
|
</default>
|
||||||
|
<good>
|
||||||
|
<max_memory_usage>10000000000</max_memory_usage>
|
||||||
|
<use_uncompressed_cache>0</use_uncompressed_cache>
|
||||||
|
<load_balancing>random</load_balancing>
|
||||||
|
<max_concurrent_queries_for_user>2</max_concurrent_queries_for_user>
|
||||||
|
</good>
|
||||||
|
</profiles>
|
||||||
|
<users>
|
||||||
|
<default>
|
||||||
|
<password></password>
|
||||||
|
<networks incl="networks" replace="replace">
|
||||||
|
<ip>::/0</ip>
|
||||||
|
</networks>
|
||||||
|
<profile>default</profile>
|
||||||
|
<quota>default</quota>
|
||||||
|
</default>
|
||||||
|
<good>
|
||||||
|
<password></password>
|
||||||
|
<networks incl="networks" replace="replace">
|
||||||
|
<ip>::/0</ip>
|
||||||
|
</networks>
|
||||||
|
<profile>good</profile>
|
||||||
|
<quota>default</quota>
|
||||||
|
</good>
|
||||||
|
</users>
|
||||||
|
|
||||||
|
<quotas>
|
||||||
|
<default>
|
||||||
|
</default>
|
||||||
|
</quotas>
|
||||||
|
</yandex>
|
@ -0,0 +1,38 @@
|
|||||||
|
import time
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from multiprocessing.dummy import Pool
|
||||||
|
from helpers.cluster import ClickHouseCluster
|
||||||
|
|
||||||
|
cluster = ClickHouseCluster(__file__)
|
||||||
|
|
||||||
|
node1 = cluster.add_instance('node1', user_configs=['configs/user_restrictions.xml'])
|
||||||
|
node2 = cluster.add_instance('node2', user_configs=['configs/user_restrictions.xml'])
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def started_cluster():
|
||||||
|
try:
|
||||||
|
cluster.start()
|
||||||
|
node1.query("create table nums (number UInt64) ENGINE = MergeTree() order by tuple()")
|
||||||
|
node1.query("insert into nums values(0),(1)")
|
||||||
|
|
||||||
|
yield cluster
|
||||||
|
finally:
|
||||||
|
cluster.shutdown()
|
||||||
|
|
||||||
|
def test_exception_message(started_cluster):
|
||||||
|
assert node1.query("select number from nums order by number") == "0\n1\n"
|
||||||
|
|
||||||
|
def node_busy(_):
|
||||||
|
for i in xrange(10):
|
||||||
|
node1.query("select sleep(2)", user='default')
|
||||||
|
|
||||||
|
busy_pool = Pool(3)
|
||||||
|
busy_pool.map_async(node_busy, xrange(3))
|
||||||
|
time.sleep(1) # wait a little until polling starts
|
||||||
|
try:
|
||||||
|
assert node2.query("select number from remote('node1', 'default', 'nums')", user='good') == "0\n1\n"
|
||||||
|
except Exception as ex:
|
||||||
|
print ex.message
|
||||||
|
assert False, "Exception thrown while max_concurrent_queries_for_user is not exceeded"
|
Loading…
Reference in New Issue
Block a user