2020-06-01 15:37:23 +00:00
|
|
|
#include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>
|
|
|
|
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
|
|
|
|
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
|
|
|
|
#include <Formats/FormatFactory.h>
|
|
|
|
#include <common/logger_useful.h>
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
namespace ErrorCodes
{
    /// Only declared here (extern); this file uses it in writePrefix() when no write buffer could be obtained.
    extern const int CANNOT_CREATE_IO_BUFFER;
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Remembers the target storage, the metadata snapshot and the context.
/// No other work is done here: the write buffer and the child format stream are created in writePrefix().
RabbitMQBlockOutputStream::RabbitMQBlockOutputStream(
    StorageRabbitMQ & storage_, const StorageMetadataPtr & metadata_snapshot_, const Context & context_)
    : storage(storage_), metadata_snapshot(metadata_snapshot_), context(context_)
{
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Returns the block that the stored metadata snapshot reports via getSampleBlockNonMaterialized().
Block RabbitMQBlockOutputStream::getHeader() const
{
    const auto & snapshot = *metadata_snapshot;
    return snapshot.getSampleBlockNonMaterialized();
}
|
|
|
|
|
|
|
|
|
|
|
|
void RabbitMQBlockOutputStream::writePrefix()
|
|
|
|
{
|
|
|
|
buffer = storage.createWriteBuffer();
|
|
|
|
if (!buffer)
|
|
|
|
throw Exception("Failed to create RabbitMQ producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
|
|
|
|
|
|
|
|
child = FormatFactory::instance().getOutput(
|
|
|
|
storage.getFormatName(), *buffer, getHeader(), context, [this](const Columns & /* columns */, size_t /* rows */)
|
|
|
|
{
|
2020-06-02 13:15:53 +00:00
|
|
|
buffer->countRow();
|
2020-06-01 15:37:23 +00:00
|
|
|
});
|
2020-06-10 19:59:37 +00:00
|
|
|
|
|
|
|
buffer->startEventLoop();
|
2020-06-01 15:37:23 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void RabbitMQBlockOutputStream::write(const Block & block)
|
|
|
|
{
|
|
|
|
child->write(block);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void RabbitMQBlockOutputStream::writeSuffix()
|
|
|
|
{
|
|
|
|
child->writeSuffix();
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|