2019-08-20 11:17:57 +00:00
|
|
|
#include "KafkaBlockOutputStream.h"
|
|
|
|
|
|
|
|
#include <Formats/FormatFactory.h>
|
2019-10-22 10:31:28 +00:00
|
|
|
#include <Storages/Kafka/WriteBufferToKafkaProducer.h>
|
2019-08-20 11:17:57 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
namespace ErrorCodes
{
    /// Declared here, defined elsewhere; used by writePrefix() when no producer buffer could be created.
    extern const int CANNOT_CREATE_IO_BUFFER;
}
|
|
|
|
|
2020-06-16 12:48:10 +00:00
|
|
|
KafkaBlockOutputStream::KafkaBlockOutputStream(
|
|
|
|
StorageKafka & storage_,
|
|
|
|
const StorageMetadataPtr & metadata_snapshot_,
|
|
|
|
const std::shared_ptr<Context> & context_)
|
|
|
|
: storage(storage_)
|
|
|
|
, metadata_snapshot(metadata_snapshot_)
|
|
|
|
, context(context_)
|
2019-08-20 11:17:57 +00:00
|
|
|
{
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Header of this stream: the block returned by the metadata snapshot's
/// getSampleBlockNonMaterialized().
Block KafkaBlockOutputStream::getHeader() const
{
    auto & snapshot = *metadata_snapshot;
    return snapshot.getSampleBlockNonMaterialized();
}
|
|
|
|
|
|
|
|
void KafkaBlockOutputStream::writePrefix()
|
|
|
|
{
|
2020-02-03 10:02:52 +00:00
|
|
|
buffer = storage.createWriteBuffer(getHeader());
|
2019-08-20 11:17:57 +00:00
|
|
|
if (!buffer)
|
|
|
|
throw Exception("Failed to create Kafka producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
|
|
|
|
|
2020-06-03 10:00:15 +00:00
|
|
|
child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer, getHeader(), *context, [this](const Columns & columns, size_t row){ buffer->countRow(columns, row); });
|
2019-08-20 11:17:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void KafkaBlockOutputStream::write(const Block & block)
|
|
|
|
{
|
|
|
|
child->write(block);
|
|
|
|
}
|
|
|
|
|
|
|
|
void KafkaBlockOutputStream::writeSuffix()
|
|
|
|
{
|
|
|
|
child->writeSuffix();
|
|
|
|
flush();
|
|
|
|
}
|
|
|
|
|
|
|
|
void KafkaBlockOutputStream::flush()
|
|
|
|
{
|
|
|
|
if (buffer)
|
|
|
|
buffer->flush();
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|