This commit is contained in:
Michael Kolupaev 2014-04-03 16:49:01 +04:00
parent b4d85c1676
commit 50ba6ca8e1
3 changed files with 72 additions and 30 deletions

View File

@ -22,7 +22,7 @@ public:
String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index);
AbandonableLockInZooKeeper block_number_lock(
storage.zookeeper_path + "/block-numbers/block-",
storage.zookeeper_path + "/block_numbers/block-",
storage.zookeeper_path + "/temp", storage.zookeeper);
UInt64 part_number = block_number_lock.getNumber();

View File

@ -133,7 +133,8 @@ void loadMetadata(Context & context)
}
catch (const Exception & e)
{
throw Exception("Cannot create table from metadata file " + tables[j] + ", error: " + e.displayText(),
throw Exception("Cannot create table from metadata file " + tables[j] + ", error: " + e.displayText() +
", stack trace:\n" + e.getStackTrace().toString(),
ErrorCodes::CANNOT_CREATE_TABLE_FROM_METADATA);
}
}

View File

@ -129,7 +129,7 @@ void StorageReplicatedMergeTree::createTable()
/// Создадим нужные "директории".
zookeeper.create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/blocks", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/block-numbers", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/block_numbers", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent);
}
@ -249,7 +249,7 @@ void StorageReplicatedMergeTree::loadQueue()
std::sort(children.begin(), children.end());
for (const String & child : children)
{
String s = zookeeper.get(child);
String s = zookeeper.get(replica_path + "/queue/" + child);
LogEntry entry = LogEntry::parse(s);
entry.znode_name = child;
queue.push_back(entry);
@ -260,47 +260,88 @@ void StorageReplicatedMergeTree::pullLogsToQueue()
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
/// Сольем все логи в хронологическом порядке.
struct LogIterator
{
String replica; /// Имя реплики.
UInt64 index; /// Номер записи в логе (суффикс имени ноды).
Int64 timestamp; /// Время (czxid) создания записи в логе.
String entry_str; /// Сама запись.
bool operator<(const LogIterator & rhs) const
{
return timestamp < rhs.timestamp;
}
bool readEntry(zkutil::ZooKeeper & zookeeper, const String & zookeeper_path)
{
String index_str = toString(index);
while (index_str.size() < 10)
index_str = '0' + index_str;
zkutil::Stat stat;
if (!zookeeper.tryGet(zookeeper_path + "/replicas/" + replica + "/log/log-" + index_str, entry_str, &stat))
return false;
timestamp = stat.getczxid();
return true;
}
};
typedef std::priority_queue<LogIterator> PriorityQueue;
PriorityQueue priority_queue;
Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
for (const String & replica : replicas)
{
String log_path = zookeeper_path + "/" + replica + "/log";
String index_str;
UInt64 index;
String pointer_str;
UInt64 pointer;
if (zookeeper.tryGet(replica_path + "/log_pointers/" + replica, pointer_str))
if (zookeeper.tryGet(replica_path + "/log_pointers/" + replica, index_str))
{
pointer = Poco::NumberParser::parseUnsigned64(pointer_str);
index = Poco::NumberParser::parseUnsigned64(index_str);
}
else
{
/// Если у нас еще нет указателя на лог этой реплики, поставим указатель на первую запись в нем.
Strings entries = zookeeper.getChildren(log_path);
Strings entries = zookeeper.getChildren(zookeeper_path + "/replicas/" + replica + "/log");
std::sort(entries.begin(), entries.end());
pointer = entries.empty() ? 0 : Poco::NumberParser::parseUnsigned64(entries[0].substr(strlen("log-")));
index = entries.empty() ? 0 : Poco::NumberParser::parseUnsigned64(entries[0].substr(strlen("log-")));
zookeeper.create(replica_path + "/log_pointers/" + replica, toString(pointer), zkutil::CreateMode::Persistent);
zookeeper.create(replica_path + "/log_pointers/" + replica, toString(index), zkutil::CreateMode::Persistent);
}
String entry_str;
while (zookeeper.tryGet(log_path + "/log-" + toString(pointer), entry_str))
{
LogEntry entry = LogEntry::parse(entry_str);
LogIterator iterator;
iterator.replica = replica;
iterator.index = index;
/// Одновременно добавим запись в очередь и продвинем указатель на лог.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
replica_path + "/queue/queue-", entry_str, zookeeper.getDefaultACL(), zkutil::CreateMode::PersistentSequential));
ops.push_back(new zkutil::Op::SetData(
replica_path + "/log_pointers/" + replica, toString(pointer + 1), -1));
auto results = zookeeper.multi(ops);
if (iterator.readEntry(zookeeper, zookeeper_path))
priority_queue.push(iterator);
}
String path_created = dynamic_cast<zkutil::OpResult::Create &>((*results)[0]).getPathCreated();
entry.znode_name = path.substr(path.find_last_of('/') + 1);
queue.push_back(entry);
while (!priority_queue.empty())
{
LogIterator iterator = priority_queue.top();
priority_queue.pop();
++pointer;
}
LogEntry entry = LogEntry::parse(iterator.entry_str);
/// Одновременно добавим запись в очередь и продвинем указатель на лог.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
replica_path + "/queue/queue-", iterator.entry_str, zookeeper.getDefaultACL(), zkutil::CreateMode::PersistentSequential));
ops.push_back(new zkutil::Op::SetData(
replica_path + "/log_pointers/" + iterator.replica, toString(iterator.index + 1), -1));
auto results = zookeeper.multi(ops);
String path_created = dynamic_cast<zkutil::OpResult::Create &>((*results)[0]).getPathCreated();
entry.znode_name = path.substr(path.find_last_of('/') + 1);
queue.push_back(entry);
++iterator.index;
if (iterator.readEntry(zookeeper, zookeeper_path))
priority_queue.push(iterator);
}
}
@ -441,7 +482,7 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
assertString("\n", buf);
assertEOF(buf);
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(name, zookeeper_path + "/replicas/" + replica_name, host, port);
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(part_name, zookeeper_path + "/replicas/" + replica_name, host, port);
data.renameTempPartAndAdd(part, nullptr);
zkutil::Ops ops;