Uniq settings

This commit is contained in:
alesapin 2019-08-06 17:09:36 +03:00
parent 303c4e5a58
commit ca29343f54
8 changed files with 39 additions and 23 deletions

View File

@ -599,7 +599,7 @@ public:
loadFromChange(change);
}
/// Applies changes to the settings, checks settings mutability
/// Applies changes to the settings, checks settings mutability.
void updateFromChange(const SettingChange & change)
{
update(change.name, change.value);

View File

@ -15,16 +15,16 @@ struct KafkaSettings : public SettingsCollection<KafkaSettings>
{
#define LIST_OF_KAFKA_SETTINGS(M, IM) \
IM(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \
IM(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \
IM(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \
IM(SettingString, kafka_format, "", "The message format for Kafka engine.") \
IM(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
IM(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
IM(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.") \
IM(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.") \
IM(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block") \
IM(SettingUInt64, kafka_commit_every_batch, 0, "Commit every consumed and handled batch instead of a single commit after writing a whole block")
M(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \
M(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \
M(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \
M(SettingString, kafka_format, "", "The message format for Kafka engine.") \
M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.") \
M(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.") \
M(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block") \
M(SettingUInt64, kafka_commit_every_batch, 0, "Commit every consumed and handled batch instead of a single commit after writing a whole block")
DECLARE_SETTINGS_COLLECTION(LIST_OF_KAFKA_SETTINGS)

View File

@ -1595,8 +1595,16 @@ void MergeTreeData::alterSettings(
{
if (!new_changes.empty())
{
auto & storage_ast = ast.as<ASTStorage &>();
storage_ast.settings->changes.insert(storage_ast.settings->changes.end(), new_changes.begin(), new_changes.end());
auto & storage_changes = ast.as<ASTStorage &>().settings->changes;
/// Make storage settings unique
for (const auto & change : new_changes)
{
auto finder = [&change] (const SettingChange & c) { return c.name == change.name;};
if (auto it = std::find_if(storage_changes.begin(), storage_changes.end(), finder); it != storage_changes.end())
it->value = change.value;
else
storage_changes.push_back(change);
}
}
};
context.getDatabase(current_database_name)->alterTable(context, current_table_name, getColumns(), getIndices(), storage_modifier);

View File

@ -509,6 +509,7 @@ public:
bool skip_sanity_checks,
AlterDataPartTransactionPtr& transaction);
/// Performs ALTER of table settings
void alterSettings(
const SettingsChanges & new_changes,
const String & current_database_name,

View File

@ -67,7 +67,7 @@ void MergeTreeSettings::loadFromQuery(ASTStorage & storage_def)
#define ADD_IF_ABSENT(NAME) \
if (std::find_if(changes.begin(), changes.end(), \
[](const SettingChange & c) { return c.name == #NAME; }) \
[](const SettingChange & c) { return c.name == #NAME; }) \
== changes.end()) \
changes.push_back(SettingChange{#NAME, NAME.value});

View File

@ -260,16 +260,8 @@ void StorageMergeTree::alter(
ASTPtr new_primary_key_ast = primary_key_ast;
ASTPtr new_ttl_table_ast = ttl_table_ast;
params.apply(new_columns, new_indices, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
IDatabase::ASTModifier storage_modifier = [&] (IAST & ast)
{
auto & storage_ast = ast.as<ASTStorage &>();
if (!new_changes.empty())
storage_ast.settings->changes.insert(storage_ast.settings->changes.end(), new_changes.begin(), new_changes.end());
};
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, storage_modifier);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, {});
setColumns(std::move(new_columns));
settings.updateFromChanges(new_changes);
return;
}

View File

@ -1,2 +1,5 @@
CREATE TABLE default.table_for_alter (`id` UInt64, `Data` String) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity = 4096
CREATE TABLE default.table_for_alter (`id` UInt64, `Data` String) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity = 4096, parts_to_throw_insert = 1, parts_to_delay_insert = 1
CREATE TABLE default.table_for_alter (`id` UInt64, `Data` String) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity = 4096, parts_to_throw_insert = 100, parts_to_delay_insert = 100
2
CREATE TABLE default.table_for_alter (`id` UInt64, `Data` String) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity = 4096, parts_to_throw_insert = 100, parts_to_delay_insert = 100, check_delay_period = 30

View File

@ -35,5 +35,17 @@ INSERT INTO table_for_alter VALUES (2, '2'); -- { serverError 252 }
ALTER TABLE table_for_alter MODIFY SETTING xxx_yyy=124; -- { serverError 115 }
ALTER TABLE table_for_alter MODIFY SETTING parts_to_throw_insert = 100, parts_to_delay_insert = 100;
INSERT INTO table_for_alter VALUES (2, '2');
SHOW CREATE TABLE table_for_alter;
SELECT COUNT() FROM table_for_alter;
ALTER TABLE table_for_alter MODIFY SETTING check_delay_period=10, check_delay_period=20, check_delay_period=30;
SHOW CREATE TABLE table_for_alter;
DROP TABLE IF EXISTS table_for_alter;