Merge pull request #15551 from azat/for-user-limits-over-distributed

Pass through *_for_user settings via Distributed with cluster-secure
This commit is contained in:
alexey-milovidov 2020-10-04 04:18:20 +03:00 committed by GitHub
commit 53ce5e3892
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 117 additions and 54 deletions

View File

@ -15,18 +15,24 @@ namespace DB
namespace ClusterProxy
{
Context removeUserRestrictionsFromSettings(const Context & context, const Settings & settings, Poco::Logger * log)
Context updateSettingsForCluster(const Cluster & cluster, const Context & context, const Settings & settings, Poco::Logger * log)
{
Settings new_settings = settings;
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.max_execution_time);
/// Does not matter on remote servers, because queries are sent under different user.
new_settings.max_concurrent_queries_for_user = 0;
new_settings.max_memory_usage_for_user = 0;
/// If "secret" (in remote_servers) is not in use,
/// user on the shard is not the same as the user on the initiator,
/// hence per-user limits should not be applied.
if (cluster.getSecret().empty())
{
/// Does not matter on remote servers, because queries are sent under different user.
new_settings.max_concurrent_queries_for_user = 0;
new_settings.max_memory_usage_for_user = 0;
/// Set as unchanged to avoid sending to remote server.
new_settings.max_concurrent_queries_for_user.changed = false;
new_settings.max_memory_usage_for_user.changed = false;
/// Set as unchanged to avoid sending to remote server.
new_settings.max_concurrent_queries_for_user.changed = false;
new_settings.max_memory_usage_for_user.changed = false;
}
if (settings.force_optimize_skip_unused_shards_nesting && settings.force_optimize_skip_unused_shards)
{
@ -84,7 +90,7 @@ Pipe executeQuery(
const std::string query = queryToString(query_ast);
Context new_context = removeUserRestrictionsFromSettings(context, settings, log);
Context new_context = updateSettingsForCluster(*cluster, context, settings, log);
ThrottlerPtr user_level_throttler;
if (auto * process_list_element = context.getProcessListElement())

View File

@ -18,9 +18,16 @@ namespace ClusterProxy
class IStreamFactory;
/// removes different restrictions (like max_concurrent_queries_for_user, max_memory_usage_for_user, etc.)
/// from settings and creates new context with them
Context removeUserRestrictionsFromSettings(const Context & context, const Settings & settings, Poco::Logger * log = nullptr);
/// Update settings for Distributed query.
///
/// - Removes per-user restrictions (like max_concurrent_queries_for_user, max_memory_usage_for_user, etc.)
/// (but only if the cluster does not have a secret; if it has one, the user on the shard is the same as on the initiator)
/// - Updates some settings depending on force_optimize_skip_unused_shards and:
/// - force_optimize_skip_unused_shards_nesting
/// - optimize_skip_unused_shards_nesting
///
/// @return new Context with adjusted settings
Context updateSettingsForCluster(const Cluster & cluster, const Context & context, const Settings & settings, Poco::Logger * log = nullptr);
/// Execute a distributed query, creating a vector of BlockInputStreams, from which the result can be read.
/// `stream_factory` object encapsulates the logic of creating streams for a different type of query

View File

@ -25,43 +25,8 @@ namespace ErrorCodes
}
/// Obtains the structure (columns) of a remote table by asking the shards of `cluster` one by one.
///
/// Shards are tried in the order returned by cluster.getShardsInfo(); the first non-empty
/// result is returned. A shard that fails with NetException is skipped and its error
/// message is accumulated for the final diagnostic.
///
/// @throws NetException (NO_REMOTE_SHARD_AVAILABLE) if no shard returned a non-empty structure.
ColumnsDescription getStructureOfRemoteTable(
    const Cluster & cluster,
    const StorageID & table_id,
    const Context & context,
    const ASTPtr & table_func_ptr)
{
    const auto & shards_info = cluster.getShardsInfo();

    std::string fail_messages;

    for (const auto & shard_info : shards_info)
    {
        try
        {
            /// Hold the result by value (not by const reference),
            /// so that `return res` can move it out instead of copying.
            auto res = getStructureOfRemoteTableInShard(shard_info, table_id, context, table_func_ptr);

            /// Expect at least some columns.
            /// This is a hack to handle the empty block case returned by Connection when skip_unavailable_shards is set.
            if (res.empty())
                continue;

            return res;
        }
        catch (const NetException &)
        {
            /// Remember the failure and fall through to the next shard.
            fail_messages += getCurrentExceptionMessage(false);
            fail_messages += '\n';
        }
    }

    throw NetException(
        "All attempts to get table structure failed. Log: \n\n" + fail_messages + "\n",
        ErrorCodes::NO_REMOTE_SHARD_AVAILABLE);
}
ColumnsDescription getStructureOfRemoteTableInShard(
const Cluster & cluster,
const Cluster::ShardInfo & shard_info,
const StorageID & table_id,
const Context & context,
@ -96,7 +61,7 @@ ColumnsDescription getStructureOfRemoteTableInShard(
ColumnsDescription res;
auto new_context = ClusterProxy::removeUserRestrictionsFromSettings(context, context.getSettingsRef());
auto new_context = ClusterProxy::updateSettingsForCluster(cluster, context, context.getSettingsRef());
/// Expect only needed columns from the result of DESC TABLE. NOTE 'comment' column is ignored for compatibility reasons.
Block sample_block
@ -151,4 +116,40 @@ ColumnsDescription getStructureOfRemoteTableInShard(
return res;
}
/// Obtains the structure (columns) of a remote table by asking the shards of `cluster` one by one.
///
/// Shards are tried in the order returned by cluster.getShardsInfo(); the first non-empty
/// result is returned. A shard that fails with NetException is skipped and its error
/// message is accumulated for the final diagnostic.
///
/// @throws NetException (NO_REMOTE_SHARD_AVAILABLE) if no shard returned a non-empty structure.
ColumnsDescription getStructureOfRemoteTable(
    const Cluster & cluster,
    const StorageID & table_id,
    const Context & context,
    const ASTPtr & table_func_ptr)
{
    const auto & shards_info = cluster.getShardsInfo();

    std::string fail_messages;

    for (const auto & shard_info : shards_info)
    {
        try
        {
            /// Hold the result by value (not by const reference),
            /// so that `return res` can move it out instead of copying.
            auto res = getStructureOfRemoteTableInShard(cluster, shard_info, table_id, context, table_func_ptr);

            /// Expect at least some columns.
            /// This is a hack to handle the empty block case returned by Connection when skip_unavailable_shards is set.
            if (res.empty())
                continue;

            return res;
        }
        catch (const NetException &)
        {
            /// Remember the failure and fall through to the next shard.
            fail_messages += getCurrentExceptionMessage(false);
            fail_messages += '\n';
        }
    }

    throw NetException(
        "All attempts to get table structure failed. Log: \n\n" + fail_messages + "\n",
        ErrorCodes::NO_REMOTE_SHARD_AVAILABLE);
}
}

View File

@ -19,10 +19,4 @@ ColumnsDescription getStructureOfRemoteTable(
const Context & context,
const ASTPtr & table_func_ptr = nullptr);
ColumnsDescription getStructureOfRemoteTableInShard(
const Cluster::ShardInfo & shard_info,
const StorageID & table_id,
const Context & context,
const ASTPtr & table_func_ptr = nullptr);
}

View File

@ -79,6 +79,20 @@ def get_query_user_info(node, query_pattern):
type = 'QueryFinish'
""".format(query_pattern)).strip().split('\t')
# Looks up, in system.query_log of `node`, a finished non-initial query whose text
# contains `query_pattern` and extracts the value logged for `setting`.
#
# @return -- the stripped output of that lookup (the tests compare it with ''
#            when the setting is expected to be absent on the shard)
def get_query_setting_on_shard(node, query_pattern, setting):
    # Flush logs first so the query_log lookup below sees the latest entries.
    node.query("SYSTEM FLUSH LOGS")
    lookup_sql = """
SELECT (arrayFilter(x -> ((x.1) = '{}'), arrayZip(Settings.Names, Settings.Values))[1]).2
FROM system.query_log
WHERE
query LIKE '%{}%' AND
NOT is_initial_query AND
query NOT LIKE '%system.query_log%' AND
type = 'QueryFinish'
LIMIT 1
""".format(setting, query_pattern)
    raw_value = node.query(lookup_sql)
    return raw_value.strip()
def test_insecure():
n1.query('SELECT * FROM dist_insecure')
@ -149,4 +163,45 @@ def test_user_secure_cluster(user, password):
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(n2, id_) == [user, user]
@users
def test_per_user_inline_settings_insecure_cluster(user, password):
    # max_memory_usage_for_user is given inline (SETTINGS clause) for a query over
    # dist_insecure (presumably a cluster without `secret` -- confirm in the test configs);
    # the value logged on the shard side is expected to be empty.
    query_id = 'query-ddl-settings-dist_insecure-' + user
    sql = """
SELECT * FROM dist_insecure
SETTINGS
prefer_localhost_replica=0,
max_memory_usage_for_user=1e9,
max_untracked_memory=0
"""
    query_with_id(n1, query_id, sql, user=user, password=password)
    shard_value = get_query_setting_on_shard(n1, query_id, 'max_memory_usage_for_user')
    assert shard_value == ''
@users
def test_per_user_inline_settings_secure_cluster(user, password):
    # max_memory_usage_for_user is given inline (SETTINGS clause) for a query over
    # dist_secure (presumably a cluster with `secret` -- confirm in the test configs);
    # the value logged on the shard side is expected to equal the one that was set.
    query_id = 'query-ddl-settings-dist_secure-' + user
    sql = """
SELECT * FROM dist_secure
SETTINGS
prefer_localhost_replica=0,
max_memory_usage_for_user=1e9,
max_untracked_memory=0
"""
    query_with_id(n1, query_id, sql, user=user, password=password)
    shard_value = get_query_setting_on_shard(n1, query_id, 'max_memory_usage_for_user')
    assert int(shard_value) == int(1e9)
@users
def test_per_user_protocol_settings_insecure_cluster(user, password):
    # Same check as the inline-settings variant, but the settings are handed to
    # query_with_id() via its `settings` argument instead of a SETTINGS clause.
    # For dist_insecure the value logged on the shard side is expected to be empty.
    query_id = 'query-protocol-settings-dist_insecure-' + user
    protocol_settings = {
        'prefer_localhost_replica': 0,
        'max_memory_usage_for_user': int(1e9),
        'max_untracked_memory': 0,
    }
    query_with_id(n1, query_id, 'SELECT * FROM dist_insecure',
                  user=user, password=password, settings=protocol_settings)
    shard_value = get_query_setting_on_shard(n1, query_id, 'max_memory_usage_for_user')
    assert shard_value == ''
@users
def test_per_user_protocol_settings_secure_cluster(user, password):
    # Same check as the inline-settings variant, but the settings are handed to
    # query_with_id() via its `settings` argument instead of a SETTINGS clause.
    # For dist_secure the value logged on the shard side is expected to equal the one that was set.
    query_id = 'query-protocol-settings-dist_secure-' + user
    protocol_settings = {
        'prefer_localhost_replica': 0,
        'max_memory_usage_for_user': int(1e9),
        'max_untracked_memory': 0,
    }
    query_with_id(n1, query_id, 'SELECT * FROM dist_secure',
                  user=user, password=password, settings=protocol_settings)
    shard_value = get_query_setting_on_shard(n1, query_id, 'max_memory_usage_for_user')
    assert int(shard_value) == int(1e9)
# TODO: check user for INSERT