dbms: removed bogus messages from error log; improved diagnostics; extended system.replicas table [#METR-17573].

This commit is contained in:
Alexey Milovidov 2015-09-19 08:56:40 +03:00
parent 5af56824f2
commit b068913aeb
3 changed files with 73 additions and 5 deletions

View File

@ -126,6 +126,8 @@ public:
UInt32 queue_oldest_time;
UInt32 inserts_oldest_time;
UInt32 merges_oldest_time;
String oldest_part_to_get;
String oldest_part_to_merge_to;
UInt64 log_max_index;
UInt64 log_pointer;
UInt8 total_replicas;

View File

@ -30,6 +30,43 @@ const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
const Int64 RESERVED_BLOCK_NUMBERS = 200;
/** Для каждого куска есть сразу три места, где он должен быть:
* 1. В оперативке (RAM), MergeTreeData::data_parts, all_data_parts.
* 2. В файловой системе (FS), директория с данными таблицы.
* 3. В ZooKeeper (ZK).
*
* При добавлении куска, его надо добавить сразу в эти три места.
* Это делается так:
* - [FS] сначала записываем кусок во временную директорию на файловой системе;
* - [FS] переименовываем временный кусок в результирующий на файловой системе;
* - [RAM] сразу же после этого добавляем его в data_parts, и удаляем из data_parts покрываемые им куски;
* - [RAM] также устанавливаем объект Transaction, который в случае исключения (в следующем пункте),
* откатит изменения в data_parts (из предыдущего пункта) назад;
* - [ZK] затем отправляем транзакцию (multi) на добавление куска в ZooKeeper (и ещё некоторых действий);
* - [FS, ZK] кстати, удаление покрываемых (старых) кусков из файловой системы, из ZooKeeper и из all_data_parts
* делается отложенно, через несколько минут.
*
* Здесь нет никакой атомарности.
* Можно было бы добиться атомарности с помощью undo/redo логов и флага в DataPart, когда он полностью готов.
* Но это было бы неудобно - пришлось бы писать undo/redo логи для каждого Part-а в ZK, а это увеличило бы и без того большое количество взаимодействий.
*
* Вместо этого, мы вынуждены работать в ситуации, когда в любой момент времени
* (из другого потока, или после рестарта сервера) может наблюдаться недоделанная до конца транзакция.
* (заметим - для этого кусок должен быть в RAM)
* Из этих случаев наиболее частый - когда кусок уже есть в data_parts, но его ещё нет в ZooKeeper.
* Этот случай надо отличить от случая, когда такая ситуация достигается вследствие какого-то повреждения состояния.
*
* Делаем это с помощью порога на время.
* Если кусок достаточно молодой, то его отсутствие в ZooKeeper будем воспринимать оптимистично - как будто он просто не успел ещё туда добавиться
* - как будто транзакция ещё не выполнена, но скоро выполнится.
* А если кусок старый, то его отсутствие в ZooKeeper будем воспринимать как недоделанную транзакцию, которую нужно откатить.
*
* PS. Возможно, было бы лучше добавить в DataPart флаг о том, что кусок вставлен в ZK.
* Но здесь уже слишком легко запутаться с консистентностью этого флага.
*/
const auto MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER = 5 * 60;
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & zookeeper_path_,
const String & replica_name_,
@ -1400,7 +1437,9 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
if (merges_queued >= data.settings.max_replicated_merges_in_queue)
{
LOG_TRACE(log, "Number of queued merges is greater than max_replicated_merges_in_queue, so won't select new parts to merge.");
LOG_TRACE(log, "Number of queued merges (" << merges_queued
<< ") is greater than max_replicated_merges_in_queue ("
<< data.settings.max_replicated_merges_in_queue << "), so won't select new parts to merge.");
break;
}
@ -1422,9 +1461,16 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
/// Если о каком-то из кусков нет информации в ZK, не будем сливать.
if (!zookeeper->exists(replica_path + "/parts/" + part->name))
{
LOG_WARNING(log, "Part " << part->name << " exists locally but not in ZooKeeper.");
enqueuePartForCheck(part->name);
all_in_zk = false;
if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(0))
{
LOG_WARNING(log, "Part " << part->name << " (that was selected for merge)"
<< " with age " << (time(0) - part->modification_time)
<< " seconds exists locally but not in ZooKeeper."
<< " Won't do merge with that part and will check it.");
enqueuePartForCheck(part->name);
}
}
}
if (!all_in_zk)
@ -1906,16 +1952,22 @@ void StorageReplicatedMergeTree::checkPart(const String & part_name)
data.renameAndDetachPart(part, "broken_");
}
}
else if (part->modification_time + 5 * 60 < time(0))
else if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(0))
{
/// Если куска нет в ZooKeeper, удалим его локально.
/// Возможно, кусок кто-то только что записал, и еще не успел добавить в ZK.
/// Поэтому удаляем только если кусок старый (не очень надежно).
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
LOG_ERROR(log, "Checker: Unexpected part " << part_name << ". Removing.");
LOG_ERROR(log, "Checker: Unexpected part " << part_name << " in filesystem. Removing.");
data.renameAndDetachPart(part, "unexpected_");
}
else
{
LOG_TRACE(log, "Checker: Young part " << part_name
<< " with age " << (time(0) - part->modification_time)
<< " seconds hasn't been added to ZooKeeper yet. It's ok.");
}
}
else
{
@ -2859,7 +2911,10 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
++res.inserts_in_queue;
if (entry->create_time && (!res.inserts_oldest_time || entry->create_time < res.inserts_oldest_time))
{
res.inserts_oldest_time = entry->create_time;
res.oldest_part_to_get = entry->new_part_name;
}
}
if (entry->type == LogEntry::MERGE_PARTS)
@ -2867,7 +2922,10 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
++res.merges_in_queue;
if (entry->create_time && (!res.merges_oldest_time || entry->create_time < res.merges_oldest_time))
{
res.merges_oldest_time = entry->create_time;
res.oldest_part_to_merge_to = entry->new_part_name;
}
}
}
}

