From 2482560b15c85a5e99cc9f8d8beb4ef2649c8977 Mon Sep 17 00:00:00 2001 From: Pavel Kartavyy Date: Thu, 29 Jan 2015 15:53:59 +0300 Subject: [PATCH 01/25] DB::Connection: don't log Ping requests --- dbms/src/Client/Connection.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index 3f14786085d..339a18ea249 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -165,7 +165,7 @@ void Connection::forceConnected() bool Connection::ping() { - LOG_TRACE(log_wrapper.get(), "Ping (" << getServerAddress() << ")"); + // LOG_TRACE(log_wrapper.get(), "Ping (" << getServerAddress() << ")"); try { From 51f8ad6e9dad7f69d1db80664a1db7674f73321a Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Wed, 4 Feb 2015 13:27:06 +0300 Subject: [PATCH 02/25] dbms: Server: queries with several replicas: fixes [#METR-14410] --- dbms/include/DB/Client/Connection.h | 4 +- .../{ShardReplicas.h => ParallelReplicas.h} | 16 +++--- .../DB/DataStreams/RemoteBlockInputStream.h | 33 +++++------ dbms/include/DB/IO/ReadBuffer.h | 2 +- dbms/include/DB/Storages/StorageChunkRef.h | 2 - dbms/include/DB/Storages/StorageMergeTree.h | 1 + ...ShardReplicas.cpp => ParallelReplicas.cpp} | 51 +++++++++-------- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 56 +++++++++++-------- 8 files changed, 85 insertions(+), 80 deletions(-) rename dbms/include/DB/Client/{ShardReplicas.h => ParallelReplicas.h} (81%) rename dbms/src/Client/{ShardReplicas.cpp => ParallelReplicas.cpp} (77%) diff --git a/dbms/include/DB/Client/Connection.h b/dbms/include/DB/Client/Connection.h index ae38abc2839..899113da061 100644 --- a/dbms/include/DB/Client/Connection.h +++ b/dbms/include/DB/Client/Connection.h @@ -26,7 +26,7 @@ namespace DB using Poco::SharedPtr; -class ShardReplicas; +class ParallelReplicas; /// Поток блоков читающих из таблицы и ее имя typedef std::pair ExternalTableData; @@ -42,7 +42,7 @@ typedef std::vector ExternalTablesData; */ class Connection : private boost::noncopyable { - friend class ShardReplicas; + friend class ParallelReplicas; public: Connection(const String & host_, UInt16 port_, const String & default_database_, diff --git a/dbms/include/DB/Client/ShardReplicas.h b/dbms/include/DB/Client/ParallelReplicas.h similarity index 81% rename from dbms/include/DB/Client/ShardReplicas.h rename to dbms/include/DB/Client/ParallelReplicas.h index afca1781917..7bd62290cd0 100644 --- a/dbms/include/DB/Client/ShardReplicas.h +++ b/dbms/include/DB/Client/ParallelReplicas.h @@ -8,15 +8,13 @@ namespace DB /** * Множество реплик одного шарда. */ - class ShardReplicas final + class ParallelReplicas final { public: - ShardReplicas(std::vector & entries, const Settings & settings_); + ParallelReplicas(std::vector & entries, const Settings & settings_); - ~ShardReplicas() = default; - - ShardReplicas(const ShardReplicas &) = delete; - ShardReplicas & operator=(const ShardReplicas &) = delete; + ParallelReplicas(const ParallelReplicas &) = delete; + ParallelReplicas & operator=(const ParallelReplicas &) = delete; /// Отправить на реплики всё содержимое внешних таблиц. void sendExternalTablesData(std::vector & data); @@ -41,7 +39,7 @@ namespace DB std::string dumpAddresses() const; /// Возвращает количесто реплик. - size_t size() const { return replica_hash.size(); } + size_t size() const { return replica_map.size(); } private: /// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах. @@ -50,11 +48,11 @@ namespace DB private: /// Реплики хэшированные по id сокета - using ReplicaHash = std::unordered_map; + using ReplicaMap = std::unordered_map; private: const Settings & settings; - ReplicaHash replica_hash; + ReplicaMap replica_map; size_t active_connection_count = 0; bool sent_query = false; bool cancelled = false; diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index 6b65dc88d55..0b3a28e808e 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -8,7 +8,7 @@ #include #include -#include +#include namespace DB @@ -87,7 +87,7 @@ public: { std::string addresses; if (use_many_replicas) - addresses = shard_replicas->dumpAddresses(); + addresses = parallel_replicas->dumpAddresses(); else addresses = connection->getServerAddress(); @@ -95,7 +95,7 @@ public: /// Если запрошено прервать запрос - попросим удалённый сервер тоже прервать запрос. if (use_many_replicas) - shard_replicas->sendCancel(); + parallel_replicas->sendCancel(); else connection->sendCancel(); @@ -112,7 +112,7 @@ public: if (sent_query && !finished) { if (use_many_replicas) - shard_replicas->disconnect(); + parallel_replicas->disconnect(); else connection->disconnect(); } @@ -122,7 +122,7 @@ protected: /// Отправить на удаленные сервера все временные таблицы void sendExternalTables() { - size_t count = use_many_replicas ? shard_replicas->size() : 1; + size_t count = use_many_replicas ? parallel_replicas->size() : 1; std::vector instances; instances.reserve(count); @@ -145,7 +145,7 @@ protected: } if (use_many_replicas) - shard_replicas->sendExternalTablesData(instances); + parallel_replicas->sendExternalTablesData(instances); else connection->sendExternalTablesData(instances[0]); } @@ -156,14 +156,14 @@ protected: { if (use_many_replicas) { - auto entries = pool->getMany(&settings); - if (entries.size() > 1) - shard_replicas = ext::make_unique(entries, settings); + pool_entries = pool->getMany(&settings); + if (pool_entries.size() > 1) + parallel_replicas = ext::make_unique(pool_entries, settings); else { /// NOTE IConnectionPool::getMany() всегда возвращает как минимум одно соединение. use_many_replicas = false; - connection = &*entries[0]; + connection = &*pool_entries[0]; } } else @@ -177,7 +177,7 @@ protected: } if (use_many_replicas) - shard_replicas->sendQuery(query, "", stage, true); + parallel_replicas->sendQuery(query, "", stage, true); else connection->sendQuery(query, "", stage, send_settings ? &settings : nullptr, true); @@ -187,7 +187,7 @@ protected: while (true) { - Connection::Packet packet = use_many_replicas ? shard_replicas->receivePacket() : connection->receivePacket(); + Connection::Packet packet = use_many_replicas ? parallel_replicas->receivePacket() : connection->receivePacket(); switch (packet.type) { @@ -258,7 +258,7 @@ protected: { std::string addresses; if (use_many_replicas) - addresses = shard_replicas->dumpAddresses(); + addresses = parallel_replicas->dumpAddresses(); else addresses = connection->getServerAddress(); @@ -267,14 +267,14 @@ protected: was_cancelled = true; if (use_many_replicas) - shard_replicas->sendCancel(); + parallel_replicas->sendCancel(); else connection->sendCancel(); } if (use_many_replicas) { - Connection::Packet packet = shard_replicas->drain(); + Connection::Packet packet = parallel_replicas->drain(); switch (packet.type) { case Protocol::Server::EndOfStream: @@ -327,7 +327,8 @@ private: ConnectionPool::Entry pool_entry; Connection * connection = nullptr; - std::unique_ptr shard_replicas; + std::vector pool_entries; + std::unique_ptr parallel_replicas; const String query; bool send_settings; diff --git a/dbms/include/DB/IO/ReadBuffer.h b/dbms/include/DB/IO/ReadBuffer.h index d3175042fc4..754412c5ee7 100644 --- a/dbms/include/DB/IO/ReadBuffer.h +++ b/dbms/include/DB/IO/ReadBuffer.h @@ -146,7 +146,7 @@ public: /** Проверить, есть ли данные в буфере для чтения. */ bool hasPendingData() const { - return offset() != working_buffer.size(); + return pos != working_buffer.end(); } private: diff --git a/dbms/include/DB/Storages/StorageChunkRef.h b/dbms/include/DB/Storages/StorageChunkRef.h index d4fc86972d3..df2be812652 100644 --- a/dbms/include/DB/Storages/StorageChunkRef.h +++ b/dbms/include/DB/Storages/StorageChunkRef.h @@ -22,8 +22,6 @@ public: NameAndTypePair getColumn(const String & column_name) const override { return getSource().getColumn(column_name); }; bool hasColumn(const String & column_name) const override { return getSource().hasColumn(column_name); }; - bool supportsParallelReplicas() const override { return true; } - BlockInputStreams read( const Names & column_names, ASTPtr query, diff --git a/dbms/include/DB/Storages/StorageMergeTree.h b/dbms/include/DB/Storages/StorageMergeTree.h index a2c4f68ee26..e33c2f7f0c4 100644 --- a/dbms/include/DB/Storages/StorageMergeTree.h +++ b/dbms/include/DB/Storages/StorageMergeTree.h @@ -54,6 +54,7 @@ public: bool supportsSampling() const override { return data.supportsSampling(); } bool supportsFinal() const override { return data.supportsFinal(); } bool supportsPrewhere() const override { return data.supportsPrewhere(); } + bool supportsParallelReplicas() const override { return true; } const NamesAndTypesList & getColumnsListImpl() const override { return data.getColumnsListNonMaterialized(); } diff --git a/dbms/src/Client/ShardReplicas.cpp b/dbms/src/Client/ParallelReplicas.cpp similarity index 77% rename from dbms/src/Client/ShardReplicas.cpp rename to dbms/src/Client/ParallelReplicas.cpp index e9a7eae0335..1c9f4ef57b0 100644 --- a/dbms/src/Client/ShardReplicas.cpp +++ b/dbms/src/Client/ParallelReplicas.cpp @@ -1,26 +1,26 @@ -#include +#include #include namespace DB { - ShardReplicas::ShardReplicas(std::vector & entries, const Settings & settings_) : + ParallelReplicas::ParallelReplicas(std::vector & entries, const Settings & settings_) : settings(settings_), active_connection_count(entries.size()) { - replica_hash.reserve(entries.size()); + replica_map.reserve(entries.size()); for (auto & entry : entries) { Connection * connection = &*entry; if (connection == nullptr) throw Exception("Invalid connection specified in parameter."); - auto res = replica_hash.insert(std::make_pair(connection->socket.impl()->sockfd(), connection)); + auto res = replica_map.insert(std::make_pair(connection->socket.impl()->sockfd(), connection)); if (!res.second) throw Exception("Invalid set of connections."); } } - void ShardReplicas::sendExternalTablesData(std::vector & data) + void ParallelReplicas::sendExternalTablesData(std::vector & data) { if (!sent_query) throw Exception("Cannot send external tables data: query not yet sent."); @@ -29,7 +29,7 @@ namespace DB throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); auto it = data.begin(); - for (auto & e : replica_hash) + for (auto & e : replica_map) { Connection * connection = e.second; if (connection != nullptr) @@ -38,16 +38,16 @@ namespace DB } } - void ShardReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data) + void ParallelReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data) { if (sent_query) throw Exception("Query already sent."); Settings query_settings = settings; - query_settings.parallel_replicas_count = replica_hash.size(); + query_settings.parallel_replicas_count = replica_map.size(); UInt64 offset = 0; - for (auto & e : replica_hash) + for (auto & e : replica_map) { Connection * connection = e.second; if (connection != nullptr) @@ -61,7 +61,7 @@ namespace DB sent_query = true; } - Connection::Packet ShardReplicas::receivePacket() + Connection::Packet ParallelReplicas::receivePacket() { if (!sent_query) throw Exception("Cannot receive packets: no query sent."); @@ -109,9 +109,9 @@ namespace DB return packet; } - void ShardReplicas::disconnect() + void ParallelReplicas::disconnect() { - for (auto & e : replica_hash) + for (auto & e : replica_map) { Connection * & connection = e.second; if (connection != nullptr) @@ -123,12 +123,12 @@ namespace DB } } - void ShardReplicas::sendCancel() + void ParallelReplicas::sendCancel() { if (!sent_query || cancelled) throw Exception("Cannot cancel. Either no query sent or already cancelled."); - for (auto & e : replica_hash) + for (auto & e : replica_map) { Connection * connection = e.second; if (connection != nullptr) @@ -138,7 +138,7 @@ namespace DB cancelled = true; } - Connection::Packet ShardReplicas::drain() + Connection::Packet ParallelReplicas::drain() { if (!cancelled) throw Exception("Cannot drain connections: cancel first."); @@ -172,30 +172,29 @@ namespace DB return res; } - std::string ShardReplicas::dumpAddresses() const + std::string ParallelReplicas::dumpAddresses() const { + bool is_first = true; std::ostringstream os; - for (auto & e : replica_hash) + for (auto & e : replica_map) { - char prefix = '\0'; const Connection * connection = e.second; if (connection != nullptr) { - os << prefix << connection->getServerAddress(); - if (prefix == '\0') - prefix = ';'; + os << (is_first ? "" : "; ") << connection->getServerAddress(); + if (is_first) { is_first = false; } } } return os.str(); } - Connection ** ShardReplicas::waitForReadEvent() + Connection ** ParallelReplicas::waitForReadEvent() { Poco::Net::Socket::SocketList read_list; read_list.reserve(active_connection_count); - for (auto & e : replica_hash) + for (auto & e : replica_map) { Connection * connection = e.second; if ((connection != nullptr) && connection->hasReadBufferPendingData()) @@ -207,7 +206,7 @@ namespace DB Poco::Net::Socket::SocketList write_list; Poco::Net::Socket::SocketList except_list; - for (auto & e : replica_hash) + for (auto & e : replica_map) { Connection * connection = e.second; if (connection != nullptr) @@ -219,8 +218,8 @@ namespace DB } auto & socket = read_list[rand() % read_list.size()]; - auto it = replica_hash.find(socket.impl()->sockfd()); - if (it == replica_hash.end()) + auto it = replica_map.find(socket.impl()->sockfd()); + if (it == replica_map.end()) throw Exception("Unexpected replica", ErrorCodes::UNEXPECTED_REPLICA); return &(it->second); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 0ab86e6c196..aa813fac2c8 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -177,10 +177,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( if (parallel_replicas_count > 1) { - UInt64 step = upper_limit / parallel_replicas_count; - sampling_column_value_lower_limit = parallel_replica_offset * step; + sampling_column_value_lower_limit = (parallel_replica_offset * upper_limit) / parallel_replicas_count; if ((parallel_replica_offset + 1) < parallel_replicas_count) - sampling_column_value_upper_limit = (parallel_replica_offset + 1) * step; + sampling_column_value_upper_limit = ((parallel_replica_offset + 1) * upper_limit) / parallel_replicas_count; else sampling_column_value_upper_limit = upper_limit + 1; } @@ -191,25 +190,15 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( } /// Добавим условие, чтобы отсечь еще что-нибудь при повторном просмотре индекса. - if (!key_condition.addCondition(data.sampling_expression->getColumnName(), - Range::createLeftBounded(sampling_column_value_lower_limit, true))) - throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN); + if (sampling_column_value_lower_limit > 0) + if (!key_condition.addCondition(data.sampling_expression->getColumnName(), + Range::createLeftBounded(sampling_column_value_lower_limit, true))) + throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN); if (!key_condition.addCondition(data.sampling_expression->getColumnName(), Range::createRightBounded(sampling_column_value_upper_limit, false))) throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN); - /// Выражение для фильтрации: sampling_expression in [sampling_column_value_lower_limit, sampling_column_value_upper_limit) - - ASTPtr lower_filter_args = new ASTExpressionList; - lower_filter_args->children.push_back(data.sampling_expression); - lower_filter_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_lower_limit)); - - ASTFunctionPtr lower_filter_function = new ASTFunction; - lower_filter_function->name = "greaterOrEquals"; - lower_filter_function->arguments = lower_filter_args; - lower_filter_function->children.push_back(lower_filter_function->arguments); - ASTPtr upper_filter_args = new ASTExpressionList; upper_filter_args->children.push_back(data.sampling_expression); upper_filter_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_upper_limit)); @@ -219,14 +208,33 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( upper_filter_function->arguments = upper_filter_args; upper_filter_function->children.push_back(upper_filter_function->arguments); - ASTPtr filter_function_args = new ASTExpressionList; - filter_function_args->children.push_back(lower_filter_function); - filter_function_args->children.push_back(upper_filter_function); + if (sampling_column_value_lower_limit > 0) + { + /// Выражение для фильтрации: sampling_expression in [sampling_column_value_lower_limit, sampling_column_value_upper_limit) - filter_function = new ASTFunction; - filter_function->name = "and"; - filter_function->arguments = filter_function_args; - filter_function->children.push_back(filter_function->arguments); + ASTPtr lower_filter_args = new ASTExpressionList; + lower_filter_args->children.push_back(data.sampling_expression); + lower_filter_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_lower_limit)); + + ASTFunctionPtr lower_filter_function = new ASTFunction; + lower_filter_function->name = "greaterOrEquals"; + lower_filter_function->arguments = lower_filter_args; + lower_filter_function->children.push_back(lower_filter_function->arguments); + + ASTPtr filter_function_args = new ASTExpressionList; + filter_function_args->children.push_back(lower_filter_function); + filter_function_args->children.push_back(upper_filter_function); + + filter_function = new ASTFunction; + filter_function->name = "and"; + filter_function->arguments = filter_function_args; + filter_function->children.push_back(filter_function->arguments); + } + else + { + /// Выражение для фильтрации: sampling_expression < sampling_column_value_upper_limit + filter_function = upper_filter_function; + } filter_expression = ExpressionAnalyzer(filter_function, data.context, data.getColumnsList()).getActions(false); From d519fac7b51fc0c69361414ce642bbbfbc9765c9 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Wed, 4 Feb 2015 13:40:00 +0300 Subject: [PATCH 03/25] dbms: Server: queries with several replicas: fixes [#METR-14410] --- .../include/DB/DataStreams/RemoteBlockInputStream.h | 2 +- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 13 +++++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index 0b3a28e808e..a736f837397 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -25,7 +25,7 @@ private: { send_settings = true; settings = *settings_; - use_many_replicas = (pool != nullptr) && UInt64(settings.max_parallel_replicas) > 1; + use_many_replicas = (pool != nullptr) && (settings.max_parallel_replicas > 1); } else send_settings = false; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index aa813fac2c8..3d3711ca19f 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -149,10 +149,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( relative_sample_size = 0; } - UInt64 parallel_replicas_count = UInt64(settings.parallel_replicas_count); - UInt64 parallel_replica_offset = UInt64(settings.parallel_replica_offset); - - if ((parallel_replicas_count > 1) && !data.sampling_expression.isNull() && (relative_sample_size == 0)) + if ((settings.parallel_replicas_count > 1) && !data.sampling_expression.isNull() && (relative_sample_size == 0)) relative_sample_size = 1; if (relative_sample_size != 0) @@ -175,11 +172,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( UInt64 sampling_column_value_upper_limit; UInt64 upper_limit = static_cast(relative_sample_size * sampling_column_max); - if (parallel_replicas_count > 1) + if (settings.parallel_replicas_count > 1) { - sampling_column_value_lower_limit = (parallel_replica_offset * upper_limit) / parallel_replicas_count; - if ((parallel_replica_offset + 1) < parallel_replicas_count) - sampling_column_value_upper_limit = ((parallel_replica_offset + 1) * upper_limit) / parallel_replicas_count; + sampling_column_value_lower_limit = (settings.parallel_replica_offset * upper_limit) / settings.parallel_replicas_count; + if ((settings.parallel_replica_offset + 1) < settings.parallel_replicas_count) + sampling_column_value_upper_limit = ((settings.parallel_replica_offset + 1) * upper_limit) / settings.parallel_replicas_count; else sampling_column_value_upper_limit = upper_limit + 1; } From ab9da73deadea4616596bfc9c9adca59a0df7590 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Wed, 4 Feb 2015 16:07:10 +0300 Subject: [PATCH 04/25] dbms: Server: queries with several replicas: fixes [#METR-14410] --- dbms/include/DB/Client/ParallelReplicas.h | 11 ++-- dbms/include/DB/Core/ErrorCodes.h | 2 - .../DB/DataStreams/RemoteBlockInputStream.h | 13 ++++- dbms/src/Client/ParallelReplicas.cpp | 57 +++++++------------ 4 files changed, 38 insertions(+), 45 deletions(-) diff --git a/dbms/include/DB/Client/ParallelReplicas.h b/dbms/include/DB/Client/ParallelReplicas.h index 7bd62290cd0..82a2daf3179 100644 --- a/dbms/include/DB/Client/ParallelReplicas.h +++ b/dbms/include/DB/Client/ParallelReplicas.h @@ -33,6 +33,7 @@ namespace DB void sendCancel(); /// Для каждой реплики получить оставшиеся пакеты после отмена запроса. + /// Возвращает либо последнее полученное исключение либо пакет EndOfStream. Connection::Packet drain(); /// Получить адреса реплик в виде строки. @@ -41,15 +42,17 @@ namespace DB /// Возвращает количесто реплик. size_t size() const { return replica_map.size(); } - private: - /// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах. - /// Возвращает соединение на такую реплику, если оно найдётся. - Connection ** waitForReadEvent(); + bool hasActiveConnections() const { return active_connection_count > 0; } private: /// Реплики хэшированные по id сокета using ReplicaMap = std::unordered_map; + private: + /// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах. + /// Возвращает соединение на такую реплику, если оно найдётся. + ReplicaMap::iterator waitForReadEvent(); + private: const Settings & settings; ReplicaMap replica_map; diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index 969a65e1d0a..cc18deee7ec 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -273,10 +273,8 @@ namespace ErrorCodes CANNOT_COMPILE_CODE, INCOMPATIBLE_TYPE_OF_JOIN, NO_AVAILABLE_REPLICA, - UNEXPECTED_REPLICA, MISMATCH_REPLICAS_DATA_SOURCES, STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS, - MISSING_RANGE_IN_CHUNK, POCO_EXCEPTION = 1000, STD_EXCEPTION, diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index a736f837397..d7fb41f9af6 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -199,11 +199,17 @@ protected: case Protocol::Server::Exception: got_exception_from_server = true; + if (use_many_replicas) + { + parallel_replicas->sendCancel(); + (void) parallel_replicas->drain(); + } packet.exception->rethrow(); break; case Protocol::Server::EndOfStream: - finished = true; + if (!use_many_replicas || !parallel_replicas->hasActiveConnections()) + finished = true; return Block(); case Protocol::Server::Progress: @@ -233,6 +239,11 @@ protected: break; default: + if (use_many_replicas) + { + parallel_replicas->sendCancel(); + (void) parallel_replicas->drain(); + } throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); } } diff --git a/dbms/src/Client/ParallelReplicas.cpp b/dbms/src/Client/ParallelReplicas.cpp index 1c9f4ef57b0..1aa852d1473 100644 --- a/dbms/src/Client/ParallelReplicas.cpp +++ b/dbms/src/Client/ParallelReplicas.cpp @@ -13,17 +13,17 @@ namespace DB { Connection * connection = &*entry; if (connection == nullptr) - throw Exception("Invalid connection specified in parameter."); + throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR); auto res = replica_map.insert(std::make_pair(connection->socket.impl()->sockfd(), connection)); if (!res.second) - throw Exception("Invalid set of connections."); + throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR); } } void ParallelReplicas::sendExternalTablesData(std::vector & data) { if (!sent_query) - throw Exception("Cannot send external tables data: query not yet sent."); + throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR); if (data.size() < active_connection_count) throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); @@ -41,7 +41,7 @@ namespace DB void ParallelReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data) { if (sent_query) - throw Exception("Query already sent."); + throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR); Settings query_settings = settings; query_settings.parallel_replicas_count = replica_map.size(); @@ -64,15 +64,16 @@ namespace DB Connection::Packet ParallelReplicas::receivePacket() { if (!sent_query) - throw Exception("Cannot receive packets: no query sent."); - if (active_connection_count == 0) - throw Exception("No more packets are available."); + throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR); + if (!hasActiveConnections()) + throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR); - Connection ** connection = waitForReadEvent(); - if (connection == nullptr) + auto it = waitForReadEvent(); + if (it == replica_map.end()) throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA); - Connection::Packet packet = (*connection)->receivePacket(); + Connection * & connection = it->second; + Connection::Packet packet = connection->receivePacket(); switch (packet.type) { @@ -84,25 +85,10 @@ namespace DB break; case Protocol::Server::EndOfStream: - *connection = nullptr; - --active_connection_count; - if (active_connection_count > 0) - { - Connection::Packet empty_packet; - empty_packet.type = Protocol::Server::Data; - return empty_packet; - } - break; - case Protocol::Server::Exception: default: - *connection = nullptr; + connection = nullptr; --active_connection_count; - if (!cancelled) - { - sendCancel(); - (void) drain(); - } break; } @@ -126,7 +112,7 @@ namespace DB void ParallelReplicas::sendCancel() { if (!sent_query || cancelled) - throw Exception("Cannot cancel. Either no query sent or already cancelled."); + throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR); for (auto & e : replica_map) { @@ -141,12 +127,12 @@ namespace DB Connection::Packet ParallelReplicas::drain() { if (!cancelled) - throw Exception("Cannot drain connections: cancel first."); + throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR); Connection::Packet res; res.type = Protocol::Server::EndOfStream; - while (active_connection_count > 0) + while (hasActiveConnections()) { Connection::Packet packet = receivePacket(); @@ -157,10 +143,8 @@ namespace DB case Protocol::Server::ProfileInfo: case Protocol::Server::Totals: case Protocol::Server::Extremes: - break; - case Protocol::Server::EndOfStream: - return res; + break; case Protocol::Server::Exception: default: @@ -189,7 +173,7 @@ namespace DB return os.str(); } - Connection ** ParallelReplicas::waitForReadEvent() + ParallelReplicas::ReplicaMap::iterator ParallelReplicas::waitForReadEvent() { Poco::Net::Socket::SocketList read_list; read_list.reserve(active_connection_count); @@ -214,13 +198,10 @@ namespace DB } int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000); if (n == 0) - return nullptr; + return replica_map.end(); } auto & socket = read_list[rand() % read_list.size()]; - auto it = replica_map.find(socket.impl()->sockfd()); - if (it == replica_map.end()) - throw Exception("Unexpected replica", ErrorCodes::UNEXPECTED_REPLICA); - return &(it->second); + return replica_map.find(socket.impl()->sockfd()); } } From c1a771eb666d679dc0365f11bdf94ad83d2d0826 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Fri, 6 Feb 2015 00:10:29 +0300 Subject: [PATCH 05/25] dbms: Server: queries with several replicas: fixes [#METR-14410] --- dbms/include/DB/DataStreams/RemoteBlockInputStream.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index d7fb41f9af6..ca128c7b121 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -209,8 +209,11 @@ protected: case Protocol::Server::EndOfStream: if (!use_many_replicas || !parallel_replicas->hasActiveConnections()) + { finished = true; - return Block(); + return Block(); + } + break; case Protocol::Server::Progress: /** Используем прогресс с удалённого сервера. From 4a002773a8513c10069567483cf8140a1ce10396 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Fri, 6 Feb 2015 01:31:03 +0300 Subject: [PATCH 06/25] dbms: Server: queries with several replicas: fixes [#METR-14410] --- dbms/include/DB/Client/ParallelReplicas.h | 13 +- .../DB/DataStreams/RemoteBlockInputStream.h | 135 ++++-------------- dbms/src/Client/ParallelReplicas.cpp | 109 ++++++++++---- 3 files changed, 120 insertions(+), 137 deletions(-) diff --git a/dbms/include/DB/Client/ParallelReplicas.h b/dbms/include/DB/Client/ParallelReplicas.h index 82a2daf3179..9b7fdcff586 100644 --- a/dbms/include/DB/Client/ParallelReplicas.h +++ b/dbms/include/DB/Client/ParallelReplicas.h @@ -11,7 +11,8 @@ namespace DB class ParallelReplicas final { public: - ParallelReplicas(std::vector & entries, const Settings & settings_); + ParallelReplicas(Connection * connection_, const Settings * settings_); + ParallelReplicas(std::vector & entries, const Settings * settings_); ParallelReplicas(const ParallelReplicas &) = delete; ParallelReplicas & operator=(const ParallelReplicas &) = delete; @@ -49,14 +50,22 @@ namespace DB using ReplicaMap = std::unordered_map; private: + /// Добавить соединение к реплике. + void addConnection(Connection * connection); + + void invalidateConnection(Connection * & connection); + + ReplicaMap::iterator getConnection(); + /// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах. /// Возвращает соединение на такую реплику, если оно найдётся. ReplicaMap::iterator waitForReadEvent(); private: - const Settings & settings; + const Settings * settings; ReplicaMap replica_map; size_t active_connection_count = 0; + bool supports_parallel_execution; bool sent_query = false; bool cancelled = false; }; diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index ca128c7b121..9afde2f3a7e 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -25,7 +25,6 @@ private: { send_settings = true; settings = *settings_; - use_many_replicas = (pool != nullptr) && (settings.max_parallel_replicas > 1); } else send_settings = false; @@ -85,20 +84,11 @@ public: if (sent_query && !was_cancelled && !finished && !got_exception_from_server) { - std::string addresses; - if (use_many_replicas) - addresses = parallel_replicas->dumpAddresses(); - else - addresses = connection->getServerAddress(); - + std::string addresses = parallel_replicas->dumpAddresses(); LOG_TRACE(log, "(" + addresses + ") Cancelling query"); /// Если запрошено прервать запрос - попросим удалённый сервер тоже прервать запрос. - if (use_many_replicas) - parallel_replicas->sendCancel(); - else - connection->sendCancel(); - + parallel_replicas->sendCancel(); was_cancelled = true; } } @@ -110,19 +100,14 @@ public: * чтобы оно не осталось висеть в рассихронизированном состоянии. */ if (sent_query && !finished) - { - if (use_many_replicas) - parallel_replicas->disconnect(); - else - connection->disconnect(); - } + parallel_replicas->disconnect(); } protected: /// Отправить на удаленные сервера все временные таблицы void sendExternalTables() { - size_t count = use_many_replicas ? parallel_replicas->size() : 1; + size_t count = parallel_replicas->size(); std::vector instances; instances.reserve(count); @@ -144,27 +129,18 @@ protected: instances.push_back(std::move(res)); } - if (use_many_replicas) - parallel_replicas->sendExternalTablesData(instances); - else - connection->sendExternalTablesData(instances[0]); + parallel_replicas->sendExternalTablesData(instances); } Block readImpl() override { if (!sent_query) { + bool use_many_replicas = (pool != nullptr) && send_settings && (settings.max_parallel_replicas > 1); if (use_many_replicas) { pool_entries = pool->getMany(&settings); - if (pool_entries.size() > 1) - parallel_replicas = ext::make_unique(pool_entries, settings); - else - { - /// NOTE IConnectionPool::getMany() всегда возвращает как минимум одно соединение. - use_many_replicas = false; - connection = &*pool_entries[0]; - } + parallel_replicas = ext::make_unique(pool_entries, &settings); } else { @@ -174,20 +150,17 @@ protected: pool_entry = pool->get(send_settings ? &settings : nullptr); connection = &*pool_entry; } + parallel_replicas = ext::make_unique(connection, send_settings ? &settings : nullptr); } - if (use_many_replicas) - parallel_replicas->sendQuery(query, "", stage, true); - else - connection->sendQuery(query, "", stage, send_settings ? &settings : nullptr, true); - + parallel_replicas->sendQuery(query, "", stage, true); sendExternalTables(); sent_query = true; } while (true) { - Connection::Packet packet = use_many_replicas ? parallel_replicas->receivePacket() : connection->receivePacket(); + Connection::Packet packet = parallel_replicas->receivePacket(); switch (packet.type) { @@ -199,16 +172,13 @@ protected: case Protocol::Server::Exception: got_exception_from_server = true; - if (use_many_replicas) - { - parallel_replicas->sendCancel(); - (void) parallel_replicas->drain(); - } + parallel_replicas->sendCancel(); + (void) parallel_replicas->drain(); packet.exception->rethrow(); break; case Protocol::Server::EndOfStream: - if (!use_many_replicas || !parallel_replicas->hasActiveConnections()) + if (!parallel_replicas->hasActiveConnections()) { finished = true; return Block(); @@ -242,11 +212,8 @@ protected: break; default: - if (use_many_replicas) - { - parallel_replicas->sendCancel(); - (void) parallel_replicas->drain(); - } + parallel_replicas->sendCancel(); + (void) parallel_replicas->drain(); throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); } } @@ -270,74 +237,34 @@ protected: /// Отправим просьбу прервать выполнение запроса, если ещё не отправляли. if (!was_cancelled) { - std::string addresses; - if (use_many_replicas) - addresses = parallel_replicas->dumpAddresses(); - else - addresses = connection->getServerAddress(); - + std::string addresses = parallel_replicas->dumpAddresses(); LOG_TRACE(log, "(" + addresses + ") Cancelling query because enough data has been read"); was_cancelled = true; - - if (use_many_replicas) - parallel_replicas->sendCancel(); - else - connection->sendCancel(); + parallel_replicas->sendCancel(); } - if (use_many_replicas) + /// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединении с сервером. + Connection::Packet packet = parallel_replicas->drain(); + switch (packet.type) { - Connection::Packet packet = parallel_replicas->drain(); - switch (packet.type) - { - case Protocol::Server::EndOfStream: - finished = true; - break; + case Protocol::Server::EndOfStream: + finished = true; + break; - case Protocol::Server::Exception: - got_exception_from_server = true; - packet.exception->rethrow(); - break; + case Protocol::Server::Exception: + got_exception_from_server = true; + packet.exception->rethrow(); + break; - default: - throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); - } - } - else - { - /// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединении с сервером. - while (true) - { - Connection::Packet packet = connection->receivePacket(); - - switch (packet.type) - { - case Protocol::Server::Data: - case Protocol::Server::Progress: - case Protocol::Server::ProfileInfo: - case Protocol::Server::Totals: - case Protocol::Server::Extremes: - break; - - case Protocol::Server::EndOfStream: - finished = true; - return; - - case Protocol::Server::Exception: - got_exception_from_server = true; - packet.exception->rethrow(); - break; - - default: - throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); - } - } + default: + throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); } } private: IConnectionPool * pool = nullptr; + ConnectionPool::Entry pool_entry; Connection * connection = nullptr; @@ -352,8 +279,6 @@ private: QueryProcessingStage::Enum stage; Context context; - bool use_many_replicas = false; - /// Отправили запрос (это делается перед получением первого блока). bool sent_query = false; diff --git a/dbms/src/Client/ParallelReplicas.cpp b/dbms/src/Client/ParallelReplicas.cpp index 1aa852d1473..f62c5fe7d46 100644 --- a/dbms/src/Client/ParallelReplicas.cpp +++ b/dbms/src/Client/ParallelReplicas.cpp @@ -3,21 +3,27 @@ namespace DB { - ParallelReplicas::ParallelReplicas(std::vector & entries, const Settings & settings_) : - settings(settings_), - active_connection_count(entries.size()) + ParallelReplicas::ParallelReplicas(Connection * connection_, const Settings * settings_) + : settings(settings_), + active_connection_count(1), + supports_parallel_execution(false) { - replica_map.reserve(entries.size()); + addConnection(connection_); + } - for (auto & entry : entries) - { - Connection * connection = &*entry; - if (connection == nullptr) - throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR); - auto res = replica_map.insert(std::make_pair(connection->socket.impl()->sockfd(), connection)); - if (!res.second) - throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR); - } + ParallelReplicas::ParallelReplicas(std::vector & entries_, const Settings * settings_) + : settings(settings_), + active_connection_count(entries_.size()), + supports_parallel_execution(active_connection_count > 1) + { + if (supports_parallel_execution && (settings == nullptr)) + throw Exception("Settings are required for parallel execution", ErrorCodes::LOGICAL_ERROR); + if (active_connection_count == 0) + throw Exception("No connection specified", ErrorCodes::LOGICAL_ERROR); + + replica_map.reserve(active_connection_count); + for (auto & entry : entries_) + addConnection(&*entry); } void ParallelReplicas::sendExternalTablesData(std::vector & data) @@ -43,22 +49,36 @@ namespace DB if (sent_query) throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR); - Settings query_settings = settings; - query_settings.parallel_replicas_count = replica_map.size(); - UInt64 offset = 0; - - for (auto & e : replica_map) + if (supports_parallel_execution) { - Connection * connection = e.second; + Settings query_settings = *settings; + query_settings.parallel_replicas_count = active_connection_count; + UInt64 offset = 0; + + for (auto & e : replica_map) + { + Connection * connection = e.second; + if (connection != nullptr) + { + query_settings.parallel_replica_offset = offset; + connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data); + ++offset; + } + } + + if (offset > 0) + sent_query = true; + } + else + { + auto it = replica_map.begin(); + Connection * connection = it->second; if (connection != nullptr) { - query_settings.parallel_replica_offset = offset; - connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data); - ++offset; + connection->sendQuery(query, query_id, stage, settings, with_pending_data); + sent_query = true; } } - - sent_query = true; } Connection::Packet ParallelReplicas::receivePacket() @@ -68,7 +88,7 @@ namespace DB if (!hasActiveConnections()) throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR); - auto it = waitForReadEvent(); + auto it = getConnection(); if (it == replica_map.end()) throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA); @@ -87,8 +107,7 @@ namespace DB case Protocol::Server::EndOfStream: case Protocol::Server::Exception: default: - connection = nullptr; - --active_connection_count; + invalidateConnection(connection); break; } @@ -103,8 +122,7 @@ namespace DB if (connection != nullptr) { connection->disconnect(); - connection = nullptr; - --active_connection_count; + invalidateConnection(connection); } } } @@ -173,6 +191,37 @@ namespace DB return os.str(); } + void ParallelReplicas::addConnection(Connection * connection) + { + if (connection == nullptr) + throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR); + auto res = replica_map.insert(std::make_pair(connection->socket.impl()->sockfd(), connection)); + if (!res.second) + throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR); + } + + void ParallelReplicas::invalidateConnection(Connection * & connection) + { + connection = nullptr; + --active_connection_count; + } + + ParallelReplicas::ReplicaMap::iterator ParallelReplicas::getConnection() + { + ReplicaMap::iterator it; + + if (supports_parallel_execution) + it = waitForReadEvent(); + else + { + it = replica_map.begin(); + if (it->second == nullptr) + it = replica_map.end(); + } + + return it; + } + ParallelReplicas::ReplicaMap::iterator ParallelReplicas::waitForReadEvent() { Poco::Net::Socket::SocketList read_list; @@ -196,7 +245,7 @@ namespace DB if (connection != nullptr) read_list.push_back(connection->socket); } - int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000); + int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->poll_interval * 1000000); if (n == 0) return replica_map.end(); } From 80aba1590139734dafff7ec8d2569f2dcb4e34e9 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Fri, 6 Feb 2015 13:41:03 +0300 Subject: [PATCH 07/25] dbms: Server: queries with several replicas: fixes [#METR-14410] --- dbms/include/DB/Client/ParallelReplicas.h | 4 ++-- dbms/src/Client/ParallelReplicas.cpp | 22 +++++++++++----------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/dbms/include/DB/Client/ParallelReplicas.h b/dbms/include/DB/Client/ParallelReplicas.h index 9b7fdcff586..dff8114d5e5 100644 --- a/dbms/include/DB/Client/ParallelReplicas.h +++ b/dbms/include/DB/Client/ParallelReplicas.h @@ -53,14 +53,14 @@ namespace DB /// Добавить соединение к реплике. void addConnection(Connection * connection); - void invalidateConnection(Connection * & connection); - ReplicaMap::iterator getConnection(); /// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах. /// Возвращает соединение на такую реплику, если оно найдётся. ReplicaMap::iterator waitForReadEvent(); + void invalidateConnection(ReplicaMap::iterator it); + private: const Settings * settings; ReplicaMap replica_map; diff --git a/dbms/src/Client/ParallelReplicas.cpp b/dbms/src/Client/ParallelReplicas.cpp index f62c5fe7d46..1d98699c84b 100644 --- a/dbms/src/Client/ParallelReplicas.cpp +++ b/dbms/src/Client/ParallelReplicas.cpp @@ -92,7 +92,7 @@ namespace DB if (it == replica_map.end()) throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA); - Connection * & connection = it->second; + Connection * connection = it->second; Connection::Packet packet = connection->receivePacket(); switch (packet.type) @@ -107,7 +107,7 @@ namespace DB case Protocol::Server::EndOfStream: case Protocol::Server::Exception: default: - invalidateConnection(connection); + invalidateConnection(it); break; } @@ -116,13 +116,13 @@ namespace DB void ParallelReplicas::disconnect() { - for (auto & e : replica_map) + for (auto it = replica_map.begin(); it != replica_map.end(); ++it) { - Connection * & connection = e.second; + Connection * connection = it->second; if (connection != nullptr) { connection->disconnect(); - invalidateConnection(connection); + invalidateConnection(it); } } } @@ -200,12 +200,6 @@ namespace DB throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR); } - void ParallelReplicas::invalidateConnection(Connection * & connection) - { - connection = nullptr; - --active_connection_count; - } - ParallelReplicas::ReplicaMap::iterator ParallelReplicas::getConnection() { ReplicaMap::iterator it; @@ -253,4 +247,10 @@ namespace DB auto & socket = read_list[rand() % read_list.size()]; return replica_map.find(socket.impl()->sockfd()); } + + void ParallelReplicas::invalidateConnection(ParallelReplicas::ReplicaMap::iterator it) + { + it->second = nullptr; + --active_connection_count; + } } From e854aa6c71fd03eb5cfed94b1061cb9eb9fff072 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Fri, 6 Feb 2015 15:33:15 +0300 Subject: [PATCH 08/25] dbms: Server: queries with several replicas: fixes [#METR-14410] --- .../DB/DataStreams/RemoteBlockInputStream.h | 53 +++++++++++-------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index 9afde2f3a7e..851488980f1 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -136,23 +136,7 @@ protected: { if (!sent_query) { - bool use_many_replicas = (pool != nullptr) && send_settings && (settings.max_parallel_replicas > 1); - if (use_many_replicas) - { - pool_entries = pool->getMany(&settings); - parallel_replicas = ext::make_unique(pool_entries, &settings); - } - else - { - /// Если надо - достаём соединение из пула. - if (pool) - { - pool_entry = pool->get(send_settings ? &settings : nullptr); - connection = &*pool_entry; - } - parallel_replicas = ext::make_unique(connection, send_settings ? &settings : nullptr); - } - + initParallelReplicas(); parallel_replicas->sendQuery(query, "", stage, true); sendExternalTables(); sent_query = true; @@ -172,8 +156,7 @@ protected: case Protocol::Server::Exception: got_exception_from_server = true; - parallel_replicas->sendCancel(); - (void) parallel_replicas->drain(); + abort(); packet.exception->rethrow(); break; @@ -212,8 +195,7 @@ protected: break; default: - parallel_replicas->sendCancel(); - (void) parallel_replicas->drain(); + abort(); throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); } } @@ -262,6 +244,15 @@ protected: } } + void abort() + { + std::string addresses = parallel_replicas->dumpAddresses(); + LOG_TRACE(log, "(" + addresses + ") Aborting query"); + + parallel_replicas->sendCancel(); + (void) parallel_replicas->drain(); + } + private: IConnectionPool * pool = nullptr; @@ -306,6 +297,26 @@ private: static Context instance; return instance; } + + void initParallelReplicas() + { + bool use_many_replicas = (pool != nullptr) && send_settings && (settings.max_parallel_replicas > 1); + if (use_many_replicas) + { + pool_entries = pool->getMany(&settings); + parallel_replicas = ext::make_unique(pool_entries, &settings); + } + else + { + /// Если надо - достаём соединение из пула. + if (pool) + { + pool_entry = pool->get(send_settings ? &settings : nullptr); + connection = &*pool_entry; + } + parallel_replicas = ext::make_unique(connection, send_settings ? &settings : nullptr); + } + } }; } From 55acd52df243f15af1c2035a55db6b1188376dd3 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Fri, 6 Feb 2015 17:46:04 +0300 Subject: [PATCH 09/25] dbms: Server: queries with several replicas: fixes [#METR-14410] --- dbms/include/DB/Client/ParallelReplicas.h | 10 +++-- .../DB/DataStreams/RemoteBlockInputStream.h | 25 +++--------- dbms/src/Client/ParallelReplicas.cpp | 39 +++++++++++++------ 3 files changed, 40 insertions(+), 34 deletions(-) diff --git a/dbms/include/DB/Client/ParallelReplicas.h b/dbms/include/DB/Client/ParallelReplicas.h index dff8114d5e5..6390f323759 100644 --- a/dbms/include/DB/Client/ParallelReplicas.h +++ b/dbms/include/DB/Client/ParallelReplicas.h @@ -11,8 +11,8 @@ namespace DB class ParallelReplicas final { public: - ParallelReplicas(Connection * connection_, const Settings * settings_); - ParallelReplicas(std::vector & entries, const Settings * settings_); + ParallelReplicas(Connection * connection_, Settings * settings_); + ParallelReplicas(IConnectionPool * pool_, Settings * settings_); ParallelReplicas(const ParallelReplicas &) = delete; ParallelReplicas & operator=(const ParallelReplicas &) = delete; @@ -62,8 +62,12 @@ namespace DB void invalidateConnection(ReplicaMap::iterator it); private: - const Settings * settings; + Settings * settings; ReplicaMap replica_map; + + std::vector pool_entries; + ConnectionPool::Entry pool_entry; + size_t active_connection_count = 0; bool supports_parallel_execution; bool sent_query = false; diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index 851488980f1..147b86364d7 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -136,7 +136,7 @@ protected: { if (!sent_query) { - initParallelReplicas(); + createParallelReplicas(); parallel_replicas->sendQuery(query, "", stage, true); sendExternalTables(); sent_query = true; @@ -258,8 +258,6 @@ private: ConnectionPool::Entry pool_entry; Connection * connection = nullptr; - - std::vector pool_entries; std::unique_ptr parallel_replicas; const String query; @@ -298,24 +296,13 @@ private: return instance; } - void initParallelReplicas() + void createParallelReplicas() { - bool use_many_replicas = (pool != nullptr) && send_settings && (settings.max_parallel_replicas > 1); - if (use_many_replicas) - { - pool_entries = pool->getMany(&settings); - parallel_replicas = ext::make_unique(pool_entries, &settings); - } + Settings * parallel_replicas_settings = send_settings ? &settings : nullptr; + if (connection != nullptr) + parallel_replicas = ext::make_unique(connection, parallel_replicas_settings); else - { - /// Если надо - достаём соединение из пула. - if (pool) - { - pool_entry = pool->get(send_settings ? &settings : nullptr); - connection = &*pool_entry; - } - parallel_replicas = ext::make_unique(connection, send_settings ? &settings : nullptr); - } + parallel_replicas = ext::make_unique(pool, parallel_replicas_settings); } }; diff --git a/dbms/src/Client/ParallelReplicas.cpp b/dbms/src/Client/ParallelReplicas.cpp index 1d98699c84b..d7c2b49ddcf 100644 --- a/dbms/src/Client/ParallelReplicas.cpp +++ b/dbms/src/Client/ParallelReplicas.cpp @@ -3,7 +3,7 @@ namespace DB { - ParallelReplicas::ParallelReplicas(Connection * connection_, const Settings * settings_) + ParallelReplicas::ParallelReplicas(Connection * connection_, Settings * settings_) : settings(settings_), active_connection_count(1), supports_parallel_execution(false) @@ -11,19 +11,34 @@ namespace DB addConnection(connection_); } - ParallelReplicas::ParallelReplicas(std::vector & entries_, const Settings * settings_) - : settings(settings_), - active_connection_count(entries_.size()), - supports_parallel_execution(active_connection_count > 1) + ParallelReplicas::ParallelReplicas(IConnectionPool * pool_, Settings * settings_) + : settings(settings_) { - if (supports_parallel_execution && (settings == nullptr)) - throw Exception("Settings are required for parallel execution", ErrorCodes::LOGICAL_ERROR); - if (active_connection_count == 0) - throw Exception("No connection specified", ErrorCodes::LOGICAL_ERROR); + if (pool_ == nullptr) + throw Exception("Null pool specified", ErrorCodes::LOGICAL_ERROR); - replica_map.reserve(active_connection_count); - for (auto & entry : entries_) - addConnection(&*entry); + bool has_many_replicas = (settings != nullptr) && (settings->max_parallel_replicas > 1); + if (has_many_replicas) + { + pool_entries = pool_->getMany(settings); + active_connection_count = pool_entries.size(); + supports_parallel_execution = (active_connection_count > 1); + + if (active_connection_count == 0) + throw Exception("No connection available", ErrorCodes::LOGICAL_ERROR); + + replica_map.reserve(active_connection_count); + for (auto & entry : pool_entries) + addConnection(&*entry); + } + else + { + active_connection_count = 1; + supports_parallel_execution = false; + + pool_entry = pool_->get(settings); + addConnection(&*pool_entry); + } } void ParallelReplicas::sendExternalTablesData(std::vector & data) From 7d5f75a8043e1f4dea415a2cfe4c81bdefb53da1 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Sat, 7 Feb 2015 01:32:54 +0300 Subject: [PATCH 10/25] dbms: Server: queries with several replicas: fixes [#METR-14410] --- dbms/include/DB/Client/ParallelReplicas.h | 14 +++++++++++-- .../DB/DataStreams/RemoteBlockInputStream.h | 20 +++++++++---------- dbms/src/Client/ParallelReplicas.cpp | 9 +++++++++ 3 files changed, 31 insertions(+), 12 deletions(-) diff --git a/dbms/include/DB/Client/ParallelReplicas.h b/dbms/include/DB/Client/ParallelReplicas.h index 6390f323759..b851ee61b6e 100644 --- a/dbms/include/DB/Client/ParallelReplicas.h +++ b/dbms/include/DB/Client/ParallelReplicas.h @@ -11,7 +11,10 @@ namespace DB class ParallelReplicas final { public: + /// Принимает готовое соединение. ParallelReplicas(Connection * connection_, Settings * settings_); + + /// Принимает пул, из которого нужно будет достать одно или несколько соединений. ParallelReplicas(IConnectionPool * pool_, Settings * settings_); ParallelReplicas(const ParallelReplicas &) = delete; @@ -43,6 +46,7 @@ namespace DB /// Возвращает количесто реплик. size_t size() const { return replica_map.size(); } + /// Проверить, есть ли действительные соединения к репликам. bool hasActiveConnections() const { return active_connection_count > 0; } private: @@ -50,15 +54,17 @@ namespace DB using ReplicaMap = std::unordered_map; private: - /// Добавить соединение к реплике. + /// Зарегистрировать соединение к реплике. void addConnection(Connection * connection); + /// Получить соединение к реплике, на которой можно прочитать данные. ReplicaMap::iterator getConnection(); /// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах. - /// Возвращает соединение на такую реплику, если оно найдётся. + /// Возвращает соединение к такой реплике, если оно найдётся. ReplicaMap::iterator waitForReadEvent(); + // Пометить соединение как недействительное. void invalidateConnection(ReplicaMap::iterator it); private: @@ -68,9 +74,13 @@ namespace DB std::vector pool_entries; ConnectionPool::Entry pool_entry; + /// Текущее количество действительных соединений к репликам. size_t active_connection_count = 0; + /// Запрос выполняется параллельно на нескольких репликах. bool supports_parallel_execution; + /// Отправили запрос bool sent_query = false; + /// Отменили запрос bool cancelled = false; }; } diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index 147b86364d7..6e8dfc657bc 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -50,7 +50,7 @@ public: init(settings_); } - /// Принимает пул, из которого нужно будет достать соединение. + /// Принимает пул, из которого нужно будет достать одно или несколько соединений. RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_, const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, const Context & context = getDefaultContext()) @@ -244,6 +244,15 @@ protected: } } + void createParallelReplicas() + { + Settings * parallel_replicas_settings = send_settings ? &settings : nullptr; + if (connection != nullptr) + parallel_replicas = ext::make_unique(connection, parallel_replicas_settings); + else + parallel_replicas = ext::make_unique(pool, parallel_replicas_settings); + } + void abort() { std::string addresses = parallel_replicas->dumpAddresses(); @@ -295,15 +304,6 @@ private: static Context instance; return instance; } - - void createParallelReplicas() - { - Settings * parallel_replicas_settings = send_settings ? &settings : nullptr; - if (connection != nullptr) - parallel_replicas = ext::make_unique(connection, parallel_replicas_settings); - else - parallel_replicas = ext::make_unique(pool, parallel_replicas_settings); - } }; } diff --git a/dbms/src/Client/ParallelReplicas.cpp b/dbms/src/Client/ParallelReplicas.cpp index d7c2b49ddcf..13b4a7e0eda 100644 --- a/dbms/src/Client/ParallelReplicas.cpp +++ b/dbms/src/Client/ParallelReplicas.cpp @@ -120,8 +120,12 @@ namespace DB break; case Protocol::Server::EndOfStream: + invalidateConnection(it); + break; + case Protocol::Server::Exception: default: + connection->disconnect(); invalidateConnection(it); break; } @@ -181,6 +185,7 @@ namespace DB case Protocol::Server::Exception: default: + /// Если мы получили исключение или неизвестный пакет, сохраняем его. res = packet; break; } @@ -236,6 +241,8 @@ namespace DB Poco::Net::Socket::SocketList read_list; read_list.reserve(active_connection_count); + /// Сначала проверяем, есть ли данные, которые уже лежат в буфере + /// хоть одного соединения. for (auto & e : replica_map) { Connection * connection = e.second; @@ -243,6 +250,8 @@ namespace DB read_list.push_back(connection->socket); } + /// Если не было найдено никаких данных, то проверяем, есть ли соединения + /// готовые для чтения. if (read_list.empty()) { Poco::Net::Socket::SocketList write_list; From ba96a87097e00230b80b8c6a4d72b4f8b9104940 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Sat, 7 Feb 2015 20:12:29 +0300 Subject: [PATCH 11/25] dbms: Server: queries with several replicas: fixes [#METR-14410] --- dbms/include/DB/Client/ParallelReplicas.h | 25 ++++++----- .../DB/DataStreams/RemoteBlockInputStream.h | 26 ++++++----- dbms/src/Client/ParallelReplicas.cpp | 44 +++++++++---------- 3 files changed, 50 insertions(+), 45 deletions(-) diff --git a/dbms/include/DB/Client/ParallelReplicas.h b/dbms/include/DB/Client/ParallelReplicas.h index b851ee61b6e..d57659ce87f 100644 --- a/dbms/include/DB/Client/ParallelReplicas.h +++ b/dbms/include/DB/Client/ParallelReplicas.h @@ -36,8 +36,9 @@ namespace DB /// Отменить запросы к репликам void sendCancel(); - /// Для каждой реплики получить оставшиеся пакеты после отмена запроса. - /// Возвращает либо последнее полученное исключение либо пакет EndOfStream. + /// На каждой реплике читать и пропускать все пакеты до EndOfStream или Exception. + /// Возвращает EndOfStream, если не было получено никакого исключения. В противном + /// случае возвращает последний полученный пакет типа Exception. Connection::Packet drain(); /// Получить адреса реплик в виде строки. @@ -46,26 +47,26 @@ namespace DB /// Возвращает количесто реплик. size_t size() const { return replica_map.size(); } - /// Проверить, есть ли действительные соединения к репликам. - bool hasActiveConnections() const { return active_connection_count > 0; } + /// Проверить, есть ли действительные реплики. + bool hasActiveReplicas() const { return active_replica_count > 0; } private: /// Реплики хэшированные по id сокета using ReplicaMap = std::unordered_map; private: - /// Зарегистрировать соединение к реплике. - void addConnection(Connection * connection); + /// Зарегистрировать реплику. + void registerReplica(Connection * connection); - /// Получить соединение к реплике, на которой можно прочитать данные. - ReplicaMap::iterator getConnection(); + /// Получить реплику, на которой можно прочитать данные. + ReplicaMap::iterator getReplicaForReading(); /// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах. - /// Возвращает соединение к такой реплике, если оно найдётся. + /// Возвращает одну такую реплику, если она найдётся. ReplicaMap::iterator waitForReadEvent(); - // Пометить соединение как недействительное. - void invalidateConnection(ReplicaMap::iterator it); + // Пометить реплику как недействительную. + void invalidateReplica(ReplicaMap::iterator it); private: Settings * settings; @@ -75,7 +76,7 @@ namespace DB ConnectionPool::Entry pool_entry; /// Текущее количество действительных соединений к репликам. - size_t active_connection_count = 0; + size_t active_replica_count = 0; /// Запрос выполняется параллельно на нескольких репликах. bool supports_parallel_execution; /// Отправили запрос diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index 6e8dfc657bc..add11992c1d 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -14,7 +14,7 @@ namespace DB { -/** Позволяет выполнить запрос (SELECT) на удалённом сервере и получить результат. +/** Позволяет выполнить запрос (SELECT) на удалённых репликах одного шарда и получить результат. */ class RemoteBlockInputStream : public IProfilingBlockInputStream { @@ -87,7 +87,7 @@ public: std::string addresses = parallel_replicas->dumpAddresses(); LOG_TRACE(log, "(" + addresses + ") Cancelling query"); - /// Если запрошено прервать запрос - попросим удалённый сервер тоже прервать запрос. + /// Если запрошено прервать запрос - попросим удалённые реплики тоже прервать запрос. parallel_replicas->sendCancel(); was_cancelled = true; } @@ -96,15 +96,15 @@ public: ~RemoteBlockInputStream() override { - /** Если прервались в середине цикла общения с сервером, то закрываем соединение, - * чтобы оно не осталось висеть в рассихронизированном состоянии. + /** Если прервались в середине цикла общения с репликами, то закрываем соединения, + * чтобы они не остались висеть в рассихронизированном состоянии. */ if (sent_query && !finished) parallel_replicas->disconnect(); } protected: - /// Отправить на удаленные сервера все временные таблицы + /// Отправить на удаленные реплики все временные таблицы void sendExternalTables() { size_t count = parallel_replicas->size(); @@ -161,7 +161,7 @@ protected: break; case Protocol::Server::EndOfStream: - if (!parallel_replicas->hasActiveConnections()) + if (!parallel_replicas->hasActiveReplicas()) { finished = true; return Block(); @@ -226,7 +226,7 @@ protected: parallel_replicas->sendCancel(); } - /// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединении с сервером. + /// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединениях с репликами. Connection::Packet packet = parallel_replicas->drain(); switch (packet.type) { @@ -244,6 +244,7 @@ protected: } } + /// Создать структуру для общения с репликами одного шарда, на которых должен выполниться запрос. void createParallelReplicas() { Settings * parallel_replicas_settings = send_settings ? &settings : nullptr; @@ -253,6 +254,9 @@ protected: parallel_replicas = ext::make_unique(pool, parallel_replicas_settings); } + /// Если был получен пакет типа Exception или неизвестного типа, отменить запросы на всех + /// репликах. Читать и пропускать все оставшиеся пакеты до EndOfStream или Exception, чтобы + /// не было рассинхронизации в соединениях с репликами. void abort() { std::string addresses = parallel_replicas->dumpAddresses(); @@ -280,20 +284,20 @@ private: /// Отправили запрос (это делается перед получением первого блока). bool sent_query = false; - /** Получили все данные от сервера, до пакета EndOfStream. + /** Получили все данные от всех реплик, до пакета EndOfStream. * Если при уничтожении объекта, ещё не все данные считаны, - * то для того, чтобы не было рассинхронизации, на сервер отправляется просьба прервать выполнение запроса, + * то для того, чтобы не было рассинхронизации, на реплики отправляются просьбы прервать выполнение запроса, * и после этого считываются все пакеты до EndOfStream. */ bool finished = false; - /** На сервер была отправлена просьба прервать выполенение запроса, так как данные больше не нужны. + /** На каждую реплику была отправлена просьба прервать выполенение запроса, так как данные больше не нужны. * Это может быть из-за того, что данных достаточно (например, при использовании LIMIT), * или если на стороне клиента произошло исключение. */ bool was_cancelled = false; - /// С сервера было получено исключение. В этом случае получать больше пакетов или просить прервать запрос не нужно. + /// С одной репилки было получено исключение. В этом случае получать больше пакетов или просить прервать запрос не нужно. bool got_exception_from_server = false; Logger * log = &Logger::get("RemoteBlockInputStream"); diff --git a/dbms/src/Client/ParallelReplicas.cpp b/dbms/src/Client/ParallelReplicas.cpp index 13b4a7e0eda..f7f781aed7f 100644 --- a/dbms/src/Client/ParallelReplicas.cpp +++ b/dbms/src/Client/ParallelReplicas.cpp @@ -5,10 +5,10 @@ namespace DB { ParallelReplicas::ParallelReplicas(Connection * connection_, Settings * settings_) : settings(settings_), - active_connection_count(1), + active_replica_count(1), supports_parallel_execution(false) { - addConnection(connection_); + registerReplica(connection_); } ParallelReplicas::ParallelReplicas(IConnectionPool * pool_, Settings * settings_) @@ -21,23 +21,23 @@ namespace DB if (has_many_replicas) { pool_entries = pool_->getMany(settings); - active_connection_count = pool_entries.size(); - supports_parallel_execution = (active_connection_count > 1); + active_replica_count = pool_entries.size(); + supports_parallel_execution = (active_replica_count > 1); - if (active_connection_count == 0) + if (active_replica_count == 0) throw Exception("No connection available", ErrorCodes::LOGICAL_ERROR); - replica_map.reserve(active_connection_count); + replica_map.reserve(active_replica_count); for (auto & entry : pool_entries) - addConnection(&*entry); + registerReplica(&*entry); } else { - active_connection_count = 1; + active_replica_count = 1; supports_parallel_execution = false; pool_entry = pool_->get(settings); - addConnection(&*pool_entry); + registerReplica(&*pool_entry); } } @@ -46,7 +46,7 @@ namespace DB if (!sent_query) throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR); - if (data.size() < active_connection_count) + if (data.size() < active_replica_count) throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); auto it = data.begin(); @@ -67,7 +67,7 @@ namespace DB if (supports_parallel_execution) { Settings query_settings = *settings; - query_settings.parallel_replicas_count = active_connection_count; + query_settings.parallel_replicas_count = active_replica_count; UInt64 offset = 0; for (auto & e : replica_map) @@ -100,10 +100,10 @@ namespace DB { if (!sent_query) throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR); - if (!hasActiveConnections()) + if (!hasActiveReplicas()) throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR); - auto it = getConnection(); + auto it = getReplicaForReading(); if (it == replica_map.end()) throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA); @@ -120,13 +120,13 @@ namespace DB break; case Protocol::Server::EndOfStream: - invalidateConnection(it); + invalidateReplica(it); break; case Protocol::Server::Exception: default: connection->disconnect(); - invalidateConnection(it); + invalidateReplica(it); break; } @@ -141,7 +141,7 @@ namespace DB if (connection != nullptr) { connection->disconnect(); - invalidateConnection(it); + invalidateReplica(it); } } } @@ -169,7 +169,7 @@ namespace DB Connection::Packet res; res.type = Protocol::Server::EndOfStream; - while (hasActiveConnections()) + while (hasActiveReplicas()) { Connection::Packet packet = receivePacket(); @@ -211,7 +211,7 @@ namespace DB return os.str(); } - void ParallelReplicas::addConnection(Connection * connection) + void ParallelReplicas::registerReplica(Connection * connection) { if (connection == nullptr) throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR); @@ -220,7 +220,7 @@ namespace DB throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR); } - ParallelReplicas::ReplicaMap::iterator ParallelReplicas::getConnection() + ParallelReplicas::ReplicaMap::iterator ParallelReplicas::getReplicaForReading() { ReplicaMap::iterator it; @@ -239,7 +239,7 @@ namespace DB ParallelReplicas::ReplicaMap::iterator ParallelReplicas::waitForReadEvent() { Poco::Net::Socket::SocketList read_list; - read_list.reserve(active_connection_count); + read_list.reserve(active_replica_count); /// Сначала проверяем, есть ли данные, которые уже лежат в буфере /// хоть одного соединения. @@ -272,9 +272,9 @@ namespace DB return replica_map.find(socket.impl()->sockfd()); } - void ParallelReplicas::invalidateConnection(ParallelReplicas::ReplicaMap::iterator it) + void ParallelReplicas::invalidateReplica(ParallelReplicas::ReplicaMap::iterator it) { it->second = nullptr; - --active_connection_count; + --active_replica_count; } } From cede256a435bbb83fcb37978e378f731ea93aa4a Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Sat, 7 Feb 2015 22:39:16 +0300 Subject: [PATCH 12/25] dbms: Server: queries with several replicas: fixes [#METR-14410] --- dbms/include/DB/Client/ParallelReplicas.h | 19 ++++---- .../DB/DataStreams/RemoteBlockInputStream.h | 43 ++++++++++++------- dbms/src/Client/ParallelReplicas.cpp | 10 +++-- 3 files changed, 44 insertions(+), 28 deletions(-) diff --git a/dbms/include/DB/Client/ParallelReplicas.h b/dbms/include/DB/Client/ParallelReplicas.h index d57659ce87f..a13de8a7baf 100644 --- a/dbms/include/DB/Client/ParallelReplicas.h +++ b/dbms/include/DB/Client/ParallelReplicas.h @@ -5,8 +5,7 @@ namespace DB { - /** - * Множество реплик одного шарда. + /** Множество реплик одного шарда. */ class ParallelReplicas final { @@ -36,9 +35,10 @@ namespace DB /// Отменить запросы к репликам void sendCancel(); - /// На каждой реплике читать и пропускать все пакеты до EndOfStream или Exception. - /// Возвращает EndOfStream, если не было получено никакого исключения. В противном - /// случае возвращает последний полученный пакет типа Exception. + /** На каждой реплике читать и пропускать все пакеты до EndOfStream или Exception. + * Возвращает EndOfStream, если не было получено никакого исключения. В противном + * случае возвращает последний полученный пакет типа Exception. + */ Connection::Packet drain(); /// Получить адреса реплик в виде строки. @@ -61,11 +61,12 @@ namespace DB /// Получить реплику, на которой можно прочитать данные. ReplicaMap::iterator getReplicaForReading(); - /// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах. - /// Возвращает одну такую реплику, если она найдётся. + /** Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах. + * Возвращает одну такую реплику, если она найдётся. + */ ReplicaMap::iterator waitForReadEvent(); - // Пометить реплику как недействительную. + /// Пометить реплику как недействительную. void invalidateReplica(ReplicaMap::iterator it); private: @@ -76,7 +77,7 @@ namespace DB ConnectionPool::Entry pool_entry; /// Текущее количество действительных соединений к репликам. - size_t active_replica_count = 0; + size_t active_replica_count; /// Запрос выполняется параллельно на нескольких репликах. bool supports_parallel_execution; /// Отправили запрос diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index add11992c1d..f7b2e0ae4e3 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -82,7 +82,7 @@ public: if (!__sync_bool_compare_and_swap(&is_cancelled, false, true)) return; - if (sent_query && !was_cancelled && !finished && !got_exception_from_server) + if (sent_query && !was_cancelled && !finished && !got_exception_from_replica) { std::string addresses = parallel_replicas->dumpAddresses(); LOG_TRACE(log, "(" + addresses + ") Cancelling query"); @@ -155,7 +155,7 @@ protected: break; /// Если блок пустой - получим другие пакеты до EndOfStream. case Protocol::Server::Exception: - got_exception_from_server = true; + got_exception_from_replica = true; abort(); packet.exception->rethrow(); break; @@ -195,6 +195,7 @@ protected: break; default: + got_unknown_packet_from_replica = true; abort(); throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); } @@ -209,7 +210,7 @@ protected: * - получили с сервера эксепшен; * - то больше читать ничего не нужно. */ - if (!sent_query || finished || got_exception_from_server) + if (!sent_query || finished || got_exception_from_replica) return; /** Если ещё прочитали не все данные, но они больше не нужны. @@ -235,16 +236,17 @@ protected: break; case Protocol::Server::Exception: - got_exception_from_server = true; + got_exception_from_replica = true; packet.exception->rethrow(); break; default: + got_unknown_packet_from_replica = true; throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); } } - /// Создать структуру для общения с репликами одного шарда, на которых должен выполниться запрос. + /// Создать объект для общения с репликами одного шарда, на которых должен выполниться запрос. void createParallelReplicas() { Settings * parallel_replicas_settings = send_settings ? &settings : nullptr; @@ -254,16 +256,20 @@ protected: parallel_replicas = ext::make_unique(pool, parallel_replicas_settings); } - /// Если был получен пакет типа Exception или неизвестного типа, отменить запросы на всех - /// репликах. Читать и пропускать все оставшиеся пакеты до EndOfStream или Exception, чтобы - /// не было рассинхронизации в соединениях с репликами. + /** Если с одной реплики был получен пакет Exception или неизвестный пакет, отменить запросы на всех + * остальных репликах. Читать и пропускать все оставшиеся пакеты до EndOfStream или Exception, чтобы + * не было рассинхронизации в соединениях с репликами. + */ void abort() { - std::string addresses = parallel_replicas->dumpAddresses(); - LOG_TRACE(log, "(" + addresses + ") Aborting query"); + if (got_exception_from_replica || got_unknown_packet_from_replica) + { + std::string addresses = parallel_replicas->dumpAddresses(); + LOG_TRACE(log, "(" + addresses + ") Aborting query"); - parallel_replicas->sendCancel(); - (void) parallel_replicas->drain(); + parallel_replicas->sendCancel(); + (void) parallel_replicas->drain(); + } } private: @@ -291,14 +297,21 @@ private: */ bool finished = false; - /** На каждую реплику была отправлена просьба прервать выполенение запроса, так как данные больше не нужны. + /** На каждую реплику была отправлена просьба прервать выполнение запроса, так как данные больше не нужны. * Это может быть из-за того, что данных достаточно (например, при использовании LIMIT), * или если на стороне клиента произошло исключение. */ bool was_cancelled = false; - /// С одной репилки было получено исключение. В этом случае получать больше пакетов или просить прервать запрос не нужно. - bool got_exception_from_server = false; + /** С одной репилки было получено исключение. В этом случае получать больше пакетов или + * просить прервать запрос на этой реплике не нужно. + */ + bool got_exception_from_replica = false; + + /** С одной реплики был получен неизвестный пакет. В этом случае получать больше пакетов или + * просить прервать запрос на этой реплике не нужно. + */ + bool got_unknown_packet_from_replica = false; Logger * log = &Logger::get("RemoteBlockInputStream"); diff --git a/dbms/src/Client/ParallelReplicas.cpp b/dbms/src/Client/ParallelReplicas.cpp index f7f781aed7f..24624ba0585 100644 --- a/dbms/src/Client/ParallelReplicas.cpp +++ b/dbms/src/Client/ParallelReplicas.cpp @@ -241,8 +241,9 @@ namespace DB Poco::Net::Socket::SocketList read_list; read_list.reserve(active_replica_count); - /// Сначала проверяем, есть ли данные, которые уже лежат в буфере - /// хоть одного соединения. + /** Сначала проверяем, есть ли данные, которые уже лежат в буфере + * хоть одного соединения. + */ for (auto & e : replica_map) { Connection * connection = e.second; @@ -250,8 +251,9 @@ namespace DB read_list.push_back(connection->socket); } - /// Если не было найдено никаких данных, то проверяем, есть ли соединения - /// готовые для чтения. + /** Если не было найдено никаких данных, то проверяем, есть ли соединения + * готовые для чтения. + */ if (read_list.empty()) { Poco::Net::Socket::SocketList write_list; From 2e5d1041a1b0fe7f5b27160df8bcf0400323c4e1 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Sun, 8 Feb 2015 02:13:04 +0300 Subject: [PATCH 13/25] dbms: Server: queries with several replicas: fixes [#METR-14410] --- dbms/include/DB/IO/BufferBase.h | 6 ++++++ dbms/include/DB/IO/ConcatReadBuffer.h | 12 ++++++------ dbms/include/DB/IO/ReadBuffer.h | 12 +++--------- dbms/include/DB/IO/ReadBufferFromFileDescriptor.h | 2 +- dbms/include/DB/IO/WriteBuffer.h | 8 ++++---- dbms/src/IO/ReadHelpers.cpp | 8 ++++---- dbms/src/IO/tests/mempbrk.cpp | 4 ++-- dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp | 2 +- 8 files changed, 27 insertions(+), 27 deletions(-) diff --git a/dbms/include/DB/IO/BufferBase.h b/dbms/include/DB/IO/BufferBase.h index d0d2b0f9cb1..a3afde8eb99 100644 --- a/dbms/include/DB/IO/BufferBase.h +++ b/dbms/include/DB/IO/BufferBase.h @@ -79,6 +79,12 @@ public: return bytes + offset(); } + /** Проверить, есть ли данные в буфере. */ + bool hasPendingData() const + { + return pos != working_buffer.end(); + } + protected: /// Ссылка на кусок памяти для буфера. Buffer internal_buffer; diff --git a/dbms/include/DB/IO/ConcatReadBuffer.h b/dbms/include/DB/IO/ConcatReadBuffer.h index f4f27f6562e..1f56b06fbdb 100644 --- a/dbms/include/DB/IO/ConcatReadBuffer.h +++ b/dbms/include/DB/IO/ConcatReadBuffer.h @@ -14,23 +14,23 @@ class ConcatReadBuffer : public ReadBuffer { public: typedef std::vector ReadBuffers; - + protected: ReadBuffers buffers; ReadBuffers::iterator current; - + bool nextImpl() { if (buffers.end() == current) return false; - + /// Первое чтение - if (working_buffer.size() == 0 && (*current)->position() != (*current)->buffer().end()) + if (working_buffer.size() == 0 && (*current)->hasPendingData()) { working_buffer = Buffer((*current)->position(), (*current)->buffer().end()); return true; } - + if (!(*current)->next()) { ++current; @@ -45,7 +45,7 @@ protected: return false; } } - + working_buffer = Buffer((*current)->position(), (*current)->buffer().end()); return true; } diff --git a/dbms/include/DB/IO/ReadBuffer.h b/dbms/include/DB/IO/ReadBuffer.h index 754412c5ee7..8ee26af6a33 100644 --- a/dbms/include/DB/IO/ReadBuffer.h +++ b/dbms/include/DB/IO/ReadBuffer.h @@ -46,7 +46,7 @@ public: bool res = nextImpl(); if (!res) working_buffer.resize(0); - + pos = working_buffer.begin(); return res; } @@ -54,7 +54,7 @@ public: inline void nextIfAtEnd() { - if (pos == working_buffer.end()) + if (!hasPendingData()) next(); } @@ -70,7 +70,7 @@ public: */ inline bool eof() { - return pos == working_buffer.end() && !next(); + return !hasPendingData() && !next(); } void ignore() @@ -143,12 +143,6 @@ public: return read(to, n); } - /** Проверить, есть ли данные в буфере для чтения. */ - bool hasPendingData() const - { - return pos != working_buffer.end(); - } - private: /** Прочитать следующие данные и заполнить ими буфер. * Вернуть false в случае конца, true иначе. diff --git a/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h b/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h index ea89708910e..1a2febf61b5 100644 --- a/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h +++ b/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h @@ -80,7 +80,7 @@ public: if (new_pos + (working_buffer.end() - pos) == pos_in_file) return new_pos; - if (pos != working_buffer.end() && new_pos <= pos_in_file && new_pos >= pos_in_file - static_cast(working_buffer.size())) + if (hasPendingData() && new_pos <= pos_in_file && new_pos >= pos_in_file - static_cast(working_buffer.size())) { /// Остались в пределах буфера. pos = working_buffer.begin() + (new_pos - (pos_in_file - working_buffer.size())); diff --git a/dbms/include/DB/IO/WriteBuffer.h b/dbms/include/DB/IO/WriteBuffer.h index c57575068b7..54f38242efc 100644 --- a/dbms/include/DB/IO/WriteBuffer.h +++ b/dbms/include/DB/IO/WriteBuffer.h @@ -44,7 +44,7 @@ public: pos = working_buffer.begin(); throw; } - + pos = working_buffer.begin(); } @@ -56,11 +56,11 @@ public: inline void nextIfAtEnd() { - if (pos == working_buffer.end()) + if (!hasPendingData()) next(); } - + void write(const char * from, size_t n) { size_t bytes_copied = 0; @@ -82,7 +82,7 @@ public: *pos = x; ++pos; } - + private: /** Записать данные, находящиеся в буфере (от начала буфера до текущей позиции). * Кинуть исключение, если что-то не так. diff --git a/dbms/src/IO/ReadHelpers.cpp b/dbms/src/IO/ReadHelpers.cpp index dac6048f6d7..06de94e0cee 100644 --- a/dbms/src/IO/ReadHelpers.cpp +++ b/dbms/src/IO/ReadHelpers.cpp @@ -55,7 +55,7 @@ void readString(String & s, ReadBuffer & buf) s.append(buf.position(), bytes); buf.position() += bytes; - if (buf.position() != buf.buffer().end()) + if (buf.hasPendingData()) return; } } @@ -121,7 +121,7 @@ void readEscapedString(DB::String & s, DB::ReadBuffer & buf) s.append(buf.position(), next_pos - buf.position()); buf.position() += next_pos - buf.position(); - if (buf.position() == buf.buffer().end()) + if (!buf.hasPendingData()) continue; if (*buf.position() == '\t' || *buf.position() == '\n') @@ -191,8 +191,8 @@ static void readAnyQuotedString(String & s, ReadBuffer & buf) s.append(buf.position(), next_pos - buf.position()); buf.position() += next_pos - buf.position(); - - if (buf.position() == buf.buffer().end()) + + if (!buf.hasPendingData()) continue; if (*buf.position() == quote) diff --git a/dbms/src/IO/tests/mempbrk.cpp b/dbms/src/IO/tests/mempbrk.cpp index d2062c10f48..4c1461d05aa 100644 --- a/dbms/src/IO/tests/mempbrk.cpp +++ b/dbms/src/IO/tests/mempbrk.cpp @@ -63,7 +63,7 @@ namespace test return end; } - + void readEscapedString(DB::String & s, DB::ReadBuffer & buf) { s = ""; @@ -74,7 +74,7 @@ namespace test s.append(buf.position(), next_pos - buf.position()); buf.position() += next_pos - buf.position(); - if (buf.position() == buf.buffer().end()) + if (!buf.hasPendingData()) continue; if (*buf.position() == '\t' || *buf.position() == '\n') diff --git a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp index 5f2cff11b7f..00746849888 100644 --- a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp @@ -120,7 +120,7 @@ struct Stream /// Если засечка должна быть ровно на границе блоков, нам подходит и засечка, указывающая на конец предыдущего блока, /// и на начало следующего. - if (uncompressed_hashing_buf.position() == uncompressed_hashing_buf.buffer().end()) + if (!uncompressed_hashing_buf.hasPendingData()) { /// Получим засечку, указывающую на конец предыдущего блока. has_alternative_mark = true; From 6521efcb2af1c0c5a7ab23f3e5fa6886ebd55088 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 8 Feb 2015 07:25:09 +0300 Subject: [PATCH 14/25] dbms: fixed error [#METR-2944]. --- dbms/include/DB/Core/StringRef.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/include/DB/Core/StringRef.h b/dbms/include/DB/Core/StringRef.h index e7ca944e33a..4ae264e4f3f 100644 --- a/dbms/include/DB/Core/StringRef.h +++ b/dbms/include/DB/Core/StringRef.h @@ -48,7 +48,7 @@ inline bool memequalSSE2Wide(const char * p1, const char * p2, size_t size) if ( compareSSE2(p1, p2) && compareSSE2(p1 + 16, p2 + 16) && compareSSE2(p1 + 32, p2 + 32) - && compareSSE2(p1 + 40, p2 + 40)) + && compareSSE2(p1 + 48, p2 + 48)) { p1 += 64; p2 += 64; From 8c0a540350d579ccd78555b7915dc3500a4d332f Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Mon, 9 Feb 2015 00:34:43 +0300 Subject: [PATCH 15/25] dbms: Server: queries with several replicas: fixes [#METR-14410] --- dbms/include/DB/Client/ParallelReplicas.h | 3 -- .../DB/DataStreams/RemoteBlockInputStream.h | 48 +++++++++++-------- dbms/src/Client/ParallelReplicas.cpp | 13 ----- 3 files changed, 28 insertions(+), 36 deletions(-) diff --git a/dbms/include/DB/Client/ParallelReplicas.h b/dbms/include/DB/Client/ParallelReplicas.h index a13de8a7baf..47b4ade5a1f 100644 --- a/dbms/include/DB/Client/ParallelReplicas.h +++ b/dbms/include/DB/Client/ParallelReplicas.h @@ -29,9 +29,6 @@ namespace DB /// Получить пакет от какой-нибудь реплики. Connection::Packet receivePacket(); - /// Разорвать соединения к репликам - void disconnect(); - /// Отменить запросы к репликам void sendCancel(); diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index f7b2e0ae4e3..2d281eb9675 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -82,7 +82,7 @@ public: if (!__sync_bool_compare_and_swap(&is_cancelled, false, true)) return; - if (sent_query && !was_cancelled && !finished && !got_exception_from_replica) + if (isInProgress() && !hasThrownException() && !was_cancelled) { std::string addresses = parallel_replicas->dumpAddresses(); LOG_TRACE(log, "(" + addresses + ") Cancelling query"); @@ -96,11 +96,12 @@ public: ~RemoteBlockInputStream() override { - /** Если прервались в середине цикла общения с репликами, то закрываем соединения, - * чтобы они не остались висеть в рассихронизированном состоянии. + /** Если прервались в середине цикла общения с репликами, то прервываем + * все соединения, затем читаем и пропускаем оставшиеся пакеты чтобы + * эти соединения не остались висеть в рассихронизированном состоянии. */ - if (sent_query && !finished) - parallel_replicas->disconnect(); + if (isInProgress()) + abort(); } protected: @@ -156,7 +157,6 @@ protected: case Protocol::Server::Exception: got_exception_from_replica = true; - abort(); packet.exception->rethrow(); break; @@ -177,7 +177,7 @@ protected: */ progressImpl(packet.progress); - if (!was_cancelled && !finished && isCancelled()) + if (!was_cancelled && isInProgress() && isCancelled()) cancel(); break; @@ -196,7 +196,6 @@ protected: default: got_unknown_packet_from_replica = true; - abort(); throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); } } @@ -207,10 +206,11 @@ protected: /** Если одно из: * - ничего не начинали делать; * - получили все пакеты до EndOfStream; - * - получили с сервера эксепшен; + * - получили с одной реплики эксепшен; + * - получили с одной реплики неизвестный пакет; * - то больше читать ничего не нужно. */ - if (!sent_query || finished || got_exception_from_replica) + if (!isInProgress() || hasThrownException()) return; /** Если ещё прочитали не все данные, но они больше не нужны. @@ -256,20 +256,28 @@ protected: parallel_replicas = ext::make_unique(pool, parallel_replicas_settings); } - /** Если с одной реплики был получен пакет Exception или неизвестный пакет, отменить запросы на всех - * остальных репликах. Читать и пропускать все оставшиеся пакеты до EndOfStream или Exception, чтобы - * не было рассинхронизации в соединениях с репликами. + /** Отменить запросы на всех репликах. Читать и пропускать все оставшиеся пакеты + * до EndOfStream или Exception, чтоб не было рассинхронизации в соединениях с репликами. */ void abort() { - if (got_exception_from_replica || got_unknown_packet_from_replica) - { - std::string addresses = parallel_replicas->dumpAddresses(); - LOG_TRACE(log, "(" + addresses + ") Aborting query"); + std::string addresses = parallel_replicas->dumpAddresses(); + LOG_TRACE(log, "(" + addresses + ") Aborting query"); - parallel_replicas->sendCancel(); - (void) parallel_replicas->drain(); - } + parallel_replicas->sendCancel(); + (void) parallel_replicas->drain(); + } + + /// Возвращает true, если запрос отправлен, а ещё не выполнен. + bool isInProgress() const + { + return sent_query && !finished; + } + + /// Возвращает true, если исключение было выкинуто. + bool hasThrownException() const + { + return got_exception_from_replica || got_unknown_packet_from_replica; } private: diff --git a/dbms/src/Client/ParallelReplicas.cpp b/dbms/src/Client/ParallelReplicas.cpp index 24624ba0585..a1a0dafe43e 100644 --- a/dbms/src/Client/ParallelReplicas.cpp +++ b/dbms/src/Client/ParallelReplicas.cpp @@ -133,19 +133,6 @@ namespace DB return packet; } - void ParallelReplicas::disconnect() - { - for (auto it = replica_map.begin(); it != replica_map.end(); ++it) - { - Connection * connection = it->second; - if (connection != nullptr) - { - connection->disconnect(); - invalidateReplica(it); - } - } - } - void ParallelReplicas::sendCancel() { if (!sent_query || cancelled) From e8902aa644e341033a42b2b4a51fc648ac4af9e3 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Mon, 9 Feb 2015 01:37:55 +0300 Subject: [PATCH 16/25] dbms: Server: queries with several replicas: fixes [#METR-14410] --- .../DB/Storages/StorageReplicatedMergeTree.h | 4 -- .../Interpreters/InterpreterSelectQuery.cpp | 7 --- .../Storages/StorageReplicatedMergeTree.cpp | 2 +- ...4_distributed_with_many_replicas.reference | 45 ---------------- .../00124_distributed_with_many_replicas.sql | 52 +------------------ 5 files changed, 2 insertions(+), 108 deletions(-) diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index b2944827e7a..9f9f83b4c1c 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -103,8 +103,6 @@ public: /// Добавить кусок в очередь кусков, чьи данные нужно проверить в фоновом потоке. void enqueuePartForCheck(const String & name); - void skipUnreplicated() { process_unreplicated = false; } - MergeTreeData & getData() { return data; } MergeTreeData * getUnreplicatedData() { return unreplicated_data.get(); } @@ -166,8 +164,6 @@ private: current_zookeeper = zookeeper; } - bool process_unreplicated = true; - /// Если true, таблица в офлайновом режиме, и в нее нельзя писать. bool is_readonly = false; diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 90beac1638d..264d8aa4b40 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -25,7 +25,6 @@ #include #include -#include #include #include @@ -105,12 +104,6 @@ void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_, const NamesAn + " does not support parallel execution on several replicas", ErrorCodes::STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS); - if (StorageReplicatedMergeTree * storage_replicated_merge_tree = typeid_cast(&*storage)) - { - if (settings.parallel_replica_offset > 0) - storage_replicated_merge_tree->skipUnreplicated(); - } - table_lock = storage->lockStructure(false); if (table_column_names.empty()) context.setColumns(storage->getColumnsListNonMaterialized()); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index e14520cfc77..c8484c491de 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -2027,7 +2027,7 @@ BlockInputStreams StorageReplicatedMergeTree::read( size_t part_index = 0; - if (process_unreplicated && unreplicated_reader && values.count(0)) + if ((settings.parallel_replica_offset == 0) && unreplicated_reader && values.count(0)) { res = unreplicated_reader->read(real_column_names, query, context, settings, processed_stage, diff --git a/dbms/tests/queries/0_stateless/00124_distributed_with_many_replicas.reference b/dbms/tests/queries/0_stateless/00124_distributed_with_many_replicas.reference index efa2d8b20ab..f2c230b213c 100644 --- a/dbms/tests/queries/0_stateless/00124_distributed_with_many_replicas.reference +++ b/dbms/tests/queries/0_stateless/00124_distributed_with_many_replicas.reference @@ -3,48 +3,3 @@ 3 2015-03-01 3 foo 4 2015-04-01 4 bar 5 2015-05-01 5 foo -6 2015-06-01 6 bar -7 2015-07-01 7 foo -8 2015-08-01 8 bar -9 2015-09-01 9 foo -10 2015-10-01 10 bar -11 2015-11-01 1 foo -12 2015-12-01 2 bar -13 2015-01-01 3 foo -14 2015-02-01 4 bar -15 2015-03-01 5 foo -16 2015-04-01 6 bar -17 2015-05-01 7 foo -18 2015-06-01 8 bar -19 2015-07-01 9 foo -20 2015-08-01 10 bar -21 2015-09-01 1 foo -22 2015-10-01 2 bar -23 2015-11-01 3 foo -24 2015-12-01 4 bar -25 2015-01-01 5 foo -26 2015-02-01 6 bar -27 2015-03-01 7 foo -28 2015-04-01 8 bar -29 2015-05-01 9 foo -30 2015-06-01 10 bar -31 2015-07-01 1 foo -32 2015-08-01 2 bar -33 2015-09-01 3 foo -34 2015-10-01 4 bar -35 2015-11-01 5 foo -36 2015-12-01 6 bar -37 2015-01-01 7 foo -38 2015-02-01 8 bar -39 2015-03-01 9 foo -40 2015-04-01 10 bar -41 2015-05-01 1 foo -42 2015-06-01 2 bar -43 2015-07-01 3 foo -44 2015-08-01 4 bar -45 2015-09-01 5 foo -46 2015-10-01 6 bar -47 2015-11-01 7 foo -48 2015-12-01 8 bar -49 2015-01-01 9 foo -50 2015-02-01 10 bar diff --git a/dbms/tests/queries/0_stateless/00124_distributed_with_many_replicas.sql b/dbms/tests/queries/0_stateless/00124_distributed_with_many_replicas.sql index a97ce710c2e..489c46eb8f2 100644 --- a/dbms/tests/queries/0_stateless/00124_distributed_with_many_replicas.sql +++ b/dbms/tests/queries/0_stateless/00124_distributed_with_many_replicas.sql @@ -4,57 +4,7 @@ DROP TABLE IF EXISTS test.report; CREATE TABLE test.report(id UInt32, event_date Date, priority UInt32, description String) ENGINE = MergeTree(event_date, intHash32(id), (id, event_date, intHash32(id)), 8192); -INSERT INTO test.report(id,event_date,priority,description) VALUES(1, '2015-01-01', 1, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(2, '2015-02-01', 2, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(3, '2015-03-01', 3, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(4, '2015-04-01', 4, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(5, '2015-05-01', 5, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(6, '2015-06-01', 6, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(7, '2015-07-01', 7, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(8, '2015-08-01', 8, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(9, '2015-09-01', 9, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(10, '2015-10-01', 10, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(11, '2015-11-01', 1, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(12, '2015-12-01', 2, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(13, '2015-01-01', 3, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(14, '2015-02-01', 4, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(15, '2015-03-01', 5, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(16, '2015-04-01', 6, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(17, '2015-05-01', 7, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(18, '2015-06-01', 8, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(19, '2015-07-01', 9, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(20, '2015-08-01', 10, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(21, '2015-09-01', 1, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(22, '2015-10-01', 2, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(23, '2015-11-01', 3, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(24, '2015-12-01', 4, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(25, '2015-01-01', 5, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(26, '2015-02-01', 6, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(27, '2015-03-01', 7, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(28, '2015-04-01', 8, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(29, '2015-05-01', 9, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(30, '2015-06-01', 10, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(31, '2015-07-01', 1, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(32, '2015-08-01', 2, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(33, '2015-09-01', 3, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(34, '2015-10-01', 4, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(35, '2015-11-01', 5, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(36, '2015-12-01', 6, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(37, '2015-01-01', 7, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(38, '2015-02-01', 8, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(39, '2015-03-01', 9, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(40, '2015-04-01', 10, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(41, '2015-05-01', 1, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(42, '2015-06-01', 2, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(43, '2015-07-01', 3, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(44, '2015-08-01', 4, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(45, '2015-09-01', 5, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(46, '2015-10-01', 6, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(47, '2015-11-01', 7, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(48, '2015-12-01', 8, 'bar'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(49, '2015-01-01', 9, 'foo'); -INSERT INTO test.report(id,event_date,priority,description) VALUES(50, '2015-02-01', 10, 'bar'); - +INSERT INTO test.report(id,event_date,priority,description) VALUES (1, '2015-01-01', 1, 'foo')(2, '2015-02-01', 2, 'bar')(3, '2015-03-01', 3, 'foo')(4, '2015-04-01', 4, 'bar')(5, '2015-05-01', 5, 'foo'); SELECT * FROM (SELECT id, event_date, priority, description FROM remote('127.0.0.{1|2}', test, report)) ORDER BY id ASC; DROP TABLE test.report; From bfbe878f99af918a33df1da70a3852c3360c7c81 Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Fri, 6 Feb 2015 13:35:35 +0300 Subject: [PATCH 17/25] dbms: properly delay dictionary update time in case of exception [#METR-13298] --- dbms/src/Interpreters/Dictionaries.cpp | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/dbms/src/Interpreters/Dictionaries.cpp b/dbms/src/Interpreters/Dictionaries.cpp index ee83289af47..bf8b38da787 100644 --- a/dbms/src/Interpreters/Dictionaries.cpp +++ b/dbms/src/Interpreters/Dictionaries.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace DB { @@ -125,6 +126,12 @@ void Dictionaries::reloadExternals() if (std::chrono::system_clock::now() < update_time) continue; + scope_exit({ + /// calculate next update time + std::uniform_int_distribution distribution{lifetime.min_sec, lifetime.max_sec}; + update_time = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; + }); + /// check source modified if (current->getSource()->isModified()) { @@ -132,10 +139,6 @@ void Dictionaries::reloadExternals() auto new_version = current->clone(); dictionary.second->set(new_version.release()); } - - /// calculate next update time - std::uniform_int_distribution distribution{lifetime.min_sec, lifetime.max_sec}; - update_time = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; } } catch (...) From 19e3f7a561fceca209c9aa7e5c926878fb971f6c Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Mon, 9 Feb 2015 12:49:21 +0300 Subject: [PATCH 18/25] dbms: remove unused header --- dbms/src/Core/Block.cpp | 1 - dbms/src/Interpreters/InterpreterAlterQuery.cpp | 3 --- 2 files changed, 4 deletions(-) diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index d96d8c974e2..336adfa9b78 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -11,7 +11,6 @@ #include #include -#include #include #include diff --git a/dbms/src/Interpreters/InterpreterAlterQuery.cpp b/dbms/src/Interpreters/InterpreterAlterQuery.cpp index 10d50729f75..0b27a35a221 100644 --- a/dbms/src/Interpreters/InterpreterAlterQuery.cpp +++ b/dbms/src/Interpreters/InterpreterAlterQuery.cpp @@ -10,9 +10,6 @@ #include #include #include -#include -#include -#include #include From 8c62be82d1013a12f2f75d3c1632ad622088b365 Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Mon, 9 Feb 2015 12:51:08 +0300 Subject: [PATCH 19/25] dbms: make StorageFactory a singleton --- dbms/include/DB/Interpreters/Context.h | 3 --- dbms/include/DB/Storages/StorageFactory.h | 3 ++- dbms/src/Interpreters/InterpreterCreateQuery.cpp | 3 ++- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/dbms/include/DB/Interpreters/Context.h b/dbms/include/DB/Interpreters/Context.h index c3f08b52fcd..3cdc20dde70 100644 --- a/dbms/include/DB/Interpreters/Context.h +++ b/dbms/include/DB/Interpreters/Context.h @@ -16,7 +16,6 @@ #include #include #include -#include #include #include #include @@ -86,7 +85,6 @@ struct ContextShared TableFunctionFactory table_function_factory; /// Табличные функции. AggregateFunctionFactory aggregate_function_factory; /// Агрегатные функции. DataTypeFactory data_type_factory; /// Типы данных. - StorageFactory storage_factory; /// Движки таблиц. FormatFactory format_factory; /// Форматы. mutable SharedPtr dictionaries; /// Словари Метрики. Инициализируются лениво. Users users; /// Известные пользователи. @@ -259,7 +257,6 @@ public: const TableFunctionFactory & getTableFunctionFactory() const { return shared->table_function_factory; } const AggregateFunctionFactory & getAggregateFunctionFactory() const { return shared->aggregate_function_factory; } const DataTypeFactory & getDataTypeFactory() const { return shared->data_type_factory; } - const StorageFactory & getStorageFactory() const { return shared->storage_factory; } const FormatFactory & getFormatFactory() const { return shared->format_factory; } const Dictionaries & getDictionaries() const; diff --git a/dbms/include/DB/Storages/StorageFactory.h b/dbms/include/DB/Storages/StorageFactory.h index cc6aa390379..832fa0ead11 100644 --- a/dbms/include/DB/Storages/StorageFactory.h +++ b/dbms/include/DB/Storages/StorageFactory.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace DB @@ -11,7 +12,7 @@ class Context; /** Позволяет создать таблицу по имени движка. */ -class StorageFactory +class StorageFactory : public Singleton { public: StoragePtr get( diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index 95bc1d2993b..a345ffe63bc 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -13,6 +13,7 @@ #include #include +#include #include #include @@ -194,7 +195,7 @@ StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists) else throw Exception("Incorrect CREATE query: required ENGINE.", ErrorCodes::ENGINE_REQUIRED); - res = context.getStorageFactory().get( + res = StorageFactory::instance().get( storage_name, data_path, table_name, database_name, context, context.getGlobalContext(), query_ptr, columns, materialized_columns, alias_columns, column_defaults, create.attach); From a485aacc54a2ef66aba19d423fa61f97198516e1 Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Mon, 9 Feb 2015 13:10:25 +0300 Subject: [PATCH 20/25] dbms: fix external dictionaries exception on empty path [#METR-13298] --- dbms/src/Interpreters/Dictionaries.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Interpreters/Dictionaries.cpp b/dbms/src/Interpreters/Dictionaries.cpp index bf8b38da787..a4e713e68eb 100644 --- a/dbms/src/Interpreters/Dictionaries.cpp +++ b/dbms/src/Interpreters/Dictionaries.cpp @@ -34,7 +34,7 @@ void Dictionaries::reloadExternals() const auto config_path = getDictionariesConfigPath(Poco::Util::Application::instance().config()); const Poco::File config_file{config_path}; - if (!config_file.exists()) + if (config_path.empty() || !config_file.exists()) { LOG_WARNING(log, "config file '" + config_path + "' does not exist"); } From 77f262f8a833fe38c80452a2807ea9e2e4de3825 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Mon, 9 Feb 2015 13:51:36 +0300 Subject: [PATCH 21/25] dbms: Server: queries with several replicas: fixes [#METR-14410] --- .../DB/DataStreams/RemoteBlockInputStream.h | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index 2d281eb9675..5d37b757df3 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -82,14 +82,14 @@ public: if (!__sync_bool_compare_and_swap(&is_cancelled, false, true)) return; - if (isInProgress() && !hasThrownException() && !was_cancelled) + if (isQueryInProgress() && !hasThrownException()) { std::string addresses = parallel_replicas->dumpAddresses(); LOG_TRACE(log, "(" + addresses + ") Cancelling query"); /// Если запрошено прервать запрос - попросим удалённые реплики тоже прервать запрос. - parallel_replicas->sendCancel(); was_cancelled = true; + parallel_replicas->sendCancel(); } } @@ -100,8 +100,14 @@ public: * все соединения, затем читаем и пропускаем оставшиеся пакеты чтобы * эти соединения не остались висеть в рассихронизированном состоянии. */ - if (isInProgress()) - abort(); + if (isQueryInProgress()) + { + std::string addresses = parallel_replicas->dumpAddresses(); + LOG_TRACE(log, "(" + addresses + ") Aborting query"); + + parallel_replicas->sendCancel(); + (void) parallel_replicas->drain(); + } } protected: @@ -177,7 +183,7 @@ protected: */ progressImpl(packet.progress); - if (!was_cancelled && isInProgress() && isCancelled()) + if (isQueryInProgress() && isCancelled()) cancel(); break; @@ -210,7 +216,7 @@ protected: * - получили с одной реплики неизвестный пакет; * - то больше читать ничего не нужно. */ - if (!isInProgress() || hasThrownException()) + if (hasNoQueryInProgress() || hasThrownException()) return; /** Если ещё прочитали не все данные, но они больше не нужны. @@ -256,22 +262,16 @@ protected: parallel_replicas = ext::make_unique(pool, parallel_replicas_settings); } - /** Отменить запросы на всех репликах. Читать и пропускать все оставшиеся пакеты - * до EndOfStream или Exception, чтоб не было рассинхронизации в соединениях с репликами. - */ - void abort() + /// Возвращает true, если запрос отправлен, а ещё не выполнен. + bool isQueryInProgress() const { - std::string addresses = parallel_replicas->dumpAddresses(); - LOG_TRACE(log, "(" + addresses + ") Aborting query"); - - parallel_replicas->sendCancel(); - (void) parallel_replicas->drain(); + return sent_query && !finished && !was_cancelled; } - /// Возвращает true, если запрос отправлен, а ещё не выполнен. - bool isInProgress() const + /// Возвращает true, если никакой запрос не отправлен или один запрос уже выполнен. + bool hasNoQueryInProgress() const { - return sent_query && !finished; + return !sent_query || finished; } /// Возвращает true, если исключение было выкинуто. From a36c7f0001520c5344dfa7b0a91b0fd331571dd5 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Mon, 9 Feb 2015 15:56:46 +0300 Subject: [PATCH 22/25] dbms: Server: removed unneeded include [#METR-14410] --- dbms/src/Client/ParallelReplicas.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/dbms/src/Client/ParallelReplicas.cpp b/dbms/src/Client/ParallelReplicas.cpp index a1a0dafe43e..9e5fee8c3d7 100644 --- a/dbms/src/Client/ParallelReplicas.cpp +++ b/dbms/src/Client/ParallelReplicas.cpp @@ -1,5 +1,4 @@ #include -#include namespace DB { From f6b34a25e0665546912360afda010ad42d498df1 Mon Sep 17 00:00:00 2001 From: Sergey Magidovich Date: Fri, 23 Jan 2015 18:51:03 +0300 Subject: [PATCH 23/25] =?UTF-8?q?=E2=96=88=E2=96=88=E2=96=88=E2=96=88?= =?UTF-8?q?=E2=96=88=E2=96=88=E2=96=88=E2=96=88=E2=96=88=E2=96=88=E2=96=88?= =?UTF-8?q?:=20Fix=20exception=20evoiding.=20[#MOBMET-1238]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dbms/include/DB/IO/ReadBuffer.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/include/DB/IO/ReadBuffer.h b/dbms/include/DB/IO/ReadBuffer.h index 8ee26af6a33..da00c643403 100644 --- a/dbms/include/DB/IO/ReadBuffer.h +++ b/dbms/include/DB/IO/ReadBuffer.h @@ -40,7 +40,7 @@ public: /** прочитать следующие данные и заполнить ими буфер; переместить позицию в начало; * вернуть false в случае конца, true иначе; кинуть исключение, если что-то не так */ - inline bool next() + __attribute__((noinline)) bool next() { bytes += offset(); bool res = nextImpl(); @@ -68,7 +68,7 @@ public: * * При попытке чтения после конца, следует кидать исключение. */ - inline bool eof() + bool __attribute__ ((noinline)) eof() { return !hasPendingData() && !next(); } From 74f89e1199be69e31025516afbd82cdf03c6da76 Mon Sep 17 00:00:00 2001 From: Sergey Magidovich Date: Thu, 5 Feb 2015 17:28:48 +0300 Subject: [PATCH 24/25] =?UTF-8?q?=E2=96=88=E2=96=88=E2=96=88=E2=96=88?= =?UTF-8?q?=E2=96=88=E2=96=88=E2=96=88=E2=96=88=E2=96=88=E2=96=88=E2=96=88?= =?UTF-8?q?:=20Remove=20useless=20noinline.=20[#MOBMET-1116],=20[#MOBMET-1?= =?UTF-8?q?209],=20[#MOBMET-737]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dbms/include/DB/IO/ReadBuffer.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/include/DB/IO/ReadBuffer.h b/dbms/include/DB/IO/ReadBuffer.h index da00c643403..d4219dc0469 100644 --- a/dbms/include/DB/IO/ReadBuffer.h +++ b/dbms/include/DB/IO/ReadBuffer.h @@ -40,7 +40,7 @@ public: /** прочитать следующие данные и заполнить ими буфер; переместить позицию в начало; * вернуть false в случае конца, true иначе; кинуть исключение, если что-то не так */ - __attribute__((noinline)) bool next() + bool next() { bytes += offset(); bool res = nextImpl(); From a026585b189b513db813a03f1a86d8d9c8abfe72 Mon Sep 17 00:00:00 2001 From: Sergey Magidovich Date: Sun, 8 Feb 2015 12:42:53 +0300 Subject: [PATCH 25/25] dbms: Remove noinlene. Problem solved using gcc-4.9. [#MOBMET-1238] --- dbms/include/DB/IO/ReadBuffer.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/include/DB/IO/ReadBuffer.h b/dbms/include/DB/IO/ReadBuffer.h index d4219dc0469..4e97039fce9 100644 --- a/dbms/include/DB/IO/ReadBuffer.h +++ b/dbms/include/DB/IO/ReadBuffer.h @@ -68,7 +68,7 @@ public: * * При попытке чтения после конца, следует кидать исключение. */ - bool __attribute__ ((noinline)) eof() + bool eof() { return !hasPendingData() && !next(); }