mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
better INCLUDE_DEBUG_HELPERS
This commit is contained in:
parent
309d7270d0
commit
755904f51f
@ -76,7 +76,7 @@ elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
|
||||
endif()
|
||||
|
||||
if (USE_DEBUG_HELPERS)
|
||||
set (INCLUDE_DEBUG_HELPERS "-include ${ClickHouse_SOURCE_DIR}/libs/libcommon/include/common/iostream_debug_helpers.h")
|
||||
set (INCLUDE_DEBUG_HELPERS "-I${ClickHouse_SOURCE_DIR}/libs/libcommon/include -include ${ClickHouse_SOURCE_DIR}/dbms/src/Core/iostream_debug_helpers.h")
|
||||
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${INCLUDE_DEBUG_HELPERS}")
|
||||
endif ()
|
||||
|
||||
|
@ -1226,7 +1226,7 @@ private:
|
||||
/// Returns true if one should continue receiving packets.
|
||||
bool receiveAndProcessPacket()
|
||||
{
|
||||
Connection::Packet packet = connection->receivePacket();
|
||||
Packet packet = connection->receivePacket();
|
||||
|
||||
switch (packet.type)
|
||||
{
|
||||
@ -1274,7 +1274,7 @@ private:
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
Connection::Packet packet = connection->receivePacket();
|
||||
Packet packet = connection->receivePacket();
|
||||
|
||||
switch (packet.type)
|
||||
{
|
||||
@ -1308,7 +1308,7 @@ private:
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
Connection::Packet packet = connection->receivePacket();
|
||||
Packet packet = connection->receivePacket();
|
||||
|
||||
switch (packet.type)
|
||||
{
|
||||
|
@ -113,7 +113,7 @@ private:
|
||||
|
||||
while (true)
|
||||
{
|
||||
Connection::Packet packet = connection.receivePacket();
|
||||
Packet packet = connection.receivePacket();
|
||||
switch (packet.type)
|
||||
{
|
||||
case Protocol::Server::Data:
|
||||
|
@ -35,7 +35,7 @@ void waitQuery(Connection & connection)
|
||||
if (!connection.poll(1000000))
|
||||
continue;
|
||||
|
||||
Connection::Packet packet = connection.receivePacket();
|
||||
Packet packet = connection.receivePacket();
|
||||
switch (packet.type)
|
||||
{
|
||||
case Protocol::Server::EndOfStream:
|
||||
@ -120,7 +120,7 @@ bool PerformanceTest::checkPreconditions() const
|
||||
|
||||
while (true)
|
||||
{
|
||||
Connection::Packet packet = connection.receivePacket();
|
||||
Packet packet = connection.receivePacket();
|
||||
|
||||
if (packet.type == Protocol::Server::Data)
|
||||
{
|
||||
|
@ -612,7 +612,7 @@ std::optional<UInt64> Connection::checkPacket(size_t timeout_microseconds)
|
||||
}
|
||||
|
||||
|
||||
Connection::Packet Connection::receivePacket()
|
||||
Packet Connection::receivePacket()
|
||||
{
|
||||
try
|
||||
{
|
||||
|
@ -42,6 +42,21 @@ using ConnectionPtr = std::shared_ptr<Connection>;
|
||||
using Connections = std::vector<ConnectionPtr>;
|
||||
|
||||
|
||||
/// Packet that could be received from server.
|
||||
struct Packet
|
||||
{
|
||||
UInt64 type;
|
||||
|
||||
Block block;
|
||||
std::unique_ptr<Exception> exception;
|
||||
std::vector<String> multistring_message;
|
||||
Progress progress;
|
||||
BlockStreamProfileInfo profile_info;
|
||||
|
||||
Packet() : type(Protocol::Server::Hello) {}
|
||||
};
|
||||
|
||||
|
||||
/** Connection with database server, to use by client.
|
||||
* How to use - see Core/Protocol.h
|
||||
* (Implementation of server end - see Server/TCPHandler.h)
|
||||
@ -87,20 +102,6 @@ public:
|
||||
}
|
||||
|
||||
|
||||
/// Packet that could be received from server.
|
||||
struct Packet
|
||||
{
|
||||
UInt64 type;
|
||||
|
||||
Block block;
|
||||
std::unique_ptr<Exception> exception;
|
||||
std::vector<String> multistring_message;
|
||||
Progress progress;
|
||||
BlockStreamProfileInfo profile_info;
|
||||
|
||||
Packet() : type(Protocol::Server::Hello) {}
|
||||
};
|
||||
|
||||
/// Change default database. Changes will take effect on next reconnect.
|
||||
void setDefaultDatabase(const String & database);
|
||||
|
||||
|
@ -138,10 +138,10 @@ void MultiplexedConnections::sendQuery(
|
||||
sent_query = true;
|
||||
}
|
||||
|
||||
Connection::Packet MultiplexedConnections::receivePacket()
|
||||
Packet MultiplexedConnections::receivePacket()
|
||||
{
|
||||
std::lock_guard lock(cancel_mutex);
|
||||
Connection::Packet packet = receivePacketUnlocked();
|
||||
Packet packet = receivePacketUnlocked();
|
||||
return packet;
|
||||
}
|
||||
|
||||
@ -177,19 +177,19 @@ void MultiplexedConnections::sendCancel()
|
||||
cancelled = true;
|
||||
}
|
||||
|
||||
Connection::Packet MultiplexedConnections::drain()
|
||||
Packet MultiplexedConnections::drain()
|
||||
{
|
||||
std::lock_guard lock(cancel_mutex);
|
||||
|
||||
if (!cancelled)
|
||||
throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
Connection::Packet res;
|
||||
Packet res;
|
||||
res.type = Protocol::Server::EndOfStream;
|
||||
|
||||
while (hasActiveConnections())
|
||||
{
|
||||
Connection::Packet packet = receivePacketUnlocked();
|
||||
Packet packet = receivePacketUnlocked();
|
||||
|
||||
switch (packet.type)
|
||||
{
|
||||
@ -235,7 +235,7 @@ std::string MultiplexedConnections::dumpAddressesUnlocked() const
|
||||
return os.str();
|
||||
}
|
||||
|
||||
Connection::Packet MultiplexedConnections::receivePacketUnlocked()
|
||||
Packet MultiplexedConnections::receivePacketUnlocked()
|
||||
{
|
||||
if (!sent_query)
|
||||
throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR);
|
||||
@ -247,7 +247,7 @@ Connection::Packet MultiplexedConnections::receivePacketUnlocked()
|
||||
if (current_connection == nullptr)
|
||||
throw Exception("Logical error: no available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
|
||||
|
||||
Connection::Packet packet = current_connection->receivePacket();
|
||||
Packet packet = current_connection->receivePacket();
|
||||
|
||||
switch (packet.type)
|
||||
{
|
||||
|
@ -42,7 +42,7 @@ public:
|
||||
bool with_pending_data = false);
|
||||
|
||||
/// Get packet from any replica.
|
||||
Connection::Packet receivePacket();
|
||||
Packet receivePacket();
|
||||
|
||||
/// Break all active connections.
|
||||
void disconnect();
|
||||
@ -54,7 +54,7 @@ public:
|
||||
* Returns EndOfStream if no exception has been received. Otherwise
|
||||
* returns the last received packet of type Exception.
|
||||
*/
|
||||
Connection::Packet drain();
|
||||
Packet drain();
|
||||
|
||||
/// Get the replica addresses as a string.
|
||||
std::string dumpAddresses() const;
|
||||
@ -69,7 +69,7 @@ public:
|
||||
|
||||
private:
|
||||
/// Internal version of `receivePacket` function without locking.
|
||||
Connection::Packet receivePacketUnlocked();
|
||||
Packet receivePacketUnlocked();
|
||||
|
||||
/// Internal version of `dumpAddresses` function without locking.
|
||||
std::string dumpAddressesUnlocked() const;
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include "iostream_debug_helpers.h"
|
||||
|
||||
#include <iostream>
|
||||
#include <Client/Connection.h>
|
||||
#include <Core/Block.h>
|
||||
#include <Core/ColumnWithTypeAndName.h>
|
||||
#include <Core/Field.h>
|
||||
@ -92,9 +93,9 @@ std::ostream & operator<<(std::ostream & stream, const IColumn & what)
|
||||
return stream;
|
||||
}
|
||||
|
||||
std::ostream & operator<<(std::ostream & stream, const Connection::Packet & what)
|
||||
std::ostream & operator<<(std::ostream & stream, const Packet & what)
|
||||
{
|
||||
stream << "Connection::Packet("
|
||||
stream << "Packet("
|
||||
<< "type = " << what.type;
|
||||
// types description: Core/Protocol.h
|
||||
if (what.exception)
|
||||
|
@ -1,9 +1,6 @@
|
||||
#pragma once
|
||||
#include <iostream>
|
||||
|
||||
#include <Client/Connection.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -40,7 +37,8 @@ std::ostream & operator<<(std::ostream & stream, const ColumnWithTypeAndName & w
|
||||
class IColumn;
|
||||
std::ostream & operator<<(std::ostream & stream, const IColumn & what);
|
||||
|
||||
std::ostream & operator<<(std::ostream & stream, const Connection::Packet & what);
|
||||
struct Packet;
|
||||
std::ostream & operator<<(std::ostream & stream, const Packet & what);
|
||||
|
||||
struct ExpressionAction;
|
||||
std::ostream & operator<<(std::ostream & stream, const ExpressionAction & what);
|
||||
|
@ -222,7 +222,7 @@ Block RemoteBlockInputStream::readImpl()
|
||||
if (isCancelledOrThrowIfKilled())
|
||||
return Block();
|
||||
|
||||
Connection::Packet packet = multiplexed_connections->receivePacket();
|
||||
Packet packet = multiplexed_connections->receivePacket();
|
||||
|
||||
switch (packet.type)
|
||||
{
|
||||
@ -301,7 +301,7 @@ void RemoteBlockInputStream::readSuffixImpl()
|
||||
tryCancel("Cancelling query because enough data has been read");
|
||||
|
||||
/// Get the remaining packets so that there is no out of sync in the connections to the replicas.
|
||||
Connection::Packet packet = multiplexed_connections->drain();
|
||||
Packet packet = multiplexed_connections->drain();
|
||||
switch (packet.type)
|
||||
{
|
||||
case Protocol::Server::EndOfStream:
|
||||
|
@ -32,7 +32,7 @@ RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_,
|
||||
|
||||
while (true)
|
||||
{
|
||||
Connection::Packet packet = connection.receivePacket();
|
||||
Packet packet = connection.receivePacket();
|
||||
|
||||
if (Protocol::Server::Data == packet.type)
|
||||
{
|
||||
@ -77,7 +77,7 @@ void RemoteBlockOutputStream::write(const Block & block)
|
||||
auto packet_type = connection.checkPacket();
|
||||
if (packet_type && *packet_type == Protocol::Server::Exception)
|
||||
{
|
||||
Connection::Packet packet = connection.receivePacket();
|
||||
Packet packet = connection.receivePacket();
|
||||
packet.exception->rethrow();
|
||||
}
|
||||
|
||||
@ -101,7 +101,7 @@ void RemoteBlockOutputStream::writeSuffix()
|
||||
/// Wait for EndOfStream or Exception packet, skip Log packets.
|
||||
while (true)
|
||||
{
|
||||
Connection::Packet packet = connection.receivePacket();
|
||||
Packet packet = connection.receivePacket();
|
||||
|
||||
if (Protocol::Server::EndOfStream == packet.type)
|
||||
break;
|
||||
|
Loading…
Reference in New Issue
Block a user