This commit is contained in:
Michael Kolupaev 2014-05-13 14:10:26 +04:00
parent be5a9cf820
commit 90e93b171d
7 changed files with 136 additions and 94 deletions

View File

@ -38,11 +38,13 @@ public:
/// Примерное количество места на диске, нужное для мерджа. С запасом.
size_t estimateDiskSpaceForMerge(const MergeTreeData::DataPartsVector & parts);
/** Отменяет все текущие мерджи. Все выполняющиеся сейчас вызовы mergeParts скоро бросят исключение.
* После этого с этим экземпляром ничего делать нельзя.
/** Отменяет все мерджи. Все выполняющиеся сейчас вызовы mergeParts скоро бросят исключение.
* Все новые вызовы будут бросать исключения, пока не будет вызван uncancelAll().
*/
void cancelAll() { canceled = true; }
void uncancelAll() { canceled = false; }
private:
MergeTreeData & data;

View File

@ -25,11 +25,11 @@ public:
time_t min_date_time = DateLUTSingleton::instance().fromDayNum(DayNum_t(current_block.min_date));
String month_name = toString(Date2OrderedIdentifier(min_date_time) / 100);
storage.zookeeper.tryCreate(storage.zookeeper_path + "/block_numbers/" + month_name, "", zkutil::CreateMode::Persistent);
storage.zookeeper->tryCreate(storage.zookeeper_path + "/block_numbers/" + month_name, "", zkutil::CreateMode::Persistent);
AbandonableLockInZooKeeper block_number_lock(
storage.zookeeper_path + "/block_numbers/" + month_name + "/block-",
storage.zookeeper_path + "/temp", storage.zookeeper);
storage.zookeeper_path + "/temp", *storage.zookeeper);
UInt64 part_number = block_number_lock.getNumber();
@ -42,7 +42,7 @@ public:
block_id = part->checksums.summaryDataChecksum();
String expected_checksums_str;
if (!block_id.empty() && storage.zookeeper.tryGet(
if (!block_id.empty() && storage.zookeeper->tryGet(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str))
{
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it");
@ -71,28 +71,28 @@ public:
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id,
"",
storage.zookeeper.getDefaultACL(),
storage.zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums",
part->checksums.toString(),
storage.zookeeper.getDefaultACL(),
storage.zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/number",
toString(part_number),
storage.zookeeper.getDefaultACL(),
storage.zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
}
storage.checkPartAndAddToZooKeeper(part, ops);
ops.push_back(new zkutil::Op::Create(
storage.replica_path + "/log/log-",
log_entry.toString(),
storage.zookeeper.getDefaultACL(),
storage.zookeeper->getDefaultACL(),
zkutil::CreateMode::PersistentSequential));
block_number_lock.getUnlockOps(ops);
storage.zookeeper.multi(ops);
storage.zookeeper->multi(ops);
}
}

View File

@ -200,7 +200,7 @@ private:
typedef std::vector<std::thread> Threads;
Context & context;
zkutil::ZooKeeper & zookeeper;
zkutil::ZooKeeperPtr zookeeper;
/// Куски, для которых в очереди есть задание на слияние.
StringSet currently_merging;
@ -227,6 +227,10 @@ private:
/** /replicas/me/is_active.
*/
zkutil::EphemeralNodeHolderPtr replica_is_active_node;
/** Случайные данные, которые мы записали в /replicas/me/is_active.
*/
String active_node_identifier;
/** Является ли эта реплика "ведущей". Ведущая реплика выбирает куски для слияния.
*/

View File

@ -555,7 +555,7 @@ void Context::resetCaches() const
shared->mark_cache->reset();
}
void Context::setZooKeeper(SharedPtr<zkutil::ZooKeeper> zookeeper)
void Context::setZooKeeper(zkutil::ZooKeeperPtr zookeeper)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
@ -565,7 +565,7 @@ void Context::setZooKeeper(SharedPtr<zkutil::ZooKeeper> zookeeper)
shared->zookeeper = zookeeper;
}
zkutil::ZooKeeper & Context::getZooKeeper() const
zkutil::ZooKeeperPtr Context::getZooKeeper() const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);

View File

