#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { /** The engine that uses the merge tree (see MergeTreeData) and is replicated through ZooKeeper. * * ZooKeeper is used for the following things: * - the structure of the table (/metadata, /columns) * - action log with data (/log/log-...,/replicas/replica_name/queue/queue-...); * - a replica list (/replicas), and replica activity tag (/replicas/replica_name/is_active), replica addresses (/replicas/replica_name/host); * - the leader replica election (/leader_election) - these are the replicas that assign merges, mutations * and partition manipulations. * (after ClickHouse version 20.5 we allow multiple leaders to act concurrently); * - a set of parts of data on each replica (/replicas/replica_name/parts); * - list of the last N blocks of data with checksum, for deduplication (/blocks); * - the list of incremental block numbers (/block_numbers) that we are about to insert, * to ensure the linear order of data insertion and data merge only on the intervals in this sequence; * - coordinate writes with quorum (/quorum). * - Storage of mutation entries (ALTER DELETE, ALTER UPDATE etc.) to execute (/mutations). * See comments in StorageReplicatedMergeTree::mutate() for details. */ /** The replicated tables have a common log (/log/log-...). * Log - a sequence of entries (LogEntry) about what to do. * Each entry is one of: * - normal data insertion (GET), * - data insertion with a possible attach from local data (ATTACH), * - merge (MERGE), * - delete the partition (DROP). * * Each replica copies (queueUpdatingTask, pullLogsToQueue) entries from the log to its queue (/replicas/replica_name/queue/queue-...) * and then executes them (queueTask). * Despite the name of the "queue", execution can be reordered, if necessary (shouldExecuteLogEntry, executeLogEntry). * In addition, the records in the queue can be generated independently (not from the log), in the following cases: * - when creating a new replica, actions are put on GET from other replicas (createReplica); * - if the part is corrupt (removePartAndEnqueueFetch) or absent during the check * (at start - checkParts, while running - searchForMissingPart), actions are put on GET from other replicas; * * The replica to which INSERT was made in the queue will also have an entry of the GET of this data. * Such an entry is considered to be executed as soon as the queue handler sees it. * * The log entry has a creation time. This time is generated by the clock of server that created entry * - the one on which the corresponding INSERT or ALTER query came. * * For the entries in the queue that the replica made for itself, * as the time will take the time of creation the appropriate part on any of the replicas. */ class ZooKeeperWithFaultInjection; using ZooKeeperWithFaultInjectionPtr = std::shared_ptr; class StorageReplicatedMergeTree final : public MergeTreeData { public: /** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table. */ StorageReplicatedMergeTree( const String & zookeeper_path_, const String & replica_name_, bool attach, const StorageID & table_id_, const String & relative_data_path_, const StorageInMemoryMetadata & metadata_, ContextMutablePtr context_, const String & date_column_name, const MergingParams & merging_params_, std::unique_ptr settings_, bool has_force_restore_data_flag, RenamingRestrictions renaming_restrictions_, bool need_check_structure); void startup() override; /// To many shutdown methods.... /// /// Partial shutdown called if we loose connection to zookeeper. /// Table can also recover after partial shutdown and continue /// to work. This method can be called regularly. void partialShutdown(); /// These two methods are called during final table shutdown (DROP/DETACH/overall server shutdown). /// The shutdown process is split into two methods to make it more soft and fast. In database shutdown() /// looks like: /// for (table : tables) /// table->flushAndPrepareForShutdown() /// /// for (table : tables) /// table->shutdown() /// /// So we stop producing all the parts first for all tables (fast operation). And after we can wait in shutdown() /// for other replicas to download parts. /// /// In flushAndPrepareForShutdown we cancel all part-producing operations: /// merges, fetches, moves and so on. If it wasn't called before shutdown() -- shutdown() will /// call it (defensive programming). void flushAndPrepareForShutdown() override; /// In shutdown we completely terminate table -- remove /// is_active node and interserver handler. Also optionally /// wait until other replicas will download some parts from our replica. void shutdown(bool is_drop) override; ~StorageReplicatedMergeTree() override; static String getDefaultZooKeeperPath(const Poco::Util::AbstractConfiguration & config); static String getDefaultReplicaName(const Poco::Util::AbstractConfiguration & config); std::string getName() const override { return "Replicated" + merging_params.getModeName() + "MergeTree"; } bool supportsParallelInsert() const override { return true; } bool supportsReplication() const override { return true; } bool supportsDeduplication() const override { return true; } void read( QueryPlan & query_plan, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, ContextPtr local_context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, size_t num_streams) override; std::optional totalRows(const Settings & settings) const override; std::optional totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr context) const override; std::optional totalBytes(const Settings & settings) const override; std::optional totalBytesUncompressed(const Settings & settings) const override; SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context, bool async_insert) override; std::optional distributedWrite(const ASTInsertQuery & /*query*/, ContextPtr /*context*/) override; bool optimize( const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const ASTPtr & partition, bool final, bool deduplicate, const Names & deduplicate_by_columns, bool cleanup, ContextPtr query_context) override; void alter(const AlterCommands & commands, ContextPtr query_context, AlterLockHolder & table_lock_holder) override; void mutate(const MutationCommands & commands, ContextPtr context) override; void waitMutation(const String & znode_name, size_t mutations_sync) const; std::vector getMutationsStatus() const override; CancellationCode killMutation(const String & mutation_id) override; bool hasLightweightDeletedMask() const override; /** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper. */ void drop() override; void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr query_context, TableExclusiveLockHolder &) override; void checkTableCanBeRenamed(const StorageID & new_name) const override; void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override; void checkTableCanBeDropped([[ maybe_unused ]] ContextPtr query_context) const override; ActionLock getActionLock(StorageActionBlockType action_type) override; void onActionLockRemove(StorageActionBlockType action_type) override; /// Wait till replication queue's current last entry is processed or till size becomes 0 /// If timeout is exceeded returns false bool waitForProcessingQueue(UInt64 max_wait_milliseconds, SyncReplicaMode sync_mode); /// Get the status of the table. If with_zk_fields = false - do not fill in the fields that require queries to ZK. void getStatus(ReplicatedTableStatus & res, bool with_zk_fields = true); using LogEntriesData = std::vector; void getQueue(LogEntriesData & res, String & replica_name); std::vector getPartMovesBetweenShardsEntries(); /// Get replica delay relative to current time. time_t getAbsoluteDelay() const; /// If the absolute delay is greater than min_relative_delay_to_measure, /// will also calculate the difference from the unprocessed time of the best replica. /// NOTE: Will communicate to ZooKeeper to calculate relative delay. void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay); /// Add a part to the queue of parts whose data you want to check in the background thread. void enqueuePartForCheck(const String & part_name, time_t delay_to_check_seconds = 0); DataValidationTasksPtr getCheckTaskList(const CheckTaskFilter & check_task_filter, ContextPtr context) override; std::optional checkDataNext(DataValidationTasksPtr & check_task_list) override; /// Checks ability to use granularity bool canUseAdaptiveGranularity() const override; /// Returns the default path to the table in ZooKeeper. /// It's used if not set in engine's arguments while creating a replicated table. static String getDefaultReplicaPath(const ContextPtr & context_); /// Returns the default replica name in ZooKeeper. /// It's used if not set in engine's arguments while creating a replicated table. static String getDefaultReplicaName(const ContextPtr & context_); /// Modify a CREATE TABLE query to make a variant which must be written to a backup. void adjustCreateQueryForBackup(ASTPtr & create_query) const override; /// Makes backup entries to backup the data of the storage. void backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional & partitions) override; /// Extract data from the backup and put it to the storage. void restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional & partitions) override; /** Remove a specific replica from zookeeper. */ static void dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica, Poco::Logger * logger, MergeTreeSettingsPtr table_settings = nullptr, std::optional * has_metadata_out = nullptr); void dropReplica(const String & drop_zookeeper_path, const String & drop_replica, Poco::Logger * logger); /// Removes table from ZooKeeper after the last replica was dropped static bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger); /// Schedules job to execute in background pool (merge, mutate, drop range and so on) bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override; /// Checks that fetches are not disabled with action blocker and pool for fetches /// is not overloaded bool canExecuteFetch(const ReplicatedMergeTreeLogEntry & entry, String & disable_reason) const; /// Fetch part only when it stored on shared storage like S3 MutableDataPartPtr executeFetchShared(const String & source_replica, const String & new_part_name, const DiskPtr & disk, const String & path); /// Lock part in zookeeper for use shared data in several nodes void lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock, std::optional hardlinked_files) const override; void lockSharedData( const IMergeTreeDataPart & part, const ZooKeeperWithFaultInjectionPtr & zookeeper, bool replace_existing_lock, std::optional hardlinked_files) const; void getLockSharedDataOps( const IMergeTreeDataPart & part, const ZooKeeperWithFaultInjectionPtr & zookeeper, bool replace_existing_lock, std::optional hardlinked_files, Coordination::Requests & requests) const; zkutil::EphemeralNodeHolderPtr lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const; /// Unlock shared data part in zookeeper /// Return true if data unlocked /// Return false if data is still used by another node std::pair unlockSharedData(const IMergeTreeDataPart & part) const override; std::pair unlockSharedData(const IMergeTreeDataPart & part, const ZooKeeperWithFaultInjectionPtr & zookeeper) const; /// Unlock shared data part in zookeeper by part id /// Return true if data unlocked /// Return false if data is still used by another node static std::pair unlockSharedDataByID( String part_id, const String & table_uuid, const MergeTreePartInfo & part_info, const String & replica_name_, const std::string & disk_type, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const MergeTreeSettings & settings, Poco::Logger * logger, const String & zookeeper_path_old, MergeTreeDataFormatVersion data_format_version); /// Fetch part only if some replica has it on shared storage like S3 MutableDataPartPtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override; /// Get best replica having this partition on a same type remote disk String getSharedDataReplica(const IMergeTreeDataPart & part, DataSourceType data_source_type) const; inline const String & getReplicaName() const { return replica_name; } /// Restores table metadata if ZooKeeper lost it. /// Used only on restarted readonly replicas (not checked). All active (Active) parts are moved to detached/ /// folder and attached. Parts in all other states are just moved to detached/ folder. void restoreMetadataInZooKeeper(); /// Get throttler for replicated fetches ThrottlerPtr getFetchesThrottler() const { return replicated_fetches_throttler; } /// Get throttler for replicated sends ThrottlerPtr getSendsThrottler() const { return replicated_sends_throttler; } bool createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name); // Return default or custom zookeeper name for table const String & getZooKeeperName() const { return zookeeper_name; } const String & getZooKeeperPath() const { return zookeeper_path; } // Return table id, common for different replicas String getTableSharedID() const override; std::map getUnfinishedMutationCommands() const override; /// Returns the same as getTableSharedID(), but extracts it from a create query. static std::optional tryGetTableSharedIDFromCreateQuery(const IAST & create_query, const ContextPtr & global_context); static const String & getDefaultZooKeeperName() { return default_zookeeper_name; } /// Check if there are new broken disks and enqueue part recovery tasks. void checkBrokenDisks(); static bool removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, const String & table_uuid, const String & replica_name, const String & zookeeper_path, const ContextPtr & local_context, const zkutil::ZooKeeperPtr & zookeeper); bool canUseZeroCopyReplication() const; bool isTableReadOnly () { return is_readonly; } /// Get a sequential consistent view of current parts. ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const; void addLastSentPart(const MergeTreePartInfo & info); /// Wait required amount of milliseconds to give other replicas a chance to /// download unique parts from our replica using ShutdownDeadline = std::chrono::time_point; void waitForUniquePartsToBeFetchedByOtherReplicas(ShutdownDeadline shutdown_deadline); private: std::atomic_bool are_restoring_replica {false}; /// Delete old parts from disk and from ZooKeeper. Returns the number of removed parts size_t clearOldPartsAndRemoveFromZK(); void clearOldPartsAndRemoveFromZKImpl(zkutil::ZooKeeperPtr zookeeper, DataPartsVector && parts); template friend class ReplicatedMergeTreeSinkImpl; friend class ReplicatedMergeTreePartCheckThread; friend class ReplicatedMergeTreeCleanupThread; friend class AsyncBlockIDsCache; friend class ReplicatedMergeTreeAlterThread; friend class ReplicatedMergeTreeRestartingThread; friend class ReplicatedMergeTreeAttachThread; friend class ReplicatedMergeTreeMergeStrategyPicker; friend struct ReplicatedMergeTreeLogEntry; friend class ScopedPartitionMergeLock; friend class ReplicatedMergeTreeQueue; friend class PartMovesBetweenShardsOrchestrator; friend class MergeTreeData; friend class MergeFromLogEntryTask; friend class MutateFromLogEntryTask; friend class ReplicatedMergeMutateTaskBase; using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker; using LogEntry = ReplicatedMergeTreeLogEntry; using LogEntryPtr = LogEntry::Ptr; using MergeTreeData::MutableDataPartPtr; zkutil::ZooKeeperPtr current_zookeeper; /// Use only the methods below. mutable std::mutex current_zookeeper_mutex; /// To recreate the session in the background thread. zkutil::ZooKeeperPtr tryGetZooKeeper() const; zkutil::ZooKeeperPtr getZooKeeper() const; /// Get connection from global context and reconnect if needed. /// NOTE: use it only when table is shut down, in all other cases /// use getZooKeeper() because it is managed by restarting thread /// which guarantees that we have only one connected object /// for table. zkutil::ZooKeeperPtr getZooKeeperIfTableShutDown() const; zkutil::ZooKeeperPtr getZooKeeperAndAssertNotReadonly() const; void setZooKeeper(); String getEndpointName() const; /// If true, the table is offline and can not be written to it. /// This flag is managed by RestartingThread. std::atomic_bool is_readonly {true}; /// If nullopt - ZooKeeper is not available, so we don't know if there is table metadata. /// If false - ZooKeeper is available, but there is no table metadata. It's safe to drop table in this case. std::optional has_metadata_in_zookeeper; bool is_readonly_metric_set = false; static const String default_zookeeper_name; const String zookeeper_name; const String zookeeper_path; const String replica_name; const String replica_path; /** /replicas/me/is_active. */ zkutil::EphemeralNodeHolderPtr replica_is_active_node; /** Is this replica "leading". The leader replica selects the parts to merge. * It can be false only when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders. */ std::atomic is_leader {false}; InterserverIOEndpointPtr data_parts_exchange_endpoint; MergeTreeDataSelectExecutor reader; MergeTreeDataWriter writer; MergeTreeDataMergerMutator merger_mutator; MergeStrategyPicker merge_strategy_picker; /** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/). * In ZK entries in chronological order. Here it is not necessary. */ ReplicatedMergeTreeQueue queue; std::atomic last_queue_update_start_time{0}; std::atomic last_queue_update_finish_time{0}; mutable std::mutex last_queue_update_exception_lock; String last_queue_update_exception; String getLastQueueUpdateException() const; DataPartsExchange::Fetcher fetcher; /// When activated, replica is initialized and startup() method could exit Poco::Event startup_event; /// Do I need to complete background threads (except restarting_thread)? std::atomic partial_shutdown_called {false}; /// Event that is signalled (and is reset) by the restarting_thread when the ZooKeeper session expires. Poco::Event partial_shutdown_event {false}; /// Poco::Event::EVENT_MANUALRESET std::atomic shutdown_called {false}; std::atomic shutdown_prepared_called {false}; std::optional shutdown_deadline; /// We call flushAndPrepareForShutdown before acquiring DDLGuard, so we can shutdown a table that is being created right now mutable std::mutex flush_and_shutdown_mutex; mutable std::mutex last_sent_parts_mutex; std::condition_variable last_sent_parts_cv; std::deque last_sent_parts; /// Threads. /// /// A task that keeps track of the updates in the logs of all replicas and loads them into the queue. bool queue_update_in_progress = false; BackgroundSchedulePool::TaskHolder queue_updating_task; BackgroundSchedulePool::TaskHolder mutations_updating_task; Coordination::WatchCallbackPtr mutations_watch_callback; /// A task that selects parts to merge. BackgroundSchedulePool::TaskHolder merge_selecting_task; /// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query. std::mutex merge_selecting_mutex; UInt64 merge_selecting_sleep_ms; /// A task that marks finished mutations as done. BackgroundSchedulePool::TaskHolder mutations_finalizing_task; /// A thread that removes old parts, log entries, and blocks. ReplicatedMergeTreeCleanupThread cleanup_thread; AsyncBlockIDsCache async_block_ids_cache; /// A thread that checks the data of the parts, as well as the queue of the parts to be checked. ReplicatedMergeTreePartCheckThread part_check_thread; /// A thread that processes reconnection to ZooKeeper when the session expires. ReplicatedMergeTreeRestartingThread restarting_thread; EventNotifier::HandlerPtr session_expired_callback_handler; /// A thread that attaches the table using ZooKeeper std::optional attach_thread; PartMovesBetweenShardsOrchestrator part_moves_between_shards_orchestrator; std::atomic initialization_done{false}; /// True if replica was created for existing table with fixed granularity bool other_replicas_fixed_granularity = false; /// Do not allow RENAME TABLE if zookeeper_path contains {database} or {table} macro const RenamingRestrictions renaming_restrictions; /// Throttlers used in DataPartsExchange to lower maximum fetch/sends /// speed. ThrottlerPtr replicated_fetches_throttler; ThrottlerPtr replicated_sends_throttler; /// Global ID, synced via ZooKeeper between replicas mutable std::mutex table_shared_id_mutex; mutable UUID table_shared_id; std::mutex last_broken_disks_mutex; std::set last_broken_disks; std::mutex existing_zero_copy_locks_mutex; struct ZeroCopyLockDescription { std::string replica; std::shared_ptr> exists; }; std::unordered_map existing_zero_copy_locks; static std::optional distributedWriteFromClusterStorage(const std::shared_ptr & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context); void readLocalImpl( QueryPlan & query_plan, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, ContextPtr local_context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, size_t num_streams); void readLocalSequentialConsistencyImpl( QueryPlan & query_plan, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, ContextPtr local_context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, size_t num_streams); void readParallelReplicasImpl( QueryPlan & query_plan, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, ContextPtr local_context, QueryProcessingStage::Enum processed_stage); template void foreachActiveParts(Func && func, bool select_sequential_consistency) const; /** Creates the minimum set of nodes in ZooKeeper and create first replica. * Returns true if was created, false if exists. */ bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot); /** * Creates a replica in ZooKeeper and adds to the queue all that it takes to catch up with the rest of the replicas. */ void createReplica(const StorageMetadataPtr & metadata_snapshot); /** Create nodes in the ZK, which must always be, but which might not exist when older versions of the server are running. */ void createNewZooKeeperNodes(); bool checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, bool strict_check = true); /// A part of ALTER: apply metadata changes only (data parts are altered separately). /// Must be called under IStorage::lockForAlter() lock. void setTableStructure( const StorageID & table_id, const ContextPtr & local_context, ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff, int32_t new_metadata_version); /** Check that the set of parts corresponds to that in ZK (/replicas/me/parts/). * If any parts described in ZK are not locally, throw an exception. * If any local parts are not mentioned in ZK, remove them. * But if there are too many, throw an exception just in case - it's probably a configuration error. */ void checkParts(bool skip_sanity_checks); bool checkPartsImpl(bool skip_sanity_checks); /// Synchronize the list of part uuids which are currently pinned. These should be sent to root query executor /// to be used for deduplication. void syncPinnedPartUUIDs(); /** Check that the part's checksum is the same as the checksum of the same part on some other replica. * If no one has such a part, nothing checks. * Not very reliable: if two replicas add a part almost at the same time, no checks will occur. * Adds actions to `ops` that add data about the part into ZooKeeper. * Call under lockForShare. */ bool checkPartChecksumsAndAddCommitOps( const ZooKeeperWithFaultInjectionPtr & zookeeper, const DataPartPtr & part, Coordination::Requests & ops, String part_name, NameSet & absent_replicas_paths); String getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const; bool getOpsToCheckPartChecksumsAndCommit(const ZooKeeperWithFaultInjectionPtr & zookeeper, const MutableDataPartPtr & part, std::optional hardlinked_files, bool replace_zero_copy_lock, Coordination::Requests & ops, size_t & num_check_ops); /// Accepts a PreActive part, atomically checks its checksums with ones on other replicas and commit the part DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction, const MutableDataPartPtr & part, std::optional hardlinked_files = {}, bool replace_zero_copy_lock=false); bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; void getCommitPartOps(Coordination::Requests & ops, const DataPartPtr & part, const String & block_id_path = "") const; void getCommitPartOps(Coordination::Requests & ops, const DataPartPtr & part, const std::vector & block_id_paths) const; /// Adds actions to `ops` that remove a part from ZooKeeper. /// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes). void getRemovePartFromZooKeeperOps(const String & part_name, Coordination::Requests & ops, bool has_children); /// Quickly removes big set of parts from ZooKeeper (using async multi queries) void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names, NameSet * parts_should_be_retried = nullptr); /// Remove parts from ZooKeeper, throw exception if unable to do so after max_retries. void removePartsFromZooKeeperWithRetries(const Strings & part_names, size_t max_retries = 5); void removePartsFromZooKeeperWithRetries(PartsToRemoveFromZooKeeper & parts, size_t max_retries = 5); void forcefullyRemoveBrokenOutdatedPartFromZooKeeperBeforeDetaching(const String & part_name) override; void paranoidCheckForCoveredPartsInZooKeeperOnStart(const Strings & parts_in_zk, const Strings & parts_to_fetch) const; /// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts. void removePartAndEnqueueFetch(const String & part_name, bool storage_init); /// Running jobs from the queue. /** Execute the action from the queue. Throws an exception if something is wrong. * Returns whether or not it succeeds. If it did not work, write it to the end of the queue. */ bool executeLogEntry(LogEntry & entry); /// Lookup the part for the entry in the detached/ folder. /// returns nullptr if the part is corrupt or missing. MutableDataPartPtr attachPartHelperFoundValidPart(const LogEntry& entry) const; void executeDropRange(const LogEntry & entry); /// Execute alter of table metadata. Set replica/metadata and replica/columns /// nodes in zookeeper and also changes in memory metadata. /// New metadata and columns values stored in entry. bool executeMetadataAlter(const LogEntry & entry); /// Fetch part from other replica (inserted or merged/mutated) /// NOTE: Attention! First of all tries to find covering part on other replica /// and set it into entry.actual_new_part_name. After that tries to fetch this new covering part. /// If fetch was not successful, clears entry.actual_new_part_name. bool executeFetch(LogEntry & entry, bool need_to_check_missing_part=true); bool executeReplaceRange(LogEntry & entry); void executeClonePartFromShard(const LogEntry & entry); /** Updates the queue. */ void queueUpdatingTask(); void mutationsUpdatingTask(); /** Clone data from another replica. * If replica can not be cloned throw Exception. */ void cloneReplica(const String & source_replica, Coordination::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper); /// Repairs metadata of staled replica. Called from cloneReplica(...) void cloneMetadataIfNeeded(const String & source_replica, const String & source_path, zkutil::ZooKeeperPtr & zookeeper); /// Clone replica if it is lost. void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper); ReplicatedMergeTreeQueue::SelectedEntryPtr selectQueueEntry(); MergeFromLogEntryTaskPtr getTaskToProcessMergeQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry); bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry); /// Start being leader (if not disabled by setting). /// Since multi-leaders are allowed, it just sets is_leader flag. void startBeingLeader(); void stopBeingLeader(); /** Selects the parts to merge and writes to the log. */ void mergeSelectingTask(); /// Checks if some mutations are done and marks them as done. void mutationsFinalizingTask(); /** Write the selected parts to merge into the log, * Call when merge_selecting_mutex is locked. * Returns false if any part is not in ZK. */ enum class CreateMergeEntryResult { Ok, MissingPart, LogUpdated, Other }; CreateMergeEntryResult createLogEntryToMergeParts( zkutil::ZooKeeperPtr & zookeeper, const DataPartsVector & parts, const String & merged_name, const UUID & merged_part_uuid, const MergeTreeDataPartFormat & merged_part_format, bool deduplicate, const Names & deduplicate_by_columns, bool cleanup, ReplicatedMergeTreeLogEntryData * out_log_entry, int32_t log_version, MergeType merge_type); CreateMergeEntryResult createLogEntryToMutatePart( const IMergeTreeDataPart & part, const UUID & new_part_uuid, Int64 mutation_version, int32_t alter_version, int32_t log_version); /** Returns an empty string if no one has a part. */ String findReplicaHavingPart(const String & part_name, bool active); static String findReplicaHavingPart(const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_); bool checkReplicaHavePart(const String & replica, const String & part_name); bool checkIfDetachedPartExists(const String & part_name); bool checkIfDetachedPartitionExists(const String & partition_name); /** Find replica having specified part or any part that covers it. * If active = true, consider only active replicas. * If found, returns replica name and set 'entry->actual_new_part_name' to name of found largest covering part. * If not found, returns empty string. */ String findReplicaHavingCoveringPart(LogEntry & entry, bool active); bool findReplicaHavingCoveringPart(const String & part_name, bool active); String findReplicaHavingCoveringPartImplLowLevel(LogEntry * entry, const String & part_name, String & found_part_name, bool active); static std::set findReplicaUniqueParts(const String & replica_name_, const String & zookeeper_path_, MergeTreeDataFormatVersion format_version_, zkutil::ZooKeeper::Ptr zookeeper_, Poco::Logger * log_); /** Download the specified part from the specified replica. * If `to_detached`, the part is placed in the `detached` directory. * If quorum != 0, then the node for tracking the quorum is updated. * Returns false if part is already fetching right now. */ bool fetchPart( const String & part_name, const StorageMetadataPtr & metadata_snapshot, const String & source_zookeeper_name, const String & source_replica_path, bool to_detached, size_t quorum, zkutil::ZooKeeper::Ptr zookeeper_ = nullptr, bool try_fetch_shared = true); /** Download the specified part from the specified replica. * Used for replace local part on the same s3-shared part in hybrid storage. * Returns false if part is already fetching right now. */ MutableDataPartPtr fetchExistsPart( const String & part_name, const StorageMetadataPtr & metadata_snapshot, const String & replica_path, DiskPtr replaced_disk, String replaced_part_path); /// Required only to avoid races between executeLogEntry and fetchPartition std::unordered_set currently_fetching_parts; std::mutex currently_fetching_parts_mutex; /// With the quorum being tracked, add a replica to the quorum for the part. void updateQuorum(const String & part_name, bool is_parallel); /// Deletes info from quorum/last_part node for particular partition_id. void cleanLastPartNode(const String & partition_id); /// Part name is stored in quorum/last_part for corresponding partition_id. bool partIsLastQuorumPart(const MergeTreePartInfo & part_info) const; /// Part currently inserting with quorum (node quorum/parallel/part_name exists) bool partIsInsertingWithParallelQuorum(const MergeTreePartInfo & part_info) const; /// Creates new block number if block with such block_id does not exist /// If zookeeper_path_prefix specified then allocate block number on this path /// (can be used if we want to allocate blocks on other replicas) std::optional allocateBlockNumber( const String & partition_id, const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_block_id_path = "", const String & zookeeper_path_prefix = "") const; template std::optional allocateBlockNumber( const String & partition_id, const ZooKeeperWithFaultInjectionPtr & zookeeper, const T & zookeeper_block_id_path, const String & zookeeper_path_prefix = "") const; /** Wait until all replicas, including this, execute the specified action from the log. * If replicas are added at the same time, it can not wait the added replica. * * Waits for inactive replicas no more than wait_for_inactive_timeout. * Returns list of inactive replicas that have not executed entry or throws exception. * * NOTE: This method must be called without table lock held. * Because it effectively waits for other thread that usually has to also acquire a lock to proceed and this yields deadlock. */ void waitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout, const String & error_context = {}); Strings tryWaitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout); /** Wait until the specified replica executes the specified action from the log. * NOTE: See comment about locks above. */ bool tryWaitForReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0); /// Depending on settings, do nothing or wait for this replica or all replicas process log entry. void waitForLogEntryToBeProcessedIfNecessary(const ReplicatedMergeTreeLogEntryData & entry, ContextPtr query_context, const String & error_context = {}); /// Throw an exception if the table is readonly. void assertNotReadonly() const; /// Produce an imaginary part info covering all parts in the specified partition (at the call moment). /// Returns false if the partition doesn't exist yet. /// Caller must hold delimiting_block_lock until creation of drop/replace entry in log. /// Otherwise some replica may assign merge which intersects part_info. bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info, std::optional & delimiting_block_lock, bool for_replace_range = false); /// Check for a node in ZK. If it is, remember this information, and then immediately answer true. mutable std::unordered_set existing_nodes_cache; mutable std::mutex existing_nodes_cache_mutex; bool existsNodeCached(const ZooKeeperWithFaultInjectionPtr & zookeeper, const std::string & path) const; /// Cancels INSERTs in the block range by removing ephemeral block numbers void clearLockedBlockNumbersInPartition(zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num); void getClearBlocksInPartitionOps(Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num); void getClearBlocksInPartitionOpsImpl(Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num, const String & blocks_dir_name); /// Remove block IDs from `blocks/` in ZooKeeper for the given partition ID in the given block number range. void clearBlocksInPartition( zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num); /// Info about how other replicas can access this one. ReplicatedMergeTreeAddress getReplicatedMergeTreeAddress() const; bool addOpsToDropAllPartsInPartition( zkutil::ZooKeeper & zookeeper, const String & partition_id, bool detach, Coordination::Requests & ops, std::vector & entries, std::vector & delimiting_block_locks, std::vector & log_entry_ops_idx); void dropAllPartsInPartitions( zkutil::ZooKeeper & zookeeper, const Strings & partition_ids, std::vector & entries, ContextPtr query_context, bool detach); LogEntryPtr dropAllPartsInPartition( zkutil::ZooKeeper & zookeeper, const String & partition_id, ContextPtr query_context, bool detach); void dropAllPartitionsImpl(const zkutil::ZooKeeperPtr & zookeeper, bool detach, ContextPtr query_context); void dropPartNoWaitNoThrow(const String & part_name) override; void dropPart(const String & part_name, bool detach, ContextPtr query_context) override; // Partition helpers void dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context) override; PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, ContextPtr query_context) override; void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr query_context) override; void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) override; void movePartitionToShard(const ASTPtr & partition, bool move_part, const String & to, ContextPtr query_context) override; CancellationCode killPartMoveToShard(const UUID & task_uuid) override; void fetchPartition( const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & from, bool fetch_part, ContextPtr query_context) override; /// NOTE: there are no guarantees for concurrent merges. Dropping part can /// be concurrently merged into some covering part and dropPart will do /// nothing. There are some fundamental problems with it. But this is OK /// because: /// /// dropPart used in the following cases: /// 1) Remove empty parts after TTL. /// 2) Remove parts after move between shards. /// 3) User queries: ALTER TABLE DROP PART 'part_name'. /// /// In the first case merge of empty part is even better than DROP. In the /// second case part UUIDs used to forbid merges for moving parts so there /// is no problem with concurrent merges. The third case is quite rare and /// we give very weak guarantee: there will be no active part with this /// name, but possibly it was merged to some other part. /// /// NOTE: don't rely on dropPart if you 100% need to remove non-empty part /// and don't use any explicit locking mechanism for merges. bool dropPartImpl(zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach, bool throw_if_noop); /// Check granularity of already existing replicated table in zookeeper if it exists /// return true if it's fixed bool checkFixedGranularityInZookeeper(); /// Wait for timeout seconds mutation is finished on replicas void waitMutationToFinishOnReplicas( const Strings & replicas, const String & mutation_id) const; std::map getAlterMutationCommandsForPart(const DataPartPtr & part) const override; void startBackgroundMovesIfNeeded() override; /// Attaches restored parts to the storage. void attachRestoredParts(MutableDataPartsVector && parts) override; std::unique_ptr getDefaultSettings() const override; PartitionBlockNumbersHolder allocateBlockNumbersInAffectedPartitions( const MutationCommands & commands, ContextPtr query_context, const zkutil::ZooKeeperPtr & zookeeper) const; static Strings getZeroCopyPartPath(const MergeTreeSettings & settings, const std::string & disk_type, const String & table_uuid, const String & part_name, const String & zookeeper_path_old); static void createZeroCopyLockNode( const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false, const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {}); static void getZeroCopyLockNodeCreateOps( const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, Coordination::Requests & requests, int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false, const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {}); bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name) override; /// Create freeze metadata for table and save in zookeeper. Required only if zero-copy replication enabled. void createAndStoreFreezeMetadata(DiskPtr disk, DataPartPtr part, String backup_part_path) const override; // Create table id if needed void createTableSharedID() const; bool checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica); void watchZeroCopyLock(const String & part_name, const DiskPtr & disk); std::optional getZeroCopyPartPath(const String & part_name, const DiskPtr & disk); /// Create ephemeral lock in zookeeper for part and disk which support zero copy replication. /// If no connection to zookeeper, shutdown, readonly -- return std::nullopt. /// If somebody already holding the lock -- return unlocked ZeroCopyLock object (not std::nullopt). std::optional tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk) override; /// Wait for ephemral lock to disappear. Return true if table shutdown/readonly/timeout exceeded, etc. /// Or if node actually disappeared. bool waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait) override; void startupImpl(bool from_attach_thread); struct DataValidationTasks : public IStorage::DataValidationTasksBase { explicit DataValidationTasks(DataPartsVector && parts_, std::unique_lock && parts_check_lock_) : parts_check_lock(std::move(parts_check_lock_)), parts(std::move(parts_)), it(parts.begin()) {} DataPartPtr next() { std::lock_guard lock(mutex); if (it == parts.end()) return nullptr; return *(it++); } size_t size() const override { std::lock_guard lock(mutex); return std::distance(it, parts.end()); } std::unique_lock parts_check_lock; mutable std::mutex mutex; DataPartsVector parts; DataPartsVector::const_iterator it; }; }; String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info); /** There are three places for each part, where it should be * 1. In the RAM, data_parts, all_data_parts. * 2. In the filesystem (FS), the directory with the data of the table. * 3. in ZooKeeper (ZK). * * When adding a part, it must be added immediately to these three places. * This is done like this * - [FS] first write the part into a temporary directory on the filesystem; * - [FS] rename the temporary part to the result on the filesystem; * - [RAM] immediately afterwards add it to the `data_parts`, and remove from `data_parts` any parts covered by this one; * - [RAM] also set the `Transaction` object, which in case of an exception (in next point), * rolls back the changes in `data_parts` (from the previous point) back; * - [ZK] then send a transaction (multi) to add a part to ZooKeeper (and some more actions); * - [FS, ZK] by the way, removing the covered (old) parts from filesystem, from ZooKeeper and from `all_data_parts` * is delayed, after a few minutes. * * There is no atomicity here. * It could be possible to achieve atomicity using undo/redo logs and a flag in `DataPart` when it is completely ready. * But it would be inconvenient - I would have to write undo/redo logs for each `Part` in ZK, and this would increase already large number of interactions. * * Instead, we are forced to work in a situation where at any time * (from another thread, or after server restart), there may be an unfinished transaction. * (note - for this the part should be in RAM) * From these cases the most frequent one is when the part is already in the data_parts, but it's not yet in ZooKeeper. * This case must be distinguished from the case where such a situation is achieved due to some kind of damage to the state. * * Do this with the threshold for the time. * If the part is young enough, its lack in ZooKeeper will be perceived optimistically - as if it just did not have time to be added there * - as if the transaction has not yet been executed, but will soon be executed. * And if the part is old, its absence in ZooKeeper will be perceived as an unfinished transaction that needs to be rolled back. * * PS. Perhaps it would be better to add a flag to the DataPart that a part is inserted into ZK. * But here it's too easy to get confused with the consistency of this flag. */ /// NOLINTNEXTLINE #define MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER (5 * 60) }