diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index 73b54c31d5b..4d01ba598a6 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -257,6 +257,7 @@ namespace ErrorCodes INCORRECT_MARK, INVALID_PARTITION_NAME, NOT_LEADER, + NOT_ENOUGH_BLOCK_NUMBERS, POCO_EXCEPTION = 1000, STD_EXCEPTION, diff --git a/dbms/include/DB/Interpreters/InterpreterAlterQuery.h b/dbms/include/DB/Interpreters/InterpreterAlterQuery.h index 0b0416f0941..e3f44a0efd8 100644 --- a/dbms/include/DB/Interpreters/InterpreterAlterQuery.h +++ b/dbms/include/DB/Interpreters/InterpreterAlterQuery.h @@ -46,7 +46,7 @@ private: static PartitionCommand attachPartition(const Field & partition, bool unreplicated, bool part) { - return {ATTACH_PARTITION, partition, false, part, unreplicated}; + return {ATTACH_PARTITION, partition, false, unreplicated, part}; } }; diff --git a/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h b/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h index 9543aa991e9..807087f7da9 100644 --- a/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h +++ b/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h @@ -63,12 +63,14 @@ public: void add(const String & name); String getContainingPart(const String & name) const; - Strings getParts() const; + Strings getParts() const; /// В порядке возрастания месяца и номера блока. + + size_t size() const; static String getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level); /// Возвращает true если имя директории совпадает с форматом имени директории кусочков - static bool isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec & matches); + static bool isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec * out_matches = nullptr); /// Кладет в DataPart данные из имени кусочка. 
static void parsePartName(const String & file_name, Part & part, const Poco::RegularExpression::MatchVec * matches = nullptr); diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index ad5521b0e1a..208e51f3939 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -307,17 +307,22 @@ public: Poco::File(to).remove(true); } - /// Переименовывает кусок, дописав к имени префикс. - void renameAddPrefix(const String & prefix) const + void renameTo(const String & new_name) const { String from = storage.full_path + name + "/"; - String to = storage.full_path + prefix + name + "/"; + String to = storage.full_path + new_name + "/"; Poco::File f(from); f.setLastModified(Poco::Timestamp::fromEpochTime(time(0))); f.renameTo(to); } + /// Переименовывает кусок, дописав к имени префикс. + void renameAddPrefix(const String & prefix) const + { + renameTo(prefix + name); + } + /// Загрузить индекс и вычислить размер. Если size=0, вычислить его тоже. void loadIndex() { @@ -344,12 +349,12 @@ public: } /// Прочитать контрольные суммы, если есть. - void loadChecksums() + void loadChecksums(bool require) { String path = storage.full_path + name + "/checksums.txt"; if (!Poco::File(path).exists()) { - if (storage.require_part_metadata) + if (require) throw Exception("No checksums.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART); return; @@ -359,16 +364,21 @@ public: assertEOF(file); } - void loadColumns() + void loadColumns(bool require) { String path = storage.full_path + name + "/columns.txt"; if (!Poco::File(path).exists()) { - if (storage.require_part_metadata) + if (require) throw Exception("No columns.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART); - columns = *storage.columns; /// Если нет файла со списком столбцов, запишем его. 
+ for (const NameAndTypePair & column : *storage.columns) + { + if (Poco::File(storage.full_path + name + "/" + escapeForFileName(column.name) + ".bin").exists()) + columns.push_back(column); + } + { WriteBufferFromFile out(path + ".tmp", 4096); columns.writeText(out); @@ -382,7 +392,7 @@ public: columns.readText(file, storage.context.getDataTypeFactory()); } - void checkNotBroken() + void checkNotBroken(bool require_part_metadata) { String path = storage.full_path + name; @@ -391,7 +401,7 @@ public: if (!checksums.files.count("primary.idx")) throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART); - if (storage.require_part_metadata) + if (require_part_metadata) { for (const NameAndTypePair & it : columns) { @@ -625,15 +635,23 @@ public: */ DataPartsVector renameTempPartAndReplace(MutableDataPartPtr part, Increment * increment = nullptr, Transaction * out_transaction = nullptr); - /** Убирает из рабочего набора куски remove и добавляет куски add. + /** Убирает из рабочего набора куски remove и добавляет куски add. add должны уже быть в all_data_parts. * Если clear_without_timeout, данные будут удалены при следующем clearOldParts, игнорируя old_parts_lifetime. */ void replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout); + /** Добавляет новый кусок в список известных кусков и в рабочий набор. + */ + void attachPart(DataPartPtr part); + /** Переименовывает кусок в detached/prefix_кусок и забывает про него. Данные не будут удалены в clearOldParts. * Если restore_covered, добавляет в рабочий набор неактивные куски, слиянием которых получен удаляемый кусок. */ - void renameAndDetachPart(DataPartPtr part, const String & prefix = "", bool restore_covered = false); + void renameAndDetachPart(DataPartPtr part, const String & prefix = "", bool restore_covered = false, bool move_to_detached = true); + + /** Убирает кусок из списка кусков (включая all_data_parts), но не перемещщает директорию. 
+ */ + void detachPartInPlace(DataPartPtr part); /** Возвращает старые неактуальные куски, которые можно удалить. Одновременно удаляет их из списка кусков, но не с диска. */ @@ -685,6 +703,9 @@ public: ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; } SortDescription getSortDescription() const { return sort_descr; } + /// Проверить, что кусок не сломан и посчитать для него чексуммы, если их нет. + MutableDataPartPtr loadPartAndFixMetadata(const String & relative_path); + const Context & context; const String date_column_name; const ASTPtr sampling_expression; diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreePartChecker.h b/dbms/include/DB/Storages/MergeTree/MergeTreePartChecker.h index 4490cd9ebdb..417ddd6d7f8 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreePartChecker.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreePartChecker.h @@ -9,16 +9,27 @@ namespace DB class MergeTreePartChecker { public: + struct Settings + { + bool verbose = false; /// Пишет в stderr прогресс и ошибки, и не останавливается при первой ошибке. + bool require_checksums = false; /// Требует, чтобы был checksums.txt. + bool require_column_files = false; /// Требует, чтобы для всех столбцов из columns.txt были файлы. + size_t index_granularity = 8192; + + Settings & setVerbose(bool verbose_) { verbose = verbose_; return *this; } + Settings & setRequireChecksums(bool require_checksums_) { require_checksums = require_checksums_; return *this; } + Settings & setRequireColumnFiles(bool require_column_files_) { require_column_files = require_column_files_; return *this; } + Settings & setIndexGranularity(size_t index_granularity_) { index_granularity = index_granularity_; return *this; } + }; + /** Полностью проверяет данные кусочка: * - Вычисляет контрольные суммы и сравнивает с checksums.txt. * - Для массивов и строк проверяет соответствие размеров и количества данных. * - Проверяет правильность засечек. 
* Бросает исключение, если кусок испорчен или если проверить не получилось (TODO: можно попробовать разделить эти случаи). - * Если strict, требует, чтобы для всех столбцов из columns.txt были файлы. - * Если verbose, пишет в stderr прогресс и ошибки, и не останавливается при первой ошибке. */ - static void checkDataPart(String path, size_t index_granularity, bool strict, const DataTypeFactory & data_type_factory, - bool verbose = false); + static void checkDataPart(String path, const Settings & settings, const DataTypeFactory & data_type_factory, + MergeTreeData::DataPart::Checksums * out_checksums = nullptr); }; } diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index 38bc341e68c..9c2d040fa1d 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -137,6 +137,7 @@ private: GET_PART, /// Получить кусок с другой реплики. MERGE_PARTS, /// Слить куски. DROP_RANGE, /// Удалить куски в указанном месяце в указанном диапазоне номеров. + ATTACH_PART, /// Перенести кусок из директории detached или unreplicated. }; String znode_name; @@ -144,11 +145,19 @@ private: Type type; String source_replica; /// Пустая строка значит, что эта запись была добавлена сразу в очередь, а не скопирована из лога. - String new_part_name; /// Для DROP_RANGE имя несуществующего куска. Нужно удалить все куски, покрытые им. + /// Имя куска, получающегося в результате. + /// Для DROP_RANGE имя несуществующего куска. Нужно удалить все куски, покрытые им. + String new_part_name; Strings parts_to_merge; - bool detach = false; /// Для DROP_RANGE, true значит, что куски нужно не удалить, а перенести в директорию detached. + /// Для DROP_RANGE, true значит, что куски нужно не удалить, а перенести в директорию detached. + bool detach = false; + + /// Для ATTACH_PART имя куска в директории detached или unreplicated. 
+ String source_part_name; + /// Нужно переносить из директории unreplicated, а не detached. + bool attach_unreplicated = false; FuturePartTaggerPtr future_part_tagger; bool currently_executing = false; /// Доступ под queue_mutex. @@ -156,13 +165,13 @@ void addResultToVirtualParts(StorageReplicatedMergeTree & storage) { - if (type == MERGE_PARTS || type == GET_PART || type == DROP_RANGE) + if (type == MERGE_PARTS || type == GET_PART || type == DROP_RANGE || type == ATTACH_PART) storage.virtual_parts.add(new_part_name); } void tagPartAsFuture(StorageReplicatedMergeTree & storage) { - if (type == MERGE_PARTS || type == GET_PART) + if (type == MERGE_PARTS || type == GET_PART || type == ATTACH_PART) future_part_tagger = new FuturePartTagger(new_part_name, storage); } @@ -362,7 +371,7 @@ private: * Кладет в ops действия, добавляющие данные о куске в ZooKeeper. * Вызывать под TableStructureLock. */ - void checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops); + void checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops, String name_override = ""); /// Убирает кусок из ZooKeeper и добавляет в очередь задание скачать его. Предполагается это делать с битыми кусками. void removePartAndEnqueueFetch(const String & part_name); @@ -396,7 +405,8 @@ private: */ bool executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context); - bool executeDropRange(const LogEntry & entry); + void executeDropRange(const LogEntry & entry); + bool executeAttachPart(const LogEntry & entry); /// Возвращает false, если куска нет, и его нужно забрать с другой реплики. /** Обновляет очередь. */ @@ -450,7 +460,7 @@ private: /** Дождаться, пока все реплики, включая эту, выполнят указанное действие из лога. * Если одновременно с этим добавляются реплики, может не дождаться добавленную реплику. 
*/ - void waitForAllReplicasToProcessLogEntry(const String & log_znode_path, const LogEntry & entry); + void waitForAllReplicasToProcessLogEntry(const LogEntry & entry); }; } diff --git a/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp b/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp index 8f6bd3f8eef..b8dbc8f4165 100644 --- a/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp +++ b/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp @@ -82,6 +82,12 @@ Strings ActiveDataPartSet::getParts() const return res; } +size_t ActiveDataPartSet::size() const +{ + Poco::ScopedLock lock(mutex); + return parts.size(); +} + String ActiveDataPartSet::getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level) @@ -110,10 +116,14 @@ String ActiveDataPartSet::getPartName(DayNum_t left_date, DayNum_t right_date, U return res; } -bool ActiveDataPartSet::isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec & matches) +bool ActiveDataPartSet::isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec * out_matches) { + Poco::RegularExpression::MatchVec matches; static Poco::RegularExpression file_name_regexp("^(\\d{8})_(\\d{8})_(\\d+)_(\\d+)_(\\d+)"); - return (file_name_regexp.match(dir_name, 0, matches) && 6 == matches.size()); + bool res = (file_name_regexp.match(dir_name, 0, matches) && 6 == matches.size()); + if (out_matches) + *out_matches = matches; + return res; } void ActiveDataPartSet::parsePartName(const String & file_name, Part & part, const Poco::RegularExpression::MatchVec * matches_p) @@ -121,7 +131,7 @@ void ActiveDataPartSet::parsePartName(const String & file_name, Part & part, con Poco::RegularExpression::MatchVec match_vec; if (!matches_p) { - if (!isPartDirectory(file_name, match_vec)) + if (!isPartDirectory(file_name, &match_vec)) throw Exception("Unexpected part name: " + file_name, ErrorCodes::BAD_DATA_PART_NAME); matches_p = &match_vec; } diff --git 
a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index c7eced0b86c..55d0805e65a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -124,7 +125,7 @@ void MergeTreeData::loadDataParts() Poco::RegularExpression::MatchVec matches; for (const String & file_name : part_file_names) { - if (!ActiveDataPartSet::isPartDirectory(file_name, matches)) + if (!ActiveDataPartSet::isPartDirectory(file_name, &matches)) continue; MutableDataPartPtr part = std::make_shared(*this); @@ -135,10 +136,10 @@ void MergeTreeData::loadDataParts() try { - part->loadColumns(); - part->loadChecksums(); + part->loadColumns(require_part_metadata); + part->loadChecksums(require_part_metadata); part->loadIndex(); - part->checkNotBroken(); + part->checkNotBroken(require_part_metadata); } catch (...) { @@ -167,7 +168,7 @@ void MergeTreeData::loadDataParts() { if (contained_name == file_name) continue; - if (!ActiveDataPartSet::isPartDirectory(contained_name, matches)) + if (!ActiveDataPartSet::isPartDirectory(contained_name, &matches)) continue; DataPart contained_part(*this); ActiveDataPartSet::parsePartName(contained_name, contained_part, &matches); @@ -720,7 +721,17 @@ void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataParts } } -void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix, bool restore_covered) +void MergeTreeData::attachPart(DataPartPtr part) +{ + Poco::ScopedLock lock(data_parts_mutex); + Poco::ScopedLock lock_all(all_data_parts_mutex); + + if (!all_data_parts.insert(part).second) + throw Exception("Part " + part->name + " is already attached", ErrorCodes::DUPLICATE_DATA_PART); + data_parts.insert(part); +} + +void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix, bool restore_covered, bool move_to_detached) { LOG_INFO(log, 
"Renaming " << part->name << " to " << prefix << part->name << " and detaching it."); @@ -731,7 +742,8 @@ void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix, throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART); data_parts.erase(part); - part->renameAddPrefix("detached/" + prefix); + if (move_to_detached || !prefix.empty()) + part->renameAddPrefix((move_to_detached ? "detached/" : "") + prefix); if (restore_covered) { @@ -783,6 +795,11 @@ void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix, } } +void MergeTreeData::detachPartInPlace(DataPartPtr part) +{ + renameAndDetachPart(part, "", false, false); +} + MergeTreeData::DataParts MergeTreeData::getDataParts() { Poco::ScopedLock lock(data_parts_mutex); @@ -879,6 +896,41 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na return nullptr; } +MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const String & relative_path) +{ + MutableDataPartPtr part = std::make_shared(*this); + part->name = relative_path; + + /// Раньше список столбцов записывался неправильно. Удалим его и создадим заново. + if (Poco::File(full_path + relative_path + "/columns.txt").exists()) + Poco::File(full_path + relative_path + "/columns.txt").remove(); + + part->loadColumns(false); + part->loadChecksums(false); + part->loadIndex(); + part->checkNotBroken(false); + + part->modification_time = Poco::File(full_path + relative_path).getLastModified().epochTime(); + + /// Если нет файла с чексуммами, посчитаем чексуммы и запишем. Заодно проверим данные. 
+ if (part->checksums.empty()) + { + MergeTreePartChecker::Settings settings; + settings.setIndexGranularity(index_granularity); + settings.setRequireColumnFiles(true); + MergeTreePartChecker::checkDataPart(full_path + relative_path, settings, context.getDataTypeFactory(), &part->checksums); + + { + WriteBufferFromFile out(full_path + relative_path + "/checksums.txt.tmp", 4096); + part->checksums.writeText(out); + } + + Poco::File(full_path + relative_path + "/checksums.txt.tmp").renameTo(full_path + relative_path + "/checksums.txt"); + } + + return part; +} + void MergeTreeData::DataPart::Checksums::Checksum::checkEqual(const Checksum & rhs, bool have_uncompressed, const String & name) const { diff --git a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp index 8dc761b26a4..6382f2624bf 100644 --- a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp @@ -97,7 +97,7 @@ struct Stream return size / sizeof(UInt64); } - void assertMark(bool strict) + void assertMark() { MarkInCompressedFile mrk_mark; readIntBinary(mrk_mark.offset_in_compressed_file, mrk_hashing_buf); @@ -152,7 +152,7 @@ struct Stream }; /// Возвращает количество строк. Добавляет в checksums чексуммы всех файлов столбца. 
-static size_t checkColumn(const String & path, const String & name, DataTypePtr type, size_t index_granularity, bool strict, +static size_t checkColumn(const String & path, const String & name, DataTypePtr type, const MergeTreePartChecker::Settings & settings, MergeTreeData::DataPart::Checksums & checksums) { size_t rows = 0; @@ -171,10 +171,10 @@ static size_t checkColumn(const String & path, const String & name, DataTypePtr if (sizes_stream.marksEOF()) break; - sizes_stream.assertMark(strict); - data_stream.assertMark(strict); + sizes_stream.assertMark(); + data_stream.assertMark(); - size_t cur_rows = sizes_stream.readUInt64(index_granularity, sizes); + size_t cur_rows = sizes_stream.readUInt64(settings.index_granularity, sizes); size_t sum = 0; for (size_t i = 0; i < cur_rows; ++i) @@ -188,7 +188,7 @@ static size_t checkColumn(const String & path, const String & name, DataTypePtr data_stream.read(sum); rows += cur_rows; - if (cur_rows < index_granularity) + if (cur_rows < settings.index_granularity) break; } @@ -207,12 +207,12 @@ static size_t checkColumn(const String & path, const String & name, DataTypePtr if (data_stream.marksEOF()) break; - data_stream.assertMark(strict); + data_stream.assertMark(); - size_t cur_rows = data_stream.read(index_granularity); + size_t cur_rows = data_stream.read(settings.index_granularity); rows += cur_rows; - if (cur_rows < index_granularity) + if (cur_rows < settings.index_granularity) break; } @@ -228,8 +228,8 @@ static size_t checkColumn(const String & path, const String & name, DataTypePtr } } -void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity, bool strict, const DataTypeFactory & data_type_factory, - bool verbose) +void MergeTreePartChecker::checkDataPart(String path, const Settings & settings, const DataTypeFactory & data_type_factory, + MergeTreeData::DataPart::Checksums * out_checksums) { if (!path.empty() && *path.rbegin() != '/') path += "/"; @@ -243,7 +243,7 @@ void 
MergeTreePartChecker::checkDataPart(String path, size_t index_granularity, assertEOF(buf); } - if (strict || Poco::File(path + "checksums.txt").exists()) + if (settings.require_checksums || Poco::File(path + "checksums.txt").exists()) { ReadBufferFromFile buf(path + "checksums.txt"); checksums_txt.readText(buf); @@ -266,7 +266,7 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity, for (const NameAndTypePair & column : columns) { - if (verbose) + if (settings.verbose) { std::cerr << column.name << ":"; std::cerr.flush(); @@ -275,14 +275,14 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity, bool ok = false; try { - if (!strict && !Poco::File(path + escapeForFileName(column.name) + ".bin").exists()) + if (!settings.require_column_files && !Poco::File(path + escapeForFileName(column.name) + ".bin").exists()) { - if (verbose) + if (settings.verbose) std::cerr << " no files" << std::endl; continue; } - size_t cur_rows = checkColumn(path, column.name, column.type, index_granularity, strict, checksums_data); + size_t cur_rows = checkColumn(path, column.name, column.type, settings, checksums_data); if (first) { rows = cur_rows; @@ -298,7 +298,7 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity, } catch (...) 
{ - if (!verbose) + if (!settings.verbose) throw; ExceptionPtr e = cloneCurrentException(); if (!first_exception) @@ -311,18 +311,18 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity, std::cerr << std::endl; } - if (verbose && ok) + if (settings.verbose && ok) std::cerr << " ok" << std::endl; } if (first) throw Exception("No columns", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED); - if (primary_idx_size % ((rows - 1) / index_granularity + 1)) + if (primary_idx_size % ((rows - 1) / settings.index_granularity + 1)) throw Exception("primary.idx size (" + toString(primary_idx_size) + ") not divisible by number of marks (" - + toString(rows) + "/" + toString(index_granularity) + " rounded up)", ErrorCodes::CORRUPTED_DATA); + + toString(rows) + "/" + toString(settings.index_granularity) + " rounded up)", ErrorCodes::CORRUPTED_DATA); - if (strict || !checksums_txt.files.empty()) + if (settings.require_checksums || !checksums_txt.files.empty()) checksums_txt.checkEqual(checksums_data, true); if (first_exception) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartsExchange.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartsExchange.cpp index da860c75357..e97ebc7381c 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartsExchange.cpp @@ -110,8 +110,8 @@ MergeTreeData::MutableDataPartPtr ReplicatedMergeTreePartsFetcher::fetchPart( ActiveDataPartSet::parsePartName(part_name, *new_data_part); new_data_part->modification_time = time(0); - new_data_part->loadColumns(); - new_data_part->loadChecksums(); + new_data_part->loadColumns(true); + new_data_part->loadChecksums(true); new_data_part->loadIndex(); new_data_part->checksums.checkEqual(checksums, false); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index a6f359b7cc1..c79a52ec11a 100644 --- 
a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -17,6 +17,8 @@ const auto ERROR_SLEEP_MS = 1000; const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000; const auto CLEANUP_SLEEP_MS = 30 * 1000; +const auto RESERVED_BLOCK_NUMBERS = 200; + /// Преобразовать число в строку формате суффиксов автоинкрементных нод в ZooKeeper. static String padIndex(UInt64 index) { @@ -401,8 +403,8 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks) bool insane = parts_to_add.size() > 2 || unexpected_parts.size() > 2 || - expected_parts.size() > 20 || - parts_to_fetch.size() > 2; + expected_parts.size() > 5 || + parts_to_fetch.size() > 30; if (insane && !skip_sanity_checks) { @@ -475,8 +477,11 @@ void StorageReplicatedMergeTree::initVirtualParts() } } -void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops) +void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops, String part_name) { + if (part_name.empty()) + part_name = part->name; + check(part->columns); int expected_columns_version = columns_version; @@ -488,22 +493,22 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP { zkutil::Stat stat_before, stat_after; String columns_str; - if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part->name + "/columns", columns_str, &stat_before)) + if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name + "/columns", columns_str, &stat_before)) continue; if (columns_str != expected_columns_str) { - LOG_INFO(log, "Not checking checksums of part " << part->name << " with replica " << replica + LOG_INFO(log, "Not checking checksums of part " << part_name << " with replica " << replica << " because columns are different"); continue; } String checksums_str; /// Проверим, что версия ноды со столбцами не изменилась, пока мы читали 
checksums. /// Это гарантирует, что столбцы и чексуммы относятся к одним и тем же данным. - if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part->name + "/checksums", checksums_str) || - !zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part->name + "/columns", &stat_after) || + if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name + "/checksums", checksums_str) || + !zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name + "/columns", &stat_after) || stat_before.version != stat_after.version) { - LOG_INFO(log, "Not checking checksums of part " << part->name << " with replica " << replica + LOG_INFO(log, "Not checking checksums of part " << part_name << " with replica " << replica << " because part changed while we were reading its checksums"); continue; } @@ -512,9 +517,9 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP checksums.checkEqual(part->checksums, true); } - if (zookeeper->exists(replica_path + "/parts/" + part->name)) + if (zookeeper->exists(replica_path + "/parts/" + part_name)) { - LOG_ERROR(log, "checkPartAndAddToZooKeeper: node " << replica_path + "/parts/" + part->name << " already exists"); + LOG_ERROR(log, "checkPartAndAddToZooKeeper: node " << replica_path + "/parts/" + part_name << " already exists"); return; } @@ -522,17 +527,17 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP zookeeper_path + "/columns", expected_columns_version)); ops.push_back(new zkutil::Op::Create( - replica_path + "/parts/" + part->name, + replica_path + "/parts/" + part_name, "", zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent)); ops.push_back(new zkutil::Op::Create( - replica_path + "/parts/" + part->name + "/columns", + replica_path + "/parts/" + part_name + "/columns", part->columns.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent)); ops.push_back(new 
zkutil::Op::Create( - replica_path + "/parts/" + part->name + "/checksums", + replica_path + "/parts/" + part_name + "/checksums", part->checksums.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent)); @@ -749,7 +754,8 @@ void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_ev bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry) { - if ((entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART) && future_parts.count(entry.new_part_name)) + if ((entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART) + && future_parts.count(entry.new_part_name)) { LOG_DEBUG(log, "Not executing log entry for part " << entry.new_part_name << " because another log entry for the same part is being processed. This shouldn't happen often."); @@ -779,10 +785,14 @@ bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry) bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context) { if (entry.type == LogEntry::DROP_RANGE) - return executeDropRange(entry); + { + executeDropRange(entry); + return true; + } if (entry.type == LogEntry::GET_PART || - entry.type == LogEntry::MERGE_PARTS) + entry.type == LogEntry::MERGE_PARTS || + entry.type == LogEntry::ATTACH_PART) { /// Если у нас уже есть этот кусок или покрывающий его кусок, ничего делать не нужно. 
MergeTreeData::DataPartPtr containing_part = data.getActiveContainingPart(entry.new_part_name); @@ -805,6 +815,10 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro { do_fetch = true; } + else if (entry.type == LogEntry::ATTACH_PART) + { + do_fetch = !executeAttachPart(entry); + } else if (entry.type == LogEntry::MERGE_PARTS) { MergeTreeData::DataPartsVector parts; @@ -959,7 +973,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro return true; } -bool StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTree::LogEntry & entry) +void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTree::LogEntry & entry) { LOG_INFO(log, (entry.detach ? "Detaching" : "Removing") << " parts inside " << entry.new_part_name << "."); @@ -1019,7 +1033,7 @@ bool StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr /// Если кусок нужно удалить, надежнее удалить директорию после изменений в ZooKeeper. if (!entry.detach) - data.replaceParts({part}, {}, false); + data.replaceParts({part}, {}, true); } LOG_INFO(log, (entry.detach ? "Detached " : "Removed ") << removed_parts << " parts inside " << entry.new_part_name << "."); @@ -1043,6 +1057,45 @@ bool StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr unreplicated_data->replaceParts({part}, {}, false); } } +} + +bool StorageReplicatedMergeTree::executeAttachPart(const StorageReplicatedMergeTree::LogEntry & entry) +{ + String source_path = (entry.attach_unreplicated ? "unreplicated/" : "detached/") + entry.source_part_name; + + LOG_INFO(log, "Attaching part " << entry.source_part_name << " from " << source_path << " as " << entry.new_part_name); + + if (!Poco::File(data.getFullPath() + source_path).isDirectory()) + { + LOG_INFO(log, "No such directory. 
Will fetch " << entry.new_part_name << " instead"); + return false; + } + + LOG_DEBUG(log, "Checking data"); + MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_path); + + zkutil::Ops ops; + checkPartAndAddToZooKeeper(part, ops, entry.new_part_name); + + if (entry.attach_unreplicated && unreplicated_data) + { + MergeTreeData::DataPartPtr unreplicated_part = unreplicated_data->getPartIfExists(entry.source_part_name); + if (unreplicated_part) + unreplicated_data->detachPartInPlace(unreplicated_part); + else + LOG_WARNING(log, "Unreplicated part " << entry.source_part_name << " is already detached"); + } + + zookeeper->multi(ops); + + /// NOTE: Не можем использовать renameTempPartAndAdd, потому что кусок не временный - если что-то пойдет не так, его не нужно удалять. + part->renameTo(entry.new_part_name); + part->name = entry.new_part_name; + ActiveDataPartSet::parsePartName(part->name, *part); + + data.attachPart(part); + + LOG_INFO(log, "Finished attaching part " << entry.new_part_name); /// На месте удаленных кусков могут появиться новые, с другими данными. 
context.resetCaches(); @@ -1625,8 +1678,12 @@ void StorageReplicatedMergeTree::partCheckThread() if (part->columns != zk_columns) throw Exception("Columns of local part " + part_name + " are different from ZooKeeper"); + MergeTreePartChecker::Settings settings; + settings.setIndexGranularity(data.index_granularity); + settings.setRequireChecksums(true); + settings.setRequireColumnFiles(true); MergeTreePartChecker::checkDataPart( - data.getFullPath() + part_name, data.index_granularity, true, context.getDataTypeFactory()); + data.getFullPath() + part_name, settings, context.getDataTypeFactory()); LOG_INFO(log, "Part " << part_name << " looks good."); } @@ -2135,12 +2192,7 @@ static String getFakePartNameForDrop(const String & month_name, UInt64 left, UIn void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach) { - String month_name; - - if (field.getType() == Field::Types::UInt64) - month_name = toString(field.get()); - else - month_name = field.safeGet(); + String month_name = field.getType() == Field::Types::UInt64 ? toString(field.get()) : field.safeGet(); if (!isValidMonthName(month_name)) throw Exception("Invalid partition format: " + month_name + ". Partition should consist of 6 digits: YYYYMM", @@ -2191,12 +2243,101 @@ void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach) entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); /// Дождемся, пока все реплики выполнят дроп. - waitForAllReplicasToProcessLogEntry(log_znode_path, entry); + waitForAllReplicasToProcessLogEntry(entry); } -void StorageReplicatedMergeTree::attachPartition(const Field& partition, bool unreplicated, bool part) +void StorageReplicatedMergeTree::attachPartition(const Field & field, bool unreplicated, bool attach_part) { - throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); + String partition = field.getType() == Field::Types::UInt64 ? 
toString(field.get()) : field.safeGet(); + + if (!attach_part && !isValidMonthName(partition)) + throw Exception("Invalid partition format: " + partition + ". Partition should consist of 6 digits: YYYYMM", + ErrorCodes::INVALID_PARTITION_NAME); + + String source_dir = (unreplicated ? "unreplicated/" : "detached/"); + + /// Составим список кусков, которые нужно добавить. + Strings parts; + if (attach_part) + { + parts.push_back(partition); + } + else + { + LOG_DEBUG(log, "Looking for parts for partition " << partition << " in " << source_dir); + ActiveDataPartSet active_parts; + for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it) + { + String name = it.name(); + if (!ActiveDataPartSet::isPartDirectory(name)) + continue; + if (name.substr(0, partition.size()) != partition) + continue; + LOG_DEBUG(log, "Found part " << name); + active_parts.add(name); + } + LOG_DEBUG(log, active_parts.size() << " of them are active"); + parts = active_parts.getParts(); + } + + /// Синхронно проверим, что добавляемые куски существуют и не испорчены хотя бы на этой реплике. Запишем checksums.txt, если его нет. + LOG_DEBUG(log, "Checking parts"); + for (const String & part : parts) + { + LOG_DEBUG(log, "Checking part " << part); + data.loadPartAndFixMetadata(source_dir + part); + } + + /// Выделим добавляемым кускам максимальные свободные номера, меньшие RESERVED_BLOCK_NUMBERS. + /// NOTE: Проверка свободности номеров никак не синхронизируется. Выполнять несколько запросов ATTACH/DETACH/DROP одновременно нельзя. 
+ UInt64 min_used_number = RESERVED_BLOCK_NUMBERS; + + { + auto existing_parts = data.getDataParts(); + for (const auto & part : existing_parts) + { + min_used_number = std::min(min_used_number, part->left); + } + } + + if (parts.size() > min_used_number) + throw Exception("Not enough free small block numbers for attaching parts: " + + toString(parts.size()) + " needed, " + toString(min_used_number) + " available", ErrorCodes::NOT_ENOUGH_BLOCK_NUMBERS); + + /// Добавим записи в лог. + std::reverse(parts.begin(), parts.end()); + std::list entries; + zkutil::Ops ops; + for (const String & part_name : parts) + { + ActiveDataPartSet::Part part; + ActiveDataPartSet::parsePartName(part_name, part); + part.left = part.right = --min_used_number; + String new_part_name = ActiveDataPartSet::getPartName(part.left_date, part.right_date, part.left, part.right, part.level); + + LOG_INFO(log, "Will attach " << part_name << " as " << new_part_name); + + entries.emplace_back(); + LogEntry & entry = entries.back(); + entry.type = LogEntry::ATTACH_PART; + entry.source_replica = replica_name; + entry.source_part_name = part_name; + entry.new_part_name = new_part_name; + entry.attach_unreplicated = unreplicated; + ops.push_back(new zkutil::Op::Create( + zookeeper_path + "/log/log-", entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential)); + } + + LOG_DEBUG(log, "Adding attaches to log"); + zookeeper->multi(ops); + size_t i = 0; + for (LogEntry & entry : entries) + { + String log_znode_path = dynamic_cast(ops[i++]).getPathCreated(); + entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + + waitForAllReplicasToProcessLogEntry(entry); + } } void StorageReplicatedMergeTree::drop() @@ -2246,7 +2387,7 @@ AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const zkutil::Ops ops; auto acl = zookeeper->getDefaultACL(); ops.push_back(new zkutil::Op::Create(month_path, "", acl, zkutil::CreateMode::Persistent)); - 
for (size_t i = 0; i < 200; ++i) + for (size_t i = 0; i < RESERVED_BLOCK_NUMBERS; ++i) { ops.push_back(new zkutil::Op::Create(month_path + "/skip_increment", "", acl, zkutil::CreateMode::Persistent)); ops.push_back(new zkutil::Op::Remove(month_path + "/skip_increment", -1)); @@ -2260,11 +2401,11 @@ AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const zookeeper_path + "/temp", *zookeeper); } -void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const String & log_znode_path, const LogEntry & entry) +void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const LogEntry & entry) { LOG_DEBUG(log, "Waiting for all replicas to process " << entry.znode_name); - UInt64 log_index = parse(log_znode_path.substr(log_znode_path.size() - 10)); + UInt64 log_index = parse(entry.znode_name.substr(entry.znode_name.size() - 10)); String log_entry_str = entry.toString(); Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); @@ -2354,6 +2495,16 @@ void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const writeString("drop\n", out); writeString(new_part_name, out); break; + case ATTACH_PART: + writeString("attach\n", out); + if (attach_unreplicated) + writeString("unreplicated\n", out); + else + writeString("detached\n", out); + writeString(source_part_name, out); + writeString("\ninto\n", out); + writeString(new_part_name, out); + break; } writeString("\n", out); } @@ -2394,6 +2545,22 @@ void StorageReplicatedMergeTree::LogEntry::readText(ReadBuffer & in) detach = type_str == "detach"; readString(new_part_name, in); } + else if (type_str == "attach") + { + type = ATTACH_PART; + String source_type; + readString(source_type, in); + if (source_type == "unreplicated") + attach_unreplicated = true; + else if (source_type == "detached") + attach_unreplicated = false; + else + throw Exception("Bad format: expected 'unreplicated' or 'detached', found '" + source_type + "'", 
ErrorCodes::CANNOT_PARSE_TEXT); + assertString("\n", in); + readString(source_part_name, in); + assertString("\ninto\n", in); + readString(new_part_name, in); + } assertString("\n", in); } diff --git a/dbms/src/Storages/tests/part_checker.cpp b/dbms/src/Storages/tests/part_checker.cpp index be19a4c494f..d5376b23c37 100644 --- a/dbms/src/Storages/tests/part_checker.cpp +++ b/dbms/src/Storages/tests/part_checker.cpp @@ -15,8 +15,14 @@ int main(int argc, char ** argv) try { - DB::MergeTreePartChecker::checkDataPart(argv[1], argc == 4 ? DB::parse(argv[3]) : 8192ul, argv[2][0] == '1', - DB::DataTypeFactory(), true); + DB::MergeTreePartChecker::Settings settings; + if (argc == 4) + settings.setIndexGranularity(DB::parse(argv[3])); + settings.setRequireChecksums(argv[2][0] == '1'); + settings.setRequireColumnFiles(argv[2][0] == '1'); + settings.setVerbose(true); + + DB::MergeTreePartChecker::checkDataPart(argv[1], settings, DB::DataTypeFactory()); } catch (...) {