diff --git a/src/DataStreams/RemoteBlockInputStream.cpp b/src/DataStreams/RemoteBlockInputStream.cpp index d5e77bb0759..c7c5ce2d00a 100644 --- a/src/DataStreams/RemoteBlockInputStream.cpp +++ b/src/DataStreams/RemoteBlockInputStream.cpp @@ -1,103 +1,46 @@ #include -#include -#include -#include -#include #include -#include -#include -#include -#include -#include -#include - -#include -#include - namespace DB { -namespace ErrorCodes -{ - extern const int UNKNOWN_PACKET_FROM_SERVER; -} - - RemoteBlockInputStream::RemoteBlockInputStream( Connection & connection, const String & query_, const Block & header_, const Context & context_, const Settings * settings, const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) - : header(header_), query(query_), context(context_), scalars(scalars_), external_tables(external_tables_), stage(stage_) + : query_executor(connection, query_, header_, context_, settings, throttler, scalars_, external_tables_, stage_) { - if (settings) - context.setSettings(*settings); - - create_multiplexed_connections = [this, &connection, throttler]() - { - return std::make_unique(connection, context.getSettingsRef(), throttler); - }; + init(); } RemoteBlockInputStream::RemoteBlockInputStream( std::vector && connections, const String & query_, const Block & header_, const Context & context_, const Settings * settings, const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) - : header(header_), query(query_), context(context_), scalars(scalars_), external_tables(external_tables_), stage(stage_) + : query_executor(std::move(connections), query_, header_, context_, settings, throttler, scalars_, external_tables_, stage_) { - if (settings) - context.setSettings(*settings); - - create_multiplexed_connections = [this, connections, throttler]() mutable - { - return std::make_unique( - std::move(connections), context.getSettingsRef(), throttler); - }; + init(); } RemoteBlockInputStream::RemoteBlockInputStream( const ConnectionPoolWithFailoverPtr & pool, const String & query_, const Block & header_, const Context & context_, const Settings * settings, const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) - : header(header_), query(query_), context(context_), scalars(scalars_), external_tables(external_tables_), stage(stage_) + : query_executor(pool, query_, header_, context_, settings, throttler, scalars_, external_tables_, stage_) { - if (settings) - context.setSettings(*settings); - - create_multiplexed_connections = [this, pool, throttler]() - { - const Settings & current_settings = context.getSettingsRef(); - auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); - std::vector connections; - if (main_table) - { - auto try_results = pool->getManyChecked(timeouts, ¤t_settings, pool_mode, main_table.getQualifiedName()); - connections.reserve(try_results.size()); - for (auto & try_result : try_results) - connections.emplace_back(std::move(try_result.entry)); - } - else - connections = pool->getMany(timeouts, ¤t_settings, pool_mode); - - return std::make_unique( - std::move(connections), current_settings, throttler); - }; + init(); } -RemoteBlockInputStream::~RemoteBlockInputStream() +void RemoteBlockInputStream::init() { - /** If interrupted in the middle of the loop of communication with replicas, then interrupt - * all connections, then read and skip the remaining packets to make sure - * these connections did not remain hanging in the out-of-sync state. - */ - if (established || isQueryPending()) - multiplexed_connections->disconnect(); + query_executor.setProgressCallback([this](const Progress & progress) { progressImpl(progress); }); + query_executor.setProfileInfoCallback([this](const BlockStreamProfileInfo & info_) { info.setFrom(info_, true); }); + query_executor.setLogger(log); } void RemoteBlockInputStream::readPrefix() { - if (!sent_query) - sendQuery(); + query_executor.sendQuery(); } void RemoteBlockInputStream::cancel(bool kill) @@ -109,280 +52,22 @@ void RemoteBlockInputStream::cancel(bool kill) if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) return; - { - std::lock_guard lock(external_tables_mutex); - - /// Stop sending external data. - for (auto & vec : external_tables_data) - for (auto & elem : vec) - elem->is_cancelled = true; - } - - if (!isQueryPending() || hasThrownException()) - return; - - tryCancel("Cancelling query"); + query_executor.cancel(); } -void RemoteBlockInputStream::sendScalars() -{ - multiplexed_connections->sendScalarsData(scalars); -} - -void RemoteBlockInputStream::sendExternalTables() -{ - size_t count = multiplexed_connections->size(); - - { - std::lock_guard lock(external_tables_mutex); - - external_tables_data.reserve(count); - - for (size_t i = 0; i < count; ++i) - { - ExternalTablesData res; - for (const auto & table : external_tables) - { - StoragePtr cur = table.second; - QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(context); - - Pipes pipes; - - pipes = cur->read(cur->getColumns().getNamesOfPhysical(), {}, context, - read_from_table_stage, DEFAULT_BLOCK_SIZE, 1); - - auto data = std::make_unique(); - data->table_name = table.first; - - if (pipes.empty()) - data->pipe = std::make_unique(std::make_shared(cur->getSampleBlock(), Chunk())); - else if (pipes.size() == 1) - data->pipe = std::make_unique(std::move(pipes.front())); - else - { - auto concat = std::make_shared(pipes.front().getHeader(), pipes.size()); - data->pipe = std::make_unique(std::move(pipes), std::move(concat)); - } - - res.emplace_back(std::move(data)); - } - external_tables_data.push_back(std::move(res)); - } - } - - multiplexed_connections->sendExternalTablesData(external_tables_data); -} - - -/** If we receive a block with slightly different column types, or with excessive columns, - * we will adapt it to expected structure. - */ -static Block adaptBlockStructure(const Block & block, const Block & header) -{ - /// Special case when reader doesn't care about result structure. Deprecated and used only in Benchmark, PerformanceTest. - if (!header) - return block; - - Block res; - res.info = block.info; - - for (const auto & elem : header) - { - ColumnPtr column; - - if (elem.column && isColumnConst(*elem.column)) - { - /// We expect constant column in block. - /// If block is not empty, then get value for constant from it, - /// because it may be different for remote server for functions like version(), uptime(), ... - if (block.rows() > 0 && block.has(elem.name)) - { - /// Const column is passed as materialized. Get first value from it. - /// - /// TODO: check that column contains the same value. - /// TODO: serialize const columns. - auto col = block.getByName(elem.name); - col.column = block.getByName(elem.name).column->cut(0, 1); - - column = castColumn(col, elem.type); - - if (!isColumnConst(*column)) - column = ColumnConst::create(column, block.rows()); - else - /// It is not possible now. Just in case we support const columns serialization. - column = column->cloneResized(block.rows()); - } - else - column = elem.column->cloneResized(block.rows()); - } - else - column = castColumn(block.getByName(elem.name), elem.type); - - res.insert({column, elem.type, elem.name}); - } - return res; -} - - Block RemoteBlockInputStream::readImpl() { - if (!sent_query) - { - sendQuery(); + auto block = query_executor.read(); - if (context.getSettingsRef().skip_unavailable_shards && (0 == multiplexed_connections->size())) - return {}; - } + if (isCancelledOrThrowIfKilled()) + return Block(); - while (true) - { - if (isCancelledOrThrowIfKilled()) - return Block(); - - Packet packet = multiplexed_connections->receivePacket(); - - switch (packet.type) - { - case Protocol::Server::Data: - /// If the block is not empty and is not a header block - if (packet.block && (packet.block.rows() > 0)) - return adaptBlockStructure(packet.block, header); - break; /// If the block is empty - we will receive other packets before EndOfStream. - - case Protocol::Server::Exception: - got_exception_from_replica = true; - packet.exception->rethrow(); - break; - - case Protocol::Server::EndOfStream: - if (!multiplexed_connections->hasActiveConnections()) - { - finished = true; - return Block(); - } - break; - - case Protocol::Server::Progress: - /** We use the progress from a remote server. - * We also include in ProcessList, - * and we use it to check - * constraints (for example, the minimum speed of query execution) - * and quotas (for example, the number of lines to read). - */ - progressImpl(packet.progress); - break; - - case Protocol::Server::ProfileInfo: - /// Use own (client-side) info about read bytes, it is more correct info than server-side one. - info.setFrom(packet.profile_info, true); - break; - - case Protocol::Server::Totals: - totals = packet.block; - break; - - case Protocol::Server::Extremes: - extremes = packet.block; - break; - - case Protocol::Server::Log: - /// Pass logs from remote server to client - if (auto log_queue = CurrentThread::getInternalTextLogsQueue()) - log_queue->pushBlock(std::move(packet.block)); - break; - - default: - got_unknown_packet_from_replica = true; - throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); - } - } + return block; } void RemoteBlockInputStream::readSuffixImpl() { - /** If one of: - * - nothing started to do; - * - received all packets before EndOfStream; - * - received exception from one replica; - * - received an unknown packet from one replica; - * then you do not need to read anything. - */ - if (!isQueryPending() || hasThrownException()) - return; - - /** If you have not read all the data yet, but they are no longer needed. - * This may be due to the fact that the data is sufficient (for example, when using LIMIT). - */ - - /// Send the request to abort the execution of the request, if not already sent. - tryCancel("Cancelling query because enough data has been read"); - - /// Get the remaining packets so that there is no out of sync in the connections to the replicas. - Packet packet = multiplexed_connections->drain(); - switch (packet.type) - { - case Protocol::Server::EndOfStream: - finished = true; - break; - - case Protocol::Server::Exception: - got_exception_from_replica = true; - packet.exception->rethrow(); - break; - - default: - got_unknown_packet_from_replica = true; - throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); - } -} - -void RemoteBlockInputStream::sendQuery() -{ - multiplexed_connections = create_multiplexed_connections(); - - const auto& settings = context.getSettingsRef(); - if (settings.skip_unavailable_shards && 0 == multiplexed_connections->size()) - return; - - established = true; - - auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); - ClientInfo modified_client_info = context.getClientInfo(); - modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; - - multiplexed_connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true); - - established = false; - sent_query = true; - - if (settings.enable_scalar_subquery_optimization) - sendScalars(); - sendExternalTables(); -} - -void RemoteBlockInputStream::tryCancel(const char * reason) -{ - { - std::lock_guard guard(was_cancelled_mutex); - - if (was_cancelled) - return; - - was_cancelled = true; - multiplexed_connections->sendCancel(); - } - - LOG_TRACE(log, "({}) {}", multiplexed_connections->dumpAddresses(), reason); -} - -bool RemoteBlockInputStream::isQueryPending() const -{ - return sent_query && !finished; -} - -bool RemoteBlockInputStream::hasThrownException() const -{ - return got_exception_from_replica || got_unknown_packet_from_replica; + query_executor.finish(); } } diff --git a/src/DataStreams/RemoteBlockInputStream.h b/src/DataStreams/RemoteBlockInputStream.h index f6bac4155da..628feb0ab80 100644 --- a/src/DataStreams/RemoteBlockInputStream.h +++ b/src/DataStreams/RemoteBlockInputStream.h @@ -11,6 +11,7 @@ #include #include +#include namespace DB { @@ -44,114 +45,38 @@ public: const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); - ~RemoteBlockInputStream() override; - /// Set the query_id. For now, used by performance test to later find the query - /// in the server query_log. Must be called before sending the query to the - /// server. - void setQueryId(const std::string& query_id_) { assert(!sent_query); query_id = query_id_; } + /// in the server query_log. Must be called before sending the query to the server. + void setQueryId(const std::string & query_id) { query_executor.setQueryId(query_id); } /// Specify how we allocate connections on a shard. - void setPoolMode(PoolMode pool_mode_) { pool_mode = pool_mode_; } + void setPoolMode(PoolMode pool_mode) { query_executor.setPoolMode(pool_mode); } - void setMainTable(StorageID main_table_) { main_table = std::move(main_table_); } + void setMainTable(StorageID main_table_) { query_executor.setMainTable(std::move(main_table_)); } /// Sends query (initiates calculation) before read() void readPrefix() override; - /** Prevent default progress notification because progress' callback is - called by its own - */ + /// Prevent default progress notification because progress' callback is called by its own. void progress(const Progress & /*value*/) override {} void cancel(bool kill) override; String getName() const override { return "Remote"; } - Block getHeader() const override { return header; } + Block getHeader() const override { return query_executor.getHeader(); } + Block getTotals() override { return query_executor.getTotals(); } + Block getExtremes() override { return query_executor.getExtremes(); } protected: - /// Send all scalars to remote servers - void sendScalars(); - - /// Send all temporary tables to remote servers - void sendExternalTables(); - Block readImpl() override; - void readSuffixImpl() override; - /// Returns true if query was sent - bool isQueryPending() const; - - /// Returns true if exception was thrown - bool hasThrownException() const; - private: - void sendQuery(); - - Block receiveBlock(); - - /// If wasn't sent yet, send request to cancell all connections to replicas - void tryCancel(const char * reason); - -private: - Block header; - - std::function()> create_multiplexed_connections; - - std::unique_ptr multiplexed_connections; - - const String query; - String query_id = ""; - Context context; - - /// Scalars needed to be sent to remote servers - Scalars scalars; - /// Temporary tables needed to be sent to remote servers - Tables external_tables; - QueryProcessingStage::Enum stage; - - /// Streams for reading from temporary tables and following sending of data - /// to remote servers for GLOBAL-subqueries - std::vector external_tables_data; - std::mutex external_tables_mutex; - - /// Connections to replicas are established, but no queries are sent yet - std::atomic established { false }; - - /// Query is sent (used before getting first block) - std::atomic sent_query { false }; - - /** All data from all replicas are received, before EndOfStream packet. - * To prevent desynchronization, if not all data is read before object - * destruction, it's required to send cancel query request to replicas and - * read all packets before EndOfStream - */ - std::atomic finished { false }; - - /** Cancel query request was sent to all replicas because data is not needed anymore - * This behaviour may occur when: - * - data size is already satisfactory (when using LIMIT, for example) - * - an exception was thrown from client side - */ - bool was_cancelled { false }; - std::mutex was_cancelled_mutex; - - /** An exception from replica was received. No need in receiving more packets or - * requesting to cancel query execution - */ - std::atomic got_exception_from_replica { false }; - - /** Unkown packet was received from replica. No need in receiving more packets or - * requesting to cancel query execution - */ - std::atomic got_unknown_packet_from_replica { false }; - - PoolMode pool_mode = PoolMode::GET_MANY; - StorageID main_table = StorageID::createEmpty(); - + RemoteQueryExecutor query_executor; Poco::Logger * log = &Poco::Logger::get("RemoteBlockInputStream"); + + void init(); }; } diff --git a/src/DataStreams/RemoteQueryExecutor.cpp b/src/DataStreams/RemoteQueryExecutor.cpp new file mode 100644 index 00000000000..cf3b2c4abcd --- /dev/null +++ b/src/DataStreams/RemoteQueryExecutor.cpp @@ -0,0 +1,378 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_PACKET_FROM_SERVER; +} + +RemoteQueryExecutor::RemoteQueryExecutor( + Connection & connection, + const String & query_, const Block & header_, const Context & context_, const Settings * settings, + ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) + : header(header_), query(query_), context(context_) + , scalars(scalars_), external_tables(external_tables_), stage(stage_) +{ + if (settings) + context.setSettings(*settings); + + create_multiplexed_connections = [this, &connection, throttler]() + { + return std::make_unique(connection, context.getSettingsRef(), throttler); + }; +} + +RemoteQueryExecutor::RemoteQueryExecutor( + std::vector && connections, + const String & query_, const Block & header_, const Context & context_, const Settings * settings, + const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) + : header(header_), query(query_), context(context_) + , scalars(scalars_), external_tables(external_tables_), stage(stage_) +{ + if (settings) + context.setSettings(*settings); + + create_multiplexed_connections = [this, connections, throttler]() mutable + { + return std::make_unique( + std::move(connections), context.getSettingsRef(), throttler); + }; +} + +RemoteQueryExecutor::RemoteQueryExecutor( + const ConnectionPoolWithFailoverPtr & pool, + const String & query_, const Block & header_, const Context & context_, const Settings * settings, + const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) + : header(header_), query(query_), context(context_) + , scalars(scalars_), external_tables(external_tables_), stage(stage_) +{ + if (settings) + context.setSettings(*settings); + + create_multiplexed_connections = [this, pool, throttler]() + { + const Settings & current_settings = context.getSettingsRef(); + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); + std::vector connections; + if (main_table) + { + auto try_results = pool->getManyChecked(timeouts, ¤t_settings, pool_mode, main_table.getQualifiedName()); + connections.reserve(try_results.size()); + for (auto & try_result : try_results) + connections.emplace_back(std::move(try_result.entry)); + } + else + connections = pool->getMany(timeouts, ¤t_settings, pool_mode); + + return std::make_unique( + std::move(connections), current_settings, throttler); + }; +} + +RemoteQueryExecutor::~RemoteQueryExecutor() +{ + /** If interrupted in the middle of the loop of communication with replicas, then interrupt + * all connections, then read and skip the remaining packets to make sure + * these connections did not remain hanging in the out-of-sync state. + */ + if (established || isQueryPending()) + multiplexed_connections->disconnect(); +} + +/** If we receive a block with slightly different column types, or with excessive columns, + * we will adapt it to expected structure. + */ +static Block adaptBlockStructure(const Block & block, const Block & header) +{ + /// Special case when reader doesn't care about result structure. Deprecated and used only in Benchmark, PerformanceTest. + if (!header) + return block; + + Block res; + res.info = block.info; + + for (const auto & elem : header) + { + ColumnPtr column; + + if (elem.column && isColumnConst(*elem.column)) + { + /// We expect constant column in block. + /// If block is not empty, then get value for constant from it, + /// because it may be different for remote server for functions like version(), uptime(), ... + if (block.rows() > 0 && block.has(elem.name)) + { + /// Const column is passed as materialized. Get first value from it. + /// + /// TODO: check that column contains the same value. + /// TODO: serialize const columns. + auto col = block.getByName(elem.name); + col.column = block.getByName(elem.name).column->cut(0, 1); + + column = castColumn(col, elem.type); + + if (!isColumnConst(*column)) + column = ColumnConst::create(column, block.rows()); + else + /// It is not possible now. Just in case we support const columns serialization. + column = column->cloneResized(block.rows()); + } + else + column = elem.column->cloneResized(block.rows()); + } + else + column = castColumn(block.getByName(elem.name), elem.type); + + res.insert({column, elem.type, elem.name}); + } + return res; +} + +void RemoteQueryExecutor::sendQuery() +{ + if (sent_query) + return; + + multiplexed_connections = create_multiplexed_connections(); + + const auto& settings = context.getSettingsRef(); + if (settings.skip_unavailable_shards && 0 == multiplexed_connections->size()) + return; + + established = true; + + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); + ClientInfo modified_client_info = context.getClientInfo(); + modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; + + multiplexed_connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true); + + established = false; + sent_query = true; + + if (settings.enable_scalar_subquery_optimization) + sendScalars(); + sendExternalTables(); +} + +Block RemoteQueryExecutor::read() +{ + if (!sent_query) + { + sendQuery(); + + if (context.getSettingsRef().skip_unavailable_shards && (0 == multiplexed_connections->size())) + return {}; + } + + while (true) + { + if (was_cancelled) + return Block(); + + Packet packet = multiplexed_connections->receivePacket(); + + switch (packet.type) + { + case Protocol::Server::Data: + /// If the block is not empty and is not a header block + if (packet.block && (packet.block.rows() > 0)) + return adaptBlockStructure(packet.block, header); + break; /// If the block is empty - we will receive other packets before EndOfStream. + + case Protocol::Server::Exception: + got_exception_from_replica = true; + packet.exception->rethrow(); + break; + + case Protocol::Server::EndOfStream: + if (!multiplexed_connections->hasActiveConnections()) + { + finished = true; + return Block(); + } + break; + + case Protocol::Server::Progress: + /** We use the progress from a remote server. + * We also include in ProcessList, + * and we use it to check + * constraints (for example, the minimum speed of query execution) + * and quotas (for example, the number of lines to read). + */ + if (progress_callback) + progress_callback(packet.progress); + break; + + case Protocol::Server::ProfileInfo: + /// Use own (client-side) info about read bytes, it is more correct info than server-side one. + if (profile_info_callback) + profile_info_callback(packet.profile_info); + break; + + case Protocol::Server::Totals: + totals = packet.block; + break; + + case Protocol::Server::Extremes: + extremes = packet.block; + break; + + case Protocol::Server::Log: + /// Pass logs from remote server to client + if (auto log_queue = CurrentThread::getInternalTextLogsQueue()) + log_queue->pushBlock(std::move(packet.block)); + break; + + default: + got_unknown_packet_from_replica = true; + throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); + } + } +} + +void RemoteQueryExecutor::finish() +{ + /** If one of: + * - nothing started to do; + * - received all packets before EndOfStream; + * - received exception from one replica; + * - received an unknown packet from one replica; + * then you do not need to read anything. + */ + if (!isQueryPending() || hasThrownException()) + return; + + /** If you have not read all the data yet, but they are no longer needed. + * This may be due to the fact that the data is sufficient (for example, when using LIMIT). + */ + + /// Send the request to abort the execution of the request, if not already sent. + tryCancel("Cancelling query because enough data has been read"); + + /// Get the remaining packets so that there is no out of sync in the connections to the replicas. + Packet packet = multiplexed_connections->drain(); + switch (packet.type) + { + case Protocol::Server::EndOfStream: + finished = true; + break; + + case Protocol::Server::Exception: + got_exception_from_replica = true; + packet.exception->rethrow(); + break; + + default: + got_unknown_packet_from_replica = true; + throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); + } +} + +void RemoteQueryExecutor::cancel() +{ + { + std::lock_guard lock(external_tables_mutex); + + /// Stop sending external data. + for (auto & vec : external_tables_data) + for (auto & elem : vec) + elem->is_cancelled = true; + } + + if (!isQueryPending() || hasThrownException()) + return; + + tryCancel("Cancelling query"); +} + +void RemoteQueryExecutor::sendScalars() +{ + multiplexed_connections->sendScalarsData(scalars); +} + +void RemoteQueryExecutor::sendExternalTables() +{ + size_t count = multiplexed_connections->size(); + + { + std::lock_guard lock(external_tables_mutex); + + external_tables_data.reserve(count); + + for (size_t i = 0; i < count; ++i) + { + ExternalTablesData res; + for (const auto & table : external_tables) + { + StoragePtr cur = table.second; + QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(context); + + Pipes pipes; + + pipes = cur->read(cur->getColumns().getNamesOfPhysical(), {}, context, + read_from_table_stage, DEFAULT_BLOCK_SIZE, 1); + + auto data = std::make_unique(); + data->table_name = table.first; + + if (pipes.empty()) + data->pipe = std::make_unique(std::make_shared(cur->getSampleBlock(), Chunk())); + else if (pipes.size() == 1) + data->pipe = std::make_unique(std::move(pipes.front())); + else + { + auto concat = std::make_shared(pipes.front().getHeader(), pipes.size()); + data->pipe = std::make_unique(std::move(pipes), std::move(concat)); + } + + res.emplace_back(std::move(data)); + } + external_tables_data.push_back(std::move(res)); + } + } + + multiplexed_connections->sendExternalTablesData(external_tables_data); +} + +void RemoteQueryExecutor::tryCancel(const char * reason) +{ + { + /// Flag was_cancelled is atomic because it is checked in read(). + std::lock_guard guard(was_cancelled_mutex); + + if (was_cancelled) + return; + + was_cancelled = true; + multiplexed_connections->sendCancel(); + } + + if (log) + LOG_TRACE(log, "({}) {}", multiplexed_connections->dumpAddresses(), reason); +} + +bool RemoteQueryExecutor::isQueryPending() const +{ + return sent_query && !finished; +} + +bool RemoteQueryExecutor::hasThrownException() const +{ + return got_exception_from_replica || got_unknown_packet_from_replica; +} + +} diff --git a/src/DataStreams/RemoteQueryExecutor.h b/src/DataStreams/RemoteQueryExecutor.h new file mode 100644 index 00000000000..e39a7ccc94b --- /dev/null +++ b/src/DataStreams/RemoteQueryExecutor.h @@ -0,0 +1,164 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +class Throttler; +using ThrottlerPtr = std::shared_ptr; + +struct Progress; +using ProgressCallback = std::function; + +struct BlockStreamProfileInfo; +using ProfileInfoCallback = std::function; + +/// This class allows one to launch queries on remote replicas of one shard and get results +class RemoteQueryExecutor +{ +public: + /// Takes already set connection. + /// If `settings` is nullptr, settings will be taken from context. + RemoteQueryExecutor( + Connection & connection, + const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr, + ThrottlerPtr throttler_ = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), + QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); + + /// Accepts several connections already taken from pool. + /// If `settings` is nullptr, settings will be taken from context. + RemoteQueryExecutor( + std::vector && connections, + const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr, + const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), + QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); + + /// Takes a pool and gets one or several connections from it. + /// If `settings` is nullptr, settings will be taken from context. + RemoteQueryExecutor( + const ConnectionPoolWithFailoverPtr & pool, + const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr, + const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), + QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); + + ~RemoteQueryExecutor(); + + /// Create connection and send query, external tables and scalars. + void sendQuery(); + + /// Read next block of data. Returns empty block if query is finished. + Block read(); + + /// Receive all remain packets and finish query. + /// It should be cancelled after read returned empty block. + void finish(); + + /// Cancel query execution. Sends Cancel packet and ignore others. + /// This method may be called from separate thread. + void cancel(); + + /// Get totals and extremes if any. + Block getTotals() const { return totals; } + Block getExtremes() const { return extremes; } + + /// Set callback for progress. It will be called on Progress packet. + void setProgressCallback(ProgressCallback callback) { progress_callback = std::move(callback); } + + /// Set callback for profile info. It will be called on ProfileInfo packet. + void setProfileInfoCallback(ProfileInfoCallback callback) { profile_info_callback = std::move(callback); } + + /// Set the query_id. For now, used by performance test to later find the query + /// in the server query_log. Must be called before sending the query to the server. + void setQueryId(const std::string& query_id_) { assert(!sent_query); query_id = query_id_; } + + /// Specify how we allocate connections on a shard. + void setPoolMode(PoolMode pool_mode_) { pool_mode = pool_mode_; } + + void setMainTable(StorageID main_table_) { main_table = std::move(main_table_); } + + void setLogger(Poco::Logger * logger) { log = logger; } + + const Block & getHeader() const { return header; } + +private: + Block header; + Block totals; + Block extremes; + + std::function()> create_multiplexed_connections; + std::unique_ptr multiplexed_connections; + + const String query; + String query_id = ""; + Context context; + + ProgressCallback progress_callback; + ProfileInfoCallback profile_info_callback; + + /// Scalars needed to be sent to remote servers + Scalars scalars; + /// Temporary tables needed to be sent to remote servers + Tables external_tables; + QueryProcessingStage::Enum stage; + + /// Streams for reading from temporary tables and following sending of data + /// to remote servers for GLOBAL-subqueries + std::vector external_tables_data; + std::mutex external_tables_mutex; + + /// Connections to replicas are established, but no queries are sent yet + std::atomic established { false }; + + /// Query is sent (used before getting first block) + std::atomic sent_query { false }; + + /** All data from all replicas are received, before EndOfStream packet. + * To prevent desynchronization, if not all data is read before object + * destruction, it's required to send cancel query request to replicas and + * read all packets before EndOfStream + */ + std::atomic finished { false }; + + /** Cancel query request was sent to all replicas because data is not needed anymore + * This behaviour may occur when: + * - data size is already satisfactory (when using LIMIT, for example) + * - an exception was thrown from client side + */ + std::atomic was_cancelled { false }; + std::mutex was_cancelled_mutex; + + /** An exception from replica was received. No need in receiving more packets or + * requesting to cancel query execution + */ + std::atomic got_exception_from_replica { false }; + + /** Unknown packet was received from replica. No need in receiving more packets or + * requesting to cancel query execution + */ + std::atomic got_unknown_packet_from_replica { false }; + + PoolMode pool_mode = PoolMode::GET_MANY; + StorageID main_table = StorageID::createEmpty(); + + Poco::Logger * log = nullptr; + + /// Send all scalars to remote servers + void sendScalars(); + + /// Send all temporary tables to remote servers + void sendExternalTables(); + + /// If wasn't sent yet, send request to cancel all connections to replicas + void tryCancel(const char * reason); + + /// Returns true if query was sent + bool isQueryPending() const; + + /// Returns true if exception was thrown + bool hasThrownException() const; +}; + +} diff --git a/src/DataStreams/ya.make b/src/DataStreams/ya.make index f316c829a03..4c391cf839a 100644 --- a/src/DataStreams/ya.make +++ b/src/DataStreams/ya.make @@ -38,6 +38,7 @@ SRCS( PushingToViewsBlockOutputStream.cpp RemoteBlockInputStream.cpp RemoteBlockOutputStream.cpp + RemoteQueryExecutor.cpp SizeLimits.cpp SquashingBlockInputStream.cpp SquashingBlockOutputStream.cpp