dbms: added setting replication_alter_partitions_sync [#METR-13158].

This commit is contained in:
Alexey Milovidov 2014-10-18 23:14:09 +04:00
parent 43ff614c79
commit d3664eac89
10 changed files with 50 additions and 34 deletions

View File

@ -76,6 +76,9 @@ struct Settings
/** Allows disabling WHERE to PREWHERE optimization in SELECT queries from MergeTree */ \
M(SettingBool, optimize_move_to_prewhere, false) \
\
/** Ожидать выполнения действий по манипуляции с партициями. 0 - не ждать, 1 - ждать выполнения только у себя, 2 - ждать всех. */ \
M(SettingUInt64, replication_alter_partitions_sync, 1) \
\
M(SettingLoadBalancing, load_balancing, LoadBalancing::RANDOM) \
\
M(SettingTotalsMode, totals_mode, TotalsMode::BEFORE_HAVING) \

View File

@ -62,7 +62,7 @@ public:
/** Возвращает true, если хранилище поддерживает запросы с секцией FINAL.
*/
virtual bool supportsFinal() const { return false; }
/** Возвращает true, если хранилище поддерживает запросы с секцией PREWHERE.
*/
virtual bool supportsPrewhere() const { return false; }
@ -207,21 +207,21 @@ public:
/** Выполнить запрос (DROP|DETACH) PARTITION.
*/
virtual void dropPartition(const Field & partition, bool detach)
virtual void dropPartition(const Field & partition, bool detach, const Settings & settings)
{
throw Exception("Method dropPartition is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Выполнить запрос ATTACH [UNREPLICATED] (PART|PARTITION).
*/
virtual void attachPartition(const Field & partition, bool unreplicated, bool part)
virtual void attachPartition(const Field & partition, bool unreplicated, bool part, const Settings & settings)
{
throw Exception("Method attachPartition is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Выполнить запрос FETCH [UNREPLICATED] PARTITION.
/** Выполнить запрос FETCH PARTITION.
*/
virtual void fetchPartition(const Field & partition, const String & from)
virtual void fetchPartition(const Field & partition, const String & from, const Settings & settings)
{
throw Exception("Method fetchPartition is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
@ -252,7 +252,7 @@ public:
virtual void shutdown() {}
/** Возвращает владеющий указатель на себя.
*/
*/
std::shared_ptr<IStorage> thisPtr()
{
std::shared_ptr<IStorage> res = this_ptr.lock();

View File

@ -46,7 +46,7 @@ public:
void drop() override {}
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override { name = new_table_name; }
/// в подтаблицах добавлять и удалять столбы нужно вручную
/// структура подтаблиц не проверяется
void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) override;
@ -57,7 +57,7 @@ private:
String source_database;
OptimizedRegularExpression table_name_regexp;
const Context & context;
StorageMerge(
const std::string & name_,
NamesAndTypesListPtr columns_,

View File

@ -77,8 +77,8 @@ public:
return merge(true);
}
void dropPartition(const Field & partition, bool detach) override;
void attachPartition(const Field & partition, bool unreplicated, bool part) override;
void dropPartition(const Field & partition, bool detach, const Settings & settings) override;
void attachPartition(const Field & partition, bool unreplicated, bool part, const Settings & settings) override;
void drop() override;

View File

@ -81,9 +81,9 @@ public:
void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) override;
void dropPartition(const Field & partition, bool detach) override;
void attachPartition(const Field & partition, bool unreplicated, bool part) override;
void fetchPartition(const Field & partition, const String & from) override;
void dropPartition(const Field & partition, bool detach, const Settings & settings) override;
void attachPartition(const Field & partition, bool unreplicated, bool part, const Settings & settings) override;
void fetchPartition(const Field & partition, const String & from, const Settings & settings) override;
/** Удаляет реплику из ZooKeeper. Если других реплик нет, удаляет всю таблицу из ZooKeeper.
*/

View File

@ -42,15 +42,15 @@ void InterpreterAlterQuery::execute()
switch (command.type)
{
case PartitionCommand::DROP_PARTITION:
table->dropPartition(command.partition, command.detach);
table->dropPartition(command.partition, command.detach, context.getSettingsRef());
break;
case PartitionCommand::ATTACH_PARTITION:
table->attachPartition(command.partition, command.unreplicated, command.part);
table->attachPartition(command.partition, command.unreplicated, command.part, context.getSettingsRef());
break;
case PartitionCommand::FETCH_PARTITION:
table->fetchPartition(command.partition, command.from);
table->fetchPartition(command.partition, command.from, context.getSettingsRef());
break;
default:

View File

@ -18,7 +18,7 @@ static String generateActiveNodeIdentifier()
ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_)
: storage(storage_),
log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, CleanupThread)")),
log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, RestartingThread)")),
active_node_identifier(generateActiveNodeIdentifier()),
thread([this] { run(); })
{

View File

@ -215,7 +215,7 @@ bool StorageMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, co
}
void StorageMergeTree::dropPartition(const Field & partition, bool detach)
void StorageMergeTree::dropPartition(const Field & partition, bool detach, const Settings & settings)
{
/** TODO В этот момент могут идти мерджи кусков в удаляемой партиции.
* Когда эти мерджи завершатся, то часть данных из удаляемой партиции "оживёт".
@ -245,7 +245,7 @@ void StorageMergeTree::dropPartition(const Field & partition, bool detach)
}
void StorageMergeTree::attachPartition(const Field & field, bool unreplicated, bool part)
void StorageMergeTree::attachPartition(const Field & field, bool unreplicated, bool part, const Settings & settings)
{
if (unreplicated)
throw Exception("UNREPLICATED option for ATTACH has meaning only for ReplicatedMergeTree", ErrorCodes::BAD_ARGUMENTS);

View File

@ -2074,7 +2074,7 @@ static String getFakePartNameForDrop(const String & month_name, UInt64 left, UIn
}
void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach)
void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach, const Settings & settings)
{
String month_name = MergeTreeData::getMonthName(field);
@ -2121,12 +2121,18 @@ void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach)
String log_znode_path = zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
/// Дождемся, пока все реплики выполнят дроп.
waitForAllReplicasToProcessLogEntry(entry);
/// Если надо - дожидаемся выполнения операции на себе или на всех репликах.
if (settings.replication_alter_partitions_sync != 0)
{
if (settings.replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry);
else
waitForAllReplicasToProcessLogEntry(entry);
}
}
void StorageReplicatedMergeTree::attachPartition(const Field & field, bool unreplicated, bool attach_part)
void StorageReplicatedMergeTree::attachPartition(const Field & field, bool unreplicated, bool attach_part, const Settings & settings)
{
String partition;
@ -2174,11 +2180,10 @@ void StorageReplicatedMergeTree::attachPartition(const Field & field, bool unrep
UInt64 min_used_number = RESERVED_BLOCK_NUMBERS;
{
/// TODO Это необходимо лишь в пределах одного месяца.
auto existing_parts = data.getDataParts();
for (const auto & part : existing_parts)
{
min_used_number = std::min(min_used_number, part->left);
}
}
if (parts.size() > min_used_number)
@ -2212,14 +2217,22 @@ void StorageReplicatedMergeTree::attachPartition(const Field & field, bool unrep
LOG_DEBUG(log, "Adding attaches to log");
zookeeper->multi(ops);
size_t i = 0;
for (LogEntry & entry : entries)
/// Если надо - дожидаемся выполнения операции на себе или на всех репликах.
if (settings.replication_alter_partitions_sync != 0)
{
String log_znode_path = dynamic_cast<zkutil::Op::Create &>(ops[i]).getPathCreated();
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
size_t i = 0;
for (LogEntry & entry : entries)
{
String log_znode_path = dynamic_cast<zkutil::Op::Create &>(ops[i]).getPathCreated();
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
waitForAllReplicasToProcessLogEntry(entry);
++i;
if (settings.replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry);
else
waitForAllReplicasToProcessLogEntry(entry);
++i;
}
}
}
@ -2425,7 +2438,7 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
}
void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const String & from_)
void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const String & from_, const Settings & settings)
{
String partition_str = MergeTreeData::getMonthName(partition);

View File

@ -1,3 +1,5 @@
SET replication_alter_partitions_sync = 2;
DROP TABLE IF EXISTS test.attach_r1;
DROP TABLE IF EXISTS test.attach_r2;
@ -18,8 +20,6 @@ SELECT d FROM test.attach_r2 ORDER BY d;
ALTER TABLE test.attach_r1 ATTACH PARTITION 201402;
SELECT * WHERE sleep(0.05); -- заменить на что-нибудь получше
SELECT d FROM test.attach_r1 ORDER BY d;
SELECT d FROM test.attach_r2 ORDER BY d;