ClickHouse/dbms/src/Storages/StorageReplicatedMergeTree.cpp
2015-09-16 23:38:50 +03:00

3059 lines
113 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <statdaemons/ext/range.hpp>
#include <DB/Storages/ColumnsDescription.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <DB/Storages/MergeTree/MergeTreePartChecker.h>
#include <DB/Storages/MergeTree/MergeList.h>
#include <DB/Storages/MergeTree/MergeTreeWhereOptimizer.h>
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromString.h>
#include <DB/Interpreters/InterpreterAlterQuery.h>
#include <DB/Common/VirtualColumnUtils.h>
#include <DB/Parsers/ASTInsertQuery.h>
#include <DB/DataStreams/AddingConstColumnBlockInputStream.h>
#include <DB/Common/Macros.h>
#include <DB/Common/formatReadable.h>
#include <Poco/DirectoryIterator.h>
#include <time.h>
namespace DB
{
const auto ERROR_SLEEP_MS = 1000;
const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
const Int64 RESERVED_BLOCK_NUMBERS = 200;
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & zookeeper_path_,
const String & replica_name_,
bool attach,
const String & path_, const String & database_name_, const String & name_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_,
size_t index_granularity_,
MergeTreeData::Mode mode_,
const String & sign_column_,
const Names & columns_to_sum_,
const MergeTreeSettings & settings_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
current_zookeeper(context.getZooKeeper()), database_name(database_name_),
table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'),
zookeeper_path(context.getMacros().expand(zookeeper_path_)),
replica_name(context.getMacros().expand(replica_name_)),
data(full_path, columns_,
materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_, mode_, sign_column_, columns_to_sum_,
settings_, database_name_ + "." + table_name, true,
std::bind(&StorageReplicatedMergeTree::enqueuePartForCheck, this, std::placeholders::_1)),
reader(data), writer(data), merger(data), fetcher(data), shutdown_event(false),
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
{
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
zookeeper_path.resize(zookeeper_path.size() - 1);
replica_path = zookeeper_path + "/replicas/" + replica_name;
bool skip_sanity_checks = false;
try
{
if (current_zookeeper && current_zookeeper->exists(replica_path + "/flags/force_restore_data"))
{
skip_sanity_checks = true;
current_zookeeper->remove(replica_path + "/flags/force_restore_data");
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag "
<< replica_path << "/flags/force_restore_data).");
}
}
catch (const zkutil::KeeperException & e)
{
/// Не удалось соединиться с ZK (об этом стало известно при попытке выполнить первую операцию).
if (e.code == ZCONNECTIONLOSS)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
current_zookeeper = nullptr;
}
else
throw;
}
data.loadDataParts(skip_sanity_checks);
if (!current_zookeeper)
{
if (!attach)
throw Exception("Can't create replicated table without ZooKeeper", ErrorCodes::NO_ZOOKEEPER);
/// Не активируем реплику. Она будет в режиме readonly.
return;
}
if (!attach)
{
createTableIfNotExists();
checkTableStructure(false, false);
createReplica();
}
else
{
checkTableStructure(skip_sanity_checks, true);
checkParts(skip_sanity_checks);
}
initVirtualParts();
String unreplicated_path = full_path + "unreplicated/";
if (Poco::File(unreplicated_path).exists())
{
LOG_INFO(log, "Have unreplicated data");
unreplicated_data.reset(new MergeTreeData(unreplicated_path, columns_,
materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_,
date_column_name_, sampling_expression_, index_granularity_, mode_, sign_column_, columns_to_sum_, settings_,
database_name_ + "." + table_name + "[unreplicated]", false));
unreplicated_data->loadDataParts(skip_sanity_checks);
unreplicated_reader.reset(new MergeTreeDataSelectExecutor(*unreplicated_data));
unreplicated_merger.reset(new MergeTreeDataMerger(*unreplicated_data));
}
loadQueue();
/// В этом потоке реплика будет активирована.
restarting_thread.reset(new ReplicatedMergeTreeRestartingThread(*this));
}
StoragePtr StorageReplicatedMergeTree::create(
const String & zookeeper_path_,
const String & replica_name_,
bool attach,
const String & path_, const String & database_name_, const String & name_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_,
size_t index_granularity_,
MergeTreeData::Mode mode_,
const String & sign_column_,
const Names & columns_to_sum_,
const MergeTreeSettings & settings_)
{
auto res = new StorageReplicatedMergeTree{
zookeeper_path_, replica_name_, attach,
path_, database_name_, name_,
columns_, materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_, mode_,
sign_column_, columns_to_sum_, settings_};
StoragePtr res_ptr = res->thisPtr();
if (res->getZooKeeper())
{
String endpoint_name = "ReplicatedMergeTree:" + res->replica_path;
InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(res->data, *res);
res->endpoint_holder = new InterserverIOEndpointHolder(endpoint_name, endpoint, res->context.getInterserverIOHandler());
}
return res_ptr;
}
static String formattedAST(const ASTPtr & ast)
{
if (!ast)
return "";
std::stringstream ss;
formatAST(*ast, ss, 0, false, true);
return ss.str();
}
void StorageReplicatedMergeTree::createTableIfNotExists()
{
auto zookeeper = getZooKeeper();
if (zookeeper->exists(zookeeper_path))
return;
LOG_DEBUG(log, "Creating table " << zookeeper_path);
zookeeper->createAncestors(zookeeper_path);
/// Запишем метаданные таблицы, чтобы реплики могли сверять с ними параметры таблицы.
std::stringstream metadata;
metadata << "metadata format version: 1" << std::endl;
metadata << "date column: " << data.date_column_name << std::endl;
metadata << "sampling expression: " << formattedAST(data.sampling_expression) << std::endl;
metadata << "index granularity: " << data.index_granularity << std::endl;
metadata << "mode: " << static_cast<int>(data.mode) << std::endl;
metadata << "sign column: " << data.sign_column << std::endl;
metadata << "primary key: " << formattedAST(data.primary_expr_ast) << std::endl;
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(zookeeper_path, "",
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/metadata", metadata.str(),
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/columns", ColumnsDescription<false>{
data.getColumnsListNonMaterialized(), data.materialized_columns,
data.alias_columns, data.column_defaults}.toString(),
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/log", "",
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/blocks", "",
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/block_numbers", "",
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/nonincrement_block_numbers", "",
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/leader_election", "",
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/temp", "",
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/replicas", "",
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
auto code = zookeeper->tryMulti(ops);
if (code != ZOK && code != ZNODEEXISTS)
throw zkutil::KeeperException(code);
}
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
* Если нет - бросить исключение.
*/
void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bool allow_alter)
{
auto zookeeper = getZooKeeper();
String metadata_str = zookeeper->get(zookeeper_path + "/metadata");
ReadBufferFromString buf(metadata_str);
assertString("metadata format version: 1", buf);
assertString("\ndate column: ", buf);
assertString(data.date_column_name, buf);
assertString("\nsampling expression: ", buf);
assertString(formattedAST(data.sampling_expression), buf);
assertString("\nindex granularity: ", buf);
assertString(toString(data.index_granularity), buf);
assertString("\nmode: ", buf);
assertString(toString(static_cast<int>(data.mode)), buf);
assertString("\nsign column: ", buf);
assertString(data.sign_column, buf);
assertString("\nprimary key: ", buf);
/// NOTE: Можно сделать менее строгую проверку совпадения выражений, чтобы таблицы не ломались от небольших изменений
/// в коде formatAST.
assertString(formattedAST(data.primary_expr_ast), buf);
assertString("\n", buf);
assertEOF(buf);
zkutil::Stat stat;
auto columns_desc = ColumnsDescription<true>::parse(zookeeper->get(zookeeper_path + "/columns", &stat));
auto & columns = columns_desc.columns;
auto & materialized_columns = columns_desc.materialized;
auto & alias_columns = columns_desc.alias;
auto & column_defaults = columns_desc.defaults;
columns_version = stat.version;
if (columns != data.getColumnsListNonMaterialized() ||
materialized_columns != data.materialized_columns ||
alias_columns != data.alias_columns ||
column_defaults != data.column_defaults)
{
if (allow_alter &&
(skip_sanity_checks ||
data.getColumnsListNonMaterialized().sizeOfDifference(columns) +
data.materialized_columns.sizeOfDifference(materialized_columns) <= 2))
{
LOG_WARNING(log, "Table structure in ZooKeeper is a little different from local table structure. Assuming ALTER.");
/// Без всяких блокировок, потому что таблица еще не создана.
InterpreterAlterQuery::updateMetadata(database_name, table_name, columns,
materialized_columns, alias_columns, column_defaults, context);
data.setColumnsList(columns);
data.materialized_columns = std::move(materialized_columns);
data.alias_columns = std::move(alias_columns);
data.column_defaults = std::move(column_defaults);
}
else
{
throw Exception("Table structure in ZooKeeper is too different from local table structure.",
ErrorCodes::INCOMPATIBLE_COLUMNS);
}
}
}
void StorageReplicatedMergeTree::createReplica()
{
auto zookeeper = getZooKeeper();
LOG_DEBUG(log, "Creating replica " << replica_path);
/// Создадим пустую реплику. Ноду columns создадим в конце - будем использовать ее в качестве признака, что создание реплики завершено.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(replica_path, "", zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(replica_path + "/host", "", zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(replica_path + "/log_pointer", "", zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(replica_path + "/queue", "", zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(replica_path + "/parts", "", zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(replica_path + "/flags", "", zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
try
{
zookeeper->multi(ops);
}
catch (const zkutil::KeeperException & e)
{
if (e.code == ZNODEEXISTS)
throw Exception("Replica " + replica_path + " is already exist.", ErrorCodes::REPLICA_IS_ALREADY_EXIST);
throw;
}
/** Нужно изменить данные ноды /replicas на что угодно, чтобы поток, удаляющий старые записи в логе,
* споткнулся об это изменение и не удалил записи, которые мы еще не прочитали.
*/
zookeeper->set(zookeeper_path + "/replicas", "last added replica: " + replica_name);
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
/** "Эталонная" реплика, у которой мы возьмем информацию о множестве кусков, очередь и указатель на лог.
* Возьмем случайную из реплик, созданных раньше этой.
*/
String source_replica;
Stat stat;
zookeeper->exists(replica_path, &stat);
auto my_create_time = stat.czxid;
std::random_shuffle(replicas.begin(), replicas.end());
for (const String & replica : replicas)
{
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica, &stat))
throw Exception("Replica " + zookeeper_path + "/replicas/" + replica + " was removed from right under our feet.",
ErrorCodes::NO_SUCH_REPLICA);
if (stat.czxid < my_create_time)
{
source_replica = replica;
break;
}
}
if (source_replica.empty())
{
LOG_INFO(log, "This is the first replica");
}
else
{
LOG_INFO(log, "Will mimic " << source_replica);
String source_path = zookeeper_path + "/replicas/" + source_replica;
/** Если эталонная реплика еще не до конца создана, подождем.
* NOTE: Если при ее создании что-то пошло не так, можем провисеть тут вечно.
* Можно создавать на время создания эфемерную ноду, чтобы быть уверенным, что реплика создается, а не заброшена.
* То же можно делать и для таблицы. Можно автоматически удалять ноду реплики/таблицы,
* если видно, что она создана не до конца, а создающий ее умер.
*/
while (!zookeeper->exists(source_path + "/columns"))
{
LOG_INFO(log, "Waiting for replica " << source_path << " to be fully created");
zkutil::EventPtr event = new Poco::Event;
if (zookeeper->exists(source_path + "/columns", nullptr, event))
{
LOG_WARNING(log, "Oops, a watch has leaked");
break;
}
event->wait();
}
/// Порядок следующих трех действий важен. Записи в логе могут продублироваться, но не могут потеряться.
/// Скопируем у эталонной реплики ссылку на лог.
zookeeper->set(replica_path + "/log_pointer", zookeeper->get(source_path + "/log_pointer"));
/// Запомним очередь эталонной реплики.
Strings source_queue_names = zookeeper->getChildren(source_path + "/queue");
std::sort(source_queue_names.begin(), source_queue_names.end());
Strings source_queue;
for (const String & entry_name : source_queue_names)
{
String entry;
if (!zookeeper->tryGet(source_path + "/queue/" + entry_name, entry))
continue;
source_queue.push_back(entry);
}
/// Добавим в очередь задания на получение всех активных кусков, которые есть у эталонной реплики.
Strings parts = zookeeper->getChildren(source_path + "/parts");
ActiveDataPartSet active_parts_set(parts);
Strings active_parts = active_parts_set.getParts();
for (const String & name : active_parts)
{
LogEntry log_entry;
log_entry.type = LogEntry::GET_PART;
log_entry.source_replica = "";
log_entry.new_part_name = name;
zookeeper->create(replica_path + "/queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential);
}
LOG_DEBUG(log, "Queued " << active_parts.size() << " parts to be fetched");
/// Добавим в очередь содержимое очереди эталонной реплики.
for (const String & entry : source_queue)
{
zookeeper->create(replica_path + "/queue/queue-", entry, zkutil::CreateMode::PersistentSequential);
}
LOG_DEBUG(log, "Copied " << source_queue.size() << " queue entries");
}
zookeeper->create(replica_path + "/columns", ColumnsDescription<false>{
data.getColumnsListNonMaterialized(),
data.materialized_columns,
data.alias_columns,
data.column_defaults
}.toString(), zkutil::CreateMode::Persistent);
}
void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
{
auto zookeeper = getZooKeeper();
Strings expected_parts_vec = zookeeper->getChildren(replica_path + "/parts");
/// Куски в ZK.
NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end());
MergeTreeData::DataParts parts = data.getAllDataParts();
/// Локальные куски, которых нет в ZK.
MergeTreeData::DataParts unexpected_parts;
for (const auto & part : parts)
{
if (expected_parts.count(part->name))
expected_parts.erase(part->name);
else
unexpected_parts.insert(part);
}
/// Какие локальные куски добавить в ZK.
MergeTreeData::DataPartsVector parts_to_add;
/// Какие куски нужно забрать с других реплик.
Strings parts_to_fetch;
for (const String & missing_name : expected_parts)
{
/// Если локально не хватает какого-то куска, но есть покрывающий его кусок, можно заменить в ZK недостающий покрывающим.
auto containing = data.getActiveContainingPart(missing_name);
if (containing)
{
LOG_ERROR(log, "Ignoring missing local part " << missing_name << " because part " << containing->name << " exists");
if (unexpected_parts.count(containing))
{
parts_to_add.push_back(containing);
unexpected_parts.erase(containing);
}
}
else
{
LOG_ERROR(log, "Fetching missing part " << missing_name);
parts_to_fetch.push_back(missing_name);
}
}
for (const String & name : parts_to_fetch)
expected_parts.erase(name);
String sanity_report =
"There are " + toString(unexpected_parts.size()) + " unexpected parts, "
+ toString(parts_to_add.size()) + " unexpectedly merged parts, "
+ toString(expected_parts.size()) + " missing obsolete parts, "
+ toString(parts_to_fetch.size()) + " missing parts";
/** Можно автоматически синхронизировать данные,
* если количество ошибок каждого из четырёх типов не больше соответствующих порогов,
* или если отношение общего количества ошибок к общему количеству кусков (минимальному - в локальной файловой системе или в ZK)
* не больше некоторого отношения (например 5%).
*/
size_t min_parts_local_or_expected = std::min(expected_parts_vec.size(), parts.size());
bool insane =
(parts_to_add.size() > data.settings.replicated_max_unexpectedly_merged_parts
|| unexpected_parts.size() > data.settings.replicated_max_unexpected_parts
|| expected_parts.size() > data.settings.replicated_max_missing_obsolete_parts
|| parts_to_fetch.size() > data.settings.replicated_max_missing_active_parts)
&& ((parts_to_add.size() + unexpected_parts.size() + expected_parts.size() + parts_to_fetch.size())
> min_parts_local_or_expected * data.settings.replicated_max_ratio_of_wrong_parts);
if (insane)
{
if (skip_sanity_checks)
LOG_WARNING(log, sanity_report);
else
throw Exception("The local set of parts of table " + getTableName() + " doesn't look like the set of parts in ZooKeeper. "
+ sanity_report, ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
}
/// Добавим в ZK информацию о кусках, покрывающих недостающие куски.
for (const MergeTreeData::DataPartPtr & part : parts_to_add)
{
LOG_ERROR(log, "Adding unexpected local part to ZooKeeper: " << part->name);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops);
zookeeper->multi(ops);
}
/// Удалим из ZK информацию о кусках, покрытых только что добавленными.
for (const String & name : expected_parts)
{
LOG_ERROR(log, "Removing unexpectedly merged local part from ZooKeeper: " << name);
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
zookeeper->multi(ops);
}
/// Добавим в очередь задание забрать недостающие куски с других реплик и уберем из ZK информацию, что они у нас есть.
for (const String & name : parts_to_fetch)
{
LOG_ERROR(log, "Removing missing part from ZooKeeper and queueing a fetch: " << name);
LogEntry log_entry;
log_entry.type = LogEntry::GET_PART;
log_entry.source_replica = "";
log_entry.new_part_name = name;
/// Полагаемся, что это происходит до загрузки очереди (loadQueue).
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
ops.push_back(new zkutil::Op::Create(
replica_path + "/queue/queue-", log_entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
zookeeper->multi(ops);
}
/// Удалим лишние локальные куски.
for (const MergeTreeData::DataPartPtr & part : unexpected_parts)
{
LOG_ERROR(log, "Renaming unexpected part " << part->name << " to ignored_" + part->name);
data.renameAndDetachPart(part, "ignored_", true);
}
}
void StorageReplicatedMergeTree::initVirtualParts()
{
auto parts = data.getDataParts();
for (const auto & part : parts)
virtual_parts.add(part->name);
}
void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(const MergeTreeData::DataPartPtr & part, zkutil::Ops & ops, String part_name)
{
auto zookeeper = getZooKeeper();
if (part_name.empty())
part_name = part->name;
check(part->columns);
int expected_columns_version = columns_version;
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
std::random_shuffle(replicas.begin(), replicas.end());
String expected_columns_str = part->columns.toString();
for (const String & replica : replicas)
{
zkutil::Stat stat_before, stat_after;
String columns_str;
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name + "/columns", columns_str, &stat_before))
continue;
if (columns_str != expected_columns_str)
{
LOG_INFO(log, "Not checking checksums of part " << part_name << " with replica " << replica
<< " because columns are different");
continue;
}
String checksums_str;
/// Проверим, что версия ноды со столбцами не изменилась, пока мы читали checksums.
/// Это гарантирует, что столбцы и чексуммы относятся к одним и тем же данным.
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name + "/checksums", checksums_str) ||
!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name + "/columns", &stat_after) ||
stat_before.version != stat_after.version)
{
LOG_INFO(log, "Not checking checksums of part " << part_name << " with replica " << replica
<< " because part changed while we were reading its checksums");
continue;
}
auto checksums = MergeTreeData::DataPart::Checksums::parse(checksums_str);
checksums.checkEqual(part->checksums, true);
}
if (zookeeper->exists(replica_path + "/parts/" + part_name))
{
LOG_ERROR(log, "checkPartAndAddToZooKeeper: node " << replica_path + "/parts/" + part_name << " already exists");
return;
}
ops.push_back(new zkutil::Op::Check(
zookeeper_path + "/columns",
expected_columns_version));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part_name,
"",
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part_name + "/columns",
part->columns.toString(),
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part_name + "/checksums",
part->checksums.toString(),
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
}
void StorageReplicatedMergeTree::loadQueue()
{
auto zookeeper = getZooKeeper();
std::lock_guard<std::mutex> lock(queue_mutex);
Strings children = zookeeper->getChildren(replica_path + "/queue");
std::sort(children.begin(), children.end());
for (const String & child : children)
{
zkutil::Stat stat;
String s = zookeeper->get(replica_path + "/queue/" + child, &stat);
LogEntryPtr entry = LogEntry::parse(s);
entry->create_time = stat.ctime / 1000;
entry->znode_name = child;
entry->addResultToVirtualParts(*this);
queue.push_back(entry);
}
}
void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_event)
{
auto zookeeper = getZooKeeper();
std::lock_guard<std::mutex> lock(queue_mutex);
String index_str = zookeeper->get(replica_path + "/log_pointer");
UInt64 index;
if (index_str.empty())
{
/// Если у нас еще нет указателя на лог, поставим указатель на первую запись в нем.
Strings entries = zookeeper->getChildren(zookeeper_path + "/log");
index = entries.empty() ? 0 : parse<UInt64>(std::min_element(entries.begin(), entries.end())->substr(strlen("log-")));
zookeeper->set(replica_path + "/log_pointer", toString(index));
}
else
{
index = parse<UInt64>(index_str);
}
UInt64 first_index = index;
size_t count = 0;
String entry_str;
zkutil::Stat stat;
while (zookeeper->tryGet(zookeeper_path + "/log/log-" + padIndex(index), entry_str, &stat))
{
++count;
++index;
LogEntryPtr entry = LogEntry::parse(entry_str);
entry->create_time = stat.ctime / 1000;
/// Одновременно добавим запись в очередь и продвинем указатель на лог.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
replica_path + "/queue/queue-", entry_str, zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
ops.push_back(new zkutil::Op::SetData(
replica_path + "/log_pointer", toString(index), -1));
auto results = zookeeper->multi(ops);
String path_created = dynamic_cast<zkutil::Op::Create &>(ops[0]).getPathCreated();
entry->znode_name = path_created.substr(path_created.find_last_of('/') + 1);
entry->addResultToVirtualParts(*this);
queue.push_back(entry);
}
if (next_update_event)
{
if (zookeeper->exists(zookeeper_path + "/log/log-" + padIndex(index), nullptr, next_update_event))
next_update_event->set();
}
if (!count)
return;
if (queue_task_handle)
queue_task_handle->wake();
LOG_DEBUG(log, "Pulled " << count << " entries to queue: log-" << padIndex(first_index) << " - log-" << padIndex(index - 1));
}
bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
{
if ((entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART)
&& future_parts.count(entry.new_part_name))
{
LOG_DEBUG(log, "Not executing log entry for part " << entry.new_part_name <<
" because another log entry for the same part is being processed. This shouldn't happen often.");
return false;
}
if (entry.type == LogEntry::MERGE_PARTS)
{
/** Если какая-то из нужных частей сейчас передается или мерджится, подождем окончания этой операции.
* Иначе, даже если всех нужных частей для мерджа нет, нужно попытаться сделать мердж.
* Если каких-то частей не хватает, вместо мерджа будет попытка скачать кусок.
* Такая ситуация возможна, если получение какого-то куска пофейлилось, и его переместили в конец очереди.
*/
for (const auto & name : entry.parts_to_merge)
{
if (future_parts.count(name))
{
LOG_TRACE(log, "Not merging into part " << entry.new_part_name << " because part " << name << " is not ready yet.");
return false;
}
}
}
return true;
}
bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context)
{
auto zookeeper = getZooKeeper();
if (entry.type == LogEntry::DROP_RANGE)
{
executeDropRange(entry);
return true;
}
if (entry.type == LogEntry::GET_PART ||
entry.type == LogEntry::MERGE_PARTS ||
entry.type == LogEntry::ATTACH_PART)
{
/// Если у нас уже есть этот кусок или покрывающий его кусок, ничего делать не нужно.
MergeTreeData::DataPartPtr containing_part = data.getActiveContainingPart(entry.new_part_name);
/// Даже если кусок есть локально, его (в исключительных случаях) может не быть в zookeeper. Проверим, что он там есть.
if (containing_part && zookeeper->exists(replica_path + "/parts/" + containing_part->name))
{
if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name))
LOG_DEBUG(log, "Skipping action for part " + entry.new_part_name + " - part already exists");
return true;
}
}
if (entry.type == LogEntry::GET_PART && entry.source_replica == replica_name)
LOG_WARNING(log, "Part " << entry.new_part_name << " from own log doesn't exist.");
bool do_fetch = false;
if (entry.type == LogEntry::GET_PART)
{
do_fetch = true;
}
else if (entry.type == LogEntry::ATTACH_PART)
{
do_fetch = !executeAttachPart(entry);
}
else if (entry.type == LogEntry::MERGE_PARTS)
{
std::stringstream log_message;
log_message << "Executing log entry to merge parts ";
for (auto i : ext::range(0, entry.parts_to_merge.size()))
log_message << (i != 0 ? ", " : "") << entry.parts_to_merge[i];
log_message << " to " << entry.new_part_name;
LOG_TRACE(log, log_message.rdbuf());
MergeTreeData::DataPartsVector parts;
bool have_all_parts = true;
for (const String & name : entry.parts_to_merge)
{
MergeTreeData::DataPartPtr part = data.getActiveContainingPart(name);
if (!part)
{
have_all_parts = false;
break;
}
if (part->name != name)
{
LOG_WARNING(log, "Part " << name << " is covered by " << part->name
<< " but should be merged into " << entry.new_part_name << ". This shouldn't happen often.");
have_all_parts = false;
break;
}
parts.push_back(part);
}
if (!have_all_parts)
{
/// Если нет всех нужных кусков, попробуем взять у кого-нибудь уже помердженный кусок.
do_fetch = true;
LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
}
else
{
/// Если собираемся сливать большие куски, увеличим счетчик потоков, сливающих большие куски.
for (const auto & part : parts)
{
if (part->size_in_bytes > data.settings.max_bytes_to_merge_parts_small)
{
pool_context.incrementCounter("big merges");
pool_context.incrementCounter("replicated big merges");
break;
}
}
size_t sum_parts_size_in_bytes = MergeTreeDataMerger::estimateDiskSpaceForMerge(parts);
DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, sum_parts_size_in_bytes); /// Может бросить исключение.
auto table_lock = lockStructure(false);
const auto & merge_entry = context.getMergeList().insert(database_name, table_name, entry.new_part_name);
MergeTreeData::Transaction transaction;
size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io;
MergeTreeData::DataPartPtr part = merger.mergeParts(parts, entry.new_part_name, *merge_entry, aio_threshold, &transaction, reserved_space);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops);
/** TODO: Переименование нового куска лучше делать здесь, а не пятью строчками выше,
* чтобы оно было как можно ближе к zookeeper->multi.
*/
zookeeper->multi(ops);
/** Удаление старых кусков из ZK и с диска делается отложенно - см. ReplicatedMergeTreeCleanupThread, clearOldParts.
*/
/** При ZCONNECTIONLOSS или ZOPERATIONTIMEOUT можем зря откатить локальные изменения кусков.
* Это не проблема, потому что в таком случае слияние останется в очереди, и мы попробуем снова.
*/
transaction.commit();
merge_selecting_event.set();
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
}
}
else
{
throw Exception("Unexpected log entry type: " + toString(static_cast<int>(entry.type)));
}
if (do_fetch)
{
String replica;
try
{
replica = findReplicaHavingPart(entry.new_part_name, true);
if (replica.empty() && entry.type == LogEntry::ATTACH_PART)
{
/** Если ATTACH - куска может не быть, потому что реплика, на которой кусок есть, ещё сама не успела его прицепить.
* В таком случае, надо подождать этого.
*/
/// Кусок должен быть на реплике-инициаторе.
if (entry.source_replica.empty() || entry.source_replica == replica_name)
throw Exception("Logical error: no source replica specified for ATTACH_PART log entry;"
" or trying to fetch part on source replica", ErrorCodes::LOGICAL_ERROR);
/// Подождём, пока реплика-инициатор подцепит кусок.
waitForReplicaToProcessLogEntry(entry.source_replica, entry);
replica = findReplicaHavingPart(entry.new_part_name, true);
}
if (replica.empty())
{
ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches);
throw Exception("No active replica has part " + entry.new_part_name, ErrorCodes::NO_REPLICA_HAS_PART);
}
fetchPart(entry.new_part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum);
if (entry.type == LogEntry::MERGE_PARTS)
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged);
}
catch (...)
{
/** Если не получилось скачать кусок, нужный для какого-то мерджа, лучше не пытаться получить другие куски для этого мерджа,
* а попытаться сразу получить помердженный кусок. Чтобы так получилось, переместим действия для получения остальных кусков
* для этого мерджа в конец очереди.
*/
try
{
std::lock_guard<std::mutex> lock(queue_mutex);
/// Найдем действие по объединению этого куска с другими. Запомним других.
StringSet parts_for_merge;
LogEntries::iterator merge_entry;
for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
{
if ((*it)->type == LogEntry::MERGE_PARTS)
{
if (std::find((*it)->parts_to_merge.begin(), (*it)->parts_to_merge.end(), entry.new_part_name)
!= (*it)->parts_to_merge.end())
{
parts_for_merge = StringSet((*it)->parts_to_merge.begin(), (*it)->parts_to_merge.end());
merge_entry = it;
break;
}
}
}
if (!parts_for_merge.empty())
{
/// Переместим в конец очереди действия, получающие parts_for_merge.
for (LogEntries::iterator it = queue.begin(); it != queue.end();)
{
auto it0 = it;
++it;
if (it0 == merge_entry)
break;
if (((*it0)->type == LogEntry::MERGE_PARTS || (*it0)->type == LogEntry::GET_PART)
&& parts_for_merge.count((*it0)->new_part_name))
{
queue.splice(queue.end(), queue, it0, it);
}
}
/** Если этого куска ни у кого нет, но в очереди упоминается мердж с его участием, то наверно этот кусок такой старый,
* что его все померджили и удалили. Не будем бросать исключение, чтобы queueTask лишний раз не спала.
*/
if (replica.empty())
{
LOG_INFO(log, "No active replica has part " << entry.new_part_name << ". Will fetch merged part instead.");
return false;
}
}
/** Если ни у какой активной реплики нет куска, и в очереди нет слияний с его участием,
* проверим, есть ли у любой (активной или неактивной) реплики такой кусок или покрывающий его.
*/
if (replica.empty())
enqueuePartForCheck(entry.new_part_name);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
throw;
}
}
return true;
}
void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTree::LogEntry & entry)
{
auto zookeeper = getZooKeeper();
LOG_INFO(log, (entry.detach ? "Detaching" : "Removing") << " parts inside " << entry.new_part_name << ".");
{
LogEntries to_wait;
size_t removed_entries = 0;
/// Удалим из очереди операции с кусками, содержащимися в удаляемом диапазоне.
std::unique_lock<std::mutex> lock(queue_mutex);
for (LogEntries::iterator it = queue.begin(); it != queue.end();)
{
if (((*it)->type == LogEntry::GET_PART || (*it)->type == LogEntry::MERGE_PARTS) &&
ActiveDataPartSet::contains(entry.new_part_name, (*it)->new_part_name))
{
if ((*it)->currently_executing)
to_wait.push_back(*it);
auto code = zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name);
if (code != ZOK)
LOG_INFO(log, "Couldn't remove " << replica_path + "/queue/" + (*it)->znode_name << ": "
<< zkutil::ZooKeeper::error2string(code));
queue.erase(it++);
++removed_entries;
}
else
++it;
}
LOG_DEBUG(log, "Removed " << removed_entries << " entries from queue. "
"Waiting for " << to_wait.size() << " entries that are currently executing.");
/// Дождемся завершения операций с кусками, содержащимися в удаляемом диапазоне.
for (LogEntryPtr & entry : to_wait)
entry->execution_complete.wait(lock, [&entry] { return !entry->currently_executing; });
}
LOG_DEBUG(log, (entry.detach ? "Detaching" : "Removing") << " parts.");
size_t removed_parts = 0;
/// Удалим куски, содержащиеся в удаляемом диапазоне.
auto parts = data.getDataParts();
for (const auto & part : parts)
{
if (!ActiveDataPartSet::contains(entry.new_part_name, part->name))
continue;
LOG_DEBUG(log, "Removing part " << part->name);
++removed_parts;
/// Если кусок удалять не нужно, надежнее переместить директорию до изменений в ZooKeeper.
if (entry.detach)
data.renameAndDetachPart(part);
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name, -1));
zookeeper->multi(ops);
/// Если кусок нужно удалить, надежнее удалить директорию после изменений в ZooKeeper.
if (!entry.detach)
data.replaceParts({part}, {}, true);
}
LOG_INFO(log, (entry.detach ? "Detached " : "Removed ") << removed_parts << " parts inside " << entry.new_part_name << ".");
}
bool StorageReplicatedMergeTree::executeAttachPart(const StorageReplicatedMergeTree::LogEntry & entry)
{
auto zookeeper = getZooKeeper();
String source_path = (entry.attach_unreplicated ? "unreplicated/" : "detached/") + entry.source_part_name;
LOG_INFO(log, "Attaching part " << entry.source_part_name << " from " << source_path << " as " << entry.new_part_name);
if (!Poco::File(data.getFullPath() + source_path).exists())
{
LOG_INFO(log, "No part at " << source_path << ". Will fetch it instead");
return false;
}
LOG_DEBUG(log, "Checking data");
MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_path);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops, entry.new_part_name);
if (entry.attach_unreplicated && unreplicated_data)
{
MergeTreeData::DataPartPtr unreplicated_part = unreplicated_data->getPartIfExists(entry.source_part_name);
if (unreplicated_part)
unreplicated_data->detachPartInPlace(unreplicated_part);
else
LOG_WARNING(log, "Unreplicated part " << entry.source_part_name << " is already detached");
}
zookeeper->multi(ops);
/// NOTE: Не можем использовать renameTempPartAndAdd, потому что кусок не временный - если что-то пойдет не так, его не нужно удалять.
part->renameTo(entry.new_part_name);
part->name = entry.new_part_name;
ActiveDataPartSet::parsePartName(part->name, *part);
data.attachPart(part);
LOG_INFO(log, "Finished attaching part " << entry.new_part_name);
/// На месте удаленных кусков могут появиться новые, с другими данными.
context.resetCaches();
return true;
}
void StorageReplicatedMergeTree::queueUpdatingThread()
{
while (!shutdown_called)
{
try
{
pullLogsToQueue(queue_updating_event);
queue_updating_event->wait();
}
catch (const zkutil::KeeperException & e)
{
if (e.code == ZINVALIDSTATE)
restarting_thread->wakeup();
tryLogCurrentException(__PRETTY_FUNCTION__);
queue_updating_event->tryWait(ERROR_SLEEP_MS);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
queue_updating_event->tryWait(ERROR_SLEEP_MS);
}
}
LOG_DEBUG(log, "Queue updating thread finished");
}
bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & pool_context)
{
LogEntryPtr entry;
try
{
std::lock_guard<std::mutex> lock(queue_mutex);
bool empty = queue.empty();
if (!empty)
{
for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
{
if (!(*it)->currently_executing && shouldExecuteLogEntry(**it))
{
entry = *it;
entry->tagPartAsFuture(*this);
queue.splice(queue.end(), queue, it);
entry->currently_executing = true;
break;
}
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (!entry)
return false;
bool exception = true;
bool success = false;
try
{
if (executeLogEntry(*entry, pool_context))
{
auto zookeeper = getZooKeeper();
auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name);
if (code != ZOK)
LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry->znode_name << ": "
<< zkutil::ZooKeeper::error2string(code) + ". This shouldn't happen often.");
success = true;
}
exception = false;
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART)
/// Если ни у кого нет нужного куска, наверно, просто не все реплики работают; не будем писать в лог с уровнем Error.
LOG_INFO(log, e.displayText());
else
tryLogCurrentException(__PRETTY_FUNCTION__);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
entry->future_part_tagger = nullptr;
std::lock_guard<std::mutex> lock(queue_mutex);
entry->currently_executing = false;
entry->execution_complete.notify_all();
if (success)
{
/// Удалим задание из очереди.
/// Нельзя просто обратиться по заранее сохраненному итератору, потому что задание мог успеть удалить кто-то другой.
for (LogEntries::iterator it = queue.end(); it != queue.begin();)
{
--it;
if (*it == entry)
{
queue.erase(it);
break;
}
}
}
/// Если не было исключения, не нужно спать.
return !exception;
}
void StorageReplicatedMergeTree::mergeSelectingThread()
{
bool need_pull = true;
/** Может много времени тратиться на определение, можно ли мерджить два рядом стоящих куска.
* Два рядом стоящих куска можно мерджить, если все номера блоков между их номерами не используются ("заброшены", abandoned).
* Это значит, что между этими кусками не может быть вставлен другой кусок.
*
* Но если номера соседних блоков отличаются достаточно сильно (обычно, если между ними много "заброшенных" блоков),
* то делается слишком много чтений из ZooKeeper, чтобы узнать, можно ли их мерджить.
*
* Воспользуемся утверждением, что если пару кусков было можно мерджить, и их мердж ещё не запланирован,
* то и сейчас их можно мерджить, и будем запоминать это состояние, чтобы не делать много раз одинаковые запросы в ZooKeeper.
*
* TODO Интересно, как это сочетается с DROP PARTITION и затем ATTACH PARTITION.
*/
std::set<std::pair<std::string, std::string>> memoized_parts_that_could_be_merged;
auto can_merge = [&memoized_parts_that_could_be_merged, this]
(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right) -> bool
{
/// Если какой-то из кусков уже собираются слить в больший, не соглашаемся его сливать.
if (virtual_parts.getContainingPart(left->name) != left->name ||
virtual_parts.getContainingPart(right->name) != right->name)
return false;
auto key = std::make_pair(left->name, right->name);
if (memoized_parts_that_could_be_merged.count(key))
return true;
String month_name = left->name.substr(0, 6);
auto zookeeper = getZooKeeper();
/// Нельзя сливать куски, среди которых находится кусок, для которого неудовлетворён кворум.
/// Замечание: теоретически, это можно было бы разрешить. Но это сделает логику более сложной.
String quorum_node_value;
if (zookeeper->tryGet(zookeeper_path + "/quorum/status", quorum_node_value))
{
ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.fromString(quorum_node_value);
ActiveDataPartSet::Part part_info;
ActiveDataPartSet::parsePartName(quorum_entry.part_name, part_info);
if (part_info.left != part_info.right)
throw Exception("Logical error: part written with quorum covers more than one block numbers", ErrorCodes::LOGICAL_ERROR);
if (left->right <= part_info.left && right->left >= part_info.right)
return false;
}
/// Можно слить куски, если все номера между ними заброшены - не соответствуют никаким блокам.
/// Номера до RESERVED_BLOCK_NUMBERS всегда не соответствуют никаким блокам.
for (Int64 number = std::max(RESERVED_BLOCK_NUMBERS, left->right + 1); number <= right->left - 1; ++number)
{
String path1 = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number);
String path2 = zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number);
if (AbandonableLockInZooKeeper::check(path1, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED &&
AbandonableLockInZooKeeper::check(path2, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
return false;
}
memoized_parts_that_could_be_merged.insert(key);
return true;
};
while (!shutdown_called && is_leader_node)
{
bool success = false;
try
{
std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
if (need_pull)
{
/// Нужно загрузить новую запись в очередь перед тем, как выбирать куски для слияния.
/// (чтобы кусок добавился в virtual_parts).
pullLogsToQueue();
need_pull = false;
}
/** Сколько в очереди или в фоновом потоке мерджей крупных кусков.
* Если их больше половины от размера пула потоков для мерджа, то можно мерджить только мелкие куски.
*/
auto & background_pool = context.getBackgroundPool();
size_t big_merges_current = background_pool.getCounter("replicated big merges");
size_t max_number_of_big_merges = background_pool.getNumberOfThreads() / 2;
size_t merges_queued = 0;
size_t big_merges_queued = 0;
if (big_merges_current < max_number_of_big_merges)
{
std::lock_guard<std::mutex> lock(queue_mutex);
for (const auto & entry : queue)
{
if (entry->type == LogEntry::MERGE_PARTS)
{
++merges_queued;
if (big_merges_current + big_merges_queued < max_number_of_big_merges)
{
for (const String & name : entry->parts_to_merge)
{
MergeTreeData::DataPartPtr part = data.getActiveContainingPart(name);
if (!part || part->name != name)
continue;
if (part->size_in_bytes > data.settings.max_bytes_to_merge_parts_small)
{
++big_merges_queued;
break;
}
}
}
}
}
}
bool only_small = big_merges_current + big_merges_queued >= max_number_of_big_merges;
if (big_merges_current || merges_queued)
LOG_TRACE(log, "Currently executing big merges: " << big_merges_current
<< ". Queued big merges: " << big_merges_queued
<< ". All merges in queue: " << merges_queued
<< ". Max number of big merges: " << max_number_of_big_merges
<< (only_small ? ". So, will select only small parts to merge." : "."));
do
{
auto zookeeper = getZooKeeper();
if (merges_queued >= data.settings.max_replicated_merges_in_queue)
{
LOG_TRACE(log, "Number of queued merges is greater than max_replicated_merges_in_queue, so won't select new parts to merge.");
break;
}
MergeTreeData::DataPartsVector parts;
String merged_name;
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
if ( !merger.selectPartsToMerge(parts, merged_name, disk_space, false, false, only_small, can_merge)
&& !merger.selectPartsToMerge(parts, merged_name, disk_space, true, false, only_small, can_merge))
{
break;
}
bool all_in_zk = true;
for (const auto & part : parts)
{
/// Если о каком-то из кусков нет информации в ZK, не будем сливать.
if (!zookeeper->exists(replica_path + "/parts/" + part->name))
{
LOG_WARNING(log, "Part " << part->name << " exists locally but not in ZooKeeper.");
enqueuePartForCheck(part->name);
all_in_zk = false;
}
}
if (!all_in_zk)
break;
LogEntry entry;
entry.type = LogEntry::MERGE_PARTS;
entry.source_replica = replica_name;
entry.new_part_name = merged_name;
for (const auto & part : parts)
entry.parts_to_merge.push_back(part->name);
need_pull = true;
zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
String month_name = parts[0]->name.substr(0, 6);
for (size_t i = 0; i + 1 < parts.size(); ++i)
{
/// Уберем больше не нужные отметки о несуществующих блоках.
for (Int64 number = std::max(RESERVED_BLOCK_NUMBERS, parts[i]->right + 1); number <= parts[i + 1]->left - 1; ++number)
{
zookeeper->tryRemove(zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number));
zookeeper->tryRemove(zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number));
}
}
success = true;
}
while (false);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (shutdown_called || !is_leader_node)
break;
if (!success)
merge_selecting_event.tryWait(MERGE_SELECTING_SLEEP_MS);
}
LOG_DEBUG(log, "Merge selecting thread finished");
}
void StorageReplicatedMergeTree::alterThread()
{
bool force_recheck_parts = true;
while (!shutdown_called)
{
try
{
/** Имеем описание столбцов в ZooKeeper, общее для всех реплик (Пример: /clickhouse/tables/02-06/visits/columns),
* а также описание столбцов в локальном файле с метаданными (data.getColumnsList()).
*
* Если эти описания отличаются - нужно сделать ALTER.
*
* Если запомненная версия ноды (columns_version) отличается от версии в ZK,
* то описание столбцов в ZK не обязательно отличается от локального
* - такое может быть при цикле из ALTER-ов, который в целом, ничего не меняет.
* В этом случае, надо обновить запомненный номер версии,
* а также всё-равно проверить структуру кусков, и, при необходимости, сделать ALTER.
*
* Запомненный номер версии нужно обновить после обновления метаданных, под блокировкой.
* Этот номер версии проверяется на соответствие актуальному при INSERT-е.
* То есть, так добиваемся, чтобы вставлялись блоки с правильной структурой.
*
* При старте сервера, мог быть не завершён предыдущий ALTER.
* Поэтому, в первый раз, независимо от изменений, проверяем структуру всех part-ов,
* (Пример: /clickhouse/tables/02-06/visits/replicas/example02-06-1.yandex.ru/parts/20140806_20140831_131664_134988_3296/columns)
* и делаем ALTER, если необходимо.
*
* TODO: Слишком сложно, всё переделать.
*/
auto zookeeper = getZooKeeper();
zkutil::Stat stat;
const String columns_str = zookeeper->get(zookeeper_path + "/columns", &stat, alter_thread_event);
auto columns_desc = ColumnsDescription<true>::parse(columns_str);
auto & columns = columns_desc.columns;
auto & materialized_columns = columns_desc.materialized;
auto & alias_columns = columns_desc.alias;
auto & column_defaults = columns_desc.defaults;
bool changed_version = (stat.version != columns_version);
MergeTreeData::DataParts parts;
/// Если описание столбцов изменилось, обновим структуру таблицы локально.
if (changed_version)
{
LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock.");
auto table_lock = lockStructureForAlter();
const auto columns_changed = columns != data.getColumnsListNonMaterialized();
const auto materialized_columns_changed = materialized_columns != data.materialized_columns;
const auto alias_columns_changed = alias_columns != data.alias_columns;
const auto column_defaults_changed = column_defaults != data.column_defaults;
if (columns_changed || materialized_columns_changed || alias_columns_changed ||
column_defaults_changed)
{
LOG_INFO(log, "Columns list changed in ZooKeeper. Applying changes locally.");
InterpreterAlterQuery::updateMetadata(database_name, table_name, columns,
materialized_columns, alias_columns, column_defaults, context);
if (columns_changed)
{
data.setColumnsList(columns);
if (unreplicated_data)
unreplicated_data->setColumnsList(columns);
}
if (materialized_columns_changed)
{
this->materialized_columns = materialized_columns;
data.materialized_columns = std::move(materialized_columns);
}
if (alias_columns_changed)
{
this->alias_columns = alias_columns;
data.alias_columns = std::move(alias_columns);
}
if (column_defaults_changed)
{
this->column_defaults = column_defaults;
data.column_defaults = std::move(column_defaults);
}
LOG_INFO(log, "Applied changes to table.");
}
else
{
LOG_INFO(log, "Columns version changed in ZooKeeper, but data wasn't changed. It's like cyclic ALTERs.");
}
/// Нужно получить список кусков под блокировкой таблицы, чтобы избежать race condition с мерджем.
parts = data.getDataParts();
columns_version = stat.version;
}
/// Обновим куски.
if (changed_version || force_recheck_parts)
{
auto table_lock = lockStructure(false);
if (changed_version)
LOG_INFO(log, "ALTER-ing parts");
int changed_parts = 0;
if (!changed_version)
parts = data.getDataParts();
const auto columns_plus_materialized = data.getColumnsList();
for (const MergeTreeData::DataPartPtr & part : parts)
{
/// Обновим кусок и запишем результат во временные файлы.
/// TODO: Можно пропускать проверку на слишком большие изменения, если в ZooKeeper есть, например,
/// нода /flags/force_alter.
auto transaction = data.alterDataPart(part, columns_plus_materialized);
if (!transaction)
continue;
++changed_parts;
/// Обновим метаданные куска в ZooKeeper.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::SetData(
replica_path + "/parts/" + part->name + "/columns", transaction->getNewColumns().toString(), -1));
ops.push_back(new zkutil::Op::SetData(
replica_path + "/parts/" + part->name + "/checksums", transaction->getNewChecksums().toString(), -1));
zookeeper->multi(ops);
/// Применим изменения файлов.
transaction->commit();
}
/// То же самое для нереплицируемых данных.
if (unreplicated_data)
{
parts = unreplicated_data->getDataParts();
for (const MergeTreeData::DataPartPtr & part : parts)
{
auto transaction = unreplicated_data->alterDataPart(part, columns_plus_materialized);
if (!transaction)
continue;
++changed_parts;
transaction->commit();
}
}
/// Список столбцов для конкретной реплики.
zookeeper->set(replica_path + "/columns", columns_str);
if (changed_version)
{
if (changed_parts != 0)
LOG_INFO(log, "ALTER-ed " << changed_parts << " parts");
else
LOG_INFO(log, "No parts ALTER-ed");
}
force_recheck_parts = false;
}
parts.clear();
alter_thread_event->wait();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
force_recheck_parts = true;
alter_thread_event->tryWait(ERROR_SLEEP_MS);
}
}
LOG_DEBUG(log, "Alter thread finished");
}
void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_name)
{
auto zookeeper = getZooKeeper();
String part_path = replica_path + "/parts/" + part_name;
LogEntryPtr log_entry = new LogEntry;
log_entry->type = LogEntry::GET_PART;
log_entry->create_time = time(0);
log_entry->source_replica = "";
log_entry->new_part_name = part_name;
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
replica_path + "/queue/queue-", log_entry->toString(), zookeeper->getDefaultACL(),
zkutil::CreateMode::PersistentSequential));
ops.push_back(new zkutil::Op::Remove(part_path + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(part_path + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(part_path, -1));
auto results = zookeeper->multi(ops);
{
std::lock_guard<std::mutex> lock(queue_mutex);
String path_created = dynamic_cast<zkutil::Op::Create &>(ops[0]).getPathCreated();
log_entry->znode_name = path_created.substr(path_created.find_last_of('/') + 1);
log_entry->addResultToVirtualParts(*this);
queue.push_back(log_entry);
}
}
void StorageReplicatedMergeTree::enqueuePartForCheck(const String & name)
{
std::lock_guard<std::mutex> lock(parts_to_check_mutex);
if (parts_to_check_set.count(name))
return;
parts_to_check_queue.push_back(name);
parts_to_check_set.insert(name);
parts_to_check_event.set();
}
void StorageReplicatedMergeTree::searchForMissingPart(const String & part_name)
{
auto zookeeper = getZooKeeper();
String part_path = replica_path + "/parts/" + part_name;
/// Если кусок есть в ZooKeeper, удалим его оттуда и добавим в очередь задание скачать его.
if (zookeeper->exists(part_path))
{
LOG_WARNING(log, "Checker: Part " << part_name << " exists in ZooKeeper but not locally. "
"Removing from ZooKeeper and queueing a fetch.");
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
removePartAndEnqueueFetch(part_name);
return;
}
/// Если куска нет в ZooKeeper, проверим есть ли он хоть у кого-то.
ActiveDataPartSet::Part part_info;
ActiveDataPartSet::parsePartName(part_name, part_info);
/** Логика такая:
* - если у какой-то живой или неактивной реплики есть такой кусок, или покрывающий его кусок
* - всё Ок, ничего делать не нужно, он скачается затем при обработке очереди, когда реплика оживёт;
* - или, если реплика никогда не оживёт, то администратор удалит или создаст новую реплику с тем же адресом и см. всё сначала;
* - если ни у кого нет такого или покрывающего его куска, то
* - если у кого-то есть все составляющие куски, то ничего делать не будем - это просто значит, что другие реплики ещё недоделали мердж
* - если ни у кого нет всех составляющих кусков, то признаем кусок навечно потерянным,
* и удалим запись из очереди репликации.
*/
LOG_WARNING(log, "Checker: Checking if anyone has part covering " << part_name << ".");
bool found = false;
size_t part_length_in_blocks = part_info.right + 1 - part_info.left;
std::vector<char> found_blocks(part_length_in_blocks);
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
for (const String & replica : replicas)
{
Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts");
for (const String & part_on_replica : parts)
{
if (part_on_replica == part_name || ActiveDataPartSet::contains(part_on_replica, part_name))
{
found = true;
LOG_WARNING(log, "Checker: Found part " << part_on_replica << " on " << replica);
break;
}
if (ActiveDataPartSet::contains(part_name, part_on_replica))
{
ActiveDataPartSet::Part part_on_replica_info;
ActiveDataPartSet::parsePartName(part_on_replica, part_on_replica_info);
for (auto block_num = part_on_replica_info.left; block_num <= part_on_replica_info.right; ++block_num)
found_blocks[block_num - part_info.left] = 1;
}
}
if (found)
break;
}
if (found)
{
/// На какой-то живой или мёртвой реплике есть нужный кусок или покрывающий его.
return;
}
/// TODO Временное решение, так как логика ниже не верна для остальных случаев.
if (part_length_in_blocks != 1)
return;
size_t num_found_blocks = 0;
for (auto found_block : found_blocks)
num_found_blocks += (found_block == 1);
if (num_found_blocks == part_length_in_blocks)
{
/// На совокупности живых или мёртвых реплик есть все куски, из которых можно составить нужный кусок. Ничего делать не будем.
LOG_WARNING(log, "Checker: Found all blocks for missing part. Will wait for them to be merged.");
return;
}
/// Ни у кого нет такого куска.
LOG_ERROR(log, "Checker: No replica has part covering " << part_name);
if (num_found_blocks != 0)
LOG_WARNING(log, "When looking for smaller parts, that is covered by " << part_name
<< ", we found just " << num_found_blocks << " of " << part_length_in_blocks << " blocks.");
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
/// Есть ли он в очереди репликации? Если есть - удалим, так как задачу невозможно обработать.
bool was_in_queue = false;
{
std::lock_guard<std::mutex> lock(queue_mutex);
for (LogEntries::iterator it = queue.begin(); it != queue.end();)
{
if ((*it)->new_part_name == part_name)
{
zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name);
queue.erase(it++);
was_in_queue = true;
}
else
++it;
}
}
if (!was_in_queue)
{
/// Куска не было в нашей очереди. С чего бы это?
LOG_ERROR(log, "Checker: Missing part " << part_name << " is not in our queue.");
return;
}
/** Такая ситуация возможна, если на всех репликах, где был кусок, он испортился.
* Например, у реплики, которая только что его записала, отключили питание, и данные не записались из кеша на диск.
*/
LOG_ERROR(log, "Checker: Part " << part_name << " is lost forever.");
ProfileEvents::increment(ProfileEvents::ReplicatedDataLoss);
/** Нужно добавить отсутствующий кусок в block_numbers, чтобы он не мешал слияниям.
* Вот только в сам block_numbers мы его добавить не можем - если так сделать,
* ZooKeeper зачем-то пропустит один номер для автоинкремента,
* и в номерах блоков все равно останется дырка.
* Специально из-за этого приходится отдельно иметь nonincrement_block_numbers.
*
* Кстати, если мы здесь сдохнем, то слияния не будут делаться сквозь эти отсутствующие куски.
*/
const auto partition_str = part_name.substr(0, 6);
for (auto i = part_info.left; i <= part_info.right; ++i)
{
zookeeper->createIfNotExists(zookeeper_path + "/nonincrement_block_numbers", "");
zookeeper->createIfNotExists(zookeeper_path + "/nonincrement_block_numbers/" + partition_str, "");
AbandonableLockInZooKeeper::createAbandonedIfNotExists(
zookeeper_path + "/nonincrement_block_numbers/" + partition_str + "/block-" + padIndex(i),
*zookeeper);
}
}
void StorageReplicatedMergeTree::checkPart(const String & part_name)
{
LOG_WARNING(log, "Checker: Checking part " << part_name);
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecks);
auto part = data.getActiveContainingPart(part_name);
/// Этого или покрывающего куска у нас нет.
if (!part)
{
searchForMissingPart(part_name);
}
/// У нас есть этот кусок, и он активен. Будем проверять, нужен ли нам этот кусок и правильные ли у него данные.
else if (part->name == part_name)
{
auto zookeeper = getZooKeeper();
auto table_lock = lockStructure(false);
/// Если кусок есть в ZooKeeper, сверим его данные с его чексуммами, а их с ZooKeeper.
if (zookeeper->exists(replica_path + "/parts/" + part_name))
{
LOG_WARNING(log, "Checker: Checking data of part " << part_name << ".");
try
{
auto zk_checksums = MergeTreeData::DataPart::Checksums::parse(
zookeeper->get(replica_path + "/parts/" + part_name + "/checksums"));
zk_checksums.checkEqual(part->checksums, true);
auto zk_columns = NamesAndTypesList::parse(
zookeeper->get(replica_path + "/parts/" + part_name + "/columns"));
if (part->columns != zk_columns)
throw Exception("Columns of local part " + part_name + " are different from ZooKeeper");
MergeTreePartChecker::Settings settings;
settings.setIndexGranularity(data.index_granularity);
settings.setRequireChecksums(true);
settings.setRequireColumnFiles(true);
MergeTreePartChecker::checkDataPart(
data.getFullPath() + part_name, settings, data.primary_key_sample);
LOG_INFO(log, "Checker: Part " << part_name << " looks good.");
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
LOG_ERROR(log, "Checker: Part " << part_name << " looks broken. Removing it and queueing a fetch.");
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
removePartAndEnqueueFetch(part_name);
/// Удалим кусок локально.
data.renameAndDetachPart(part, "broken_");
}
}
else if (part->modification_time + 5 * 60 < time(0))
{
/// Если куска нет в ZooKeeper, удалим его локально.
/// Возможно, кусок кто-то только что записал, и еще не успел добавить в ZK.
/// Поэтому удаляем только если кусок старый (не очень надежно).
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
LOG_ERROR(log, "Checker: Unexpected part " << part_name << ". Removing.");
data.renameAndDetachPart(part, "unexpected_");
}
}
else
{
/// Если у нас есть покрывающий кусок, игнорируем все проблемы с этим куском.
/// В худшем случае в лог еще old_parts_lifetime секунд будут валиться ошибки, пока кусок не удалится как старый.
LOG_WARNING(log, "Checker: We have part " << part->name << " covering part " << part_name);
}
}
void StorageReplicatedMergeTree::partCheckThread()
{
while (!shutdown_called)
{
try
{
/// Достанем из очереди кусок для проверки.
String part_name;
{
std::lock_guard<std::mutex> lock(parts_to_check_mutex);
if (parts_to_check_queue.empty())
{
if (!parts_to_check_set.empty())
{
LOG_ERROR(log, "Checker: Non-empty parts_to_check_set with empty parts_to_check_queue. This is a bug.");
parts_to_check_set.clear();
}
}
else
{
part_name = parts_to_check_queue.front();
}
}
if (part_name.empty())
{
parts_to_check_event.wait();
continue;
}
checkPart(part_name);
/// Удалим кусок из очереди проверок.
{
std::lock_guard<std::mutex> lock(parts_to_check_mutex);
if (parts_to_check_queue.empty() || parts_to_check_queue.front() != part_name)
{
LOG_ERROR(log, "Checker: Someone changed parts_to_check_queue.front(). This is a bug.");
}
else
{
parts_to_check_queue.pop_front();
parts_to_check_set.erase(part_name);
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
parts_to_check_event.tryWait(ERROR_SLEEP_MS);
}
}
}
void StorageReplicatedMergeTree::becomeLeader()
{
LOG_INFO(log, "Became leader");
is_leader_node = true;
merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
}
String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active)
{
auto zookeeper = getZooKeeper();
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
/// Из реплик, у которых есть кусок, выберем одну равновероятно.
std::random_shuffle(replicas.begin(), replicas.end());
for (const String & replica : replicas)
{
if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name) &&
(!active || zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")))
return replica;
/// Конечно, реплика может перестать быть активной или даже перестать существовать после возврата из этой функции.
}
return "";
}
/** Если для куска отслеживается кворум, то обновить информацию о нём в ZK.
*/
static void updateQuorum(
zkutil::ZooKeeperPtr & zookeeper,
const String & zookeeper_path,
const String & replica_name,
const String & part_name,
size_t quorum)
{
if (!quorum)
return;
const String quorum_status_path = zookeeper_path + "/quorum/status";
String value;
zkutil::Stat stat;
/// Если узла нет, значит по всем кворумным INSERT-ам уже был достигнут кворум, и ничего делать не нужно.
while (zookeeper->tryGet(quorum_status_path, value, &stat))
{
ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.fromString(value);
if (quorum_entry.part_name != part_name)
{
/// Кворум уже был достигнут. Более того, уже начался другой INSERT с кворумом.
break;
}
if (quorum_entry.required_number_of_replicas != quorum)
throw Exception("Logical error: quorum size in log entry is different than quorum size in node /quorum/status",
ErrorCodes::LOGICAL_ERROR);
quorum_entry.replicas.insert(replica_name);
if (quorum_entry.replicas.size() >= quorum_entry.required_number_of_replicas)
{
/// Кворум достигнут. Удаляем узел.
auto code = zookeeper->tryRemove(quorum_status_path, stat.version);
if (code == ZOK)
{
break;
}
else if (code == ZNONODE)
{
/// Кворум уже был достигнут.
break;
}
else if (code == ZBADVERSION)
{
/// Узел успели обновить. Надо заново его прочитать и повторить все действия.
continue;
}
else
throw zkutil::KeeperException(code, quorum_status_path);
}
else
{
/// Обновляем узел, прописывая туда на одну реплику больше.
auto code = zookeeper->trySet(quorum_status_path, quorum_entry.toString(), stat.version);
if (code == ZOK)
{
break;
}
else if (code == ZNONODE)
{
/// Кворум уже был достигнут.
break;
}
else if (code == ZBADVERSION)
{
/// Узел успели обновить. Надо заново его прочитать и повторить все действия.
continue;
}
else
throw zkutil::KeeperException(code, quorum_status_path);
}
}
}
void StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum)
{
auto zookeeper = getZooKeeper();
LOG_DEBUG(log, "Fetching part " << part_name << " from " << replica_path);
TableStructureReadLockPtr table_lock;
if (!to_detached)
table_lock = lockStructure(true);
String host;
int port;
String host_port_str = zookeeper->get(replica_path + "/host");
ReadBufferFromString buf(host_port_str);
assertString("host: ", buf);
readString(host, buf);
assertString("\nport: ", buf);
readText(port, buf);
assertString("\n", buf);
assertEOF(buf);
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(part_name, replica_path, host, port, to_detached);
if (!to_detached)
{
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops, part_name);
MergeTreeData::Transaction transaction;
auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction);
zookeeper->multi(ops);
transaction.commit();
/** Если для этого куска отслеживается кворум, то надо его обновить.
* TODO Обработка в случае неизвестной ошибки, потери сессии, при перезапуске сервера.
*/
updateQuorum(zookeeper, zookeeper_path, replica_name, part_name, quorum);
merge_selecting_event.set();
for (const auto & removed_part : removed_parts)
{
LOG_DEBUG(log, "Part " << removed_part->name << " is rendered obsolete by fetching part " << part_name);
ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
}
}
else
{
Poco::File(data.getFullPath() + "detached/tmp_" + part_name).renameTo(data.getFullPath() + "detached/" + part_name);
}
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
LOG_DEBUG(log, "Fetched part " << part_name << " from " << replica_name << (to_detached ? " (to 'detached' directory)" : ""));
}
void StorageReplicatedMergeTree::shutdown()
{
if (restarting_thread)
{
restarting_thread->stop();
restarting_thread.reset();
}
endpoint_holder = nullptr;
}
StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
{
try
{
shutdown();
}
catch(...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
BlockInputStreams StorageReplicatedMergeTree::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned threads)
{
Names virt_column_names;
Names real_column_names;
for (const auto & it : column_names)
if (it == "_replicated")
virt_column_names.push_back(it);
else
real_column_names.push_back(it);
ASTSelectQuery & select = *typeid_cast<ASTSelectQuery*>(&*query);
/// Try transferring some condition from WHERE to PREWHERE if enabled and viable
if (settings.optimize_move_to_prewhere)
if (select.where_expression && !select.prewhere_expression)
MergeTreeWhereOptimizer{select, data, real_column_names, log};
Block virtual_columns_block;
ColumnUInt8 * column = new ColumnUInt8(2);
ColumnPtr column_ptr = column;
column->getData()[0] = 0;
column->getData()[1] = 1;
virtual_columns_block.insert(ColumnWithTypeAndName(column_ptr, new DataTypeUInt8, "_replicated"));
/// Если запрошен хотя бы один виртуальный столбец, пробуем индексировать
if (!virt_column_names.empty())
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context);
std::multiset<UInt8> values = VirtualColumnUtils::extractSingleValueFromBlock<UInt8>(virtual_columns_block, "_replicated");
BlockInputStreams res;
size_t part_index = 0;
if ((settings.parallel_replica_offset == 0) && unreplicated_reader && values.count(0))
{
res = unreplicated_reader->read(real_column_names, query,
context, settings, processed_stage,
max_block_size, threads, &part_index);
for (auto & virtual_column : virt_column_names)
{
if (virtual_column == "_replicated")
{
for (auto & stream : res)
stream = new AddingConstColumnBlockInputStream<UInt8>(stream, new DataTypeUInt8, 0, "_replicated");
}
}
}
if (values.count(1))
{
auto res2 = reader.read(real_column_names, query, context, settings, processed_stage, max_block_size, threads, &part_index);
for (auto & virtual_column : virt_column_names)
{
if (virtual_column == "_replicated")
{
for (auto & stream : res2)
stream = new AddingConstColumnBlockInputStream<UInt8>(stream, new DataTypeUInt8, 1, "_replicated");
}
}
res.insert(res.end(), res2.begin(), res2.end());
}
return res;
}
BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query, const Settings & settings)
{
if (is_readonly)
throw Exception("Table is in readonly mode", ErrorCodes::TABLE_IS_READ_ONLY);
String insert_id;
if (query)
if (ASTInsertQuery * insert = typeid_cast<ASTInsertQuery *>(&*query))
insert_id = insert->insert_id;
return new ReplicatedMergeTreeBlockOutputStream(*this, insert_id, settings.insert_quorum);
}
bool StorageReplicatedMergeTree::optimize(const Settings & settings)
{
/// Померджим какие-нибудь куски из директории unreplicated.
/// TODO: Мерджить реплицируемые куски тоже.
if (!unreplicated_data)
return false;
std::lock_guard<std::mutex> lock(unreplicated_mutex);
unreplicated_data->clearOldParts();
MergeTreeData::DataPartsVector parts;
String merged_name;
auto always_can_merge = [](const MergeTreeData::DataPartPtr & a, const MergeTreeData::DataPartPtr & b) { return true; };
if (!unreplicated_merger->selectPartsToMerge(parts, merged_name, 0, true, true, false, always_can_merge))
return false;
const auto & merge_entry = context.getMergeList().insert(database_name, table_name, merged_name);
unreplicated_merger->mergeParts(parts, merged_name, *merge_entry, settings.min_bytes_to_use_direct_io);
return true;
}
void StorageReplicatedMergeTree::alter(const AlterCommands & params,
const String & database_name, const String & table_name, Context & context)
{
auto zookeeper = getZooKeeper();
const MergeTreeMergeBlocker merge_blocker{merger};
const auto unreplicated_merge_blocker = unreplicated_merger ?
std::make_unique<MergeTreeMergeBlocker>(*unreplicated_merger) : nullptr;
LOG_DEBUG(log, "Doing ALTER");
NamesAndTypesList new_columns;
NamesAndTypesList new_materialized_columns;
NamesAndTypesList new_alias_columns;
ColumnDefaults new_column_defaults;
String new_columns_str;
int new_columns_version;
zkutil::Stat stat;
{
auto table_lock = lockStructureForAlter();
if (is_readonly)
throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY);
data.checkAlter(params);
new_columns = data.getColumnsListNonMaterialized();
new_materialized_columns = data.materialized_columns;
new_alias_columns = data.alias_columns;
new_column_defaults = data.column_defaults;
params.apply(new_columns, new_materialized_columns, new_alias_columns, new_column_defaults);
new_columns_str = ColumnsDescription<false>{
new_columns, new_materialized_columns,
new_alias_columns, new_column_defaults
}.toString();
/// Делаем ALTER.
zookeeper->set(zookeeper_path + "/columns", new_columns_str, -1, &stat);
new_columns_version = stat.version;
}
LOG_DEBUG(log, "Updated columns in ZooKeeper. Waiting for replicas to apply changes.");
/// Ждем, пока все реплики обновят данные.
/// Подпишемся на изменения столбцов, чтобы перестать ждать, если кто-то еще сделает ALTER.
if (!zookeeper->exists(zookeeper_path + "/columns", &stat, alter_query_event))
throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (stat.version != new_columns_version)
{
LOG_WARNING(log, zookeeper_path + "/columns changed before this ALTER finished; "
"overlapping ALTER-s are fine but use caution with nontransitive changes");
return;
}
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
for (const String & replica : replicas)
{
LOG_DEBUG(log, "Waiting for " << replica << " to apply changes");
while (!shutdown_called)
{
String replica_columns_str;
/// Реплику могли успеть удалить.
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/columns", replica_columns_str, &stat))
{
LOG_WARNING(log, replica << " was removed");
break;
}
int replica_columns_version = stat.version;
if (replica_columns_str == new_columns_str)
break;
if (!zookeeper->exists(zookeeper_path + "/columns", &stat))
throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (stat.version != new_columns_version)
{
LOG_WARNING(log, zookeeper_path + "/columns changed before ALTER finished; "
"overlapping ALTER-s are fine but use caution with nontransitive changes");
return;
}
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/columns", &stat, alter_query_event))
{
LOG_WARNING(log, replica << " was removed");
break;
}
if (stat.version != replica_columns_version)
continue;
alter_query_event->wait();
}
if (shutdown_called)
break;
}
LOG_DEBUG(log, "ALTER finished");
}
/// Название воображаемого куска, покрывающего все возможные куски в указанном месяце с номерами в указанном диапазоне.
static String getFakePartNameForDrop(const String & month_name, UInt64 left, UInt64 right)
{
/// Диапазон дат - весь месяц.
const auto & lut = DateLUT::instance();
time_t start_time = lut.YYYYMMDDToDate(parse<UInt32>(month_name + "01"));
DayNum_t left_date = lut.toDayNum(start_time);
DayNum_t right_date = DayNum_t(static_cast<size_t>(left_date) + lut.daysInMonth(start_time) - 1);
/// Уровень - right-left+1: кусок не мог образоваться в результате такого или большего количества слияний.
return ActiveDataPartSet::getPartName(left_date, right_date, left, right, right - left + 1);
}
void StorageReplicatedMergeTree::dropUnreplicatedPartition(const Field & partition, const bool detach, const Settings & settings)
{
if (!unreplicated_data)
return;
/// Просит завершить мерджи и не позволяет им начаться.
/// Это защищает от "оживания" данных за удалённую партицию после завершения мерджа.
const MergeTreeMergeBlocker merge_blocker{*unreplicated_merger};
auto structure_lock = lockStructure(true);
const DayNum_t month = MergeTreeData::getMonthDayNum(partition);
size_t removed_parts = 0;
MergeTreeData::DataParts parts = unreplicated_data->getDataParts();
for (const auto & part : parts)
{
if (part->month != month)
continue;
LOG_DEBUG(log, "Removing unreplicated part " << part->name);
++removed_parts;
if (detach)
unreplicated_data->renameAndDetachPart(part, "");
else
unreplicated_data->replaceParts({part}, {}, false);
}
LOG_INFO(log, (detach ? "Detached " : "Removed ") << removed_parts << " unreplicated parts inside " << apply_visitor(FieldVisitorToString(), partition) << ".");
}
void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach, bool unreplicated, const Settings & settings)
{
if (unreplicated)
{
dropUnreplicatedPartition(field, detach, settings);
return;
}
auto zookeeper = getZooKeeper();
String month_name = MergeTreeData::getMonthName(field);
/// TODO: Делать запрос в лидера по TCP.
if (!is_leader_node)
throw Exception(String(detach ? "DETACH" : "DROP") + " PARTITION can only be done on leader replica.", ErrorCodes::NOT_LEADER);
/** Пропустим один номер в block_numbers для удаляемого месяца, и будем удалять только куски до этого номера.
* Это запретит мерджи удаляемых кусков с новыми вставляемыми данными.
* Инвариант: в логе не появятся слияния удаляемых кусков с другими кусками.
* NOTE: Если понадобится аналогично поддержать запрос DROP PART, для него придется придумать какой-нибудь новый механизм,
* чтобы гарантировать этот инвариант.
*/
Int64 right;
{
AbandonableLockInZooKeeper block_number_lock = allocateBlockNumber(month_name);
right = block_number_lock.getNumber();
block_number_lock.unlock();
}
/// Такого никогда не должно происходить.
if (right == 0)
return;
--right;
String fake_part_name = getFakePartNameForDrop(month_name, 0, right);
/** Запретим выбирать для слияния удаляемые куски - сделаем вид, что их всех уже собираются слить в fake_part_name.
* Инвариант: после появления в логе записи DROP_RANGE, в логе не появятся слияния удаляемых кусков.
*/
{
std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
virtual_parts.add(fake_part_name);
}
/// Наконец, добившись нужных инвариантов, можно положить запись в лог.
LogEntry entry;
entry.type = LogEntry::DROP_RANGE;
entry.source_replica = replica_name;
entry.new_part_name = fake_part_name;
entry.detach = detach;
String log_znode_path = zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
/// Если надо - дожидаемся выполнения операции на себе или на всех репликах.
if (settings.replication_alter_partitions_sync != 0)
{
if (settings.replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry);
else
waitForAllReplicasToProcessLogEntry(entry);
}
}
void StorageReplicatedMergeTree::attachPartition(const Field & field, bool unreplicated, bool attach_part, const Settings & settings)
{
auto zookeeper = getZooKeeper();
String partition;
if (attach_part)
partition = field.safeGet<String>();
else
partition = MergeTreeData::getMonthName(field);
String source_dir = (unreplicated ? "unreplicated/" : "detached/");
/// Составим список кусков, которые нужно добавить.
Strings parts;
if (attach_part)
{
parts.push_back(partition);
}
else
{
LOG_DEBUG(log, "Looking for parts for partition " << partition << " in " << source_dir);
ActiveDataPartSet active_parts;
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
{
String name = it.name();
if (!ActiveDataPartSet::isPartDirectory(name))
continue;
if (0 != name.compare(0, partition.size(), partition))
continue;
LOG_DEBUG(log, "Found part " << name);
active_parts.add(name);
}
LOG_DEBUG(log, active_parts.size() << " of them are active");
parts = active_parts.getParts();
}
/// Синхронно проверим, что добавляемые куски существуют и не испорчены хотя бы на этой реплике. Запишем checksums.txt, если его нет.
LOG_DEBUG(log, "Checking parts");
for (const String & part : parts)
{
LOG_DEBUG(log, "Checking part " << part);
data.loadPartAndFixMetadata(source_dir + part);
}
/// Выделим добавляемым кускам максимальные свободные номера, меньшие RESERVED_BLOCK_NUMBERS.
/// NOTE: Проверка свободности номеров никак не синхронизируется. Выполнять несколько запросов ATTACH/DETACH/DROP одновременно нельзя.
Int64 min_used_number = RESERVED_BLOCK_NUMBERS;
DayNum_t month = DateLUT::instance().makeDayNum(parse<UInt16>(partition.substr(0, 4)), parse<UInt8>(partition.substr(4, 2)), 0);
{
auto existing_parts = data.getDataParts();
for (const auto & part : existing_parts)
if (part->month == month)
min_used_number = std::min(min_used_number, part->left);
}
/// Добавим записи в лог.
std::reverse(parts.begin(), parts.end());
std::list<LogEntry> entries;
zkutil::Ops ops;
for (const String & part_name : parts)
{
ActiveDataPartSet::Part part;
ActiveDataPartSet::parsePartName(part_name, part);
part.left = part.right = --min_used_number;
String new_part_name = ActiveDataPartSet::getPartName(part.left_date, part.right_date, part.left, part.right, part.level);
LOG_INFO(log, "Will attach " << part_name << " as " << new_part_name);
entries.emplace_back();
LogEntry & entry = entries.back();
entry.type = LogEntry::ATTACH_PART;
entry.source_replica = replica_name;
entry.source_part_name = part_name;
entry.new_part_name = new_part_name;
entry.attach_unreplicated = unreplicated;
ops.push_back(new zkutil::Op::Create(
zookeeper_path + "/log/log-", entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
}
LOG_DEBUG(log, "Adding attaches to log");
zookeeper->multi(ops);
/// Если надо - дожидаемся выполнения операции на себе или на всех репликах.
if (settings.replication_alter_partitions_sync != 0)
{
size_t i = 0;
for (LogEntry & entry : entries)
{
String log_znode_path = dynamic_cast<zkutil::Op::Create &>(ops[i]).getPathCreated();
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
if (settings.replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry);
else
waitForAllReplicasToProcessLogEntry(entry);
++i;
}
}
}
void StorageReplicatedMergeTree::drop()
{
if (is_readonly)
throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
auto zookeeper = getZooKeeper();
shutdown();
LOG_INFO(log, "Removing replica " << replica_path);
replica_is_active_node = nullptr;
zookeeper->tryRemoveRecursive(replica_path);
/// Проверяем, что zookeeper_path существует: его могла удалить другая реплика после выполнения предыдущей строки.
Strings replicas;
if (zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) == ZOK && replicas.empty())
{
LOG_INFO(log, "Removing table " << zookeeper_path << " (this might take several minutes)");
zookeeper->tryRemoveRecursive(zookeeper_path);
}
data.dropAllData();
}
void StorageReplicatedMergeTree::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
std::string new_full_path = new_path_to_db + escapeForFileName(new_table_name) + '/';
data.setPath(new_full_path, true);
if (unreplicated_data)
unreplicated_data->setPath(new_full_path + "unreplicated/", false);
database_name = new_database_name;
table_name = new_table_name;
full_path = new_full_path;
/// TODO: Можно обновить названия логгеров.
}
AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const String & month_name)
{
auto zookeeper = getZooKeeper();
String month_path = zookeeper_path + "/block_numbers/" + month_name;
if (!zookeeper->exists(month_path))
{
/// Создадим в block_numbers ноду для месяца и пропустим в ней 200 значений инкремента.
/// Нужно, чтобы в будущем при необходимости можно было добавить данные в начало.
zkutil::Ops ops;
auto acl = zookeeper->getDefaultACL();
ops.push_back(new zkutil::Op::Create(month_path, "", acl, zkutil::CreateMode::Persistent));
for (size_t i = 0; i < RESERVED_BLOCK_NUMBERS; ++i)
{
ops.push_back(new zkutil::Op::Create(month_path + "/skip_increment", "", acl, zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Remove(month_path + "/skip_increment", -1));
}
/// Игнорируем ошибки - не получиться могло только если кто-то еще выполнил эту строчку раньше нас.
zookeeper->tryMulti(ops);
}
return AbandonableLockInZooKeeper(
zookeeper_path + "/block_numbers/" + month_name + "/block-",
zookeeper_path + "/temp", *zookeeper);
}
void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const LogEntry & entry)
{
auto zookeeper = getZooKeeper();
LOG_DEBUG(log, "Waiting for all replicas to process " << entry.znode_name);
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
for (const String & replica : replicas)
waitForReplicaToProcessLogEntry(replica, entry);
LOG_DEBUG(log, "Finished waiting for all replicas to process " << entry.znode_name);
}
void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const LogEntry & entry)
{
auto zookeeper = getZooKeeper();
String entry_str = entry.toString();
String log_node_name;
/** В эту функцию могут передать entry двух видов:
* 1. (более часто) Из директории log - общего лога, откуда реплики копируют записи в свою queue.
* 2. Из директории queue одной из реплик.
*
* Проблема в том, что номера (sequential нод) элементов очереди в log и в queue не совпадают.
* (И в queue не совпадают номера у одного и того же элемента лога для разных реплик.)
*
* Поэтому следует рассматривать эти случаи по-отдельности.
*/
/** Первое - нужно дождаться, пока реплика возьмёт к себе в queue элемент очереди из log,
* если она ещё этого не сделала (см. функцию pullLogsToQueue).
*
* Для этого проверяем её узел log_pointer - максимальный номер взятого элемента из log плюс единица.
*/
if (0 == entry.znode_name.compare(0, strlen("log-"), "log-"))
{
/** В этом случае просто берём номер из имени ноды log-xxxxxxxxxx.
*/
UInt64 log_index = parse<UInt64>(entry.znode_name.substr(entry.znode_name.size() - 10));
log_node_name = entry.znode_name;
LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue");
/// Дождемся, пока запись попадет в очередь реплики.
while (true)
{
zkutil::EventPtr event = new Poco::Event;
String log_pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
break;
event->wait();
}
}
else if (0 == entry.znode_name.compare(0, strlen("queue-"), "queue-"))
{
/** В этом случае номер log-ноды неизвестен. Нужно просмотреть все от log_pointer до конца,
* ища ноду с таким же содержимым. И если мы её не найдём - значит реплика уже взяла эту запись в свою queue.
*/
String log_pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer");
Strings log_entries = zookeeper->getChildren(zookeeper_path + "/log");
UInt64 log_index = 0;
bool found = false;
for (const String & log_entry_name : log_entries)
{
log_index = parse<UInt64>(log_entry_name.substr(log_entry_name.size() - 10));
if (!log_pointer.empty() && log_index < parse<UInt64>(log_pointer))
continue;
String log_entry_str;
bool exists = zookeeper->tryGet(zookeeper_path + "/log/" + log_entry_name, log_entry_str);
if (exists && entry_str == log_entry_str)
{
found = true;
log_node_name = log_entry_name;
break;
}
}
if (found)
{
LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue");
/// Дождемся, пока запись попадет в очередь реплики.
while (true)
{
zkutil::EventPtr event = new Poco::Event;
String log_pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
break;
event->wait();
}
}
}
else
throw Exception("Logical error: unexpected name of log node: " + entry.znode_name, ErrorCodes::LOGICAL_ERROR);
if (!log_node_name.empty())
LOG_DEBUG(log, "Looking for node corresponding to " << log_node_name << " in " << replica << " queue");
else
LOG_DEBUG(log, "Looking for corresponding node in " << replica << " queue");
/** Второе - найдем соответствующую запись в очереди указанной реплики (replica).
* Её номер может не совпадать ни с log-узлом, ни с queue-узлом у текущей реплики (у нас).
* Поэтому, ищем путём сравнения содержимого.
*/
Strings queue_entries = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/queue");
String queue_entry_to_wait_for;
for (const String & entry_name : queue_entries)
{
String queue_entry_str;
bool exists = zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/queue/" + entry_name, queue_entry_str);
if (exists && queue_entry_str == entry_str)
{
queue_entry_to_wait_for = entry_name;
break;
}
}
/// Пока искали запись, ее уже выполнили и удалили.
if (queue_entry_to_wait_for.empty())
{
LOG_DEBUG(log, "No corresponding node found. Assuming it has been already processed.");
return;
}
LOG_DEBUG(log, "Waiting for " << queue_entry_to_wait_for << " to disappear from " << replica << " queue");
/// Третье - дождемся, пока запись исчезнет из очереди реплики.
zookeeper->waitForDisappear(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for);
}
void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
{
auto zookeeper = getZooKeeper();
res.is_leader = is_leader_node;
res.is_readonly = is_readonly;
res.is_session_expired = !zookeeper || zookeeper->expired();
{
std::lock_guard<std::mutex> lock(queue_mutex);
res.future_parts = future_parts.size();
res.queue_size = queue.size();
res.inserts_in_queue = 0;
res.merges_in_queue = 0;
res.queue_oldest_time = 0;
res.inserts_oldest_time = 0;
res.merges_oldest_time = 0;
for (const LogEntryPtr & entry : queue)
{
if (entry->create_time && (!res.queue_oldest_time || entry->create_time < res.queue_oldest_time))
res.queue_oldest_time = entry->create_time;
if (entry->type == LogEntry::GET_PART)
{
++res.inserts_in_queue;
if (entry->create_time && (!res.inserts_oldest_time || entry->create_time < res.inserts_oldest_time))
res.inserts_oldest_time = entry->create_time;
}
if (entry->type == LogEntry::MERGE_PARTS)
{
++res.merges_in_queue;
if (entry->create_time && (!res.merges_oldest_time || entry->create_time < res.merges_oldest_time))
res.merges_oldest_time = entry->create_time;
}
}
}
{
std::lock_guard<std::mutex> lock(parts_to_check_mutex);
res.parts_to_check = parts_to_check_set.size();
}
res.zookeeper_path = zookeeper_path;
res.replica_name = replica_name;
res.replica_path = replica_path;
res.columns_version = columns_version;
if (res.is_session_expired || !with_zk_fields)
{
res.log_max_index = 0;
res.log_pointer = 0;
res.total_replicas = 0;
res.active_replicas = 0;
}
else
{
auto log_entries = zookeeper->getChildren(zookeeper_path + "/log");
if (log_entries.empty())
{
res.log_max_index = 0;
}
else
{
const String & last_log_entry = *std::max_element(log_entries.begin(), log_entries.end());
res.log_max_index = parse<UInt64>(last_log_entry.substr(strlen("log-")));
}
String log_pointer_str = zookeeper->get(replica_path + "/log_pointer");
res.log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
auto all_replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
res.total_replicas = all_replicas.size();
res.active_replicas = 0;
for (const String & replica : all_replicas)
if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
++res.active_replicas;
}
}
void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const String & from_, const Settings & settings)
{
auto zookeeper = getZooKeeper();
String partition_str = MergeTreeData::getMonthName(partition);
String from = from_;
if (from.back() == '/')
from.resize(from.size() - 1);
LOG_INFO(log, "Will fetch partition " << partition_str << " from shard " << from_);
/** Проверим, что в директории detached (куда мы будем записывать скаченные куски) ещё нет такой партиции.
* Ненадёжно (есть race condition) - такая партиция может появиться чуть позже.
*/
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it{data.getFullPath() + "detached/"}; dir_it != dir_end; ++dir_it)
if (0 == dir_it.name().compare(0, partition_str.size(), partition_str))
throw Exception("Detached partition " + partition_str + " is already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);
/// Список реплик шарда-источника.
zkutil::Strings replicas = zookeeper->getChildren(from + "/replicas");
/// Оставим только активные реплики.
zkutil::Strings active_replicas;
active_replicas.reserve(replicas.size());
for (const String & replica : replicas)
if (zookeeper->exists(from + "/replicas/" + replica + "/is_active"))
active_replicas.push_back(replica);
if (active_replicas.empty())
throw Exception("No active replicas for shard " + from, ErrorCodes::NO_ACTIVE_REPLICAS);
/** Надо выбрать лучшую (наиболее актуальную) реплику.
* Это реплика с максимальным log_pointer, затем с минимальным размером queue.
* NOTE Это не совсем лучший критерий. Для скачивания старых партиций это не имеет смысла,
* и было бы неплохо уметь выбирать реплику, ближайшую по сети.
* NOTE Разумеется, здесь есть data race-ы. Можно решить ретраями.
*/
Int64 max_log_pointer = -1;
UInt64 min_queue_size = std::numeric_limits<UInt64>::max();
String best_replica;
for (const String & replica : active_replicas)
{
String current_replica_path = from + "/replicas/" + replica;
String log_pointer_str = zookeeper->get(current_replica_path + "/log_pointer");
Int64 log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
zkutil::Stat stat;
zookeeper->get(current_replica_path + "/queue", &stat);
size_t queue_size = stat.numChildren;
if (log_pointer > max_log_pointer
|| (log_pointer == max_log_pointer && queue_size < min_queue_size))
{
max_log_pointer = log_pointer;
min_queue_size = queue_size;
best_replica = replica;
}
}
if (best_replica.empty())
throw Exception("Logical error: cannot choose best replica.", ErrorCodes::LOGICAL_ERROR);
LOG_INFO(log, "Found " << replicas.size() << " replicas, " << active_replicas.size() << " of them are active."
<< " Selected " << best_replica << " to fetch from.");
String best_replica_path = from + "/replicas/" + best_replica;
/// Выясним, какие куски есть на лучшей реплике.
/** Пытаемся скачать эти куски.
* Часть из них могла удалиться из-за мерджа.
* В этом случае, обновляем информацию о доступных кусках и пробуем снова.
*/
unsigned try_no = 0;
Strings missing_parts;
do
{
if (try_no)
LOG_INFO(log, "Some of parts (" << missing_parts.size() << ") are missing. Will try to fetch covering parts.");
if (try_no >= 5)
throw Exception("Too much retries to fetch parts from " + best_replica_path, ErrorCodes::TOO_MUCH_RETRIES_TO_FETCH_PARTS);
Strings parts = zookeeper->getChildren(best_replica_path + "/parts");
ActiveDataPartSet active_parts_set(parts);
Strings parts_to_fetch;
if (missing_parts.empty())
{
parts_to_fetch = active_parts_set.getParts();
/// Оставляем только куски нужной партиции.
Strings parts_to_fetch_partition;
for (const String & part : parts_to_fetch)
if (0 == part.compare(0, partition_str.size(), partition_str))
parts_to_fetch_partition.push_back(part);
parts_to_fetch = std::move(parts_to_fetch_partition);
if (parts_to_fetch.empty())
throw Exception("Partition " + partition_str + " on " + best_replica_path + " doesn't exist", ErrorCodes::PARTITION_DOESNT_EXIST);
}
else
{
for (const String & missing_part : missing_parts)
{
String containing_part = active_parts_set.getContainingPart(missing_part);
if (!containing_part.empty())
parts_to_fetch.push_back(containing_part);
else
LOG_WARNING(log, "Part " << missing_part << " on replica " << best_replica_path << " has been vanished.");
}
}
LOG_INFO(log, "Parts to fetch: " << parts_to_fetch.size());
missing_parts.clear();
for (const String & part : parts_to_fetch)
{
try
{
fetchPart(part, best_replica_path, true, 0);
}
catch (const DB::Exception & e)
{
if (e.code() != ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER)
throw;
LOG_INFO(log, e.displayText());
missing_parts.push_back(part);
}
}
++try_no;
} while (!missing_parts.empty());
}
void StorageReplicatedMergeTree::freezePartition(const Field & partition, const Settings & settings)
{
/// Префикс может быть произвольным. Не обязательно месяц - можно указать лишь год.
String prefix = partition.getType() == Field::Types::UInt64
? toString(partition.get<UInt64>())
: partition.safeGet<String>();
data.freezePartition(prefix);
if (unreplicated_data)
unreplicated_data->freezePartition(prefix);
}
}