ClickHouse/src/Storages/StorageReplicatedMergeTree.h

850 lines
40 KiB
C++
Raw Normal View History

2014-03-21 13:42:14 +00:00
#pragma once
2021-10-02 07:13:14 +00:00
#include <base/shared_ptr_helper.h>
#include <base/UUID.h>
#include <atomic>
#include <pcg_random.hpp>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
2019-08-19 17:59:16 +00:00
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
2020-09-18 10:57:33 +00:00
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h>
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
#include <Storages/MergeTree/MergeFromLogEntryTask.h>
#include <DataTypes/DataTypesNumber.h>
2018-12-28 17:11:52 +00:00
#include <Interpreters/Cluster.h>
#include <Interpreters/PartLog.h>
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
2021-05-26 20:37:44 +00:00
#include <Common/Throttler.h>
2018-08-20 15:34:37 +00:00
#include <Core/BackgroundSchedulePool.h>
2021-10-16 14:03:50 +00:00
#include <QueryPipeline/Pipe.h>
2021-09-06 12:01:16 +00:00
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
2014-03-21 13:42:14 +00:00
namespace DB
{
SYSTEM RESTORE REPLICA replica [ON CLUSTER cluster] (#13652) * initial commit: add setting and stub * typo * added test stub * fix * wip merging new integration test and code proto * adding steps interpreters * adding firstly proposed solution (moving parts etc) * added checking zookeeper path existence * fixing the include * fixing and sorting includes * fixing outdated struct * fix the name * added ast ptr as level of indirection * fix ref * updating the changes * working on test stub * fix iterator -> reference * revert rocksdb submodule update * fixed show privileges test * updated the test stub * replaced rand() with thread_local_rng(), updated the tests updated the test fixed test config path test fix removed error messages fixed the test updated the test fixed string literal fixed literal typo: = * fixed the empty replica error message * updated the test and the code with logs * updated the possible test cases, updated * added the code/test milestone comments * updated the test (added more testcases) * replaced native assert with CH one * individual replicas recursive delete fix * updated the AS db.name AST * two small logging fixes * manually generated AST fixes * Updated the test, added the possible algo change * Some thoughts about optimizing the solution: ALTER MOVE PARTITION .. TO TABLE -> move to detached/ + ALTER ... ATTACH * fix * Removed the replica sync in test as it's invalid * Some test tweaks * tmp * Rewrote the algo by using the executeQuery instead of hand-crafting the ASTPtr. Two questions still active. * tr: logging active parts * Extracted the parts moving algo into a separate helper function * Fixed the test data and the queries slightly * Replaced query to system.parts to direct invocation, started building the test that breaks on various parts. * Added the case for tables when at least one replica is alive * Updated the test to test replicas restoration by detaching/attaching * Altered the test to check restoration without replica restart * Added the tables swap in the start if the server failed last time * Hotfix when only /replicas/replica... path was deleted * Restore ZK paths while creating a replicated MergeTree table * Updated the docs, fixed the algo for individual replicas restoration case * Initial parts table storage fix, tests sync fix * Reverted individual replica restoration to general algo * Slightly optimised getDataParts * Trying another solution with parts detaching * Rewrote algo without any steps, added ON CLUSTER support * Attaching parts from other replica on restoration * Getting part checksums from ZK * Removed ON CLUSTER, finished working solution * Multiple small changes after review * Fixing parallel test * Supporting rewritten form on cluster * Test fix * Moar logging * Using source replica as checksum provider * improve test, remove some code from parser * Trying solution with move to detached + forget * Moving all parts (not only Committed) to detached * Edited docs for RESTORE REPLICA * Re-merging * minor fixes Co-authored-by: Alexander Tokmakov <avtokmakov@yandex-team.ru>
2021-06-20 08:24:43 +00:00
/** The engine that uses the merge tree (see MergeTreeData) and is replicated through ZooKeeper.
*
2017-04-16 15:00:33 +00:00
* ZooKeeper is used for the following things:
2020-02-14 13:17:50 +00:00
* - the structure of the table (/metadata, /columns)
2017-04-16 15:00:33 +00:00
* - action log with data (/log/log-...,/replicas/replica_name/queue/queue-...);
* - a replica list (/replicas), and replica activity tag (/replicas/replica_name/is_active), replica addresses (/replicas/replica_name/host);
2021-01-12 18:46:03 +00:00
* - the leader replica election (/leader_election) - these are the replicas that assign merges, mutations
* and partition manipulations.
2020-12-16 03:03:43 +00:00
* (after ClickHouse version 20.5 we allow multiple leaders to act concurrently);
2017-04-16 15:00:33 +00:00
* - a set of parts of data on each replica (/replicas/replica_name/parts);
* - list of the last N blocks of data with checksum, for deduplication (/blocks);
* - the list of incremental block numbers (/block_numbers) that we are about to insert,
* to ensure the linear order of data insertion and data merge only on the intervals in this sequence;
2021-01-12 18:46:03 +00:00
* - coordinate writes with quorum (/quorum).
* - Storage of mutation entries (ALTER DELETE, ALTER UPDATE etc.) to execute (/mutations).
* See comments in StorageReplicatedMergeTree::mutate() for details.
2014-03-21 19:17:59 +00:00
*/
2017-04-16 15:00:33 +00:00
/** The replicated tables have a common log (/log/log-...).
* Log - a sequence of entries (LogEntry) about what to do.
* Each entry is one of:
* - normal data insertion (GET),
SYSTEM RESTORE REPLICA replica [ON CLUSTER cluster] (#13652) * initial commit: add setting and stub * typo * added test stub * fix * wip merging new integration test and code proto * adding steps interpreters * adding firstly proposed solution (moving parts etc) * added checking zookeeper path existence * fixing the include * fixing and sorting includes * fixing outdated struct * fix the name * added ast ptr as level of indirection * fix ref * updating the changes * working on test stub * fix iterator -> reference * revert rocksdb submodule update * fixed show privileges test * updated the test stub * replaced rand() with thread_local_rng(), updated the tests updated the test fixed test config path test fix removed error messages fixed the test updated the test fixed string literal fixed literal typo: = * fixed the empty replica error message * updated the test and the code with logs * updated the possible test cases, updated * added the code/test milestone comments * updated the test (added more testcases) * replaced native assert with CH one * individual replicas recursive delete fix * updated the AS db.name AST * two small logging fixes * manually generated AST fixes * Updated the test, added the possible algo change * Some thoughts about optimizing the solution: ALTER MOVE PARTITION .. TO TABLE -> move to detached/ + ALTER ... ATTACH * fix * Removed the replica sync in test as it's invalid * Some test tweaks * tmp * Rewrote the algo by using the executeQuery instead of hand-crafting the ASTPtr. Two questions still active. * tr: logging active parts * Extracted the parts moving algo into a separate helper function * Fixed the test data and the queries slightly * Replaced query to system.parts to direct invocation, started building the test that breaks on various parts. * Added the case for tables when at least one replica is alive * Updated the test to test replicas restoration by detaching/attaching * Altered the test to check restoration without replica restart * Added the tables swap in the start if the server failed last time * Hotfix when only /replicas/replica... path was deleted * Restore ZK paths while creating a replicated MergeTree table * Updated the docs, fixed the algo for individual replicas restoration case * Initial parts table storage fix, tests sync fix * Reverted individual replica restoration to general algo * Slightly optimised getDataParts * Trying another solution with parts detaching * Rewrote algo without any steps, added ON CLUSTER support * Attaching parts from other replica on restoration * Getting part checksums from ZK * Removed ON CLUSTER, finished working solution * Multiple small changes after review * Fixing parallel test * Supporting rewritten form on cluster * Test fix * Moar logging * Using source replica as checksum provider * improve test, remove some code from parser * Trying solution with move to detached + forget * Moving all parts (not only Committed) to detached * Edited docs for RESTORE REPLICA * Re-merging * minor fixes Co-authored-by: Alexander Tokmakov <avtokmakov@yandex-team.ru>
2021-06-20 08:24:43 +00:00
* - data insertion with a possible attach from local data (ATTACH),
2017-04-16 15:00:33 +00:00
* - merge (MERGE),
* - delete the partition (DROP).
*
* Each replica copies (queueUpdatingTask, pullLogsToQueue) entries from the log to its queue (/replicas/replica_name/queue/queue-...)
2017-04-16 15:00:33 +00:00
* and then executes them (queueTask).
* Despite the name of the "queue", execution can be reordered, if necessary (shouldExecuteLogEntry, executeLogEntry).
* In addition, the records in the queue can be generated independently (not from the log), in the following cases:
* - when creating a new replica, actions are put on GET from other replicas (createReplica);
SYSTEM RESTORE REPLICA replica [ON CLUSTER cluster] (#13652) * initial commit: add setting and stub * typo * added test stub * fix * wip merging new integration test and code proto * adding steps interpreters * adding firstly proposed solution (moving parts etc) * added checking zookeeper path existence * fixing the include * fixing and sorting includes * fixing outdated struct * fix the name * added ast ptr as level of indirection * fix ref * updating the changes * working on test stub * fix iterator -> reference * revert rocksdb submodule update * fixed show privileges test * updated the test stub * replaced rand() with thread_local_rng(), updated the tests updated the test fixed test config path test fix removed error messages fixed the test updated the test fixed string literal fixed literal typo: = * fixed the empty replica error message * updated the test and the code with logs * updated the possible test cases, updated * added the code/test milestone comments * updated the test (added more testcases) * replaced native assert with CH one * individual replicas recursive delete fix * updated the AS db.name AST * two small logging fixes * manually generated AST fixes * Updated the test, added the possible algo change * Some thoughts about optimizing the solution: ALTER MOVE PARTITION .. TO TABLE -> move to detached/ + ALTER ... ATTACH * fix * Removed the replica sync in test as it's invalid * Some test tweaks * tmp * Rewrote the algo by using the executeQuery instead of hand-crafting the ASTPtr. Two questions still active. * tr: logging active parts * Extracted the parts moving algo into a separate helper function * Fixed the test data and the queries slightly * Replaced query to system.parts to direct invocation, started building the test that breaks on various parts. * Added the case for tables when at least one replica is alive * Updated the test to test replicas restoration by detaching/attaching * Altered the test to check restoration without replica restart * Added the tables swap in the start if the server failed last time * Hotfix when only /replicas/replica... path was deleted * Restore ZK paths while creating a replicated MergeTree table * Updated the docs, fixed the algo for individual replicas restoration case * Initial parts table storage fix, tests sync fix * Reverted individual replica restoration to general algo * Slightly optimised getDataParts * Trying another solution with parts detaching * Rewrote algo without any steps, added ON CLUSTER support * Attaching parts from other replica on restoration * Getting part checksums from ZK * Removed ON CLUSTER, finished working solution * Multiple small changes after review * Fixing parallel test * Supporting rewritten form on cluster * Test fix * Moar logging * Using source replica as checksum provider * improve test, remove some code from parser * Trying solution with move to detached + forget * Moving all parts (not only Committed) to detached * Edited docs for RESTORE REPLICA * Re-merging * minor fixes Co-authored-by: Alexander Tokmakov <avtokmakov@yandex-team.ru>
2021-06-20 08:24:43 +00:00
* - if the part is corrupt (removePartAndEnqueueFetch) or absent during the check
* (at start - checkParts, while running - searchForMissingPart), actions are put on GET from other replicas;
*
2017-04-16 15:00:33 +00:00
* The replica to which INSERT was made in the queue will also have an entry of the GET of this data.
* Such an entry is considered to be executed as soon as the queue handler sees it.
*
2017-04-16 15:00:33 +00:00
* The log entry has a creation time. This time is generated by the clock of server that created entry
* - the one on which the corresponding INSERT or ALTER query came.
*
2017-04-16 15:00:33 +00:00
* For the entries in the queue that the replica made for itself,
* as the time will take the time of creation the appropriate part on any of the replicas.
*/
2021-06-15 19:55:21 +00:00
class StorageReplicatedMergeTree final : public shared_ptr_helper<StorageReplicatedMergeTree>, public MergeTreeData
2014-03-21 13:42:14 +00:00
{
2021-06-15 19:55:21 +00:00
friend struct shared_ptr_helper<StorageReplicatedMergeTree>;
2014-03-21 13:42:14 +00:00
public:
void startup() override;
void shutdown() override;
2021-12-27 15:54:28 +00:00
void flush() override;
~StorageReplicatedMergeTree() override;
2019-05-03 02:00:57 +00:00
std::string getName() const override { return "Replicated" + merging_params.getModeName() + "MergeTree"; }
bool supportsParallelInsert() const override { return true; }
2017-04-25 15:21:03 +00:00
bool supportsReplication() const override { return true; }
bool supportsDeduplication() const override { return true; }
2020-08-03 13:54:14 +00:00
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
2017-06-02 15:54:39 +00:00
unsigned num_streams) override;
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
2017-06-02 15:54:39 +00:00
unsigned num_streams) override;
2020-11-25 13:47:32 +00:00
std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr context) const override;
2020-11-25 13:47:32 +00:00
std::optional<UInt64> totalBytes(const Settings & settings) const override;
2021-07-23 19:33:59 +00:00
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
2020-06-27 20:13:16 +00:00
bool optimize(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
ContextPtr query_context) override;
2021-10-25 17:49:49 +00:00
void alter(const AlterCommands & commands, ContextPtr query_context, AlterLockHolder & table_lock_holder) override;
void mutate(const MutationCommands & commands, ContextPtr context) override;
2020-03-09 01:22:33 +00:00
void waitMutation(const String & znode_name, size_t mutations_sync) const;
2019-05-03 02:00:57 +00:00
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
CancellationCode killMutation(const String & mutation_id) override;
2017-04-16 15:00:33 +00:00
/** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper.
*/
2020-01-22 11:30:11 +00:00
void drop() override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr query_context, TableExclusiveLockHolder &) override;
2018-04-21 00:35:20 +00:00
2022-04-13 14:51:59 +00:00
enum RenamingRestrictions
{
ALLOW_ANY,
ALLOW_PRESERVING_UUID,
DO_NOT_ALLOW,
};
void checkTableCanBeRenamed(const StorageID & new_name) const override;
2020-09-26 19:18:28 +00:00
2020-04-07 14:05:51 +00:00
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
bool supportsIndexForIn() const override { return true; }
void checkTableCanBeDropped() const override;
ActionLock getActionLock(StorageActionBlockType action_type) override;
2020-10-15 16:10:22 +00:00
void onActionLockRemove(StorageActionBlockType action_type) override;
/// Wait when replication queue size becomes less or equal than queue_size
/// If timeout is exceeded returns false
bool waitForShrinkingQueueSize(size_t queue_size = 0, UInt64 max_wait_milliseconds = 0);
2017-04-16 15:00:33 +00:00
/** For the system table replicas. */
struct Status
{
bool is_leader;
bool can_become_leader;
bool is_readonly;
bool is_session_expired;
ReplicatedMergeTreeQueue::Status queue;
UInt32 parts_to_check;
String zookeeper_path;
String replica_name;
String replica_path;
Int32 columns_version;
UInt64 log_max_index;
UInt64 log_pointer;
UInt64 absolute_delay;
UInt8 total_replicas;
UInt8 active_replicas;
String last_queue_update_exception;
2020-02-15 00:11:09 +00:00
/// If the error has happened fetching the info from ZooKeeper, this field will be set.
String zookeeper_exception;
std::unordered_map<std::string, bool> replica_is_active;
};
2017-04-16 15:00:33 +00:00
/// Get the status of the table. If with_zk_fields = false - do not fill in the fields that require queries to ZK.
void getStatus(Status & res, bool with_zk_fields = true);
using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>;
void getQueue(LogEntriesData & res, String & replica_name);
std::vector<PartMovesBetweenShardsOrchestrator::Entry> getPartMovesBetweenShardsEntries();
/// Get replica delay relative to current time.
time_t getAbsoluteDelay() const;
2020-06-19 14:18:58 +00:00
/// If the absolute delay is greater than min_relative_delay_to_measure,
/// will also calculate the difference from the unprocessed time of the best replica.
/// NOTE: Will communicate to ZooKeeper to calculate relative delay.
void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay);
2017-04-16 15:00:33 +00:00
/// Add a part to the queue of parts whose data you want to check in the background thread.
2022-04-12 12:14:26 +00:00
void enqueuePartForCheck(const String & part_name, time_t delay_to_check_seconds = 0);
CheckResults checkData(const ASTPtr & query, ContextPtr context) override;
2019-07-03 13:17:19 +00:00
/// Checks ability to use granularity
bool canUseAdaptiveGranularity() const override;
2020-02-17 12:47:34 +00:00
int getMetadataVersion() const { return metadata_version; }
2020-02-05 11:18:11 +00:00
/** Remove a specific replica from zookeeper.
*/
2022-04-05 15:36:53 +00:00
static void dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
Poco::Logger * logger, MergeTreeSettingsPtr table_settings = nullptr);
2021-04-19 08:21:42 +00:00
/// Removes table from ZooKeeper after the last replica was dropped
static bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path,
2021-04-19 10:40:20 +00:00
const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger);
2021-04-19 08:21:42 +00:00
2021-06-21 13:36:21 +00:00
/// Schedules job to execute in background pool (merge, mutate, drop range and so on)
2021-09-08 00:21:21 +00:00
bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;
2020-11-09 09:14:20 +00:00
/// Checks that fetches are not disabled with action blocker and pool for fetches
/// is not overloaded
2020-10-26 11:02:47 +00:00
bool canExecuteFetch(const ReplicatedMergeTreeLogEntry & entry, String & disable_reason) const;
/// Fetch part only when it stored on shared storage like S3
2021-03-04 23:10:20 +00:00
bool executeFetchShared(const String & source_replica, const String & new_part_name, const DiskPtr & disk, const String & path);
2021-07-05 03:32:56 +00:00
/// Lock part in zookeeper for use shared data in several nodes
2022-04-19 13:53:10 +00:00
void lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock, std::optional<HardlinkedFiles> hardlinked_files = {}) const override;
2022-02-14 09:20:27 +00:00
void lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const;
2021-02-26 09:48:57 +00:00
2021-07-05 03:32:56 +00:00
/// Unlock shared data part in zookeeper
2021-02-26 09:48:57 +00:00
/// Return true if data unlocked
/// Return false if data is still used by another node
2022-04-18 23:09:09 +00:00
std::pair<bool, NameSet> unlockSharedData(const IMergeTreeDataPart & part) const override;
2021-02-26 09:48:57 +00:00
/// Unlock shared data part in zookeeper by part id
/// Return true if data unlocked
/// Return false if data is still used by another node
2022-04-18 23:09:09 +00:00
static std::pair<bool, NameSet> unlockSharedDataByID(String part_id, const String & table_uuid, const String & part_name, const String & replica_name_,
DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_, const MergeTreeSettings & settings, Poco::Logger * logger,
const String & zookeeper_path_old);
2021-02-26 09:48:57 +00:00
/// Fetch part only if some replica has it on shared storage like S3
2021-03-05 17:24:06 +00:00
bool tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
2021-02-26 09:48:57 +00:00
2021-06-24 08:25:05 +00:00
/// Get best replica having this partition on a same type remote disk
2021-08-24 22:24:47 +00:00
String getSharedDataReplica(const IMergeTreeDataPart & part, DiskType disk_type) const;
2021-02-26 09:48:57 +00:00
SYSTEM RESTORE REPLICA replica [ON CLUSTER cluster] (#13652) * initial commit: add setting and stub * typo * added test stub * fix * wip merging new integration test and code proto * adding steps interpreters * adding firstly proposed solution (moving parts etc) * added checking zookeeper path existence * fixing the include * fixing and sorting includes * fixing outdated struct * fix the name * added ast ptr as level of indirection * fix ref * updating the changes * working on test stub * fix iterator -> reference * revert rocksdb submodule update * fixed show privileges test * updated the test stub * replaced rand() with thread_local_rng(), updated the tests updated the test fixed test config path test fix removed error messages fixed the test updated the test fixed string literal fixed literal typo: = * fixed the empty replica error message * updated the test and the code with logs * updated the possible test cases, updated * added the code/test milestone comments * updated the test (added more testcases) * replaced native assert with CH one * individual replicas recursive delete fix * updated the AS db.name AST * two small logging fixes * manually generated AST fixes * Updated the test, added the possible algo change * Some thoughts about optimizing the solution: ALTER MOVE PARTITION .. TO TABLE -> move to detached/ + ALTER ... ATTACH * fix * Removed the replica sync in test as it's invalid * Some test tweaks * tmp * Rewrote the algo by using the executeQuery instead of hand-crafting the ASTPtr. Two questions still active. * tr: logging active parts * Extracted the parts moving algo into a separate helper function * Fixed the test data and the queries slightly * Replaced query to system.parts to direct invocation, started building the test that breaks on various parts. * Added the case for tables when at least one replica is alive * Updated the test to test replicas restoration by detaching/attaching * Altered the test to check restoration without replica restart * Added the tables swap in the start if the server failed last time * Hotfix when only /replicas/replica... path was deleted * Restore ZK paths while creating a replicated MergeTree table * Updated the docs, fixed the algo for individual replicas restoration case * Initial parts table storage fix, tests sync fix * Reverted individual replica restoration to general algo * Slightly optimised getDataParts * Trying another solution with parts detaching * Rewrote algo without any steps, added ON CLUSTER support * Attaching parts from other replica on restoration * Getting part checksums from ZK * Removed ON CLUSTER, finished working solution * Multiple small changes after review * Fixing parallel test * Supporting rewritten form on cluster * Test fix * Moar logging * Using source replica as checksum provider * improve test, remove some code from parser * Trying solution with move to detached + forget * Moving all parts (not only Committed) to detached * Edited docs for RESTORE REPLICA * Re-merging * minor fixes Co-authored-by: Alexander Tokmakov <avtokmakov@yandex-team.ru>
2021-06-20 08:24:43 +00:00
inline String getReplicaName() const { return replica_name; }
/// Restores table metadata if ZooKeeper lost it.
2021-12-30 14:27:22 +00:00
/// Used only on restarted readonly replicas (not checked). All active (Active) parts are moved to detached/
SYSTEM RESTORE REPLICA replica [ON CLUSTER cluster] (#13652) * initial commit: add setting and stub * typo * added test stub * fix * wip merging new integration test and code proto * adding steps interpreters * adding firstly proposed solution (moving parts etc) * added checking zookeeper path existence * fixing the include * fixing and sorting includes * fixing outdated struct * fix the name * added ast ptr as level of indirection * fix ref * updating the changes * working on test stub * fix iterator -> reference * revert rocksdb submodule update * fixed show privileges test * updated the test stub * replaced rand() with thread_local_rng(), updated the tests updated the test fixed test config path test fix removed error messages fixed the test updated the test fixed string literal fixed literal typo: = * fixed the empty replica error message * updated the test and the code with logs * updated the possible test cases, updated * added the code/test milestone comments * updated the test (added more testcases) * replaced native assert with CH one * individual replicas recursive delete fix * updated the AS db.name AST * two small logging fixes * manually generated AST fixes * Updated the test, added the possible algo change * Some thoughts about optimizing the solution: ALTER MOVE PARTITION .. TO TABLE -> move to detached/ + ALTER ... ATTACH * fix * Removed the replica sync in test as it's invalid * Some test tweaks * tmp * Rewrote the algo by using the executeQuery instead of hand-crafting the ASTPtr. Two questions still active. * tr: logging active parts * Extracted the parts moving algo into a separate helper function * Fixed the test data and the queries slightly * Replaced query to system.parts to direct invocation, started building the test that breaks on various parts. * Added the case for tables when at least one replica is alive * Updated the test to test replicas restoration by detaching/attaching * Altered the test to check restoration without replica restart * Added the tables swap in the start if the server failed last time * Hotfix when only /replicas/replica... path was deleted * Restore ZK paths while creating a replicated MergeTree table * Updated the docs, fixed the algo for individual replicas restoration case * Initial parts table storage fix, tests sync fix * Reverted individual replica restoration to general algo * Slightly optimised getDataParts * Trying another solution with parts detaching * Rewrote algo without any steps, added ON CLUSTER support * Attaching parts from other replica on restoration * Getting part checksums from ZK * Removed ON CLUSTER, finished working solution * Multiple small changes after review * Fixing parallel test * Supporting rewritten form on cluster * Test fix * Moar logging * Using source replica as checksum provider * improve test, remove some code from parser * Trying solution with move to detached + forget * Moving all parts (not only Committed) to detached * Edited docs for RESTORE REPLICA * Re-merging * minor fixes Co-authored-by: Alexander Tokmakov <avtokmakov@yandex-team.ru>
2021-06-20 08:24:43 +00:00
/// folder and attached. Parts in all other states are just moved to detached/ folder.
void restoreMetadataInZooKeeper();
2021-05-27 12:54:47 +00:00
/// Get throttler for replicated fetches
2021-05-26 20:37:44 +00:00
ThrottlerPtr getFetchesThrottler() const
{
return replicated_fetches_throttler;
}
2021-05-27 12:54:47 +00:00
/// Get throttler for replicated sends
2021-05-26 20:37:44 +00:00
ThrottlerPtr getSendsThrottler() const
{
return replicated_sends_throttler;
}
2021-06-30 15:24:51 +00:00
bool createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name);
2021-12-27 16:27:06 +00:00
// Return default or custom zookeeper name for table
String getZooKeeperName() const { return zookeeper_name; }
2021-12-27 16:27:06 +00:00
// Return table id, common for different replicas
2022-04-20 19:08:26 +00:00
String getTableSharedID() const override;
static String getDefaultZooKeeperName() { return default_zookeeper_name; }
/// Check if there are new broken disks and enqueue part recovery tasks.
void checkBrokenDisks();
2014-03-21 13:42:14 +00:00
private:
SYSTEM RESTORE REPLICA replica [ON CLUSTER cluster] (#13652) * initial commit: add setting and stub * typo * added test stub * fix * wip merging new integration test and code proto * adding steps interpreters * adding firstly proposed solution (moving parts etc) * added checking zookeeper path existence * fixing the include * fixing and sorting includes * fixing outdated struct * fix the name * added ast ptr as level of indirection * fix ref * updating the changes * working on test stub * fix iterator -> reference * revert rocksdb submodule update * fixed show privileges test * updated the test stub * replaced rand() with thread_local_rng(), updated the tests updated the test fixed test config path test fix removed error messages fixed the test updated the test fixed string literal fixed literal typo: = * fixed the empty replica error message * updated the test and the code with logs * updated the possible test cases, updated * added the code/test milestone comments * updated the test (added more testcases) * replaced native assert with CH one * individual replicas recursive delete fix * updated the AS db.name AST * two small logging fixes * manually generated AST fixes * Updated the test, added the possible algo change * Some thoughts about optimizing the solution: ALTER MOVE PARTITION .. TO TABLE -> move to detached/ + ALTER ... ATTACH * fix * Removed the replica sync in test as it's invalid * Some test tweaks * tmp * Rewrote the algo by using the executeQuery instead of hand-crafting the ASTPtr. Two questions still active. * tr: logging active parts * Extracted the parts moving algo into a separate helper function * Fixed the test data and the queries slightly * Replaced query to system.parts to direct invocation, started building the test that breaks on various parts. * Added the case for tables when at least one replica is alive * Updated the test to test replicas restoration by detaching/attaching * Altered the test to check restoration without replica restart * Added the tables swap in the start if the server failed last time * Hotfix when only /replicas/replica... path was deleted * Restore ZK paths while creating a replicated MergeTree table * Updated the docs, fixed the algo for individual replicas restoration case * Initial parts table storage fix, tests sync fix * Reverted individual replica restoration to general algo * Slightly optimised getDataParts * Trying another solution with parts detaching * Rewrote algo without any steps, added ON CLUSTER support * Attaching parts from other replica on restoration * Getting part checksums from ZK * Removed ON CLUSTER, finished working solution * Multiple small changes after review * Fixing parallel test * Supporting rewritten form on cluster * Test fix * Moar logging * Using source replica as checksum provider * improve test, remove some code from parser * Trying solution with move to detached + forget * Moving all parts (not only Committed) to detached * Edited docs for RESTORE REPLICA * Re-merging * minor fixes Co-authored-by: Alexander Tokmakov <avtokmakov@yandex-team.ru>
2021-06-20 08:24:43 +00:00
std::atomic_bool are_restoring_replica {false};
/// Get a sequential consistent view of current parts.
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const;
/// Delete old parts from disk and from ZooKeeper.
void clearOldPartsAndRemoveFromZK();
2021-07-23 19:33:59 +00:00
friend class ReplicatedMergeTreeSink;
friend class ReplicatedMergeTreePartCheckThread;
friend class ReplicatedMergeTreeCleanupThread;
friend class ReplicatedMergeTreeAlterThread;
friend class ReplicatedMergeTreeRestartingThread;
2020-09-18 10:57:33 +00:00
friend class ReplicatedMergeTreeMergeStrategyPicker;
friend struct ReplicatedMergeTreeLogEntry;
friend class ScopedPartitionMergeLock;
friend class ReplicatedMergeTreeQueue;
friend class PartMovesBetweenShardsOrchestrator;
friend class MergeTreeData;
friend class MergeFromLogEntryTask;
friend class MutateFromLogEntryTask;
friend class ReplicatedMergeMutateTaskBase;
2016-01-28 01:00:27 +00:00
2020-09-18 10:57:33 +00:00
using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker;
using LogEntry = ReplicatedMergeTreeLogEntry;
using LogEntryPtr = LogEntry::Ptr;
2014-08-05 13:49:44 +00:00
using MergeTreeData::MutableDataPartPtr;
2017-04-16 15:00:33 +00:00
zkutil::ZooKeeperPtr current_zookeeper; /// Use only the methods below.
mutable std::mutex current_zookeeper_mutex; /// To recreate the session in the background thread.
2014-12-12 20:50:32 +00:00
zkutil::ZooKeeperPtr tryGetZooKeeper() const;
zkutil::ZooKeeperPtr getZooKeeper() const;
2022-02-03 10:10:05 +00:00
zkutil::ZooKeeperPtr getZooKeeperAndAssertNotReadonly() const;
2020-11-16 08:27:33 +00:00
void setZooKeeper();
2014-03-22 14:44:44 +00:00
2017-04-16 15:00:33 +00:00
/// If true, the table is offline and can not be written to it.
2022-02-03 10:10:05 +00:00
/// This flag is managed by RestartingThread.
std::atomic_bool is_readonly {true};
2022-01-24 09:16:13 +00:00
/// If nullopt - ZooKeeper is not available, so we don't know if there is table metadata.
/// If false - ZooKeeper is available, but there is no table metadata. It's safe to drop table in this case.
2022-01-20 18:55:59 +00:00
std::optional<bool> has_metadata_in_zookeeper;
2020-11-16 08:27:33 +00:00
static constexpr auto default_zookeeper_name = "default";
String zookeeper_name;
String zookeeper_path;
String replica_name;
String replica_path;
/** /replicas/me/is_active.
*/
zkutil::EphemeralNodeHolderPtr replica_is_active_node;
/** Is this replica "leading". The leader replica selects the parts to merge.
2020-06-15 02:14:59 +00:00
* It can be false only when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
*/
std::atomic<bool> is_leader {false};
2020-01-14 14:27:48 +00:00
InterserverIOEndpointPtr data_parts_exchange_endpoint;
MergeTreeDataSelectExecutor reader;
MergeTreeDataWriter writer;
MergeTreeDataMergerMutator merger_mutator;
2020-09-18 10:57:33 +00:00
MergeStrategyPicker merge_strategy_picker;
/** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/).
* In ZK entries in chronological order. Here it is not necessary.
*/
ReplicatedMergeTreeQueue queue;
std::atomic<time_t> last_queue_update_start_time{0};
std::atomic<time_t> last_queue_update_finish_time{0};
mutable std::mutex last_queue_update_exception_lock;
String last_queue_update_exception;
String getLastQueueUpdateException() const;
DataPartsExchange::Fetcher fetcher;
/// When activated, replica is initialized and startup() method could exit
Poco::Event startup_event;
2017-04-16 15:00:33 +00:00
/// Do I need to complete background threads (except restarting_thread)?
std::atomic<bool> partial_shutdown_called {false};
/// Event that is signalled (and is reset) by the restarting_thread when the ZooKeeper session expires.
Poco::Event partial_shutdown_event {false}; /// Poco::Event::EVENT_MANUALRESET
2021-12-27 15:54:28 +00:00
std::atomic<bool> shutdown_called {false};
std::atomic<bool> flush_called {false};
2020-02-05 11:18:11 +00:00
int metadata_version = 0;
/// Threads.
/// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.
bool queue_update_in_progress = false;
BackgroundSchedulePool::TaskHolder queue_updating_task;
BackgroundSchedulePool::TaskHolder mutations_updating_task;
/// A task that selects parts to merge.
BackgroundSchedulePool::TaskHolder merge_selecting_task;
/// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query.
std::mutex merge_selecting_mutex;
/// A task that marks finished mutations as done.
BackgroundSchedulePool::TaskHolder mutations_finalizing_task;
2017-04-16 15:00:33 +00:00
/// A thread that removes old parts, log entries, and blocks.
ReplicatedMergeTreeCleanupThread cleanup_thread;
2017-04-16 15:00:33 +00:00
/// A thread that checks the data of the parts, as well as the queue of the parts to be checked.
ReplicatedMergeTreePartCheckThread part_check_thread;
/// A thread that processes reconnection to ZooKeeper when the session expires.
ReplicatedMergeTreeRestartingThread restarting_thread;
PartMovesBetweenShardsOrchestrator part_moves_between_shards_orchestrator;
/// True if replica was created for existing table with fixed granularity
bool other_replicas_fixed_granularity = false;
2020-09-26 19:18:28 +00:00
/// Do not allow RENAME TABLE if zookeeper_path contains {database} or {table} macro
2022-04-13 14:51:59 +00:00
const RenamingRestrictions renaming_restrictions;
2020-09-26 19:18:28 +00:00
2020-10-26 11:02:47 +00:00
const size_t replicated_fetches_pool_size;
2021-05-27 12:54:47 +00:00
/// Throttlers used in DataPartsExchange to lower maximum fetch/sends
/// speed.
2021-05-26 20:37:44 +00:00
ThrottlerPtr replicated_fetches_throttler;
ThrottlerPtr replicated_sends_throttler;
2021-12-30 09:57:38 +00:00
/// Global ID, synced via ZooKeeper between replicas
UUID table_shared_id;
std::mutex last_broken_disks_mutex;
std::set<String> last_broken_disks;
template <class Func>
2021-12-30 14:27:22 +00:00
void foreachActiveParts(Func && func, bool select_sequential_consistency) const;
/** Creates the minimum set of nodes in ZooKeeper and create first replica.
* Returns true if was created, false if exists.
*/
bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot);
SYSTEM RESTORE REPLICA replica [ON CLUSTER cluster] (#13652) * initial commit: add setting and stub * typo * added test stub * fix * wip merging new integration test and code proto * adding steps interpreters * adding firstly proposed solution (moving parts etc) * added checking zookeeper path existence * fixing the include * fixing and sorting includes * fixing outdated struct * fix the name * added ast ptr as level of indirection * fix ref * updating the changes * working on test stub * fix iterator -> reference * revert rocksdb submodule update * fixed show privileges test * updated the test stub * replaced rand() with thread_local_rng(), updated the tests updated the test fixed test config path test fix removed error messages fixed the test updated the test fixed string literal fixed literal typo: = * fixed the empty replica error message * updated the test and the code with logs * updated the possible test cases, updated * added the code/test milestone comments * updated the test (added more testcases) * replaced native assert with CH one * individual replicas recursive delete fix * updated the AS db.name AST * two small logging fixes * manually generated AST fixes * Updated the test, added the possible algo change * Some thoughts about optimizing the solution: ALTER MOVE PARTITION .. TO TABLE -> move to detached/ + ALTER ... ATTACH * fix * Removed the replica sync in test as it's invalid * Some test tweaks * tmp * Rewrote the algo by using the executeQuery instead of hand-crafting the ASTPtr. Two questions still active. * tr: logging active parts * Extracted the parts moving algo into a separate helper function * Fixed the test data and the queries slightly * Replaced query to system.parts to direct invocation, started building the test that breaks on various parts. * Added the case for tables when at least one replica is alive * Updated the test to test replicas restoration by detaching/attaching * Altered the test to check restoration without replica restart * Added the tables swap in the start if the server failed last time * Hotfix when only /replicas/replica... path was deleted * Restore ZK paths while creating a replicated MergeTree table * Updated the docs, fixed the algo for individual replicas restoration case * Initial parts table storage fix, tests sync fix * Reverted individual replica restoration to general algo * Slightly optimised getDataParts * Trying another solution with parts detaching * Rewrote algo without any steps, added ON CLUSTER support * Attaching parts from other replica on restoration * Getting part checksums from ZK * Removed ON CLUSTER, finished working solution * Multiple small changes after review * Fixing parallel test * Supporting rewritten form on cluster * Test fix * Moar logging * Using source replica as checksum provider * improve test, remove some code from parser * Trying solution with move to detached + forget * Moving all parts (not only Committed) to detached * Edited docs for RESTORE REPLICA * Re-merging * minor fixes Co-authored-by: Alexander Tokmakov <avtokmakov@yandex-team.ru>
2021-06-20 08:24:43 +00:00
/**
* Creates a replica in ZooKeeper and adds to the queue all that it takes to catch up with the rest of the replicas.
*/
void createReplica(const StorageMetadataPtr & metadata_snapshot);
2017-04-16 15:00:33 +00:00
/** Create nodes in the ZK, which must always be, but which might not exist when older versions of the server are running.
*/
void createNewZooKeeperNodes();
void checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot);
/// A part of ALTER: apply metadata changes only (data parts are altered separately).
2020-06-18 16:10:47 +00:00
/// Must be called under IStorage::lockForAlter() lock.
void setTableStructure(ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff);
2017-04-16 15:00:33 +00:00
/** Check that the set of parts corresponds to that in ZK (/replicas/me/parts/).
* If any parts described in ZK are not locally, throw an exception.
* If any local parts are not mentioned in ZK, remove them.
* But if there are too many, throw an exception just in case - it's probably a configuration error.
*/
void checkParts(bool skip_sanity_checks);
/// Synchronize the list of part uuids which are currently pinned. These should be sent to root query executor
/// to be used for deduplication.
void syncPinnedPartUUIDs();
2017-04-16 15:00:33 +00:00
/** Check that the part's checksum is the same as the checksum of the same part on some other replica.
* If no one has such a part, nothing checks.
* Not very reliable: if two replicas add a part almost at the same time, no checks will occur.
* Adds actions to `ops` that add data about the part into ZooKeeper.
2020-06-22 09:49:21 +00:00
* Call under lockForShare.
*/
2019-05-03 02:00:57 +00:00
void checkPartChecksumsAndAddCommitOps(const zkutil::ZooKeeperPtr & zookeeper, const DataPartPtr & part,
Coordination::Requests & ops, String part_name = "", NameSet * absent_replicas_paths = nullptr);
String getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const;
2021-12-30 14:27:22 +00:00
/// Accepts a PreActive part, atomically checks its checksums with ones on other replicas and commit the part
2022-04-19 13:53:10 +00:00
DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction, const DataPartPtr & part, std::optional<HardlinkedFiles> hardlinked_files = {});
2019-09-05 13:12:29 +00:00
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
2019-08-16 15:57:19 +00:00
2019-09-05 13:12:29 +00:00
void getCommitPartOps(Coordination::Requests & ops, MutableDataPartPtr & part, const String & block_id_path = "") const;
2017-04-16 15:00:33 +00:00
/// Adds actions to `ops` that remove a part from ZooKeeper.
/// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes).
void removePartFromZooKeeper(const String & part_name, Coordination::Requests & ops, bool has_children);
/// Just removes part from ZooKeeper using previous method
void removePartFromZooKeeper(const String & part_name);
/// Quickly removes big set of parts from ZooKeeper (using async multi queries)
void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names,
2018-04-06 19:48:54 +00:00
NameSet * parts_should_be_retried = nullptr);
RFC: Throw exception if removing parts from ZooKeeper fails. This is used for removing part metadata from ZooKeeper when executing queue events like `DROP_RANGE` triggered when a user tries to drop a part or a partition. There are other uses but I'll focus only on this one. Before this change the method was giving up silently if it was unable to remove parts from ZooKeeper and this behaviour seems to be problematic. It could lead to operation being reported as successful at first but data reappearing later (very rarely) or "stuck" events in replication queue. Here is one particular scenario which I think we've hit: * Execute a DETACH PARTITION * DROP_RANGE event put in the queue * Replicas try to execute dropRange but some of them get disconnected from ZK and 5 retries aren't enough (ZK is miss-behaving), return code (false) is ignored and log pointer advances. * One of the replica where dropRange failed is restarted. * checkParts is executed and it finds parts that weren't removed from ZK, logs `Removing locally missing part from ZooKeeper and queueing a fetch` and puts GET_PART on the queue. * Few things can happen from here: * There is a lagging replica that din't execute DROP_RANGE yet: part will be fetched. The other replica will execute DROP_RANGE later and we'll get diverging set of parts on replicas. * Another replica also silently failed to remove parts from ZK: both of them are left with GET_PART in the queue and none of them can make progress, logging: `No active replica has part ... or covering part`.
2021-03-05 09:50:26 +00:00
/// Remove parts from ZooKeeper, throw exception if unable to do so after max_retries.
void removePartsFromZooKeeperWithRetries(const Strings & part_names, size_t max_retries = 5);
void removePartsFromZooKeeperWithRetries(DataPartsVector & parts, size_t max_retries = 5);
2017-04-16 15:00:33 +00:00
/// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
void removePartAndEnqueueFetch(const String & part_name);
2017-04-16 15:00:33 +00:00
/// Running jobs from the queue.
2017-04-16 15:00:33 +00:00
/** Execute the action from the queue. Throws an exception if something is wrong.
* Returns whether or not it succeeds. If it did not work, write it to the end of the queue.
*/
bool executeLogEntry(LogEntry & entry);
/// Lookup the part for the entry in the detached/ folder.
/// returns nullptr if the part is corrupt or missing.
MutableDataPartPtr attachPartHelperFoundValidPart(const LogEntry& entry) const;
void executeDropRange(const LogEntry & entry);
2020-08-08 00:47:03 +00:00
/// Execute alter of table metadata. Set replica/metadata and replica/columns
2020-02-17 16:33:05 +00:00
/// nodes in zookeeper and also changes in memory metadata.
/// New metadata and columns values stored in entry.
2020-01-17 13:54:22 +00:00
bool executeMetadataAlter(const LogEntry & entry);
2020-01-13 16:39:20 +00:00
2020-02-17 16:33:05 +00:00
/// Fetch part from other replica (inserted or merged/mutated)
/// NOTE: Attention! First of all tries to find covering part on other replica
/// and set it into entry.actual_new_part_name. After that tries to fetch this new covering part.
/// If fetch was not successful, clears entry.actual_new_part_name.
bool executeFetch(LogEntry & entry, bool need_to_check_missing_part=true);
bool executeReplaceRange(const LogEntry & entry);
void executeClonePartFromShard(const LogEntry & entry);
2017-04-16 15:00:33 +00:00
/** Updates the queue.
*/
void queueUpdatingTask();
void mutationsUpdatingTask();
/** Clone data from another replica.
2018-08-27 23:59:49 +00:00
* If replica can not be cloned throw Exception.
*/
2018-08-27 13:51:22 +00:00
void cloneReplica(const String & source_replica, Coordination::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper);
2021-05-07 17:09:39 +00:00
/// Repairs metadata of staled replica. Called from cloneReplica(...)
void cloneMetadataIfNeeded(const String & source_replica, const String & source_path, zkutil::ZooKeeperPtr & zookeeper);
/// Clone replica if it is lost.
void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper);
2020-10-23 08:54:00 +00:00
ReplicatedMergeTreeQueue::SelectedEntryPtr selectQueueEntry();
2019-08-19 17:59:16 +00:00
MergeFromLogEntryTaskPtr getTaskToProcessMergeQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry);
2020-10-23 08:54:00 +00:00
bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry);
2019-08-19 17:59:16 +00:00
/// Start being leader (if not disabled by setting).
/// Since multi-leaders are allowed, it just sets is_leader flag.
void startBeingLeader();
void stopBeingLeader();
2017-04-16 15:00:33 +00:00
/** Selects the parts to merge and writes to the log.
*/
void mergeSelectingTask();
/// Checks if some mutations are done and marks them as done.
void mutationsFinalizingTask();
2017-04-16 15:00:33 +00:00
/** Write the selected parts to merge into the log,
* Call when merge_selecting_mutex is locked.
* Returns false if any part is not in ZK.
*/
enum class CreateMergeEntryResult { Ok, MissingPart, LogUpdated, Other };
CreateMergeEntryResult createLogEntryToMergeParts(
zkutil::ZooKeeperPtr & zookeeper,
2019-05-03 02:00:57 +00:00
const DataPartsVector & parts,
const String & merged_name,
const UUID & merged_part_uuid,
const MergeTreeDataPartType & merged_part_type,
2017-04-17 15:14:56 +00:00
bool deduplicate,
const Names & deduplicate_by_columns,
ReplicatedMergeTreeLogEntryData * out_log_entry,
2020-09-03 13:00:13 +00:00
int32_t log_version,
MergeType merge_type);
CreateMergeEntryResult createLogEntryToMutatePart(
const IMergeTreeDataPart & part,
const UUID & new_part_uuid,
Int64 mutation_version,
int32_t alter_version,
int32_t log_version);
2017-04-16 15:00:33 +00:00
/// Exchange parts.
ConnectionTimeouts getFetchPartHTTPTimeouts(ContextPtr context);
2017-04-16 15:00:33 +00:00
/** Returns an empty string if no one has a part.
*/
String findReplicaHavingPart(const String & part_name, bool active);
2021-04-13 09:34:04 +00:00
static String findReplicaHavingPart(const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_);
2020-09-18 10:57:33 +00:00
bool checkReplicaHavePart(const String & replica, const String & part_name);
2021-04-14 02:05:41 +00:00
bool checkIfDetachedPartExists(const String & part_name);
bool checkIfDetachedPartitionExists(const String & partition_name);
2020-09-18 10:57:33 +00:00
/** Find replica having specified part or any part that covers it.
* If active = true, consider only active replicas.
* If found, returns replica name and set 'entry->actual_new_part_name' to name of found largest covering part.
* If not found, returns empty string.
*/
String findReplicaHavingCoveringPart(LogEntry & entry, bool active);
String findReplicaHavingCoveringPart(const String & part_name, bool active, String & found_part_name);
2017-04-16 15:00:33 +00:00
/** Download the specified part from the specified replica.
* If `to_detached`, the part is placed in the `detached` directory.
* If quorum != 0, then the node for tracking the quorum is updated.
* Returns false if part is already fetching right now.
*/
bool fetchPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & replica_path,
bool to_detached,
size_t quorum,
zkutil::ZooKeeper::Ptr zookeeper_ = nullptr);
/** Download the specified part from the specified replica.
* Used for replace local part on the same s3-shared part in hybrid storage.
* Returns false if part is already fetching right now.
*/
bool fetchExistsPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & replica_path,
DiskPtr replaced_disk,
String replaced_part_path);
/// Required only to avoid races between executeLogEntry and fetchPartition
std::unordered_set<String> currently_fetching_parts;
std::mutex currently_fetching_parts_mutex;
/// With the quorum being tracked, add a replica to the quorum for the part.
void updateQuorum(const String & part_name, bool is_parallel);
2020-04-13 15:21:05 +00:00
/// Deletes info from quorum/last_part node for particular partition_id.
2020-04-20 10:56:59 +00:00
void cleanLastPartNode(const String & partition_id);
2020-04-10 21:29:54 +00:00
/// Part name is stored in quorum/last_part for corresponding partition_id.
bool partIsLastQuorumPart(const MergeTreePartInfo & part_info) const;
/// Part currently inserting with quorum (node quorum/parallel/part_name exists)
bool partIsInsertingWithParallelQuorum(const MergeTreePartInfo & part_info) const;
2020-10-01 10:38:50 +00:00
/// Creates new block number if block with such block_id does not exist
/// If zookeeper_path_prefix specified then allocate block number on this path
/// (can be used if we want to allocate blocks on other replicas)
std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
const String & partition_id, const zkutil::ZooKeeperPtr & zookeeper,
const String & zookeeper_block_id_path = "", const String & zookeeper_path_prefix = "") const;
2017-04-16 15:00:33 +00:00
/** Wait until all replicas, including this, execute the specified action from the log.
2021-08-23 12:57:50 +00:00
* If replicas are added at the same time, it can not wait the added replica.
*
* Waits for inactive replicas no more than wait_for_inactive_timeout.
* Returns list of inactive replicas that have not executed entry or throws exception.
2019-08-20 01:46:48 +00:00
*
* NOTE: This method must be called without table lock held.
* Because it effectively waits for other thread that usually has to also acquire a lock to proceed and this yields deadlock.
*/
2021-08-23 12:57:50 +00:00
void waitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry,
Int64 wait_for_inactive_timeout, const String & error_context = {});
Strings tryWaitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry,
Int64 wait_for_inactive_timeout);
2017-04-16 15:00:33 +00:00
/** Wait until the specified replica executes the specified action from the log.
2019-08-20 01:46:48 +00:00
* NOTE: See comment about locks above.
*/
2021-08-23 12:57:50 +00:00
bool tryWaitForReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name,
const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0);
/// Depending on settings, do nothing or wait for this replica or all replicas process log entry.
void waitForLogEntryToBeProcessedIfNecessary(const ReplicatedMergeTreeLogEntryData & entry, ContextPtr query_context, const String & error_context = {});
2017-04-16 15:00:33 +00:00
/// Throw an exception if the table is readonly.
void assertNotReadonly() const;
2016-01-21 16:30:05 +00:00
/// Produce an imaginary part info covering all parts in the specified partition (at the call moment).
/// Returns false if the partition doesn't exist yet.
/// Caller must hold delimiting_block_lock until creation of drop/replace entry in log.
/// Otherwise some replica may assign merge which intersects part_info.
bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info,
std::optional<EphemeralLockInZooKeeper> & delimiting_block_lock, bool for_replace_range = false);
2017-04-16 15:00:33 +00:00
/// Check for a node in ZK. If it is, remember this information, and then immediately answer true.
mutable std::unordered_set<std::string> existing_nodes_cache;
mutable std::mutex existing_nodes_cache_mutex;
bool existsNodeCached(const std::string & path) const;
void getClearBlocksInPartitionOps(Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);
/// Remove block IDs from `blocks/` in ZooKeeper for the given partition ID in the given block number range.
void clearBlocksInPartition(
zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);
/// Info about how other replicas can access this one.
ReplicatedMergeTreeAddress getReplicatedMergeTreeAddress() const;
2018-06-09 15:48:22 +00:00
bool dropAllPartsInPartition(
zkutil::ZooKeeper & zookeeper, String & partition_id, LogEntry & entry, ContextPtr query_context, bool detach);
2021-05-17 14:26:36 +00:00
void dropPartNoWaitNoThrow(const String & part_name) override;
void dropPart(const String & part_name, bool detach, ContextPtr query_context) override;
// Partition helpers
2021-05-17 14:26:36 +00:00
void dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context) override;
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, ContextPtr query_context) override;
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr query_context) override;
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) override;
void movePartitionToShard(const ASTPtr & partition, bool move_part, const String & to, ContextPtr query_context) override;
CancellationCode killPartMoveToShard(const UUID & task_uuid) override;
void fetchPartition(
const ASTPtr & partition,
const StorageMetadataPtr & metadata_snapshot,
const String & from,
bool fetch_part,
ContextPtr query_context) override;
2021-06-30 12:29:09 +00:00
/// NOTE: there are no guarantees for concurrent merges. Dropping part can
/// be concurrently merged into some covering part and dropPart will do
/// nothing. There are some fundamental problems with it. But this is OK
/// because:
///
/// dropPart used in the following cases:
/// 1) Remove empty parts after TTL.
/// 2) Remove parts after move between shards.
/// 3) User queries: ALTER TABLE DROP PART 'part_name'.
///
/// In the first case merge of empty part is even better than DROP. In the
/// second case part UUIDs used to forbid merges for moving parts so there
/// is no problem with concurrent merges. The third case is quite rare and
/// we give very weak guarantee: there will be no active part with this
/// name, but possibly it was merged to some other part.
///
/// NOTE: don't rely on dropPart if you 100% need to remove non-empty part
/// and don't use any explicit locking mechanism for merges.
2021-05-27 23:10:44 +00:00
bool dropPartImpl(zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach, bool throw_if_noop);
/// Check granularity of already existing replicated table in zookeeper if it exists
/// return true if it's fixed
2021-06-28 17:02:22 +00:00
bool checkFixedGranularityInZookeeper();
2019-12-19 15:27:56 +00:00
/// Wait for timeout seconds mutation is finished on replicas
void waitMutationToFinishOnReplicas(
const Strings & replicas, const String & mutation_id) const;
2020-11-28 08:17:20 +00:00
MutationCommands getFirstAlterMutationCommandsForPart(const DataPartPtr & part) const override;
void startBackgroundMovesIfNeeded() override;
2021-07-05 12:44:58 +00:00
std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;
PartitionBlockNumbersHolder allocateBlockNumbersInAffectedPartitions(
const MutationCommands & commands, ContextPtr query_context, const zkutil::ZooKeeperPtr & zookeeper) const;
static Strings getZeroCopyPartPath(const MergeTreeSettings & settings, DiskType disk_type, const String & table_uuid,
const String & part_name, const String & zookeeper_path_old);
2022-04-19 12:01:30 +00:00
static void createZeroCopyLockNode(
const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node,
int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});
bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed) override;
bool removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, const String & table_uuid,
const String & zookeeper_name, const String & replica_name, const String & zookeeper_path);
2021-12-30 09:57:38 +00:00
/// Create freeze metadata for table and save in zookeeper. Required only if zero-copy replication enabled.
2021-12-27 16:27:06 +00:00
void createAndStoreFreezeMetadata(DiskPtr disk, DataPartPtr part, String backup_part_path) const override;
// Create table id if needed
void createTableSharedID();
2022-02-10 19:45:52 +00:00
bool checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk);
std::optional<String> getZeroCopyPartPath(const String & part_name, const DiskPtr & disk);
2022-01-17 11:52:51 +00:00
/// Create ephemeral lock in zookeeper for part and disk which support zero copy replication.
/// If somebody already holding the lock -- return std::nullopt.
2022-02-10 19:45:52 +00:00
std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk) override;
protected:
2017-12-25 14:56:32 +00:00
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
*/
StorageReplicatedMergeTree(
const String & zookeeper_path_,
const String & replica_name_,
bool attach,
2019-12-04 16:06:55 +00:00
const StorageID & table_id_,
const String & relative_data_path_,
2020-06-08 18:23:26 +00:00
const StorageInMemoryMetadata & metadata_,
2021-05-31 14:49:02 +00:00
ContextMutablePtr context_,
const String & date_column_name,
2019-05-03 02:00:57 +00:00
const MergingParams & merging_params_,
2019-08-26 14:24:29 +00:00
std::unique_ptr<MergeTreeSettings> settings_,
2020-09-26 19:18:28 +00:00
bool has_force_restore_data_flag,
2022-04-13 14:51:59 +00:00
RenamingRestrictions renaming_restrictions_);
2016-01-21 16:30:05 +00:00
};
String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);
2016-04-07 21:35:01 +00:00
2020-02-25 18:58:28 +00:00
/** There are three places for each part, where it should be
* 1. In the RAM, data_parts, all_data_parts.
* 2. In the filesystem (FS), the directory with the data of the table.
* 3. in ZooKeeper (ZK).
*
* When adding a part, it must be added immediately to these three places.
* This is done like this
* - [FS] first write the part into a temporary directory on the filesystem;
* - [FS] rename the temporary part to the result on the filesystem;
* - [RAM] immediately afterwards add it to the `data_parts`, and remove from `data_parts` any parts covered by this one;
* - [RAM] also set the `Transaction` object, which in case of an exception (in next point),
* rolls back the changes in `data_parts` (from the previous point) back;
* - [ZK] then send a transaction (multi) to add a part to ZooKeeper (and some more actions);
* - [FS, ZK] by the way, removing the covered (old) parts from filesystem, from ZooKeeper and from `all_data_parts`
* is delayed, after a few minutes.
*
* There is no atomicity here.
* It could be possible to achieve atomicity using undo/redo logs and a flag in `DataPart` when it is completely ready.
* But it would be inconvenient - I would have to write undo/redo logs for each `Part` in ZK, and this would increase already large number of interactions.
*
* Instead, we are forced to work in a situation where at any time
* (from another thread, or after server restart), there may be an unfinished transaction.
* (note - for this the part should be in RAM)
* From these cases the most frequent one is when the part is already in the data_parts, but it's not yet in ZooKeeper.
* This case must be distinguished from the case where such a situation is achieved due to some kind of damage to the state.
*
* Do this with the threshold for the time.
* If the part is young enough, its lack in ZooKeeper will be perceived optimistically - as if it just did not have time to be added there
* - as if the transaction has not yet been executed, but will soon be executed.
* And if the part is old, its absence in ZooKeeper will be perceived as an unfinished transaction that needs to be rolled back.
*
* PS. Perhaps it would be better to add a flag to the DataPart that a part is inserted into ZK.
* But here it's too easy to get confused with the consistency of this flag.
*/
/// NOLINTNEXTLINE
2020-02-25 18:58:28 +00:00
#define MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER (5 * 60)
2016-04-07 21:35:01 +00:00
2014-03-21 13:42:14 +00:00
}