Fix restoring parts while a storage is readonly.

This commit is contained in:
Vitaly Baranov 2024-03-30 00:14:03 +01:00
parent a642f4d3ec
commit b0865cf2e3
3 changed files with 18 additions and 12 deletions

View File

@ -128,7 +128,8 @@ ReplicatedMergeTreeSinkImpl<async_insert>::ReplicatedMergeTreeSinkImpl(
bool deduplicate_,
bool majority_quorum,
ContextPtr context_,
bool is_attach_)
bool is_attach_,
bool allow_attach_while_readonly_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
@ -136,6 +137,7 @@ ReplicatedMergeTreeSinkImpl<async_insert>::ReplicatedMergeTreeSinkImpl(
, quorum_timeout_ms(quorum_timeout_ms_)
, max_parts_per_block(max_parts_per_block_)
, is_attach(is_attach_)
, allow_attach_while_readonly(allow_attach_while_readonly_)
, quorum_parallel(quorum_parallel_)
, deduplicate(deduplicate_)
, log(getLogger(storage.getLogName() + " (Replicated OutputStream)"))
@ -440,7 +442,7 @@ void ReplicatedMergeTreeSinkImpl<false>::finishDelayedChunk(const ZooKeeperWithF
try
{
bool deduplicated = commitPart(zookeeper, part, partition.block_id, delayed_chunk->replicas_num, false).second;
bool deduplicated = commitPart(zookeeper, part, partition.block_id, delayed_chunk->replicas_num).second;
last_block_is_duplicate = last_block_is_duplicate || deduplicated;
@ -485,7 +487,7 @@ void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFa
while (true)
{
partition.temp_part.finalize();
auto conflict_block_ids = commitPart(zookeeper, partition.temp_part.part, partition.block_id, delayed_chunk->replicas_num, false).first;
auto conflict_block_ids = commitPart(zookeeper, partition.temp_part.part, partition.block_id, delayed_chunk->replicas_num).first;
if (conflict_block_ids.empty())
{
auto counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(partition.part_counters.getPartiallyAtomicSnapshot());
@ -556,7 +558,7 @@ bool ReplicatedMergeTreeSinkImpl<false>::writeExistingPart(MergeTreeData::Mutabl
{
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
String block_id = deduplicate ? fmt::format("{}_{}", part->info.partition_id, part->checksums.getTotalChecksumHex()) : "";
bool deduplicated = commitPart(zookeeper, part, block_id, replicas_num, /* writing_existing_part */ true).second;
bool deduplicated = commitPart(zookeeper, part, block_id, replicas_num).second;
/// Set a special error code if the block is duplicate
int error = (deduplicate && deduplicated) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
@ -647,8 +649,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
const ZooKeeperWithFaultInjectionPtr & zookeeper,
MergeTreeData::MutableDataPartPtr & part,
const BlockIDsType & block_id,
size_t replicas_num,
bool writing_existing_part)
size_t replicas_num)
{
/// It is possible that we alter a part with different types of source columns.
/// In this case, if column was not altered, the result type will be different with what we have in metadata.
@ -800,7 +801,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
/// When we attach existing parts it's okay to be in read-only mode
/// For example during RESTORE REPLICA.
if (!writing_existing_part)
if (!allow_attach_while_readonly)
{
retries_ctl.setUserError(
Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode: replica_path={}", storage.replica_path));

View File

@ -45,7 +45,8 @@ public:
ContextPtr context_,
// special flag to determine the ALTER TABLE ATTACH PART without the query context,
// needed to set the special LogEntryType::ATTACH_PART
bool is_attach_ = false);
bool is_attach_ = false,
bool allow_attach_while_readonly_ = false);
~ReplicatedMergeTreeSinkImpl() override;
@ -93,8 +94,7 @@ private:
const ZooKeeperWithFaultInjectionPtr & zookeeper,
MergeTreeData::MutableDataPartPtr & part,
const BlockIDsType & block_id,
size_t replicas_num,
bool writing_existing_part);
size_t replicas_num);
/// Wait for quorum to be satisfied on path (quorum_path) form part (part_name)
@ -123,6 +123,7 @@ private:
UInt64 cache_version = 0;
bool is_attach = false;
bool allow_attach_while_readonly = false;
bool quorum_parallel = false;
const bool deduplicate = true;
bool last_block_is_duplicate = false;

View File

@ -6512,7 +6512,7 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition(
/// TODO Allow to use quorum here.
ReplicatedMergeTreeSink output(*this, metadata_snapshot, /* quorum */ 0, /* quorum_timeout_ms */ 0, /* max_parts_per_block */ 0,
/* quorum_parallel */ false, query_context->getSettingsRef().insert_deduplicate,
/* majority_quorum */ false, query_context, /*is_attach*/true);
/* majority_quorum */ false, query_context, /* is_attach */ true, /* allow_attach_while_readonly */ true);
for (size_t i = 0; i < loaded_parts.size(); ++i)
{
@ -10500,7 +10500,11 @@ void StorageReplicatedMergeTree::restoreDataFromBackup(RestorerFromBackup & rest
void StorageReplicatedMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
auto sink = std::make_shared<ReplicatedMergeTreeSink>(*this, metadata_snapshot, 0, 0, 0, false, false, false, getContext(), /*is_attach*/true);
auto sink = std::make_shared<ReplicatedMergeTreeSink>(
*this, metadata_snapshot, /* quorum */ 0, /* quorum_timeout_ms */ 0, /* max_parts_per_block */ 0, /* quorum_parallel */ false,
/* deduplicate */ false, /* majority_quorum */ false, getContext(), /* is_attach */ true, /* allow_attach_while_readonly */ false);
for (auto part : parts)
sink->writeExistingPart(part);
}