This commit is contained in:
ivan-kush 2019-04-25 21:57:43 +03:00
parent 3f01882310
commit 33201fa9a8
4 changed files with 14 additions and 5 deletions

View File

@ -41,6 +41,7 @@ struct Settings
M(SettingUInt64, min_insert_block_size_rows, DEFAULT_INSERT_BLOCK_SIZE, "Squash blocks passed to INSERT query to specified size in rows, if blocks are not big enough.") \
M(SettingUInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough.") \
M(SettingMaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.") \
M(SettingMaxAlterThreads, max_alter_threads, 0, "The maximum number of threads to execute the ALTER requests. By default, it is determined automatically.") \
M(SettingUInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.") \
M(SettingUInt64, max_distributed_connections, 1024, "The maximum number of connections for distributed processing of one query (should be greater than max_threads).") \
M(SettingUInt64, max_query_size, 262144, "Which part of the query can be read into RAM for parsing (the remaining data for INSERT, if any, is read later)") \

View File

@ -87,6 +87,12 @@ struct SettingMaxThreads
UInt64 getAutoValueImpl() const;
};
struct SettingMaxAlterThreads: public SettingMaxThreads
{
SettingMaxAlterThreads(UInt64 x = 0):
SettingMaxThreads(x)
{}
};
struct SettingSeconds
{

View File

@ -194,15 +194,17 @@ void StorageMergeTree::rename(const String & new_path_to_db, const String & /*ne
std::vector<MergeTreeData::AlterDataPartTransactionPtr> StorageMergeTree::prepare_alter_transactions(
const ColumnsDescription& new_columns, const IndicesDescription& new_indices, const size_t thread_pool_size)
const ColumnsDescription& new_columns, const IndicesDescription& new_indices, const Context & context)
{
ThreadPool thread_pool(thread_pool_size);
auto parts = data.getDataParts({MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
std::vector<MergeTreeData::AlterDataPartTransactionPtr> transactions(parts.size());
const auto& columns_for_parts = new_columns.getAllPhysical();
const Settings & settings = context.getSettingsRef();
size_t thread_pool_size = std::min<size_t>(parts.size(), settings.max_alter_threads);
ThreadPool thread_pool(thread_pool_size);
size_t i = 0;
for (const auto & part : parts)
{
@ -259,7 +261,7 @@ void StorageMergeTree::alter(
ASTPtr new_primary_key_ast = data.primary_key_ast;
params.apply(new_columns, new_indices, new_order_by_ast, new_primary_key_ast);
auto transactions = prepare_alter_transactions(new_columns, new_indices, 2 * getNumberOfPhysicalCPUCores());
auto transactions = prepare_alter_transactions(new_columns, new_indices, context);
auto table_hard_lock = lockStructureForAlter(context.getCurrentQueryId());

View File

@ -133,7 +133,7 @@ private:
BackgroundProcessingPool::TaskHandle background_task_handle;
std::vector<MergeTreeData::AlterDataPartTransactionPtr> prepare_alter_transactions(
const ColumnsDescription& new_columns, const IndicesDescription& new_indices, const size_t thread_pool_size);
const ColumnsDescription& new_columns, const IndicesDescription& new_indices, const Context & context);
void loadMutations();