Small clean up

This commit is contained in:
kssenii 2021-08-27 11:46:31 +03:00
parent 34f9983f14
commit 578a750b8b
7 changed files with 42 additions and 49 deletions

View File

@ -391,7 +391,7 @@ bool Client::processMultiQuery(const String & all_queries_text)
std::vector<String> Client::loadWarningMessages()
{
std::vector<String> messages;
connection->sendQuery(connection_parameters.timeouts, "SELECT message FROM system.warnings", "" /* query_id */, QueryProcessingStage::Complete, nullptr, nullptr, false);
connection->sendQuery(connection_parameters.timeouts, "SELECT message FROM system.warnings", "" /* query_id */);
while (true)
{
Packet packet = connection->receivePacket();

View File

@ -796,7 +796,7 @@ void ClientBase::sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDes
receiveLogs(parsed_query);
/// Check if server send Exception packet
auto packet_type = connection->checkPacket(/* timeout_milliseconds */0);
auto packet_type = connection->checkPacket();
if (packet_type && *packet_type == Protocol::Server::Exception)
{
/*
@ -826,7 +826,7 @@ void ClientBase::receiveLogs(ASTPtr parsed_query)
while (packet_type && *packet_type == Protocol::Server::Log)
{
receiveAndProcessPacket(parsed_query, false);
packet_type = connection->checkPacket(/* timeout_milliseconds */0);
packet_type = connection->checkPacket();
}
}

View File

@ -499,7 +499,7 @@ void Connection::sendQuery(
/// Send empty block which means end of data.
if (!with_pending_data)
{
sendData(Block(), /* name */"", /* scalar */false);
sendData(Block());
out->next();
}
}
@ -654,7 +654,7 @@ protected:
num_rows += chunk.getNumRows();
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
connection.sendData(block, table_data.table_name, /* scalar */false);
connection.sendData(block, table_data.table_name);
}
private:
@ -670,7 +670,7 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
if (data.empty())
{
/// Send empty block, which means end of data transfer.
sendData(Block(), "", false);
sendData(Block());
return;
}
@ -702,16 +702,17 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
});
executor = pipeline.execute();
executor->execute(/*num_threads = */ 1);
auto read_rows = sink->getNumReadRows();
auto read_rows = sink->getNumReadRows();
rows += read_rows;
/// If table is empty, send empty block with name.
if (read_rows == 0)
sendData(sink->getPort().getHeader(), elem->table_name, /* scalar */false);
sendData(sink->getPort().getHeader(), elem->table_name);
}
/// Send empty block, which means end of data transfer.
sendData(Block(), /* name */"", /* scalar */false);
sendData(Block());
out_bytes = out->count() - out_bytes;
maybe_compressed_out_bytes = maybe_compressed_out->count() - maybe_compressed_out_bytes;

View File

@ -102,22 +102,25 @@ public:
Protocol::Compression getCompression() const { return compression; }
void sendQuery(
const ConnectionTimeouts & timeouts, const String & query,
const String & query_id_, UInt64 stage,
const Settings * settings, const ClientInfo * client_info,
bool with_pending_database) override;
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id_ = "",
UInt64 stage = QueryProcessingStage::Complete,
const Settings * settings = nullptr,
const ClientInfo * client_info = nullptr,
bool with_pending_data = false) override;
void sendCancel() override;
void sendData(const Block & block, const String & name, bool scalar) override;
void sendData(const Block & block, const String & name = "", bool scalar = false) override;
void sendExternalTablesData(ExternalTablesData & data) override;
bool poll(size_t timeout_microseconds) override;
bool poll(size_t timeout_microseconds = 0) override;
bool hasReadPendingData() const override;
std::optional<UInt64> checkPacket(size_t timeout_microseconds) override;
std::optional<UInt64> checkPacket(size_t timeout_microseconds = 0) override;
Packet receivePacket() override;

View File

