#pragma once #include #include #include #include #include #include #include #include #include namespace DB { /** Движок, использующий merge-дерево и реплицируемый через ZooKeeper. */ class StorageReplicatedMergeTree : public IStorage { public: /** Если !attach, либо создает новую таблицу в ZK, либо добавляет реплику в существующую таблицу. */ static StoragePtr create( const String & zookeeper_path_, const String & replica_name_, bool attach, const String & path_, const String & name_, NamesAndTypesListPtr columns_, Context & context_, ASTPtr & primary_expr_ast_, const String & date_column_name_, const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается. size_t index_granularity_, MergeTreeData::Mode mode_ = MergeTreeData::Ordinary, const String & sign_column_ = "", const MergeTreeSettings & settings_ = MergeTreeSettings()); void shutdown(); ~StorageReplicatedMergeTree(); std::string getName() const override { return "Replicated" + data.getModePrefix() + "MergeTree"; } std::string getTableName() const override { return table_name; } std::string getSignColumnName() const { return data.getSignColumnName(); } bool supportsSampling() const override { return data.supportsSampling(); } bool supportsFinal() const override { return data.supportsFinal(); } bool supportsPrewhere() const override { return data.supportsPrewhere(); } const NamesAndTypesList & getColumnsList() const override { return data.getColumnsList(); } BlockInputStreams read( const Names & column_names, ASTPtr query, const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size = DEFAULT_BLOCK_SIZE, unsigned threads = 1) override; BlockOutputStreamPtr write(ASTPtr query) override; /** Удаляет реплику из ZooKeeper. Если других реплик нет, удаляет всю таблицу из ZooKeeper. */ void drop() override; private: friend class ReplicatedMergeTreeBlockOutputStream; /// Добавляет куски в множество currently_merging. struct CurrentlyMergingPartsTagger { Strings parts; StorageReplicatedMergeTree & storage; CurrentlyMergingPartsTagger(const Strings & parts_, StorageReplicatedMergeTree & storage_) : parts(parts_), storage(storage_) { Poco::ScopedLock lock(storage.currently_merging_mutex); for (const auto & name : parts) { if (storage.currently_merging.count(name)) throw Exception("Tagging alreagy tagged part " + name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); } storage.currently_merging.insert(parts.begin(), parts.end()); } ~CurrentlyMergingPartsTagger() { try { Poco::ScopedLock lock(storage.currently_merging_mutex); for (const auto & name : parts) { if (!storage.currently_merging.count(name)) throw Exception("Untagging already untagged part " + name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); storage.currently_merging.erase(name); } } catch (...) { tryLogCurrentException(__PRETTY_FUNCTION__); } } }; typedef Poco::SharedPtr CurrentlyMergingPartsTaggerPtr; /// Добавляет кусок в множество future_parts. struct FuturePartTagger { String part; StorageReplicatedMergeTree & storage; FuturePartTagger(const String & part_, StorageReplicatedMergeTree & storage_) : part(part_), storage(storage_) { if (!storage.future_parts.insert(part).second) throw Exception("Tagging already tagged future part " + part + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); } ~FuturePartTagger() { try { Poco::ScopedLock lock(storage.queue_mutex); if (!storage.future_parts.erase(part)) throw Exception("Untagging already untagged future part " + part + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); } catch (...) { tryLogCurrentException(__PRETTY_FUNCTION__); } } }; typedef Poco::SharedPtr FuturePartTaggerPtr; struct LogEntry { enum Type { GET_PART, MERGE_PARTS, }; String znode_name; Type type; String source_replica; String new_part_name; Strings parts_to_merge; CurrentlyMergingPartsTaggerPtr currently_merging_tagger; FuturePartTaggerPtr future_part_tagger; void tagPartsAsCurrentlyMerging(StorageReplicatedMergeTree & storage) { if (type == MERGE_PARTS) currently_merging_tagger = new CurrentlyMergingPartsTagger(parts_to_merge, storage); } void tagPartAsFuture(StorageReplicatedMergeTree & storage) { if (type == MERGE_PARTS || type == GET_PART) future_part_tagger = new FuturePartTagger(new_part_name, storage); } void writeText(WriteBuffer & out) const; void readText(ReadBuffer & in); String toString() const { String s; { WriteBufferFromString out(s); writeText(out); } return s; } static LogEntry parse(const String & s) { ReadBufferFromString in(s); LogEntry res; res.readText(in); assertEOF(in); return res; } }; typedef std::list LogEntries; typedef std::set StringSet; typedef std::vector Threads; Context & context; zkutil::ZooKeeper & zookeeper; /// Куски, для которых в очереди есть задание на слияние. StringSet currently_merging; Poco::FastMutex currently_merging_mutex; /** Очередь того, что нужно сделать на этой реплике, чтобы всех догнать. Берется из ZooKeeper (/replicas/me/queue/). * В ZK записи в хронологическом порядке. Здесь - не обязательно. */ LogEntries queue; Poco::FastMutex queue_mutex; /** Куски, которые появятся в результате действий, выполняемых прямо сейчас фоновыми потоками (этих действий нет в очереди). * Использовать под залоченным queue_mutex. */ StringSet future_parts; String table_name; String full_path; String zookeeper_path; String replica_name; String replica_path; /** /replicas/me/is_active. */ zkutil::EphemeralNodeHolderPtr replica_is_active_node; /** Является ли эта реплика "ведущей". Ведущая реплика выбирает куски для слияния. */ bool is_leader_node; InterserverIOEndpointHolderPtr endpoint_holder; MergeTreeData data; MergeTreeDataSelectExecutor reader; MergeTreeDataWriter writer; MergeTreeDataMerger merger; ReplicatedMergeTreePartsFetcher fetcher; zkutil::LeaderElectionPtr leader_election; /// Поток, следящий за обновлениями в логах всех реплик и загружающий их в очередь. std::thread queue_updating_thread; /// Потоки, выполняющие действия из очереди. Threads queue_threads; /// Поток, выбирающий куски для слияния. std::thread merge_selecting_thread; Logger * log; volatile bool shutdown_called; StorageReplicatedMergeTree( const String & zookeeper_path_, const String & replica_name_, bool attach, const String & path_, const String & name_, NamesAndTypesListPtr columns_, Context & context_, ASTPtr & primary_expr_ast_, const String & date_column_name_, const ASTPtr & sampling_expression_, size_t index_granularity_, MergeTreeData::Mode mode_ = MergeTreeData::Ordinary, const String & sign_column_ = "", const MergeTreeSettings & settings_ = MergeTreeSettings()); /// Инициализация. /** Проверяет, что в ZooKeeper в таблице нет данных. */ bool isTableEmpty(); /** Создает минимальный набор нод в ZooKeeper. */ void createTable(); void createReplica(); /** Отметить в ZooKeeper, что эта реплика сейчас активна. */ void activateReplica(); /** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata). * Если нет - бросить исключение. */ void checkTableStructure(); /** Проверить, что множество кусков соответствует тому, что в ZK (/replicas/me/parts/). * Если каких-то кусков, описанных в ZK нет локально, бросить исключение. * Если какие-то локальные куски не упоминаются в ZK, удалить их. * Но если таких слишком много, на всякий случай бросить исключение - скорее всего, это ошибка конфигурации. */ void checkParts(); /// Выполнение заданий из очереди. /** Кладет в queue записи из ZooKeeper (/replicas/me/queue/). */ void loadQueue(); /** Копирует новые записи из логов всех реплик в очередь этой реплики. */ void pullLogsToQueue(); /** Можно ли сейчас попробовать выполнить это действие. Если нет, нужно оставить его в очереди и попробовать выполнить другое. * Вызывается под queue_mutex. */ bool shouldExecuteLogEntry(const LogEntry & entry); /** Выполнить действие из очереди. Бросает исключение, если что-то не так. */ void executeLogEntry(const LogEntry & entry); /** В бесконечном цикле обновляет очередь. */ void queueUpdatingThread(); /** В бесконечном цикле выполняет действия из очереди. */ void queueThread(); /// Выбор кусков для слияния. void becomeLeader(); /** В бесконечном цикле выбирает куски для слияния и записывает в лог. */ void mergeSelectingThread(); /// Вызывается во время выбора кусков для слияния. bool canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right); /// Обмен кусками. /** Бросает исключение, если куска ни у кого нет. */ String findActiveReplicaHavingPart(const String & part_name); /** Скачать указанный кусок с указанной реплики. */ void fetchPart(const String & part_name, const String & replica_name); }; }