ClickHouse/dbms/src/Processors/Formats/Impl/BinaryRowOutputFormat.cpp
Ivan a502424c33
Implement support for insertion into Kafka tables (#6012)
* Add write-callback on each row for RowOutputStream
* Fix build of new rdkafka library
* Poll messages if Kafka outgoing queue is full
* Add test
* Add test producer-consumer
* Truncate delimiter from last row in message
2019-08-20 14:17:57 +03:00

74 lines
1.9 KiB
C++

#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <Columns/IColumn.h>
#include <DataTypes/IDataType.h>
#include <Processors/Formats/Impl/BinaryRowOutputFormat.h>
#include <Formats/FormatFactory.h>
namespace DB
{
/// Constructs the binary row output format.
///
/// `header`, `out_` and `callback` are forwarded unchanged to the IRowOutputFormat base.
/// `with_names_` and `with_types_` are stored in the members `with_names` and `with_types`;
/// writePrefix() consults them to decide which parts of the header (column names,
/// column type names) it writes.
BinaryRowOutputFormat::BinaryRowOutputFormat(
    WriteBuffer & out_,
    const Block & header,
    bool with_names_,
    bool with_types_,
    FormatFactory::WriteCallback callback)
    : IRowOutputFormat(header, out_, callback)
    , with_names(with_names_)
    , with_types(with_types_)
{
}
/// Writes the optional header of the stream.
///
/// Nothing is written when both `with_names` and `with_types` are false.
/// Otherwise the output is, in this order:
///   1. the number of columns (via writeVarUInt);
///   2. if `with_names`: the name of every column (via writeStringBinary);
///   3. if `with_types`: the type name of every column (via writeStringBinary).
void BinaryRowOutputFormat::writePrefix()
{
    auto & sample = getPort(PortKind::Main).getHeader();
    size_t column_count = sample.columns();

    /// Neither names nor types were requested, so there is no header at all.
    if (!with_names && !with_types)
        return;

    /// The column count is written once, whichever of the two sections follow.
    writeVarUInt(column_count, out);

    if (with_names)
        for (size_t pos = 0; pos != column_count; ++pos)
            writeStringBinary(sample.safeGetByPosition(pos).name, out);

    if (with_types)
        for (size_t pos = 0; pos != column_count; ++pos)
            writeStringBinary(sample.safeGetByPosition(pos).type->getName(), out);
}
/// Writes a single value: the element at index `row_num` of `column`.
/// The encoding is delegated entirely to `type.serializeBinary`, which writes into `out`.
void BinaryRowOutputFormat::writeField(
    const IColumn & column,
    const IDataType & type,
    size_t row_num)
{
    type.serializeBinary(column, row_num, out);
}
/// Registers the two output formats implemented by BinaryRowOutputFormat in `factory`:
///   - "RowBinary"                  - created with with_names = false, with_types = false;
///   - "RowBinaryWithNamesAndTypes" - created with with_names = true,  with_types = true.
/// The Context and FormatSettings arguments handed to the creators are not used.
void registerOutputFormatProcessorRowBinary(FormatFactory & factory)
{
    /// Creator for the variant without names and types (writePrefix() writes nothing for it).
    auto create_plain = [](
        WriteBuffer & ostr,
        const Block & header_block,
        const Context &,
        FormatFactory::WriteCallback write_callback,
        const FormatSettings &)
    {
        return std::make_shared<BinaryRowOutputFormat>(ostr, header_block, false, false, write_callback);
    };

    /// Creator for the variant whose writePrefix() writes the column count, names and type names.
    auto create_with_names_and_types = [](
        WriteBuffer & ostr,
        const Block & header_block,
        const Context &,
        FormatFactory::WriteCallback write_callback,
        const FormatSettings &)
    {
        return std::make_shared<BinaryRowOutputFormat>(ostr, header_block, true, true, write_callback);
    };

    factory.registerOutputFormatProcessor("RowBinary", create_plain);
    factory.registerOutputFormatProcessor("RowBinaryWithNamesAndTypes", create_with_names_and_types);
}
}