This commit is contained in:
Michael Kolupaev 2014-05-07 17:58:20 +04:00
parent df07fa2674
commit 0b7928f6a9
4 changed files with 33 additions and 15 deletions

View File

@ -2,6 +2,7 @@
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Yandex/time2str.h>
namespace DB
@ -20,9 +21,13 @@ public:
{
++block_index;
String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index);
time_t min_date_time = DateLUTSingleton::instance().fromDayNum(DayNum_t(current_block.min_date));
String month_name = toString(Date2OrderedIdentifier(min_date_time) / 100);
storage.zookeeper.tryCreate(storage.zookeeper_path + "/block_numbers/" + month_name, "", zkutil::CreateMode::Persistent);
AbandonableLockInZooKeeper block_number_lock(
storage.zookeeper_path + "/block_numbers/block-",
storage.zookeeper_path + "/block_numbers/" + month_name + "/block-",
storage.zookeeper_path + "/temp", storage.zookeeper);
UInt64 part_number = block_number_lock.getNumber();

View File

@ -869,21 +869,25 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
}
}
/// Нужно загрузить новую запись в очередь перед тем, как в следующий раз выбирать куски для слияния.
/// (чтобы куски пометились как currently_merging).
pullLogsToQueue();
for (size_t i = 0; i + 1 < parts.size(); ++i)
if (success)
{
/// Уберем больше не нужные отметки о несуществующих блоках.
for (UInt64 number = parts[i]->right + 1; number <= parts[i + 1]->left - 1; ++number)
{
String number_str = toString(number);
while (number_str.size() < 10)
number_str = '0' + number_str;
String path = zookeeper_path + "/block_numbers/block-" + number_str;
/// Нужно загрузить новую запись в очередь перед тем, как в следующий раз выбирать куски для слияния.
/// (чтобы куски пометились как currently_merging).
pullLogsToQueue();
zookeeper.tryRemove(path);
String month_name = parts[0]->name.substr(0, 6);
for (size_t i = 0; i + 1 < parts.size(); ++i)
{
/// Уберем больше не нужные отметки о несуществующих блоках.
for (UInt64 number = parts[i]->right + 1; number <= parts[i + 1]->left - 1; ++number)
{
String number_str = toString(number);
while (number_str.size() < 10)
number_str = '0' + number_str;
String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + number_str;
zookeeper.tryRemove(path);
}
}
}
}
@ -929,13 +933,15 @@ bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr
if (currently_merging.count(left->name) || currently_merging.count(right->name))
return false;
String month_name = left->name.substr(0, 6);
/// Можно слить куски, если все номера между ними заброшены - не соответствуют никаким блокам.
for (UInt64 number = left->right + 1; number <= right->left - 1; ++number)
{
String number_str = toString(number);
while (number_str.size() < 10)
number_str = '0' + number_str;
String path = zookeeper_path + "/block_numbers/block-" + number_str;
String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + number_str;
if (AbandonableLockInZooKeeper::check(path, zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
{

View File

@ -61,6 +61,7 @@ public:
* При остальных ошибках бросает исключение.
*/
ReturnCode::type tryCreate(const std::string & path, const std::string & data, CreateMode::type mode, std::string & pathCreated);
ReturnCode::type tryCreate(const std::string & path, const std::string & data, CreateMode::type mode);
/** Удалить ноду, если ее версия равна version (если -1, подойдет любая версия).
*/

View File

@ -195,6 +195,12 @@ ReturnCode::type ZooKeeper::tryCreate(const std::string & path, const std::strin
return code;
}
ReturnCode::type ZooKeeper::tryCreate(const std::string & path, const std::string & data, CreateMode::type mode)
{
std::string path_created;
return tryCreate(path, data, mode, path_created);
}
void ZooKeeper::remove(const std::string & path, int32_t version)
{
checkNotExpired();