mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-05 14:02:21 +00:00
eae0f9957c
Description: ============ Currently, if even one table in the whole CK instance is in read-only status, the API /replicas_status throws the error "xxx table is read only". To make this monitor work in a prod env, we can catch the read-only status instead of directly throwing an error. Solution: ========= Return the delay values of the other, normal tables even if the CK instance has a read-only Replicatedxxx table. Please enter the commit message for your changes. Lines starting
911 lines
44 KiB
C++
911 lines
44 KiB
C++
#pragma once
|
|
|
|
#include <base/UUID.h>
|
|
#include <atomic>
|
|
#include <pcg_random.hpp>
|
|
#include <Storages/IStorage.h>
|
|
#include <Storages/IStorageCluster.h>
|
|
#include <Storages/MergeTree/DataPartsExchange.h>
|
|
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
|
|
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
|
|
#include <Storages/MergeTree/MergeFromLogEntryTask.h>
|
|
#include <Storages/MergeTree/MergeTreeData.h>
|
|
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
|
|
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
|
|
#include <Storages/MergeTree/MergeTreeDataWriter.h>
|
|
#include <Storages/MergeTree/MergeTreePartsMover.h>
|
|
#include <Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h>
|
|
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
|
|
#include <Storages/MergeTree/ReplicatedMergeTreeAttachThread.h>
|
|
#include <Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
|
|
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
|
|
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
|
|
#include <Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h>
|
|
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
|
|
#include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
|
|
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
|
|
#include <Storages/MergeTree/ReplicatedTableStatus.h>
|
|
#include <Storages/RenamingRestrictions.h>
|
|
#include <DataTypes/DataTypesNumber.h>
|
|
#include <Interpreters/Cluster.h>
|
|
#include <Interpreters/PartLog.h>
|
|
#include <Common/randomSeed.h>
|
|
#include <Common/ZooKeeper/ZooKeeper.h>
|
|
#include <Common/Throttler.h>
|
|
#include <Common/EventNotifier.h>
|
|
#include <base/defines.h>
|
|
#include <Core/BackgroundSchedulePool.h>
|
|
#include <QueryPipeline/Pipe.h>
|
|
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
|
|
|
|
|
|
namespace DB
|
|
{
|
|
|
|
/** The engine that uses the merge tree (see MergeTreeData) and is replicated through ZooKeeper.
|
|
*
|
|
* ZooKeeper is used for the following things:
|
|
* - the structure of the table (/metadata, /columns)
|
|
* - action log with data (/log/log-...,/replicas/replica_name/queue/queue-...);
|
|
* - a replica list (/replicas), and replica activity tag (/replicas/replica_name/is_active), replica addresses (/replicas/replica_name/host);
|
|
* - the leader replica election (/leader_election) - these are the replicas that assign merges, mutations
|
|
* and partition manipulations.
|
|
* (after ClickHouse version 20.5 we allow multiple leaders to act concurrently);
|
|
* - a set of parts of data on each replica (/replicas/replica_name/parts);
|
|
* - list of the last N blocks of data with checksum, for deduplication (/blocks);
|
|
* - the list of incremental block numbers (/block_numbers) that we are about to insert,
|
|
* to ensure the linear order of data insertion and data merge only on the intervals in this sequence;
|
|
* - coordinate writes with quorum (/quorum).
|
|
* - Storage of mutation entries (ALTER DELETE, ALTER UPDATE etc.) to execute (/mutations).
|
|
* See comments in StorageReplicatedMergeTree::mutate() for details.
|
|
*/
|
|
|
|
/** The replicated tables have a common log (/log/log-...).
|
|
* Log - a sequence of entries (LogEntry) about what to do.
|
|
* Each entry is one of:
|
|
* - normal data insertion (GET),
|
|
* - data insertion with a possible attach from local data (ATTACH),
|
|
* - merge (MERGE),
|
|
* - delete the partition (DROP).
|
|
*
|
|
* Each replica copies (queueUpdatingTask, pullLogsToQueue) entries from the log to its queue (/replicas/replica_name/queue/queue-...)
|
|
* and then executes them (queueTask).
|
|
* Despite the name of the "queue", execution can be reordered, if necessary (shouldExecuteLogEntry, executeLogEntry).
|
|
* In addition, the records in the queue can be generated independently (not from the log), in the following cases:
|
|
* - when creating a new replica, actions are put on GET from other replicas (createReplica);
|
|
* - if the part is corrupt (removePartAndEnqueueFetch) or absent during the check
|
|
* (at start - checkParts, while running - searchForMissingPart), actions are put on GET from other replicas;
|
|
*
|
|
* The replica to which INSERT was made in the queue will also have an entry of the GET of this data.
|
|
* Such an entry is considered to be executed as soon as the queue handler sees it.
|
|
*
|
|
* The log entry has a creation time. This time is generated by the clock of server that created entry
|
|
* - the one on which the corresponding INSERT or ALTER query came.
|
|
*
|
|
* For the entries in the queue that the replica made for itself,
|
|
* the creation time of the corresponding part on any of the replicas is used as the entry time.
|
|
*/
|
|
|
|
class ZooKeeperWithFaultInjection;
|
|
using ZooKeeperWithFaultInjectionPtr = std::shared_ptr<ZooKeeperWithFaultInjection>;
|
|
|
|
class StorageReplicatedMergeTree final : public MergeTreeData
|
|
{
|
|
public:
|
|
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
|
|
*/
|
|
StorageReplicatedMergeTree(
|
|
const String & zookeeper_path_,
|
|
const String & replica_name_,
|
|
bool attach,
|
|
const StorageID & table_id_,
|
|
const String & relative_data_path_,
|
|
const StorageInMemoryMetadata & metadata_,
|
|
ContextMutablePtr context_,
|
|
const String & date_column_name,
|
|
const MergingParams & merging_params_,
|
|
std::unique_ptr<MergeTreeSettings> settings_,
|
|
bool has_force_restore_data_flag,
|
|
RenamingRestrictions renaming_restrictions_);
|
|
|
|
void startup() override;
|
|
void shutdown() override;
|
|
void partialShutdown();
|
|
void flush() override;
|
|
~StorageReplicatedMergeTree() override;
|
|
|
|
static String getDefaultZooKeeperPath(const Poco::Util::AbstractConfiguration & config);
|
|
static String getDefaultReplicaName(const Poco::Util::AbstractConfiguration & config);
|
|
|
|
/// Engine name: "Replicated", followed by the merging mode name, followed by "MergeTree".
std::string getName() const override
{
    std::string engine_name = "Replicated";
    engine_name += merging_params.getModeName();
    engine_name += "MergeTree";
    return engine_name;
}
|
|
|
|
/// Overrides the base-class capability query; always answers true for this engine.
bool supportsParallelInsert() const override
{
    return true;
}
|
|
/// Overrides the base-class capability query; always answers true for this engine.
bool supportsReplication() const override
{
    return true;
}
|
|
/// Overrides the base-class capability query; always answers true for this engine.
bool supportsDeduplication() const override
{
    return true;
}
|
|
|
|
void read(
|
|
QueryPlan & query_plan,
|
|
const Names & column_names,
|
|
const StorageSnapshotPtr & storage_snapshot,
|
|
SelectQueryInfo & query_info,
|
|
ContextPtr context,
|
|
QueryProcessingStage::Enum processed_stage,
|
|
size_t max_block_size,
|
|
size_t num_streams) override;
|
|
|
|
std::optional<UInt64> totalRows(const Settings & settings) const override;
|
|
std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr context) const override;
|
|
std::optional<UInt64> totalBytes(const Settings & settings) const override;
|
|
|
|
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
|
|
|
|
std::optional<QueryPipeline> distributedWrite(const ASTInsertQuery & /*query*/, ContextPtr /*context*/) override;
|
|
|
|
bool optimize(
|
|
const ASTPtr & query,
|
|
const StorageMetadataPtr & metadata_snapshot,
|
|
const ASTPtr & partition,
|
|
bool final,
|
|
bool deduplicate,
|
|
const Names & deduplicate_by_columns,
|
|
ContextPtr query_context) override;
|
|
|
|
void alter(const AlterCommands & commands, ContextPtr query_context, AlterLockHolder & table_lock_holder) override;
|
|
|
|
void mutate(const MutationCommands & commands, ContextPtr context, bool force_wait) override;
|
|
void waitMutation(const String & znode_name, size_t mutations_sync) const;
|
|
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
|
|
CancellationCode killMutation(const String & mutation_id) override;
|
|
|
|
bool hasLightweightDeletedMask() const override;
|
|
|
|
/** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper.
|
|
*/
|
|
void drop() override;
|
|
|
|
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr query_context, TableExclusiveLockHolder &) override;
|
|
|
|
void checkTableCanBeRenamed(const StorageID & new_name) const override;
|
|
|
|
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
|
|
|
|
/// Overrides the base-class capability query; always answers true for this engine.
bool supportsIndexForIn() const override
{
    return true;
}
|
|
|
|
void checkTableCanBeDropped() const override;
|
|
|
|
ActionLock getActionLock(StorageActionBlockType action_type) override;
|
|
|
|
void onActionLockRemove(StorageActionBlockType action_type) override;
|
|
|
|
/// Wait until the replication queue size becomes less than or equal to queue_size.
|
|
/// If timeout is exceeded returns false
|
|
bool waitForShrinkingQueueSize(size_t queue_size = 0, UInt64 max_wait_milliseconds = 0);
|
|
|
|
/// Get the status of the table. If with_zk_fields = false - do not fill in the fields that require queries to ZK.
|
|
void getStatus(ReplicatedTableStatus & res, bool with_zk_fields = true);
|
|
|
|
using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>;
|
|
void getQueue(LogEntriesData & res, String & replica_name);
|
|
|
|
std::vector<PartMovesBetweenShardsOrchestrator::Entry> getPartMovesBetweenShardsEntries();
|
|
|
|
/// Get replica delay relative to current time.
|
|
time_t getAbsoluteDelay() const;
|
|
|
|
/// If the absolute delay is greater than min_relative_delay_to_measure,
|
|
/// will also calculate the difference from the unprocessed time of the best replica.
|
|
/// NOTE: Will communicate to ZooKeeper to calculate relative delay.
|
|
void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay);
|
|
|
|
/// Add a part to the queue of parts whose data you want to check in the background thread.
|
|
void enqueuePartForCheck(const String & part_name, time_t delay_to_check_seconds = 0);
|
|
|
|
CheckResults checkData(const ASTPtr & query, ContextPtr context) override;
|
|
|
|
/// Checks ability to use granularity
|
|
bool canUseAdaptiveGranularity() const override;
|
|
|
|
/// Returns the default path to the table in ZooKeeper.
|
|
/// It's used if not set in engine's arguments while creating a replicated table.
|
|
static String getDefaultReplicaPath(const ContextPtr & context_);
|
|
|
|
/// Returns the default replica name in ZooKeeper.
|
|
/// It's used if not set in engine's arguments while creating a replicated table.
|
|
static String getDefaultReplicaName(const ContextPtr & context_);
|
|
|
|
/// Returns the current value of the metadata_version member.
int getMetadataVersion() const
{
    return metadata_version;
}
|
|
|
|
/// Modify a CREATE TABLE query to make a variant which must be written to a backup.
|
|
void adjustCreateQueryForBackup(ASTPtr & create_query) const override;
|
|
|
|
/// Makes backup entries to backup the data of the storage.
|
|
void backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;
|
|
|
|
/// Extract data from the backup and put it to the storage.
|
|
void restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;
|
|
|
|
/** Remove a specific replica from zookeeper.
|
|
*/
|
|
static void dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
|
|
Poco::Logger * logger, MergeTreeSettingsPtr table_settings = nullptr);
|
|
|
|
/// Removes table from ZooKeeper after the last replica was dropped
|
|
static bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path,
|
|
const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger);
|
|
|
|
/// Schedules job to execute in background pool (merge, mutate, drop range and so on)
|
|
bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;
|
|
|
|
/// Checks that fetches are not disabled with action blocker and pool for fetches
|
|
/// is not overloaded
|
|
bool canExecuteFetch(const ReplicatedMergeTreeLogEntry & entry, String & disable_reason) const;
|
|
|
|
/// Fetch part only when it is stored on shared storage like S3
|
|
MutableDataPartStoragePtr executeFetchShared(const String & source_replica, const String & new_part_name, const DiskPtr & disk, const String & path);
|
|
|
|
/// Lock part in ZooKeeper so that its shared data can be used by several nodes
|
|
void lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock, std::optional<HardlinkedFiles> hardlinked_files) const override;
|
|
void lockSharedData(
|
|
const IMergeTreeDataPart & part,
|
|
const ZooKeeperWithFaultInjectionPtr & zookeeper,
|
|
bool replace_existing_lock,
|
|
std::optional<HardlinkedFiles> hardlinked_files) const;
|
|
|
|
void lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const;
|
|
|
|
/// Unlock shared data part in zookeeper
|
|
/// Return true if data unlocked
|
|
/// Return false if data is still used by another node
|
|
std::pair<bool, NameSet> unlockSharedData(const IMergeTreeDataPart & part) const override;
|
|
std::pair<bool, NameSet>
|
|
unlockSharedData(const IMergeTreeDataPart & part, const ZooKeeperWithFaultInjectionPtr & zookeeper) const;
|
|
|
|
/// Unlock shared data part in zookeeper by part id
|
|
/// Return true if data unlocked
|
|
/// Return false if data is still used by another node
|
|
static std::pair<bool, NameSet> unlockSharedDataByID(
|
|
String part_id,
|
|
const String & table_uuid,
|
|
const String & part_name,
|
|
const String & replica_name_,
|
|
const std::string & disk_type,
|
|
const ZooKeeperWithFaultInjectionPtr & zookeeper_,
|
|
const MergeTreeSettings & settings,
|
|
Poco::Logger * logger,
|
|
const String & zookeeper_path_old,
|
|
MergeTreeDataFormatVersion data_format_version);
|
|
|
|
/// Fetch part only if some replica has it on shared storage like S3
|
|
MutableDataPartStoragePtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
|
|
|
|
/// Get the best replica having this partition on a remote disk of the same type
|
|
String getSharedDataReplica(const IMergeTreeDataPart & part, DataSourceType data_source_type) const;
|
|
|
|
/// Returns a reference to the replica_name member (no copy is made).
/// NOTE: a member function defined inside the class body is implicitly inline,
/// so no explicit `inline` specifier is needed here.
const String & getReplicaName() const
{
    return replica_name;
}
|
|
|
|
/// Restores table metadata if ZooKeeper lost it.
|
|
/// Used only on restarted readonly replicas (not checked). All active (Active) parts are moved to detached/
|
|
/// folder and attached. Parts in all other states are just moved to detached/ folder.
|
|
void restoreMetadataInZooKeeper();
|
|
|
|
/// Get throttler for replicated fetches
|
|
ThrottlerPtr getFetchesThrottler() const { return replicated_fetches_throttler; }
|
|
|
|
/// Get throttler for replicated sends
|
|
ThrottlerPtr getSendsThrottler() const { return replicated_sends_throttler; }
|
|
|
|
bool createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name);
|
|
|
|
// Return default or custom zookeeper name for table
|
|
const String & getZooKeeperName() const
{
    /// zookeeper_name is a const member, so handing out a reference avoids a copy.
    return zookeeper_name;
}
|
|
|
|
/// Returns a reference to the zookeeper_path member (no copy is made).
const String & getZooKeeperPath() const
{
    return zookeeper_path;
}
|
|
|
|
// Return table id, common for different replicas
|
|
String getTableSharedID() const override;
|
|
|
|
/// Returns the same as getTableSharedID(), but extracts it from a create query.
|
|
static std::optional<String> tryGetTableSharedIDFromCreateQuery(const IAST & create_query, const ContextPtr & global_context);
|
|
|
|
/// Returns a reference to the static default_zookeeper_name member.
static const String & getDefaultZooKeeperName()
{
    return default_zookeeper_name;
}
|
|
|
|
/// Check if there are new broken disks and enqueue part recovery tasks.
|
|
void checkBrokenDisks();
|
|
|
|
static bool removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, const String & table_uuid,
|
|
const String & replica_name, const String & zookeeper_path, const ContextPtr & local_context, const zkutil::ZooKeeperPtr & zookeeper);
|
|
|
|
bool canUseZeroCopyReplication() const;
|
|
|
|
bool isTableReadOnly () { return is_readonly; }
|
|
private:
|
|
std::atomic_bool are_restoring_replica {false};
|
|
|
|
/// Get a sequential consistent view of current parts.
|
|
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const;
|
|
|
|
/// Delete old parts from disk and from ZooKeeper.
|
|
void clearOldPartsAndRemoveFromZK();
|
|
|
|
template<bool async_insert>
|
|
friend class ReplicatedMergeTreeSinkImpl;
|
|
friend class ReplicatedMergeTreePartCheckThread;
|
|
friend class ReplicatedMergeTreeCleanupThread;
|
|
friend class ReplicatedMergeTreeAlterThread;
|
|
friend class ReplicatedMergeTreeRestartingThread;
|
|
friend class ReplicatedMergeTreeAttachThread;
|
|
friend class ReplicatedMergeTreeMergeStrategyPicker;
|
|
friend struct ReplicatedMergeTreeLogEntry;
|
|
friend class ScopedPartitionMergeLock;
|
|
friend class ReplicatedMergeTreeQueue;
|
|
friend class PartMovesBetweenShardsOrchestrator;
|
|
friend class MergeTreeData;
|
|
friend class MergeFromLogEntryTask;
|
|
friend class MutateFromLogEntryTask;
|
|
friend class ReplicatedMergeMutateTaskBase;
|
|
|
|
using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker;
|
|
using LogEntry = ReplicatedMergeTreeLogEntry;
|
|
using LogEntryPtr = LogEntry::Ptr;
|
|
|
|
using MergeTreeData::MutableDataPartPtr;
|
|
|
|
zkutil::ZooKeeperPtr current_zookeeper; /// Use only the methods below.
|
|
mutable std::mutex current_zookeeper_mutex; /// To recreate the session in the background thread.
|
|
|
|
zkutil::ZooKeeperPtr tryGetZooKeeper() const;
|
|
zkutil::ZooKeeperPtr getZooKeeper() const;
|
|
/// Get connection from global context and reconnect if needed.
|
|
/// NOTE: use it only when table is shut down, in all other cases
|
|
/// use getZooKeeper() because it is managed by restarting thread
|
|
/// which guarantees that we have only one connected object
|
|
/// for table.
|
|
zkutil::ZooKeeperPtr getZooKeeperIfTableShutDown() const;
|
|
zkutil::ZooKeeperPtr getZooKeeperAndAssertNotReadonly() const;
|
|
void setZooKeeper();
|
|
|
|
/// If true, the table is offline and cannot be written to.
|
|
/// This flag is managed by RestartingThread.
|
|
std::atomic_bool is_readonly {true};
|
|
/// If nullopt - ZooKeeper is not available, so we don't know if there is table metadata.
|
|
/// If false - ZooKeeper is available, but there is no table metadata. It's safe to drop table in this case.
|
|
std::optional<bool> has_metadata_in_zookeeper;
|
|
|
|
static const String default_zookeeper_name;
|
|
const String zookeeper_name;
|
|
const String zookeeper_path;
|
|
const String replica_name;
|
|
const String replica_path;
|
|
|
|
/** /replicas/me/is_active.
|
|
*/
|
|
zkutil::EphemeralNodeHolderPtr replica_is_active_node;
|
|
|
|
/** Is this replica "leading". The leader replica selects the parts to merge.
|
|
* It can be false only when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
|
|
*/
|
|
std::atomic<bool> is_leader {false};
|
|
|
|
InterserverIOEndpointPtr data_parts_exchange_endpoint;
|
|
|
|
MergeTreeDataSelectExecutor reader;
|
|
MergeTreeDataWriter writer;
|
|
MergeTreeDataMergerMutator merger_mutator;
|
|
|
|
MergeStrategyPicker merge_strategy_picker;
|
|
|
|
/** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/).
|
|
* In ZK entries in chronological order. Here it is not necessary.
|
|
*/
|
|
ReplicatedMergeTreeQueue queue;
|
|
std::atomic<time_t> last_queue_update_start_time{0};
|
|
std::atomic<time_t> last_queue_update_finish_time{0};
|
|
|
|
mutable std::mutex last_queue_update_exception_lock;
|
|
String last_queue_update_exception;
|
|
String getLastQueueUpdateException() const;
|
|
|
|
DataPartsExchange::Fetcher fetcher;
|
|
|
|
/// When activated, replica is initialized and startup() method could exit
|
|
Poco::Event startup_event;
|
|
|
|
/// Do I need to complete background threads (except restarting_thread)?
|
|
std::atomic<bool> partial_shutdown_called {false};
|
|
|
|
/// Event that is signalled (and is reset) by the restarting_thread when the ZooKeeper session expires.
|
|
Poco::Event partial_shutdown_event {false}; /// Poco::Event::EVENT_MANUALRESET
|
|
|
|
std::atomic<bool> shutdown_called {false};
|
|
std::atomic<bool> flush_called {false};
|
|
|
|
int metadata_version = 0;
|
|
/// Threads.
|
|
|
|
/// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.
|
|
bool queue_update_in_progress = false;
|
|
BackgroundSchedulePool::TaskHolder queue_updating_task;
|
|
|
|
BackgroundSchedulePool::TaskHolder mutations_updating_task;
|
|
|
|
/// A task that selects parts to merge.
|
|
BackgroundSchedulePool::TaskHolder merge_selecting_task;
|
|
/// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query.
|
|
std::mutex merge_selecting_mutex;
|
|
|
|
/// A task that marks finished mutations as done.
|
|
BackgroundSchedulePool::TaskHolder mutations_finalizing_task;
|
|
|
|
/// A thread that removes old parts, log entries, and blocks.
|
|
ReplicatedMergeTreeCleanupThread cleanup_thread;
|
|
|
|
/// A thread that checks the data of the parts, as well as the queue of the parts to be checked.
|
|
ReplicatedMergeTreePartCheckThread part_check_thread;
|
|
|
|
/// A thread that processes reconnection to ZooKeeper when the session expires.
|
|
ReplicatedMergeTreeRestartingThread restarting_thread;
|
|
EventNotifier::HandlerPtr session_expired_callback_handler;
|
|
|
|
/// A thread that attaches the table using ZooKeeper
|
|
std::optional<ReplicatedMergeTreeAttachThread> attach_thread;
|
|
|
|
PartMovesBetweenShardsOrchestrator part_moves_between_shards_orchestrator;
|
|
|
|
std::atomic<bool> initialization_done{false};
|
|
|
|
/// True if replica was created for existing table with fixed granularity
|
|
bool other_replicas_fixed_granularity = false;
|
|
|
|
/// Do not allow RENAME TABLE if zookeeper_path contains {database} or {table} macro
|
|
const RenamingRestrictions renaming_restrictions;
|
|
|
|
/// Throttlers used in DataPartsExchange to lower maximum fetch/sends
|
|
/// speed.
|
|
ThrottlerPtr replicated_fetches_throttler;
|
|
ThrottlerPtr replicated_sends_throttler;
|
|
|
|
/// Global ID, synced via ZooKeeper between replicas
|
|
mutable std::mutex table_shared_id_mutex;
|
|
mutable UUID table_shared_id;
|
|
|
|
std::mutex last_broken_disks_mutex;
|
|
std::set<String> last_broken_disks;
|
|
|
|
static std::optional<QueryPipeline> distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context);
|
|
|
|
template <class Func>
|
|
void foreachActiveParts(Func && func, bool select_sequential_consistency) const;
|
|
|
|
/** Creates the minimum set of nodes in ZooKeeper and create first replica.
|
|
* Returns true if was created, false if exists.
|
|
*/
|
|
bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot);
|
|
|
|
/**
|
|
* Creates a replica in ZooKeeper and adds to the queue all that it takes to catch up with the rest of the replicas.
|
|
*/
|
|
void createReplica(const StorageMetadataPtr & metadata_snapshot);
|
|
|
|
/** Create nodes in ZK that must always exist, but which might not exist when older versions of the server are running.
|
|
*/
|
|
void createNewZooKeeperNodes();
|
|
|
|
void checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot);
|
|
|
|
/// A part of ALTER: apply metadata changes only (data parts are altered separately).
|
|
/// Must be called under IStorage::lockForAlter() lock.
|
|
void setTableStructure(const StorageID & table_id, const ContextPtr & local_context,
|
|
ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff);
|
|
|
|
/** Check that the set of parts corresponds to that in ZK (/replicas/me/parts/).
|
|
* If any parts described in ZK are not present locally, throw an exception.
|
|
* If any local parts are not mentioned in ZK, remove them.
|
|
* But if there are too many, throw an exception just in case - it's probably a configuration error.
|
|
*/
|
|
void checkParts(bool skip_sanity_checks);
|
|
|
|
/// Synchronize the list of part uuids which are currently pinned. These should be sent to root query executor
|
|
/// to be used for deduplication.
|
|
void syncPinnedPartUUIDs();
|
|
|
|
/** Check that the part's checksum is the same as the checksum of the same part on some other replica.
|
|
* If no one has such a part, nothing is checked.
|
|
* Not very reliable: if two replicas add a part almost at the same time, no checks will occur.
|
|
* Adds actions to `ops` that add data about the part into ZooKeeper.
|
|
* Call under lockForShare.
|
|
*/
|
|
void checkPartChecksumsAndAddCommitOps(const zkutil::ZooKeeperPtr & zookeeper, const DataPartPtr & part,
|
|
Coordination::Requests & ops, String part_name = "", NameSet * absent_replicas_paths = nullptr);
|
|
|
|
String getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const;
|
|
|
|
/// Accepts a PreActive part, atomically checks its checksums against the ones on other replicas and commits the part
|
|
DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction, const DataPartPtr & part, std::optional<HardlinkedFiles> hardlinked_files = {});
|
|
|
|
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
|
|
|
|
void getCommitPartOps(Coordination::Requests & ops, const DataPartPtr & part, const String & block_id_path = "") const;
|
|
|
|
void getCommitPartOps(Coordination::Requests & ops, const DataPartPtr & part, const std::vector<String> & block_id_paths) const;
|
|
|
|
/// Adds actions to `ops` that remove a part from ZooKeeper.
|
|
/// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes).
|
|
void getRemovePartFromZooKeeperOps(const String & part_name, Coordination::Requests & ops, bool has_children);
|
|
|
|
/// Just removes part from ZooKeeper using previous method
|
|
void removePartFromZooKeeper(const String & part_name);
|
|
|
|
/// Quickly removes big set of parts from ZooKeeper (using async multi queries)
|
|
void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names,
|
|
NameSet * parts_should_be_retried = nullptr);
|
|
|
|
/// Remove parts from ZooKeeper, throw exception if unable to do so after max_retries.
|
|
void removePartsFromZooKeeperWithRetries(const Strings & part_names, size_t max_retries = 5);
|
|
void removePartsFromZooKeeperWithRetries(PartsToRemoveFromZooKeeper & parts, size_t max_retries = 5);
|
|
|
|
/// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
|
|
void removePartAndEnqueueFetch(const String & part_name, bool storage_init);
|
|
|
|
/// Running jobs from the queue.
|
|
|
|
/** Execute the action from the queue. Throws an exception if something is wrong.
|
|
* Returns whether or not it succeeds. If it did not work, write it to the end of the queue.
|
|
*/
|
|
bool executeLogEntry(LogEntry & entry);
|
|
|
|
/// Lookup the part for the entry in the detached/ folder.
|
|
/// returns nullptr if the part is corrupt or missing.
|
|
MutableDataPartPtr attachPartHelperFoundValidPart(const LogEntry& entry) const;
|
|
|
|
void executeDropRange(const LogEntry & entry);
|
|
|
|
/// Execute alter of table metadata. Set replica/metadata and replica/columns
|
|
/// nodes in zookeeper and also changes in memory metadata.
|
|
/// New metadata and columns values stored in entry.
|
|
bool executeMetadataAlter(const LogEntry & entry);
|
|
|
|
/// Fetch part from other replica (inserted or merged/mutated)
|
|
/// NOTE: Attention! First of all tries to find covering part on other replica
|
|
/// and set it into entry.actual_new_part_name. After that tries to fetch this new covering part.
|
|
/// If fetch was not successful, clears entry.actual_new_part_name.
|
|
bool executeFetch(LogEntry & entry, bool need_to_check_missing_part=true);
|
|
|
|
bool executeReplaceRange(const LogEntry & entry);
|
|
void executeClonePartFromShard(const LogEntry & entry);
|
|
|
|
/** Updates the queue.
|
|
*/
|
|
void queueUpdatingTask();
|
|
|
|
void mutationsUpdatingTask();
|
|
|
|
/** Clone data from another replica.
|
|
* If replica can not be cloned throw Exception.
|
|
*/
|
|
void cloneReplica(const String & source_replica, Coordination::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper);
|
|
|
|
/// Repairs metadata of a stale replica. Called from cloneReplica(...)
|
|
void cloneMetadataIfNeeded(const String & source_replica, const String & source_path, zkutil::ZooKeeperPtr & zookeeper);
|
|
|
|
/// Clone replica if it is lost.
|
|
void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper);
|
|
|
|
|
|
ReplicatedMergeTreeQueue::SelectedEntryPtr selectQueueEntry();
|
|
|
|
|
|
MergeFromLogEntryTaskPtr getTaskToProcessMergeQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry);
|
|
|
|
bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry);
|
|
|
|
/// Start being leader (if not disabled by setting).
|
|
/// Since multi-leaders are allowed, it just sets is_leader flag.
|
|
void startBeingLeader();
|
|
void stopBeingLeader();
|
|
|
|
/** Selects the parts to merge and writes to the log.
|
|
*/
|
|
void mergeSelectingTask();
|
|
|
|
/// Checks if some mutations are done and marks them as done.
|
|
void mutationsFinalizingTask();
|
|
|
|
/** Write the selected parts to merge into the log,
|
|
* Call when merge_selecting_mutex is locked.
|
|
* Returns false if any part is not in ZK.
|
|
*/
|
|
/// Result code returned by createLogEntryToMergeParts() and createLogEntryToMutatePart().
enum class CreateMergeEntryResult
{
    Ok,
    MissingPart,
    LogUpdated,
    Other,
};
|
|
|
|
CreateMergeEntryResult createLogEntryToMergeParts(
|
|
zkutil::ZooKeeperPtr & zookeeper,
|
|
const DataPartsVector & parts,
|
|
const String & merged_name,
|
|
const UUID & merged_part_uuid,
|
|
const MergeTreeDataPartType & merged_part_type,
|
|
bool deduplicate,
|
|
const Names & deduplicate_by_columns,
|
|
ReplicatedMergeTreeLogEntryData * out_log_entry,
|
|
int32_t log_version,
|
|
MergeType merge_type);
|
|
|
|
CreateMergeEntryResult createLogEntryToMutatePart(
|
|
const IMergeTreeDataPart & part,
|
|
const UUID & new_part_uuid,
|
|
Int64 mutation_version,
|
|
int32_t alter_version,
|
|
int32_t log_version);
|
|
|
|
/// Exchange parts.
|
|
|
|
ConnectionTimeouts getFetchPartHTTPTimeouts(ContextPtr context);
|
|
|
|
/** Returns an empty string if no one has a part.
|
|
*/
|
|
String findReplicaHavingPart(const String & part_name, bool active);
|
|
static String findReplicaHavingPart(const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_);
|
|
|
|
bool checkReplicaHavePart(const String & replica, const String & part_name);
|
|
bool checkIfDetachedPartExists(const String & part_name);
|
|
/// Returns bool.
/// NOTE(review): presumably true when at least one detached part belongs to this partition — confirm.
bool checkIfDetachedPartitionExists(const String & partition_name);
|
|
|
|
/** Find replica having specified part or any part that covers it.
|
|
* If active = true, consider only active replicas.
|
|
* If found, returns replica name and set 'entry->actual_new_part_name' to name of found largest covering part.
|
|
* If not found, returns empty string.
|
|
*/
|
|
/// Overload taking the log entry by non-const reference and returning a String.
String findReplicaHavingCoveringPart(LogEntry & entry, bool active);
|
|
/// Overload taking a part name; `found_part_name` is a non-const reference.
/// NOTE(review): `found_part_name` presumably receives the name of the covering part that was found — confirm.
String findReplicaHavingCoveringPart(const String & part_name, bool active, String & found_part_name);
|
|
|
|
/** Download the specified part from the specified replica.
|
|
* If `to_detached`, the part is placed in the `detached` directory.
|
|
* If quorum != 0, then the node for tracking the quorum is updated.
|
|
* Returns false if the part is already being fetched right now.
|
|
*/
|
|
/// Returns bool.
/// Defaults: `zookeeper_` = nullptr, `try_fetch_shared` = true, `entry_znode` = "".
/// NOTE(review): a null `zookeeper_` presumably means "use the table's own ZooKeeper client" — confirm.
bool fetchPart(
|
|
const String & part_name,
|
|
const StorageMetadataPtr & metadata_snapshot,
|
|
const String & source_replica_path,
|
|
bool to_detached,
|
|
size_t quorum,
|
|
zkutil::ZooKeeper::Ptr zookeeper_ = nullptr,
|
|
bool try_fetch_shared = true,
|
|
String entry_znode = "");
|
|
|
|
/** Download the specified part from the specified replica.
|
|
* Used to replace a local part with the same s3-shared part in hybrid storage.
|
|
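* Presumably returns a null pointer (the return type is MutableDataPartStoragePtr, not bool) if the part is already being fetched right now (TODO confirm).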
|
|
*/
|
|
/// Returns a MutableDataPartStoragePtr; `replaced_disk` and `replaced_part_path` are taken by value.
/// NOTE(review): what the returned pointer holds on failure is not visible here — confirm.
MutableDataPartStoragePtr fetchExistsPart(
|
|
const String & part_name,
|
|
const StorageMetadataPtr & metadata_snapshot,
|
|
const String & replica_path,
|
|
DiskPtr replaced_disk,
|
|
String replaced_part_path);
|
|
|
|
/// Required only to avoid races between executeLogEntry and fetchPartition
|
|
/// Set of strings. NOTE(review): presumably the names of parts being fetched at the moment — confirm.
std::unordered_set<String> currently_fetching_parts;
|
|
/// NOTE(review): presumably guards `currently_fetching_parts` — confirm at the use sites.
std::mutex currently_fetching_parts_mutex;
|
|
|
|
/// With the quorum being tracked, add a replica to the quorum for the part.
|
|
/// Returns nothing.
/// NOTE(review): `is_parallel` presumably selects the parallel-quorum node instead of the ordinary one — confirm.
void updateQuorum(const String & part_name, bool is_parallel);
|
|
|
|
/// Deletes info from quorum/last_part node for particular partition_id.
|
|
/// Takes the partition id by const reference and returns nothing.
void cleanLastPartNode(const String & partition_id);
|
|
|
|
/// Part name is stored in quorum/last_part for corresponding partition_id.
|
|
/// Const member returning bool.
/// NOTE(review): presumably true when `part_info` matches the part recorded in quorum/last_part for its partition — confirm.
bool partIsLastQuorumPart(const MergeTreePartInfo & part_info) const;
|
|
|
|
/// Part currently inserting with quorum (node quorum/parallel/part_name exists)
|
|
/// Const member returning bool.
/// NOTE(review): presumably true when the quorum/parallel node for this part exists — confirm.
bool partIsInsertingWithParallelQuorum(const MergeTreePartInfo & part_info) const;
|
|
|
|
/// Creates new block number if block with such block_id does not exist
|
|
/// If zookeeper_path_prefix specified then allocate block number on this path
|
|
/// (can be used if we want to allocate blocks on other replicas)
|
|
/// Const member returning std::optional<EphemeralLockInZooKeeper>.
/// Both `zookeeper_block_id_path` and `zookeeper_path_prefix` default to an empty string.
/// NOTE(review): std::nullopt presumably means a block with this block_id already exists — confirm.
std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
|
|
const String & partition_id, const zkutil::ZooKeeperPtr & zookeeper,
|
|
const String & zookeeper_block_id_path = "", const String & zookeeper_path_prefix = "") const;
|
|
|
|
/// Templated overload taking a ZooKeeperWithFaultInjectionPtr.
/// `zookeeper_block_id_path` has type T and no default; `zookeeper_path_prefix` defaults to "".
/// NOTE(review): the set of types supported for T is not visible in this chunk — see the definition/instantiations.
template<typename T>
|
|
std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
|
|
const String & partition_id,
|
|
const ZooKeeperWithFaultInjectionPtr & zookeeper,
|
|
const T & zookeeper_block_id_path,
|
|
const String & zookeeper_path_prefix = "") const;
|
|
|
|
/** Wait until all replicas, including this, execute the specified action from the log.
|
|
* If replicas are added at the same time, it may not wait for the added replicas.
|
|
*
|
|
* Waits for inactive replicas no more than wait_for_inactive_timeout.
|
|
* Returns list of inactive replicas that have not executed entry or throws exception.
|
|
*
|
|
* NOTE: This method must be called without table lock held.
|
|
* Because it effectively waits for other thread that usually has to also acquire a lock to proceed and this yields deadlock.
|
|
*/
|
|
/// Returns nothing; `error_context` defaults to an empty string.
/// NOTE(review): presumably the throwing variant, with `error_context` added to the exception message — confirm.
void waitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry,
|
|
Int64 wait_for_inactive_timeout, const String & error_context = {});
|
|
/// Returns Strings.
/// NOTE(review): presumably the names of replicas that have not executed the entry — confirm in the definition.
Strings tryWaitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry,
|
|
Int64 wait_for_inactive_timeout);
|
|
|
|
/** Wait until the specified replica executes the specified action from the log.
|
|
* NOTE: See comment about locks above.
|
|
*/
|
|
/// Returns bool; `wait_for_inactive_timeout` defaults to 0.
/// NOTE(review): presumably returns false when the wait gives up before the replica processed the entry — confirm.
bool tryWaitForReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name,
|
|
const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0);
|
|
|
|
/// Depending on settings, do nothing or wait for this replica or all replicas process log entry.
|
|
/// Returns nothing; `error_context` defaults to an empty string.
/// NOTE(review): which settings of `query_context` are consulted is not visible in this chunk — see the definition.
void waitForLogEntryToBeProcessedIfNecessary(const ReplicatedMergeTreeLogEntryData & entry, ContextPtr query_context, const String & error_context = {});
|
|
|
|
/// Throw an exception if the table is readonly.
|
|
/// Const member; takes no arguments and returns nothing.
/// NOTE(review): the exception type/code thrown is not visible in this chunk.
void assertNotReadonly() const;
|
|
|
|
/// Produce an imaginary part info covering all parts in the specified partition (at the call moment).
|
|
/// Returns false if the partition doesn't exist yet.
|
|
/// Caller must hold delimiting_block_lock until creation of drop/replace entry in log.
|
|
/// Otherwise some replica may assign merge which intersects part_info.
|
|
/// Returns bool; `part_info` and `delimiting_block_lock` are non-const references, `for_replace_range` defaults to false.
/// NOTE(review): both references are presumably outputs filled in on success — confirm in the definition.
bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info,
|
|
std::optional<EphemeralLockInZooKeeper> & delimiting_block_lock, bool for_replace_range = false);
|
|
|
|
/// Check for a node in ZK. If it is, remember this information, and then immediately answer true.
|
|
/// Set of std::string values; declared mutable so that const member functions can modify it.
/// NOTE(review): presumably holds ZooKeeper paths already known to exist — confirm.
mutable std::unordered_set<std::string> existing_nodes_cache;
|
|
/// NOTE(review): presumably guards `existing_nodes_cache` — confirm at the use sites.
mutable std::mutex existing_nodes_cache_mutex;
|
|
/// Const member returning bool for the given ZooKeeper `path`.
bool existsNodeCached(const ZooKeeperWithFaultInjectionPtr & zookeeper, const std::string & path) const;
|
|
|
|
/// Cancels INSERTs in the block range by removing ephemeral block numbers
|
|
/// Returns nothing.
/// NOTE(review): whether the [min_block_num, max_block_num] range is inclusive is not visible here — confirm.
void clearLockedBlockNumbersInPartition(zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);
|
|
|
|
/// Takes `ops` by non-const reference and returns nothing.
/// NOTE(review): presumably appends ZooKeeper requests to `ops` without executing them — confirm.
void getClearBlocksInPartitionOps(Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);
|
|
|
|
/// Same parameters as getClearBlocksInPartitionOps() plus an explicit `blocks_dir_name`.
/// NOTE(review): presumably the implementation behind getClearBlocksInPartitionOps() — confirm.
void getClearBlocksInPartitionOpsImpl(Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num, const String & blocks_dir_name);
|
|
/// Remove block IDs from `blocks/` in ZooKeeper for the given partition ID in the given block number range.
|
|
/// Returns nothing.
/// NOTE(review): presumably executes the requests built by getClearBlocksInPartitionOps() — confirm.
void clearBlocksInPartition(
|
|
zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);
|
|
|
|
/// Info about how other replicas can access this one.
|
|
/// Const member; returns a ReplicatedMergeTreeAddress by value.
ReplicatedMergeTreeAddress getReplicatedMergeTreeAddress() const;
|
|
|
|
/// Returns bool; `ops`, `entries`, `delimiting_block_locks` and `log_entry_ops_idx` are non-const references.
/// NOTE(review): the four references are presumably appended to, and the bool presumably tells
/// whether anything was added for this partition — confirm in the definition.
bool addOpsToDropAllPartsInPartition(
|
|
zkutil::ZooKeeper & zookeeper, const String & partition_id, bool detach,
|
|
Coordination::Requests & ops, std::vector<LogEntryPtr> & entries,
|
|
std::vector<EphemeralLockInZooKeeper> & delimiting_block_locks,
|
|
std::vector<size_t> & log_entry_ops_idx);
|
|
/// Multi-partition variant: takes `partition_ids` and a non-const reference to an `entries` vector; returns nothing.
/// NOTE(review): `entries` presumably receives the created log entries — confirm.
void dropAllPartsInPartitions(
|
|
zkutil::ZooKeeper & zookeeper, const Strings & partition_ids, std::vector<LogEntryPtr> & entries, ContextPtr query_context, bool detach);
|
|
|
|
/// Single-partition variant returning a LogEntryPtr.
/// NOTE(review): whether the returned pointer can be null (e.g. nothing to drop) is not visible here — confirm.
LogEntryPtr dropAllPartsInPartition(
|
|
zkutil::ZooKeeper & zookeeper, const String & partition_id, ContextPtr query_context, bool detach);
|
|
|
|
|
|
/// Returns nothing.
/// NOTE(review): `detach` presumably means the parts are detached rather than removed — confirm.
void dropAllPartitionsImpl(const zkutil::ZooKeeperPtr & zookeeper, bool detach, ContextPtr query_context);
|
|
|
|
/// Overrides a base-class virtual (the base declaration is not visible in this chunk).
/// NOTE(review): per the name, presumably neither waits for replicas nor throws on failure — confirm.
void dropPartNoWaitNoThrow(const String & part_name) override;
|
|
/// Overrides a base-class virtual (the base declaration is not visible in this chunk).
/// NOTE(review): `detach` presumably means the part is detached rather than removed — confirm.
void dropPart(const String & part_name, bool detach, ContextPtr query_context) override;
|
|
|
|
// Partition helpers
|
|
/// Overrides a base-class virtual; the partition is passed as an AST node.
/// NOTE(review): `detach` presumably means the parts are detached rather than removed — confirm.
void dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context) override;
|
|
/// Overrides a base-class virtual; returns a PartitionCommandsResultInfo.
/// NOTE(review): `part` presumably means `partition` names a single part rather than a whole partition — confirm.
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, ContextPtr query_context) override;
|
|
/// Overrides a base-class virtual; takes the source table and the partition AST.
/// NOTE(review): `replace` presumably distinguishes replacing existing data from only adding to it — confirm.
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr query_context) override;
|
|
/// Overrides a base-class virtual; takes the destination table and the partition AST, returns nothing.
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) override;
|
|
/// Overrides a base-class virtual; returns nothing.
/// NOTE(review): `to` presumably identifies the destination shard (format not visible here) and
/// `move_part` presumably means a single part is moved — confirm.
void movePartitionToShard(const ASTPtr & partition, bool move_part, const String & to, ContextPtr query_context) override;
|
|
/// Overrides a base-class virtual; takes a task UUID and returns a CancellationCode.
CancellationCode killPartMoveToShard(const UUID & task_uuid) override;
|
|
/// Overrides a base-class virtual; returns nothing.
/// NOTE(review): `from` presumably names the source to fetch from (format not visible here) and
/// `fetch_part` presumably means a single part is fetched instead of a whole partition — confirm.
void fetchPartition(
|
|
const ASTPtr & partition,
|
|
const StorageMetadataPtr & metadata_snapshot,
|
|
const String & from,
|
|
bool fetch_part,
|
|
ContextPtr query_context) override;
|
|
|
|
/// NOTE: there are no guarantees for concurrent merges. Dropping part can
|
|
/// be concurrently merged into some covering part and dropPart will do
|
|
/// nothing. There are some fundamental problems with it. But this is OK
|
|
/// because:
|
|
///
|
|
/// dropPart used in the following cases:
|
|
/// 1) Remove empty parts after TTL.
|
|
/// 2) Remove parts after move between shards.
|
|
/// 3) User queries: ALTER TABLE DROP PART 'part_name'.
|
|
///
|
|
/// In the first case merge of empty part is even better than DROP. In the
|
|
/// second case part UUIDs used to forbid merges for moving parts so there
|
|
/// is no problem with concurrent merges. The third case is quite rare and
|
|
/// we give very weak guarantee: there will be no active part with this
|
|
/// name, but possibly it was merged to some other part.
|
|
///
|
|
/// NOTE: don't rely on dropPart if you 100% need to remove non-empty part
|
|
/// and don't use any explicit locking mechanism for merges.
|
|
/// Returns bool; `part_name` is taken by value, `zookeeper` and `entry` by non-const reference.
/// NOTE(review): `throw_if_noop` presumably makes a no-op drop throw instead of returning false — confirm.
bool dropPartImpl(zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach, bool throw_if_noop);
|
|
|
|
/// Check granularity of already existing replicated table in zookeeper if it exists
|
|
/// return true if it's fixed
|
|
/// Takes no arguments and returns bool; not a const member.
bool checkFixedGranularityInZookeeper();
|
|
|
|
/// Wait (for up to a timeout, in seconds) until the mutation is finished on the given replicas.
|
|
/// Const member; takes the replica names and a mutation id, returns nothing.
/// NOTE(review): the signature has no timeout parameter, so any timeout presumably comes from settings — confirm.
void waitMutationToFinishOnReplicas(
|
|
const Strings & replicas, const String & mutation_id) const;
|
|
|
|
/// Const override of a base-class virtual; returns MutationCommands by value for the given part.
MutationCommands getFirstAlterMutationCommandsForPart(const DataPartPtr & part) const override;
|
|
|
|
/// Overrides a base-class virtual; takes no arguments and returns nothing.
void startBackgroundMovesIfNeeded() override;
|
|
|
|
/// Attaches restored parts to the storage.
|
|
/// Overrides a base-class virtual; takes the parts vector by rvalue reference, so the caller gives it up.
void attachRestoredParts(MutableDataPartsVector && parts) override;
|
|
|
|
/// Const override of a base-class virtual; returns a std::unique_ptr<MergeTreeSettings>, so the caller owns the result.
std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;
|
|
|
|
/// Const member returning a PartitionBlockNumbersHolder.
/// NOTE(review): presumably allocates block numbers in the partitions touched by `commands` — confirm in the definition.
PartitionBlockNumbersHolder allocateBlockNumbersInAffectedPartitions(
|
|
const MutationCommands & commands, ContextPtr query_context, const zkutil::ZooKeeperPtr & zookeeper) const;
|
|
|
|
/// Static overload returning Strings, i.e. possibly several paths.
/// NOTE(review): `zookeeper_path_old` presumably yields an additional legacy path — confirm in the definition.
static Strings getZeroCopyPartPath(const MergeTreeSettings & settings, const std::string & disk_type, const String & table_uuid,
|
|
const String & part_name, const String & zookeeper_path_old);
|
|
|
|
/// Static; returns nothing.
/// Defaults: `mode` = zkutil::CreateMode::Persistent, `replace_existing_lock` = false,
/// `path_to_set_hardlinked_files` = "", `hardlinked_files` = {}.
/// NOTE(review): what is written when `path_to_set_hardlinked_files` is non-empty is not visible here — see the definition.
static void createZeroCopyLockNode(
|
|
const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node,
|
|
int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
|
|
const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});
|
|
|
|
/// Overrides a base-class virtual; returns bool.
/// NOTE(review): the meaning of the bool result is not visible here — confirm in the definition.
bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name) override;
|
|
|
|
/// Create freeze metadata for table and save in zookeeper. Required only if zero-copy replication enabled.
|
|
/// Const override of a base-class virtual; all three arguments are taken by value and nothing is returned.
void createAndStoreFreezeMetadata(DiskPtr disk, DataPartPtr part, String backup_part_path) const override;
|
|
|
|
// Create table id if needed
|
|
/// Const member; takes no arguments and returns nothing.
/// NOTE(review): being const, it presumably updates a mutable member — confirm in the definition.
void createTableSharedID() const;
|
|
|
|
/// Returns bool for the given part name and disk.
/// NOTE(review): presumably true when a zero-copy lock node for the part exists in ZooKeeper — confirm.
bool checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk);
|
|
|
|
/// Member overload returning std::optional<String>.
/// NOTE(review): the condition under which std::nullopt is returned is not visible here — confirm.
std::optional<String> getZeroCopyPartPath(const String & part_name, const DiskPtr & disk);
|
|
|
|
/// Create ephemeral lock in zookeeper for part and disk which support zero copy replication.
|
|
/// If somebody is already holding the lock, returns std::nullopt.
|
|
/// Overrides a base-class virtual; returns std::optional<ZeroCopyLock> for the given part name and disk.
std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk) override;
|
|
|
|
/// Returns nothing.
/// NOTE(review): `from_attach_thread` presumably tells whether the call comes from the attach thread
/// rather than the regular startup path — confirm in the definition.
void startupImpl(bool from_attach_thread);
|
|
};
|
|
|
|
/// Free function declared after the class; returns a String for `part_info` and the given data format version.
/// NOTE(review): "possibly fake" presumably refers to part infos that do not correspond to a real part — confirm.
String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);
|
|
|
|
|
|
/** There are three places where each part should be present:
|
|
* 1. In the RAM, data_parts, all_data_parts.
|
|
* 2. In the filesystem (FS), the directory with the data of the table.
|
|
* 3. In ZooKeeper (ZK).
|
|
*
|
|
* When adding a part, it must be added immediately to these three places.
|
|
* This is done as follows:
|
|
* - [FS] first write the part into a temporary directory on the filesystem;
|
|
* - [FS] rename the temporary part to the result on the filesystem;
|
|
* - [RAM] immediately afterwards add it to the `data_parts`, and remove from `data_parts` any parts covered by this one;
|
|
* - [RAM] also set the `Transaction` object, which in case of an exception (in next point),
|
|
* rolls back the changes in `data_parts` (from the previous point);
|
|
* - [ZK] then send a transaction (multi) to add a part to ZooKeeper (and some more actions);
|
|
* - [FS, ZK] by the way, removing the covered (old) parts from filesystem, from ZooKeeper and from `all_data_parts`
|
|
* is delayed, after a few minutes.
|
|
*
|
|
* There is no atomicity here.
|
|
* It could be possible to achieve atomicity using undo/redo logs and a flag in `DataPart` when it is completely ready.
|
|
* But it would be inconvenient - I would have to write undo/redo logs for each `Part` in ZK, and this would increase the already large number of interactions.
|
|
*
|
|
* Instead, we are forced to work in a situation where at any time
|
|
* (from another thread, or after server restart), there may be an unfinished transaction.
|
|
* (note - for this the part should be in RAM)
|
|
* From these cases the most frequent one is when the part is already in the data_parts, but it's not yet in ZooKeeper.
|
|
* This case must be distinguished from the case where such a situation is achieved due to some kind of damage to the state.
|
|
*
|
|
* This is done using a time threshold.
|
|
* If the part is young enough, its lack in ZooKeeper will be perceived optimistically - as if it just did not have time to be added there
|
|
* - as if the transaction has not yet been executed, but will soon be executed.
|
|
* And if the part is old, its absence in ZooKeeper will be perceived as an unfinished transaction that needs to be rolled back.
|
|
*
|
|
* PS. Perhaps it would be better to add a flag to the DataPart that a part is inserted into ZK.
|
|
* But here it's too easy to get confused with the consistency of this flag.
|
|
*/
|
|
/// Age threshold for a local part that has not been added to ZooKeeper; expands to (5 * 60), i.e. 300.
/// NOTE(review): the unit is presumably seconds (5 minutes) — confirm at the use sites.
/// NOLINTNEXTLINE
|
|
#define MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER (5 * 60)
|
|
|
|
}
|