#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { /** Движок, использующий merge-дерево (см. MergeTreeData) и реплицируемый через ZooKeeper. * * ZooKeeper используется для следующих вещей: * - структура таблицы (/metadata, /columns) * - лог действий с данными (/log/log-..., /replicas/replica_name/queue/queue-...); * - список реплик (/replicas), признак активности реплики (/replicas/replica_name/is_active), адреса реплик (/replicas/replica_name/host); * - выбор реплики-лидера (/leader_election) - это та реплика, которая назначает мерджи; * - набор кусков данных на каждой реплике (/replicas/replica_name/parts); * - список последних N блоков данных с чексуммами, для дедупликации (/blocks); * - список инкрементальных номеров блоков (/block_numbers), которые мы сейчас собираемся вставить, * или которые были неиспользованы (/nonincremental_block_numbers) * для обеспечения линейного порядка вставки данных и мерджа данных только по интервалам в этой последовательности; * - координация записей с кворумом (/quorum). */ /** У реплицируемых таблиц есть общий лог (/log/log-...). * Лог - последовательность записей (LogEntry) о том, что делать. * Каждая запись - это одно из: * - обычная вставка данных (GET), * - мердж (MERGE), * - чуть менее обычная вставка данных (ATTACH), * - удаление партиции (DROP). * * Каждая реплика копирует (queueUpdatingThread, pullLogsToQueue) записи из лога в свою очередь (/replicas/replica_name/queue/queue-...), * и затем выполняет их (queueTask). * Не смотря на название "очередь", выполнение может переупорядочиваться, при необходимости (shouldExecuteLogEntry, executeLogEntry). * Кроме того, записи в очереди могут генерироваться самостоятельно (не из лога), в следующих случаях: * - при создании новой реплики, туда помещаются действия на GET с других реплик (createReplica); * - если кусок повреждён (removePartAndEnqueueFetch) или отсутствовал при проверке (при старте - checkParts, во время работы - searchForMissingPart), * туда помещаются действия на GET с других реплик; * * У реплики, на которую был сделан INSERT, в очереди тоже будет запись о GET этих данных. * Такая запись считается выполненной, как только обработчик очереди её увидит. * * У записи в логе есть время создания. Это время генерируется по часам на сервере, который создал запись * - того, на которых пришёл соответствующий запрос INSERT или ALTER. * * Для записей в очереди, которые реплика сделала для себя самостоятельно, * в качестве времени будет браться время создания соответствующего куска на какой-либо из реплик. */ class StorageReplicatedMergeTree : public IStorage { public: /** Если !attach, либо создает новую таблицу в ZK, либо добавляет реплику в существующую таблицу. */ static StoragePtr create( const String & zookeeper_path_, const String & replica_name_, bool attach, const String & path_, const String & database_name_, const String & name_, NamesAndTypesListPtr columns_, const NamesAndTypesList & materialized_columns_, const NamesAndTypesList & alias_columns_, const ColumnDefaults & column_defaults_, Context & context_, ASTPtr & primary_expr_ast_, const String & date_column_name_, const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается. size_t index_granularity_, MergeTreeData::Mode mode_, const String & sign_column_, /// Для Collapsing режима. const Names & columns_to_sum_, /// Для Summing режима. const MergeTreeSettings & settings_); void shutdown() override; ~StorageReplicatedMergeTree() override; std::string getName() const override { return "Replicated" + data.getModePrefix() + "MergeTree"; } std::string getTableName() const override { return table_name; } bool supportsSampling() const override { return data.supportsSampling(); } bool supportsFinal() const override { return data.supportsFinal(); } bool supportsPrewhere() const override { return data.supportsPrewhere(); } bool supportsParallelReplicas() const override { return true; } const NamesAndTypesList & getColumnsListImpl() const override { return data.getColumnsListNonMaterialized(); } NameAndTypePair getColumn(const String & column_name) const override { if (column_name == "_replicated") return NameAndTypePair("_replicated", new DataTypeUInt8); return data.getColumn(column_name); } bool hasColumn(const String & column_name) const override { if (column_name == "_replicated") return true; return data.hasColumn(column_name); } BlockInputStreams read( const Names & column_names, ASTPtr query, const Context & context, const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size = DEFAULT_BLOCK_SIZE, unsigned threads = 1) override; BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override; bool optimize(const Settings & settings) override; void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) override; void dropPartition(ASTPtr query, const Field & partition, bool detach, bool unreplicated, const Settings & settings) override; void attachPartition(ASTPtr query, const Field & partition, bool unreplicated, bool part, const Settings & settings) override; void fetchPartition(const Field & partition, const String & from, const Settings & settings) override; void freezePartition(const Field & partition, const Settings & settings) override; /** Удаляет реплику из ZooKeeper. Если других реплик нет, удаляет всю таблицу из ZooKeeper. */ void drop() override; void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override; bool supportsIndexForIn() const override { return true; } /// Добавить кусок в очередь кусков, чьи данные нужно проверить в фоновом потоке. void enqueuePartForCheck(const String & name); MergeTreeData & getData() { return data; } MergeTreeData * getUnreplicatedData() { return unreplicated_data.get(); } /** Для системной таблицы replicas. */ struct Status { bool is_leader; bool is_readonly; bool is_session_expired; UInt32 future_parts; UInt32 parts_to_check; String zookeeper_path; String replica_name; String replica_path; Int32 columns_version; UInt32 queue_size; UInt32 inserts_in_queue; UInt32 merges_in_queue; UInt32 queue_oldest_time; UInt32 inserts_oldest_time; UInt32 merges_oldest_time; String oldest_part_to_get; String oldest_part_to_merge_to; UInt64 log_max_index; UInt64 log_pointer; UInt32 last_queue_update; UInt8 total_replicas; UInt8 active_replicas; }; /// Получить статус таблицы. Если with_zk_fields = false - не заполнять поля, требующие запросов в ZK. void getStatus(Status & res, bool with_zk_fields = true); using LogEntriesData = std::vector; void getQueue(LogEntriesData & res, String & replica_name); void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay) const; private: void dropUnreplicatedPartition(const Field & partition, bool detach, const Settings & settings); friend class ReplicatedMergeTreeBlockOutputStream; friend class ReplicatedMergeTreeRestartingThread; friend class ReplicatedMergeTreeCleanupThread; friend struct ReplicatedMergeTreeLogEntry; friend struct FuturePartTagger; typedef ReplicatedMergeTreeLogEntry LogEntry; typedef LogEntry::Ptr LogEntryPtr; typedef std::list LogEntries; typedef std::set StringSet; typedef std::list StringList; Context & context; zkutil::ZooKeeperPtr current_zookeeper; /// Используйте только с помощью методов ниже. std::mutex current_zookeeper_mutex; /// Для пересоздания сессии в фоновом потоке. zkutil::ZooKeeperPtr getZooKeeper() { std::lock_guard lock(current_zookeeper_mutex); return current_zookeeper; } void setZooKeeper(zkutil::ZooKeeperPtr zookeeper) { std::lock_guard lock(current_zookeeper_mutex); current_zookeeper = zookeeper; } /// Если true, таблица в офлайновом режиме, и в нее нельзя писать. bool is_readonly = false; /// Каким будет множество активных кусков после выполнения всей текущей очереди. ActiveDataPartSet virtual_parts; /** Очередь того, что нужно сделать на этой реплике, чтобы всех догнать. Берется из ZooKeeper (/replicas/me/queue/). * В ZK записи в хронологическом порядке. Здесь - не обязательно. */ LogEntries queue; time_t last_queue_update = 0; std::mutex queue_mutex; /** Куски, которые появятся в результате действий, выполняемых прямо сейчас фоновыми потоками (этих действий нет в очереди). * Использовать под залоченным queue_mutex. */ StringSet future_parts; /** Куски, для которых нужно проверить одно из двух: * - Если кусок у нас есть, сверить, его данные с его контрольными суммами, а их с ZooKeeper. * - Если куска у нас нет, проверить, есть ли он (или покрывающий его кусок) хоть у кого-то. */ StringSet parts_to_check_set; StringList parts_to_check_queue; std::mutex parts_to_check_mutex; Poco::Event parts_to_check_event; String database_name; String table_name; String full_path; String zookeeper_path; String replica_name; String replica_path; /** /replicas/me/is_active. */ zkutil::EphemeralNodeHolderPtr replica_is_active_node; /** Версия ноды /columns в ZooKeeper, соответствующая текущим data.columns. * Читать и изменять вместе с data.columns - под TableStructureLock. */ int columns_version = -1; /** Является ли эта реплика "ведущей". Ведущая реплика выбирает куски для слияния. */ bool is_leader_node = false; InterserverIOEndpointHolderPtr endpoint_holder; MergeTreeData data; MergeTreeDataSelectExecutor reader; MergeTreeDataWriter writer; MergeTreeDataMerger merger; ReplicatedMergeTreePartsFetcher fetcher; zkutil::LeaderElectionPtr leader_election; /// Для чтения данных из директории unreplicated. std::unique_ptr unreplicated_data; std::unique_ptr unreplicated_reader; std::unique_ptr unreplicated_merger; std::mutex unreplicated_mutex; /// Для мерджей и удаления нереплицируемых кусков. /// Нужно ли завершить фоновые потоки (кроме restarting_thread). volatile bool shutdown_called = false; Poco::Event shutdown_event; /// Потоки: /// Поток, следящий за обновлениями в логах всех реплик и загружающий их в очередь. std::thread queue_updating_thread; zkutil::EventPtr queue_updating_event = zkutil::EventPtr(new Poco::Event); /// Задание, выполняющее действия из очереди. BackgroundProcessingPool::TaskHandle queue_task_handle; /// Поток, выбирающий куски для слияния. std::thread merge_selecting_thread; Poco::Event merge_selecting_event; std::mutex merge_selecting_mutex; /// Берется на каждую итерацию выбора кусков для слияния. /// Поток, удаляющий старые куски, записи в логе и блоки. std::unique_ptr cleanup_thread; /// Поток, обрабатывающий переподключение к ZooKeeper при истечении сессии. std::unique_ptr restarting_thread; /// Поток, следящий за изменениями списка столбцов в ZooKeeper и обновляющий куски в соответствии с этими изменениями. std::thread alter_thread; zkutil::EventPtr alter_thread_event = zkutil::EventPtr(new Poco::Event); /// Поток, проверяющий данные кусков. std::thread part_check_thread; /// Событие, пробуждающее метод alter от ожидания завершения запроса ALTER. zkutil::EventPtr alter_query_event = zkutil::EventPtr(new Poco::Event); Logger * log; StorageReplicatedMergeTree( const String & zookeeper_path_, const String & replica_name_, bool attach, const String & path_, const String & database_name_, const String & name_, NamesAndTypesListPtr columns_, const NamesAndTypesList & materialized_columns_, const NamesAndTypesList & alias_columns_, const ColumnDefaults & column_defaults_, Context & context_, ASTPtr & primary_expr_ast_, const String & date_column_name_, const ASTPtr & sampling_expression_, size_t index_granularity_, MergeTreeData::Mode mode_, const String & sign_column_, const Names & columns_to_sum_, const MergeTreeSettings & settings_); /// Инициализация. /** Создает минимальный набор нод в ZooKeeper. */ void createTableIfNotExists(); /** Создает реплику в ZooKeeper и добавляет в очередь все, что нужно, чтобы догнать остальные реплики. */ void createReplica(); /** Создать узлы в ZK, которые должны быть всегда, но которые могли не существовать при работе старых версий сервера. */ void createNewZooKeeperNodes(); /** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata). * Если нет - бросить исключение. */ void checkTableStructure(bool skip_sanity_checks, bool allow_alter); /** Проверить, что множество кусков соответствует тому, что в ZK (/replicas/me/parts/). * Если каких-то кусков, описанных в ZK нет локально, бросить исключение. * Если какие-то локальные куски не упоминаются в ZK, удалить их. * Но если таких слишком много, на всякий случай бросить исключение - скорее всего, это ошибка конфигурации. */ void checkParts(bool skip_sanity_checks); /// Положить все куски из data в virtual_parts. void initVirtualParts(); /** Проверить, что чексумма куска совпадает с чексуммой того же куска на какой-нибудь другой реплике. * Если ни у кого нет такого куска, ничего не проверяет. * Не очень надежно: если две реплики добавляют кусок почти одновременно, ни одной проверки не произойдет. * Кладет в ops действия, добавляющие данные о куске в ZooKeeper. * Вызывать под TableStructureLock. */ void checkPartAndAddToZooKeeper(const MergeTreeData::DataPartPtr & part, zkutil::Ops & ops, String name_override = ""); /// Кладет в ops действия, удаляющие кусок из ZooKeeper. void removePartFromZooKeeper(const String & part_name, zkutil::Ops & ops); /// Убирает кусок из ZooKeeper и добавляет в очередь задание скачать его. Предполагается это делать с битыми кусками. void removePartAndEnqueueFetch(const String & part_name); /// Выполнение заданий из очереди. /** Кладет в queue записи из ZooKeeper (/replicas/me/queue/). */ void loadQueue(); /** Копирует новые записи из логов всех реплик в очередь этой реплики. * Если next_update_event != nullptr, вызовет это событие, когда в логе появятся новые записи. */ void pullLogsToQueue(zkutil::EventPtr next_update_event = nullptr); /** Можно ли сейчас попробовать выполнить это действие. Если нет, нужно оставить его в очереди и попробовать выполнить другое. * Вызывается под queue_mutex. */ bool shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason); /** Выполнить действие из очереди. Бросает исключение, если что-то не так. * Возвращает, получилось ли выполнить. Если не получилось, запись нужно положить в конец очереди. */ bool executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context); void executeDropRange(const LogEntry & entry); bool executeAttachPart(const LogEntry & entry); /// Возвращает false, если куска нет, и его нужно забрать с другой реплики. /** Обновляет очередь. */ void queueUpdatingThread(); /** Выполняет действия из очереди. */ bool queueTask(BackgroundProcessingPool::Context & context); /// Выбор кусков для слияния. void becomeLeader(); /** Выбирает куски для слияния и записывает в лог. */ void mergeSelectingThread(); /** Делает локальный ALTER, когда список столбцов в ZooKeeper меняется. */ void alterThread(); /** Проверяет целостность кусков. * Находит отсутствующие куски. */ void partCheckThread(); void checkPart(const String & part_name); void searchForMissingPart(const String & part_name); /// Обмен кусками. /** Возвращает пустую строку, если куска ни у кого нет. */ String findReplicaHavingPart(const String & part_name, bool active); /** Скачать указанный кусок с указанной реплики. * Если to_detached, то кусок помещается в директорию detached. * Если quorum != 0, то обновляется узел для отслеживания кворума. */ void fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum); /** При отслеживаемом кворуме - добавить реплику в кворум для куска. */ void updateQuorum(const String & part_name); AbandonableLockInZooKeeper allocateBlockNumber(const String & month_name); /** Дождаться, пока все реплики, включая эту, выполнят указанное действие из лога. * Если одновременно с этим добавляются реплики, может не дождаться добавленную реплику. */ void waitForAllReplicasToProcessLogEntry(const LogEntry & entry); /** Дождаться, пока указанная реплика выполнит указанное действие из лога. */ void waitForReplicaToProcessLogEntry(const String & replica_name, const LogEntry & entry); /** Преобразовать число в строку формате суффиксов автоинкрементных нод в ZooKeeper. * Поддерживаются также отрицательные числа - для них имя ноды выглядит несколько глупо * и не соответствует никакой автоинкрементной ноде в ZK. */ static String padIndex(Int64 index) { String index_str = toString(index); return std::string(10 - index_str.size(), '0') + index_str; } }; }