This commit is contained in:
Michael Kolupaev 2014-07-22 17:49:52 +04:00
parent 776f11a256
commit 16e5a92ed9
8 changed files with 299 additions and 147 deletions

View File

@ -96,6 +96,11 @@ public:
return UNLOCKED;
}
static void createAbandonedIfNotExists(const String & path, zkutil::ZooKeeper & zookeeper)
{
zookeeper.createIfNotExists(path, "");
}
private:
zkutil::ZooKeeper & zookeeper;
String path_prefix;

View File

@ -73,6 +73,8 @@ public:
/// Кладет в DataPart данные из имени кусочка.
static void parsePartName(const String & file_name, Part & part, const Poco::RegularExpression::MatchVec * matches = nullptr);
static bool contains(const String & outer_part_name, const String & inner_part_name);
private:
typedef std::set<Part> Parts;

View File

@ -10,52 +10,19 @@
namespace DB
{
class StorageReplicatedMergeTree;
class ReplicatedMergeTreePartsServer : public InterserverIOEndpoint
{
public:
ReplicatedMergeTreePartsServer(MergeTreeData & data_, StoragePtr owned_storage_) : data(data_),
owned_storage(owned_storage_), log(&Logger::get(data.getLogName() + " (Replicated PartsServer)")) {}
ReplicatedMergeTreePartsServer(MergeTreeData & data_, StorageReplicatedMergeTree & storage_) : data(data_),
storage(storage_), log(&Logger::get(data.getLogName() + " (Replicated PartsServer)")) {}
void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override
{
String part_name = params.get("part");
LOG_TRACE(log, "Sending part " << part_name);
auto storage_lock = owned_storage->lockStructure(false);
MergeTreeData::DataPartPtr part = findPart(part_name);
Poco::ScopedReadRWLock part_lock(part->columns_lock);
/// Список файлов возьмем из списка контрольных сумм.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
/// Добавим файлы, которых нет в списке контрольных сумм.
checksums.files["checksums.txt"];
checksums.files["columns.txt"];
writeBinary(checksums.files.size(), out);
for (const auto & it : checksums.files)
{
String path = data.getFullPath() + part_name + "/" + it.first;
UInt64 size = Poco::File(path).getSize();
writeStringBinary(it.first, out);
writeBinary(size, out);
ReadBufferFromFile file_in(path);
HashingWriteBuffer hashing_out(out);
copyData(file_in, hashing_out);
if (hashing_out.count() != size)
throw Exception("Unexpected size of file " + path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
writeBinary(hashing_out.getHash(), out);
}
}
void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override;
private:
MergeTreeData & data;
StoragePtr owned_storage;
StorageReplicatedMergeTree & storage;
Logger * log;
@ -78,60 +45,7 @@ public:
const String & part_name,
const String & replica_path,
const String & host,
int port)
{
ReadBufferFromHTTP::Params params = {
std::make_pair("endpoint", "ReplicatedMergeTree:" + replica_path),
std::make_pair("part", part_name),
std::make_pair("compress", "false")};
ReadBufferFromHTTP in(host, port, params);
String part_path = data.getFullPath() + "tmp_" + part_name + "/";
if (!Poco::File(part_path).createDirectory())
throw Exception("Directory " + part_path + " already exists");
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
new_data_part->name = "tmp_" + part_name;
new_data_part->is_temp = true;
size_t files;
readBinary(files, in);
MergeTreeData::DataPart::Checksums checksums;
for (size_t i = 0; i < files; ++i)
{
String file_name;
UInt64 file_size;
readStringBinary(file_name, in);
readBinary(file_size, in);
WriteBufferFromFile file_out(part_path + file_name);
HashingWriteBuffer hashing_out(file_out);
copyData(in, hashing_out, file_size);
uint128 expected_hash;
readBinary(expected_hash, in);
if (expected_hash != hashing_out.getHash())
throw Exception("Checksum mismatch for file " + part_path + file_name + " transferred from " + replica_path);
if (file_name != "checksums.txt" &&
file_name != "columns.txt")
checksums.addFile(file_name, file_size, expected_hash);
}
assertEOF(in);
ActiveDataPartSet::parsePartName(part_name, *new_data_part);
new_data_part->modification_time = time(0);
new_data_part->loadColumns();
new_data_part->loadChecksums();
new_data_part->loadIndex();
new_data_part->checksums.checkEqual(checksums, false);
return new_data_part;
}
int port);
private:
MergeTreeData & data;

View File

@ -70,6 +70,9 @@ public:
bool supportsIndexForIn() const override { return true; }
/// Добавить кусок в очередь кусков, чьи данные нужно проверить в фоновом потоке.
void enqueuePartForCheck(const String & name);
private:
friend class ReplicatedMergeTreeBlockOutputStream;
@ -186,7 +189,7 @@ private:
*/
StringSet parts_to_check_set;
StringList parts_to_check_queue;
Poco::FastMuterx parts_to_check_mutex;
Poco::FastMutex parts_to_check_mutex;
Poco::Event parts_to_check_event;
String database_name;
@ -326,8 +329,8 @@ private:
*/
void checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops);
/// Добавить кусок в очередь кусков, чьи данные нужно проверить в фоновом потоке.
void enqueuePartForCheck(const String & name);
/// Убирает кусок из ZooKeeper и добавляет в очередь задание скачать его. Предполагается это делать с битыми кусками.
void removePartAndEnqueueFetch(const String & part_name);
void clearOldParts();

