This commit is contained in:
Michael Kolupaev 2014-04-02 11:59:43 +04:00
parent 634c33c766
commit 8fc53a7020
9 changed files with 274 additions and 11 deletions

View File

@ -236,6 +236,9 @@ namespace ErrorCodes
NO_FILE_IN_DATA_PART,
UNEXPECTED_FILE_IN_DATA_PART,
BAD_SIZE_OF_FILE_IN_DATA_PART,
NOT_FOUND_EXPECTED_DATA_PART,
TOO_MANY_UNEXPECTED_DATA_PARTS,
NO_SUCH_DATA_PART,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -63,12 +63,17 @@ private:
class InterserverIOEndpointHolder
{
public:
InterserverIOEndpointHolder(const String & name_, InterserverIOEndpointPtr endpoint, InterserverIOHandler & handler_)
: name(name_), handler(handler_)
InterserverIOEndpointHolder(const String & name_, InterserverIOEndpointPtr endpoint_, InterserverIOHandler & handler_)
: name(name_), endpoint(endpoint_), handler(handler_)
{
handler.addEndpoint(name, endpoint);
}
/// Returns the endpoint pointer that was passed to the constructor and stored in this holder.
InterserverIOEndpointPtr getEndpoint()
{
return endpoint;
}
~InterserverIOEndpointHolder()
{
try
@ -83,6 +88,7 @@ public:
private:
String name;
InterserverIOEndpointPtr endpoint;
InterserverIOHandler & handler;
};

View File

@ -0,0 +1,99 @@
#pragma once
#include <zkutil/ZooKeeper.h>
#include <DB/Core/Exception.h>
namespace DB
{
/** Примитив синхронизации. Работает следующим образом:
* При создании создает неэфемерную инкрементную ноду и помечает ее как заблокированную (LOCKED).
* unlock() разблокирует ее (UNLOCKED).
* При вызове деструктора или завершении сессии в ZooKeeper, переходит в состояние ABANDONED.
* (В том числе при падении программы)
*/
class AbandonableLockInZooKeeper
{
public:
enum State
{
UNLOCKED,
LOCKED,
ABANDONED,
};
AbandonableLockInZookeeper(
const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_)
: zookeeper(zookeeper_), path_prefix(path_prefix_)
{
/// Создадим вспомогательную эфемерную ноду.
holder_path = zookeeper.create(temp_path + "/abandonable-lock-", "", zkutil::CreateMode::EphemeralSequential);
/// Запишем в основную ноду путь к вспомогательной.
path = zookeeper.create(path_prefix, holder_path, zkutil::CreateMode::PersistentSequential);
}
String getPath()
{
return path;
}
/// Распарсить число в конце пути.
UInt64 getNumber()
{
return static_cast<UInt64>(atol(path.substr(path_prefix.size())));
}
void unlock()
{
zookeeper.remove(path);
zookeeper.remove(holder_path);
}
~AbandonableLockInZookeeper()
{
try
{
zookeeper.remove(holder_path);
zookeeper.set(path, ""); /// Это не обязательно.
}
catch (...)
{
tryLogCurrentException();
}
}
static State check(const String & path, zkutil::ZooKeeper & zookeeper)
{
String holder_path;
/// Если нет основной ноды, UNLOCKED.
if (!zookeeper.tryGet(path, holder_path))
return UNLOCKED;
/// Если в основной ноде нет пути к вспомогательной, ABANDONED.
if (holder_path.empty())
return ABANDONED;
/// Если вспомогательная нода жива, LOCKED.
if (zookeeper.exists(holder_path))
return LOCKED;
/// Если вспомогательной ноды нет, нужно еще раз проверить существование основной ноды,
/// потому что за это время могли успеть вызвать unlock().
/// Заодно уберем оттуда путь к вспомогательной ноде.
if (zookeeper.trySet(path, "") == zkutil::ReturnCode::Ok)
return ABANDONED;
return UNLOCKED;
}
private:
zkutil::ZooKeeper & zookeeper;
String path_prefix;
String path;
String holder_path;
};
}

View File

@ -7,6 +7,7 @@
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/ExpressionActions.h>
#include <DB/Storages/IStorage.h>
#include <DB/IO/ReadBufferFromString.h>
#include <Poco/RWLock.h>
namespace DB
@ -125,6 +126,15 @@ public:
{
return files.empty();
}
/// Builds a Checksums object from its text representation `s`.
/// After reading, assertEOF() is applied to the buffer, so the whole string is expected to be consumed.
static Checksums parse(const String & s)
{
    Checksums parsed;
    ReadBufferFromString buf(s);

    parsed.readText(buf);
    assertEOF(buf);

    return parsed;
}
};
DataPart(MergeTreeData & storage_) : storage(storage_), size_in_bytes(0) {}
@ -176,10 +186,11 @@ public:
Poco::File(to).remove(true);
}
void renameToOld() const
/// Переименовывает кусок, дописав к имени префикс.
void renameAddPrefix(const String & prefix) const
{
String from = storage.full_path + name + "/";
String to = storage.full_path + "old_" + name + "/";
String to = storage.full_path + prefix + name + "/";
Poco::File f(from);
f.setLastModified(Poco::Timestamp::fromEpochTime(time(0)));
@ -318,6 +329,12 @@ public:
*/
void renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment);
/** Renames the part to prefix_part and removes it from the working set.
* It is best used only when nobody can be reading or writing this part
* (for example, during table initialization).
*/
void renameAndRemovePart(DataPartPtr part, const String & prefix);
/** Remove parts that are no longer relevant.
*/
void clearOldParts();