@ -75,28 +75,28 @@ public:
virtual void sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id_ /* = "" */,
UInt64 stage/* = QueryProcessingStage::Complete */,
const Settings * settings /* = nullptr */,
const ClientInfo * client_info /* = nullptr */,
bool with_pending_data /* = false */) = 0;
const String & query_id_ = "",
UInt64 stage = QueryProcessingStage::Complete,
const Settings * settings = nullptr,
const ClientInfo * client_info = nullptr,
bool with_pending_data = false) = 0;
virtual void sendCancel() = 0;
/// Send block of data; if name is specified, server will write it to external (temporary) table of that name.
virtual void sendData(const Block & block, const String & name/* = "" */, bool scalar/* = false */) = 0;
virtual void sendData(const Block & block, const String & name = "", bool scalar = false) = 0;
/// Send all contents of external (temporary) tables.
virtual void sendExternalTablesData(ExternalTablesData & data) = 0;
/// Check, if has data to read.
virtual bool poll(size_t timeout_microseconds /* = 0 */) = 0;
virtual bool poll(size_t timeout_microseconds = 0) = 0;
/// Check, if has data in read buffer.
virtual bool hasReadPendingData() const = 0;
/// Checks if there is input data in connection and reads packet ID.
virtual std::optional<UInt64> checkPacket(size_t timeout_microseconds /* = 0 */) = 0;
virtual std::optional<UInt64> checkPacket(size_t timeout_microseconds = 0) = 0;
/// Receive packet from server.
virtual Packet receivePacket() = 0;

View File

@ -31,11 +31,6 @@ LocalConnection::~LocalConnection()
}
}
void LocalConnection::setDefaultDatabase(const String & database)
{
default_database = database;
}
bool LocalConnection::hasReadPendingData() const
{
return !state->is_finished;
@ -361,15 +356,13 @@ Packet LocalConnection::receivePacket()
}
void LocalConnection::getServerVersion(
const ConnectionTimeouts & /* timeouts */, String & name,
UInt64 & version_major, UInt64 & version_minor,
UInt64 & version_patch, UInt64 & revision)
const ConnectionTimeouts & /* timeouts */, String & /* name */,
UInt64 & /* version_major */, UInt64 & /* version_minor */,
UInt64 & /* version_patch */, UInt64 & /* revision */) { }
void LocalConnection::setDefaultDatabase(const String & name)
{
name = server_name;
version_major = server_version_major;
version_minor = server_version_minor;
version_patch = server_version_patch;
revision = server_revision;
default_database = name;
}
UInt64 LocalConnection::getServerRevision(const ConnectionTimeouts &)

View File

@ -75,11 +75,11 @@ public:
void sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id_ /* = "" */,
UInt64 stage/* = QueryProcessingStage::Complete */,
const Settings * settings /* = nullptr */,
const ClientInfo * client_info /* = nullptr */,
bool with_pending_data /* = false */) override;
const String & query_id_ = "",
UInt64 stage = QueryProcessingStage::Complete,
const Settings * settings = nullptr,
const ClientInfo * client_info = nullptr,
bool with_pending_data = false) override;
void sendCancel() override;
@ -87,11 +87,11 @@ public:
void sendExternalTablesData(ExternalTablesData &) override {}
bool poll(size_t timeout_microseconds) override;
bool poll(size_t timeout_microseconds = 0) override;
bool hasReadPendingData() const override;
std::optional<UInt64> checkPacket(size_t timeout_microseconds) override;
std::optional<UInt64> checkPacket(size_t timeout_microseconds = 0) override;
Packet receivePacket() override;
@ -131,11 +131,7 @@ private:
/// Last "server" packet.
std::optional<UInt64> next_packet_type;
String description;
String server_name;
UInt64 server_version_major = 0;
UInt64 server_version_minor = 0;
UInt64 server_version_patch = 0;
String description = "clickhouse-local";
UInt64 server_revision = 0;
String server_timezone;
String server_display_name;