// ClickHouse/src/Storages/Kafka/KafkaBlockOutputStream.cpp

#include <Storages/Kafka/KafkaBlockOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Storages/Kafka/WriteBufferToKafkaProducer.h>
namespace DB
{
/// Error codes referenced in this file. They are only declared here (extern);
/// the definitions live in another translation unit.
namespace ErrorCodes
{
    extern const int CANNOT_CREATE_IO_BUFFER;
}
/// Constructs the output stream.
/// Only initializes the `storage`, `metadata_snapshot` and `context` members from the
/// corresponding arguments; nothing else happens here (the write buffer and the output
/// format are set up in writePrefix()).
KafkaBlockOutputStream::KafkaBlockOutputStream(
    StorageKafka & storage_,
    const StorageMetadataPtr & metadata_snapshot_,
    const std::shared_ptr<Context> & context_)
    : storage(storage_)
    , metadata_snapshot(metadata_snapshot_)
    , context(context_)
{
}
/// Returns the block that the metadata snapshot reports via getSampleBlockNonMaterialized().
/// NOTE(review): presumably the table structure without MATERIALIZED columns — confirm
/// against StorageInMemoryMetadata.
Block KafkaBlockOutputStream::getHeader() const
{
    return metadata_snapshot->getSampleBlockNonMaterialized();
}
void KafkaBlockOutputStream::writePrefix()
{
2020-02-03 10:02:52 +00:00
buffer = storage.createWriteBuffer(getHeader());
if (!buffer)
throw Exception("Failed to create Kafka producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
auto format_settings = getFormatSettings(*context);
format_settings.protobuf.allow_many_rows_no_delimiters = true;
child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer,
getHeader(), *context,
[this](const Columns & columns, size_t row)
{
buffer->countRow(columns, row);
},
format_settings);
}
/// Hands the block over to the output format stored in `child`.
/// `child` is dereferenced without a null check here; it is assigned in writePrefix().
void KafkaBlockOutputStream::write(const Block & block)
{
    child->write(block);
}
void KafkaBlockOutputStream::writeSuffix()
{
2020-08-28 05:26:55 +00:00
if (child)
child->writeSuffix();
flush();
}
/// Flushes the write buffer. Does nothing when no buffer has been created.
void KafkaBlockOutputStream::flush()
{
    if (!buffer)
        return;

    buffer->flush();
}
}