More simple

This commit is contained in:
alesapin 2020-02-14 12:24:00 +03:00
parent 199c22c363
commit d9ebec472b
2 changed files with 1 additions and 28 deletions

View File

@ -256,16 +256,6 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
log_entry.toString(), log_entry.toString(),
zkutil::CreateMode::PersistentSequential)); zkutil::CreateMode::PersistentSequential));
/// We check metadata_version has the same version as shared node.
/// In other case we may have parts, which nobody will alter.
///
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/metadata", storage.getMetadataVersion()));
/// We update version of block_number/partition node to register fact of new insert.
/// If we want to be sure, that no inserts happend in some period of time, than we can receive
/// version of all partition nodes inside block numbers and then make check requirests in zookeeper transaction.
ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/block_numbers/" + part->info.partition_id, "", -1));
/// Deletes the information that the block number is used for writing. /// Deletes the information that the block number is used for writing.
block_number_lock->getUnlockOps(ops); block_number_lock->getUnlockOps(ops);

View File

@ -3363,28 +3363,11 @@ void StorageReplicatedMergeTree::alter(
Coordination::Stat mutations_stat; Coordination::Stat mutations_stat;
zookeeper->get(mutations_path, &mutations_stat); zookeeper->get(mutations_path, &mutations_stat);
Strings partitions = zookeeper->getChildren(zookeeper_path + "/block_numbers");
std::vector<std::future<Coordination::GetResponse>> partition_futures;
for (const String & partition : partitions)
partition_futures.push_back(zookeeper->asyncGet(zookeeper_path + "/block_numbers/" + partition));
std::unordered_map<String, int> partition_versions;
for (size_t i = 0; i < partition_futures.size(); ++i)
{
auto stat = partition_futures[i].get().stat;
auto partition = partitions[i];
partition_versions[partition] = stat.version;
}
lock_holder.emplace( lock_holder.emplace(
zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper); zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper);
for (const auto & lock : lock_holder->getLocks()) for (const auto & lock : lock_holder->getLocks())
{
mutation_entry.block_numbers[lock.partition_id] = lock.number; mutation_entry.block_numbers[lock.partition_id] = lock.number;
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/block_numbers/" + lock.partition_id, partition_versions[lock.partition_id]));
}
mutation_entry.create_time = time(nullptr); mutation_entry.create_time = time(nullptr);
@ -3449,7 +3432,7 @@ void StorageReplicatedMergeTree::alter(
if (!unwaited.empty()) if (!unwaited.empty())
throw Exception("Some replicas doesn't finish metadata alter", ErrorCodes::UNFINISHED); throw Exception("Some replicas doesn't finish metadata alter", ErrorCodes::UNFINISHED);
if (mutation_znode) if (mutation_znode.has_value() && !mutation_znode->empty())
{ {
LOG_DEBUG(log, "Metadata changes applied. Will wait for data changes."); LOG_DEBUG(log, "Metadata changes applied. Will wait for data changes.");
waitMutation(*mutation_znode, query_context.getSettingsRef().replication_alter_partitions_sync); waitMutation(*mutation_znode, query_context.getSettingsRef().replication_alter_partitions_sync);