// ClickHouse/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp
#include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
#include <Formats/FormatFactory.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
    /// Declared here, defined elsewhere. Used by writePrefix() when
    /// the storage fails to provide a write buffer.
    extern const int CANNOT_CREATE_IO_BUFFER;
}
/// Remembers the storage, the metadata snapshot and the query context.
/// No buffer or format stream is created here; that happens in writePrefix().
/// NOTE(review): whether `storage` / `context` are held by reference depends on the
/// member declarations in the header - confirm lifetimes there.
RabbitMQBlockOutputStream::RabbitMQBlockOutputStream(
    StorageRabbitMQ & storage_,
    const StorageMetadataPtr & metadata_snapshot_,
    const Context & context_)
    : storage(storage_)
    , metadata_snapshot(metadata_snapshot_)
    , context(context_)
{
}
/// Returns the header of this stream: the sample block that the metadata
/// snapshot reports via getSampleBlockNonMaterialized().
Block RabbitMQBlockOutputStream::getHeader() const
{
    return metadata_snapshot->getSampleBlockNonMaterialized();
}
void RabbitMQBlockOutputStream::writePrefix()
{
2020-08-28 08:52:02 +00:00
if (!storage.exchangeRemoved())
storage.unbindExchange();
2020-07-23 11:45:01 +00:00
2020-06-01 15:37:23 +00:00
buffer = storage.createWriteBuffer();
if (!buffer)
throw Exception("Failed to create RabbitMQ producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
buffer->activateWriting();
2020-06-01 15:37:23 +00:00
child = FormatFactory::instance().getOutput(
storage.getFormatName(), *buffer, getHeader(), context, [this](const Columns & /* columns */, size_t /* rows */)
{
2020-06-02 13:15:53 +00:00
buffer->countRow();
2020-06-01 15:37:23 +00:00
});
}
/// Passes the block on to the format output stream (`child`).
/// `child` is assigned in writePrefix(), so that must have been called first.
void RabbitMQBlockOutputStream::write(const Block & block)
{
    auto & format_output = *child;
    format_output.write(block);
}
/// Finishes writing: finalizes the format output stream, then calls
/// updateMaxWait() on the write buffer.
///
/// In this file both `child` and `buffer` are assigned only in writePrefix(), and
/// `child` is created after `buffer`. If writePrefix() was never called, or threw
/// before creating `child`, there is nothing to finalize, so both are checked
/// instead of dereferencing a null `child`.
void RabbitMQBlockOutputStream::writeSuffix()
{
    if (child)
        child->writeSuffix();

    if (buffer)
        buffer->updateMaxWait();
}
}