Add ability to alter settings with other types of alter

This commit is contained in:
alesapin 2019-08-27 12:34:53 +03:00
parent cf31187254
commit 7851d8fe0a
13 changed files with 90 additions and 79 deletions

View File

@ -370,15 +370,9 @@ TableStructureWriteLockHolder IStorage::lockExclusively(const String & query_id)
}
void IStorage::alterSettings(
const SettingsChanges & new_changes,
const Context & context,
TableStructureWriteLockHolder & /* table_lock_holder */)
IDatabase::ASTModifier IStorage::getSettingsModifier(const SettingsChanges & new_changes) const
{
const String current_database_name = getDatabaseName();
const String current_table_name = getTableName();
IDatabase::ASTModifier storage_modifier = [&] (IAST & ast)
return [&] (IAST & ast)
{
if (!new_changes.empty())
{
@ -399,7 +393,6 @@ void IStorage::alterSettings(
}
}
};
context.getDatabase(current_database_name)->alterTable(context, current_table_name, getColumns(), getIndices(), getConstraints(), storage_modifier);
}
@ -408,19 +401,21 @@ void IStorage::alter(
const Context & context,
TableStructureWriteLockHolder & table_lock_holder)
{
if (params.isMutable())
throw Exception("Method alter supports only change comment of column for storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
const String database_name = getDatabaseName();
const String table_name = getTableName();
if (params.isSettingsAlter())
{
SettingsChanges new_changes;
params.applyForSettingsOnly(new_changes);
alterSettings(new_changes, context, table_lock_holder);
return;
IDatabase::ASTModifier settings_modifier = getSettingsModifier(new_changes);
context.getDatabase(database_name)->alterTable(context, table_name, getColumns(), getIndices(), getConstraints(), settings_modifier);
}
if (params.isMutable())
throw Exception("Method alter supports only change comment of column for storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
else
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
auto new_columns = getColumns();
auto new_indices = getIndices();
@ -428,6 +423,7 @@ void IStorage::alter(
params.applyForColumnsOnly(new_columns);
context.getDatabase(database_name)->alterTable(context, table_name, new_columns, new_indices, new_constraints, {});
setColumns(std::move(new_columns));
}
}
}

View File

@ -147,6 +147,10 @@ protected: /// still thread-unsafe part.
/// Returns whether the column is virtual - by default all columns are real.
/// Initially reserved virtual column name may be shadowed by real column.
virtual bool isVirtualColumn(const String & column_name) const;
/// Returns modifier of settings in storage definition
IDatabase::ASTModifier getSettingsModifier(const SettingsChanges & new_changes) const;
private:
ColumnsDescription columns; /// combined real and virtual columns
const ColumnsDescription virtuals = {};
@ -291,13 +295,6 @@ public:
throw Exception("Partition operations are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** ALTER table settings if possible. Otherwise throws exception.
*/
virtual void alterSettings(
const SettingsChanges & new_changes,
const Context & context,
TableStructureWriteLockHolder & table_lock_holder);
/** Perform any background work. For example, combining parts in a MergeTree type table.
* Returns whether any work has been done.
*/

View File

@ -17,16 +17,16 @@ struct KafkaSettings : public SettingsCollection<KafkaSettings>
/// M (mutable) for normal settings, IM (immutable) for not updateable settings.
#define LIST_OF_KAFKA_SETTINGS(M, IM) \
M(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \
M(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \
M(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \
M(SettingString, kafka_format, "", "The message format for Kafka engine.") \
M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.") \
M(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.") \
M(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block") \
M(SettingUInt64, kafka_commit_every_batch, 0, "Commit every consumed and handled batch instead of a single commit after writing a whole block")
IM(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \
IM(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \
IM(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \
IM(SettingString, kafka_format, "", "The message format for Kafka engine.") \
IM(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
IM(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
IM(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.") \
IM(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.") \
IM(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block") \
IM(SettingUInt64, kafka_commit_every_batch, 0, "Commit every consumed and handled batch instead of a single commit after writing a whole block")
DECLARE_SETTINGS_COLLECTION(LIST_OF_KAFKA_SETTINGS)

View File

@ -412,15 +412,6 @@ bool StorageKafka::hasSetting(const String & setting_name) const
return KafkaSettings::findIndex(setting_name) != KafkaSettings::npos;
}
void StorageKafka::alterSettings(
const SettingsChanges & /* new_changes */,
const Context & /* context */,
TableStructureWriteLockHolder & /* table_lock_holder */)
{
throw Exception("Storage '" + getName() + "' doesn't support settings alter", ErrorCodes::UNSUPPORTED_METHOD);
}
void registerStorageKafka(StorageFactory & factory)
{
factory.registerStorage("Kafka", [](const StorageFactory::Arguments & args)

View File

@ -59,11 +59,6 @@ public:
bool hasSetting(const String & setting_name) const override;
void alterSettings(
const SettingsChanges & new_changes,
const Context & context,
TableStructureWriteLockHolder & table_lock_holder) override;
protected:
StorageKafka(
const std::string & table_name_,

View File

@ -1645,18 +1645,16 @@ void MergeTreeData::alterDataPart(
return;
}
void MergeTreeData::alterSettings(
void MergeTreeData::changeSettings(
const SettingsChanges & new_changes,
const Context & context,
TableStructureWriteLockHolder & table_lock_holder)
TableStructureWriteLockHolder & /* table_lock_holder */)
{
const String current_database_name = getDatabaseName();
const String current_table_name = getTableName();
if (!new_changes.empty())
{
MergeTreeSettings copy = *getSettings();
copy.updateFromChanges(new_changes);
IStorage::alterSettings(new_changes, context, table_lock_holder);
storage_settings.set(std::make_unique<const MergeTreeSettings>(copy));
}
}
bool MergeTreeData::hasSetting(const String & setting_name) const

View File

@ -537,12 +537,10 @@ public:
bool skip_sanity_checks,
AlterDataPartTransactionPtr& transaction);
/// Performs ALTER of table settings (MergeTreeSettings). Lightweight operation, affects metadata only.
/// Not atomic, have to be done with alter intention lock.
void alterSettings(
/// Change MergeTreeSettings
void changeSettings(
const SettingsChanges & new_changes,
const Context & context,
TableStructureWriteLockHolder & table_lock_holder) override;
TableStructureWriteLockHolder & table_lock_holder);
/// All MergeTreeData children have settings.
bool hasSetting(const String & setting_name) const override;

View File

@ -253,15 +253,6 @@ void StorageMergeTree::alter(
if (!params.isMutable())
{
SettingsChanges new_changes;
/// We don't need to lock table structure exclusively to ALTER settings.
if (params.isSettingsAlter())
{
params.applyForSettingsOnly(new_changes);
alterSettings(new_changes, context, table_lock_holder);
return;
}
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
auto new_columns = getColumns();
auto new_indices = getIndices();
@ -269,8 +260,14 @@ void StorageMergeTree::alter(
ASTPtr new_order_by_ast = order_by_ast;
ASTPtr new_primary_key_ast = primary_key_ast;
ASTPtr new_ttl_table_ast = ttl_table_ast;
SettingsChanges new_changes;
params.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, {});
changeSettings(new_changes, table_lock_holder);
IDatabase::ASTModifier settings_modifier = getSettingsModifier(new_changes);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, settings_modifier);
setColumns(std::move(new_columns));
return;
}
@ -305,9 +302,19 @@ void StorageMergeTree::alter(
if (new_ttl_table_ast.get() != ttl_table_ast.get())
storage_ast.set(storage_ast.ttl_table, new_ttl_table_ast);
if (!new_changes.empty())
{
auto settings_modifier = getSettingsModifier(new_changes);
settings_modifier(ast);
}
};
changeSettings(new_changes, table_lock_holder);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, storage_modifier);
/// Reinitialize primary key because primary key column types might have changed.
setProperties(new_order_by_ast, new_primary_key_ast, new_columns, new_indices, new_constraints);

View File

@ -3158,7 +3158,12 @@ void StorageReplicatedMergeTree::alter(
LOG_DEBUG(log, "ALTER storage_settings_ptr only");
SettingsChanges new_changes;
params.applyForSettingsOnly(new_changes);
alterSettings(new_changes, query_context, table_lock_holder);
changeSettings(new_changes, table_lock_holder);
IDatabase::ASTModifier settings_modifier = getSettingsModifier(new_changes);
global_context.getDatabase(current_database_name)->alterTable(
query_context, current_table_name, getColumns(), getIndices(), getConstraints(), settings_modifier);
return;
}
@ -3231,6 +3236,18 @@ void StorageReplicatedMergeTree::alter(
if (new_metadata_str != ReplicatedMergeTreeTableMetadata(*this).toString())
changed_nodes.emplace_back(zookeeper_path, "metadata", new_metadata_str);
/// Perform settings update locally
if (!new_changes.empty())
{
IDatabase::ASTModifier settings_modifier = getSettingsModifier(new_changes);
changeSettings(new_changes, table_lock_holder);
global_context.getDatabase(current_database_name)->alterTable(
query_context, current_table_name, getColumns(), getIndices(), getConstraints(), settings_modifier);
}
/// Modify shared metadata nodes in ZooKeeper.
Coordination::Requests ops;
for (const auto & node : changed_nodes)

View File

@ -3,3 +3,4 @@ CREATE TABLE default.table_for_alter (`id` UInt64, `Data` String) ENGINE = Merge
CREATE TABLE default.table_for_alter (`id` UInt64, `Data` String) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity = 4096, parts_to_throw_insert = 100, parts_to_delay_insert = 100
2
CREATE TABLE default.table_for_alter (`id` UInt64, `Data` String) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity = 4096, parts_to_throw_insert = 100, parts_to_delay_insert = 100, check_delay_period = 30
CREATE TABLE default.table_for_alter (`id` UInt64, `Data` String, `Data2` UInt64) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity = 4096, parts_to_throw_insert = 100, parts_to_delay_insert = 100, check_delay_period = 15

View File

@ -47,5 +47,9 @@ ALTER TABLE table_for_alter MODIFY SETTING check_delay_period=10, check_delay_pe
SHOW CREATE TABLE table_for_alter;
ALTER TABLE table_for_alter ADD COLUMN Data2 UInt64, MODIFY SETTING check_delay_period=5, check_delay_period=10, check_delay_period=15;
SHOW CREATE TABLE table_for_alter;
DROP TABLE IF EXISTS table_for_alter;

View File

@ -8,3 +8,5 @@ CREATE TABLE default.replicated_table_for_alter1 (`id` UInt64, `Data` String) EN
6
CREATE TABLE default.replicated_table_for_alter1 (`id` UInt64, `Data` String) ENGINE = ReplicatedMergeTree(\'/clickhouse/tables/replicated_table_for_alter\', \'1\') ORDER BY id SETTINGS index_granularity = 8192, use_minimalistic_part_header_in_zookeeper = 1
CREATE TABLE default.replicated_table_for_alter2 (`id` UInt64, `Data` String) ENGINE = ReplicatedMergeTree(\'/clickhouse/tables/replicated_table_for_alter\', \'2\') ORDER BY id SETTINGS index_granularity = 8192, parts_to_throw_insert = 1, parts_to_delay_insert = 1
CREATE TABLE default.replicated_table_for_alter1 (`id` UInt64, `Data` String, `Data2` UInt64) ENGINE = ReplicatedMergeTree(\'/clickhouse/tables/replicated_table_for_alter\', \'1\') ORDER BY id SETTINGS index_granularity = 8192, use_minimalistic_part_header_in_zookeeper = 1, check_delay_period = 15
CREATE TABLE default.replicated_table_for_alter2 (`id` UInt64, `Data` String, `Data2` UInt64) ENGINE = ReplicatedMergeTree(\'/clickhouse/tables/replicated_table_for_alter\', \'2\') ORDER BY id SETTINGS index_granularity = 8192, parts_to_throw_insert = 1, parts_to_delay_insert = 1

View File

@ -58,5 +58,10 @@ ATTACH TABLE replicated_table_for_alter1;
SHOW CREATE TABLE replicated_table_for_alter1;
SHOW CREATE TABLE replicated_table_for_alter2;
ALTER TABLE replicated_table_for_alter1 ADD COLUMN Data2 UInt64, MODIFY SETTING check_delay_period=5, check_delay_period=10, check_delay_period=15;
SHOW CREATE TABLE replicated_table_for_alter1;
SHOW CREATE TABLE replicated_table_for_alter2;
DROP TABLE IF EXISTS replicated_table_for_alter2;
DROP TABLE IF EXISTS replicated_table_for_alter1;