This commit is contained in:
Michael Kolupaev 2014-05-08 12:03:03 +04:00
parent e2e7a61c61
commit 6232f3e9b4
2 changed files with 27 additions and 0 deletions

View File

@ -62,6 +62,8 @@ public:
BlockOutputStreamPtr write(ASTPtr query) override;
bool optimize() override;
/** Удаляет реплику из ZooKeeper. Если других реплик нет, удаляет всю таблицу из ZooKeeper.
*/
void drop() override;
@ -243,6 +245,7 @@ private:
/// Для чтения данных из директории unreplicated.
std::unique_ptr<MergeTreeData> unreplicated_data;
std::unique_ptr<MergeTreeDataSelectExecutor> unreplicated_reader;
std::unique_ptr<MergeTreeDataMerger> unreplicated_merger;
/// Поток, следящий за обновлениями в логах всех реплик и загружающий их в очередь.
std::thread queue_updating_thread;

View File

@ -70,6 +70,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
date_column_name_, sampling_expression_, index_granularity_, mode_, sign_column_, settings_,
database_name_ + "." + table_name + "[unreplicated]"));
unreplicated_reader.reset(new MergeTreeDataSelectExecutor(*unreplicated_data));
unreplicated_merger.reset(new MergeTreeDataMerger(*unreplicated_data));
}
}
@ -1027,6 +1028,10 @@ void StorageReplicatedMergeTree::shutdown()
replica_is_active_node = nullptr;
endpoint_holder = nullptr;
merger.cancelAll();
if (unreplicated_merger)
unreplicated_merger->cancelAll();
LOG_TRACE(log, "Waiting for threads to finish");
if (is_leader_node)
{
@ -1098,6 +1103,25 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query)
return new ReplicatedMergeTreeBlockOutputStream(*this, insert_id);
}
bool StorageReplicatedMergeTree::optimize()
{
/// Померджим какие-нибудь куски из директории unreplicated. TODO: Мерджить реплицируемые куски тоже.
if (!unreplicated_data)
return false;
unreplicated_data->clearOldParts();
MergeTreeData::DataPartsVector parts;
String merged_name;
auto always_can_merge = [](const MergeTreeData::DataPartPtr &a, const MergeTreeData::DataPartPtr &b) { return true; };
if (!unreplicated_merger->selectPartsToMerge(parts, merged_name, 0, true, true, false, always_can_merge))
return false;
unreplicated_merger->mergeParts(parts, merged_name);
return true;
}
void StorageReplicatedMergeTree::drop()
{
shutdown();