This commit is contained in:
Michael Kolupaev 2014-04-14 14:19:33 +04:00
parent f362d03611
commit b9c4a3419a
5 changed files with 91 additions and 8 deletions

View File

@ -107,6 +107,9 @@ struct MergeTreeSettings
/// Если в таблице parts_to_delay_insert + k кусков, спать insert_delay_step^k миллисекунд перед вставкой каждого блока. /// Если в таблице parts_to_delay_insert + k кусков, спать insert_delay_step^k миллисекунд перед вставкой каждого блока.
/// Таким образом, скорость вставок автоматически замедлится примерно до скорости слияний. /// Таким образом, скорость вставок автоматически замедлится примерно до скорости слияний.
double insert_delay_step = 1.1; double insert_delay_step = 1.1;
/// Для скольки блоков, вставленных с непустым insert ID, хранить хеши в ZooKeeper.
size_t replicated_deduplication_window = 10000;
}; };
class MergeTreeData : public ITableDeclaration class MergeTreeData : public ITableDeclaration

View File

@ -33,7 +33,7 @@ public:
if (!block_id.empty() && storage.zookeeper.tryGet( if (!block_id.empty() && storage.zookeeper.tryGet(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str)) storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str))
{ {
LOG_INFO(log, "Block with this ID already exists; ignoring it"); LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it");
/// Блок с таким ID уже когда-то вставляли. Проверим чексуммы и не будем его вставлять. /// Блок с таким ID уже когда-то вставляли. Проверим чексуммы и не будем его вставлять.
auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str); auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str);

View File

