ClickHouse/src/Storages/StorageReplicatedMergeTree.h

649 lines
30 KiB
C++
Raw Normal View History

2014-03-21 13:42:14 +00:00
#pragma once
2017-06-06 17:18:32 +00:00
#include <ext/shared_ptr_helper.h>
#include <atomic>
#include <pcg_random.hpp>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
2019-08-19 17:59:16 +00:00
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
2020-09-18 10:57:33 +00:00
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Storages/MergeTree/LeaderElection.h>
#include <DataTypes/DataTypesNumber.h>
2018-12-28 17:11:52 +00:00
#include <Interpreters/Cluster.h>
#include <Interpreters/PartLog.h>
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
2018-08-20 15:34:37 +00:00
#include <Core/BackgroundSchedulePool.h>
#include <Processors/Pipe.h>
2020-10-14 07:22:48 +00:00
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
2014-03-21 13:42:14 +00:00
namespace DB
{
2017-04-16 15:00:33 +00:00
/** The engine that uses the merge tree (see MergeTreeData) and replicated through ZooKeeper.
*
2017-04-16 15:00:33 +00:00
* ZooKeeper is used for the following things:
2020-02-14 13:17:50 +00:00
* - the structure of the table (/metadata, /columns)
2017-04-16 15:00:33 +00:00
* - action log with data (/log/log-...,/replicas/replica_name/queue/queue-...);
* - a replica list (/replicas), and replica activity tag (/replicas/replica_name/is_active), replica addresses (/replicas/replica_name/host);
* - select the leader replica (/leader_election) - this is the replica that assigns the merge;
* - a set of parts of data on each replica (/replicas/replica_name/parts);
* - list of the last N blocks of data with checksum, for deduplication (/blocks);
* - the list of incremental block numbers (/block_numbers) that we are about to insert,
* to ensure the linear order of data insertion and data merge only on the intervals in this sequence;
* - coordinates writes with quorum (/quorum).
* - Storage of mutation entries (ALTER DELETE, ALTER UPDATE etc.) to execute (/mutations).
* See comments in StorageReplicatedMergeTree::mutate() for details.
2014-03-21 19:17:59 +00:00
*/
2017-04-16 15:00:33 +00:00
/** The replicated tables have a common log (/log/log-...).
* Log - a sequence of entries (LogEntry) about what to do.
* Each entry is one of:
* - normal data insertion (GET),
* - merge (MERGE),
* - delete the partition (DROP).
*
* Each replica copies (queueUpdatingTask, pullLogsToQueue) entries from the log to its queue (/replicas/replica_name/queue/queue-...)
2017-04-16 15:00:33 +00:00
* and then executes them (queueTask).
* Despite the name of the "queue", execution can be reordered, if necessary (shouldExecuteLogEntry, executeLogEntry).
* In addition, the records in the queue can be generated independently (not from the log), in the following cases:
* - when creating a new replica, actions are put on GET from other replicas (createReplica);
* - if the part is corrupt (removePartAndEnqueueFetch) or absent during the check (at start - checkParts, while running - searchForMissingPart),
* actions are put on GET from other replicas;
*
2017-04-16 15:00:33 +00:00
* The replica to which INSERT was made in the queue will also have an entry of the GET of this data.
* Such an entry is considered to be executed as soon as the queue handler sees it.
*
2017-04-16 15:00:33 +00:00
* The log entry has a creation time. This time is generated by the clock of server that created entry
* - the one on which the corresponding INSERT or ALTER query came.
*
2017-04-16 15:00:33 +00:00
* For the entries in the queue that the replica made for itself,
* as the time will take the time of creation the appropriate part on any of the replicas.
*/
class StorageReplicatedMergeTree final : public ext::shared_ptr_helper<StorageReplicatedMergeTree>, public MergeTreeData
2014-03-21 13:42:14 +00:00
{
2019-08-26 19:07:29 +00:00
friend struct ext::shared_ptr_helper<StorageReplicatedMergeTree>;
2014-03-21 13:42:14 +00:00
public:
void startup() override;
void shutdown() override;
~StorageReplicatedMergeTree() override;
2019-05-03 02:00:57 +00:00
std::string getName() const override { return "Replicated" + merging_params.getModeName() + "MergeTree"; }
bool supportsParallelInsert() const override { return true; }
2017-04-25 15:21:03 +00:00
bool supportsReplication() const override { return true; }
bool supportsDeduplication() const override { return true; }
2020-08-03 13:54:14 +00:00
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
2017-06-02 15:54:39 +00:00
unsigned num_streams) override;
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
2017-06-02 15:54:39 +00:00
unsigned num_streams) override;
std::optional<UInt64> totalRows(const Context & context) const override;
2020-09-21 10:13:01 +00:00
std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, const Context & context) const override;
std::optional<UInt64> totalBytes(const Context & context) const override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
2020-06-27 20:13:16 +00:00
bool optimize(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const ASTPtr & partition,
bool final,
bool deduplicate,
const Context & query_context) override;
2020-08-27 13:10:10 +00:00
void alter(const AlterCommands & commands, const Context & query_context, TableLockHolder & table_lock_holder) override;
void mutate(const MutationCommands & commands, const Context & context) override;
2020-03-09 01:22:33 +00:00
void waitMutation(const String & znode_name, size_t mutations_sync) const;
2019-05-03 02:00:57 +00:00
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
CancellationCode killMutation(const String & mutation_id) override;
2017-04-16 15:00:33 +00:00
/** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper.
*/
2020-01-22 11:30:11 +00:00
void drop() override;
2020-06-18 16:10:47 +00:00
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) override;
2018-04-21 00:35:20 +00:00
2020-09-26 19:18:28 +00:00
void checkTableCanBeRenamed() const override;
2020-04-07 14:05:51 +00:00
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
bool supportsIndexForIn() const override { return true; }
void checkTableCanBeDropped() const override;
ActionLock getActionLock(StorageActionBlockType action_type) override;
2020-10-15 16:10:22 +00:00
void onActionLockRemove(StorageActionBlockType action_type) override;
/// Wait when replication queue size becomes less or equal than queue_size
/// If timeout is exceeded returns false
bool waitForShrinkingQueueSize(size_t queue_size = 0, UInt64 max_wait_milliseconds = 0);
2017-04-16 15:00:33 +00:00
/** For the system table replicas. */
struct Status
{
bool is_leader;
bool can_become_leader;
bool is_readonly;
bool is_session_expired;
ReplicatedMergeTreeQueue::Status queue;
UInt32 parts_to_check;
String zookeeper_path;
String replica_name;
String replica_path;
Int32 columns_version;
UInt64 log_max_index;
UInt64 log_pointer;
UInt64 absolute_delay;
UInt8 total_replicas;
UInt8 active_replicas;
2020-02-15 00:11:09 +00:00
/// If the error has happened fetching the info from ZooKeeper, this field will be set.
String zookeeper_exception;
};
2017-04-16 15:00:33 +00:00
/// Get the status of the table. If with_zk_fields = false - do not fill in the fields that require queries to ZK.
void getStatus(Status & res, bool with_zk_fields = true);
using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>;
void getQueue(LogEntriesData & res, String & replica_name);
/// Get replica delay relative to current time.
time_t getAbsoluteDelay() const;
2020-06-19 14:18:58 +00:00
/// If the absolute delay is greater than min_relative_delay_to_measure,
/// will also calculate the difference from the unprocessed time of the best replica.
/// NOTE: Will communicate to ZooKeeper to calculate relative delay.
void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay);
2017-04-16 15:00:33 +00:00
/// Add a part to the queue of parts whose data you want to check in the background thread.
void enqueuePartForCheck(const String & part_name, time_t delay_to_check_seconds = 0)
{
part_check_thread.enqueuePart(part_name, delay_to_check_seconds);
}
2019-07-03 13:17:19 +00:00
CheckResults checkData(const ASTPtr & query, const Context & context) override;
/// Checks ability to use granularity
bool canUseAdaptiveGranularity() const override;
2020-02-17 12:47:34 +00:00
int getMetadataVersion() const { return metadata_version; }
2020-02-05 11:18:11 +00:00
/** Remove a specific replica from zookeeper.
*/
2020-06-23 12:01:51 +00:00
static void dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica, Poco::Logger * logger);
2020-11-09 09:14:20 +00:00
/// Get job to execute in background pool (merge, mutate, drop range and so on)
2020-10-16 10:12:31 +00:00
std::optional<JobAndPool> getDataProcessingJob() override;
2020-11-09 09:14:20 +00:00
/// Checks that fetches are not disabled with action blocker and pool for fetches
/// is not overloaded
2020-10-26 11:02:47 +00:00
bool canExecuteFetch(const ReplicatedMergeTreeLogEntry & entry, String & disable_reason) const;
2014-03-21 13:42:14 +00:00
private:
/// Get a sequential consistent view of current parts.
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const;
/// Delete old parts from disk and from ZooKeeper.
void clearOldPartsAndRemoveFromZK();
friend class ReplicatedMergeTreeBlockOutputStream;
friend class ReplicatedMergeTreePartCheckThread;
friend class ReplicatedMergeTreeCleanupThread;
friend class ReplicatedMergeTreeAlterThread;
friend class ReplicatedMergeTreeRestartingThread;
2020-09-18 10:57:33 +00:00
friend class ReplicatedMergeTreeMergeStrategyPicker;
friend struct ReplicatedMergeTreeLogEntry;
friend class ScopedPartitionMergeLock;
friend class ReplicatedMergeTreeQueue;
friend class MergeTreeData;
2016-01-28 01:00:27 +00:00
2020-09-18 10:57:33 +00:00
using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker;
using LogEntry = ReplicatedMergeTreeLogEntry;
using LogEntryPtr = LogEntry::Ptr;
2014-08-05 13:49:44 +00:00
2017-04-16 15:00:33 +00:00
zkutil::ZooKeeperPtr current_zookeeper; /// Use only the methods below.
mutable std::mutex current_zookeeper_mutex; /// To recreate the session in the background thread.
2014-12-12 20:50:32 +00:00
zkutil::ZooKeeperPtr tryGetZooKeeper() const;
zkutil::ZooKeeperPtr getZooKeeper() const;
void setZooKeeper(zkutil::ZooKeeperPtr zookeeper);
2014-03-22 14:44:44 +00:00
2017-04-16 15:00:33 +00:00
/// If true, the table is offline and can not be written to it.
2017-10-31 19:19:36 +00:00
std::atomic_bool is_readonly {false};
/// If false - ZooKeeper is available, but there is no table metadata. It's safe to drop table in this case.
bool has_metadata_in_zookeeper = true;
String zookeeper_path;
String replica_name;
String replica_path;
/** /replicas/me/is_active.
*/
zkutil::EphemeralNodeHolderPtr replica_is_active_node;
/** Is this replica "leading". The leader replica selects the parts to merge.
2020-06-15 02:14:59 +00:00
* It can be false only when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
*/
std::atomic<bool> is_leader {false};
zkutil::LeaderElectionPtr leader_election;
2020-01-14 14:27:48 +00:00
InterserverIOEndpointPtr data_parts_exchange_endpoint;
MergeTreeDataSelectExecutor reader;
MergeTreeDataWriter writer;
MergeTreeDataMergerMutator merger_mutator;
2020-09-18 10:57:33 +00:00
MergeStrategyPicker merge_strategy_picker;
/** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/).
* In ZK entries in chronological order. Here it is not necessary.
*/
ReplicatedMergeTreeQueue queue;
std::atomic<time_t> last_queue_update_start_time{0};
std::atomic<time_t> last_queue_update_finish_time{0};
DataPartsExchange::Fetcher fetcher;
/// When activated, replica is initialized and startup() method could exit
Poco::Event startup_event;
2017-04-16 15:00:33 +00:00
/// Do I need to complete background threads (except restarting_thread)?
std::atomic<bool> partial_shutdown_called {false};
/// Event that is signalled (and is reset) by the restarting_thread when the ZooKeeper session expires.
Poco::Event partial_shutdown_event {false}; /// Poco::Event::EVENT_MANUALRESET
/// Limiting parallel fetches per one table
std::atomic_uint current_table_fetches {0};
2020-02-05 11:18:11 +00:00
int metadata_version = 0;
/// Threads.
2020-10-14 07:22:48 +00:00
BackgroundJobsExecutor background_executor;
2020-10-14 14:56:42 +00:00
BackgroundMovesExecutor background_moves_executor;
2020-10-14 07:22:48 +00:00
/// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.
bool queue_update_in_progress = false;
BackgroundSchedulePool::TaskHolder queue_updating_task;
BackgroundSchedulePool::TaskHolder mutations_updating_task;
/// A task that selects parts to merge.
BackgroundSchedulePool::TaskHolder merge_selecting_task;
/// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query.
std::mutex merge_selecting_mutex;
/// A task that marks finished mutations as done.
BackgroundSchedulePool::TaskHolder mutations_finalizing_task;
2017-04-16 15:00:33 +00:00
/// A thread that removes old parts, log entries, and blocks.
ReplicatedMergeTreeCleanupThread cleanup_thread;
2017-04-16 15:00:33 +00:00
/// A thread that checks the data of the parts, as well as the queue of the parts to be checked.
ReplicatedMergeTreePartCheckThread part_check_thread;
/// A thread that processes reconnection to ZooKeeper when the session expires.
ReplicatedMergeTreeRestartingThread restarting_thread;
/// True if replica was created for existing table with fixed granularity
bool other_replicas_fixed_granularity = false;
2020-09-26 19:18:28 +00:00
/// Do not allow RENAME TABLE if zookeeper_path contains {database} or {table} macro
const bool allow_renaming;
2020-10-26 11:02:47 +00:00
const size_t replicated_fetches_pool_size;
template <class Func>
void foreachCommittedParts(const Func & func, const Context & context) const;
/** Creates the minimum set of nodes in ZooKeeper and create first replica.
* Returns true if was created, false if exists.
*/
bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot);
2017-04-16 15:00:33 +00:00
/** Creates a replica in ZooKeeper and adds to the queue all that it takes to catch up with the rest of the replicas.
*/
void createReplica(const StorageMetadataPtr & metadata_snapshot);
2017-04-16 15:00:33 +00:00
/** Create nodes in the ZK, which must always be, but which might not exist when older versions of the server are running.
*/
void createNewZooKeeperNodes();
void checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot);
/// A part of ALTER: apply metadata changes only (data parts are altered separately).
2020-06-18 16:10:47 +00:00
/// Must be called under IStorage::lockForAlter() lock.
void setTableStructure(ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff);
2017-04-16 15:00:33 +00:00
/** Check that the set of parts corresponds to that in ZK (/replicas/me/parts/).
* If any parts described in ZK are not locally, throw an exception.
* If any local parts are not mentioned in ZK, remove them.
* But if there are too many, throw an exception just in case - it's probably a configuration error.
*/
void checkParts(bool skip_sanity_checks);
2017-04-16 15:00:33 +00:00
/** Check that the part's checksum is the same as the checksum of the same part on some other replica.
* If no one has such a part, nothing checks.
* Not very reliable: if two replicas add a part almost at the same time, no checks will occur.
* Adds actions to `ops` that add data about the part into ZooKeeper.
2020-06-22 09:49:21 +00:00
* Call under lockForShare.
*/
2019-05-03 02:00:57 +00:00
void checkPartChecksumsAndAddCommitOps(const zkutil::ZooKeeperPtr & zookeeper, const DataPartPtr & part,
Coordination::Requests & ops, String part_name = "", NameSet * absent_replicas_paths = nullptr);
String getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const;
/// Accepts a PreComitted part, atomically checks its checksums with ones on other replicas and commit the part
2019-05-03 02:00:57 +00:00
DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction,
const DataPartPtr & part);
2019-09-05 13:12:29 +00:00
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
2019-08-16 15:57:19 +00:00
2019-09-05 13:12:29 +00:00
void getCommitPartOps(Coordination::Requests & ops, MutableDataPartPtr & part, const String & block_id_path = "") const;
2017-04-16 15:00:33 +00:00
/// Adds actions to `ops` that remove a part from ZooKeeper.
/// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes).
void removePartFromZooKeeper(const String & part_name, Coordination::Requests & ops, bool has_children);
/// Quickly removes big set of parts from ZooKeeper (using async multi queries)
void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names,
2018-04-06 19:48:54 +00:00
NameSet * parts_should_be_retried = nullptr);
bool tryRemovePartsFromZooKeeperWithRetries(const Strings & part_names, size_t max_retries = 5);
2019-05-03 02:00:57 +00:00
bool tryRemovePartsFromZooKeeperWithRetries(DataPartsVector & parts, size_t max_retries = 5);
2017-04-16 15:00:33 +00:00
/// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
void removePartAndEnqueueFetch(const String & part_name);
2017-04-16 15:00:33 +00:00
/// Running jobs from the queue.
2017-04-16 15:00:33 +00:00
/** Execute the action from the queue. Throws an exception if something is wrong.
* Returns whether or not it succeeds. If it did not work, write it to the end of the queue.
*/
bool executeLogEntry(LogEntry & entry);
void executeDropRange(const LogEntry & entry);
/// Do the merge or recommend to make the fetch instead of the merge
bool tryExecuteMerge(const LogEntry & entry);
2020-08-08 00:47:03 +00:00
/// Execute alter of table metadata. Set replica/metadata and replica/columns
2020-02-17 16:33:05 +00:00
/// nodes in zookeeper and also changes in memory metadata.
/// New metadata and columns values stored in entry.
2020-01-17 13:54:22 +00:00
bool executeMetadataAlter(const LogEntry & entry);
2020-01-13 16:39:20 +00:00
2020-02-17 16:33:05 +00:00
/// Execute MUTATE_PART entry. Part name and mutation commands
/// stored in entry. This function relies on MergerMutator class.
bool tryExecutePartMutation(const LogEntry & entry);
2019-08-19 14:40:12 +00:00
2020-02-17 16:33:05 +00:00
/// Fetch part from other replica (inserted or merged/mutated)
/// NOTE: Attention! First of all tries to find covering part on other replica
/// and set it into entry.actual_new_part_name. After that tries to fetch this new covering part.
/// If fetch was not successful, clears entry.actual_new_part_name.
bool executeFetch(LogEntry & entry);
bool executeReplaceRange(const LogEntry & entry);
2017-04-16 15:00:33 +00:00
/** Updates the queue.
*/
void queueUpdatingTask();
void mutationsUpdatingTask();
/** Clone data from another replica.
2018-08-27 23:59:49 +00:00
* If replica can not be cloned throw Exception.
*/
2018-08-27 13:51:22 +00:00
void cloneReplica(const String & source_replica, Coordination::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper);
/// Clone replica if it is lost.
void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper);
2020-10-23 08:54:00 +00:00
ReplicatedMergeTreeQueue::SelectedEntryPtr selectQueueEntry();
2019-08-19 17:59:16 +00:00
2020-10-23 08:54:00 +00:00
bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry);
2019-08-19 17:59:16 +00:00
/// Postcondition:
/// either leader_election is fully initialized (node in ZK is created and the watching thread is launched)
/// or an exception is thrown and leader_election is destroyed.
void enterLeaderElection();
/// Postcondition:
/// is_leader is false, merge_selecting_thread is stopped, leader_election is nullptr.
/// leader_election node in ZK is either deleted, or the session is marked expired.
void exitLeaderElection();
2017-04-16 15:00:33 +00:00
/** Selects the parts to merge and writes to the log.
*/
void mergeSelectingTask();
/// Checks if some mutations are done and marks them as done.
void mutationsFinalizingTask();
2017-04-16 15:00:33 +00:00
/** Write the selected parts to merge into the log,
* Call when merge_selecting_mutex is locked.
* Returns false if any part is not in ZK.
*/
enum class CreateMergeEntryResult { Ok, MissingPart, LogUpdated, Other };
CreateMergeEntryResult createLogEntryToMergeParts(
zkutil::ZooKeeperPtr & zookeeper,
2019-05-03 02:00:57 +00:00
const DataPartsVector & parts,
const String & merged_name,
const UUID & merged_part_uuid,
const MergeTreeDataPartType & merged_part_type,
2017-04-17 15:14:56 +00:00
bool deduplicate,
ReplicatedMergeTreeLogEntryData * out_log_entry,
2020-09-03 13:00:13 +00:00
int32_t log_version,
MergeType merge_type);
CreateMergeEntryResult createLogEntryToMutatePart(
const IMergeTreeDataPart & part,
const UUID & new_part_uuid,
Int64 mutation_version,
int32_t alter_version,
int32_t log_version);
2017-04-16 15:00:33 +00:00
/// Exchange parts.
2017-04-16 15:00:33 +00:00
/** Returns an empty string if no one has a part.
*/
String findReplicaHavingPart(const String & part_name, bool active);
2020-09-18 10:57:33 +00:00
bool checkReplicaHavePart(const String & replica, const String & part_name);
/** Find replica having specified part or any part that covers it.
* If active = true, consider only active replicas.
* If found, returns replica name and set 'entry->actual_new_part_name' to name of found largest covering part.
* If not found, returns empty string.
*/
String findReplicaHavingCoveringPart(LogEntry & entry, bool active);
String findReplicaHavingCoveringPart(const String & part_name, bool active, String & found_part_name);
2017-04-16 15:00:33 +00:00
/** Download the specified part from the specified replica.
* If `to_detached`, the part is placed in the `detached` directory.
* If quorum != 0, then the node for tracking the quorum is updated.
* Returns false if part is already fetching right now.
*/
bool fetchPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & replica_path,
bool to_detached,
size_t quorum,
zkutil::ZooKeeper::Ptr zookeeper_ = nullptr);
/// Required only to avoid races between executeLogEntry and fetchPartition
std::unordered_set<String> currently_fetching_parts;
std::mutex currently_fetching_parts_mutex;
2019-09-02 11:35:53 +00:00
/// With the quorum being tracked, add a replica to the quorum for the part.
void updateQuorum(const String & part_name, bool is_parallel);
2020-04-13 15:21:05 +00:00
/// Deletes info from quorum/last_part node for particular partition_id.
2020-04-20 10:56:59 +00:00
void cleanLastPartNode(const String & partition_id);
2020-04-10 21:29:54 +00:00
/// Part name is stored in quorum/last_part for corresponding partition_id.
bool partIsLastQuorumPart(const MergeTreePartInfo & part_info) const;
/// Part currently inserting with quorum (node quorum/parallel/part_name exists)
bool partIsInsertingWithParallelQuorum(const MergeTreePartInfo & part_info) const;
2020-10-01 10:38:50 +00:00
/// Creates new block number if block with such block_id does not exist
std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
const String & partition_id, const zkutil::ZooKeeperPtr & zookeeper,
const String & zookeeper_block_id_path = "") const;
2017-04-16 15:00:33 +00:00
/** Wait until all replicas, including this, execute the specified action from the log.
* If replicas are added at the same time, it can not wait the added replica .
2019-08-20 01:46:48 +00:00
*
* NOTE: This method must be called without table lock held.
* Because it effectively waits for other thread that usually has to also acquire a lock to proceed and this yields deadlock.
* TODO: There are wrong usages of this method that are not fixed yet.
*/
2020-01-17 13:54:22 +00:00
Strings waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true);
2017-04-16 15:00:33 +00:00
/** Wait until the specified replica executes the specified action from the log.
2019-08-20 01:46:48 +00:00
* NOTE: See comment about locks above.
*/
bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true);
2017-04-16 15:00:33 +00:00
/// Throw an exception if the table is readonly.
void assertNotReadonly() const;
2016-01-21 16:30:05 +00:00
/// Produce an imaginary part info covering all parts in the specified partition (at the call moment).
/// Returns false if the partition doesn't exist yet.
2020-10-05 13:52:03 +00:00
bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info, bool for_replace_partition = false);
2017-04-16 15:00:33 +00:00
/// Check for a node in ZK. If it is, remember this information, and then immediately answer true.
mutable std::unordered_set<std::string> existing_nodes_cache;
mutable std::mutex existing_nodes_cache_mutex;
bool existsNodeCached(const std::string & path) const;
void getClearBlocksInPartitionOps(Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);
/// Remove block IDs from `blocks/` in ZooKeeper for the given partition ID in the given block number range.
void clearBlocksInPartition(
zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);
/// Info about how other replicas can access this one.
ReplicatedMergeTreeAddress getReplicatedMergeTreeAddress() const;
2018-06-09 15:48:22 +00:00
2020-11-12 17:36:02 +00:00
bool dropPart(zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach, bool throw_if_noop);
bool dropAllPartsInPartition(
zkutil::ZooKeeper & zookeeper, String & partition_id, LogEntry & entry, bool detach);
// Partition helpers
2020-11-12 17:36:02 +00:00
void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, const Context & query_context, bool throw_if_noop) override;
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, const Context & query_context) override;
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context) override;
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & query_context) override;
void fetchPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & from, const Context & query_context) override;
/// Check granularity of already existing replicated table in zookeeper if it exists
/// return true if it's fixed
bool checkFixedGranualrityInZookeeper();
2019-12-19 15:27:56 +00:00
/// Wait for timeout seconds mutation is finished on replicas
void waitMutationToFinishOnReplicas(
const Strings & replicas, const String & mutation_id) const;
2020-03-24 17:05:38 +00:00
MutationCommands getFirtsAlterMutationCommandsForPart(const DataPartPtr & part) const override;
void startBackgroundMovesIfNeeded() override;
std::set<String> getPartitionIdsAffectedByCommands(const MutationCommands & commands, const Context & query_context) const;
PartitionBlockNumbersHolder allocateBlockNumbersInAffectedPartitions(
const MutationCommands & commands, const Context & query_context, const zkutil::ZooKeeperPtr & zookeeper) const;
protected:
2017-12-25 14:56:32 +00:00
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
*/
StorageReplicatedMergeTree(
const String & zookeeper_path_,
const String & replica_name_,
bool attach,
2019-12-04 16:06:55 +00:00
const StorageID & table_id_,
const String & relative_data_path_,
2020-06-08 18:23:26 +00:00
const StorageInMemoryMetadata & metadata_,
Context & context_,
const String & date_column_name,
2019-05-03 02:00:57 +00:00
const MergingParams & merging_params_,
2019-08-26 14:24:29 +00:00
std::unique_ptr<MergeTreeSettings> settings_,
2020-09-26 19:18:28 +00:00
bool has_force_restore_data_flag,
bool allow_renaming_);
2016-01-21 16:30:05 +00:00
};
2016-04-07 21:35:01 +00:00
2020-02-25 18:58:28 +00:00
/** There are three places for each part, where it should be
* 1. In the RAM, data_parts, all_data_parts.
* 2. In the filesystem (FS), the directory with the data of the table.
* 3. in ZooKeeper (ZK).
*
* When adding a part, it must be added immediately to these three places.
* This is done like this
* - [FS] first write the part into a temporary directory on the filesystem;
* - [FS] rename the temporary part to the result on the filesystem;
* - [RAM] immediately afterwards add it to the `data_parts`, and remove from `data_parts` any parts covered by this one;
* - [RAM] also set the `Transaction` object, which in case of an exception (in next point),
* rolls back the changes in `data_parts` (from the previous point) back;
* - [ZK] then send a transaction (multi) to add a part to ZooKeeper (and some more actions);
* - [FS, ZK] by the way, removing the covered (old) parts from filesystem, from ZooKeeper and from `all_data_parts`
* is delayed, after a few minutes.
*
* There is no atomicity here.
* It could be possible to achieve atomicity using undo/redo logs and a flag in `DataPart` when it is completely ready.
* But it would be inconvenient - I would have to write undo/redo logs for each `Part` in ZK, and this would increase already large number of interactions.
*
* Instead, we are forced to work in a situation where at any time
* (from another thread, or after server restart), there may be an unfinished transaction.
* (note - for this the part should be in RAM)
* From these cases the most frequent one is when the part is already in the data_parts, but it's not yet in ZooKeeper.
* This case must be distinguished from the case where such a situation is achieved due to some kind of damage to the state.
*
* Do this with the threshold for the time.
* If the part is young enough, its lack in ZooKeeper will be perceived optimistically - as if it just did not have time to be added there
* - as if the transaction has not yet been executed, but will soon be executed.
* And if the part is old, its absence in ZooKeeper will be perceived as an unfinished transaction that needs to be rolled back.
*
* PS. Perhaps it would be better to add a flag to the DataPart that a part is inserted into ZK.
* But here it's too easy to get confused with the consistency of this flag.
*/
#define MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER (5 * 60)
2016-04-07 21:35:01 +00:00
2014-03-21 13:42:14 +00:00
}