diff --git a/dbms/include/DB/Common/ProfileEvents.h b/dbms/include/DB/Common/ProfileEvents.h index 4ebf38f2f27..4f521a42168 100644 --- a/dbms/include/DB/Common/ProfileEvents.h +++ b/dbms/include/DB/Common/ProfileEvents.h @@ -33,6 +33,8 @@ M(ObsoleteReplicatedParts, "Replicated parts rendered obsolete by fetches") \ M(ReplicatedPartMerges, "Replicated part merges") \ M(ReplicatedPartFetchesOfMerged, "Replicated part merges replaced with fetches") \ + M(ReplicatedPartChecks, "Replicated part checks") \ + M(ReplicatedPartChecksFailed, "Replicated part checks failed") \ \ M(END, "") diff --git a/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h b/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h index 96630fbcdce..9543aa991e9 100644 --- a/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h +++ b/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h @@ -73,6 +73,8 @@ public: /// Кладет в DataPart данные из имени кусочка. static void parsePartName(const String & file_name, Part & part, const Poco::RegularExpression::MatchVec * matches = nullptr); + static bool contains(const String & outer_part_name, const String & inner_part_name); + private: typedef std::set Parts; diff --git a/dbms/include/DB/Storages/MergeTree/BackgroundProcessingPool.h b/dbms/include/DB/Storages/MergeTree/BackgroundProcessingPool.h index 292b4635f62..de2b10cf11e 100644 --- a/dbms/include/DB/Storages/MergeTree/BackgroundProcessingPool.h +++ b/dbms/include/DB/Storages/MergeTree/BackgroundProcessingPool.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -32,7 +33,7 @@ public: public: void incrementCounter(const String & name, int value = 1) { - Poco::ScopedLock lock(pool.mutex); + std::unique_lock lock(pool.mutex); local_counters[name] += value; pool.counters[name] += value; } @@ -56,9 +57,12 @@ public: /// Переставить таск в начало очереди и разбудить какой-нибудь поток. void wake() { - Poco::ScopedLock lock(pool.mutex); + std::unique_lock lock(pool.mutex); pool.tasks.splice(pool.tasks.begin(), pool.tasks, iterator); - pool.wake_event.set(); + + /// Не очень надежно: если все потоки сейчас выполняют работу, этот вызов никого не разбудит, + /// и все будут спать в конце итерации. + pool.wake_event.notify_one(); } private: @@ -83,8 +87,8 @@ public: if (size_ <= 0) throw Exception("Invalid number of threads: " + toString(size_), ErrorCodes::ARGUMENT_OUT_OF_BOUND); - Poco::ScopedLock tlock(threads_mutex); - Poco::ScopedLock lock(mutex); + std::unique_lock tlock(threads_mutex); + std::unique_lock lock(mutex); if (size_ == size) return; @@ -100,30 +104,30 @@ public: int getNumberOfThreads() { - Poco::ScopedLock lock(mutex); + std::unique_lock lock(mutex); return size; } void setSleepTime(double seconds) { - Poco::ScopedLock lock(mutex); + std::unique_lock lock(mutex); sleep_seconds = seconds; } int getCounter(const String & name) { - Poco::ScopedLock lock(mutex); + std::unique_lock lock(mutex); return counters[name]; } TaskHandle addTask(const Task & task) { - Poco::ScopedLock lock(threads_mutex); + std::unique_lock lock(threads_mutex); TaskHandle res(new TaskInfo(*this, task)); { - Poco::ScopedLock lock(mutex); + std::unique_lock lock(mutex); tasks.push_back(res); res->iterator = --tasks.end(); } @@ -142,7 +146,7 @@ public: void removeTask(const TaskHandle & task) { - Poco::ScopedLock tlock(threads_mutex); + std::unique_lock tlock(threads_mutex); /// Дождемся завершения всех выполнений этой задачи. { @@ -151,7 +155,7 @@ public: } { - Poco::ScopedLock lock(mutex); + std::unique_lock lock(mutex); auto it = std::find(tasks.begin(), tasks.end(), task); if (it == tasks.end()) throw Exception("Task not found", ErrorCodes::LOGICAL_ERROR); @@ -161,6 +165,7 @@ public: if (tasks.empty()) { shutdown = true; + wake_event.notify_all(); for (std::thread & thread : threads) thread.join(); threads.clear(); @@ -172,12 +177,12 @@ public: { try { - Poco::ScopedLock lock(threads_mutex); + std::unique_lock lock(threads_mutex); if (!threads.empty()) { LOG_ERROR(&Logger::get("~BackgroundProcessingPool"), "Destroying non-empty BackgroundProcessingPool"); shutdown = true; - wake_event.set(); /// NOTE: это разбудит только один поток. Лучше было бы разбудить все. + wake_event.notify_all(); for (std::thread & thread : threads) thread.join(); } @@ -192,15 +197,16 @@ private: typedef std::list Tasks; typedef std::vector Threads; - Poco::FastMutex threads_mutex; - Poco::FastMutex mutex; + std::mutex threads_mutex; + std::mutex mutex; int size; Tasks tasks; /// Таски в порядке, в котором мы планируем их выполнять. Threads threads; - Poco::Event wake_event; Counters counters; double sleep_seconds; - bool shutdown; + + volatile bool shutdown; + std::condition_variable wake_event; void threadFunction() { @@ -215,7 +221,7 @@ private: TaskHandle task; { - Poco::ScopedLock lock(mutex); + std::unique_lock lock(mutex); if (!tasks.empty()) { @@ -243,7 +249,7 @@ private: if (task->function(context)) { /// Если у таска получилось выполнить какую-то работу, запустим его снова без паузы. - Poco::ScopedLock lock(mutex); + std::unique_lock lock(mutex); auto it = std::find(tasks.begin(), tasks.end(), task); if (it != tasks.end()) @@ -262,7 +268,7 @@ private: /// Вычтем все счетчики обратно. if (!counters_diff.empty()) { - Poco::ScopedLock lock(mutex); + std::unique_lock lock(mutex); for (const auto & it : counters_diff) { counters[it.first] -= it.second; @@ -274,7 +280,8 @@ private: if (need_sleep) { - wake_event.tryWait(sleep_seconds * 1000. / tasks_count); + std::unique_lock lock(mutex); + wake_event.wait_for(lock, std::chrono::duration(sleep_seconds / tasks_count)); } } } diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h index 288eb999dea..64396a5aeef 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h @@ -109,9 +109,10 @@ protected: if (!reader) { UncompressedCache * uncompressed_cache = use_uncompressed_cache ? storage.context.getUncompressedCache() : NULL; - reader.reset(new MergeTreeReader(path, columns, uncompressed_cache, storage, all_mark_ranges)); + reader.reset(new MergeTreeReader(path, owned_data_part->name, columns, uncompressed_cache, storage, all_mark_ranges)); if (prewhere_actions) - pre_reader.reset(new MergeTreeReader(path, pre_columns, uncompressed_cache, storage, all_mark_ranges)); + pre_reader.reset(new MergeTreeReader(path, owned_data_part->name, pre_columns, uncompressed_cache, storage, + all_mark_ranges)); } if (prewhere_actions) @@ -266,9 +267,9 @@ protected: if (remaining_mark_ranges.empty()) { /** Закрываем файлы (ещё до уничтожения объекта). - * Чтобы при создании многих источников, но одновременном чтении только из нескольких, - * буферы не висели в памяти. - */ + * Чтобы при создании многих источников, но одновременном чтении только из нескольких, + * буферы не висели в памяти. + */ reader.reset(); pre_reader.reset(); part_columns_lock.reset(); diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index 8d019c9b31d..cdd4d32f404 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -118,6 +118,9 @@ struct MergeTreeSettings class MergeTreeData : public ITableDeclaration { public: + /// Функция, которую можно вызвать, если есть подозрение, что данные куска испорчены. + typedef std::function BrokenPartCallback; + /// Описание куска с данными. struct DataPart : public ActiveDataPartSet::Part { @@ -222,7 +225,7 @@ public: std::atomic size_in_bytes; /// размер в байтах, 0 - если не посчитано; /// atomic, чтобы можно было не заботиться о блокировках при ALTER. time_t modification_time; - mutable time_t remove_time; /// Когда кусок убрали из рабочего набора. + mutable time_t remove_time = std::numeric_limits::max(); /// Когда кусок убрали из рабочего набора. /// Если true, деструктор удалит директорию с куском. bool is_temp = false; @@ -531,6 +534,8 @@ public: Aggregating, }; + static void doNothing(const String & name) {} + /** Подцепить таблицу с соответствующим именем, по соответствующему пути (с / на конце), * (корректность имён и путей не проверяется) * состоящую из указанных столбцов. @@ -550,8 +555,8 @@ public: const String & sign_column_, const MergeTreeSettings & settings_, const String & log_name_, - bool require_part_metadata_ - ); + bool require_part_metadata_, + BrokenPartCallback broken_part_callback_ = &MergeTreeData::doNothing); std::string getModePrefix() const; @@ -615,8 +620,9 @@ public: void renameAndDetachPart(DataPartPtr part, const String & prefix); /** Убрать кусок из рабочего набора. Его данные удалятся при вызове clearOldParts, когда их перестанут читать. + * Если clear_without_timeout, данные будут удалены при следующем clearOldParts, игнорируя old_parts_lifetime. */ - void deletePart(DataPartPtr part); + void deletePart(DataPartPtr part, bool clear_without_timeout); /** Удалить неактуальные куски. Возвращает имена удаленных кусков. */ @@ -651,6 +657,12 @@ public: /// Нужно вызывать под залоченным lockStructureForAlter(). void setColumnsList(const NamesAndTypesList & new_columns) { columns = new NamesAndTypesList(new_columns); } + /// Нужно вызвать, если есть подозрение, что данные куска испорчены. + void reportBrokenPart(const String & name) + { + broken_part_callback(name); + } + ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; } SortDescription getSortDescription() const { return sort_descr; } @@ -679,6 +691,8 @@ private: NamesAndTypesListPtr columns; + BrokenPartCallback broken_part_callback; + String log_name; Logger * log; diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreePartChecker.h b/dbms/include/DB/Storages/MergeTree/MergeTreePartChecker.h index 2a0a314e4ea..4490cd9ebdb 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreePartChecker.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreePartChecker.h @@ -14,7 +14,7 @@ public: * - Для массивов и строк проверяет соответствие размеров и количества данных. * - Проверяет правильность засечек. * Бросает исключение, если кусок испорчен или если проверить не получилось (TODO: можно попробовать разделить эти случаи). - * Если strict, требует, чтобы для всех столбцов из columns.txt были файлы, и чтобы засечки не указывали в конец сжатого блока. + * Если strict, требует, чтобы для всех столбцов из columns.txt были файлы. * Если verbose, пишет в stderr прогресс и ошибки, и не останавливается при первой ошибке. */ static void checkDataPart(String path, size_t index_granularity, bool strict, const DataTypeFactory & data_type_factory, diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h index 7a6b936b8e9..35ce03059a7 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h @@ -37,12 +37,20 @@ class MergeTreeReader typedef std::map OffsetColumns; public: - MergeTreeReader(const String & path_, /// Путь к куску + MergeTreeReader(const String & path_, const String & part_name_, /// Путь к куску const NamesAndTypesList & columns_, bool use_uncompressed_cache_, MergeTreeData & storage_, const MarkRanges & all_mark_ranges) - : path(path_), columns(columns_), use_uncompressed_cache(use_uncompressed_cache_), storage(storage_) + : path(path_), part_name(part_name_), columns(columns_), use_uncompressed_cache(use_uncompressed_cache_), storage(storage_) { - for (const NameAndTypePair & column : columns) - addStream(column.name, *column.type, all_mark_ranges); + try + { + for (const NameAndTypePair & column : columns) + addStream(column.name, *column.type, all_mark_ranges); + } + catch (...) + { + storage.reportBrokenPart(part_name); + throw; + } } /** Если столбцов нет в блоке, добавляет их, если есть - добавляет прочитанные значения к ним в конец. @@ -109,10 +117,19 @@ public: } catch (const Exception & e) { + if (e.code() != ErrorCodes::ALL_REQUESTED_COLUMNS_ARE_MISSING) + storage.reportBrokenPart(part_name); + /// Более хорошая диагностика. throw Exception(e.message() + " (while reading from part " + path + " from mark " + toString(from_mark) + " to " + toString(to_mark) + ")", e.code()); } + catch (...) + { + storage.reportBrokenPart(part_name); + + throw; + } } /// Заполняет столбцы, которых нет в блоке, значениями по умолчанию. @@ -291,6 +308,7 @@ private: typedef std::map > FileStreams; String path; + String part_name; FileStreams streams; NamesAndTypesList columns; bool use_uncompressed_cache; diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h index 67f4f0b2d2a..f0300a5ab8b 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h @@ -10,52 +10,19 @@ namespace DB { +class StorageReplicatedMergeTree; + class ReplicatedMergeTreePartsServer : public InterserverIOEndpoint { public: - ReplicatedMergeTreePartsServer(MergeTreeData & data_, StoragePtr owned_storage_) : data(data_), - owned_storage(owned_storage_), log(&Logger::get(data.getLogName() + " (Replicated PartsServer)")) {} + ReplicatedMergeTreePartsServer(MergeTreeData & data_, StorageReplicatedMergeTree & storage_) : data(data_), + storage(storage_), log(&Logger::get(data.getLogName() + " (Replicated PartsServer)")) {} - void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override - { - String part_name = params.get("part"); - LOG_TRACE(log, "Sending part " << part_name); - - auto storage_lock = owned_storage->lockStructure(false); - - MergeTreeData::DataPartPtr part = findPart(part_name); - - Poco::ScopedReadRWLock part_lock(part->columns_lock); - - /// Список файлов возьмем из списка контрольных сумм. - MergeTreeData::DataPart::Checksums checksums = part->checksums; - /// Добавим файлы, которых нет в списке контрольных сумм. - checksums.files["checksums.txt"]; - checksums.files["columns.txt"]; - - writeBinary(checksums.files.size(), out); - for (const auto & it : checksums.files) - { - String path = data.getFullPath() + part_name + "/" + it.first; - UInt64 size = Poco::File(path).getSize(); - - writeStringBinary(it.first, out); - writeBinary(size, out); - - ReadBufferFromFile file_in(path); - HashingWriteBuffer hashing_out(out); - copyData(file_in, hashing_out); - - if (hashing_out.count() != size) - throw Exception("Unexpected size of file " + path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART); - - writeBinary(hashing_out.getHash(), out); - } - } + void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override; private: MergeTreeData & data; - StoragePtr owned_storage; + StorageReplicatedMergeTree & storage; Logger * log; @@ -78,60 +45,7 @@ public: const String & part_name, const String & replica_path, const String & host, - int port) - { - ReadBufferFromHTTP::Params params = { - std::make_pair("endpoint", "ReplicatedMergeTree:" + replica_path), - std::make_pair("part", part_name), - std::make_pair("compress", "false")}; - ReadBufferFromHTTP in(host, port, params); - - String part_path = data.getFullPath() + "tmp_" + part_name + "/"; - if (!Poco::File(part_path).createDirectory()) - throw Exception("Directory " + part_path + " already exists"); - - MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data); - new_data_part->name = "tmp_" + part_name; - new_data_part->is_temp = true; - - size_t files; - readBinary(files, in); - MergeTreeData::DataPart::Checksums checksums; - for (size_t i = 0; i < files; ++i) - { - String file_name; - UInt64 file_size; - - readStringBinary(file_name, in); - readBinary(file_size, in); - - WriteBufferFromFile file_out(part_path + file_name); - HashingWriteBuffer hashing_out(file_out); - copyData(in, hashing_out, file_size); - - uint128 expected_hash; - readBinary(expected_hash, in); - - if (expected_hash != hashing_out.getHash()) - throw Exception("Checksum mismatch for file " + part_path + file_name + " transferred from " + replica_path); - - if (file_name != "checksums.txt" && - file_name != "columns.txt") - checksums.addFile(file_name, file_size, expected_hash); - } - - assertEOF(in); - - ActiveDataPartSet::parsePartName(part_name, *new_data_part); - new_data_part->modification_time = time(0); - new_data_part->loadColumns(); - new_data_part->loadChecksums(); - new_data_part->loadIndex(); - - new_data_part->checksums.checkEqual(checksums, false); - - return new_data_part; - } + int port); private: MergeTreeData & data; diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index 801050ceb7c..30954786b7e 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -70,6 +70,9 @@ public: bool supportsIndexForIn() const override { return true; } + /// Добавить кусок в очередь кусков, чьи данные нужно проверить в фоновом потоке. + void enqueuePartForCheck(const String & name); + private: friend class ReplicatedMergeTreeBlockOutputStream; @@ -158,6 +161,7 @@ private: typedef std::list LogEntries; typedef std::set StringSet; + typedef std::list StringList; Context & context; zkutil::ZooKeeperPtr zookeeper; @@ -179,6 +183,15 @@ private: */ StringSet future_parts; + /** Куски, для которых нужно проверить одно из двух: + * - Если кусок у нас есть, сверить, его данные с его контрольными суммами, а их с ZooKeeper. + * - Если куска у нас нет, проверить, есть ли он (или покрывающий его кусок) хоть у кого-то. + */ + StringSet parts_to_check_set; + StringList parts_to_check_queue; + Poco::FastMutex parts_to_check_mutex; + Poco::Event parts_to_check_event; + String database_name; String table_name; String full_path; @@ -218,7 +231,7 @@ private: std::unique_ptr unreplicated_reader; std::unique_ptr unreplicated_merger; - /// Потоки. + /// Потоки: /// Поток, следящий за обновлениями в логах всех реплик и загружающий их в очередь. std::thread queue_updating_thread; @@ -241,6 +254,8 @@ private: std::thread alter_thread; zkutil::EventPtr alter_thread_event = zkutil::EventPtr(new Poco::Event); + /// Поток, проверяющий данные кусков. + std::thread part_check_thread; /// Событие, пробуждающее метод alter от ожидания завершения запроса ALTER. zkutil::EventPtr alter_query_event = zkutil::EventPtr(new Poco::Event); @@ -314,6 +329,9 @@ private: */ void checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops); + /// Убирает кусок из ZooKeeper и добавляет в очередь задание скачать его. Предполагается это делать с битыми кусками. + void removePartAndEnqueueFetch(const String & part_name); + void clearOldParts(); /// Удалить из ZooKeeper старые записи в логе. @@ -343,7 +361,7 @@ private: */ bool executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context); - /** В бесконечном цикле обновляет очередь. + /** Обновляет очередь. */ void queueUpdatingThread(); @@ -355,19 +373,23 @@ private: void becomeLeader(); - /** В бесконечном цикле выбирает куски для слияния и записывает в лог. + /** Выбирает куски для слияния и записывает в лог. */ void mergeSelectingThread(); - /** В бесконечном цикле вызывает clearOldBlocks. + /** Удаляет устаревшие данные. */ void cleanupThread(); - /** В бесконечном цикле проверяет, не нужно ли сделать локальный ALTER, и делает его. + /** Делает локальный ALTER, когда список столбцов в ZooKeeper меняется. */ void alterThread(); - /** В бесконечном цикле проверяет, не протухла ли сессия в ZooKeeper. + /** Проверяет целостность кусков. + */ + void partCheckThread(); + + /** Когда сессия в ZooKeeper протухает, переходит на новую. */ void restartingThread(); diff --git a/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp b/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp index e6a116543df..8f6bd3f8eef 100644 --- a/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp +++ b/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp @@ -140,4 +140,12 @@ void ActiveDataPartSet::parsePartName(const String & file_name, Part & part, con part.right_month = date_lut.toFirstDayNumOfMonth(part.right_date); } +bool ActiveDataPartSet::contains(const String & outer_part_name, const String & inner_part_name) +{ + Part outer, inner; + parsePartName(outer_part_name, outer); + parsePartName(inner_part_name, inner); + return outer.contains(inner); +} + } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index a392c4ae85d..3db08ccbcb5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -27,15 +27,17 @@ MergeTreeData::MergeTreeData( const String & sign_column_, const MergeTreeSettings & settings_, const String & log_name_, - bool require_part_metadata_) + bool require_part_metadata_, + BrokenPartCallback broken_part_callback_) : context(context_), date_column_name(date_column_name_), sampling_expression(sampling_expression_), index_granularity(index_granularity_), mode(mode_), sign_column(sign_column_), settings(settings_), primary_expr_ast(primary_expr_ast_->clone()), require_part_metadata(require_part_metadata_), - full_path(full_path_), columns(columns_), log_name(log_name_), - log(&Logger::get(log_name + " (Data)")) + full_path(full_path_), columns(columns_), + broken_part_callback(broken_part_callback_), + log_name(log_name_), log(&Logger::get(log_name + " (Data)")) { /// создаём директорию, если её нет Poco::File(full_path).createDirectories(); @@ -265,7 +267,8 @@ Strings MergeTreeData::clearOldParts() { int ref_count = it->use_count(); if (ref_count == 1 && /// После этого ref_count не может увеличиться. - (*it)->remove_time + settings.old_parts_lifetime < now) + (*it)->remove_time < now && + now - (*it)->remove_time > settings.old_parts_lifetime) { LOG_DEBUG(log, "Removing part " << (*it)->name); @@ -532,6 +535,9 @@ void MergeTreeData::AlterDataPartTransaction::commit() mutable_part.size_in_bytes = MergeTreeData::DataPart::calcTotalSize(path); + /// TODO: можно не сбрасывать кеши при добавлении столбца. + data_part->storage.context.resetCaches(); + clear(); } catch (...) @@ -704,13 +710,15 @@ void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix) Poco::ScopedLock lock_all(all_data_parts_mutex); if (!all_data_parts.erase(part)) throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART); + part->remove_time = time(0); data_parts.erase(part); part->renameAddPrefix(prefix); } -void MergeTreeData::deletePart(DataPartPtr part) +void MergeTreeData::deletePart(DataPartPtr part, bool clear_without_timeout) { Poco::ScopedLock lock(data_parts_mutex); + part->remove_time = clear_without_timeout ? 0 : time(0); data_parts.erase(part); } diff --git a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp index 37bd5601a5b..b7055da37f4 100644 --- a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp @@ -107,16 +107,13 @@ struct Stream if (uncompressed_hashing_buf.position() == uncompressed_hashing_buf.buffer().end()) { - if (!strict) - { - /// Если засечка должна быть ровно на границе блоков, нам подходит и засечка, указывающая на конец предыдущего блока, - /// и на начало следующего. - data_mark.offset_in_compressed_file = compressed_hashing_buf.count() - uncompressing_buf.getSizeCompressed(); - data_mark.offset_in_decompressed_block = uncompressed_hashing_buf.offset(); + /// Если засечка должна быть ровно на границе блоков, нам подходит и засечка, указывающая на конец предыдущего блока, + /// и на начало следующего. + data_mark.offset_in_compressed_file = compressed_hashing_buf.count() - uncompressing_buf.getSizeCompressed(); + data_mark.offset_in_decompressed_block = uncompressed_hashing_buf.offset(); - if (mrk_mark == data_mark) - return; - } + if (mrk_mark == data_mark) + return; uncompressed_hashing_buf.next(); } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartsExchange.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartsExchange.cpp new file mode 100644 index 00000000000..da860c75357 --- /dev/null +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartsExchange.cpp @@ -0,0 +1,122 @@ +#include +#include + + +namespace DB +{ + +void ReplicatedMergeTreePartsServer::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) +{ + String part_name = params.get("part"); + LOG_TRACE(log, "Sending part " << part_name); + + try + { + auto storage_lock = storage.lockStructure(false); + + MergeTreeData::DataPartPtr part = findPart(part_name); + + Poco::ScopedReadRWLock part_lock(part->columns_lock); + + /// Список файлов возьмем из списка контрольных сумм. + MergeTreeData::DataPart::Checksums checksums = part->checksums; + /// Добавим файлы, которых нет в списке контрольных сумм. + checksums.files["checksums.txt"]; + checksums.files["columns.txt"]; + + MergeTreeData::DataPart::Checksums data_checksums; + + writeBinary(checksums.files.size(), out); + for (const auto & it : checksums.files) + { + String file_name = it.first; + + String path = data.getFullPath() + part_name + "/" + file_name; + UInt64 size = Poco::File(path).getSize(); + + writeStringBinary(it.first, out); + writeBinary(size, out); + + ReadBufferFromFile file_in(path); + HashingWriteBuffer hashing_out(out); + copyData(file_in, hashing_out); + + if (hashing_out.count() != size) + throw Exception("Unexpected size of file " + path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART); + + writeBinary(hashing_out.getHash(), out); + + if (file_name != "checksums.txt" && + file_name != "columns.txt") + data_checksums.addFile(file_name, hashing_out.count(), hashing_out.getHash()); + } + + part->checksums.checkEqual(data_checksums, false); + } + catch (...) + { + storage.enqueuePartForCheck(part_name); + throw; + } +} + +MergeTreeData::MutableDataPartPtr ReplicatedMergeTreePartsFetcher::fetchPart( + const String & part_name, + const String & replica_path, + const String & host, + int port) +{ + ReadBufferFromHTTP::Params params = { + std::make_pair("endpoint", "ReplicatedMergeTree:" + replica_path), + std::make_pair("part", part_name), + std::make_pair("compress", "false")}; + ReadBufferFromHTTP in(host, port, params); + + String part_path = data.getFullPath() + "tmp_" + part_name + "/"; + if (!Poco::File(part_path).createDirectory()) + throw Exception("Directory " + part_path + " already exists"); + + MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data); + new_data_part->name = "tmp_" + part_name; + new_data_part->is_temp = true; + + size_t files; + readBinary(files, in); + MergeTreeData::DataPart::Checksums checksums; + for (size_t i = 0; i < files; ++i) + { + String file_name; + UInt64 file_size; + + readStringBinary(file_name, in); + readBinary(file_size, in); + + WriteBufferFromFile file_out(part_path + file_name); + HashingWriteBuffer hashing_out(file_out); + copyData(in, hashing_out, file_size); + + uint128 expected_hash; + readBinary(expected_hash, in); + + if (expected_hash != hashing_out.getHash()) + throw Exception("Checksum mismatch for file " + part_path + file_name + " transferred from " + replica_path); + + if (file_name != "checksums.txt" && + file_name != "columns.txt") + checksums.addFile(file_name, file_size, expected_hash); + } + + assertEOF(in); + + ActiveDataPartSet::parsePartName(part_name, *new_data_part); + new_data_part->modification_time = time(0); + new_data_part->loadColumns(); + new_data_part->loadChecksums(); + new_data_part->loadIndex(); + + new_data_part->checksums.checkEqual(checksums, false); + + return new_data_part; +} + +} diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index e48d543e82d..850bc6d1c8f 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -35,7 +36,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), zookeeper_path(zookeeper_path_), replica_name(replica_name_), data( full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_, - index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name, true), + index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name, true, + std::bind(&StorageReplicatedMergeTree::enqueuePartForCheck, this, std::placeholders::_1)), reader(data), writer(data), merger(data), fetcher(data), log(&Logger::get(database_name_ + "." + table_name + " (StorageReplicatedMergeTree)")), shutdown_event(false), permanent_shutdown_event(false) @@ -119,7 +121,7 @@ StoragePtr StorageReplicatedMergeTree::create( if (!res->is_read_only) { String endpoint_name = "ReplicatedMergeTree:" + res->replica_path; - InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(res->data, res_ptr); + InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(res->data, *res); res->endpoint_holder = new InterserverIOEndpointHolder(endpoint_name, endpoint, res->context.getInterserverIOHandler()); } return res_ptr; @@ -660,7 +662,7 @@ void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_ev Strings entries = zookeeper->getChildren(zookeeper_path + "/log"); index = entries.empty() ? 0 : parse(std::min_element(entries.begin(), entries.end())->substr(strlen("log-"))); - zookeeper->set(replica_path + "/log_pointer", toString(index), zkutil::CreateMode::Persistent); + zookeeper->set(replica_path + "/log_pointer", toString(index)); } else { @@ -893,6 +895,10 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro return false; } } + + /// Если ни у кого нет куска, и в очереди нет слияний с его участием, проверим, есть ли у кого-то покрывающий его. + if (replica.empty()) + enqueuePartForCheck(entry.new_part_name); } catch (...) { @@ -1082,10 +1088,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread() /// Уберем больше не нужные отметки о несуществующих блоках. for (UInt64 number = parts[i]->right + 1; number <= parts[i + 1]->left - 1; ++number) { - String number_str = toString(number); - while (number_str.size() < 10) - number_str = '0' + number_str; - String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + number_str; + String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number); zookeeper->tryRemove(path); } @@ -1190,7 +1193,8 @@ void StorageReplicatedMergeTree::alterThread() for (const MergeTreeData::DataPartPtr & part : parts) { /// Обновим кусок и запишем результат во временные файлы. - /// TODO: Можно пропускать проверку на слишком большие изменения, если в ZooKeeper есть, например, нода /flags/force_alter. + /// TODO: Можно пропускать проверку на слишком большие изменения, если в ZooKeeper есть, например, + /// нода /flags/force_alter. auto transaction = data.alterDataPart(part, columns); if (!transaction) @@ -1248,6 +1252,250 @@ void StorageReplicatedMergeTree::alterThread() LOG_DEBUG(log, "alter thread finished"); } +void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_name) +{ + String part_path = replica_path + "/parts/" + part_name; + + LogEntry log_entry; + log_entry.type = LogEntry::GET_PART; + log_entry.source_replica = ""; + log_entry.new_part_name = part_name; + + zkutil::Ops ops; + ops.push_back(new zkutil::Op::Create( + replica_path + "/queue/queue-", log_entry.toString(), zookeeper->getDefaultACL(), + zkutil::CreateMode::PersistentSequential)); + ops.push_back(new zkutil::Op::Remove(part_path + "/checksums", -1)); + ops.push_back(new zkutil::Op::Remove(part_path + "/columns", -1)); + ops.push_back(new zkutil::Op::Remove(part_path, -1)); + auto results = zookeeper->multi(ops); + + { + Poco::ScopedLock lock(queue_mutex); + + String path_created = dynamic_cast(ops[0]).getPathCreated(); + log_entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1); + log_entry.addResultToVirtualParts(*this); + queue.push_back(log_entry); + } +} + +void StorageReplicatedMergeTree::enqueuePartForCheck(const String & name) +{ + Poco::ScopedLock lock(parts_to_check_mutex); + + if (parts_to_check_set.count(name)) + return; + parts_to_check_queue.push_back(name); + parts_to_check_set.insert(name); + parts_to_check_event.set(); +} + +void StorageReplicatedMergeTree::partCheckThread() +{ + while (!shutdown_called) + { + try + { + /// Достанем из очереди кусок для проверки. + String part_name; + { + Poco::ScopedLock lock(parts_to_check_mutex); + if (parts_to_check_queue.empty()) + { + if (!parts_to_check_set.empty()) + { + LOG_ERROR(log, "Non-empty parts_to_check_set with empty parts_to_check_queue. This is a bug."); + parts_to_check_set.clear(); + } + } + else + { + part_name = parts_to_check_queue.front(); + } + } + if (part_name.empty()) + { + parts_to_check_event.wait(); + continue; + } + + LOG_WARNING(log, "Checking part " << part_name); + ProfileEvents::increment(ProfileEvents::ReplicatedPartChecks); + + auto part = data.getContainingPart(part_name); + String part_path = replica_path + "/parts/" + part_name; + + /// Этого или покрывающего куска у нас нет. + if (!part) + { + /// Если кусок есть в ZooKeeper, удалим его оттуда и добавим в очередь задание скачать его. + if (zookeeper->exists(part_path)) + { + LOG_WARNING(log, "Part " << part_name << " exists in ZooKeeper but not locally. " + "Removing from ZooKeeper and queueing a fetch."); + ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed); + + removePartAndEnqueueFetch(part_name); + } + /// Если куска нет в ZooKeeper, проверим есть ли он хоть у кого-то. + else + { + ActiveDataPartSet::Part part_info; + ActiveDataPartSet::parsePartName(part_name, part_info); + + /** Будем проверять только куски, не полученные в результате слияния. + * Для кусков, полученных в результате слияния такая проверка была бы некорректной, + * потому что слитого куска может еще ни у кого не быть. + */ + if (part_info.left == part_info.right) + { + LOG_WARNING(log, "Checking if anyone has part covering " << part_name << "."); + + bool found = false; + Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); + for (const String & replica : replicas) + { + Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts"); + for (const String & part_on_replica : parts) + { + if (part_on_replica == part_name || ActiveDataPartSet::contains(part_on_replica, part_name)) + { + found = true; + LOG_WARNING(log, "Found part " << part_on_replica << " on " << replica); + break; + } + } + if (found) + break; + } + + if (!found) + { + /** Такая ситуация возможна при нормальной работе, без потери данных, например, так: + * ReplicatedMergeTreeBlockOutputStream записал кусок, попытался добавить его в ZK, + * получил operation timeout, удалил локальный кусок и бросил исключение, + * а на самом деле, кусок добавился в ZK. + */ + LOG_ERROR(log, "No replica has part covering " << part_name << ". This part is lost forever. " + << "There might or might not be a data loss."); + ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed); + + /** Если ни у кого нет такого куска, удалим его из нашей очереди. + * + * Еще можно было бы добавить его в block_numbers, чтобы он не мешал слияниям, + * но если так сделать, ZooKeeper почему-то пропустит один номер для автоинкремента, + * и в номерах блоков все равно останется дырка. + * TODO: можно это исправить, сделав две директории block_numbers: для автоинкрементных и ручных нод. + */ + + { + Poco::ScopedLock lock(queue_mutex); + + /** NOTE: Не удалятся записи в очереди, которые сейчас выполняются. + * Они пофейлятся и положат кусок снова в очередь на проверку. + * Расчитываем, что это редкая ситуация. + */ + for (LogEntries::iterator it = queue.begin(); it != queue.end(); ) + { + if (it->new_part_name == part_name) + { + zookeeper->remove(replica_path + "/queue/" + it->znode_name); + queue.erase(it++); + } + else + { + ++it; + } + } + } + } + } + } + } + /// У нас есть этот кусок, и он активен. + else if (part->name == part_name) + { + /// Если кусок есть в ZooKeeper, сверим его данные с его чексуммами, а их с ZooKeeper. + if (zookeeper->exists(replica_path + "/parts/" + part_name)) + { + LOG_WARNING(log, "Checking data of part " << part_name << "."); + + try + { + auto zk_checksums = MergeTreeData::DataPart::Checksums::parse( + zookeeper->get(replica_path + "/parts/" + part_name + "/checksums")); + zk_checksums.checkEqual(part->checksums, true); + + auto zk_columns = NamesAndTypesList::parse( + zookeeper->get(replica_path + "/parts/" + part_name + "/columns"), context.getDataTypeFactory()); + if (part->columns != zk_columns) + throw Exception("Columns of local part " + part_name + " are different from ZooKeeper"); + + MergeTreePartChecker::checkDataPart( + data.getFullPath() + part_name, data.index_granularity, true, context.getDataTypeFactory()); + + LOG_INFO(log, "Part " << part_name << " looks good."); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + + LOG_ERROR(log, "Part " << part_name << " looks broken. Removing it and queueing a fetch."); + ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed); + + removePartAndEnqueueFetch(part_name); + + /// Удалим кусок локально. + data.deletePart(part, true); + } + } + /// Если куска нет в ZooKeeper, удалим его локально. + else + { + ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed); + + /// Если этот кусок еще и получен в результате слияния, это уже чересчур странно. + if (part->left != part->right) + { + LOG_ERROR(log, "Unexpected part " << part_name << " is a result of a merge. You have to resolve this manually."); + } + else + { + LOG_ERROR(log, "Unexpected part " << part_name << ". Removing."); + data.deletePart(part, false); + } + } + } + else + { + /// Если у нас есть покрывающий кусок, игнорируем все проблемы с этим куском. + /// В худшем случае в лог еще old_parts_lifetime секунд будут валиться ошибки, пока кусок не удалится как старый. + } + + /// Удалим кусок из очереди. + { + Poco::ScopedLock lock(parts_to_check_mutex); + if (parts_to_check_queue.empty() || parts_to_check_queue.front() != part_name) + { + LOG_ERROR(log, "Someone changed parts_to_check_queue.front(). This is a bug."); + } + else + { + parts_to_check_queue.pop_front(); + parts_to_check_set.erase(part_name); + } + } + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + parts_to_check_event.tryWait(ERROR_SLEEP_MS); + } + } +} + + bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right) { /// Если какой-то из кусков уже собираются слить в больший, не соглашаемся его сливать. @@ -1265,10 +1513,7 @@ bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr /// Можно слить куски, если все номера между ними заброшены - не соответствуют никаким блокам. for (UInt64 number = left->right + 1; number <= right->left - 1; ++number) { - String number_str = toString(number); - while (number_str.size() < 10) - number_str = '0' + number_str; - String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + number_str; + String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number); if (AbandonableLockInZooKeeper::check(path, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED) { @@ -1370,6 +1615,7 @@ void StorageReplicatedMergeTree::partialShutdown() queue_updating_event->set(); alter_thread_event->set(); alter_query_event->set(); + parts_to_check_event.set(); replica_is_active_node = nullptr; merger.cancelAll(); @@ -1389,6 +1635,8 @@ void StorageReplicatedMergeTree::partialShutdown() cleanup_thread.join(); if (alter_thread.joinable()) alter_thread.join(); + if (part_check_thread.joinable()) + part_check_thread.join(); if (queue_task_handle) context.getBackgroundPool().removeTask(queue_task_handle); queue_task_handle.reset(); @@ -1423,6 +1671,7 @@ void StorageReplicatedMergeTree::startup() queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this); cleanup_thread = std::thread(&StorageReplicatedMergeTree::cleanupThread, this); alter_thread = std::thread(&StorageReplicatedMergeTree::alterThread, this); + part_check_thread = std::thread(&StorageReplicatedMergeTree::partCheckThread, this); queue_task_handle = context.getBackgroundPool().addTask( std::bind(&StorageReplicatedMergeTree::queueTask, this, std::placeholders::_1)); }