From 755904f51fbb6b66dd0e673b526c65381778506f Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Sat, 16 Nov 2019 00:34:43 +0800 Subject: [PATCH] better INCLUDE_DEBUG_HELPERS --- dbms/CMakeLists.txt | 2 +- dbms/programs/client/Client.cpp | 6 ++-- dbms/programs/client/Suggest.h | 2 +- .../performance-test/PerformanceTest.cpp | 4 +-- dbms/src/Client/Connection.cpp | 2 +- dbms/src/Client/Connection.h | 29 ++++++++++--------- dbms/src/Client/MultiplexedConnections.cpp | 14 ++++----- dbms/src/Client/MultiplexedConnections.h | 6 ++-- dbms/src/Core/iostream_debug_helpers.cpp | 5 ++-- dbms/src/Core/iostream_debug_helpers.h | 6 ++-- .../DataStreams/RemoteBlockInputStream.cpp | 4 +-- .../DataStreams/RemoteBlockOutputStream.cpp | 6 ++-- 12 files changed, 43 insertions(+), 43 deletions(-) diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 1d5f4af645b..510faed187b 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -76,7 +76,7 @@ elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") endif() if (USE_DEBUG_HELPERS) - set (INCLUDE_DEBUG_HELPERS "-include ${ClickHouse_SOURCE_DIR}/libs/libcommon/include/common/iostream_debug_helpers.h") + set (INCLUDE_DEBUG_HELPERS "-I${ClickHouse_SOURCE_DIR}/libs/libcommon/include -include ${ClickHouse_SOURCE_DIR}/dbms/src/Core/iostream_debug_helpers.h") set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${INCLUDE_DEBUG_HELPERS}") endif () diff --git a/dbms/programs/client/Client.cpp b/dbms/programs/client/Client.cpp index 212a6500afd..0d2c2344bf5 100644 --- a/dbms/programs/client/Client.cpp +++ b/dbms/programs/client/Client.cpp @@ -1226,7 +1226,7 @@ private: /// Returns true if one should continue receiving packets. bool receiveAndProcessPacket() { - Connection::Packet packet = connection->receivePacket(); + Packet packet = connection->receivePacket(); switch (packet.type) { @@ -1274,7 +1274,7 @@ private: { while (true) { - Connection::Packet packet = connection->receivePacket(); + Packet packet = connection->receivePacket(); switch (packet.type) { @@ -1308,7 +1308,7 @@ private: { while (true) { - Connection::Packet packet = connection->receivePacket(); + Packet packet = connection->receivePacket(); switch (packet.type) { diff --git a/dbms/programs/client/Suggest.h b/dbms/programs/client/Suggest.h index 57895b38764..78cc8d94db0 100644 --- a/dbms/programs/client/Suggest.h +++ b/dbms/programs/client/Suggest.h @@ -113,7 +113,7 @@ private: while (true) { - Connection::Packet packet = connection.receivePacket(); + Packet packet = connection.receivePacket(); switch (packet.type) { case Protocol::Server::Data: diff --git a/dbms/programs/performance-test/PerformanceTest.cpp b/dbms/programs/performance-test/PerformanceTest.cpp index ab55cd3d6cf..a138d6ab8f4 100644 --- a/dbms/programs/performance-test/PerformanceTest.cpp +++ b/dbms/programs/performance-test/PerformanceTest.cpp @@ -35,7 +35,7 @@ void waitQuery(Connection & connection) if (!connection.poll(1000000)) continue; - Connection::Packet packet = connection.receivePacket(); + Packet packet = connection.receivePacket(); switch (packet.type) { case Protocol::Server::EndOfStream: @@ -120,7 +120,7 @@ bool PerformanceTest::checkPreconditions() const while (true) { - Connection::Packet packet = connection.receivePacket(); + Packet packet = connection.receivePacket(); if (packet.type == Protocol::Server::Data) { diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index 95dff73f870..05b89f1de87 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -612,7 +612,7 @@ std::optional Connection::checkPacket(size_t timeout_microseconds) } -Connection::Packet Connection::receivePacket() +Packet Connection::receivePacket() { try { diff --git a/dbms/src/Client/Connection.h b/dbms/src/Client/Connection.h index 8b507a4172a..bb639c6388b 100644 --- a/dbms/src/Client/Connection.h +++ b/dbms/src/Client/Connection.h @@ -42,6 +42,21 @@ using ConnectionPtr = std::shared_ptr; using Connections = std::vector; +/// Packet that could be received from server. +struct Packet +{ + UInt64 type; + + Block block; + std::unique_ptr exception; + std::vector multistring_message; + Progress progress; + BlockStreamProfileInfo profile_info; + + Packet() : type(Protocol::Server::Hello) {} +}; + + /** Connection with database server, to use by client. * How to use - see Core/Protocol.h * (Implementation of server end - see Server/TCPHandler.h) @@ -87,20 +102,6 @@ public: } - /// Packet that could be received from server. - struct Packet - { - UInt64 type; - - Block block; - std::unique_ptr exception; - std::vector multistring_message; - Progress progress; - BlockStreamProfileInfo profile_info; - - Packet() : type(Protocol::Server::Hello) {} - }; - /// Change default database. Changes will take effect on next reconnect. void setDefaultDatabase(const String & database); diff --git a/dbms/src/Client/MultiplexedConnections.cpp b/dbms/src/Client/MultiplexedConnections.cpp index d7934924242..c8d3fa4dcce 100644 --- a/dbms/src/Client/MultiplexedConnections.cpp +++ b/dbms/src/Client/MultiplexedConnections.cpp @@ -138,10 +138,10 @@ void MultiplexedConnections::sendQuery( sent_query = true; } -Connection::Packet MultiplexedConnections::receivePacket() +Packet MultiplexedConnections::receivePacket() { std::lock_guard lock(cancel_mutex); - Connection::Packet packet = receivePacketUnlocked(); + Packet packet = receivePacketUnlocked(); return packet; } @@ -177,19 +177,19 @@ void MultiplexedConnections::sendCancel() cancelled = true; } -Connection::Packet MultiplexedConnections::drain() +Packet MultiplexedConnections::drain() { std::lock_guard lock(cancel_mutex); if (!cancelled) throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR); - Connection::Packet res; + Packet res; res.type = Protocol::Server::EndOfStream; while (hasActiveConnections()) { - Connection::Packet packet = receivePacketUnlocked(); + Packet packet = receivePacketUnlocked(); switch (packet.type) { @@ -235,7 +235,7 @@ std::string MultiplexedConnections::dumpAddressesUnlocked() const return os.str(); } -Connection::Packet MultiplexedConnections::receivePacketUnlocked() +Packet MultiplexedConnections::receivePacketUnlocked() { if (!sent_query) throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR); @@ -247,7 +247,7 @@ Connection::Packet MultiplexedConnections::receivePacketUnlocked() if (current_connection == nullptr) throw Exception("Logical error: no available replica", ErrorCodes::NO_AVAILABLE_REPLICA); - Connection::Packet packet = current_connection->receivePacket(); + Packet packet = current_connection->receivePacket(); switch (packet.type) { diff --git a/dbms/src/Client/MultiplexedConnections.h b/dbms/src/Client/MultiplexedConnections.h index b26c9569422..9d825adb227 100644 --- a/dbms/src/Client/MultiplexedConnections.h +++ b/dbms/src/Client/MultiplexedConnections.h @@ -42,7 +42,7 @@ public: bool with_pending_data = false); /// Get packet from any replica. - Connection::Packet receivePacket(); + Packet receivePacket(); /// Break all active connections. void disconnect(); @@ -54,7 +54,7 @@ public: * Returns EndOfStream if no exception has been received. Otherwise * returns the last received packet of type Exception. */ - Connection::Packet drain(); + Packet drain(); /// Get the replica addresses as a string. std::string dumpAddresses() const; @@ -69,7 +69,7 @@ public: private: /// Internal version of `receivePacket` function without locking. - Connection::Packet receivePacketUnlocked(); + Packet receivePacketUnlocked(); /// Internal version of `dumpAddresses` function without locking. std::string dumpAddressesUnlocked() const; diff --git a/dbms/src/Core/iostream_debug_helpers.cpp b/dbms/src/Core/iostream_debug_helpers.cpp index 8e673d1c547..eea8694dfb0 100644 --- a/dbms/src/Core/iostream_debug_helpers.cpp +++ b/dbms/src/Core/iostream_debug_helpers.cpp @@ -1,6 +1,7 @@ #include "iostream_debug_helpers.h" #include +#include #include #include #include @@ -92,9 +93,9 @@ std::ostream & operator<<(std::ostream & stream, const IColumn & what) return stream; } -std::ostream & operator<<(std::ostream & stream, const Connection::Packet & what) +std::ostream & operator<<(std::ostream & stream, const Packet & what) { - stream << "Connection::Packet(" + stream << "Packet(" << "type = " << what.type; // types description: Core/Protocol.h if (what.exception) diff --git a/dbms/src/Core/iostream_debug_helpers.h b/dbms/src/Core/iostream_debug_helpers.h index 35fc05faf1d..dc48da931f0 100644 --- a/dbms/src/Core/iostream_debug_helpers.h +++ b/dbms/src/Core/iostream_debug_helpers.h @@ -1,9 +1,6 @@ #pragma once #include -#include - - namespace DB { @@ -40,7 +37,8 @@ std::ostream & operator<<(std::ostream & stream, const ColumnWithTypeAndName & w class IColumn; std::ostream & operator<<(std::ostream & stream, const IColumn & what); -std::ostream & operator<<(std::ostream & stream, const Connection::Packet & what); +struct Packet; +std::ostream & operator<<(std::ostream & stream, const Packet & what); struct ExpressionAction; std::ostream & operator<<(std::ostream & stream, const ExpressionAction & what); diff --git a/dbms/src/DataStreams/RemoteBlockInputStream.cpp b/dbms/src/DataStreams/RemoteBlockInputStream.cpp index aedbe676688..f6dc30d6e8c 100644 --- a/dbms/src/DataStreams/RemoteBlockInputStream.cpp +++ b/dbms/src/DataStreams/RemoteBlockInputStream.cpp @@ -222,7 +222,7 @@ Block RemoteBlockInputStream::readImpl() if (isCancelledOrThrowIfKilled()) return Block(); - Connection::Packet packet = multiplexed_connections->receivePacket(); + Packet packet = multiplexed_connections->receivePacket(); switch (packet.type) { @@ -301,7 +301,7 @@ void RemoteBlockInputStream::readSuffixImpl() tryCancel("Cancelling query because enough data has been read"); /// Get the remaining packets so that there is no out of sync in the connections to the replicas. - Connection::Packet packet = multiplexed_connections->drain(); + Packet packet = multiplexed_connections->drain(); switch (packet.type) { case Protocol::Server::EndOfStream: diff --git a/dbms/src/DataStreams/RemoteBlockOutputStream.cpp b/dbms/src/DataStreams/RemoteBlockOutputStream.cpp index a95ea174541..3446af8b840 100644 --- a/dbms/src/DataStreams/RemoteBlockOutputStream.cpp +++ b/dbms/src/DataStreams/RemoteBlockOutputStream.cpp @@ -32,7 +32,7 @@ RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_, while (true) { - Connection::Packet packet = connection.receivePacket(); + Packet packet = connection.receivePacket(); if (Protocol::Server::Data == packet.type) { @@ -77,7 +77,7 @@ void RemoteBlockOutputStream::write(const Block & block) auto packet_type = connection.checkPacket(); if (packet_type && *packet_type == Protocol::Server::Exception) { - Connection::Packet packet = connection.receivePacket(); + Packet packet = connection.receivePacket(); packet.exception->rethrow(); } @@ -101,7 +101,7 @@ void RemoteBlockOutputStream::writeSuffix() /// Wait for EndOfStream or Exception packet, skip Log packets. while (true) { - Connection::Packet packet = connection.receivePacket(); + Packet packet = connection.receivePacket(); if (Protocol::Server::EndOfStream == packet.type) break;