Shard now clamps the settings received from the initiator to the shard's constraints instead of throwing an exception.
This commit is contained in:
parent 0f04ff0749
commit fc0376a170
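The sketch below is not part of the commit; it is a minimal standalone illustration of the behaviour described above, using the constraint bounds that appear in the test configs further down (11111111-99999999 for max_memory_usage). The QueryKind enum and applySetting helper are made-up names for illustration, not the real ClickHouse types.

    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <stdexcept>

    // Illustrative stand-ins only; the real code uses ClientInfo::QueryKind and SettingsConstraints.
    enum class QueryKind { Initial, Secondary };

    struct Range { uint64_t min_value; uint64_t max_value; };

    // Initial queries (sent by a client) still fail loudly on a constraint violation;
    // secondary queries (sent shard-to-shard by the initiator) are silently clamped.
    uint64_t applySetting(QueryKind kind, uint64_t requested, const Range & allowed)
    {
        if (kind == QueryKind::Initial)
        {
            if (requested < allowed.min_value || requested > allowed.max_value)
                throw std::runtime_error("setting is out of the allowed range");
            return requested;
        }
        return std::clamp(requested, allowed.min_value, allowed.max_value);
    }

    int main()
    {
        Range max_memory_usage{11111111, 99999999};
        std::cout << applySetting(QueryKind::Secondary, 1, max_memory_usage) << '\n';             // 11111111
        std::cout << applySetting(QueryKind::Secondary, 40000000, max_memory_usage) << '\n';      // 40000000
        std::cout << applySetting(QueryKind::Secondary, 3000000000ULL, max_memory_usage) << '\n'; // 99999999
    }

With the old behaviour the secondary-query cases would have thrown instead, which is what the removed test_settings_under_remote test below expected.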
@@ -875,48 +875,55 @@ void TCPHandler::receiveQuery()
     query_context->setCurrentQueryId(state.query_id);
 
     /// Client info
-    ClientInfo & client_info = query_context->getClientInfo();
-    if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
-        client_info.read(*in, client_revision);
-
-    /// For better support of old clients, that does not send ClientInfo.
-    if (client_info.query_kind == ClientInfo::QueryKind::NO_QUERY)
-    {
-        client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
-        client_info.client_name = client_name;
-        client_info.client_version_major = client_version_major;
-        client_info.client_version_minor = client_version_minor;
-        client_info.client_version_patch = client_version_patch;
-        client_info.client_revision = client_revision;
-    }
-
-    /// Set fields, that are known apriori.
-    client_info.interface = ClientInfo::Interface::TCP;
-
-    if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
-    {
-        /// 'Current' fields was set at receiveHello.
-        client_info.initial_user = client_info.current_user;
-        client_info.initial_query_id = client_info.current_query_id;
-        client_info.initial_address = client_info.current_address;
-    }
-    else
-    {
-        query_context->setInitialRowPolicy();
-    }
+    ClientInfo & client_info = query_context->getClientInfo();
+    if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
+        client_info.read(*in, client_revision);
+
+    /// For better support of old clients, that does not send ClientInfo.
+    if (client_info.query_kind == ClientInfo::QueryKind::NO_QUERY)
+    {
+        client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
+        client_info.client_name = client_name;
+        client_info.client_version_major = client_version_major;
+        client_info.client_version_minor = client_version_minor;
+        client_info.client_version_patch = client_version_patch;
+        client_info.client_revision = client_revision;
+    }
+
+    /// Set fields, that are known apriori.
+    client_info.interface = ClientInfo::Interface::TCP;
+
+    if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
+    {
+        /// 'Current' fields was set at receiveHello.
+        client_info.initial_user = client_info.current_user;
+        client_info.initial_query_id = client_info.current_query_id;
+        client_info.initial_address = client_info.current_address;
+    }
+    else
+    {
+        query_context->setInitialRowPolicy();
+    }
 
-    /// Per query settings.
-    Settings custom_settings{};
+    /// Per query settings are also passed via TCP.
+    /// We need to check them before applying due to they can violate the settings constraints.
     auto settings_format = (client_revision >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) ? SettingsBinaryFormat::STRINGS
                                                                                                        : SettingsBinaryFormat::OLD;
-    custom_settings.deserialize(*in, settings_format);
-    auto settings_changes = custom_settings.changes();
-    query_context->checkSettingsConstraints(settings_changes);
+    Settings passed_settings;
+    passed_settings.deserialize(*in, settings_format);
+    auto settings_changes = passed_settings.changes();
+    if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
+    {
+        /// Throw an exception if the passed settings violate the constraints.
+        query_context->checkSettingsConstraints(settings_changes);
+    }
+    else
+    {
+        /// Quietly clamp to the constraints if it's not an initial query.
+        query_context->clampToSettingsConstraints(settings_changes);
+    }
     query_context->applySettingsChanges(settings_changes);
 
-    Settings & settings = query_context->getSettingsRef();
+    const Settings & settings = query_context->getSettingsRef();
 
     /// Sync timeouts on client and server during current query to avoid dangling queries on server
     /// NOTE: We use settings.send_timeout for the receive timeout and vice versa (change arguments ordering in TimeoutSetter),
@@ -199,6 +199,77 @@ void SettingsConstraints::check(const Settings & current_settings, const Setting
 }
 
 
+void SettingsConstraints::clamp(const Settings & current_settings, SettingChange & change) const
+{
+    const String & name = change.name;
+    size_t setting_index = Settings::findIndex(name);
+    if (setting_index == Settings::npos)
+        return;
+
+    Field new_value = Settings::valueToCorrespondingType(setting_index, change.value);
+    Field current_value = current_settings.get(setting_index);
+
+    /// Setting isn't checked if value wasn't changed.
+    if (current_value == new_value)
+        return;
+
+    if (!current_settings.allow_ddl && name == "allow_ddl")
+    {
+        change.value = current_value;
+        return;
+    }
+
+    /** The `readonly` value is understood as follows:
+      * 0 - everything allowed.
+      * 1 - only read queries can be made; you can not change the settings.
+      * 2 - You can only do read queries and you can change the settings, except for the `readonly` setting.
+      */
+    if (current_settings.readonly == 1)
+    {
+        change.value = current_value;
+        return;
+    }
+
+    if (current_settings.readonly > 1 && name == "readonly")
+    {
+        change.value = current_value;
+        return;
+    }
+
+    const Constraint * constraint = tryGetConstraint(setting_index);
+    if (constraint)
+    {
+        if (constraint->read_only)
+        {
+            change.value = current_value;
+            return;
+        }
+
+        if (!constraint->min_value.isNull() && (new_value < constraint->min_value))
+        {
+            if (!constraint->max_value.isNull() && (constraint->min_value > constraint->max_value))
+                change.value = current_value;
+            else
+                change.value = constraint->min_value;
+            return;
+        }
+
+        if (!constraint->max_value.isNull() && (new_value > constraint->max_value))
+        {
+            change.value = constraint->max_value;
+            return;
+        }
+    }
+}
+
+
+void SettingsConstraints::clamp(const Settings & current_settings, SettingsChanges & changes) const
+{
+    for (auto & change : changes)
+        clamp(current_settings, change);
+}
+
+
 SettingsConstraints::Constraint & SettingsConstraints::getConstraintRef(size_t index)
 {
     auto it = constraints_by_index.find(index);
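As a reading aid (not part of the commit), the rules of SettingsConstraints::clamp above can be restated with plain integers instead of the Field/Settings machinery; Bounds and clampValue are illustrative names, and the global readonly/allow_ddl checks are omitted for brevity:

    #include <cstdint>
    #include <iostream>
    #include <optional>

    // Illustrative mirror of a single <constraints> entry; std::nullopt means "no bound configured".
    struct Bounds
    {
        std::optional<uint64_t> min_value;
        std::optional<uint64_t> max_value;
        bool read_only = false;
    };

    // Condensed restatement of the clamping rules from the hunk above.
    uint64_t clampValue(uint64_t current, uint64_t requested, const Bounds & b)
    {
        if (requested == current)
            return current;                 // unchanged values are not checked
        if (b.read_only)
            return current;                 // read-only constraint: keep the current value
        if (b.min_value && requested < *b.min_value)
            return (b.max_value && *b.min_value > *b.max_value) ? current : *b.min_value;
        if (b.max_value && requested > *b.max_value)
            return *b.max_value;
        return requested;
    }

    int main()
    {
        Bounds max_memory_usage{11111111, 99999999};
        std::cout << clampValue(10000000000ULL, 1, max_memory_usage) << '\n';             // raised to 11111111
        std::cout << clampValue(10000000000ULL, 3000000000ULL, max_memory_usage) << '\n'; // lowered to 99999999
    }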
@@ -85,9 +85,14 @@ public:
 
     Infos getInfo() const;
 
     /// Checks whether `change` violates these constraints and throws an exception if so.
     void check(const Settings & current_settings, const SettingChange & change) const;
     void check(const Settings & current_settings, const SettingsChanges & changes) const;
 
+    /// Checks whether `change` violates these and clamps the `change` if so.
+    void clamp(const Settings & current_settings, SettingChange & change) const;
+    void clamp(const Settings & current_settings, SettingsChanges & changes) const;
+
     /** Set multiple settings from "profile" (in server configuration file (users.xml), profiles contain groups of multiple settings).
      * The profile can also be set using the `set` functions, like the profile setting.
      */
@@ -1216,20 +1216,32 @@ void Context::applySettingsChanges(const SettingsChanges & changes)
 }
 
 
-void Context::checkSettingsConstraints(const SettingChange & change)
+void Context::checkSettingsConstraints(const SettingChange & change) const
 {
     if (settings_constraints)
         settings_constraints->check(settings, change);
 }
 
 
-void Context::checkSettingsConstraints(const SettingsChanges & changes)
+void Context::checkSettingsConstraints(const SettingsChanges & changes) const
 {
     if (settings_constraints)
         settings_constraints->check(settings, changes);
 }
 
 
+void Context::clampToSettingsConstraints(SettingChange & change) const
+{
+    if (settings_constraints)
+        settings_constraints->clamp(settings, change);
+}
+
+void Context::clampToSettingsConstraints(SettingsChanges & changes) const
+{
+    if (settings_constraints)
+        settings_constraints->clamp(settings, changes);
+}
+
+
 String Context::getCurrentDatabase() const
 {
     return current_database;
@@ -365,8 +365,10 @@ public:
     void applySettingsChanges(const SettingsChanges & changes);
 
     /// Checks the constraints.
-    void checkSettingsConstraints(const SettingChange & change);
-    void checkSettingsConstraints(const SettingsChanges & changes);
+    void checkSettingsConstraints(const SettingChange & change) const;
+    void checkSettingsConstraints(const SettingsChanges & changes) const;
+    void clampToSettingsConstraints(SettingChange & change) const;
+    void clampToSettingsConstraints(SettingsChanges & changes) const;
 
     /// Returns the current constraints (can return null).
     std::shared_ptr<const SettingsConstraints> getSettingsConstraints() const { return settings_constraints; }
@@ -5,12 +5,14 @@
         <replica>
             <host>node1</host>
             <port>9000</port>
-            <user>distributed</user>
+            <user>normal</user>
         </replica>
     </shard>
     <shard>
         <replica>
            <host>node2</host>
            <port>9000</port>
-            <user>distributed</user>
+            <user>readonly</user>
         </replica>
     </shard>
 </test_cluster>
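A note on the cluster definition above: the <user> element of each replica is the account the 'distributed' initiator uses when it connects to that shard, so after this change node1 is reached as the 'normal' user and node2 as the 'readonly' user. The per-host differences asserted in the integration test further down follow from that asymmetry.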
@@ -1,41 +1,33 @@
 <yandex>
     <profiles>
         <default>
-            <max_memory_usage>10000000000</max_memory_usage>
-            <use_uncompressed_cache>0</use_uncompressed_cache>
-            <load_balancing>random</load_balancing>
         </default>
-        <distributed_profile>
-            <max_memory_usage>1000000</max_memory_usage>
-            <use_uncompressed_cache>0</use_uncompressed_cache>
-            <load_balancing>random</load_balancing>
-            <readonly>2</readonly>
+        <normal>
+            <max_memory_usage>50000000</max_memory_usage>
             <constraints>
                 <max_memory_usage>
-                    <min>1</min>
-                    <max>1000000</max>
+                    <min>11111111</min>
+                    <max>99999999</max>
                 </max_memory_usage>
             </constraints>
-        </distributed_profile>
+        </normal>
+        <readonly>
+            <readonly>1</readonly>
+        </readonly>
     </profiles>
 
     <users>
         <default>
             <password></password>
             <profile>default</profile>
             <quota>default</quota>
         </default>
-        <distributed>
-            <password></password>
-            <profile>distributed_profile</profile>
-            <quota>default</quota>
-            <networks incl="networks" replace="replace">
-                <ip>::/0</ip>
-            </networks>
-        </distributed>
-    </users>
-
-    <quotas>
-        <default>
-        </default>
-    </quotas>
+        <normal>
+            <profile>normal</profile>
+            <password></password>
+        </normal>
+        <readonly>
+            <profile>readonly</profile>
+            <password></password>
+        </readonly>
+    </users>
 </yandex>
@@ -1,34 +1,36 @@
 <yandex>
     <profiles>
         <default>
-            <max_memory_usage>1000000</max_memory_usage>
-            <use_uncompressed_cache>0</use_uncompressed_cache>
-            <load_balancing>random</load_balancing>
         </default>
-        <remote_profile>
-            <max_memory_usage>2000000000</max_memory_usage>
+        <normal>
+            <max_memory_usage>80000000</max_memory_usage>
             <use_uncompressed_cache>0</use_uncompressed_cache>
             <load_balancing>random</load_balancing>
-        </remote_profile>
+        </normal>
+        <wasteful>
+            <max_memory_usage>2000000000</max_memory_usage>
+        </wasteful>
+        <readonly>
+            <readonly>1</readonly>
+        </readonly>
     </profiles>
 
     <users>
         <default>
             <password></password>
             <profile>default</profile>
             <quota>default</quota>
         </default>
-        <remote>
-            <password></password>
-            <profile>remote_profile</profile>
-            <quota>default</quota>
-            <networks incl="networks" replace="replace">
-                <ip>::/0</ip>
-            </networks>
-        </remote>
-    </users>
-
-    <quotas>
-        <default>
-        </default>
-    </quotas>
+        <normal>
+            <profile>normal</profile>
+            <password></password>
+        </normal>
+        <wasteful>
+            <profile>wasteful</profile>
+            <password></password>
+        </wasteful>
+        <readonly>
+            <profile>readonly</profile>
+            <password></password>
+        </readonly>
+    </users>
 </yandex>
@@ -8,8 +8,8 @@ from helpers.test_tools import assert_eq_with_retry
 
 cluster = ClickHouseCluster(__file__)
 
-node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], user_configs=['configs/users_on_cluster.xml'])
-node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], user_configs=['configs/users_on_cluster.xml'])
+node1 = cluster.add_instance('node1', user_configs=['configs/users_on_cluster.xml'])
+node2 = cluster.add_instance('node2', user_configs=['configs/users_on_cluster.xml'])
 
 distributed = cluster.add_instance('distributed', main_configs=['configs/remote_servers.xml'], user_configs=['configs/users_on_distributed.xml'])
 
@@ -24,19 +24,58 @@ def started_cluster():
             node.query("INSERT INTO sometable VALUES (toDate('2020-01-20'), 1, 1)")
 
         distributed.query("CREATE TABLE proxy (date Date, id UInt32, value Int32) ENGINE = Distributed(test_cluster, default, sometable);")
+        distributed.query("CREATE TABLE sysproxy (name String, value String) ENGINE = Distributed(test_cluster, system, settings);")
 
         yield cluster
 
     finally:
         cluster.shutdown()
 
-def test_settings_under_remote(started_cluster):
-    assert distributed.query("SELECT COUNT() FROM proxy") == '1\n'
 
-    with pytest.raises(QueryRuntimeException):
-        distributed.query("SELECT COUNT() FROM proxy", user='remote')
+def test_shard_doesnt_throw_on_constraint_violation(started_cluster):
+    query = "SELECT COUNT() FROM proxy"
+    assert distributed.query(query) == '2\n'
+    assert distributed.query(query, user = 'normal') == '2\n'
+    assert distributed.query(query, user = 'wasteful') == '2\n'
+    assert distributed.query(query, user = 'readonly') == '2\n'
+
+    assert distributed.query(query, settings={"max_memory_usage": 40000000, "readonly": 2}) == '2\n'
+    assert distributed.query(query, settings={"max_memory_usage": 3000000000, "readonly": 2}) == '2\n'
 
-    assert distributed.query("SELECT COUNT() FROM proxy", settings={"max_memory_usage": 1000000}, user='remote') == '1\n'
+    query = "SELECT COUNT() FROM remote('node{1,2}', 'default', 'sometable')"
+    assert distributed.query(query) == '2\n'
+    assert distributed.query(query, user = 'normal') == '2\n'
+    assert distributed.query(query, user = 'wasteful') == '2\n'
 
-    with pytest.raises(QueryRuntimeException):
-        distributed.query("SELECT COUNT() FROM proxy", settings={"max_memory_usage": 1000001}, user='remote')
+
+def test_shard_clamps_settings(started_cluster):
+    query = "SELECT hostName() as host, name, value FROM sysproxy WHERE name = 'max_memory_usage' OR name = 'readonly' ORDER BY host, name, value"
+    assert distributed.query(query) == 'node1\tmax_memory_usage\t99999999\n'\
+                                       'node1\treadonly\t0\n'\
+                                       'node2\tmax_memory_usage\t10000000000\n'\
+                                       'node2\treadonly\t1\n'
+    assert distributed.query(query, user = 'normal') == 'node1\tmax_memory_usage\t80000000\n'\
+                                       'node1\treadonly\t0\n'\
+                                       'node2\tmax_memory_usage\t10000000000\n'\
+                                       'node2\treadonly\t1\n'
+    assert distributed.query(query, user = 'wasteful') == 'node1\tmax_memory_usage\t99999999\n'\
+                                       'node1\treadonly\t0\n'\
+                                       'node2\tmax_memory_usage\t10000000000\n'\
+                                       'node2\treadonly\t1\n'
+    assert distributed.query(query, user = 'readonly') == 'node1\tmax_memory_usage\t99999999\n'\
+                                       'node1\treadonly\t1\n'\
+                                       'node2\tmax_memory_usage\t10000000000\n'\
+                                       'node2\treadonly\t1\n'
+
+    assert distributed.query(query, settings={"max_memory_usage": 1}) == 'node1\tmax_memory_usage\t11111111\n'\
+                                       'node1\treadonly\t0\n'\
+                                       'node2\tmax_memory_usage\t10000000000\n'\
+                                       'node2\treadonly\t1\n'
+    assert distributed.query(query, settings={"max_memory_usage": 40000000, "readonly": 2}) == 'node1\tmax_memory_usage\t40000000\n'\
+                                       'node1\treadonly\t2\n'\
+                                       'node2\tmax_memory_usage\t10000000000\n'\
+                                       'node2\treadonly\t1\n'
+    assert distributed.query(query, settings={"max_memory_usage": 3000000000, "readonly": 2}) == 'node1\tmax_memory_usage\t99999999\n'\
+                                       'node1\treadonly\t2\n'\
+                                       'node2\tmax_memory_usage\t10000000000\n'\
+                                       'node2\treadonly\t1\n'
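Reading the expected values in test_shard_clamps_settings: each shard reports the initiator's settings after clamping them against its own constraints. node1 is reached as the 'normal' user, whose profile in users_on_cluster.xml constrains max_memory_usage to 11111111-99999999, so the initiator's default value of 10000000000 is reported as 99999999, the 'normal' initiator profile's 80000000 passes through unchanged, and an explicit value of 1 is raised to 11111111. node2 is reached as the 'readonly' user (readonly=1), so the passed settings are kept at their current values and it keeps reporting max_memory_usage 10000000000 and readonly 1.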