diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 20e07d15916..3121389a6db 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -41,6 +41,7 @@ struct Settings M(SettingUInt64, min_insert_block_size_rows, DEFAULT_INSERT_BLOCK_SIZE, "Squash blocks passed to INSERT query to specified size in rows, if blocks are not big enough.") \ M(SettingUInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough.") \ M(SettingMaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.") \ + M(SettingMaxAlterThreads, max_alter_threads, 0, "The maximum number of threads to execute the ALTER requests. By default, it is determined automatically.") \ M(SettingUInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.") \ M(SettingUInt64, max_distributed_connections, 1024, "The maximum number of connections for distributed processing of one query (should be greater than max_threads).") \ M(SettingUInt64, max_query_size, 262144, "Which part of the query can be read into RAM for parsing (the remaining data for INSERT, if any, is read later)") \ diff --git a/dbms/src/Interpreters/SettingsCommon.h b/dbms/src/Interpreters/SettingsCommon.h index ff2c0cd9339..abe6e052fbc 100644 --- a/dbms/src/Interpreters/SettingsCommon.h +++ b/dbms/src/Interpreters/SettingsCommon.h @@ -87,6 +87,12 @@ struct SettingMaxThreads UInt64 getAutoValueImpl() const; }; +struct SettingMaxAlterThreads: public SettingMaxThreads +{ + SettingMaxAlterThreads(UInt64 x = 0): + SettingMaxThreads(x) + {} +}; struct SettingSeconds { diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index a7a618873a8..620afcb0f91 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -194,15 +194,17 @@ void StorageMergeTree::rename(const String & new_path_to_db, const String & /*ne std::vector StorageMergeTree::prepare_alter_transactions( - const ColumnsDescription& new_columns, const IndicesDescription& new_indices, const size_t thread_pool_size) + const ColumnsDescription& new_columns, const IndicesDescription& new_indices, const Context & context) { - ThreadPool thread_pool(thread_pool_size); - auto parts = data.getDataParts({MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated}); std::vector transactions(parts.size()); const auto& columns_for_parts = new_columns.getAllPhysical(); + const Settings & settings = context.getSettingsRef(); + size_t thread_pool_size = std::min(parts.size(), settings.max_alter_threads); + ThreadPool thread_pool(thread_pool_size); + size_t i = 0; for (const auto & part : parts) { @@ -259,7 +261,7 @@ void StorageMergeTree::alter( ASTPtr new_primary_key_ast = data.primary_key_ast; params.apply(new_columns, new_indices, new_order_by_ast, new_primary_key_ast); - auto transactions = prepare_alter_transactions(new_columns, new_indices, 2 * getNumberOfPhysicalCPUCores()); + auto transactions = prepare_alter_transactions(new_columns, new_indices, context); auto table_hard_lock = lockStructureForAlter(context.getCurrentQueryId()); diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index 36b6cb9fc36..0f34c71213c 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -133,7 +133,7 @@ private: BackgroundProcessingPool::TaskHandle background_task_handle; std::vector prepare_alter_transactions( - const ColumnsDescription& new_columns, const IndicesDescription& new_indices, const size_t thread_pool_size); + const ColumnsDescription& new_columns, const IndicesDescription& new_indices, const Context & context); void loadMutations();