View File

@ -33,6 +33,8 @@ StorageSystemReplicas::StorageSystemReplicas(const std::string & name_)
{ "queue_oldest_time", new DataTypeDateTime},
{ "inserts_oldest_time", new DataTypeDateTime},
{ "merges_oldest_time", new DataTypeDateTime},
{ "oldest_part_to_get", new DataTypeString },
{ "oldest_part_to_merge_to",new DataTypeString },
{ "log_max_index", new DataTypeUInt64 },
{ "log_pointer", new DataTypeUInt64 },
{ "total_replicas", new DataTypeUInt8 },
@ -127,6 +129,8 @@ BlockInputStreams StorageSystemReplicas::read(
ColumnWithTypeAndName col_queue_oldest_time { new ColumnUInt32, new DataTypeDateTime, "queue_oldest_time"};
ColumnWithTypeAndName col_inserts_oldest_time{ new ColumnUInt32,new DataTypeDateTime, "inserts_oldest_time"};
ColumnWithTypeAndName col_merges_oldest_time{ new ColumnUInt32, new DataTypeDateTime, "merges_oldest_time"};
ColumnWithTypeAndName col_oldest_part_to_get{ new ColumnString, new DataTypeString, "oldest_part_to_get"};
ColumnWithTypeAndName col_oldest_part_to_merge_to{ new ColumnString, new DataTypeString, "oldest_part_to_merge_to"};
ColumnWithTypeAndName col_log_max_index { new ColumnUInt64, new DataTypeUInt64, "log_max_index"};
ColumnWithTypeAndName col_log_pointer { new ColumnUInt64, new DataTypeUInt64, "log_pointer"};
ColumnWithTypeAndName col_total_replicas { new ColumnUInt8, new DataTypeUInt8, "total_replicas"};
@ -155,6 +159,8 @@ BlockInputStreams StorageSystemReplicas::read(
col_queue_oldest_time .column->insert(UInt64(status.queue_oldest_time));
col_inserts_oldest_time .column->insert(UInt64(status.inserts_oldest_time));
col_merges_oldest_time .column->insert(UInt64(status.merges_oldest_time));
col_oldest_part_to_get .column->insert(status.oldest_part_to_get);
col_oldest_part_to_merge_to.column->insert(status.oldest_part_to_merge_to);
col_log_max_index .column->insert(status.log_max_index);
col_log_pointer .column->insert(status.log_pointer);
col_total_replicas .column->insert(UInt64(status.total_replicas));
@ -180,6 +186,8 @@ BlockInputStreams StorageSystemReplicas::read(
col_queue_oldest_time,
col_inserts_oldest_time,
col_merges_oldest_time,
col_oldest_part_to_get,
col_oldest_part_to_merge_to,
col_log_max_index,
col_log_pointer,
col_total_replicas,