2020-06-01 15:37:23 +00:00
|
|
|
#include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>
|
|
|
|
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
|
|
|
|
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
|
|
|
|
#include <Formats/FormatFactory.h>
|
|
|
|
#include <common/logger_useful.h>
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2021-07-23 14:25:35 +00:00
|
|
|
/// Builds a sink that writes into the given RabbitMQ storage.
///
/// storage_            - target storage; kept by reference.
/// metadata_snapshot_  - metadata snapshot; its non-materialized sample block
///                       is passed to SinkToStorage as the header.
/// context_            - query context, later used to obtain format settings.
RabbitMQSink::RabbitMQSink(
    StorageRabbitMQ & storage_,
    const StorageMetadataPtr & metadata_snapshot_,
    ContextPtr context_)
    /// The base class is initialized before any data member, so the header must be
    /// taken from the constructor argument: the `metadata_snapshot` member is not
    /// constructed yet at this point.
    : SinkToStorage(metadata_snapshot_->getSampleBlockNonMaterialized())
    , storage(storage_)
    , metadata_snapshot(metadata_snapshot_)
    , context(context_)
{
}
|
|
|
|
|
|
|
|
|
2021-07-23 14:25:35 +00:00
|
|
|
void RabbitMQSink::onStart()
|
2020-06-01 15:37:23 +00:00
|
|
|
{
|
2020-08-28 08:52:02 +00:00
|
|
|
if (!storage.exchangeRemoved())
|
2020-07-21 15:47:39 +00:00
|
|
|
storage.unbindExchange();
|
2020-07-23 11:45:01 +00:00
|
|
|
|
2020-06-01 15:37:23 +00:00
|
|
|
buffer = storage.createWriteBuffer();
|
2020-06-29 12:32:04 +00:00
|
|
|
buffer->activateWriting();
|
|
|
|
|
2020-11-02 07:50:38 +00:00
|
|
|
auto format_settings = getFormatSettings(context);
|
2021-01-11 01:50:30 +00:00
|
|
|
format_settings.protobuf.allow_multiple_rows_without_delimiter = true;
|
2020-11-02 07:50:38 +00:00
|
|
|
|
2020-12-30 03:07:30 +00:00
|
|
|
child = FormatFactory::instance().getOutputStream(storage.getFormatName(), *buffer,
|
2021-07-23 14:25:35 +00:00
|
|
|
getPort().getHeader(), context,
|
2020-11-02 07:50:38 +00:00
|
|
|
[this](const Columns & /* columns */, size_t /* rows */)
|
|
|
|
{
|
|
|
|
buffer->countRow();
|
|
|
|
},
|
|
|
|
format_settings);
|
2020-06-01 15:37:23 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2021-07-23 14:25:35 +00:00
|
|
|
/// Rebuilds a block from the chunk's columns using the port header as the
/// structure, and writes it to the output format stream.
void RabbitMQSink::consume(Chunk chunk)
{
    const auto & header = getPort().getHeader();
    child->write(header.cloneWithColumns(chunk.detachColumns()));
}
|
|
|
|
|
|
|
|
|
2021-07-23 14:25:35 +00:00
|
|
|
void RabbitMQSink::onFinish()
|
2020-06-01 15:37:23 +00:00
|
|
|
{
|
|
|
|
child->writeSuffix();
|
2020-07-20 06:21:18 +00:00
|
|
|
|
|
|
|
if (buffer)
|
2020-07-25 11:14:46 +00:00
|
|
|
buffer->updateMaxWait();
|
2020-06-01 15:37:23 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|