merge upstream

This commit is contained in:
proller 2016-11-23 01:33:02 +03:00
parent e280569453
commit eeeacd0805
10 changed files with 63 additions and 54 deletions

View File

@ -21,7 +21,7 @@ public:
using AllowedMergingPredicate = std::function<bool (const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &)>;
public:
MergeTreeDataMerger(MergeTreeData & data_);
MergeTreeDataMerger(MergeTreeData & data_, const BackgroundProcessingPool & pool_);
void setCancellationHook(CancellationHook cancellation_hook_);
@ -122,6 +122,7 @@ public:
private:
MergeTreeData & data;
const BackgroundProcessingPool & pool;
Logger * log;

View File

@ -6,6 +6,7 @@
#include <mutex>
#include <thread>
#include <atomic>
#include <boost/noncopyable.hpp>
#include <Poco/Event.h>
#include <DB/Core/Types.h>
#include <common/logger_useful.h>
@ -33,6 +34,30 @@ public:
void start();
void stop();
/// Don't create more than one instance of this object simultaneously.
struct TemporarilyStop : private boost::noncopyable
{
ReplicatedMergeTreePartCheckThread * parent;
TemporarilyStop(ReplicatedMergeTreePartCheckThread * parent) : parent(parent)
{
parent->stop();
}
TemporarilyStop(TemporarilyStop && old) : parent(old.parent)
{
old.parent = nullptr;
}
~TemporarilyStop()
{
if (parent)
parent->start();
}
};
TemporarilyStop temporarilyStop() { return std::move(TemporarilyStop(this)); }
/// Добавить кусок (для которого есть подозрения, что он отсутствует, повреждён или не нужен) в очередь для проверки.
/// delay_to_check_seconds - проверять не раньше чем через указанное количество секунд.
void enqueuePart(const String & name, time_t delay_to_check_seconds = 0);

View File

@ -4,7 +4,6 @@
#include <Poco/URI.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Net/NetException.h>
#include <common/logger_useful.h>

View File

@ -62,44 +62,26 @@ void InterserverIOHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & requ
catch (Exception & e)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
std::stringstream s;
s << "Code: " << e.code()
<< ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what();
if (!response.sent())
response.send() << s.str() << std::endl;
if (e.code() == ErrorCodes::ABORTED)
LOG_INFO(log, s.str()); /// Отдача куска на удалённый сервер была остановлена из-за остановки сервера или удаления таблицы.
/// Sending to remote server was cancelled due to server shutdown or drop table.
bool is_real_error = e.code() != ErrorCodes::ABORTED;
std::string message = getCurrentExceptionMessage(is_real_error);
if (!response.sent())
response.send() << message << std::endl;
if (is_real_error)
LOG_ERROR(log, message);
else
LOG_ERROR(log, s.str());
}
catch (Poco::Exception & e)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
std::stringstream s;
s << "Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
<< ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what();
if (!response.sent())
response.send() << s.str() << std::endl;
LOG_ERROR(log, s.str());
}
catch (std::exception & e)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
std::stringstream s;
s << "Code: " << ErrorCodes::STD_EXCEPTION << ". " << e.what();
if (!response.sent())
response.send() << s.str() << std::endl;
LOG_ERROR(log, s.str());
LOG_INFO(log, message);
}
catch (...)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
std::stringstream s;
s << "Code: " << ErrorCodes::UNKNOWN_EXCEPTION << ". Unknown exception.";
std::string message = getCurrentExceptionMessage(false);
if (!response.sent())
response.send() << s.str() << std::endl;
LOG_ERROR(log, s.str());
response.send() << message << std::endl;
LOG_ERROR(log, message);
}
}

View File

