dbms: probably fixed error when writeSuffix method wasn't called when writing to table [#METR-12767].

This commit is contained in:
Alexey Milovidov 2014-09-18 23:49:31 +04:00
parent 6754c49010
commit 81c268177b
4 changed files with 42 additions and 26 deletions

View File

@ -24,9 +24,7 @@ public:
{
}
String getName() const { return "AddingDefaultBlockOutputStream"; }
void write(const Block & block)
void write(const Block & block) override
{
Block res = block;
res.addDefaults(required_columns);
@ -35,6 +33,9 @@ public:
void flush() { output->flush(); }
void writePrefix() override { output->writePrefix(); }
void writeSuffix() override { output->writeSuffix(); }
private:
BlockOutputStreamPtr output;
NamesAndTypesListPtr required_columns;

View File

@ -38,9 +38,7 @@ public:
output = storage->write(query_ptr);
}
String getName() const { return "PushingToViewsBlockOutputStream"; }
void write(const Block & block)
void write(const Block & block) override
{
for (size_t i = 0; i < children.size(); ++i)
{
@ -54,6 +52,18 @@ public:
output->write(block);
}
void writePrefix() override
{
if (output)
output->writePrefix();
}
void writeSuffix() override
{
if (output)
output->writeSuffix();
}
private:
StoragePtr storage;
BlockOutputStreamPtr output;

View File

@ -20,23 +20,31 @@ public:
}
/// Можно вызывать после writePrefix, чтобы получить структуру таблицы.
Block getSampleBlock() const
{
return sample_block;
}
void writePrefix() override
{
/** Отправляет запрос и получает блок-пример, описывающий структуру таблицы.
* Он нужен, чтобы знать, какие блоки передавать в метод write.
* Вызывайте только перед write.
*/
Block sendQueryAndGetSampleBlock()
{
connection.sendQuery(query, "", QueryProcessingStage::Complete, settings);
sent_query = true;
Connection::Packet packet = connection.receivePacket();
if (Protocol::Server::Data == packet.type)
return sample_block = packet.block;
{
sample_block = packet.block;
}
else if (Protocol::Server::Exception == packet.type)
{
packet.exception->rethrow();
return Block();
return;
}
else
throw Exception("Unexpected packet from server (expected Data or Exception, got "
@ -44,11 +52,8 @@ public:
}
void write(const Block & block)
void write(const Block & block) override
{
if (!sent_query)
sendQueryAndGetSampleBlock();
if (!blocksHaveEqualStructure(block, sample_block))
{
std::stringstream message;
@ -66,14 +71,12 @@ public:
/// Отправить блок данных, который уже был заранее сериализован (и, если надо, сжат), который следует прочитать из input-а.
void writePrepared(ReadBuffer & input, size_t size = 0)
{
if (!sent_query)
sendQueryAndGetSampleBlock(); /// Никак не можем использовать sample_block.
/// Не можем использовать sample_block.
connection.sendPreparedData(input, size);
}
void writeSuffix()
void writeSuffix() override
{
/// Пустой блок означает конец данных.
connection.sendData(Block());
@ -97,8 +100,6 @@ private:
String query;
Settings * settings;
Block sample_block;
bool sent_query = false;
};
}

View File

@ -241,11 +241,15 @@ void TCPHandler::readData(const Settings & global_settings)
void TCPHandler::processInsertQuery(const Settings & global_settings)
{
/** Сделано выше остальных строк, чтобы в случае, когда функция writePrefix кидает эксепшен,
* клиент получил эксепшен до того, как начнёт отправлять данные.
*/
state.io.out->writePrefix();
/// Отправляем клиенту блок - структура таблицы.
Block block = state.io.out_sample;
sendData(block);
state.io.out->writePrefix();
readData(global_settings);
state.io.out->writeSuffix();
}