View File

@ -140,4 +140,12 @@ void ActiveDataPartSet::parsePartName(const String & file_name, Part & part, con
part.right_month = date_lut.toFirstDayNumOfMonth(part.right_date);
}
bool ActiveDataPartSet::contains(const String & outer_part_name, const String & inner_part_name)
{
Part outer, inner;
parsePartName(outer_part_name, outer);
parsePartName(inner_part_name, inner);
return outer.contains(inner);
}
}

View File

@ -532,6 +532,9 @@ void MergeTreeData::AlterDataPartTransaction::commit()
mutable_part.size_in_bytes = MergeTreeData::DataPart::calcTotalSize(path);
/// TODO: можно не сбрасывать кеши при добавлении столбца.
data_part->storage.context.resetCaches();
clear();
}
catch (...)

View File

@ -0,0 +1,122 @@
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
namespace DB
{
void ReplicatedMergeTreePartsServer::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out)
{
String part_name = params.get("part");
LOG_TRACE(log, "Sending part " << part_name);
try
{
auto storage_lock = storage.lockStructure(false);
MergeTreeData::DataPartPtr part = findPart(part_name);
Poco::ScopedReadRWLock part_lock(part->columns_lock);
/// Список файлов возьмем из списка контрольных сумм.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
/// Добавим файлы, которых нет в списке контрольных сумм.
checksums.files["checksums.txt"];
checksums.files["columns.txt"];
MergeTreeData::DataPart::Checksums data_checksums;
writeBinary(checksums.files.size(), out);
for (const auto & it : checksums.files)
{
String file_name = it.first;
String path = data.getFullPath() + part_name + "/" + file_name;
UInt64 size = Poco::File(path).getSize();
writeStringBinary(it.first, out);
writeBinary(size, out);
ReadBufferFromFile file_in(path);
HashingWriteBuffer hashing_out(out);
copyData(file_in, hashing_out);
if (hashing_out.count() != size)
throw Exception("Unexpected size of file " + path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
writeBinary(hashing_out.getHash(), out);
if (file_name != "checksums.txt" &&
file_name != "columns.txt")
checksums.addFile(file_name, hashing_out.count(), hashing_out.getHash());
}
part->checksums.checkEqual(checksums, false);
}
catch (...)
{
storage.enqueuePartForCheck(part_name);
throw;
}
}
MergeTreeData::MutableDataPartPtr ReplicatedMergeTreePartsFetcher::fetchPart(
const String & part_name,
const String & replica_path,
const String & host,
int port)
{
ReadBufferFromHTTP::Params params = {
std::make_pair("endpoint", "ReplicatedMergeTree:" + replica_path),
std::make_pair("part", part_name),
std::make_pair("compress", "false")};
ReadBufferFromHTTP in(host, port, params);
String part_path = data.getFullPath() + "tmp_" + part_name + "/";
if (!Poco::File(part_path).createDirectory())
throw Exception("Directory " + part_path + " already exists");
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
new_data_part->name = "tmp_" + part_name;
new_data_part->is_temp = true;
size_t files;
readBinary(files, in);
MergeTreeData::DataPart::Checksums checksums;
for (size_t i = 0; i < files; ++i)
{
String file_name;
UInt64 file_size;
readStringBinary(file_name, in);
readBinary(file_size, in);
WriteBufferFromFile file_out(part_path + file_name);
HashingWriteBuffer hashing_out(file_out);
copyData(in, hashing_out, file_size);
uint128 expected_hash;
readBinary(expected_hash, in);
if (expected_hash != hashing_out.getHash())
throw Exception("Checksum mismatch for file " + part_path + file_name + " transferred from " + replica_path);
if (file_name != "checksums.txt" &&
file_name != "columns.txt")
checksums.addFile(file_name, file_size, expected_hash);
}
assertEOF(in);
ActiveDataPartSet::parsePartName(part_name, *new_data_part);
new_data_part->modification_time = time(0);
new_data_part->loadColumns();
new_data_part->loadChecksums();
new_data_part->loadIndex();
new_data_part->checksums.checkEqual(checksums, false);
return new_data_part;
}
}

