diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ebf1e43ca04..26348e9b50d 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -1352,29 +1352,25 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry) } bool do_fetch = false; + if (entry.type == LogEntry::GET_PART) { + /// Before fetching the part, we need to look for it in the detached/ folder by checksum. + /// If we are lucky, we do not copy the part over the network. + // TODO do_fetch = true; } else if (entry.type == LogEntry::MERGE_PARTS) - { /// Sometimes it's better to fetch merged part instead of merge /// For example when we don't have all source parts for merge do_fetch = !tryExecuteMerge(entry); - } else if (entry.type == LogEntry::MUTATE_PART) - { /// Sometimes it's better to fetch mutated part instead of merge do_fetch = !tryExecutePartMutation(entry); - } else if (entry.type == LogEntry::ALTER_METADATA) - { return executeMetadataAlter(entry); - } else - { throw Exception("Unexpected log entry type: " + toString(static_cast(entry.type)), ErrorCodes::LOGICAL_ERROR); - } if (do_fetch) return executeFetch(entry); @@ -4460,13 +4456,19 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition( PartsTemporaryRename renamed_parts(*this, "detached/"); MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts); - ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false, false); /// TODO Allow to use quorum here. + /// TODO Allow to use quorum here. + ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false, false); + for (size_t i = 0; i < loaded_parts.size(); ++i) { - String old_name = loaded_parts[i]->name; + const String old_name = loaded_parts[i]->name; + output.writeExistingPart(loaded_parts[i]); + renamed_parts.old_and_new_names[i].first.clear(); + LOG_DEBUG(log, "Attached part {} as {}", old_name, loaded_parts[i]->name); + results.push_back(PartitionCommandResultInfo{ .partition_id = loaded_parts[i]->info.partition_id, .part_name = loaded_parts[i]->name, diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index cf36cf82fc9..9c6484e275f 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -39,13 +39,14 @@ namespace DB * - the structure of the table (/metadata, /columns) * - action log with data (/log/log-...,/replicas/replica_name/queue/queue-...); * - a replica list (/replicas), and replica activity tag (/replicas/replica_name/is_active), replica addresses (/replicas/replica_name/host); - * - select the leader replica (/leader_election) - these are the replicas that assigning merges, mutations and partition manipulations + * - the leader replica election (/leader_election) - these are the replicas that assign merges, mutations + * and partition manipulations. * (after ClickHouse version 20.5 we allow multiple leaders to act concurrently); * - a set of parts of data on each replica (/replicas/replica_name/parts); * - list of the last N blocks of data with checksum, for deduplication (/blocks); * - the list of incremental block numbers (/block_numbers) that we are about to insert, * to ensure the linear order of data insertion and data merge only on the intervals in this sequence; - * - coordinates writes with quorum (/quorum). + * - coordinate writes with quorum (/quorum). * - Storage of mutation entries (ALTER DELETE, ALTER UPDATE etc.) to execute (/mutations). * See comments in StorageReplicatedMergeTree::mutate() for details. */ @@ -65,6 +66,8 @@ namespace DB * - if the part is corrupt (removePartAndEnqueueFetch) or absent during the check (at start - checkParts, while running - searchForMissingPart), * actions are put on GET from other replicas; * + * TODO Update the GET part after rewriting the code (search locally). + * * The replica to which INSERT was made in the queue will also have an entry of the GET of this data. * Such an entry is considered to be executed as soon as the queue handler sees it. *