ClickHouse/dbms/src/Storages/StorageReplicatedMergeTree.cpp

1024 lines
32 KiB
C++
Raw Normal View History

2014-03-21 13:42:14 +00:00
#include <DB/Storages/StorageReplicatedMergeTree.h>
2014-04-02 10:10:37 +00:00
#include <DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
2014-04-02 13:45:39 +00:00
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
2014-03-22 14:44:44 +00:00
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromString.h>
2014-03-21 13:42:14 +00:00
namespace DB
{
2014-04-03 11:48:28 +00:00
/// Sleep between iterations of the thread that pulls the replication log into the queue.
const auto QUEUE_UPDATE_SLEEP = std::chrono::seconds(5);
/// Sleep of a queue worker thread when no queue entry was ready to execute.
const auto QUEUE_NO_WORK_SLEEP = std::chrono::seconds(5);
/// Back-off after a queue entry failed (the entry is re-appended to the end of the queue).
const auto QUEUE_ERROR_SLEEP = std::chrono::seconds(1);
/// Pause after a queue entry was executed successfully (currently none).
const auto QUEUE_AFTER_WORK_SLEEP = std::chrono::seconds(0);
2014-04-04 10:37:33 +00:00
/// Sleep of the leader's merge-selecting thread when no merge was scheduled.
const auto MERGE_SELECTING_SLEEP = std::chrono::seconds(5);
2014-04-03 11:48:28 +00:00
2014-03-21 13:42:14 +00:00
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
    const String & zookeeper_path_,
    const String & replica_name_,
    bool attach,
    const String & path_, const String & name_, NamesAndTypesListPtr columns_,
    Context & context_,
    ASTPtr & primary_expr_ast_,
    const String & date_column_name_,
    const ASTPtr & sampling_expression_,
    size_t index_granularity_,
    MergeTreeData::Mode mode_,
    const String & sign_column_,
    const MergeTreeSettings & settings_)
    :
    context(context_), zookeeper(context.getZooKeeper()),
    table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), zookeeper_path(zookeeper_path_),
    replica_name(replica_name_), is_leader_node(false),
    data(full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
        index_granularity_, mode_, sign_column_, settings_),
    reader(data), writer(data), merger(data), fetcher(data),
    log(&Logger::get("StorageReplicatedMergeTree: " + table_name)),
    shutdown_called(false)
{
    /// Normalize the table path: strip a trailing slash, if any.
    if (!zookeeper_path.empty() && *zookeeper_path.rbegin() == '/')
        zookeeper_path.erase(zookeeper_path.end() - 1);
    replica_path = zookeeper_path + "/replicas/" + replica_name;

    if (!attach)
    {
        /// Creating a brand-new replica: the first one creates the table skeleton in ZK,
        /// and joining an existing table is only allowed while it is still empty.
        if (!zookeeper.exists(zookeeper_path))
            createTable();

        if (!isTableEmpty())
            throw Exception("Can't add new replica to non-empty table", ErrorCodes::ADDING_REPLICA_TO_NON_EMPTY_TABLE);

        checkTableStructure();
        createReplica();
    }
    else
    {
        /// Re-attaching an existing replica: verify the metadata and the local parts set.
        checkTableStructure();
        checkParts();
    }

    loadQueue();
    activateReplica();

    /// Participate in leader election; the winner starts selecting merges (becomeLeader).
    leader_election = new zkutil::LeaderElection(zookeeper_path + "/leader_election", zookeeper,
        std::bind(&StorageReplicatedMergeTree::becomeLeader, this), replica_name);

    /// One thread pulls the replication logs into the queue; several threads execute the queue.
    queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this);
    for (size_t i = 0; i < settings_.replication_threads; ++i)
        queue_threads.push_back(std::thread(&StorageReplicatedMergeTree::queueThread, this));
}
StoragePtr StorageReplicatedMergeTree::create(
    const String & zookeeper_path_,
    const String & replica_name_,
    bool attach,
    const String & path_, const String & name_, NamesAndTypesListPtr columns_,
    Context & context_,
    ASTPtr & primary_expr_ast_,
    const String & date_column_name_,
    const ASTPtr & sampling_expression_,
    size_t index_granularity_,
    MergeTreeData::Mode mode_,
    const String & sign_column_,
    const MergeTreeSettings & settings_)
{
    /// Construct the storage and obtain the shared pointer that owns it.
    StorageReplicatedMergeTree * storage = new StorageReplicatedMergeTree(
        zookeeper_path_, replica_name_, attach,
        path_, name_, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
        index_granularity_, mode_, sign_column_, settings_);
    StoragePtr storage_ptr = storage->thisPtr();

    /// Register the interserver endpoint through which other replicas will fetch parts from us.
    String endpoint_name = "ReplicatedMergeTree:" + storage->replica_path;
    InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(storage->data, storage_ptr);
    storage->endpoint_holder = new InterserverIOEndpointHolder(endpoint_name, endpoint, storage->context.getInterserverIOHandler());

    return storage_ptr;
}
2014-03-22 14:44:44 +00:00
/// Serializes an AST to a one-line string; an absent AST becomes the empty string.
static String formattedAST(const ASTPtr & ast)
{
    if (!ast)
        return "";

    std::stringstream stream;
    formatAST(*ast, stream, 0, false, true);
    return stream.str();
}
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
void StorageReplicatedMergeTree::createTable()
2014-03-21 19:17:59 +00:00
{
2014-03-22 14:44:44 +00:00
zookeeper.create(zookeeper_path, "", zkutil::CreateMode::Persistent);
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
/// Запишем метаданные таблицы, чтобы реплики могли сверять с ними свою локальную структуру таблицы.
std::stringstream metadata;
metadata << "metadata format version: 1" << std::endl;
metadata << "date column: " << data.date_column_name << std::endl;
metadata << "sampling expression: " << formattedAST(data.sampling_expression) << std::endl;
metadata << "index granularity: " << data.index_granularity << std::endl;
metadata << "mode: " << static_cast<int>(data.mode) << std::endl;
metadata << "sign column: " << data.sign_column << std::endl;
metadata << "primary key: " << formattedAST(data.primary_expr_ast) << std::endl;
metadata << "columns:" << std::endl;
WriteBufferFromOStream buf(metadata);
for (auto & it : data.getColumnsList())
{
writeBackQuotedString(it.first, buf);
writeChar(' ', buf);
writeString(it.second->getName(), buf);
writeChar('\n', buf);
}
buf.next();
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
zookeeper.create(zookeeper_path + "/metadata", metadata.str(), zkutil::CreateMode::Persistent);
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
zookeeper.create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/blocks", "", zkutil::CreateMode::Persistent);
2014-04-03 12:49:01 +00:00
zookeeper.create(zookeeper_path + "/block_numbers", "", zkutil::CreateMode::Persistent);
2014-04-04 10:37:33 +00:00
zookeeper.create(zookeeper_path + "/leader_election", "", zkutil::CreateMode::Persistent);
2014-04-02 13:45:39 +00:00
zookeeper.create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent);
2014-03-22 14:44:44 +00:00
}
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
* Если нет - бросить исключение.
2014-03-21 19:17:59 +00:00
*/
2014-03-22 14:44:44 +00:00
void StorageReplicatedMergeTree::checkTableStructure()
{
String metadata_str = zookeeper.get(zookeeper_path + "/metadata");
ReadBufferFromString buf(metadata_str);
assertString("metadata format version: 1", buf);
assertString("\ndate column: ", buf);
assertString(data.date_column_name, buf);
assertString("\nsampling expression: ", buf);
assertString(formattedAST(data.sampling_expression), buf);
assertString("\nindex granularity: ", buf);
assertString(toString(data.index_granularity), buf);
assertString("\nmode: ", buf);
assertString(toString(static_cast<int>(data.mode)), buf);
assertString("\nsign column: ", buf);
assertString(data.sign_column, buf);
assertString("\nprimary key: ", buf);
assertString(formattedAST(data.primary_expr_ast), buf);
assertString("\ncolumns:\n", buf);
for (auto & it : data.getColumnsList())
{
String name;
readBackQuotedString(name, buf);
if (name != it.first)
throw Exception("Unexpected column name in ZooKeeper: expected " + it.first + ", found " + name,
ErrorCodes::UNKNOWN_IDENTIFIER);
assertString(" ", buf);
assertString(it.second->getName(), buf);
assertString("\n", buf);
}
2014-04-02 07:59:43 +00:00
assertEOF(buf);
2014-03-22 14:44:44 +00:00
}
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
void StorageReplicatedMergeTree::createReplica()
{
    /// Create this replica's znode skeleton (the root node first, then its children).
    static const char * subnodes[] = { "", "/host", "/log", "/log_pointers", "/queue", "/parts" };
    for (const char * suffix : subnodes)
        zookeeper.create(replica_path + suffix, "", zkutil::CreateMode::Persistent);
}
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
void StorageReplicatedMergeTree::activateReplica()
{
std::stringstream host;
host << "host: " << context.getInterserverIOHost() << std::endl;
host << "port: " << context.getInterserverIOPort() << std::endl;
2014-03-21 19:17:59 +00:00
2014-04-03 08:47:59 +00:00
/// Одновременно объявим, что эта реплика активна, и обновим хост.
2014-03-22 14:44:44 +00:00
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(replica_path + "/is_active", "", zookeeper.getDefaultACL(), zkutil::CreateMode::Ephemeral));
ops.push_back(new zkutil::Op::SetData(replica_path + "/host", host.str(), -1));
2014-04-03 13:11:11 +00:00
try
{
zookeeper.multi(ops);
}
catch (zkutil::KeeperException & e)
{
if (e.code == zkutil::ReturnCode::NodeExists)
throw Exception("Replica " + replica_path + " appears to be already active. If you're sure it's not, "
"try again in a minute or remove znode " + replica_path + "/is_active manually", ErrorCodes::REPLICA_IS_ALREADY_ACTIVE);
throw;
}
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
replica_is_active_node = zkutil::EphemeralNodeHolder::existing(replica_path + "/is_active", zookeeper);
}
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
bool StorageReplicatedMergeTree::isTableEmpty()
{
    /// The table is empty iff no replica has registered any part.
    for (const auto & replica : zookeeper.getChildren(zookeeper_path + "/replicas"))
        if (!zookeeper.getChildren(zookeeper_path + "/replicas/" + replica + "/parts").empty())
            return false;

    return true;
}
2014-03-21 19:17:59 +00:00
2014-04-02 07:59:43 +00:00
void StorageReplicatedMergeTree::checkParts()
{
Strings expected_parts_vec = zookeeper.getChildren(replica_path + "/parts");
NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end());
MergeTreeData::DataParts parts = data.getDataParts();
2014-04-08 17:45:21 +00:00
MergeTreeData::DataParts unexpected_parts;
2014-04-02 07:59:43 +00:00
for (const auto & part : parts)
{
if (expected_parts.count(part->name))
{
expected_parts.erase(part->name);
}
else
{
2014-04-08 17:45:21 +00:00
unexpected_parts.insert(part);
2014-04-02 07:59:43 +00:00
}
}
2014-04-08 17:45:21 +00:00
/// Если локально не хватает какого-то куска, но есть покрывающий его кусок, можно заменить в ZK недостающий покрывающим.
MergeTreeData::DataPartsVector parts_to_add;
for (const String & missing_name : expected_parts)
{
auto containing = data.getContainingPart(missing_name);
if (!containing)
throw Exception("Not found " + toString(expected_parts.size())
+ " parts (including " + missing_name + ") in table " + data.getTableName(),
ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
if (unexpected_parts.count(containing))
{
parts_to_add.push_back(containing);
unexpected_parts.erase(containing);
}
}
2014-04-02 07:59:43 +00:00
2014-04-08 17:45:21 +00:00
if (parts_to_add.size() > 2 ||
unexpected_parts.size() > 2 ||
expected_parts.size() > 20)
{
throw Exception("The local set of parts of table " + data.getTableName() + " doesn't look like the set of parts in ZooKeeper."
"There are " + toString(unexpected_parts.size()) + " unexpected parts, "
+ toString(parts_to_add.size()) + " unexpectedly merged parts, "
+ toString(expected_parts.size()) + " unexpectedly obsolete parts",
2014-04-02 07:59:43 +00:00
ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
2014-04-08 17:45:21 +00:00
}
/// Добавим в ZK информацию о кусках, покрывающих недостающие куски.
for (MergeTreeData::DataPartPtr part : parts_to_add)
{
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops);
zookeeper.multi(ops);
}
2014-04-02 07:59:43 +00:00
2014-04-08 17:45:21 +00:00
/// Удалим из ZK информацию о кусках, покрытых только что добавленными.
for (const String & name : expected_parts)
{
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
zookeeper.multi(ops);
}
/// Удалим лишние локальные куски.
2014-04-02 07:59:43 +00:00
for (MergeTreeData::DataPartPtr part : unexpected_parts)
{
LOG_ERROR(log, "Unexpected part " << part->name << ". Renaming it to ignored_" + part->name);
2014-04-02 10:10:37 +00:00
data.renameAndDetachPart(part, "ignored_");
2014-04-02 07:59:43 +00:00
}
}
2014-03-21 19:17:59 +00:00
2014-04-08 17:45:21 +00:00
void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops)
{
    /// If some other replica already has this part, cross-check our checksums against its copy.
    String other_replica = findReplicaHavingPart(part->name, false);
    if (!other_replica.empty())
    {
        String other_checksums_str =
            zookeeper.get(zookeeper_path + "/replicas/" + other_replica + "/parts/" + part->name + "/checksums");
        auto other_checksums = MergeTreeData::DataPart::Checksums::parse(other_checksums_str);
        other_checksums.check(part->checksums);
    }

    /// Queue the creation of the part znode and its checksums child node.
    ops.push_back(new zkutil::Op::Create(
        replica_path + "/parts/" + part->name,
        "",
        zookeeper.getDefaultACL(),
        zkutil::CreateMode::Persistent));
    ops.push_back(new zkutil::Op::Create(
        replica_path + "/parts/" + part->name + "/checksums",
        part->checksums.toString(),
        zookeeper.getDefaultACL(),
        zkutil::CreateMode::Persistent));
}
2014-04-03 08:47:59 +00:00
void StorageReplicatedMergeTree::loadQueue()
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
Strings children = zookeeper.getChildren(replica_path + "/queue");
std::sort(children.begin(), children.end());
for (const String & child : children)
{
2014-04-03 12:49:01 +00:00
String s = zookeeper.get(replica_path + "/queue/" + child);
2014-04-03 11:48:28 +00:00
LogEntry entry = LogEntry::parse(s);
entry.znode_name = child;
2014-04-04 10:37:33 +00:00
entry.tagPartsAsCurrentlyMerging(*this);
2014-04-03 11:48:28 +00:00
queue.push_back(entry);
2014-04-03 08:47:59 +00:00
}
}
void StorageReplicatedMergeTree::pullLogsToQueue()
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
2014-03-21 19:17:59 +00:00
2014-04-03 12:49:01 +00:00
/// Merge all the replicas' logs in chronological order.
struct LogIterator
{
String replica; /// Replica name.
UInt64 index; /// Index of the log record (suffix of the znode name).
Int64 timestamp; /// Time (czxid) when the log record was created.
String entry_str; /// The record itself.
/// NOTE(review): std::priority_queue::top() returns the *greatest* element under this
/// comparator, i.e. records are popped newest-first, which seems to contradict the
/// chronological-order intent above — verify (a min-heap would compare with > here).
bool operator<(const LogIterator & rhs) const
{
return timestamp < rhs.timestamp;
}
/// Reads the record at `index` from this replica's log into entry_str/timestamp;
/// returns false if that record does not exist (end of the log reached).
bool readEntry(zkutil::ZooKeeper & zookeeper, const String & zookeeper_path)
{
String index_str = toString(index);
/// Log znode names are zero-padded to 10 digits: log-0000000042.
while (index_str.size() < 10)
index_str = '0' + index_str;
zkutil::Stat stat;
if (!zookeeper.tryGet(zookeeper_path + "/replicas/" + replica + "/log/log-" + index_str, entry_str, &stat))
return false;
timestamp = stat.getczxid();
return true;
}
};
typedef std::priority_queue<LogIterator> PriorityQueue;
PriorityQueue priority_queue;
2014-04-03 08:47:59 +00:00
Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
2014-04-03 12:49:01 +00:00
2014-04-03 08:47:59 +00:00
/// Seed the priority queue with one iterator per replica log, starting at our stored pointer.
for (const String & replica : replicas)
{
2014-04-03 12:49:01 +00:00
String index_str;
UInt64 index;
2014-04-03 08:47:59 +00:00
2014-04-03 12:49:01 +00:00
if (zookeeper.tryGet(replica_path + "/log_pointers/" + replica, index_str))
2014-04-03 08:47:59 +00:00
{
2014-04-07 17:39:45 +00:00
index = parse<UInt64>(index_str);
2014-04-03 08:47:59 +00:00
}
else
{
/// If we don't have a pointer into this replica's log yet, point at its first record.
2014-04-03 12:49:01 +00:00
Strings entries = zookeeper.getChildren(zookeeper_path + "/replicas/" + replica + "/log");
2014-04-03 08:47:59 +00:00
std::sort(entries.begin(), entries.end());
2014-04-07 17:39:45 +00:00
index = entries.empty() ? 0 : parse<UInt64>(entries[0].substr(strlen("log-")));
2014-04-03 08:47:59 +00:00
2014-04-03 12:49:01 +00:00
zookeeper.create(replica_path + "/log_pointers/" + replica, toString(index), zkutil::CreateMode::Persistent);
2014-04-03 08:47:59 +00:00
}
2014-04-03 12:49:01 +00:00
LogIterator iterator;
iterator.replica = replica;
iterator.index = index;
if (iterator.readEntry(zookeeper, zookeeper_path))
priority_queue.push(iterator);
}
2014-04-03 08:47:59 +00:00
2014-04-04 12:47:57 +00:00
size_t count = 0;
2014-04-03 12:49:01 +00:00
while (!priority_queue.empty())
{
LogIterator iterator = priority_queue.top();
priority_queue.pop();
2014-04-04 12:47:57 +00:00
++count;
2014-04-03 11:48:28 +00:00
2014-04-03 12:49:01 +00:00
LogEntry entry = LogEntry::parse(iterator.entry_str);
2014-04-03 08:47:59 +00:00
2014-04-03 12:49:01 +00:00
/// In one transaction: append the record to our queue and advance our pointer into that log.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
replica_path + "/queue/queue-", iterator.entry_str, zookeeper.getDefaultACL(), zkutil::CreateMode::PersistentSequential));
ops.push_back(new zkutil::Op::SetData(
replica_path + "/log_pointers/" + iterator.replica, toString(iterator.index + 1), -1));
auto results = zookeeper.multi(ops);
String path_created = dynamic_cast<zkutil::OpResult::Create &>((*results)[0]).getPathCreated();
2014-04-04 12:47:57 +00:00
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
2014-04-04 10:37:33 +00:00
entry.tagPartsAsCurrentlyMerging(*this);
2014-04-03 12:49:01 +00:00
queue.push_back(entry);
/// Move on to the next record of the same replica's log, if there is one.
++iterator.index;
if (iterator.readEntry(zookeeper, zookeeper_path))
priority_queue.push(iterator);
2014-04-03 08:47:59 +00:00
}
2014-04-04 12:47:57 +00:00
if (count > 0)
LOG_DEBUG(log, "Pulled " << count << " entries to queue");
2014-04-03 08:47:59 +00:00
}
2014-04-07 15:45:46 +00:00
bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
2014-04-03 08:47:59 +00:00
{
2014-04-07 15:45:46 +00:00
if (entry.type == LogEntry::MERGE_PARTS)
{
/** Если какая-то из нужных частей сейчас передается или мерджится, подождем окончания этой операции.
* Иначе, даже если всех нужных частей для мерджа нет, нужно попытаться сделать мердж.
* Если каких-то частей не хватает, вместо мерджа будет попытка скачать кусок.
* Такая ситуация возможна, если получение какого-то куска пофейлилось, и его переместили в конец очереди.
*/
for (const auto & name : entry.parts_to_merge)
{
if (future_parts.count(name))
{
LOG_TRACE(log, "Not merging into part " << entry.new_part_name << " yet because part " << name << " is not ready yet.");
return false;
}
}
}
return true;
2014-04-03 11:48:28 +00:00
}
void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
{
if (entry.type == LogEntry::GET_PART ||
entry.type == LogEntry::MERGE_PARTS)
{
/// Если у нас уже есть этот кусок или покрывающий его кусок, ничего делать не нужно.
MergeTreeData::DataPartPtr containing_part = data.getContainingPart(entry.new_part_name);
/// Даже если кусок есть локально, его (в исключительных случаях) может не быть в zookeeper.
if (containing_part && zookeeper.exists(replica_path + "/parts/" + containing_part->name))
2014-04-04 12:47:57 +00:00
{
2014-04-07 15:45:46 +00:00
if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name))
LOG_DEBUG(log, "Skipping action for part " + entry.new_part_name + " - part already exists");
2014-04-03 11:48:28 +00:00
return;
2014-04-04 12:47:57 +00:00
}
2014-04-03 11:48:28 +00:00
}
2014-04-07 15:45:46 +00:00
if (entry.type == LogEntry::GET_PART && entry.source_replica == replica_name)
LOG_ERROR(log, "Part " << entry.new_part_name << " from own log doesn't exist. This is a bug.");
bool do_fetch = false;
2014-04-04 12:47:57 +00:00
if (entry.type == LogEntry::GET_PART)
{
2014-04-07 15:45:46 +00:00
do_fetch = true;
2014-04-04 12:47:57 +00:00
}
else if (entry.type == LogEntry::MERGE_PARTS)
{
2014-04-04 13:27:47 +00:00
MergeTreeData::DataPartsVector parts;
2014-04-07 15:45:46 +00:00
bool have_all_parts = true;;
2014-04-04 13:27:47 +00:00
for (const String & name : entry.parts_to_merge)
{
MergeTreeData::DataPartPtr part = data.getContainingPart(name);
2014-04-07 15:45:46 +00:00
if (!part)
{
have_all_parts = false;
break;
}
if (part->name != name)
{
LOG_ERROR(log, "Log and parts set look inconsistent: " << name << " is covered by " << part->name
<< " but should be merged into " << entry.new_part_name);
have_all_parts = false;
break;
}
2014-04-04 13:27:47 +00:00
parts.push_back(part);
}
2014-04-04 17:20:45 +00:00
2014-04-07 15:45:46 +00:00
if (!have_all_parts)
2014-04-04 13:27:47 +00:00
{
2014-04-07 15:45:46 +00:00
/// Если нет всех нужных кусков, попробуем взять у кого-нибудь уже помердженный кусок.
do_fetch = true;
LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
2014-04-04 13:27:47 +00:00
}
2014-04-07 15:45:46 +00:00
else
{
MergeTreeData::DataPartPtr part = merger.mergeParts(parts, entry.new_part_name);
zkutil::Ops ops;
2014-04-08 17:45:21 +00:00
checkPartAndAddToZooKeeper(part, ops);
2014-04-07 15:45:46 +00:00
for (const auto & part : parts)
{
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name, -1));
}
2014-04-04 17:20:45 +00:00
2014-04-07 15:45:46 +00:00
zookeeper.multi(ops);
2014-04-07 18:44:10 +00:00
parts.clear();
2014-04-07 15:45:46 +00:00
data.clearOldParts();
2014-04-04 17:20:45 +00:00
2014-04-07 15:45:46 +00:00
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
}
2014-04-04 12:47:57 +00:00
}
else
{
throw Exception("Unexpected log entry type: " + toString(static_cast<int>(entry.type)));
}
2014-04-07 15:45:46 +00:00
if (do_fetch)
{
try
{
2014-04-08 17:45:21 +00:00
String replica = findReplicaHavingPart(entry.new_part_name, true);
if (replica.empty())
{
ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches);
throw Exception("No active replica has part " + entry.new_part_name, ErrorCodes::NO_REPLICA_HAS_PART);
}
2014-04-07 15:45:46 +00:00
fetchPart(entry.new_part_name, replica);
if (entry.type == LogEntry::MERGE_PARTS)
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged);
}
catch (...)
{
/** Если не получилось скачать кусок, нужный для какого-то мерджа, лучше не пытаться получить другие куски для этого мерджа,
* а попытаться сразу получить помердженный кусок. Чтобы так получилось, переместим действия для получения остальных кусков
* для этого мерджа в конец очереди.
*
*/
try
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
/// Найдем действие по объединению этого куска с другими. Запомним других.
StringSet parts_for_merge;
LogEntries::iterator merge_entry;
for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
{
if (it->type == LogEntry::MERGE_PARTS)
{
if (std::find(it->parts_to_merge.begin(), it->parts_to_merge.end(), entry.new_part_name)
!= it->parts_to_merge.end())
{
parts_for_merge = StringSet(it->parts_to_merge.begin(), it->parts_to_merge.end());
merge_entry = it;
break;
}
}
}
if (!parts_for_merge.empty())
{
/// Переместим в конец очереди действия, получающие parts_for_merge.
for (LogEntries::iterator it = queue.begin(); it != queue.end();)
{
auto it0 = it;
++it;
if (it0 == merge_entry)
break;
if ((it0->type == LogEntry::MERGE_PARTS || it0->type == LogEntry::GET_PART)
&& parts_for_merge.count(it0->new_part_name))
{
queue.splice(queue.end(), queue, it0, it);
}
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
throw;
}
}
2014-04-03 11:48:28 +00:00
}
void StorageReplicatedMergeTree::queueUpdatingThread()
{
while (!shutdown_called)
{
2014-04-04 10:37:33 +00:00
try
{
pullLogsToQueue();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
2014-04-03 11:48:28 +00:00
std::this_thread::sleep_for(QUEUE_UPDATE_SLEEP);
}
2014-04-03 08:47:59 +00:00
}
2014-03-21 19:17:59 +00:00
2014-04-03 11:48:28 +00:00
void StorageReplicatedMergeTree::queueThread()
{
while (!shutdown_called)
{
LogEntry entry;
2014-04-07 15:45:46 +00:00
bool have_work = false;
2014-04-03 11:48:28 +00:00
2014-04-07 15:45:46 +00:00
try
2014-04-03 11:48:28 +00:00
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
2014-04-07 15:45:46 +00:00
bool empty = queue.empty();
2014-04-03 11:48:28 +00:00
if (!empty)
{
2014-04-07 15:45:46 +00:00
/// Take the first entry that is currently allowed to execute (see shouldExecuteLogEntry).
for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
{
if (shouldExecuteLogEntry(*it))
{
entry = *it;
/// Mark the part this entry will produce as "future" while it executes.
entry.tagPartAsFuture(*this);
queue.erase(it);
have_work = true;
break;
}
}
2014-04-03 11:48:28 +00:00
}
}
2014-04-07 15:45:46 +00:00
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
2014-03-21 19:17:59 +00:00
2014-04-07 15:45:46 +00:00
if (!have_work)
2014-04-03 11:48:28 +00:00
{
std::this_thread::sleep_for(QUEUE_NO_WORK_SLEEP);
continue;
}
2014-03-21 19:17:59 +00:00
2014-04-03 11:48:28 +00:00
bool success = false;
2014-04-02 07:59:43 +00:00
2014-04-03 11:48:28 +00:00
try
{
executeLogEntry(entry);
2014-04-07 15:45:46 +00:00
/// The entry is done: remove its znode from the ZK queue
/// (it was already removed from the in-memory queue above).
auto code = zookeeper.tryRemove(replica_path + "/queue/" + entry.znode_name);
if (code != zkutil::ReturnCode::Ok)
LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry.znode_name << ": "
<< zkutil::ReturnCode::toString(code) + ". There must be a bug somewhere. Ignoring it.");
2014-04-03 11:48:28 +00:00
success = true;
}
2014-04-07 18:14:39 +00:00
catch (Exception & e)
{
if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART)
/// If no one has the needed part, it is a normal situation; do not log it with Error level.
LOG_INFO(log, e.displayText());
else
tryLogCurrentException(__PRETTY_FUNCTION__);
}
2014-04-03 11:48:28 +00:00
catch (...)
{
2014-04-04 10:37:33 +00:00
tryLogCurrentException(__PRETTY_FUNCTION__);
2014-04-03 11:48:28 +00:00
}
if (shutdown_called)
break;
if (success)
{
2014-04-07 18:44:10 +00:00
/// Release the "currently merging" marks held by this entry.
entry.currently_merging_tagger = nullptr;
2014-04-03 11:48:28 +00:00
std::this_thread::sleep_for(QUEUE_AFTER_WORK_SLEEP);
}
else
{
{
/// Put the action that could not be executed back at the end of the queue.
2014-04-07 15:45:46 +00:00
/// The "future part" mark is dropped first, so the copy pushed back carries none.
entry.future_part_tagger = nullptr;
2014-04-03 11:48:28 +00:00
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
queue.push_back(entry);
}
2014-04-07 18:44:10 +00:00
entry.currently_merging_tagger = nullptr;
2014-04-03 11:48:28 +00:00
std::this_thread::sleep_for(QUEUE_ERROR_SLEEP);
}
}
}
2014-04-04 10:37:33 +00:00
void StorageReplicatedMergeTree::mergeSelectingThread()
{
/// Runs only on the leader (started from becomeLeader). Refresh the queue before the first selection.
pullLogsToQueue();
while (!shutdown_called && is_leader_node)
{
2014-04-07 18:14:39 +00:00
bool success = false;
2014-04-04 10:37:33 +00:00
2014-04-07 18:14:39 +00:00
try
2014-04-04 10:37:33 +00:00
{
2014-04-07 18:14:39 +00:00
size_t merges_queued = 0;
2014-04-04 10:37:33 +00:00
2014-04-07 18:14:39 +00:00
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
2014-04-04 10:37:33 +00:00
2014-04-07 18:14:39 +00:00
for (const auto & entry : queue)
if (entry.type == LogEntry::MERGE_PARTS)
++merges_queued;
}
2014-04-04 10:37:33 +00:00
2014-04-07 18:14:39 +00:00
/// Do not schedule more merges than there are merging threads to execute them.
if (merges_queued >= data.settings.merging_threads)
{
std::this_thread::sleep_for(MERGE_SELECTING_SLEEP);
continue;
}
2014-04-04 10:37:33 +00:00
2014-04-07 18:14:39 +00:00
/// Is there an active merge of large parts.
bool has_big_merge = false;
2014-04-04 10:37:33 +00:00
{
2014-04-07 18:14:39 +00:00
Poco::ScopedLock<Poco::FastMutex> lock(currently_merging_mutex);
for (const auto & name : currently_merging)
2014-04-04 10:37:33 +00:00
{
2014-04-07 18:14:39 +00:00
MergeTreeData::DataPartPtr part = data.getContainingPart(name);
if (!part)
continue;
if (part->name != name)
2014-04-07 18:44:10 +00:00
{
/// NOTE(review): message reads "...contained in<name>" — missing a trailing space
/// in the literal; confirm before changing the string.
LOG_INFO(log, "currently_merging contains obsolete part " << name << " contained in" << part->name);
continue;
}
2014-04-07 18:14:39 +00:00
/// "Big" threshold; part->size * index_granularity presumably approximates the
/// row count (~25M) — TODO confirm the units of part->size.
if (part->size * data.index_granularity > 25 * 1024 * 1024)
{
has_big_merge = true;
break;
}
2014-04-04 10:37:33 +00:00
}
}
MergeTreeData::DataPartsVector parts;
2014-04-04 12:47:57 +00:00
2014-04-04 10:37:33 +00:00
{
2014-04-04 17:20:45 +00:00
Poco::ScopedLock<Poco::FastMutex> lock(currently_merging_mutex);
2014-04-04 10:37:33 +00:00
2014-04-04 17:20:45 +00:00
String merged_name;
auto can_merge = std::bind(
&StorageReplicatedMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2);
LOG_TRACE(log, "Selecting parts to merge" << (has_big_merge ? " (only small)" : ""));
if (merger.selectPartsToMerge(parts, merged_name, 0, false, false, has_big_merge, can_merge) ||
merger.selectPartsToMerge(parts, merged_name, 0, true, false, has_big_merge, can_merge))
2014-04-04 10:37:33 +00:00
{
2014-04-04 17:20:45 +00:00
/// Publish the merge as a record in our log; replicas pick it up from there.
LogEntry entry;
entry.type = LogEntry::MERGE_PARTS;
2014-04-07 15:45:46 +00:00
entry.source_replica = replica_name;
2014-04-04 17:20:45 +00:00
entry.new_part_name = merged_name;
for (const auto & part : parts)
{
entry.parts_to_merge.push_back(part->name);
}
zookeeper.create(replica_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
success = true;
2014-04-04 10:37:33 +00:00
}
2014-04-04 17:20:45 +00:00
}
2014-04-04 10:37:33 +00:00
2014-04-04 17:20:45 +00:00
/// The new record must be loaded into the queue before parts are next selected for merging
/// (so that the parts get tagged as currently_merging).
pullLogsToQueue();
2014-04-04 10:37:33 +00:00
2014-04-04 17:20:45 +00:00
for (size_t i = 0; i + 1 < parts.size(); ++i)
{
/// Remove the no-longer-needed marks about nonexistent blocks between adjacent selected parts.
for (UInt64 number = parts[i]->right + 1; number <= parts[i + 1]->left - 1; ++number)
{
String number_str = toString(number);
while (number_str.size() < 10)
number_str = '0' + number_str;
String path = zookeeper_path + "/block_numbers/block-" + number_str;
2014-04-04 10:37:33 +00:00
2014-04-04 17:20:45 +00:00
zookeeper.tryRemove(path);
}
2014-04-04 10:37:33 +00:00
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (shutdown_called)
break;
if (!success)
2014-04-04 12:47:57 +00:00
std::this_thread::sleep_for(MERGE_SELECTING_SLEEP);
2014-04-04 10:37:33 +00:00
}
}
bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
    /// A part already participating in a merge cannot join another one.
    if (currently_merging.count(left->name) || currently_merging.count(right->name))
        return false;

    /// The parts may be merged only if every block number between them is abandoned,
    /// i.e. does not correspond to any block.
    for (UInt64 number = left->right + 1; number <= right->left - 1; ++number)
    {
        /// Block znode names are zero-padded to 10 digits.
        String padded = toString(number);
        if (padded.size() < 10)
            padded = String(10 - padded.size(), '0') + padded;
        String path = zookeeper_path + "/block_numbers/block-" + padded;

        if (AbandonableLockInZooKeeper::check(path, zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
        {
            LOG_DEBUG(log, "Can't merge parts " << left->name << " and " << right->name << " because block " << path << " exists");
            return false;
        }
    }

    return true;
}
void StorageReplicatedMergeTree::becomeLeader()
{
2014-04-04 12:47:57 +00:00
LOG_INFO(log, "Became leader");
2014-04-04 10:37:33 +00:00
is_leader_node = true;
merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
}
2014-04-08 17:45:21 +00:00
String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active)
{
    Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");

    /// Among the replicas that have the part, pick one uniformly at random.
    /// NOTE(review): std::random_shuffle is deprecated in later C++ standards; std::shuffle
    /// would need an explicit RNG.
    std::random_shuffle(replicas.begin(), replicas.end());

    for (const String & candidate : replicas)
    {
        bool has_part = zookeeper.exists(zookeeper_path + "/replicas/" + candidate + "/parts/" + part_name);
        if (has_part && (!active || zookeeper.exists(zookeeper_path + "/replicas/" + candidate + "/is_active")))
            return candidate;
    }

    /// Empty string means "nobody (suitable) has this part".
    return "";
}
void StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_name)
{
2014-04-04 12:47:57 +00:00
LOG_DEBUG(log, "Fetching part " << part_name << " from " << replica_name);
2014-04-03 11:48:28 +00:00
auto table_lock = lockStructure(true);
String host;
int port;
String host_port_str = zookeeper.get(zookeeper_path + "/replicas/" + replica_name + "/host");
ReadBufferFromString buf(host_port_str);
assertString("host: ", buf);
readString(host, buf);
assertString("\nport: ", buf);
readText(port, buf);
assertString("\n", buf);
assertEOF(buf);
2014-04-03 12:49:01 +00:00
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(part_name, zookeeper_path + "/replicas/" + replica_name, host, port);
2014-04-07 15:45:46 +00:00
auto removed_parts = data.renameTempPartAndReplace(part);
2014-04-03 11:48:28 +00:00
zkutil::Ops ops;
2014-04-08 17:45:21 +00:00
checkPartAndAddToZooKeeper(part, ops);
2014-04-07 18:14:39 +00:00
for (const auto & removed_part : removed_parts)
{
LOG_DEBUG(log, "Part " << removed_part->name << " is rendered obsolete by fetching part " << part_name);
ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + removed_part->name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + removed_part->name, -1));
}
2014-04-03 11:48:28 +00:00
zookeeper.multi(ops);
2014-04-04 12:47:57 +00:00
2014-04-07 15:45:46 +00:00
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
2014-04-04 12:47:57 +00:00
LOG_DEBUG(log, "Fetched part");
2014-04-03 11:48:28 +00:00
}
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
void StorageReplicatedMergeTree::shutdown()
{
if (shutdown_called)
return;
2014-04-04 10:37:33 +00:00
leader_election = nullptr;
2014-03-22 14:44:44 +00:00
shutdown_called = true;
replica_is_active_node = nullptr;
endpoint_holder = nullptr;
2014-04-03 11:48:28 +00:00
LOG_TRACE(log, "Waiting for threads to finish");
2014-04-04 10:37:33 +00:00
if (is_leader_node)
merge_selecting_thread.join();
2014-04-03 11:48:28 +00:00
queue_updating_thread.join();
for (auto & thread : queue_threads)
thread.join();
LOG_TRACE(log, "Threads finished");
2014-03-22 14:44:44 +00:00
}
StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
{
	/// A destructor must not throw: log and swallow any error from shutdown().
	try
	{
		shutdown();
	}
	catch (...)
	{
		tryLogCurrentException("~StorageReplicatedMergeTree");
	}
}
/// Reading is served entirely from the local MergeTree data:
/// delegate to the shared reader without any replication-specific logic.
BlockInputStreams StorageReplicatedMergeTree::read(
		const Names & column_names,
		ASTPtr query,
		const Settings & settings,
		QueryProcessingStage::Enum & processed_stage,
		size_t max_block_size,
		unsigned threads)
{
	return reader.read(column_names, query, settings, processed_stage, max_block_size, threads);
}
2014-04-02 10:10:37 +00:00
BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query)
{
String insert_id;
if (ASTInsertQuery * insert = dynamic_cast<ASTInsertQuery *>(&*query))
insert_id = insert->insert_id;
return new ReplicatedMergeTreeBlockOutputStream(*this, insert_id);
}
2014-03-22 14:44:44 +00:00
void StorageReplicatedMergeTree::drop()
{
2014-04-03 11:48:28 +00:00
shutdown();
2014-03-22 14:44:44 +00:00
replica_is_active_node = nullptr;
zookeeper.removeRecursive(replica_path);
if (zookeeper.getChildren(zookeeper_path + "/replicas").empty())
zookeeper.removeRecursive(zookeeper_path);
}
2014-04-02 10:10:37 +00:00
void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const
{
writeString("format version: 1\n", out);
2014-04-07 15:45:46 +00:00
writeString("source replica: ", out);
writeString(source_replica, out);
writeString("\n", out);
2014-04-02 10:10:37 +00:00
switch (type)
{
case GET_PART:
writeString("get\n", out);
writeString(new_part_name, out);
break;
case MERGE_PARTS:
writeString("merge\n", out);
for (const String & s : parts_to_merge)
{
writeString(s, out);
writeString("\n", out);
}
writeString("into\n", out);
writeString(new_part_name, out);
break;
}
writeString("\n", out);
}
/// Parses a log entry in the format produced by writeText.
/// Throws (via assertString) if the data does not match the expected framing.
void StorageReplicatedMergeTree::LogEntry::readText(ReadBuffer & in)
{
	String type_str;

	assertString("format version: 1\n", in);
	assertString("source replica: ", in);
	readString(source_replica, in);
	assertString("\n", in);
	readString(type_str, in);
	assertString("\n", in);

	if (type_str == "get")
	{
		type = GET_PART;
		readString(new_part_name, in);
	}
	else if (type_str == "merge")
	{
		type = MERGE_PARTS;
		/// Part names, one per line, terminated by the sentinel line "into".
		while (true)
		{
			String s;
			readString(s, in);
			assertString("\n", in);
			if (s == "into")
				break;
			parts_to_merge.push_back(s);
		}
		readString(new_part_name, in);
	}
	else
	{
		/// Previously an unknown type was silently ignored, leaving `type` uninitialized.
		throw Exception("Unknown log entry type: " + type_str);
	}

	assertString("\n", in);
}
2014-03-21 13:42:14 +00:00
}