diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index 4c44dd4cdc4..c2630f907d6 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -107,6 +107,9 @@ struct MergeTreeSettings /// Если в таблице parts_to_delay_insert + k кусков, спать insert_delay_step^k миллисекунд перед вставкой каждого блока. /// Таким образом, скорость вставок автоматически замедлится примерно до скорости слияний. double insert_delay_step = 1.1; + + /// Для скольки блоков, вставленных с непустым insert ID, хранить хеши в ZooKeeper. + size_t replicated_deduplication_window = 10000; }; class MergeTreeData : public ITableDeclaration diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index f09c5566150..8a781f83120 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -33,7 +33,7 @@ public: if (!block_id.empty() && storage.zookeeper.tryGet( storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str)) { - LOG_INFO(log, "Block with this ID already exists; ignoring it"); + LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it"); /// Блок с таким ID уже когда-то вставляли. Проверим чексуммы и не будем его вставлять. auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str); diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index ad1748aa6b1..b29cfef869b 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -227,7 +227,7 @@ private: /** Является ли эта реплика "ведущей". Ведущая реплика выбирает куски для слияния. */ - bool is_leader_node; + bool is_leader_node = false; InterserverIOEndpointHolderPtr endpoint_holder; @@ -247,9 +247,13 @@ private: /// Поток, выбирающий куски для слияния. std::thread merge_selecting_thread; + /// Когда последний раз выбрасывали старые данные из ZooKeeper. + time_t clear_old_blocks_time = 0; + time_t clear_old_logs_time = 0; + Logger * log; - volatile bool shutdown_called; + volatile bool shutdown_called = false; StorageReplicatedMergeTree( const String & zookeeper_path_, @@ -302,6 +306,12 @@ private: void clearOldParts(); + /// Удалить из ZooKeeper старые записи в логе. + void clearOldLogs(); + + /// Удалить из ZooKeeper старые хеши блоков. Это делает ведущая реплика. + void clearOldBlocks(); + /// Выполнение заданий из очереди. /** Кладет в queue записи из ZooKeeper (/replicas/me/queue/). diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 16d55819503..ee1fd7969c3 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -66,9 +66,7 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa cur_max_rows_to_merge_parts *= data.settings.merge_parts_at_night_inc; if (only_small) - { cur_max_rows_to_merge_parts = data.settings.max_rows_to_merge_parts_second; - } /// Найдем суммарный размер еще не пройденных кусков (то есть всех). size_t size_of_remaining_parts = 0; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 885bebed471..9a95a8626d7 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -32,12 +32,11 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( : context(context_), zookeeper(context.getZooKeeper()), table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), zookeeper_path(zookeeper_path_), - replica_name(replica_name_), is_leader_node(false), + replica_name(replica_name_), data( full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_, index_granularity_,mode_, sign_column_, settings_), reader(data), writer(data), merger(data), fetcher(data), - log(&Logger::get("StorageReplicatedMergeTree: " + table_name)), - shutdown_called(false) + log(&Logger::get("StorageReplicatedMergeTree: " + table_name)) { if (!zookeeper_path.empty() && *zookeeper_path.rbegin() == '/') zookeeper_path.erase(zookeeper_path.end() - 1); @@ -335,6 +334,65 @@ void StorageReplicatedMergeTree::clearOldParts() LOG_DEBUG(log, "Removed " << parts.size() << " old parts"); } +void StorageReplicatedMergeTree::clearOldLogs() +{ + Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas"); + UInt64 min_pointer = std::numeric_limits::max(); + for (const String & replica : replicas) + { + String pointer; + if (!zookeeper.tryGet(zookeeper_path + "/replicas/" + replica + "/log_pointers/" + replica_name, pointer)) + return; + min_pointer = std::min(min_pointer, parse(pointer)); + } + + Strings entries = zookeeper.getChildren(replica_path + "/log"); + std::sort(entries.begin(), entries.end()); + size_t removed = 0; + + for (const String & entry : entries) + { + UInt64 index = parse(entry.substr(strlen("log-"))); + if (index >= min_pointer) + break; + zookeeper.remove(replica_path + "/log/" + entry); + ++removed; + } + + if (removed > 0) + LOG_DEBUG(log, "Removed " << removed << " old log entries"); +} + +void StorageReplicatedMergeTree::clearOldBlocks() +{ + zkutil::Stat stat; + if (!zookeeper.exists(zookeeper_path + "/blocks", &stat)) + throw Exception(zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE); + + /// Чтобы делать "асимптотически" меньше запросов exists, будем ждать, пока накопятся в 1.1 раза больше блоков, чем нужно. + if (static_cast(stat.getnumChildren()) < data.settings.replicated_deduplication_window * 1.1) + return; + + Strings blocks = zookeeper.getChildren(zookeeper_path + "/blocks"); + + std::vector > timed_blocks; + + for (const String & block : blocks) + { + zkutil::Stat stat; + zookeeper.exists(zookeeper_path + "/blocks/" + block, &stat); + timed_blocks.push_back(std::make_pair(stat.getczxid(), block)); + } + + std::sort(timed_blocks.begin(), timed_blocks.end()); + for (size_t i = data.settings.replicated_deduplication_window; i < timed_blocks.size(); ++i) + { + zookeeper.remove(zookeeper_path + "/blocks/" + timed_blocks[i].second); + } + + LOG_TRACE(log, "Cleared " << blocks.size() - data.settings.replicated_deduplication_window << " old blocks from ZooKeeper"); +} + void StorageReplicatedMergeTree::loadQueue() { Poco::ScopedLock lock(queue_mutex); @@ -621,6 +679,13 @@ void StorageReplicatedMergeTree::queueUpdatingThread() pullLogsToQueue(); clearOldParts(); + + /// Каждую минуту выбрасываем ненужные записи из лога. + if (time(0) - clear_old_logs_time > 60) + { + clear_old_logs_time = time(0); + clearOldLogs(); + } } catch (...) { @@ -820,6 +885,13 @@ void StorageReplicatedMergeTree::mergeSelectingThread() if (shutdown_called) break; + /// Каждую минуту выбрасываем старые блоки. + if (time(0) - clear_old_blocks_time > 60) + { + clear_old_blocks_time = time(0); + clearOldBlocks(); + } + if (!success) std::this_thread::sleep_for(MERGE_SELECTING_SLEEP); }