From 7851d8fe0a6dfc98278fcda40f939a6fd898efd8 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 27 Aug 2019 12:34:53 +0300 Subject: [PATCH] Add ability to alter settings with other types of alter --- dbms/src/Storages/IStorage.cpp | 40 +++++++++---------- dbms/src/Storages/IStorage.h | 11 ++--- dbms/src/Storages/Kafka/KafkaSettings.h | 20 +++++----- dbms/src/Storages/Kafka/StorageKafka.cpp | 9 ----- dbms/src/Storages/Kafka/StorageKafka.h | 5 --- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 18 ++++----- dbms/src/Storages/MergeTree/MergeTreeData.h | 8 ++-- dbms/src/Storages/StorageMergeTree.cpp | 27 ++++++++----- .../Storages/StorageReplicatedMergeTree.cpp | 19 ++++++++- .../00980_merge_alter_settings.reference | 1 + .../00980_merge_alter_settings.sql | 4 ++ ...keeper_merge_tree_alter_settings.reference | 2 + ...80_zookeeper_merge_tree_alter_settings.sql | 5 +++ 13 files changed, 90 insertions(+), 79 deletions(-) diff --git a/dbms/src/Storages/IStorage.cpp b/dbms/src/Storages/IStorage.cpp index 58974010c27..2f3a48d90b6 100644 --- a/dbms/src/Storages/IStorage.cpp +++ b/dbms/src/Storages/IStorage.cpp @@ -370,15 +370,9 @@ TableStructureWriteLockHolder IStorage::lockExclusively(const String & query_id) } -void IStorage::alterSettings( - const SettingsChanges & new_changes, - const Context & context, - TableStructureWriteLockHolder & /* table_lock_holder */) +IDatabase::ASTModifier IStorage::getSettingsModifier(const SettingsChanges & new_changes) const { - const String current_database_name = getDatabaseName(); - const String current_table_name = getTableName(); - - IDatabase::ASTModifier storage_modifier = [&] (IAST & ast) + return [&] (IAST & ast) { if (!new_changes.empty()) { @@ -399,7 +393,6 @@ void IStorage::alterSettings( } } }; - context.getDatabase(current_database_name)->alterTable(context, current_table_name, getColumns(), getIndices(), getConstraints(), storage_modifier); } @@ -408,26 +401,29 @@ void IStorage::alter( const Context & context, TableStructureWriteLockHolder & table_lock_holder) { + if (params.isMutable()) + throw Exception("Method alter supports only change comment of column for storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); + const String database_name = getDatabaseName(); const String table_name = getTableName(); + if (params.isSettingsAlter()) { SettingsChanges new_changes; params.applyForSettingsOnly(new_changes); - alterSettings(new_changes, context, table_lock_holder); - return; + IDatabase::ASTModifier settings_modifier = getSettingsModifier(new_changes); + context.getDatabase(database_name)->alterTable(context, table_name, getColumns(), getIndices(), getConstraints(), settings_modifier); + } + else + { + lockStructureExclusively(table_lock_holder, context.getCurrentQueryId()); + auto new_columns = getColumns(); + auto new_indices = getIndices(); + auto new_constraints = getConstraints(); + params.applyForColumnsOnly(new_columns); + context.getDatabase(database_name)->alterTable(context, table_name, new_columns, new_indices, new_constraints, {}); + setColumns(std::move(new_columns)); } - - if (params.isMutable()) - throw Exception("Method alter supports only change comment of column for storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); - - lockStructureExclusively(table_lock_holder, context.getCurrentQueryId()); - auto new_columns = getColumns(); - auto new_indices = getIndices(); - auto new_constraints = getConstraints(); - params.applyForColumnsOnly(new_columns); - context.getDatabase(database_name)->alterTable(context, table_name, new_columns, new_indices, new_constraints, {}); - setColumns(std::move(new_columns)); } } diff --git a/dbms/src/Storages/IStorage.h b/dbms/src/Storages/IStorage.h index 5672a1ba3fb..35c1439964a 100644 --- a/dbms/src/Storages/IStorage.h +++ b/dbms/src/Storages/IStorage.h @@ -147,6 +147,10 @@ protected: /// still thread-unsafe part. /// Returns whether the column is virtual - by default all columns are real. /// Initially reserved virtual column name may be shadowed by real column. virtual bool isVirtualColumn(const String & column_name) const; + + /// Returns modifier of settings in storage definition + IDatabase::ASTModifier getSettingsModifier(const SettingsChanges & new_changes) const; + private: ColumnsDescription columns; /// combined real and virtual columns const ColumnsDescription virtuals = {}; @@ -291,13 +295,6 @@ public: throw Exception("Partition operations are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); } - /** ALTER table settings if possible. Otherwise throws exception. - */ - virtual void alterSettings( - const SettingsChanges & new_changes, - const Context & context, - TableStructureWriteLockHolder & table_lock_holder); - /** Perform any background work. For example, combining parts in a MergeTree type table. * Returns whether any work has been done. */ diff --git a/dbms/src/Storages/Kafka/KafkaSettings.h b/dbms/src/Storages/Kafka/KafkaSettings.h index e43ea7cd70e..bc453238b51 100644 --- a/dbms/src/Storages/Kafka/KafkaSettings.h +++ b/dbms/src/Storages/Kafka/KafkaSettings.h @@ -17,16 +17,16 @@ struct KafkaSettings : public SettingsCollection /// M (mutable) for normal settings, IM (immutable) for not updateable settings. #define LIST_OF_KAFKA_SETTINGS(M, IM) \ - M(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \ - M(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \ - M(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \ - M(SettingString, kafka_format, "", "The message format for Kafka engine.") \ - M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \ - M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \ - M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.") \ - M(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.") \ - M(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block") \ - M(SettingUInt64, kafka_commit_every_batch, 0, "Commit every consumed and handled batch instead of a single commit after writing a whole block") + IM(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \ + IM(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \ + IM(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \ + IM(SettingString, kafka_format, "", "The message format for Kafka engine.") \ + IM(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \ + IM(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \ + IM(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.") \ + IM(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.") \ + IM(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block") \ + IM(SettingUInt64, kafka_commit_every_batch, 0, "Commit every consumed and handled batch instead of a single commit after writing a whole block") DECLARE_SETTINGS_COLLECTION(LIST_OF_KAFKA_SETTINGS) diff --git a/dbms/src/Storages/Kafka/StorageKafka.cpp b/dbms/src/Storages/Kafka/StorageKafka.cpp index 1694c8e3ade..d53469259ab 100644 --- a/dbms/src/Storages/Kafka/StorageKafka.cpp +++ b/dbms/src/Storages/Kafka/StorageKafka.cpp @@ -412,15 +412,6 @@ bool StorageKafka::hasSetting(const String & setting_name) const return KafkaSettings::findIndex(setting_name) != KafkaSettings::npos; } -void StorageKafka::alterSettings( - const SettingsChanges & /* new_changes */, - const Context & /* context */, - TableStructureWriteLockHolder & /* table_lock_holder */) -{ - throw Exception("Storage '" + getName() + "' doesn't support settings alter", ErrorCodes::UNSUPPORTED_METHOD); -} - - void registerStorageKafka(StorageFactory & factory) { factory.registerStorage("Kafka", [](const StorageFactory::Arguments & args) diff --git a/dbms/src/Storages/Kafka/StorageKafka.h b/dbms/src/Storages/Kafka/StorageKafka.h index 27f1e7e0ec7..c2ff3562116 100644 --- a/dbms/src/Storages/Kafka/StorageKafka.h +++ b/dbms/src/Storages/Kafka/StorageKafka.h @@ -59,11 +59,6 @@ public: bool hasSetting(const String & setting_name) const override; - void alterSettings( - const SettingsChanges & new_changes, - const Context & context, - TableStructureWriteLockHolder & table_lock_holder) override; - protected: StorageKafka( const std::string & table_name_, diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 0767fb35326..b2d4a4b9d73 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -1645,18 +1645,16 @@ void MergeTreeData::alterDataPart( return; } -void MergeTreeData::alterSettings( +void MergeTreeData::changeSettings( const SettingsChanges & new_changes, - const Context & context, - TableStructureWriteLockHolder & table_lock_holder) + TableStructureWriteLockHolder & /* table_lock_holder */) { - const String current_database_name = getDatabaseName(); - const String current_table_name = getTableName(); - - MergeTreeSettings copy = *getSettings(); - copy.updateFromChanges(new_changes); - IStorage::alterSettings(new_changes, context, table_lock_holder); - storage_settings.set(std::make_unique(copy)); + if (!new_changes.empty()) + { + MergeTreeSettings copy = *getSettings(); + copy.updateFromChanges(new_changes); + storage_settings.set(std::make_unique(copy)); + } } bool MergeTreeData::hasSetting(const String & setting_name) const diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index f759b87f986..0440a3181c8 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -537,12 +537,10 @@ public: bool skip_sanity_checks, AlterDataPartTransactionPtr& transaction); - /// Performs ALTER of table settings (MergeTreeSettings). Lightweight operation, affects metadata only. - /// Not atomic, have to be done with alter intention lock. - void alterSettings( + /// Change MergeTreeSettings + void changeSettings( const SettingsChanges & new_changes, - const Context & context, - TableStructureWriteLockHolder & table_lock_holder) override; + TableStructureWriteLockHolder & table_lock_holder); /// All MergeTreeData children have settings. bool hasSetting(const String & setting_name) const override; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 1504413801c..4b7b2c446f6 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -253,15 +253,6 @@ void StorageMergeTree::alter( if (!params.isMutable()) { - SettingsChanges new_changes; - /// We don't need to lock table structure exclusively to ALTER settings. - if (params.isSettingsAlter()) - { - params.applyForSettingsOnly(new_changes); - alterSettings(new_changes, context, table_lock_holder); - return; - } - lockStructureExclusively(table_lock_holder, context.getCurrentQueryId()); auto new_columns = getColumns(); auto new_indices = getIndices(); @@ -269,8 +260,14 @@ void StorageMergeTree::alter( ASTPtr new_order_by_ast = order_by_ast; ASTPtr new_primary_key_ast = primary_key_ast; ASTPtr new_ttl_table_ast = ttl_table_ast; + SettingsChanges new_changes; + params.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes); - context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, {}); + + changeSettings(new_changes, table_lock_holder); + + IDatabase::ASTModifier settings_modifier = getSettingsModifier(new_changes); + context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, settings_modifier); setColumns(std::move(new_columns)); return; } @@ -305,9 +302,19 @@ void StorageMergeTree::alter( if (new_ttl_table_ast.get() != ttl_table_ast.get()) storage_ast.set(storage_ast.ttl_table, new_ttl_table_ast); + + if (!new_changes.empty()) + { + auto settings_modifier = getSettingsModifier(new_changes); + settings_modifier(ast); + } }; + + changeSettings(new_changes, table_lock_holder); + context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, storage_modifier); + /// Reinitialize primary key because primary key column types might have changed. setProperties(new_order_by_ast, new_primary_key_ast, new_columns, new_indices, new_constraints); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index cb3828f5817..5c1924c322c 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -3158,7 +3158,12 @@ void StorageReplicatedMergeTree::alter( LOG_DEBUG(log, "ALTER storage_settings_ptr only"); SettingsChanges new_changes; params.applyForSettingsOnly(new_changes); - alterSettings(new_changes, query_context, table_lock_holder); + + changeSettings(new_changes, table_lock_holder); + + IDatabase::ASTModifier settings_modifier = getSettingsModifier(new_changes); + global_context.getDatabase(current_database_name)->alterTable( + query_context, current_table_name, getColumns(), getIndices(), getConstraints(), settings_modifier); return; } @@ -3231,6 +3236,18 @@ void StorageReplicatedMergeTree::alter( if (new_metadata_str != ReplicatedMergeTreeTableMetadata(*this).toString()) changed_nodes.emplace_back(zookeeper_path, "metadata", new_metadata_str); + /// Perform settings update locally + if (!new_changes.empty()) + { + IDatabase::ASTModifier settings_modifier = getSettingsModifier(new_changes); + + changeSettings(new_changes, table_lock_holder); + + global_context.getDatabase(current_database_name)->alterTable( + query_context, current_table_name, getColumns(), getIndices(), getConstraints(), settings_modifier); + + } + /// Modify shared metadata nodes in ZooKeeper. Coordination::Requests ops; for (const auto & node : changed_nodes) diff --git a/dbms/tests/queries/0_stateless/00980_merge_alter_settings.reference b/dbms/tests/queries/0_stateless/00980_merge_alter_settings.reference index c7f912ddc79..ee3818d25dc 100644 --- a/dbms/tests/queries/0_stateless/00980_merge_alter_settings.reference +++ b/dbms/tests/queries/0_stateless/00980_merge_alter_settings.reference @@ -3,3 +3,4 @@ CREATE TABLE default.table_for_alter (`id` UInt64, `Data` String) ENGINE = Merge CREATE TABLE default.table_for_alter (`id` UInt64, `Data` String) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity = 4096, parts_to_throw_insert = 100, parts_to_delay_insert = 100 2 CREATE TABLE default.table_for_alter (`id` UInt64, `Data` String) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity = 4096, parts_to_throw_insert = 100, parts_to_delay_insert = 100, check_delay_period = 30 +CREATE TABLE default.table_for_alter (`id` UInt64, `Data` String, `Data2` UInt64) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity = 4096, parts_to_throw_insert = 100, parts_to_delay_insert = 100, check_delay_period = 15 diff --git a/dbms/tests/queries/0_stateless/00980_merge_alter_settings.sql b/dbms/tests/queries/0_stateless/00980_merge_alter_settings.sql index 43838b8a727..ed42a79ebbf 100644 --- a/dbms/tests/queries/0_stateless/00980_merge_alter_settings.sql +++ b/dbms/tests/queries/0_stateless/00980_merge_alter_settings.sql @@ -47,5 +47,9 @@ ALTER TABLE table_for_alter MODIFY SETTING check_delay_period=10, check_delay_pe SHOW CREATE TABLE table_for_alter; +ALTER TABLE table_for_alter ADD COLUMN Data2 UInt64, MODIFY SETTING check_delay_period=5, check_delay_period=10, check_delay_period=15; + +SHOW CREATE TABLE table_for_alter; + DROP TABLE IF EXISTS table_for_alter; diff --git a/dbms/tests/queries/0_stateless/00980_zookeeper_merge_tree_alter_settings.reference b/dbms/tests/queries/0_stateless/00980_zookeeper_merge_tree_alter_settings.reference index e55bfadd538..159102e1ca7 100644 --- a/dbms/tests/queries/0_stateless/00980_zookeeper_merge_tree_alter_settings.reference +++ b/dbms/tests/queries/0_stateless/00980_zookeeper_merge_tree_alter_settings.reference @@ -8,3 +8,5 @@ CREATE TABLE default.replicated_table_for_alter1 (`id` UInt64, `Data` String) EN 6 CREATE TABLE default.replicated_table_for_alter1 (`id` UInt64, `Data` String) ENGINE = ReplicatedMergeTree(\'/clickhouse/tables/replicated_table_for_alter\', \'1\') ORDER BY id SETTINGS index_granularity = 8192, use_minimalistic_part_header_in_zookeeper = 1 CREATE TABLE default.replicated_table_for_alter2 (`id` UInt64, `Data` String) ENGINE = ReplicatedMergeTree(\'/clickhouse/tables/replicated_table_for_alter\', \'2\') ORDER BY id SETTINGS index_granularity = 8192, parts_to_throw_insert = 1, parts_to_delay_insert = 1 +CREATE TABLE default.replicated_table_for_alter1 (`id` UInt64, `Data` String, `Data2` UInt64) ENGINE = ReplicatedMergeTree(\'/clickhouse/tables/replicated_table_for_alter\', \'1\') ORDER BY id SETTINGS index_granularity = 8192, use_minimalistic_part_header_in_zookeeper = 1, check_delay_period = 15 +CREATE TABLE default.replicated_table_for_alter2 (`id` UInt64, `Data` String, `Data2` UInt64) ENGINE = ReplicatedMergeTree(\'/clickhouse/tables/replicated_table_for_alter\', \'2\') ORDER BY id SETTINGS index_granularity = 8192, parts_to_throw_insert = 1, parts_to_delay_insert = 1 diff --git a/dbms/tests/queries/0_stateless/00980_zookeeper_merge_tree_alter_settings.sql b/dbms/tests/queries/0_stateless/00980_zookeeper_merge_tree_alter_settings.sql index 792a704b6a1..f2e453c99d2 100644 --- a/dbms/tests/queries/0_stateless/00980_zookeeper_merge_tree_alter_settings.sql +++ b/dbms/tests/queries/0_stateless/00980_zookeeper_merge_tree_alter_settings.sql @@ -58,5 +58,10 @@ ATTACH TABLE replicated_table_for_alter1; SHOW CREATE TABLE replicated_table_for_alter1; SHOW CREATE TABLE replicated_table_for_alter2; +ALTER TABLE replicated_table_for_alter1 ADD COLUMN Data2 UInt64, MODIFY SETTING check_delay_period=5, check_delay_period=10, check_delay_period=15; + +SHOW CREATE TABLE replicated_table_for_alter1; +SHOW CREATE TABLE replicated_table_for_alter2; + DROP TABLE IF EXISTS replicated_table_for_alter2; DROP TABLE IF EXISTS replicated_table_for_alter1;