Fix error; refinements

This commit is contained in:
Alexey Milovidov 2020-08-28 02:22:00 +03:00
parent c04dc4e722
commit 88db4938f5

View File

@ -218,6 +218,11 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
String temporary_part_name = part->name; String temporary_part_name = part->name;
/// There is one case when we need to retry transaction in a loop.
/// But don't do it too many times - just as defensive measure.
size_t loop_counter = 0;
constexpr size_t max_iterations = 10;
while (true) while (true)
{ {
/// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem. /// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
@ -229,6 +234,10 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
String block_id_path = deduplicate_block ? storage.zookeeper_path + "/blocks/" + block_id : ""; String block_id_path = deduplicate_block ? storage.zookeeper_path + "/blocks/" + block_id : "";
auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path); auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path);
/// Prepare transaction to ZooKeeper
/// It will simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock.
Coordination::Requests ops;
Int64 block_number = 0; Int64 block_number = 0;
String existing_part_name; String existing_part_name;
if (block_number_lock) if (block_number_lock)
@ -242,6 +251,25 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
part->info.level = 0; part->info.level = 0;
part->name = part->getNewName(part->info); part->name = part->getNewName(part->info);
/// Will add log entry about new part.
StorageReplicatedMergeTree::LogEntry log_entry;
log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
log_entry.create_time = time(nullptr);
log_entry.source_replica = storage.replica_name;
log_entry.new_part_name = part->name;
log_entry.quorum = quorum;
log_entry.block_id = block_id;
log_entry.new_part_type = part->getType();
ops.emplace_back(zkutil::makeCreateRequest(
storage.zookeeper_path + "/log/log-",
log_entry.toString(),
zkutil::CreateMode::PersistentSequential));
/// Deletes the information that the block number is used for writing.
block_number_lock->getUnlockOps(ops);
} }
else else
{ {
@ -269,43 +297,21 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
part->name = existing_part_name; part->name = existing_part_name;
part->info = MergeTreePartInfo::fromPartName(existing_part_name, storage.format_version); part->info = MergeTreePartInfo::fromPartName(existing_part_name, storage.format_version);
/// Used only for exception messages.
block_number = part->info.min_block; block_number = part->info.min_block;
/// Don't do subsequent duplicate check. /// Don't do subsequent duplicate check.
block_id_path.clear(); block_id_path.clear();
} }
StorageReplicatedMergeTree::LogEntry log_entry;
log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
log_entry.create_time = time(nullptr);
log_entry.source_replica = storage.replica_name;
log_entry.new_part_name = part->name;
log_entry.quorum = quorum;
log_entry.block_id = block_id;
log_entry.new_part_type = part->getType();
/// Simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock.
/// Information about the part. /// Information about the part.
Coordination::Requests ops;
storage.getCommitPartOps(ops, part, block_id_path); storage.getCommitPartOps(ops, part, block_id_path);
/// Replication log. /** If we need a quorum - create a node in which the quorum is monitored.
ops.emplace_back(zkutil::makeCreateRequest( * (If such a node already exists, then someone has managed to make another quorum record at the same time,
storage.zookeeper_path + "/log/log-", * but for it the quorum has not yet been reached.
log_entry.toString(), * You can not do the next quorum record at this time.)
zkutil::CreateMode::PersistentSequential)); */
/// Deletes the information that the block number is used for writing.
if (block_number_lock)
block_number_lock->getUnlockOps(ops);
/** If you need a quorum - create a node in which the quorum is monitored.
* (If such a node already exists, then someone has managed to make another quorum record at the same time,
* but for it the quorum has not yet been reached.
* You can not do the next quorum record at this time.)
*/
if (quorum) /// TODO Duplicate blocks. if (quorum) /// TODO Duplicate blocks.
{ {
ReplicatedMergeTreeQuorumEntry quorum_entry; ReplicatedMergeTreeQuorumEntry quorum_entry;
@ -405,6 +411,9 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
part->state = MergeTreeDataPartState::Temporary; part->state = MergeTreeDataPartState::Temporary;
part->renameTo(temporary_part_name, false); part->renameTo(temporary_part_name, false);
++loop_counter;
if (loop_counter == max_iterations)
throw Exception("Too many transaction retires - it may indicate an error", ErrorCodes::DUPLICATE_DATA_PART);
continue; continue;
} }
else if (multi_code == Coordination::Error::ZNODEEXISTS && failed_op_path == quorum_info.status_path) else if (multi_code == Coordination::Error::ZNODEEXISTS && failed_op_path == quorum_info.status_path)