make stages commit

Sema Checherinda 2023-11-30 02:09:44 +01:00
parent 7efa76f08d
commit e4becc01ba
4 changed files with 481 additions and 415 deletions
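The diff below replaces the single long body of ReplicatedMergeTreeSinkImpl::commitPart with named stages driven by a small state machine whose context outlives individual Keeper retries. A minimal sketch of that pattern, using invented names and none of the real ClickHouse types, could look like this:

#include <functional>
#include <stdexcept>

enum class Stage { LockBlock, Commit, DuplicatedPart, UncertainCommit, Success, Error };

struct RetryContext
{
    Stage stage = Stage::LockBlock;   // persists across retries, so a retry resumes where it stopped
};

// Each stage returns true to fall through to the next stage within the same
// attempt and false to hand control back to the retry loop.
bool runOneStage(RetryContext & ctx)
{
    switch (ctx.stage)
    {
        case Stage::LockBlock:       ctx.stage = Stage::Commit;  return true;
        case Stage::Commit:          ctx.stage = Stage::Success; return false;
        case Stage::DuplicatedPart:  ctx.stage = Stage::Success; return false;
        case Stage::UncertainCommit: ctx.stage = Stage::Success; return false;
        default:                     return false;               // Success / Error are terminal
    }
}

// Toy retry loop: a thrown exception is treated as a transient Keeper error.
void retryLoop(const std::function<void()> & body, int max_retries = 3)
{
    for (int attempt = 0; attempt < max_retries; ++attempt)
    {
        try { body(); return; }
        catch (const std::exception &) { /* transient error: retry with the same context */ }
    }
    throw std::runtime_error("retries exhausted");
}

int main()
{
    RetryContext ctx;                               // deliberately lives outside the loop
    retryLoop([&] { while (runOneStage(ctx)) {} });
    return ctx.stage == Stage::Success ? 0 : 1;
}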

src/Storages/MergeTree/MergeTreeData.cpp

@@ -3769,25 +3769,6 @@ void MergeTreeData::removePartsFromWorkingSet(MergeTreeTransaction * txn, const
resetObjectColumnsFromActiveParts(acquired_lock);
}
void MergeTreeData::removePartsFromWorkingSetImmediatelyAndSetTemporaryState(const DataPartsVector & remove)
{
auto lock = lockParts();
for (const auto & part : remove)
{
auto it_part = data_parts_by_info.find(part->info);
if (it_part == data_parts_by_info.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} not found in data_parts", part->getNameWithState());
assert(part->getState() == MergeTreeDataPartState::PreActive);
modifyPartState(part, MergeTreeDataPartState::Temporary);
/// Erase immediately
LOG_TEST(log, "removePartsFromWorkingSetImmediatelyAndSetTemporaryState: removing {} from data_parts_indexes", part->getNameWithState());
data_parts_indexes.erase(it_part);
}
}
void MergeTreeData::removePartsFromWorkingSet(
MergeTreeTransaction * txn, const DataPartsVector & remove, bool clear_without_timeout, DataPartsLock * acquired_lock)
{
@@ -6300,24 +6281,6 @@ MergeTreeData::Transaction::Transaction(MergeTreeData & data_, MergeTreeTransact
data.transactions_enabled.store(true);
}
void MergeTreeData::Transaction::rollbackPartsToTemporaryState()
{
if (!isEmpty())
{
WriteBufferFromOwnString buf;
buf << " Rollbacking parts state to temporary and removing from working set:";
for (const auto & part : precommitted_parts)
buf << " " << part->getDataPartStorage().getPartDirectory();
buf << ".";
LOG_DEBUG(data.log, "Undoing transaction.{}", buf.str());
data.removePartsFromWorkingSetImmediatelyAndSetTemporaryState(
DataPartsVector(precommitted_parts.begin(), precommitted_parts.end()));
}
clear();
}
TransactionID MergeTreeData::Transaction::getTID() const
{
if (txn)

src/Storages/MergeTree/MergeTreeData.h

@@ -260,10 +260,6 @@ public:
void rollback(DataPartsLock * lock = nullptr);
/// Immediately remove parts from table's data_parts set and change part
/// state to temporary. Useful for new parts which not present in table.
void rollbackPartsToTemporaryState();
size_t size() const { return precommitted_parts.size(); }
bool isEmpty() const { return precommitted_parts.empty(); }
@@ -601,11 +597,6 @@ public:
DataPartsLock & lock,
DataPartsVector * out_covered_parts = nullptr);
/// Remove parts from working set immediately (without wait for background
/// process). Transfer part state to temporary. Have very limited usage only
/// for new parts which aren't already present in table.
void removePartsFromWorkingSetImmediatelyAndSetTemporaryState(const DataPartsVector & remove);
/// Removes parts from the working set parts.
/// Parts in add must already be in data_parts with PreActive, Active, or Outdated states.
/// If clear_without_timeout is true, the parts will be deleted at once, or during the next call to

src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp

@@ -543,6 +543,20 @@ bool ReplicatedMergeTreeSinkImpl<false>::writeExistingPart(MergeTreeData::Mutabl
}
}
template<bool async_insert>
std::vector<String> ReplicatedMergeTreeSinkImpl<async_insert>::detectConflictsInAsyncBlockIDs(const std::vector<String> & ids)
{
auto conflict_block_ids = storage.async_block_ids_cache.detectConflicts(ids, cache_version);
if (!conflict_block_ids.empty())
{
cache_version = 0;
}
return conflict_block_ids;
}
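
detectConflictsInAsyncBlockIDs asks the async block-id cache which of the incoming ids are already known, and resets cache_version to 0 whenever conflicts are found so the next attempt does not trust a stale snapshot. A generic sketch of that version-guarded prefilter idea, with an invented cache type rather than the real AsyncBlockIDsCache:

#include <cstdint>
#include <set>
#include <string>
#include <vector>

// Invented stand-in for the async block-id cache, used only in this sketch.
struct BlockIDCacheSketch
{
    std::set<std::string> known;

    // Returns the subset of `ids` that the cached snapshot already contains.
    std::vector<std::string> detectConflicts(const std::vector<std::string> & ids, std::uint64_t /*cache_version*/) const
    {
        std::vector<std::string> conflicts;
        for (const auto & id : ids)
            if (known.contains(id))
                conflicts.push_back(id);
        return conflicts;
    }
};

// Same shape as detectConflictsInAsyncBlockIDs above: on any conflict the cached
// version is dropped, forcing a fresh look on the next retry.
std::vector<std::string> prefilterBlockIDs(const BlockIDCacheSketch & cache, std::uint64_t & cache_version, const std::vector<std::string> & ids)
{
    auto conflicts = cache.detectConflicts(ids, cache_version);
    if (!conflicts.empty())
        cache_version = 0;
    return conflicts;
}
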
namespace
{
bool contains(const std::vector<String> & block_ids, const String & path)
{
for (const auto & local_block_id : block_ids)
@@ -556,6 +570,56 @@ bool contains(const String & block_ids, const String & path)
return block_ids == path;
}
String getBlockIdPath(const String & zookeeper_path, const String & block_id)
{
if (!block_id.empty())
return zookeeper_path + "/blocks/" + block_id;
return String();
}
std::vector<String> getBlockIdPath(const String & zookeeper_path, const std::vector<String> & block_id)
{
std::vector<String> result;
result.reserve(block_id.size());
for (const auto & single_block_id : block_id)
result.push_back(zookeeper_path + "/async_blocks/" + single_block_id);
return result;
}
}
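
The two getBlockIdPath overloads let the same templated commit code build either a single /blocks/<id> path (synchronous insert, block_id is a String) or a list of /async_blocks/<id> paths (async insert, block_id is a vector of String). A self-contained sketch of how such overloads pair with the BlockIDsType alias declared in the sink header (simplified, outside of ClickHouse):

#include <string>
#include <type_traits>
#include <vector>

using String = std::string;

// One overload per block-id shape, mirroring the pair of helpers above.
String getBlockIdPath(const String & zookeeper_path, const String & block_id)
{
    return block_id.empty() ? String() : zookeeper_path + "/blocks/" + block_id;
}

std::vector<String> getBlockIdPath(const String & zookeeper_path, const std::vector<String> & block_ids)
{
    std::vector<String> result;
    result.reserve(block_ids.size());
    for (const auto & id : block_ids)
        result.push_back(zookeeper_path + "/async_blocks/" + id);
    return result;
}

// The block-id representation is chosen at compile time; overload resolution
// then picks the matching helper inside the shared template code.
template <bool async_insert>
void commitSketch(const String & zookeeper_path)
{
    using BlockIDsType = std::conditional_t<async_insert, std::vector<String>, String>;
    BlockIDsType block_id;
    if constexpr (async_insert)
        block_id = {"b1", "b2"};
    else
        block_id = "b1";
    auto block_id_path = getBlockIdPath(zookeeper_path, block_id);   // String or std::vector<String>
    (void)block_id_path;
}

int main()
{
    commitSketch<false>("/clickhouse/tables/t");
    commitSketch<true>("/clickhouse/tables/t");
}
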
struct CommitRetryContext
{
enum Stages
{
LOCK_BLOCK,
COMMIT,
DUPLICATED_PART,
UNCERTAIN_COMMIT,
SUCCESS,
ERROR
};
/// Possible ways:
/// LOCK_BLOCK -> COMMIT
/// LOCK_BLOCK -> DUPLICATED_PART
/// DUPLICATED_PART -> SUCCESS
/// COMMIT -> SUCCESS
/// COMMIT -> DUPLICATED_PART
/// COMMIT -> UNCERTAIN_COMMIT
/// UNCERTAIN_COMMIT -> SUCCESS
/// * -> ERROR
Stages stage = LOCK_BLOCK;
String actual_part_name;
std::optional<EphemeralLockInZooKeeper> block_number_lock;
size_t block_number = 0;
std::vector<String> conflict_block_ids;
bool part_was_deduplicated = false;
Coordination::Error uncertain_keeper_error = Coordination::Error::ZOK;
};
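
The comment above fixes which stage transitions are legal. Purely as an illustration, and not as code that exists in the commit, the same diagram can be written down as a checkable predicate:

// Illustrative restatement of the transition diagram from the comment above;
// the enum repeats CommitRetryContext's stage names, the function is invented.
enum class Stage { LOCK_BLOCK, COMMIT, DUPLICATED_PART, UNCERTAIN_COMMIT, SUCCESS, ERROR };

constexpr bool isAllowedTransition(Stage from, Stage to)
{
    if (to == Stage::ERROR)
        return true;   // "* -> ERROR"
    switch (from)
    {
        case Stage::LOCK_BLOCK:       return to == Stage::COMMIT || to == Stage::DUPLICATED_PART;
        case Stage::COMMIT:           return to == Stage::SUCCESS || to == Stage::DUPLICATED_PART || to == Stage::UNCERTAIN_COMMIT;
        case Stage::DUPLICATED_PART:  return to == Stage::SUCCESS;
        case Stage::UNCERTAIN_COMMIT: return to == Stage::SUCCESS;
        default:                      return false;   // SUCCESS and ERROR are terminal
    }
}

static_assert(isAllowedTransition(Stage::LOCK_BLOCK, Stage::COMMIT));
static_assert(!isAllowedTransition(Stage::SUCCESS, Stage::COMMIT));
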
template<bool async_insert>
std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::commitPart(
const ZooKeeperWithFaultInjectionPtr & zookeeper,
@@ -570,60 +634,81 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
///
/// metadata_snapshot->check(part->getColumns());
const String temporary_part_relative_path = part->getDataPartStorage().getPartDirectory();
auto block_id_path = getBlockIdPath(storage.zookeeper_path, block_id);
/// for retries due to keeper error
bool part_committed_locally_but_zookeeper = false;
bool part_was_deduplicated = false;
Coordination::Error write_part_info_keeper_error = Coordination::Error::ZOK;
std::vector<String> conflict_block_ids;
CommitRetryContext retry_context;
ZooKeeperRetriesControl retries_ctl("commitPart", zookeeper_retries_info, context->getProcessListElement());
retries_ctl.retryLoop([&]()
{
zookeeper->setKeeper(storage.getZooKeeper());
if (storage.is_readonly)
{
/// stop retries if in shutdown
if (storage.shutdown_prepared_called)
throw Exception(
ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode due to shutdown: replica_path={}", storage.replica_path);
/// When we attach existing parts it's okay to be in read-only mode
/// For example during RESTORE REPLICA.
if (!writing_existing_part)
auto resolve_duplicate_stage = [&] () -> bool
{
retries_ctl.setUserError(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode: replica_path={}", storage.replica_path);
return;
}
}
if (retries_ctl.isRetry())
if constexpr (async_insert)
{
/// If we are retrying, check if last iteration was actually successful,
/// we could get network error on committing part to zk
/// but the operation could be completed by zk server
/// If this flag is true, then part is in Active state, and we'll not retry anymore
/// we only check if part was committed to zk and return success or failure correspondingly
/// Note: if commit to zk failed then cleanup thread will mark the part as Outdated later
if (part_committed_locally_but_zookeeper)
{
/// check that info about the part was actually written in zk
if (zookeeper->exists(fs::path(storage.replica_path) / "parts" / part->name))
{
LOG_DEBUG(log, "Part was successfully committed on previous iteration: part_id={}", part->name);
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Conflict block ids and block number lock should not "
"be empty at the same time for async inserts");
}
else
{
/// This block was already written to some replica. Get the part name for it.
/// Note: race condition with DROP PARTITION operation is possible. User will get "No node" exception and it is Ok.
retry_context.actual_part_name = zookeeper->get(block_id_path);
bool exists_locally = bool(storage.getActiveContainingPart(retry_context.actual_part_name));
if (exists_locally)
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
LOG_INFO(log, "Block with ID {} {} as part {}; ignoring it.",
block_id,
exists_locally ? "already exists locally" : "already exists on other replicas",
retry_context.actual_part_name);
retry_context.part_was_deduplicated = true;
retry_context.stage = CommitRetryContext::SUCCESS;
return false;
}
};
auto resolve_uncertain_commit_stage = [&] ()
{
/// check that info about the part was actually written in zk
if (zookeeper->exists(fs::path(storage.replica_path) / "parts" / retry_context.actual_part_name))
{
LOG_DEBUG(log, "Part was successfully committed on previous iteration: part_id={}", part->name);
retry_context.stage = CommitRetryContext::SUCCESS;
return false;
}
else
{
/// If all retries get exhausted while accessing ZooKeeper on a fresh retry, this action enqueues the committed part for a check.
/// The lambda captures the part name; that is fine since we will not generate a new one for this insert.
retries_ctl.actionAfterLastFailedRetry(
[&] ()
{
storage.enqueuePartForCheck(retry_context.actual_part_name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
});
retries_ctl.setUserError(
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
"Insert failed due to zookeeper error. Please retry. Reason: {}",
write_part_info_keeper_error);
}
retry_context.uncertain_keeper_error);
retries_ctl.stopRetries();
return;
return false;
}
};
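
resolve_uncertain_commit_stage covers the case where the commit multi-request failed with a hardware error after the part had already been committed locally: on the next retry it checks whether the part node appeared under replica_path/parts, and only then decides between success and an "unknown status" user error. A generic sketch of that resolve-on-retry idea, with a hypothetical KeeperClient rather than the ClickHouse ZooKeeper wrapper:

#include <string>

// Hypothetical minimal Keeper client, used only for this sketch.
struct KeeperClientSketch
{
    virtual ~KeeperClientSketch() = default;
    virtual bool exists(const std::string & path) const = 0;
};

enum class UncertainCommitResult { Committed, NotCommitted };

// After a connection loss during the commit multi-request we cannot know whether
// it was applied. On the next attempt, the presence of the part node under the
// replica's /parts subtree answers that question.
UncertainCommitResult resolveUncertainCommit(
    const KeeperClientSketch & keeper,
    const std::string & replica_path,
    const std::string & part_name)
{
    if (keeper.exists(replica_path + "/parts/" + part_name))
        return UncertainCommitResult::Committed;     // the previous attempt actually succeeded
    return UncertainCommitResult::NotCommitted;      // report "unknown status, client must retry"
}
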
auto lock_part_number_stage = [&] ()
{
if constexpr (async_insert)
{
/// prefilter by cache
retry_context.conflict_block_ids = detectConflictsInAsyncBlockIDs(block_id);
if (!retry_context.conflict_block_ids.empty())
{
retry_context.stage = CommitRetryContext::ERROR;
return false;
}
}
@@ -632,91 +717,49 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
/// Also, make deduplication check. If a duplicate is detected, no nodes are created.
/// Allocate new block number and check for duplicates
bool deduplicate_block = !block_id.empty();
BlockIDsType block_id_path;
if constexpr (async_insert)
{
/// prefilter by cache
conflict_block_ids = storage.async_block_ids_cache.detectConflicts(block_id, cache_version);
if (!conflict_block_ids.empty())
{
cache_version = 0;
return;
}
for (const auto & single_block_id : block_id)
block_id_path.push_back(storage.zookeeper_path + "/async_blocks/" + single_block_id);
}
else if (deduplicate_block)
block_id_path = storage.zookeeper_path + "/blocks/" + block_id;
auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path);
retry_context.block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path); /// 1 RTT
ThreadFuzzer::maybeInjectSleep();
/// Prepare transaction to ZooKeeper
/// It will simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock.
Coordination::Requests ops;
Int64 block_number = 0;
size_t block_unlock_op_idx = std::numeric_limits<size_t>::max();
if (block_number_lock)
if (!retry_context.block_number_lock)
{
retry_context.stage = CommitRetryContext::DUPLICATED_PART;
return true;
}
if constexpr (async_insert)
{
/// In practice we always get only one path from block_number_lock;
/// this is a restriction of Keeper. A vector is used here to keep
/// extensibility for future optimizations, for instance, using a
/// cache to resolve conflicts in advance.
String conflict_path = block_number_lock->getConflictPath();
String conflict_path = retry_context.block_number_lock->getConflictPath();
if (!conflict_path.empty())
{
LOG_TRACE(log, "Cannot get lock, the conflict path is {}", conflict_path);
conflict_block_ids.push_back(conflict_path);
return;
retry_context.conflict_block_ids.push_back(conflict_path);
retry_context.stage = CommitRetryContext::ERROR;
return false;
}
}
block_number = block_number_lock->getNumber();
/// Set part attributes according to part_number. Prepare an entry for log.
retry_context.block_number = retry_context.block_number_lock->getNumber();
part->info.min_block = block_number;
part->info.max_block = block_number;
/// Set part attributes according to part_number.
part->info.min_block = retry_context.block_number;
part->info.max_block = retry_context.block_number;
part->info.level = 0;
part->info.mutation = 0;
part->setName(part->getNewName(part->info));
retry_context.actual_part_name = part->name;
StorageReplicatedMergeTree::LogEntry log_entry;
retry_context.stage = CommitRetryContext::COMMIT;
return true;
};
if (is_attach)
auto get_quorum_ops = [&] (Coordination::Requests & ops)
{
log_entry.type = StorageReplicatedMergeTree::LogEntry::ATTACH_PART;
/// We don't need to involve ZooKeeper to obtain checksums as by the time we get
/// MutableDataPartPtr here, we already have the data thus being able to
/// calculate the checksums.
log_entry.part_checksum = part->checksums.getTotalChecksumHex();
}
else
log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
log_entry.create_time = time(nullptr);
log_entry.source_replica = storage.replica_name;
log_entry.new_part_name = part->name;
/// TODO maybe add UUID here as well?
log_entry.quorum = getQuorumSize(replicas_num);
log_entry.new_part_format = part->getFormat();
if constexpr (!async_insert)
log_entry.block_id = block_id;
ops.emplace_back(zkutil::makeCreateRequest(
storage.zookeeper_path + "/log/log-",
log_entry.toString(),
zkutil::CreateMode::PersistentSequential));
/// Deletes the information that the block number is used for writing.
block_unlock_op_idx = ops.size();
block_number_lock->getUnlockOp(ops);
/** If we need a quorum - create a node in which the quorum is monitored.
* (If such a node already exists, then someone has managed to make another quorum record at the same time,
* but for it the quorum has not yet been reached.
@@ -736,8 +779,9 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
* which indicates that the quorum has been reached.
*/
quorum_info.status_path = storage.zookeeper_path + "/quorum/status";
if (quorum_parallel)
quorum_info.status_path = storage.zookeeper_path + "/quorum/parallel/" + part->name;
quorum_info.status_path = storage.zookeeper_path + "/quorum/parallel/" + retry_context.actual_part_name;
ops.emplace_back(
zkutil::makeCreateRequest(
@@ -760,59 +804,65 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
storage.replica_path + "/host",
quorum_info.host_node_version));
}
}
else if constexpr (!async_insert)
};
auto get_logs_ops = [&] (Coordination::Requests & ops)
{
/// This block was already written to some replica. Get the part name for it.
/// Note: race condition with DROP PARTITION operation is possible. User will get "No node" exception and it is Ok.
String existing_part_name = zookeeper->get(storage.zookeeper_path + "/blocks/" + block_id);
ReplicatedMergeTreeLogEntryData log_entry;
bool exists_locally = bool(storage.getActiveContainingPart(existing_part_name));
part_was_deduplicated = true;
if (exists_locally)
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
if (isQuorumEnabled())
if (is_attach)
{
if (quorum_parallel)
quorum_info.status_path = storage.zookeeper_path + "/quorum/parallel/" + existing_part_name;
log_entry.type = ReplicatedMergeTreeLogEntry::ATTACH_PART;
LOG_INFO(log, "Block with ID {} {} as part {}; ignoring it, but checking quorum.",
block_id,
exists_locally ? "already exists locally" : "already exists on other replicas",
existing_part_name);
waitForQuorum(zookeeper, existing_part_name, quorum_info.status_path, quorum_info.is_active_node_version, replicas_num);
/// We don't need to involve ZooKeeper to obtain checksums as by the time we get
/// MutableDataPartPtr here, we already have the data thus being able to
/// calculate the checksums.
log_entry.part_checksum = part->checksums.getTotalChecksumHex();
}
else
log_entry.type = ReplicatedMergeTreeLogEntry::GET_PART;
log_entry.create_time = time(nullptr);
log_entry.source_replica = storage.replica_name;
log_entry.new_part_name = part->name;
/// TODO maybe add UUID here as well?
log_entry.quorum = getQuorumSize(replicas_num);
log_entry.new_part_format = part->getFormat();
if constexpr (!async_insert)
log_entry.block_id = block_id;
/// Prepare an entry for log.
ops.emplace_back(zkutil::makeCreateRequest(
storage.zookeeper_path + "/log/log-",
log_entry.toString(),
zkutil::CreateMode::PersistentSequential));
};
auto commit_new_part_stage = [&] ()
{
LOG_INFO(log, "Block with ID {} {} as part {}; ignoring it.",
block_id,
exists_locally ? "already exists locally" : "already exists on other replicas",
existing_part_name);
}
chassert(retry_context.block_number_lock.has_value());
return;
}
/// async_insert will never return null lock, because they need the conflict path.
else
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Conflict block ids and block number lock should not "
"be empty at the same time for async inserts");
/// Prepare transaction to ZooKeeper
/// It will simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock.
Coordination::Requests ops;
chassert(!async_insert || block_number_lock);
/// Deletes the information that the block number is used for writing.
size_t block_unlock_op_idx = ops.size();
retry_context.block_number_lock->getUnlockOp(ops);
size_t shared_lock_op_idx_first = ops.size();
get_logs_ops(ops);
get_quorum_ops(ops);
size_t shared_lock_ops_id_begin = ops.size();
storage.getLockSharedDataOps(*part, zookeeper, /*replace_zero_copy_lock*/ false, {}, ops);
size_t shared_lock_op_idx_last = ops.size();
size_t shared_lock_op_id_end = ops.size();
storage.getCommitPartOps(ops, part, block_id_path);
/// It's important to create it outside of lock scope because
/// otherwise it can lock parts in destructor and deadlock is possible.
MergeTreeData::Transaction transaction(storage, NO_TRANSACTION_RAW); /// If you can not add a part to ZK, we'll remove it back from the working set.
try
{
auto lock = storage.lockParts();
@@ -820,8 +870,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::DUPLICATE_DATA_PART
|| e.code() == ErrorCodes::PART_IS_TEMPORARILY_LOCKED)
if (e.code() == ErrorCodes::DUPLICATE_DATA_PART || e.code() == ErrorCodes::PART_IS_TEMPORARILY_LOCKED)
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Part with name {} is already written by concurrent request."
@@ -846,6 +895,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
Coordination::Responses responses;
Coordination::Error multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT
if (multi_code == Coordination::Error::ZOK)
{
auto sleep_before_commit_local_part_in_replicated_table_ms = storage.getSettings()->sleep_before_commit_local_part_in_replicated_table_ms;
@@ -858,45 +908,25 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
part->new_part_was_committed_to_zookeeper_after_rename_on_disk = true;
transaction.commit();
storage.merge_selecting_task->schedule();
/// Lock nodes have been already deleted, do not delete them in destructor
block_number_lock->assumeUnlocked();
retry_context.block_number_lock->assumeUnlocked();
if (isQuorumEnabled())
waitForQuorum(zookeeper, part->name, quorum_info.status_path, quorum_info.is_active_node_version, replicas_num);
return;
retry_context.stage = CommitRetryContext::SUCCESS;
return false;
}
if (Coordination::isHardwareError(multi_code))
{
write_part_info_keeper_error = multi_code;
retry_context.uncertain_keeper_error = multi_code;
/** If the connection is lost, and we do not know if the changes were applied, we can not delete the local part
* if the changes were applied, the inserted block appeared in `/blocks/`, and it can not be inserted again.
*/
transaction.commit();
/// Setting this flag is point of no return
/// On next retry, we'll just check if actually operation succeed or failed
/// and return ok or error correspondingly
part_committed_locally_but_zookeeper = true;
/// if all retries will be exhausted by accessing zookeeper on fresh retry -> we'll add committed part to queue in the action
/// here lambda capture part name, it's ok since we'll not generate new one for this insert,
/// see comments around 'part_committed_locally_but_zookeeper' flag
retries_ctl.actionAfterLastFailedRetry(
[&my_storage = storage, part_name = part->name]
{
my_storage.enqueuePartForCheck(part_name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
});
/// We do not know whether or not data has been inserted.
retries_ctl.setUserError(
ErrorCodes::UNKNOWN_STATUS_OF_INSERT,
"Unknown status, client must retry. Reason: {}",
multi_code);
return;
retry_context.stage = CommitRetryContext::UNCERTAIN_COMMIT;
return true;
}
if (!Coordination::isUserError(multi_code))
@@ -905,27 +935,15 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
throw Exception(
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
"Unexpected ZooKeeper error while adding block {} with ID '{}': {}",
block_number,
retry_context.block_number,
toString(block_id),
multi_code);
}
auto failed_op_idx = zkutil::getFailedOpIndex(multi_code, responses);
String failed_op_path = ops[zkutil::getFailedOpIndex(multi_code, responses)]->getPath();
String failed_op_path = ops[failed_op_idx]->getPath();
if (multi_code == Coordination::Error::ZNONODE && zkutil::getFailedOpIndex(multi_code, responses) == block_unlock_op_idx)
{
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED,
"Insert query (for block {}) was canceled by concurrent ALTER PARTITION or TRUNCATE", block_number_lock->getPath());
}
else if (shared_lock_op_idx_first <= failed_op_idx && failed_op_idx < shared_lock_op_idx_last)
{
transaction.rollback();
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Creating shared lock for part {} has failed with error: {}. It's a bug.",
part->name, multi_code);
}
else if (multi_code == Coordination::Error::ZNODEEXISTS && deduplicate_block && contains(block_id_path, failed_op_path))
if (multi_code == Coordination::Error::ZNODEEXISTS && !block_id_path.empty() && contains(block_id_path, failed_op_path))
{
/// A block with the same id has just appeared in the table (or on another replica), roll back the insertion.
LOG_INFO(log, "Block with ID {} already exists (it has just appeared) for part {}. Ignore it.",
@@ -935,15 +953,30 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
if constexpr (async_insert)
{
conflict_block_ids = std::vector<String>({failed_op_path});
LOG_TRACE(log, "conflict when committing, the conflict block ids are {}", toString(conflict_block_ids));
retry_context.conflict_block_ids = std::vector<String>({failed_op_path});
LOG_TRACE(log, "Conflict when committing, the conflict block ids are {}",
toString(retry_context.conflict_block_ids));
}
return;
retry_context.stage = CommitRetryContext::DUPLICATED_PART;
return true;
}
else if (multi_code == Coordination::Error::ZNONODE && failed_op_idx == block_unlock_op_idx)
{
transaction.rollback();
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED,
"Insert query (for block {}) was canceled by concurrent ALTER PARTITION or TRUNCATE",
retry_context.block_number_lock->getPath());
}
else if (shared_lock_ops_id_begin <= failed_op_idx && failed_op_idx < shared_lock_op_id_end)
{
transaction.rollback();
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Creating shared lock for part {} has failed with error: {}. It's a bug.",
part->name, multi_code);
}
else if (multi_code == Coordination::Error::ZNODEEXISTS && failed_op_path == quorum_info.status_path)
{
transaction.rollback();
throw Exception(ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE, "Another quorum insert has been already started");
}
@@ -953,19 +986,96 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
throw Exception(
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
"Unexpected logical error while adding block {} with ID '{}': {}, path {}",
block_number,
retry_context.block_number,
toString(block_id),
multi_code,
failed_op_path);
}
};
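
commit_new_part_stage remembers the index ranges of the requests it appends to the Keeper multi-request (the block-unlock op, the zero-copy shared-lock ops) so that, when tryMulti fails with a user error, the failed op index can be mapped back to the guarantee that was violated. A simplified sketch of that bookkeeping, with invented types:

#include <cstddef>

// Invented result categories for the sketch; the real code throws different
// exceptions (QUERY_WAS_CANCELLED, LOGICAL_ERROR, ...) for these cases.
enum class FailureCause { BlockNumberLockGone, SharedZeroCopyLock, Other };

struct OpIndexRanges
{
    std::size_t block_unlock_op_idx = 0;
    std::size_t shared_lock_ops_begin = 0;
    std::size_t shared_lock_ops_end = 0;
};

// Map the index of the failed request back to the group it belonged to.
FailureCause classifyFailedOp(const OpIndexRanges & ranges, std::size_t failed_op_idx)
{
    if (failed_op_idx == ranges.block_unlock_op_idx)
        return FailureCause::BlockNumberLockGone;    // e.g. a concurrent ALTER PARTITION / TRUNCATE removed the lock
    if (ranges.shared_lock_ops_begin <= failed_op_idx && failed_op_idx < ranges.shared_lock_ops_end)
        return FailureCause::SharedZeroCopyLock;     // creating the zero-copy shared lock failed
    return FailureCause::Other;
}
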
auto stage_switcher = [&] ()
{
try
{
switch (retry_context.stage)
{
case CommitRetryContext::LOCK_BLOCK:
return lock_part_number_stage();
case CommitRetryContext::COMMIT:
return commit_new_part_stage();
case CommitRetryContext::DUPLICATED_PART:
return resolve_duplicate_stage();
case CommitRetryContext::UNCERTAIN_COMMIT:
return resolve_uncertain_commit_stage();
case CommitRetryContext::SUCCESS:
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Operation has already succeeded.");
case CommitRetryContext::ERROR:
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Operation is already in error state.");
}
}
catch (const zkutil::KeeperException &)
{
throw;
}
catch (DB::Exception &)
{
retry_context.stage = CommitRetryContext::ERROR;
throw;
}
};
retries_ctl.retryLoop([&]()
{
zookeeper->setKeeper(storage.getZooKeeper());
if (storage.is_readonly)
{
/// stop retries if in shutdown
if (storage.shutdown_prepared_called)
throw Exception(
ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode due to shutdown: replica_path={}", storage.replica_path);
/// When we attach existing parts it's okay to be in read-only mode
/// For example during RESTORE REPLICA.
if (!writing_existing_part)
{
retries_ctl.setUserError(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode: replica_path={}", storage.replica_path);
return;
}
}
while (stage_switcher())
{
// no op;
}
},
[&zookeeper]() { zookeeper->cleanupEphemeralNodes(); });
if (!conflict_block_ids.empty())
return {conflict_block_ids, part_was_deduplicated};
if (!retry_context.conflict_block_ids.empty())
return {retry_context.conflict_block_ids, false};
return {conflict_block_ids, part_was_deduplicated};
if (retry_context.stage == CommitRetryContext::SUCCESS)
{
storage.merge_selecting_task->schedule();
if (isQuorumEnabled())
{
quorum_info.status_path = storage.zookeeper_path + "/quorum/status";
if (quorum_parallel)
quorum_info.status_path = storage.zookeeper_path + "/quorum/parallel/" + retry_context.actual_part_name;
waitForQuorum(zookeeper, retry_context.actual_part_name, quorum_info.status_path, quorum_info.is_active_node_version, replicas_num);
}
}
return {retry_context.conflict_block_ids, retry_context.part_was_deduplicated};
}
template<bool async_insert>

src/Storages/MergeTree/ReplicatedMergeTreeSink.h

@@ -70,6 +70,8 @@ public:
struct DelayedChunk;
private:
std::vector<String> detectConflictsInAsyncBlockIDs(const std::vector<String> & ids);
using BlockIDsType = std::conditional_t<async_insert, std::vector<String>, String>;
ZooKeeperRetriesInfo zookeeper_retries_info;
@@ -95,6 +97,7 @@ private:
size_t replicas_num,
bool writing_existing_part);
/// Wait for quorum to be satisfied on path (quorum_path) for part (part_name)
/// Also checks that replica still alive.
void waitForQuorum(
@@ -125,7 +128,6 @@ private:
bool last_block_is_duplicate = false;
UInt64 num_blocks_processed = 0;
using Logger = Poco::Logger;
Poco::Logger * log;
ContextPtr context;