Merge pull request #4639 from IvanKush/feat/parallel-alter-modify

feat alter:  parallelize processing of parts in alter modify
This commit is contained in:
alexey-milovidov 2019-05-12 00:39:46 +03:00 committed by GitHub
commit f7343ecb6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 232 additions and 34 deletions

View File

@ -50,6 +50,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, min_insert_block_size_rows, DEFAULT_INSERT_BLOCK_SIZE, "Squash blocks passed to INSERT query to specified size in rows, if blocks are not big enough.") \
M(SettingUInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough.") \
M(SettingMaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.") \
M(SettingMaxThreads, max_alter_threads, 0, "The maximum number of threads to execute the ALTER requests. By default, it is determined automatically.") \
M(SettingUInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.") \
M(SettingUInt64, max_distributed_connections, 1024, "The maximum number of connections for distributed processing of one query (should be greater than max_threads).") \
M(SettingUInt64, max_query_size, 262144, "Which part of the query can be read into RAM for parsing (the remaining data for INSERT, if any, is read later)") \

View File

@ -1435,14 +1435,14 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
}
}
MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
const DataPartPtr & part,
void MergeTreeData::alterDataPart(
const NamesAndTypesList & new_columns,
const IndicesASTs & new_indices,
bool skip_sanity_checks)
bool skip_sanity_checks,
AlterDataPartTransactionPtr & transaction)
{
ExpressionActionsPtr expression;
AlterDataPartTransactionPtr transaction(new AlterDataPartTransaction(part)); /// Blocks changes to the part.
const auto & part = transaction->getDataPart();
bool force_update_metadata;
createConvertExpression(part, part->columns, new_columns,
getIndices().indices, new_indices,
@ -1504,7 +1504,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
if (transaction->rename_map.empty() && !force_update_metadata)
{
transaction->clear();
return nullptr;
return;
}
/// Apply the expression and write the result to temporary files.
@ -1573,7 +1573,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
transaction->rename_map["columns.txt.tmp"] = "columns.txt";
}
return transaction;
return;
}
void MergeTreeData::removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part)
@ -1596,9 +1596,11 @@ void MergeTreeData::removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr
}
LOG_INFO(log, "Removing empty columns: " << log_message.str() << " from part " << data_part->name);
if (auto transaction = alterDataPart(data_part, new_columns, getIndices().indices, false))
AlterDataPartTransactionPtr transaction(new AlterDataPartTransaction(data_part));
alterDataPart(new_columns, getIndices().indices, false, transaction);
if (transaction->isValid())
transaction->commit();
empty_columns.clear();
}
@ -1607,10 +1609,24 @@ void MergeTreeData::freezeAll(const String & with_name, const Context & context)
freezePartitionsByMatcher([] (const DataPartPtr &){ return true; }, with_name, context);
}
/// Tells whether this transaction still has work to commit or roll back.
/// Returns false once clear() has reset the `valid` flag, or when no data part is attached.
bool MergeTreeData::AlterDataPartTransaction::isValid() const
{
    if (!valid)
        return false;

    return data_part != nullptr;
}
/// Marks the transaction as invalid, so that isValid() returns false afterwards.
/// commit() and the destructor both return early for an invalid transaction.
/// Only the flag is changed here: data_part and alter_lock are left untouched.
void MergeTreeData::AlterDataPartTransaction::clear()
{
valid = false;
}
void MergeTreeData::AlterDataPartTransaction::commit()
{
if (!isValid())
return;
if (!data_part)
return;
try
{
std::unique_lock<std::shared_mutex> lock(data_part->columns_lock);
@ -1668,11 +1684,14 @@ void MergeTreeData::AlterDataPartTransaction::commit()
MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction()
{
if (!isValid())
return;
if (!data_part)
return;
try
{
if (!data_part)
return;
LOG_WARNING(data_part->storage.log, "Aborting ALTER of part " << data_part->relative_path);
String path = data_part->getFullPath();

View File

@ -227,17 +227,17 @@ public:
const NamesAndTypesList & getNewColumns() const { return new_columns; }
const DataPart::Checksums & getNewChecksums() const { return new_checksums; }
AlterDataPartTransaction(DataPartPtr data_part_) : data_part(data_part_), alter_lock(data_part->alter_mutex) {}
const DataPartPtr & getDataPart() const { return data_part; }
bool isValid() const;
private:
friend class MergeTreeData;
void clear();
AlterDataPartTransaction(DataPartPtr data_part_) : data_part(data_part_), alter_lock(data_part->alter_mutex) {}
void clear()
{
alter_lock.unlock();
data_part = nullptr;
}
bool valid = true;
/// Do not reorder data_part and alter_lock: the constructor builds alter_lock from data_part->alter_mutex, so data_part must be initialized first.
DataPartPtr data_part;
DataPartsLock alter_lock;
@ -526,11 +526,11 @@ public:
/// Prepares the ALTER of the part held by `transaction`; a valid transaction then allows renaming temporary files to permanent files.
/// If the number of affected columns is suspiciously high and skip_sanity_checks is false, throws an exception.
/// If no data transformations are necessary, the transaction is cleared and its isValid() returns false.
AlterDataPartTransactionPtr alterDataPart(
const DataPartPtr & part,
void alterDataPart(
const NamesAndTypesList & new_columns,
const IndicesASTs & new_indices,
bool skip_sanity_checks);
bool skip_sanity_checks,
AlterDataPartTransactionPtr& transaction);
/// Remove columns that have been marked as empty after zeroing values with expired TTL
void removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part);

View File

@ -150,8 +150,9 @@ void ReplicatedMergeTreeAlterThread::run()
/// Update the part and write result to temporary files.
/// TODO: You can skip checking for too large changes if ZooKeeper has, for example,
/// node /flags/force_alter.
auto transaction = storage.alterDataPart(part, columns_for_parts, indices_for_parts.indices, false);
if (!transaction)
MergeTreeData::AlterDataPartTransactionPtr transaction(new MergeTreeData::AlterDataPartTransaction(part));
storage.alterDataPart(columns_for_parts, indices_for_parts.indices, false, transaction);
if (!transaction->isValid())
continue;
storage.updatePartHeaderInZooKeeperAndCommit(zookeeper, *transaction);

View File

@ -3,6 +3,7 @@
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Common/FieldVisitors.h>
#include <Common/ThreadPool.h>
#include <Common/localBackup.h>
#include <Interpreters/InterpreterAlterQuery.h>
@ -193,6 +194,47 @@ void StorageMergeTree::rename(const String & new_path_to_db, const String & /*ne
}
std::vector<MergeTreeData::AlterDataPartTransactionPtr> StorageMergeTree::prepareAlterTransactions(
const ColumnsDescription & new_columns, const IndicesDescription & new_indices, const Context & context)
{
auto parts = getDataParts({MergeTreeDataPartState::PreCommitted,
MergeTreeDataPartState::Committed,
MergeTreeDataPartState::Outdated});
std::vector<MergeTreeData::AlterDataPartTransactionPtr> transactions;
transactions.reserve(parts.size());
const auto & columns_for_parts = new_columns.getAllPhysical();
const Settings & settings = context.getSettingsRef();
size_t thread_pool_size = std::min<size_t>(parts.size(), settings.max_alter_threads);
ThreadPool thread_pool(thread_pool_size);
for (const auto & part : parts)
{
transactions.push_back(std::make_unique<MergeTreeData::AlterDataPartTransaction>(part));
thread_pool.schedule(
[this, & transaction = transactions.back(), & columns_for_parts, & new_indices = new_indices.indices]
{
this->alterDataPart(columns_for_parts, new_indices, false, transaction);
}
);
}
thread_pool.wait();
auto erase_pos = std::remove_if(transactions.begin(), transactions.end(),
[](const MergeTreeData::AlterDataPartTransactionPtr & transaction)
{
return !transaction->isValid();
}
);
transactions.erase(erase_pos, transactions.end());
return transactions;
}
void StorageMergeTree::alter(
const AlterCommands & params,
const String & current_database_name,
@ -225,14 +267,7 @@ void StorageMergeTree::alter(
ASTPtr new_ttl_table_ast = ttl_table_ast;
params.apply(new_columns, new_indices, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast);
auto parts = getDataParts({MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
auto columns_for_parts = new_columns.getAllPhysical();
std::vector<AlterDataPartTransactionPtr> transactions;
for (const DataPartPtr & part : parts)
{
if (auto transaction = alterDataPart(part, columns_for_parts, new_indices.indices, false))
transactions.push_back(std::move(transaction));
}
auto transactions = prepareAlterTransactions(new_columns, new_indices, context);
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
@ -258,7 +293,10 @@ void StorageMergeTree::alter(
setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast);
for (auto & transaction : transactions)
{
transaction->commit();
transaction.reset();
}
/// Columns sizes could be changed
recalculateColumnSizes();
@ -812,7 +850,9 @@ void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Fi
if (part->info.partition_id != partition_id)
throw Exception("Unexpected partition ID " + part->info.partition_id + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
if (auto transaction = alterDataPart(part, columns_for_parts, new_indices.indices, false))
MergeTreeData::AlterDataPartTransactionPtr transaction(new MergeTreeData::AlterDataPartTransaction(part));
alterDataPart(columns_for_parts, new_indices.indices, false, transaction);
if (transaction->isValid())
transactions.push_back(std::move(transaction));
LOG_DEBUG(log, "Removing column " << get<String>(column_name) << " from part " << part->name);
@ -822,7 +862,10 @@ void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Fi
return;
for (auto & transaction : transactions)
{
transaction->commit();
transaction.reset();
}
/// Recalculate columns size (not only for the modified column)
recalculateColumnSizes();

View File

@ -94,6 +94,9 @@ private:
BackgroundProcessingPool::TaskHandle background_task_handle;
std::vector<MergeTreeData::AlterDataPartTransactionPtr> prepareAlterTransactions(
const ColumnsDescription & new_columns, const IndicesDescription & new_indices, const Context & context);
void loadMutations();
/** Determines what parts should be merged and merges it.

View File

@ -1529,8 +1529,9 @@ void StorageReplicatedMergeTree::executeClearColumnInPartition(const LogEntry &
LOG_DEBUG(log, "Clearing column " << entry.column_name << " in part " << part->name);
auto transaction = alterDataPart(part, columns_for_parts, new_indices.indices, false);
if (!transaction)
MergeTreeData::AlterDataPartTransactionPtr transaction(new MergeTreeData::AlterDataPartTransaction(part));
alterDataPart(columns_for_parts, new_indices.indices, false, transaction);
if (!transaction->isValid())
continue;
updatePartHeaderInZooKeeperAndCommit(zookeeper, *transaction);

View File

@ -0,0 +1,106 @@
*** Check SHOW CREATE TABLE ***
CREATE TABLE test.alter_column (`x` UInt32, `y` Int32) ENGINE = MergeTree PARTITION BY x ORDER BY x SETTINGS index_granularity = 8192
*** Check parts ***
0 0
10 -10
11 -11
12 -12
13 -13
14 -14
15 -15
16 -16
17 -17
18 -18
19 -19
1 -1
20 -20
21 -21
22 -22
23 -23
24 -24
25 -25
26 -26
27 -27
28 -28
29 -29
2 -2
30 -30
31 -31
32 -32
33 -33
34 -34
35 -35
36 -36
37 -37
38 -38
39 -39
3 -3
40 -40
41 -41
42 -42
43 -43
44 -44
45 -45
46 -46
47 -47
48 -48
49 -49
4 -4
5 -5
6 -6
7 -7
8 -8
9 -9
*** Check SHOW CREATE TABLE after ALTER MODIFY ***
CREATE TABLE test.alter_column (`x` UInt32, `y` Int64) ENGINE = MergeTree PARTITION BY x ORDER BY x SETTINGS index_granularity = 8192
*** Check parts after ALTER MODIFY ***
0 0
10 -10
11 -11
12 -12
13 -13
14 -14
15 -15
16 -16
17 -17
18 -18
19 -19
1 -1
20 -20
21 -21
22 -22
23 -23
24 -24
25 -25
26 -26
27 -27
28 -28
29 -29
2 -2
30 -30
31 -31
32 -32
33 -33
34 -34
35 -35
36 -36
37 -37
38 -38
39 -39
3 -3
40 -40
41 -41
42 -42
43 -43
44 -44
45 -45
46 -46
47 -47
48 -48
49 -49
4 -4
5 -5
6 -6
7 -7
8 -8
9 -9

View File

@ -0,0 +1,24 @@
-- Check ALTER MODIFY COLUMN on a partitioned MergeTree table:
-- the column type must change and the stored values must stay the same.
-- Server logs are silenced so that only query results reach the test output.
SET send_logs_level = 'none';
DROP TABLE IF EXISTS test.alter_column;
-- PARTITION BY x with 50 distinct x values; presumably this yields one part per value,
-- giving the ALTER many parts to process (TODO confirm).
CREATE TABLE test.alter_column(x UInt32, y Int32) ENGINE MergeTree PARTITION BY x ORDER BY x;
INSERT INTO test.alter_column (x, y) SELECT number AS x, -number AS y FROM system.numbers LIMIT 50;
-- State before the ALTER: y is Int32.
SELECT '*** Check SHOW CREATE TABLE ***';
SHOW CREATE TABLE test.alter_column;
SELECT '*** Check parts ***';
-- Rows are ordered by the _part virtual column; the reference output lists x as 0, 10, 11, ..., 19, 1, 20, ...
SELECT * FROM test.alter_column ORDER BY _part;
-- Widen y from Int32 to Int64.
ALTER TABLE test.alter_column MODIFY COLUMN y Int64;
-- State after the ALTER: y is Int64 and the rows are unchanged.
SELECT '*** Check SHOW CREATE TABLE after ALTER MODIFY ***';
SHOW CREATE TABLE test.alter_column;
SELECT '*** Check parts after ALTER MODIFY ***';
SELECT * FROM test.alter_column ORDER BY _part;
DROP TABLE test.alter_column;