@ -4,6 +4,7 @@
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromString.h>
#include <time.h>
namespace DB
{
@ -45,7 +46,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
if (!attach)
{
if (!zookeeper.exists(zookeeper_path))
if (!zookeeper->exists(zookeeper_path))
createTable();
if (!isTableEmpty())
@ -73,6 +74,12 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
unreplicated_merger.reset(new MergeTreeDataMerger(*unreplicated_data));
}
/// Сгенерируем этому экземпляру случайный идентификатор.
struct timespec times;
if (clock_gettime(CLOCK_THREAD_CPUTIME_ID, &times))
throwFromErrno("Cannot clock_gettime.", ErrorCodes::CANNOT_CLOCK_GETTIME);
active_node_identifier = toString(times.tv_nsec);
restarting_thread = std::thread(&StorageReplicatedMergeTree::restartingThread, this);
}
@ -112,7 +119,7 @@ static String formattedAST(const ASTPtr & ast)
void StorageReplicatedMergeTree::createTable()
{
zookeeper.create(zookeeper_path, "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path, "", zkutil::CreateMode::Persistent);
/// Запишем метаданные таблицы, чтобы реплики могли сверять с ними свою локальную структуру таблицы.
std::stringstream metadata;
@ -134,13 +141,13 @@ void StorageReplicatedMergeTree::createTable()
}
buf.next();
zookeeper.create(zookeeper_path + "/metadata", metadata.str(), zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/metadata", metadata.str(), zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/blocks", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/block_numbers", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/leader_election", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/blocks", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/block_numbers", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/leader_election", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent);
}
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
@ -148,7 +155,7 @@ void StorageReplicatedMergeTree::createTable()
*/
void StorageReplicatedMergeTree::checkTableStructure()
{
String metadata_str = zookeeper.get(zookeeper_path + "/metadata");
String metadata_str = zookeeper->get(zookeeper_path + "/metadata");
ReadBufferFromString buf(metadata_str);
assertString("metadata format version: 1", buf);
assertString("\ndate column: ", buf);
@ -180,12 +187,12 @@ void StorageReplicatedMergeTree::checkTableStructure()
void StorageReplicatedMergeTree::createReplica()
{
zookeeper.create(replica_path, "", zkutil::CreateMode::Persistent);
zookeeper.create(replica_path + "/host", "", zkutil::CreateMode::Persistent);
zookeeper.create(replica_path + "/log", "", zkutil::CreateMode::Persistent);
zookeeper.create(replica_path + "/log_pointers", "", zkutil::CreateMode::Persistent);
zookeeper.create(replica_path + "/queue", "", zkutil::CreateMode::Persistent);
zookeeper.create(replica_path + "/parts", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path, "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/host", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/log", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/log_pointers", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/queue", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/parts", "", zkutil::CreateMode::Persistent);
}
void StorageReplicatedMergeTree::activateReplica()
@ -194,14 +201,23 @@ void StorageReplicatedMergeTree::activateReplica()
host << "host: " << context.getInterserverIOHost() << std::endl;
host << "port: " << context.getInterserverIOPort() << std::endl;
/** Если нода отмечена как активная, но отметка сделана в этом же экземпляре, удалим ее.
* Такое возможно только при истечении сессии в ZooKeeper.
* Здесь есть небольшой race condition (можем удалить не ту ноду, для которой сделали tryGet),
* но он крайне маловероятен при нормальном использовании.
*/
String data;
if (zookeeper->tryGet(replica_path + "/is_active", data) && data == active_node_identifier)
zookeeper->tryRemove(replica_path + "/is_active");
/// Одновременно объявим, что эта реплика активна, и обновим хост.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(replica_path + "/is_active", "", zookeeper.getDefaultACL(), zkutil::CreateMode::Ephemeral));
ops.push_back(new zkutil::Op::Create(replica_path + "/is_active", "", zookeeper->getDefaultACL(), zkutil::CreateMode::Ephemeral));
ops.push_back(new zkutil::Op::SetData(replica_path + "/host", host.str(), -1));
try
{
zookeeper.multi(ops);
zookeeper->multi(ops);
}
catch (zkutil::KeeperException & e)
{
@ -212,15 +228,15 @@ void StorageReplicatedMergeTree::activateReplica()
throw;
}
replica_is_active_node = zkutil::EphemeralNodeHolder::existing(replica_path + "/is_active", zookeeper);
replica_is_active_node = zkutil::EphemeralNodeHolder::existing(replica_path + "/is_active", *zookeeper);
}
bool StorageReplicatedMergeTree::isTableEmpty()
{
Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
for (const auto & replica : replicas)
{
if (!zookeeper.getChildren(zookeeper_path + "/replicas/" + replica + "/parts").empty())
if (!zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts").empty())
return false;
}
return true;
@ -228,7 +244,7 @@ bool StorageReplicatedMergeTree::isTableEmpty()
void StorageReplicatedMergeTree::checkParts()
{
Strings expected_parts_vec = zookeeper.getChildren(replica_path + "/parts");
Strings expected_parts_vec = zookeeper->getChildren(replica_path + "/parts");
NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end());
MergeTreeData::DataParts parts = data.getAllDataParts();
@ -280,7 +296,7 @@ void StorageReplicatedMergeTree::checkParts()
LOG_ERROR(log, "Adding unexpected local part to ZooKeeper: " << part->name);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops);
zookeeper.multi(ops);
zookeeper->multi(ops);
}
/// Удалим из ZK информацию о кусках, покрытых только что добавленными.
@ -289,7 +305,7 @@ void StorageReplicatedMergeTree::checkParts()
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
zookeeper.multi(ops);
zookeeper->multi(ops);
}
/// Удалим лишние локальные куски.
@ -306,7 +322,7 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP
if (!another_replica.empty())
{
String checksums_str;
if (zookeeper.tryGet(zookeeper_path + "/replicas/" + another_replica + "/parts/" + part->name + "/checksums", checksums_str))
if (zookeeper->tryGet(zookeeper_path + "/replicas/" + another_replica + "/parts/" + part->name + "/checksums", checksums_str))
{
auto checksums = MergeTreeData::DataPart::Checksums::parse(checksums_str);
checksums.checkEqual(part->checksums, true);
@ -316,12 +332,12 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name,
"",
zookeeper.getDefaultACL(),
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name + "/checksums",
part->checksums.toString(),
zookeeper.getDefaultACL(),
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
}
@ -334,7 +350,7 @@ void StorageReplicatedMergeTree::clearOldParts()
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
zkutil::ReturnCode::type code = zookeeper.tryMulti(ops);
zkutil::ReturnCode::type code = zookeeper->tryMulti(ops);
if (code != zkutil::ReturnCode::Ok)
LOG_WARNING(log, "Couldn't remove part " << name << " from ZooKeeper: " << zkutil::ReturnCode::toString(code));
}
@ -345,17 +361,17 @@ void StorageReplicatedMergeTree::clearOldParts()
void StorageReplicatedMergeTree::clearOldLogs()
{
Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
UInt64 min_pointer = std::numeric_limits<UInt64>::max();
for (const String & replica : replicas)
{
String pointer;
if (!zookeeper.tryGet(zookeeper_path + "/replicas/" + replica + "/log_pointers/" + replica_name, pointer))
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/log_pointers/" + replica_name, pointer))
return;
min_pointer = std::min(min_pointer, parse<UInt64>(pointer));
}
Strings entries = zookeeper.getChildren(replica_path + "/log");
Strings entries = zookeeper->getChildren(replica_path + "/log");
std::sort(entries.begin(), entries.end());
size_t removed = 0;
@ -364,7 +380,7 @@ void StorageReplicatedMergeTree::clearOldLogs()
UInt64 index = parse<UInt64>(entry.substr(strlen("log-")));
if (index >= min_pointer)
break;
zookeeper.remove(replica_path + "/log/" + entry);
zookeeper->remove(replica_path + "/log/" + entry);
++removed;
}
@ -375,7 +391,7 @@ void StorageReplicatedMergeTree::clearOldLogs()
void StorageReplicatedMergeTree::clearOldBlocks()
{
zkutil::Stat stat;
if (!zookeeper.exists(zookeeper_path + "/blocks", &stat))
if (!zookeeper->exists(zookeeper_path + "/blocks", &stat))
throw Exception(zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);
int children_count = stat.getnumChildren();
@ -387,14 +403,14 @@ void StorageReplicatedMergeTree::clearOldBlocks()
LOG_TRACE(log, "Clearing about " << static_cast<size_t>(children_count) - data.settings.replicated_deduplication_window
<< " old blocks from ZooKeeper");
Strings blocks = zookeeper.getChildren(zookeeper_path + "/blocks");
Strings blocks = zookeeper->getChildren(zookeeper_path + "/blocks");
std::vector<std::pair<Int64, String> > timed_blocks;
for (const String & block : blocks)
{
zkutil::Stat stat;
zookeeper.exists(zookeeper_path + "/blocks/" + block, &stat);
zookeeper->exists(zookeeper_path + "/blocks/" + block, &stat);
timed_blocks.push_back(std::make_pair(stat.getczxid(), block));
}
@ -405,7 +421,7 @@ void StorageReplicatedMergeTree::clearOldBlocks()
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/number", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second, -1));
zookeeper.multi(ops);
zookeeper->multi(ops);
}
LOG_TRACE(log, "Cleared " << blocks.size() - data.settings.replicated_deduplication_window << " old blocks from ZooKeeper");
@ -415,11 +431,11 @@ void StorageReplicatedMergeTree::loadQueue()
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
Strings children = zookeeper.getChildren(replica_path + "/queue");
Strings children = zookeeper->getChildren(replica_path + "/queue");
std::sort(children.begin(), children.end());
for (const String & child : children)
{
String s = zookeeper.get(replica_path + "/queue/" + child);
String s = zookeeper->get(replica_path + "/queue/" + child);
LogEntry entry = LogEntry::parse(s);
entry.znode_name = child;
entry.tagPartsAsCurrentlyMerging(*this);
@ -462,32 +478,32 @@ void StorageReplicatedMergeTree::pullLogsToQueue()
typedef std::priority_queue<LogIterator> PriorityQueue;
PriorityQueue priority_queue;
Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
for (const String & replica : replicas)
{
String index_str;
UInt64 index;
if (zookeeper.tryGet(replica_path + "/log_pointers/" + replica, index_str))
if (zookeeper->tryGet(replica_path + "/log_pointers/" + replica, index_str))
{
index = parse<UInt64>(index_str);
}
else
{
/// Если у нас еще нет указателя на лог этой реплики, поставим указатель на первую запись в нем.
Strings entries = zookeeper.getChildren(zookeeper_path + "/replicas/" + replica + "/log");
Strings entries = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/log");
std::sort(entries.begin(), entries.end());
index = entries.empty() ? 0 : parse<UInt64>(entries[0].substr(strlen("log-")));
zookeeper.create(replica_path + "/log_pointers/" + replica, toString(index), zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/log_pointers/" + replica, toString(index), zkutil::CreateMode::Persistent);
}
LogIterator iterator;
iterator.replica = replica;
iterator.index = index;
if (iterator.readEntry(zookeeper, zookeeper_path))
if (iterator.readEntry(*zookeeper, zookeeper_path))
priority_queue.push(iterator);
}
@ -504,10 +520,10 @@ void StorageReplicatedMergeTree::pullLogsToQueue()
/// Одновременно добавим запись в очередь и продвинем указатель на лог.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
replica_path + "/queue/queue-", iterator.entry_str, zookeeper.getDefaultACL(), zkutil::CreateMode::PersistentSequential));
replica_path + "/queue/queue-", iterator.entry_str, zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
ops.push_back(new zkutil::Op::SetData(
replica_path + "/log_pointers/" + iterator.replica, toString(iterator.index + 1), -1));
auto results = zookeeper.multi(ops);
auto results = zookeeper->multi(ops);
String path_created = dynamic_cast<zkutil::OpResult::Create &>((*results)[0]).getPathCreated();
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
@ -515,7 +531,7 @@ void StorageReplicatedMergeTree::pullLogsToQueue()
queue.push_back(entry);
++iterator.index;
if (iterator.readEntry(zookeeper, zookeeper_path))
if (iterator.readEntry(*zookeeper, zookeeper_path))
priority_queue.push(iterator);
}
@ -554,7 +570,7 @@ void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
MergeTreeData::DataPartPtr containing_part = data.getContainingPart(entry.new_part_name);
/// Даже если кусок есть локально, его (в исключительных случаях) может не быть в zookeeper.
if (containing_part && zookeeper.exists(replica_path + "/parts/" + containing_part->name))
if (containing_part && zookeeper->exists(replica_path + "/parts/" + containing_part->name))
{
if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name))
LOG_DEBUG(log, "Skipping action for part " + entry.new_part_name + " - part already exists");
@ -606,7 +622,7 @@ void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops);
zookeeper.multi(ops);
zookeeper->multi(ops);
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
}
@ -757,7 +773,7 @@ void StorageReplicatedMergeTree::queueThread()
{
executeLogEntry(entry);
auto code = zookeeper.tryRemove(replica_path + "/queue/" + entry.znode_name);
auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry.znode_name);
if (code != zkutil::ReturnCode::Ok)
LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry.znode_name << ": "
<< zkutil::ReturnCode::toString(code) + ". There must be a bug somewhere. Ignoring it.");
@ -871,7 +887,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
entry.parts_to_merge.push_back(part->name);
}
zookeeper.create(replica_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
zookeeper->create(replica_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
success = true;
}
@ -894,7 +910,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
number_str = '0' + number_str;
String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + number_str;
zookeeper.tryRemove(path);
zookeeper->tryRemove(path);
}
}
}
@ -951,7 +967,7 @@ bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr
number_str = '0' + number_str;
String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + number_str;
if (AbandonableLockInZooKeeper::check(path, zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
if (AbandonableLockInZooKeeper::check(path, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
{
LOG_DEBUG(log, "Can't merge parts " << left->name << " and " << right->name << " because block " << path << " exists");
return false;
@ -971,15 +987,15 @@ void StorageReplicatedMergeTree::becomeLeader()
String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active)
{
Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
/// Из реплик, у которых есть кусок, выберем одну равновероятно.
std::random_shuffle(replicas.begin(), replicas.end());
for (const String & replica : replicas)
{
if (zookeeper.exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name) &&
(!active || zookeeper.exists(zookeeper_path + "/replicas/" + replica + "/is_active")))
if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name) &&
(!active || zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")))
return replica;
}
@ -995,7 +1011,7 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
String host;
int port;
String host_port_str = zookeeper.get(zookeeper_path + "/replicas/" + replica_name + "/host");
String host_port_str = zookeeper->get(zookeeper_path + "/replicas/" + replica_name + "/host");
ReadBufferFromString buf(host_port_str);
assertString("host: ", buf);
readString(host, buf);
@ -1010,7 +1026,7 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops);
zookeeper.multi(ops);
zookeeper->multi(ops);
for (const auto & removed_part : removed_parts)
{
@ -1059,9 +1075,13 @@ void StorageReplicatedMergeTree::startup()
{
shutdown_called = false;
merger.uncancelAll();
if (unreplicated_merger)
unreplicated_merger->uncancelAll();
activateReplica();
leader_election = new zkutil::LeaderElection(zookeeper_path + "/leader_election", zookeeper,
leader_election = new zkutil::LeaderElection(zookeeper_path + "/leader_election", *zookeeper,
std::bind(&StorageReplicatedMergeTree::becomeLeader, this), replica_name);
queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this);
@ -1071,27 +1091,48 @@ void StorageReplicatedMergeTree::startup()
void StorageReplicatedMergeTree::restartingThread()
{
startup();
while (!permanent_shutdown_called)
try
{
if (zookeeper.expired())
startup();
while (!permanent_shutdown_called)
{
/// Запретим писать в таблицу, пока подменяем zookeeper.
auto structure_lock = lockDataForAlter();
if (zookeeper->expired())
{
LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
partialShutdown();
/// Запретим писать в таблицу, пока подменяем zookeeper.
LOG_TRACE(log, "Locking all operations");
auto structure_lock = lockDataForAlter();
LOG_TRACE(log, "Locked all operations");
zookeeper = context.getZooKeeper();
partialShutdown();
startup();
zookeeper = context.getZooKeeper();
startup();
}
std::this_thread::sleep_for(std::chrono::seconds(2));
}
std::this_thread::sleep_for(std::chrono::seconds(2));
}
catch (...)
{
tryLogCurrentException("StorageReplicatedMergeTree::restartingThread");
LOG_ERROR(log, "Exception in restartingThread. The storage will be read-only until server restart.");
goReadOnly();
return;
}
endpoint_holder = nullptr;
partialShutdown();
try
{
endpoint_holder = nullptr;
partialShutdown();
}
catch (...)
{
tryLogCurrentException("StorageReplicatedMergeTree::restartingThread");
}
}
StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
@ -1158,9 +1199,9 @@ void StorageReplicatedMergeTree::drop()
shutdown();
replica_is_active_node = nullptr;
zookeeper.removeRecursive(replica_path);
if (zookeeper.getChildren(zookeeper_path + "/replicas").empty())
zookeeper.removeRecursive(zookeeper_path);
zookeeper->removeRecursive(replica_path);
if (zookeeper->getChildren(zookeeper_path + "/replicas").empty())
zookeeper->removeRecursive(zookeeper_path);
}
void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const

View File

@ -133,7 +133,7 @@ private:
zk::ZooKeeper impl;
String hosts;
std::string hosts;
int32_t sessionTimeoutMs;
WatchFunction * state_watch;

View File

@ -6,10 +6,5 @@
# Чтобы посмотреть, какие правила сейчас есть, используйте sudo iptables -L и sudo ip6tables -L
sudo iptables -A OUTPUT -d example1 -j DROP
sudo iptables -A OUTPUT -d example2 -j DROP
sudo iptables -A OUTPUT -d example3 -j DROP
sudo ip6tables -A OUTPUT -d example1 -j DROP
sudo ip6tables -A OUTPUT -d example2 -j DROP
sudo ip6tables -A OUTPUT -d example3 -j DROP
sudo iptables -A OUTPUT -p tcp --dport 2181 -j DROP
sudo ip6tables -A OUTPUT -p tcp --dport 2181 -j DROP