@ -227,7 +227,7 @@ private:
/** Является ли эта реплика "ведущей". Ведущая реплика выбирает куски для слияния. /** Является ли эта реплика "ведущей". Ведущая реплика выбирает куски для слияния.
*/ */
bool is_leader_node; bool is_leader_node = false;
InterserverIOEndpointHolderPtr endpoint_holder; InterserverIOEndpointHolderPtr endpoint_holder;
@ -247,9 +247,13 @@ private:
/// Поток, выбирающий куски для слияния. /// Поток, выбирающий куски для слияния.
std::thread merge_selecting_thread; std::thread merge_selecting_thread;
/// Когда последний раз выбрасывали старые данные из ZooKeeper.
time_t clear_old_blocks_time = 0;
time_t clear_old_logs_time = 0;
Logger * log; Logger * log;
volatile bool shutdown_called; volatile bool shutdown_called = false;
StorageReplicatedMergeTree( StorageReplicatedMergeTree(
const String & zookeeper_path_, const String & zookeeper_path_,
@ -302,6 +306,12 @@ private:
void clearOldParts(); void clearOldParts();
/// Удалить из ZooKeeper старые записи в логе.
void clearOldLogs();
/// Удалить из ZooKeeper старые хеши блоков. Это делает ведущая реплика.
void clearOldBlocks();
/// Выполнение заданий из очереди. /// Выполнение заданий из очереди.
/** Кладет в queue записи из ZooKeeper (/replicas/me/queue/). /** Кладет в queue записи из ZooKeeper (/replicas/me/queue/).

View File

@ -66,9 +66,7 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
cur_max_rows_to_merge_parts *= data.settings.merge_parts_at_night_inc; cur_max_rows_to_merge_parts *= data.settings.merge_parts_at_night_inc;
if (only_small) if (only_small)
{
cur_max_rows_to_merge_parts = data.settings.max_rows_to_merge_parts_second; cur_max_rows_to_merge_parts = data.settings.max_rows_to_merge_parts_second;
}
/// Найдем суммарный размер еще не пройденных кусков (то есть всех). /// Найдем суммарный размер еще не пройденных кусков (то есть всех).
size_t size_of_remaining_parts = 0; size_t size_of_remaining_parts = 0;

View File

@ -32,12 +32,11 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
: :
context(context_), zookeeper(context.getZooKeeper()), context(context_), zookeeper(context.getZooKeeper()),
table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), zookeeper_path(zookeeper_path_), table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), zookeeper_path(zookeeper_path_),
replica_name(replica_name_), is_leader_node(false), replica_name(replica_name_),
data( full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_, data( full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_,mode_, sign_column_, settings_), index_granularity_,mode_, sign_column_, settings_),
reader(data), writer(data), merger(data), fetcher(data), reader(data), writer(data), merger(data), fetcher(data),
log(&Logger::get("StorageReplicatedMergeTree: " + table_name)), log(&Logger::get("StorageReplicatedMergeTree: " + table_name))
shutdown_called(false)
{ {
if (!zookeeper_path.empty() && *zookeeper_path.rbegin() == '/') if (!zookeeper_path.empty() && *zookeeper_path.rbegin() == '/')
zookeeper_path.erase(zookeeper_path.end() - 1); zookeeper_path.erase(zookeeper_path.end() - 1);
@ -335,6 +334,65 @@ void StorageReplicatedMergeTree::clearOldParts()
LOG_DEBUG(log, "Removed " << parts.size() << " old parts"); LOG_DEBUG(log, "Removed " << parts.size() << " old parts");
} }
void StorageReplicatedMergeTree::clearOldLogs()
{
Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
UInt64 min_pointer = std::numeric_limits<UInt64>::max();
for (const String & replica : replicas)
{
String pointer;
if (!zookeeper.tryGet(zookeeper_path + "/replicas/" + replica + "/log_pointers/" + replica_name, pointer))
return;
min_pointer = std::min(min_pointer, parse<UInt64>(pointer));
}
Strings entries = zookeeper.getChildren(replica_path + "/log");
std::sort(entries.begin(), entries.end());
size_t removed = 0;
for (const String & entry : entries)
{
UInt64 index = parse<UInt64>(entry.substr(strlen("log-")));
if (index >= min_pointer)
break;
zookeeper.remove(replica_path + "/log/" + entry);
++removed;
}
if (removed > 0)
LOG_DEBUG(log, "Removed " << removed << " old log entries");
}
void StorageReplicatedMergeTree::clearOldBlocks()
{
zkutil::Stat stat;
if (!zookeeper.exists(zookeeper_path + "/blocks", &stat))
throw Exception(zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);
/// Чтобы делать "асимптотически" меньше запросов exists, будем ждать, пока накопятся в 1.1 раза больше блоков, чем нужно.
if (static_cast<double>(stat.getnumChildren()) < data.settings.replicated_deduplication_window * 1.1)
return;
Strings blocks = zookeeper.getChildren(zookeeper_path + "/blocks");
std::vector<std::pair<Int64, String> > timed_blocks;
for (const String & block : blocks)
{
zkutil::Stat stat;
zookeeper.exists(zookeeper_path + "/blocks/" + block, &stat);
timed_blocks.push_back(std::make_pair(stat.getczxid(), block));
}
std::sort(timed_blocks.begin(), timed_blocks.end());
for (size_t i = data.settings.replicated_deduplication_window; i < timed_blocks.size(); ++i)
{
zookeeper.remove(zookeeper_path + "/blocks/" + timed_blocks[i].second);
}
LOG_TRACE(log, "Cleared " << blocks.size() - data.settings.replicated_deduplication_window << " old blocks from ZooKeeper");
}
void StorageReplicatedMergeTree::loadQueue() void StorageReplicatedMergeTree::loadQueue()
{ {
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex); Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
@ -621,6 +679,13 @@ void StorageReplicatedMergeTree::queueUpdatingThread()
pullLogsToQueue(); pullLogsToQueue();
clearOldParts(); clearOldParts();
/// Каждую минуту выбрасываем ненужные записи из лога.
if (time(0) - clear_old_logs_time > 60)
{
clear_old_logs_time = time(0);
clearOldLogs();
}
} }
catch (...) catch (...)
{ {
@ -820,6 +885,13 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
if (shutdown_called) if (shutdown_called)
break; break;
/// Каждую минуту выбрасываем старые блоки.
if (time(0) - clear_old_blocks_time > 60)
{
clear_old_blocks_time = time(0);
clearOldBlocks();
}
if (!success) if (!success)
std::this_thread::sleep_for(MERGE_SELECTING_SLEEP); std::this_thread::sleep_for(MERGE_SELECTING_SLEEP);
} }