ClickHouse/dbms/src/DataStreams/RemoteBlockOutputStream.cpp
Alexey Zatelepin e4e38f71e1 allow empty header in RemoteBlockOutputStream #3411
The bug with inserts to Distributed tables was introduced in https://github.com/yandex/ClickHouse/pull/3171
It added a workaround specifically for inserting in the Native format without specifying the list of columns.
Native (as opposed to other formats) historically supports this. To signal that the input block structure
shouldn't conform to any fixed header in this case, the remote server started sending empty header block.
This commit adds support for empty headers to RemoteBlockOutputStream.
2018-10-26 18:26:07 +03:00

131 lines
3.7 KiB
C++

#include <DataStreams/RemoteBlockOutputStream.h>
#include <Client/Connection.h>
#include <common/logger_useful.h>
#include <Common/NetException.h>
#include <Common/CurrentThread.h>
#include <Interpreters/InternalTextLogsQueue.h>
namespace DB
{
/// Error codes used by this translation unit. They are only declared here (extern);
/// the definitions live elsewhere in the project.
namespace ErrorCodes
{
/// Used below when the server replies with a packet type that is not allowed at that point of the exchange.
extern const int UNEXPECTED_PACKET_FROM_SERVER;
/// NOTE(review): not referenced by the code visible in this file - confirm whether it is still needed.
extern const int LOGICAL_ERROR;
}
/** Sends the query over the given connection and waits for the "header" block,
  * which describes the structure required for blocks passed to the 'write' method.
  *
  * The header may be empty; in that case 'write' does not check the structure of its blocks.
  *
  * @param connection_  Connection to the remote server; stored by reference.
  * @param query_       Text of the query to send.
  * @param settings_    Settings passed along with the query (forwarded to sendQuery as is).
  *
  * Throws the exception received from the server if it answers with an Exception packet,
  * or NetException (UNEXPECTED_PACKET_FROM_SERVER) on any packet other than Data, Exception or Log.
  */
RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_, const String & query_, const Settings * settings_)
    : connection(connection_), query(query_), settings(settings_)
{
    connection.sendQuery(query, "", QueryProcessingStage::Complete, settings, nullptr);

    while (true)
    {
        Connection::Packet packet = connection.receivePacket();

        if (Protocol::Server::Data == packet.type)
        {
            /// The packet is destroyed right after this, so take its block over instead of copying it.
            header = std::move(packet.block);
            break;
        }
        else if (Protocol::Server::Exception == packet.type)
        {
            /// Leaves the constructor by throwing the server-side exception
            /// (writeSuffix relies on rethrow() in the same way), so no 'break' is needed.
            packet.exception->rethrow();
        }
        else if (Protocol::Server::Log == packet.type)
        {
            /// Pass logs from remote server to client and keep waiting for the header.
            if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
                log_queue->pushBlock(std::move(packet.block));
        }
        else
            throw NetException("Unexpected packet from server (expected Data or Exception, got "
                + String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
    }
}
/** Sends one block of data to the remote server.
  *
  * When a non-empty header was received in the constructor, the block structure is
  * checked against it first. With an empty header no check is performed.
  *
  * If sending fails with NetException, the connection is polled for a pending packet:
  * an Exception packet from the server is received and rethrown in place of the
  * network error; otherwise the original NetException propagates.
  */
void RemoteBlockOutputStream::write(const Block & block)
{
    if (header)
        assertBlocksHaveEqualStructure(block, header, "RemoteBlockOutputStream");

    try
    {
        connection.sendData(block);
    }
    catch (const NetException &)
    {
        /// The server may already have reported why it dropped the data; prefer that message.
        auto pending_type = connection.checkPacket();
        const bool server_sent_exception = pending_type && *pending_type == Protocol::Server::Exception;

        if (server_sent_exception)
        {
            Connection::Packet server_reply = connection.receivePacket();
            server_reply.exception->rethrow();
        }

        /// Nothing more detailed is available: propagate the network error itself.
        throw;
    }
}
/** Forwards already prepared data from 'input' to the connection.
  *
  * @param input  Buffer to read the prepared data from.
  * @param size   Size argument passed through to Connection::sendPreparedData unchanged.
  *
  * No structure check against 'header' is done here: the data is passed along as is,
  * so the input must already contain a block with the proper structure.
  */
void RemoteBlockOutputStream::writePrepared(ReadBuffer & input, size_t size)
{
    connection.sendPreparedData(input, size);
}
void RemoteBlockOutputStream::writeSuffix()
{
/// Empty block means end of data.
connection.sendData(Block());
/// Wait for EndOfStream or Exception packet, skip Log packets.
while (true)
{
Connection::Packet packet = connection.receivePacket();
if (Protocol::Server::EndOfStream == packet.type)
break;
else if (Protocol::Server::Exception == packet.type)
packet.exception->rethrow();
else if (Protocol::Server::Log == packet.type)
{
// Do nothing
}
else
throw NetException("Unexpected packet from server (expected EndOfStream or Exception, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
}
finished = true;
}
/** If the stream was not finished by a successful writeSuffix(), the exchange with the
  * server was interrupted midway. The connection is then disconnected so that it is not
  * left in an unsynchronized state. Exceptions from disconnect() are logged, not propagated.
  */
RemoteBlockOutputStream::~RemoteBlockOutputStream()
{
    /// Communication completed normally: the connection is in sync, nothing to do.
    if (finished)
        return;

    try
    {
        connection.disconnect();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
}