diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index 2f91dfa0c3d..ca746fc7173 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -62,6 +62,8 @@ public: BlockOutputStreamPtr write(ASTPtr query) override; + bool optimize() override; + /** Удаляет реплику из ZooKeeper. Если других реплик нет, удаляет всю таблицу из ZooKeeper. */ void drop() override; @@ -243,6 +245,7 @@ private: /// Для чтения данных из директории unreplicated. std::unique_ptr unreplicated_data; std::unique_ptr unreplicated_reader; + std::unique_ptr unreplicated_merger; /// Поток, следящий за обновлениями в логах всех реплик и загружающий их в очередь. std::thread queue_updating_thread; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index ce343be4f2e..b930dea0ff5 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -70,6 +70,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( date_column_name_, sampling_expression_, index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name + "[unreplicated]")); unreplicated_reader.reset(new MergeTreeDataSelectExecutor(*unreplicated_data)); + unreplicated_merger.reset(new MergeTreeDataMerger(*unreplicated_data)); } } @@ -1027,6 +1028,10 @@ void StorageReplicatedMergeTree::shutdown() replica_is_active_node = nullptr; endpoint_holder = nullptr; + merger.cancelAll(); + if (unreplicated_merger) + unreplicated_merger->cancelAll(); + LOG_TRACE(log, "Waiting for threads to finish"); if (is_leader_node) { @@ -1098,6 +1103,25 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query) return new ReplicatedMergeTreeBlockOutputStream(*this, insert_id); } +bool StorageReplicatedMergeTree::optimize() +{ + /// Померджим какие-нибудь куски из директории unreplicated. TODO: Мерджить реплицируемые куски тоже. + + if (!unreplicated_data) + return false; + + unreplicated_data->clearOldParts(); + + MergeTreeData::DataPartsVector parts; + String merged_name; + auto always_can_merge = [](const MergeTreeData::DataPartPtr &a, const MergeTreeData::DataPartPtr &b) { return true; }; + if (!unreplicated_merger->selectPartsToMerge(parts, merged_name, 0, true, true, false, always_can_merge)) + return false; + + unreplicated_merger->mergeParts(parts, merged_name); + return true; +} + void StorageReplicatedMergeTree::drop() { shutdown();