View File

@ -0,0 +1,80 @@
#pragma once
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/MergeTree/AbandonableLockInZooKeeper.h>
namespace DB
{
/** Output stream for inserting into a StorageReplicatedMergeTree.
  * Every incoming block is split into parts. For each part a block number is allocated
  * in ZooKeeper, the part is written locally, and then the part is registered in ZooKeeper.
  */
class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
    /// insert_id_ - identifier of the insert; may be empty. When it is not empty, it is combined
    /// with the index of the block to form a block ID that is looked up in ZooKeeper before inserting.
    ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, const String & insert_id_)
        : storage(storage_), insert_id(insert_id_), block_index(0) {}

    void write(const Block & block) override
    {
        auto part_blocks = storage.writer.splitBlockIntoParts(block);
        for (auto & current_block : part_blocks)
        {
            ++block_index;
            String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index);

            /// Allocate a block number. If this iteration is left without unlock()
            /// (by `continue` or by an exception), the lock is abandoned by its destructor.
            AbandonableLockInZooKeeper block_number_lock(
                storage.zookeeper_path + "/block-numbers/block-",
                storage.zookeeper_path + "/temp", storage.zookeeper);

            UInt64 part_number = block_number_lock.getNumber();

            MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, part_number);

            String expected_checksums_str;
            if (!block_id.empty() && storage.zookeeper.tryGet(
                storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str))
            {
                /// A block with this ID has already been inserted before. Check the checksums and do not insert it.
                /// parse() is a static member function, so it is called with `::`.
                auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str);
                expected_checksums.check(part->checksums);

                /// Abandon block_number_lock.
                /// NOTE(review): the temporary part written above is not cleaned up here - confirm this is intended.
                continue;
            }

            /// NOTE(review): the visible declaration of renameTempPartAndAdd also takes an `Increment *`;
            /// confirm that it has a default value or pass the argument explicitly.
            storage.data.renameTempPartAndAdd(part);

            const String part_number_str = toString(part_number);

            /// NOTE(review): when insert_id is empty, block_id is empty and the paths below
            /// degenerate to ".../blocks/" and ".../parts//number" - verify the intended behavior.
            /// NOTE(review): the last node is keyed by block_id under replica_path + "/parts" and has
            /// no explicitly created parent; confirm whether it should be keyed by the part name.
            zkutil::Ops ops;
            ops.push_back(new zkutil::Op::Create(
                storage.zookeeper_path + "/blocks/" + block_id,
                "",
                storage.zookeeper.getDefaultACL(),
                zkutil::CreateMode::Persistent));
            ops.push_back(new zkutil::Op::Create(
                storage.zookeeper_path + "/blocks/" + block_id + "/checksums",
                part->checksums.toString(),
                storage.zookeeper.getDefaultACL(),
                zkutil::CreateMode::Persistent));
            ops.push_back(new zkutil::Op::Create(
                storage.zookeeper_path + "/blocks/" + block_id + "/number",
                part_number_str,
                storage.zookeeper.getDefaultACL(),
                zkutil::CreateMode::Persistent));
            ops.push_back(new zkutil::Op::Create(
                storage.replica_path + "/parts/" + block_id + "/number",
                part_number_str,
                storage.zookeeper.getDefaultACL(),
                zkutil::CreateMode::Persistent));

            /// Submit all the collected operations to ZooKeeper as one request.
            /// NOTE(review): assumes zkutil::ZooKeeper provides multi(ops) - confirm.
            storage.zookeeper.multi(ops);

            /// The part is registered; release the block number.
            block_number_lock.unlock();
        }
    }

private:
    StorageReplicatedMergeTree & storage;
    String insert_id;
    /// Index of the last processed part block within this stream; starts from 1 for the first one.
    size_t block_index;
};
}

View File

