This commit is contained in:
Alexey Milovidov 2015-09-22 01:58:26 +03:00
commit 0a8eaa37e1
2 changed files with 50 additions and 12 deletions

View File

@ -65,7 +65,17 @@ struct ReplicatedMergeTreeLogEntry
bool attach_unreplicated;
FuturePartTaggerPtr future_part_tagger;
bool currently_executing = false; /// Доступ под queue_mutex.
/// Доступ под queue_mutex.
bool currently_executing = false; /// Выполняется ли действие сейчас.
/// Эти несколько полей имеют лишь информационный характер (для просмотра пользователем с помощью системных таблиц).
/// Доступ под queue_mutex.
size_t num_tries = 0; /// Количество попыток выполнить действие (с момента старта сервера; включая выполняющееся).
ExceptionPtr exception; /// Последний эксепшен, в случае безуспешной попытки выполнить действие.
time_t last_attempt_time = 0; /// Время начала последней попытки выполнить действие.
String last_action; /// Что делается сейчас или делалось в последний раз.
String postpone_reason; /// Причина, по которой действие было отложено, если оно отложено.
std::condition_variable execution_complete; /// Пробуждается когда currently_executing становится false.
/// Время создания или время копирования из общего лога в очередь конкретной реплики.

View File

@ -1044,6 +1044,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
* - установим nonincrement_block_numbers, чтобы разрешить мерджи через номер потерянного куска;
* - добавим кусок в список quorum/failed_parts.
*
* TODO Удаление из blocks.
*
* Если что-то изменится, то ничего не сделаем - попадём сюда снова в следующий раз.
*/
@ -1374,6 +1376,8 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p
entry->tagPartAsFuture(*this);
queue.splice(queue.end(), queue, it);
entry->currently_executing = true;
++entry->num_tries;
entry->last_attempt_time = time(0);
break;
}
}
@ -1387,24 +1391,33 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p
if (!entry)
return false;
bool exception = true;
bool was_exception = true;
bool success = false;
ExceptionPtr saved_exception;
try
{
if (executeLogEntry(*entry, pool_context))
try
{
auto zookeeper = getZooKeeper();
auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name);
if (executeLogEntry(*entry, pool_context))
{
auto zookeeper = getZooKeeper();
auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name);
if (code != ZOK)
LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry->znode_name << ": "
<< zkutil::ZooKeeper::error2string(code) + ". This shouldn't happen often.");
if (code != ZOK)
LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry->znode_name << ": "
<< zkutil::ZooKeeper::error2string(code) + ". This shouldn't happen often.");
success = true;
success = true;
}
}
catch (...)
{
saved_exception = cloneCurrentException();
throw;
}
exception = false;
was_exception = false;
}
catch (const Exception & e)
{
@ -1431,6 +1444,7 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p
std::lock_guard<std::mutex> lock(queue_mutex);
entry->currently_executing = false;
entry->exception = saved_exception;
entry->execution_complete.notify_all();
if (success)
@ -1449,7 +1463,7 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p
}
/// Если не было исключения, не нужно спать.
return !exception;
return !was_exception;
}
@ -2399,6 +2413,14 @@ BlockInputStreams StorageReplicatedMergeTree::read(
const size_t max_block_size,
const unsigned threads)
{
/** У таблицы может быть два вида данных:
* - реплицируемые данные;
* - старые, нереплицируемые данные - они лежат отдельно и их целостность никак не контролируется.
* А ещё движок таблицы предоставляет возможность использовать "виртуальные столбцы".
* Один из них - _replicated позволяет определить, из какой части прочитаны данные,
* или, при использовании в WHERE - выбрать данные только из одной части.
*/
Names virt_column_names;
Names real_column_names;
for (const auto & it : column_names)
@ -2421,7 +2443,7 @@ BlockInputStreams StorageReplicatedMergeTree::read(
column->getData()[1] = 1;
virtual_columns_block.insert(ColumnWithTypeAndName(column_ptr, new DataTypeUInt8, "_replicated"));
/// Если запрошен хотя бы один виртуальный столбец, пробуем индексировать
/// Если запрошен столбец _replicated, пробуем индексировать.
if (!virt_column_names.empty())
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context);
@ -2431,6 +2453,12 @@ BlockInputStreams StorageReplicatedMergeTree::read(
size_t part_index = 0;
/** Настройки parallel_replica_offset и parallel_replicas_count позволяют читать с одной реплики одну часть данных, а с другой - другую.
* Для реплицируемых, данные разбиваются таким же механизмом, как работает секция SAMPLE.
* А для нереплицируемых данных, так как их целостность между репликами не контролируется,
* с первой (settings.parallel_replica_offset == 0) реплики выбираются все данные, а с остальных - никакие.
*/
if ((settings.parallel_replica_offset == 0) && unreplicated_reader && values.count(0))
{
res = unreplicated_reader->read(real_column_names, query,