View File

@ -1,6 +1,7 @@
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
#include <DB/Storages/MergeTree/MergeTreePartChecker.h>
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromString.h>
@ -119,7 +120,7 @@ StoragePtr StorageReplicatedMergeTree::create(
if (!res->is_read_only)
{
String endpoint_name = "ReplicatedMergeTree:" + res->replica_path;
InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(res->data, res_ptr);
InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(res->data, *res);
res->endpoint_holder = new InterserverIOEndpointHolder(endpoint_name, endpoint, res->context.getInterserverIOHandler());
}
return res_ptr;
@ -515,15 +516,6 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP
zkutil::CreateMode::Persistent));
}
void StorageReplicatedMergeTree::enqueuePartForCheck(const String & name)
{
Poco::ScopedLock<Poco::FastMutex> lock(parts_to_check_mutex);
if (parts_to_check_set.count(name))
return;
parts_to_check_queue.push_back(name);
parts_to_check_set.insert(name);
}
void StorageReplicatedMergeTree::clearOldParts()
{
Strings parts = data.clearOldParts();
@ -1091,10 +1083,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
/// Уберем больше не нужные отметки о несуществующих блоках.
for (UInt64 number = parts[i]->right + 1; number <= parts[i + 1]->left - 1; ++number)
{
String number_str = toString(number);
while (number_str.size() < 10)
number_str = '0' + number_str;
String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + number_str;
String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number);
zookeeper->tryRemove(path);
}
@ -1257,6 +1246,45 @@ void StorageReplicatedMergeTree::alterThread()
LOG_DEBUG(log, "alter thread finished");
}
void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_name)
{
String part_path = replica_path + "/parts/" + part_name;
LogEntry log_entry;
log_entry.type = LogEntry::GET_PART;
log_entry.source_replica = "";
log_entry.new_part_name = part_name;
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
replica_path + "/queue/queue-", log_entry.toString(), zookeeper->getDefaultACL(),
zkutil::CreateMode::PersistentSequential));
ops.push_back(new zkutil::Op::Remove(part_path + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(part_path + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(part_path, -1));
auto results = zookeeper->multi(ops);
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
String path_created = dynamic_cast<zkutil::Op::Create &>(ops[0]).getPathCreated();
log_entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
log_entry.addResultToVirtualParts(*this);
queue.push_back(log_entry);
}
}
void StorageReplicatedMergeTree::enqueuePartForCheck(const String & name)
{
Poco::ScopedLock<Poco::FastMutex> lock(parts_to_check_mutex);
if (parts_to_check_set.count(name))
return;
parts_to_check_queue.push_back(name);
parts_to_check_set.insert(name);
parts_to_check_event.set();
}
void StorageReplicatedMergeTree::partCheckThread()
{
while (!shutdown_called)
@ -1300,39 +1328,80 @@ void StorageReplicatedMergeTree::partCheckThread()
LOG_WARNING(log, "Part " << part_name << " exists in ZooKeeper but not locally. "
"Removing from ZooKeeper and queueing a fetch.");
LogEntry log_entry;
log_entry.type = LogEntry::GET_PART;
log_entry.source_replica = "";
log_entry.new_part_name = part_name;
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
replica_path + "/queue/queue-", log_entry.toString(), zookeeper->getDefaultACL(),
zkutil::CreateMode::PersistentSequential));
ops.push_back(new zkutil::Op::Remove(part_path + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(part_path + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(part_path, -1));
auto results = zookeeper->multi(ops);
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
String path_created = dynamic_cast<zkutil::Op::Create &>(ops[0]).getPathCreated();
log_entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
log_entry.addResultToVirtualParts(*this);
queue.push_back(entry);
}
removePartAndEnqueueFetch(part_name);
}
/// Если куска нет в ZooKeeper, проверим есть ли он хоть у кого-то.
else
{
LOG_WARNING(log, "Checking if anyone has part covering " << part_name << ".");
asdqwe;
/// Если ни у кого нет такого куска, удалим его из нашей очереди и добавим его в block_numbers.
//не получится надежно удалить из очереди :( Можно попробовать полагаться на block_numbers, но их могут удалить
LOG_ERROR(log,
//asdqwe;
bool found = false;
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
for (const String & replica : replicas)
{
Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts");
for (const String & part_on_replica : parts)
{
if (ActiveDataPartSet::contains(part_on_replica, part_name))
{
found = true;
LOG_WARNING(log, "Found part " << part_on_replica << " on " << replica);
break;
}
}
if (found)
break;
}
if (!found)
{
/** Такая ситуация возможна при нормальной работе, без потери данных, например, так:
* ReplicatedMergeTreeBlockOutputStream записал кусок, попытался добавить его в ZK,
* получил operation timeout, удалил локальный кусок и бросил исключение,
* а на самом деле, кусок добавился в ZK.
*/
LOG_ERROR(log, "No replica has part covering " << part_name << ". This part is lost forever. "
<< "There might or might not be a data loss.");
/// Если ни у кого нет такого куска, удалим его из нашей очереди и добавим его в block_numbers.
String month_name = part_name.substr(0, 6);
zookeeper->createIfNotExists(zookeeper_path + "/block_numbers/" + month_name, "");
ActiveDataPartSet::Part part_info;
ActiveDataPartSet::parsePartName(part_name, part_info);
if (part_info.left != part_info.right)
LOG_ERROR(log, "Lost part " << part_name << " is a result of a merge. "
"This means some data is definitely lost (or there's a bug).");
for (size_t index = part_info.left; index <= part_info.right; ++index)
{
AbandonableLockInZooKeeper::createAbandonedIfNotExists(
zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(index), *zookeeper);
}
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
/** NOTE: Не удалятся записи в очереди, которые сейчас выполняются.
* Они пофейлятся и положат кусок снова в очередь на проверку.
* Расчитываем, что это редкая ситуация.
*/
for (LogEntries::iterator it = queue.begin(); it != queue.end(); )
{
if (it->new_part_name == part_name)
{
zookeeper->remove(replica_path + "/queue/" + it->znode_name);
queue.erase(it++);
}
else
{
++it;
}
}
}
}
}
}
/// У нас есть этот кусок, и он активен.
@ -1341,11 +1410,35 @@ void StorageReplicatedMergeTree::partCheckThread()
/// Если кусок есть в ZooKeeper, сверим его данные с его чексуммами, а их с ZooKeeper.
if (zookeeper->exists(replica_path + "/parts/" + part_name))
{
asdqwe;
LOG_WARNING(log, "Checking data of part " << part_name << ".");
/// Если кусок сломан, одновременно удалим его из ZK и добавим в очередь задание забрать этот кусок у другой реплики.
/// И удалим кусок локально.
asdqwe;
try
{
auto zk_checksums = MergeTreeData::DataPart::Checksums::parse(
zookeeper->get(replica_path + "/parts/" + part_name + "/checksums"));
zk_checksums.checkEqual(part->checksums, true);
auto zk_columns = NamesAndTypesList::parse(
zookeeper->get(replica_path + "/parts/" + part_name + "/columns"), context.getDataTypeFactory());
if (part->columns != zk_columns)
throw Exception("Columns of local part " + part_name + " are different from ZooKeeper");
MergeTreePartChecker::checkDataPart(
data.getFullPath() + part_name, data.index_granularity, true, context.getDataTypeFactory());
LOG_INFO(log, "Part " << part_name << " looks good.");
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
LOG_INFO(log, "Part " << part_name << " looks broken. Removing it and queueing a fetch.");
removePartAndEnqueueFetch(part_name);
/// Удалим кусок локально.
data.deletePart(part);
}
}
/// Если куска нет в ZooKeeper, удалим его локально.
else
@ -1353,11 +1446,12 @@ void StorageReplicatedMergeTree::partCheckThread()
/// Если этот кусок еще и получен в результате слияния, это уже чересчур странно.
if (part->left != part->right)
{
LOG_ERROR(log, );
LOG_ERROR(log, "Unexpected part " << part_name << " is a result of a merge. You have to resolve this manually.");
}
else
{
asdqwe;
LOG_ERROR(log, "Unexpected part " << part_name << ". Removing.");
data.deletePart(part);
}
}
}
@ -1407,10 +1501,7 @@ bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr
/// Можно слить куски, если все номера между ними заброшены - не соответствуют никаким блокам.
for (UInt64 number = left->right + 1; number <= right->left - 1; ++number)
{
String number_str = toString(number);
while (number_str.size() < 10)
number_str = '0' + number_str;
String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + number_str;
String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number);
if (AbandonableLockInZooKeeper::check(path, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
{
@ -1512,6 +1603,7 @@ void StorageReplicatedMergeTree::partialShutdown()
queue_updating_event->set();
alter_thread_event->set();
alter_query_event->set();
parts_to_check_event.set();
replica_is_active_node = nullptr;
merger.cancelAll();
@ -1531,6 +1623,8 @@ void StorageReplicatedMergeTree::partialShutdown()
cleanup_thread.join();
if (alter_thread.joinable())
alter_thread.join();
if (part_check_thread.joinable())
part_check_thread.join();
if (queue_task_handle)
context.getBackgroundPool().removeTask(queue_task_handle);
queue_task_handle.reset();
@ -1565,6 +1659,7 @@ void StorageReplicatedMergeTree::startup()
queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this);
cleanup_thread = std::thread(&StorageReplicatedMergeTree::cleanupThread, this);
alter_thread = std::thread(&StorageReplicatedMergeTree::alterThread, this);
part_check_thread = std::thread(&StorageReplicatedMergeTree::partCheckThread, this);
queue_task_handle = context.getBackgroundPool().addTask(
std::bind(&StorageReplicatedMergeTree::queueTask, this, std::placeholders::_1));
}