@ -1,6 +1,7 @@
#include <DB/Storages/MergeTree/DataPartsExchange.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/Common/NetException.h>
#include <DB/IO/ReadBufferFromHTTP.h>
@ -111,6 +112,11 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body
part->checksums.checkEqual(data_checksums, false);
}
catch (const NetException & e)
{
/// Network error or error on remote side. No need to enquue part for check.
throw;
}
catch (const Exception & e)
{
if (e.code() != ErrorCodes::ABORTED)

View File

@ -70,8 +70,8 @@ static const double DISK_USAGE_COEFFICIENT_TO_SELECT = 1.6;
/// потому что между выбором кусков и резервированием места места может стать немного меньше.
static const double DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.4;
MergeTreeDataMerger::MergeTreeDataMerger(MergeTreeData & data_)
: data(data_), log(&Logger::get(data.getLogName() + " (Merger)"))
MergeTreeDataMerger::MergeTreeDataMerger(MergeTreeData & data_, const BackgroundProcessingPool & pool_)
: data(data_), pool(pool_), log(&Logger::get(data.getLogName() + " (Merger)"))
{
}
@ -83,7 +83,7 @@ void MergeTreeDataMerger::setCancellationHook(CancellationHook cancellation_hook
size_t MergeTreeDataMerger::getMaxPartsSizeForMerge()
{
size_t total_threads_in_pool = data.context.getBackgroundPool().getNumberOfThreads();
size_t total_threads_in_pool = pool.getNumberOfThreads();
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
size_t free_threads_in_pool = 1 + total_threads_in_pool - busy_threads_in_pool; /// 1 is current thread

View File

@ -82,6 +82,9 @@ void ReplicatedMergeTreeAlterThread::run()
/// Если описание столбцов изменилось, обновим структуру таблицы локально.
if (changed_version)
{
/// Temporarily cancel part checks to avoid locking for long time.
auto temporarily_stop_part_checks = storage.part_check_thread.temporarilyStop();
LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock.");
auto table_lock = storage.lockStructureForAlter();

View File

@ -800,7 +800,7 @@ void ReshardingWorker::createShardedPartitions()
auto & storage = *(current_job.storage);
MergeTreeDataMerger merger{storage.data};
MergeTreeDataMerger merger{storage.data, context.getBackgroundPool()};
MergeTreeDataMerger::CancellationHook hook = std::bind(&ReshardingWorker::abortJobIfRequested, this);
merger.setCancellationHook(hook);

View File

@ -47,7 +47,7 @@ StorageMergeTree::StorageMergeTree(
context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_, merging_params_,
settings_, database_name_ + "." + table_name, false),
reader(data), writer(data), merger(data),
reader(data), writer(data), merger(data, context.getBackgroundPool()),
increment(0),
log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)"))
{

View File

@ -214,7 +214,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
sampling_expression_, index_granularity_, merging_params_,
settings_, database_name_ + "." + table_name, true,
[this] (const std::string & name) { enqueuePartForCheck(name); }),
reader(data), writer(data), merger(data), fetcher(data), sharded_partition_uploader_client(*this),
reader(data), writer(data), merger(data, context.getBackgroundPool()), fetcher(data), sharded_partition_uploader_client(*this),
shutdown_event(false), part_check_thread(*this),
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
{
@ -303,7 +303,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
{
LOG_INFO(log, "Have unreplicated data");
unreplicated_reader = std::make_unique<MergeTreeDataSelectExecutor>(*unreplicated_data);
unreplicated_merger = std::make_unique<MergeTreeDataMerger>(*unreplicated_data);
unreplicated_merger = std::make_unique<MergeTreeDataMerger>(*unreplicated_data, context.getBackgroundPool());
}
}
@ -2339,22 +2339,15 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
{
assertNotReadonly();
auto merge_blocker = merger.cancel();
auto unreplicated_merge_blocker = unreplicated_merger ?
unreplicated_merger->cancel() : MergeTreeDataMerger::Blocker();
LOG_DEBUG(log, "Doing ALTER");
NamesAndTypesList new_columns;
NamesAndTypesList new_materialized_columns;
NamesAndTypesList new_alias_columns;
ColumnDefaults new_column_defaults;
String new_columns_str;
int new_columns_version;
String new_columns_str;
zkutil::Stat stat;
{
auto table_lock = lockStructureForAlter();
/// Just to read current structure. Alter will be done in separate thread.
auto table_lock = lockStructure(false);
if (is_readonly)
throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY);
@ -2365,10 +2358,10 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
if (param.type == AlterCommand::MODIFY_PRIMARY_KEY)
throw Exception("Modification of primary key is not supported for replicated tables", ErrorCodes::NOT_IMPLEMENTED);
new_columns = data.getColumnsListNonMaterialized();
new_materialized_columns = data.materialized_columns;
new_alias_columns = data.alias_columns;
new_column_defaults = data.column_defaults;
NamesAndTypesList new_columns = data.getColumnsListNonMaterialized();
NamesAndTypesList new_materialized_columns = data.materialized_columns;
NamesAndTypesList new_alias_columns = data.alias_columns;
ColumnDefaults new_column_defaults = data.column_defaults;
params.apply(new_columns, new_materialized_columns, new_alias_columns, new_column_defaults);
new_columns_str = ColumnsDescription<false>{