diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h index 3511cfa7caf..afd274a2168 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h @@ -65,7 +65,17 @@ struct ReplicatedMergeTreeLogEntry bool attach_unreplicated; FuturePartTaggerPtr future_part_tagger; - bool currently_executing = false; /// Доступ под queue_mutex. + + /// Доступ под queue_mutex. + bool currently_executing = false; /// Выполняется ли действие сейчас. + /// Эти несколько полей имеют лишь информационный характер (для просмотра пользователем с помощью системных таблиц). + /// Доступ под queue_mutex. + size_t num_tries = 0; /// Количество попыток выполнить действие (с момента старта сервера; включая выполняющееся). + ExceptionPtr exception; /// Последний эксепшен, в случае безуспешной попытки выполнить действие. + time_t last_attempt_time = 0; /// Время начала последней попытки выполнить действие. + String last_action; /// Что делается сейчас или делалось в последний раз. + String postpone_reason; /// Причина, по которой действие было отложено, если оно отложено. + std::condition_variable execution_complete; /// Пробуждается когда currently_executing становится false. /// Время создания или время копирования из общего лога в очередь конкретной реплики. diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index c6c60058385..dc19b0ddaf3 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1044,6 +1044,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro * - установим nonincrement_block_numbers, чтобы разрешить мерджи через номер потерянного куска; * - добавим кусок в список quorum/failed_parts. * + * TODO Удаление из blocks. + * * Если что-то изменится, то ничего не сделаем - попадём сюда снова в следующий раз. */ @@ -1374,6 +1376,8 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p entry->tagPartAsFuture(*this); queue.splice(queue.end(), queue, it); entry->currently_executing = true; + ++entry->num_tries; + entry->last_attempt_time = time(0); break; } } @@ -1387,24 +1391,33 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p if (!entry) return false; - bool exception = true; + bool was_exception = true; bool success = false; + ExceptionPtr saved_exception; try { - if (executeLogEntry(*entry, pool_context)) + try { - auto zookeeper = getZooKeeper(); - auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name); + if (executeLogEntry(*entry, pool_context)) + { + auto zookeeper = getZooKeeper(); + auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name); - if (code != ZOK) - LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry->znode_name << ": " - << zkutil::ZooKeeper::error2string(code) + ". This shouldn't happen often."); + if (code != ZOK) + LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry->znode_name << ": " + << zkutil::ZooKeeper::error2string(code) + ". This shouldn't happen often."); - success = true; + success = true; + } + } + catch (...) + { + saved_exception = cloneCurrentException(); + throw; } - exception = false; + was_exception = false; } catch (const Exception & e) { @@ -1431,6 +1444,7 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p std::lock_guard lock(queue_mutex); entry->currently_executing = false; + entry->exception = saved_exception; entry->execution_complete.notify_all(); if (success) @@ -1449,7 +1463,7 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p } /// Если не было исключения, не нужно спать. - return !exception; + return !was_exception; } @@ -2399,6 +2413,14 @@ BlockInputStreams StorageReplicatedMergeTree::read( const size_t max_block_size, const unsigned threads) { + /** У таблицы может быть два вида данных: + * - реплицируемые данные; + * - старые, нереплицируемые данные - они лежат отдельно и их целостность никак не контролируется. + * А ещё движок таблицы предоставляет возможность использовать "виртуальные столбцы". + * Один из них - _replicated позволяет определить, из какой части прочитаны данные, + * или, при использовании в WHERE - выбрать данные только из одной части. + */ + Names virt_column_names; Names real_column_names; for (const auto & it : column_names) @@ -2421,7 +2443,7 @@ BlockInputStreams StorageReplicatedMergeTree::read( column->getData()[1] = 1; virtual_columns_block.insert(ColumnWithTypeAndName(column_ptr, new DataTypeUInt8, "_replicated")); - /// Если запрошен хотя бы один виртуальный столбец, пробуем индексировать + /// Если запрошен столбец _replicated, пробуем индексировать. if (!virt_column_names.empty()) VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context); @@ -2431,6 +2453,12 @@ BlockInputStreams StorageReplicatedMergeTree::read( size_t part_index = 0; + /** Настройки parallel_replica_offset и parallel_replicas_count позволяют читать с одной реплики одну часть данных, а с другой - другую. + * Для реплицируемых, данные разбиваются таким же механизмом, как работает секция SAMPLE. + * А для нереплицируемых данных, так как их целостность между репликами не контролируется, + * с первой (settings.parallel_replica_offset == 0) реплики выбираются все данные, а с остальных - никакие. + */ + if ((settings.parallel_replica_offset == 0) && unreplicated_reader && values.count(0)) { res = unreplicated_reader->read(real_column_names, query,