From e493438564f709679a91f07f512ae0e4fe38bc95 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Thu, 17 Jul 2014 14:44:17 +0400 Subject: [PATCH] Merge --- .../DB/Storages/MergeTree/MergeTreeData.h | 29 +++++++++++++++++++ .../MergeTree/MergedBlockOutputStream.h | 6 +--- .../ReplicatedMergeTreePartsExchange.h | 6 ++-- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 20 +++++++++---- .../MergeTree/MergeTreeDataMerger.cpp | 1 + .../MergeTree/MergeTreeDataWriter.cpp | 6 ++-- .../Storages/StorageReplicatedMergeTree.cpp | 2 +- 7 files changed, 55 insertions(+), 15 deletions(-) diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index f1fc32c4825..b237d4956df 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -221,6 +221,9 @@ public: time_t modification_time; mutable time_t remove_time; /// Когда кусок убрали из рабочего набора. + /// Если true, деструктор удалит директорию с куском. + bool is_temp = false; + /// Первичный ключ. Всегда загружается в оперативку. typedef std::vector Index; Index index; @@ -247,6 +250,32 @@ public: /// NOTE можно загружать засечки тоже в оперативку + ~DataPart() + { + if (is_temp) + { + try + { + Poco::File dir(storage.full_path + name); + if (!dir.exists()) + return; + + if (name.substr(0, strlen("tmp")) != "tmp") + { + LOG_ERROR(storage.log, "~DataPart() should remove part " << storage.full_path + name + << " but its name doesn't start with tmp. Too suspicious, keeping the part."); + return; + } + + dir.remove(true); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } + } + /// Вычисляем сумарный размер всей директории со всеми файлами static size_t calcTotalSize(const String &from) { diff --git a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h index c6c9ed948a1..9d2cb5e4901 100644 --- a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h @@ -341,7 +341,7 @@ public: for (size_t i = 0; i < block.columns(); ++i) { addStream(part_path, block.getByPosition(i).name, - *block.getByPosition(i).type, 0, prefix + block.getByPosition(i).name); + *block.getByPosition(i).type, 0, block.getByPosition(i).name); } initialized = true; } @@ -375,8 +375,6 @@ public: column_stream.second->sync(); std::string column = escapeForFileName(column_stream.first); column_stream.second->addToChecksums(checksums, column); - Poco::File(part_path + prefix + column + ".bin").renameTo(part_path + column + ".bin"); - Poco::File(part_path + prefix + column + ".mrk").renameTo(part_path + column + ".mrk"); } column_streams.clear(); @@ -390,8 +388,6 @@ private: bool initialized; - const std::string prefix = "tmp_"; - bool sync; }; diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h index 06a85fbf537..67f4f0b2d2a 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h @@ -90,6 +90,10 @@ public: if (!Poco::File(part_path).createDirectory()) throw Exception("Directory " + part_path + " already exists"); + MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data); + new_data_part->name = "tmp_" + part_name; + new_data_part->is_temp = true; + size_t files; readBinary(files, in); MergeTreeData::DataPart::Checksums checksums; @@ -118,9 +122,7 @@ public: assertEOF(in); - MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data); ActiveDataPartSet::parsePartName(part_name, *new_data_part); - new_data_part->name = "tmp_" + part_name; new_data_part->modification_time = time(0); new_data_part->loadColumns(); new_data_part->loadChecksums(); diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index b9f3b11f5b7..8c8864f06f1 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -595,7 +595,8 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( Poco::ScopedLock lock(data_parts_mutex); Poco::ScopedLock lock_all(all_data_parts_mutex); - String old_path = getFullPath() + part->name + "/"; + String old_name = part->name; + String old_path = getFullPath() + old_name + "/"; /** Для StorageMergeTree важно, что получение номера куска происходит атомарно с добавлением этого куска в набор. * Иначе есть race condition - может произойти слияние пары кусков, диапазоны номеров которых @@ -604,16 +605,25 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( if (increment) part->left = part->right = increment->get(false); - part->name = ActiveDataPartSet::getPartName(part->left_date, part->right_date, part->left, part->right, part->level); + String new_name = ActiveDataPartSet::getPartName(part->left_date, part->right_date, part->left, part->right, part->level); - if (data_parts.count(part)) - throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART); + part->is_temp = false; + part->name = new_name; + bool duplicate = data_parts.count(part); + part->name = old_name; + part->is_temp = true; - String new_path = getFullPath() + part->name + "/"; + if (duplicate) + throw Exception("Part " + new_name + " already exists", ErrorCodes::DUPLICATE_DATA_PART); + + String new_path = getFullPath() + new_name + "/"; /// Переименовываем кусок. Poco::File(old_path).renameTo(new_path); + part->is_temp = false; + part->name = new_name; + bool obsolete = false; /// Покрыт ли part каким-нибудь куском. DataPartsVector res; /// Куски, содержащиеся в part, идут в data_parts подряд, задевая место, куда вставился бы сам part. diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index b6477a6200a..62a2b4365aa 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -268,6 +268,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts( MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data); ActiveDataPartSet::parsePartName(merged_name, *new_data_part); new_data_part->name = "tmp_" + merged_name; + new_data_part->is_temp = true; /** Читаем из всех кусков, сливаем и пишем в новый. * Попутно вычисляем выражение для сортировки. diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp index cb8f1ffa09a..aafa4f0af91 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -89,6 +89,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa Poco::File(part_tmp_path).createDirectories(); + MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data); + new_data_part->name = tmp_part_name; + new_data_part->is_temp = true; + /// Если для сортировки надо вычислить некоторые столбцы - делаем это. data.getPrimaryExpression()->execute(block); @@ -105,13 +109,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa out.write(block); MergeTreeData::DataPart::Checksums checksums = out.writeSuffixAndGetChecksums(); - MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data); new_data_part->left_date = DayNum_t(min_date); new_data_part->right_date = DayNum_t(max_date); new_data_part->left = temp_index; new_data_part->right = temp_index; new_data_part->level = 0; - new_data_part->name = tmp_part_name; new_data_part->size = part_size; new_data_part->modification_time = time(0); new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index debf688c8db..bee7a93ccfa 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1591,7 +1591,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params, const Strin return; } - if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/columns", &stat)) + if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/columns", &stat, alter_query_event)) { LOG_WARNING(log, replica << " was removed"); break;