mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 10:52:30 +00:00
58 lines
1.2 KiB
C++
58 lines
1.2 KiB
C++
|
#include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>
|
||
|
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
|
||
|
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
|
||
|
#include <Formats/FormatFactory.h>
|
||
|
#include <common/logger_useful.h>
|
||
|
|
||
|
|
||
|
namespace DB
|
||
|
{
|
||
|
|
||
|
/// Error codes referenced by this translation unit.
/// These are declarations only; the definitions live elsewhere in the project.
/// NOTE(review): declared `const` so the declaration matches a `const int`
/// definition (assumed project convention — confirm) and so this file cannot
/// accidentally assign to the code.
namespace ErrorCodes
{
    extern const int CANNOT_CREATE_IO_BUFFER;
}
|
||
|
|
||
|
|
||
|
/// Binds the stream to the given storage and query context.
/// No other work is done here; the constructor body is empty.
RabbitMQBlockOutputStream::RabbitMQBlockOutputStream(StorageRabbitMQ & storage_, const Context & context_)
    : storage(storage_)
    , context(context_)
{
}
|
||
|
|
||
|
|
||
|
/// Returns the block structure this stream accepts, taken from the storage's
/// non-materialized sample block.
Block RabbitMQBlockOutputStream::getHeader() const
{
    Block header = storage.getSampleBlockNonMaterialized();
    return header;
}
|
||
|
|
||
|
|
||
|
void RabbitMQBlockOutputStream::writePrefix()
|
||
|
{
|
||
|
buffer = storage.createWriteBuffer();
|
||
|
if (!buffer)
|
||
|
throw Exception("Failed to create RabbitMQ producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
|
||
|
|
||
|
child = FormatFactory::instance().getOutput(
|
||
|
storage.getFormatName(), *buffer, getHeader(), context, [this](const Columns & /* columns */, size_t /* rows */)
|
||
|
{
|
||
|
buffer->count_row();
|
||
|
});
|
||
|
}
|
||
|
|
||
|
|
||
|
void RabbitMQBlockOutputStream::write(const Block & block)
|
||
|
{
|
||
|
child->write(block);
|
||
|
|
||
|
if (buffer)
|
||
|
buffer->flush();
|
||
|
}
|
||
|
|
||
|
|
||
|
void RabbitMQBlockOutputStream::writeSuffix()
|
||
|
{
|
||
|
child->writeSuffix();
|
||
|
}
|
||
|
|
||
|
}
|