From eeeacd0805775b465587ace56d92b1d99dec63a8 Mon Sep 17 00:00:00 2001 From: proller Date: Wed, 23 Nov 2016 01:33:02 +0300 Subject: [PATCH] merge upstream --- .../Storages/MergeTree/MergeTreeDataMerger.h | 3 +- .../ReplicatedMergeTreePartCheckThread.h | 25 +++++++++++ dbms/src/IO/InterserverWriteBuffer.cpp | 1 - dbms/src/Server/InterserverIOHTTPHandler.cpp | 44 ++++++------------- .../Storages/MergeTree/DataPartsExchange.cpp | 6 +++ .../MergeTree/MergeTreeDataMerger.cpp | 6 +-- .../ReplicatedMergeTreeAlterThread.cpp | 3 ++ .../Storages/MergeTree/ReshardingWorker.cpp | 2 +- dbms/src/Storages/StorageMergeTree.cpp | 2 +- .../Storages/StorageReplicatedMergeTree.cpp | 25 ++++------- 10 files changed, 63 insertions(+), 54 deletions(-) diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h index bcd06808d11..0970328d171 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h @@ -21,7 +21,7 @@ public: using AllowedMergingPredicate = std::function; public: - MergeTreeDataMerger(MergeTreeData & data_); + MergeTreeDataMerger(MergeTreeData & data_, const BackgroundProcessingPool & pool_); void setCancellationHook(CancellationHook cancellation_hook_); @@ -122,6 +122,7 @@ public: private: MergeTreeData & data; + const BackgroundProcessingPool & pool; Logger * log; diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h index 0631a715ae1..21a66ca3e20 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -33,6 +34,30 @@ public: void start(); void stop(); + /// Don't create more than one instance of this object simultaneously. 
+ struct TemporarilyStop : private boost::noncopyable + { + ReplicatedMergeTreePartCheckThread * parent; + + TemporarilyStop(ReplicatedMergeTreePartCheckThread * parent) : parent(parent) + { + parent->stop(); + } + + TemporarilyStop(TemporarilyStop && old) : parent(old.parent) + { + old.parent = nullptr; + } + + ~TemporarilyStop() + { + if (parent) + parent->start(); + } + }; + + TemporarilyStop temporarilyStop() { return std::move(TemporarilyStop(this)); } + /// Добавить кусок (для которого есть подозрения, что он отсутствует, повреждён или не нужен) в очередь для проверки. /// delay_to_check_seconds - проверять не раньше чем через указанное количество секунд. void enqueuePart(const String & name, time_t delay_to_check_seconds = 0); diff --git a/dbms/src/IO/InterserverWriteBuffer.cpp b/dbms/src/IO/InterserverWriteBuffer.cpp index ad1a84e30d3..7e4b2bb7691 100644 --- a/dbms/src/IO/InterserverWriteBuffer.cpp +++ b/dbms/src/IO/InterserverWriteBuffer.cpp @@ -4,7 +4,6 @@ #include #include #include -#include #include diff --git a/dbms/src/Server/InterserverIOHTTPHandler.cpp b/dbms/src/Server/InterserverIOHTTPHandler.cpp index b91fa7a839a..b0224ecf3f6 100644 --- a/dbms/src/Server/InterserverIOHTTPHandler.cpp +++ b/dbms/src/Server/InterserverIOHTTPHandler.cpp @@ -62,44 +62,26 @@ void InterserverIOHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & requ catch (Exception & e) { response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); - std::stringstream s; - s << "Code: " << e.code() - << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what(); - if (!response.sent()) - response.send() << s.str() << std::endl; - if (e.code() == ErrorCodes::ABORTED) - LOG_INFO(log, s.str()); /// Отдача куска на удалённый сервер была остановлена из-за остановки сервера или удаления таблицы. + /// Sending to remote server was cancelled due to server shutdown or drop table. 
+ bool is_real_error = e.code() != ErrorCodes::ABORTED; + + std::string message = getCurrentExceptionMessage(is_real_error); + if (!response.sent()) + response.send() << message << std::endl; + + if (is_real_error) + LOG_ERROR(log, message); else - LOG_ERROR(log, s.str()); - } - catch (Poco::Exception & e) - { - response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); - std::stringstream s; - s << "Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code() - << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what(); - if (!response.sent()) - response.send() << s.str() << std::endl; - LOG_ERROR(log, s.str()); - } - catch (std::exception & e) - { - response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); - std::stringstream s; - s << "Code: " << ErrorCodes::STD_EXCEPTION << ". " << e.what(); - if (!response.sent()) - response.send() << s.str() << std::endl; - LOG_ERROR(log, s.str()); + LOG_INFO(log, message); } catch (...) { response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); - std::stringstream s; - s << "Code: " << ErrorCodes::UNKNOWN_EXCEPTION << ". Unknown exception."; + std::string message = getCurrentExceptionMessage(false); if (!response.sent()) - response.send() << s.str() << std::endl; - LOG_ERROR(log, s.str()); + response.send() << message << std::endl; + LOG_ERROR(log, message); } } diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp index 8df3d3fd451..c699730698f 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include @@ -111,6 +112,11 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body part->checksums.checkEqual(data_checksums, false); } + catch (const NetException & e) + { + /// Network error or error on remote side. 
No need to enqueue part for check. + throw; + } catch (const Exception & e) { if (e.code() != ErrorCodes::ABORTED) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 9b3b8779b60..c993e103e42 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -70,8 +70,8 @@ static const double DISK_USAGE_COEFFICIENT_TO_SELECT = 1.6; /// потому что между выбором кусков и резервированием места места может стать немного меньше. static const double DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.4; -MergeTreeDataMerger::MergeTreeDataMerger(MergeTreeData & data_) - : data(data_), log(&Logger::get(data.getLogName() + " (Merger)")) +MergeTreeDataMerger::MergeTreeDataMerger(MergeTreeData & data_, const BackgroundProcessingPool & pool_) + : data(data_), pool(pool_), log(&Logger::get(data.getLogName() + " (Merger)")) { } @@ -83,7 +83,7 @@ void MergeTreeDataMerger::setCancellationHook(CancellationHook cancellation_hook size_t MergeTreeDataMerger::getMaxPartsSizeForMerge() { - size_t total_threads_in_pool = data.context.getBackgroundPool().getNumberOfThreads(); + size_t total_threads_in_pool = pool.getNumberOfThreads(); size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed); size_t free_threads_in_pool = 1 + total_threads_in_pool - busy_threads_in_pool; /// 1 is current thread diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp index 910afa6dc2e..79dd4544925 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp @@ -82,6 +82,9 @@ void ReplicatedMergeTreeAlterThread::run() /// Если описание столбцов изменилось, обновим структуру таблицы локально. 
if (changed_version) { + /// Temporarily cancel part checks to avoid locking for a long time. + auto temporarily_stop_part_checks = storage.part_check_thread.temporarilyStop(); + LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock."); auto table_lock = storage.lockStructureForAlter(); diff --git a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp index 98a8dbec450..51af83ec25a 100644 --- a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp +++ b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp @@ -800,7 +800,7 @@ void ReshardingWorker::createShardedPartitions() auto & storage = *(current_job.storage); - MergeTreeDataMerger merger{storage.data}; + MergeTreeDataMerger merger{storage.data, context.getBackgroundPool()}; MergeTreeDataMerger::CancellationHook hook = std::bind(&ReshardingWorker::abortJobIfRequested, this); merger.setCancellationHook(hook); diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 318776ed595..cfec7099bde 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -47,7 +47,7 @@ StorageMergeTree::StorageMergeTree( context_, primary_expr_ast_, date_column_name_, sampling_expression_, index_granularity_, merging_params_, settings_, database_name_ + "." + table_name, false), - reader(data), writer(data), merger(data), + reader(data), writer(data), merger(data, context.getBackgroundPool()), increment(0), log(&Logger::get(database_name_ + "." 
+ table_name + " (StorageMergeTree)")) { diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 8a8eb9c94c3..cf290c8f686 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -214,7 +214,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( sampling_expression_, index_granularity_, merging_params_, settings_, database_name_ + "." + table_name, true, [this] (const std::string & name) { enqueuePartForCheck(name); }), - reader(data), writer(data), merger(data), fetcher(data), sharded_partition_uploader_client(*this), + reader(data), writer(data), merger(data, context.getBackgroundPool()), fetcher(data), sharded_partition_uploader_client(*this), shutdown_event(false), part_check_thread(*this), log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)")) { @@ -303,7 +303,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( { LOG_INFO(log, "Have unreplicated data"); unreplicated_reader = std::make_unique(*unreplicated_data); - unreplicated_merger = std::make_unique(*unreplicated_data); + unreplicated_merger = std::make_unique(*unreplicated_data, context.getBackgroundPool()); } } @@ -2339,22 +2339,15 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params, { assertNotReadonly(); - auto merge_blocker = merger.cancel(); - auto unreplicated_merge_blocker = unreplicated_merger ? - unreplicated_merger->cancel() : MergeTreeDataMerger::Blocker(); - LOG_DEBUG(log, "Doing ALTER"); - NamesAndTypesList new_columns; - NamesAndTypesList new_materialized_columns; - NamesAndTypesList new_alias_columns; - ColumnDefaults new_column_defaults; - String new_columns_str; int new_columns_version; + String new_columns_str; zkutil::Stat stat; { - auto table_lock = lockStructureForAlter(); + /// Just to read current structure. Alter will be done in separate thread. 
+ auto table_lock = lockStructure(false); if (is_readonly) throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY); @@ -2365,10 +2358,10 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params, if (param.type == AlterCommand::MODIFY_PRIMARY_KEY) throw Exception("Modification of primary key is not supported for replicated tables", ErrorCodes::NOT_IMPLEMENTED); - new_columns = data.getColumnsListNonMaterialized(); - new_materialized_columns = data.materialized_columns; - new_alias_columns = data.alias_columns; - new_column_defaults = data.column_defaults; + NamesAndTypesList new_columns = data.getColumnsListNonMaterialized(); + NamesAndTypesList new_materialized_columns = data.materialized_columns; + NamesAndTypesList new_alias_columns = data.alias_columns; + ColumnDefaults new_column_defaults = data.column_defaults; params.apply(new_columns, new_materialized_columns, new_alias_columns, new_column_defaults); new_columns_str = ColumnsDescription{