@ -62,6 +62,8 @@ public:
void drop() override;
private:
friend class ReplicatedMergeTreeBlockOutputStream;
struct LogEntry
{
enum Type
@ -82,7 +84,12 @@ private:
class MyInterserverIOEndpoint : public InterserverIOEndpoint
{
public:
MyInterserverIOEndpoint(StorageReplicatedMergeTree & storage_) : storage(storage_), owned_storage(storage.thisPtr()) {}
MyInterserverIOEndpoint(StorageReplicatedMergeTree & storage_) : storage(storage_) {}
/// Stores the given storage pointer in this endpoint.
/// NOTE(review): presumably this keeps the storage alive for as long as the endpoint exists - confirm.
void setOwnedStorage(StoragePtr owned_storage_)
{
owned_storage = owned_storage_;
}
void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override;

View File

@ -290,7 +290,7 @@ void MergeTreeData::clearOldParts()
{
LOG_DEBUG(log, "'Removing' part " << (*it)->name << " (prepending old_ to its name)");
(*it)->renameToOld();
(*it)->renameAddPrefix("old_");
all_data_parts.erase(it++);
}
else
@ -703,6 +703,14 @@ void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * in
all_data_parts.insert(part);
}
/// Takes `part` out of data_parts (under data_parts_mutex) and renames it by prepending `prefix` to its name.
/// Throws an Exception with code NO_SUCH_DATA_PART if the part is not in data_parts.
/// NOTE(review): all_data_parts is not touched here - confirm that this is intended.
void MergeTreeData::renameAndRemovePart(DataPartPtr part, const String & prefix)
{
    Poco::ScopedLock<Poco::FastMutex> guard(data_parts_mutex);

    const bool was_present = data_parts.erase(part) != 0;
    if (!was_present)
    {
        throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART);
    }

    part->renameAddPrefix(prefix);
}
MergeTreeData::DataParts MergeTreeData::getDataParts()
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

View File

@ -114,7 +114,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
new_data_part->left = temp_index;
new_data_part->right = temp_index;
new_data_part->level = 0;
new_data_part->name = tmp_part_name;
new_data_part->name = tmp_part_name; (no tmp_ here?)
new_data_part->size = part_size;
new_data_part->modification_time = time(0);
new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date);

View File

@ -55,7 +55,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
else
{
checkTableStructure();
//checkParts();
checkParts();
}
String endpoint_name = "ReplicatedMergeTree:" + replica_path;
@ -79,8 +79,12 @@ StoragePtr StorageReplicatedMergeTree::create(
const String & sign_column_,
const MergeTreeSettings & settings_)
{
return (new StorageReplicatedMergeTree(zookeeper_path_, replica_name_, attach, path_, name_, columns_, context_, primary_expr_ast_,
date_column_name_, sampling_expression_, index_granularity_, mode_, sign_column_, settings_))->thisPtr();
StorageReplicatedMergeTree * res = new StorageReplicatedMergeTree(zookeeper_path_, replica_name_, attach,
path_, name_, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_, mode_, sign_column_, settings_);
StoragePtr res_ptr = res->thisPtr();
dynamic_cast<MyInterserverIOEndpoint &>(*res->endpoint_holder->getEndpoint()).setOwnedStorage(res_ptr);
return res_ptr;
}
static String formattedAST(const ASTPtr & ast)
@ -156,6 +160,7 @@ void StorageReplicatedMergeTree::checkTableStructure()
assertString(it.second->getName(), buf);
assertString("\n", buf);
}
assertEOF(buf);
}
void StorageReplicatedMergeTree::createReplica()
@ -170,6 +175,8 @@ void StorageReplicatedMergeTree::createReplica()
void StorageReplicatedMergeTree::activateReplica()
{
throw Exception("test");
std::stringstream host;
host << "host: " << context.getInterserverIOHost() << std::endl;
host << "port: " << context.getInterserverIOPort() << std::endl;
@ -194,7 +201,42 @@ bool StorageReplicatedMergeTree::isTableEmpty()
return true;
}
void StorageReplicatedMergeTree::checkParts() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
void StorageReplicatedMergeTree::checkParts()
{
Strings expected_parts_vec = zookeeper.getChildren(replica_path + "/parts");
NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end());
MergeTreeData::DataParts parts = data.getDataParts();
MergeTreeData::DataPartsVector unexpected_parts;
for (const auto & part : parts)
{
if (expected_parts.count(part->name))
{
expected_parts.erase(part->name);
}
else
{
unexpected_parts.push_back(part);
}
}
if (!expected_parts.empty())
throw Exception("Not found " + toString(expected_parts.size())
+ " (including " + *expected_parts.begin() + ") parts in table " + data.getTableName(),
ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
if (unexpected_parts.size() > 1)
throw Exception("More than one unexpected part (including " + unexpected_parts[0]->name
+ ") in table " + data.getTableName(),
ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
for (MergeTreeData::DataPartPtr part : unexpected_parts)
{
LOG_ERROR(log, "Unexpected part " << part->name << ". Renaming it to ignored_" + part->name);
data.renameAndRemovePart(part, "ignored_");
}
}
/// Stub: not implemented yet, always throws an Exception with code NOT_IMPLEMENTED.
void StorageReplicatedMergeTree::loadQueue() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
@ -207,6 +249,7 @@ void StorageReplicatedMergeTree::executeSomeQueueEntry() { throw Exception("Not
/// Stub: not implemented yet, always throws an Exception with code NOT_IMPLEMENTED.
bool StorageReplicatedMergeTree::tryExecute(const LogEntry & entry) { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
/// Stub: not implemented yet, always throws an Exception with code NOT_IMPLEMENTED.
String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name) { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
/// Stub: not implemented yet, always throws an Exception with code NOT_IMPLEMENTED.
void StorageReplicatedMergeTree::getPart(const String & name, const String & replica_name) { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
void StorageReplicatedMergeTree::shutdown()