#include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int UNKNOWN_PACKET_FROM_SERVER; extern const int LOGICAL_ERROR; } RemoteBlockInputStream::RemoteBlockInputStream( Connection & connection, const String & query_, const Context & context_, const Settings * settings, const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_) : query(query_), context(context_), external_tables(external_tables_), stage(stage_) { if (settings) context.setSettings(*settings); create_multiplexed_connections = [this, &connection, throttler]() { return std::make_unique(connection, context.getSettingsRef(), throttler); }; } RemoteBlockInputStream::RemoteBlockInputStream( std::vector && connections, const String & query_, const Context & context_, const Settings * settings, const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_) : query(query_), context(context_), external_tables(external_tables_), stage(stage_) { if (settings) context.setSettings(*settings); create_multiplexed_connections = [this, connections, throttler]() mutable { return std::make_unique( std::move(connections), context.getSettingsRef(), throttler, append_extra_info); }; } RemoteBlockInputStream::RemoteBlockInputStream( const ConnectionPoolWithFailoverPtr & pool, const String & query_, const Context & context_, const Settings * settings, const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_) : query(query_), context(context_), external_tables(external_tables_), stage(stage_) { if (settings) context.setSettings(*settings); create_multiplexed_connections = [this, pool, throttler]() { const Settings & settings = context.getSettingsRef(); std::vector connections; if (main_table) { auto try_results = pool->getManyChecked(&settings, pool_mode, *main_table); connections.reserve(try_results.size()); for (auto & try_result : try_results) connections.emplace_back(std::move(try_result.entry)); } else connections = pool->getMany(&settings, pool_mode); return std::make_unique( std::move(connections), settings, throttler, append_extra_info); }; } RemoteBlockInputStream::~RemoteBlockInputStream() { /** If interrupted in the middle of the loop of communication with replicas, then interrupt * all connections, then read and skip the remaining packets to make sure * these connections did not remain hanging in the out-of-sync state. */ if (established || isQueryPending()) multiplexed_connections->disconnect(); } void RemoteBlockInputStream::appendExtraInfo() { append_extra_info = true; } void RemoteBlockInputStream::readPrefix() { if (!sent_query) sendQuery(); } void RemoteBlockInputStream::cancel() { bool old_val = false; if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) return; { std::lock_guard lock(external_tables_mutex); /// Stop sending external data. for (auto & vec : external_tables_data) for (auto & elem : vec) if (IProfilingBlockInputStream * stream = dynamic_cast(elem.first.get())) stream->cancel(); } if (!isQueryPending() || hasThrownException()) return; tryCancel("Cancelling query"); } void RemoteBlockInputStream::sendExternalTables() { size_t count = multiplexed_connections->size(); { std::lock_guard lock(external_tables_mutex); external_tables_data.reserve(count); for (size_t i = 0; i < count; ++i) { ExternalTablesData res; for (const auto & table : external_tables) { StoragePtr cur = table.second; QueryProcessingStage::Enum stage = QueryProcessingStage::Complete; BlockInputStreams input = cur->read(cur->getColumnNamesList(), {}, context, stage, DEFAULT_BLOCK_SIZE, 1); if (input.size() == 0) res.push_back(std::make_pair(std::make_shared(cur->getSampleBlock()), table.first)); else res.push_back(std::make_pair(input[0], table.first)); } external_tables_data.push_back(std::move(res)); } } multiplexed_connections->sendExternalTablesData(external_tables_data); } Block RemoteBlockInputStream::receiveBlock() { if (!sent_query) { sendQuery(); if (context.getSettingsRef().skip_unavailable_shards && (0 == multiplexed_connections->size())) return {}; } while (true) { if (isCancelled()) return {}; Connection::Packet packet = multiplexed_connections->receivePacket(); switch (packet.type) { case Protocol::Server::Data: return packet.block; case Protocol::Server::Exception: got_exception_from_replica = true; packet.exception->rethrow(); break; case Protocol::Server::EndOfStream: if (!multiplexed_connections->hasActiveConnections()) { finished = true; return {}; } break; case Protocol::Server::Progress: /** We use the progress from a remote server. * We also include in ProcessList, * and we use it to check * constraints (for example, the minimum speed of query execution) * and quotas (for example, the number of lines to read). */ progressImpl(packet.progress); break; case Protocol::Server::ProfileInfo: /// Use own (client-side) info about read bytes, it is more correct info than server-side one. info.setFrom(packet.profile_info, true); break; case Protocol::Server::Totals: totals = packet.block; break; case Protocol::Server::Extremes: extremes = packet.block; break; default: got_unknown_packet_from_replica = true; throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); } } } Block RemoteBlockInputStream::getHeader() { if (header) return header; Block res = receiveBlock(); if (res.rows() > 0) throw Exception("Logical error: the header block must be sent before data", ErrorCodes::LOGICAL_ERROR); header = res; return header; } Block RemoteBlockInputStream::readImpl() { while (true) { Block res = receiveBlock(); if (finished) return {}; /// If the block is empty - we will receive other packets before EndOfStream. if (!res) continue; if (res.rows() > 0) return res; /// Block with zero rows is a header block. The data will be sent in a following packet. header = res; } } void RemoteBlockInputStream::readSuffixImpl() { /** If one of: * - nothing started to do; * - received all packets before EndOfStream; * - received exception from one replica; * - received an unknown packet from one replica; * then you do not need to read anything. */ if (!isQueryPending() || hasThrownException()) return; /** If you have not read all the data yet, but they are no longer needed. * This may be due to the fact that the data is sufficient (for example, when using LIMIT). */ /// Send the request to abort the execution of the request, if not already sent. tryCancel("Cancelling query because enough data has been read"); /// Get the remaining packets so that there is no out of sync in the connections to the replicas. Connection::Packet packet = multiplexed_connections->drain(); switch (packet.type) { case Protocol::Server::EndOfStream: finished = true; break; case Protocol::Server::Exception: got_exception_from_replica = true; packet.exception->rethrow(); break; default: got_unknown_packet_from_replica = true; throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); } } void RemoteBlockInputStream::sendQuery() { multiplexed_connections = create_multiplexed_connections(); if (context.getSettingsRef().skip_unavailable_shards && 0 == multiplexed_connections->size()) return; established = true; multiplexed_connections->sendQuery(query, "", stage, &context.getClientInfo(), true); established = false; sent_query = true; sendExternalTables(); } void RemoteBlockInputStream::tryCancel(const char * reason) { bool old_val = false; if (!was_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) return; LOG_TRACE(log, "(" << multiplexed_connections->dumpAddresses() << ") " << reason); multiplexed_connections->sendCancel(); } bool RemoteBlockInputStream::isQueryPending() const { return sent_query && !finished; } bool RemoteBlockInputStream::hasThrownException() const { return got_exception_from_replica || got_unknown_packet_from_replica; } }