Added format RawBLOB

This commit is contained in:
Alexey Milovidov 2020-09-28 03:11:19 +03:00
parent 9446e43146
commit a7ddd8489a
8 changed files with 196 additions and 5 deletions

View File

@ -367,6 +367,8 @@ void registerInputFormatProcessorArrow(FormatFactory & factory);
void registerOutputFormatProcessorArrow(FormatFactory & factory); void registerOutputFormatProcessorArrow(FormatFactory & factory);
void registerInputFormatProcessorAvro(FormatFactory & factory); void registerInputFormatProcessorAvro(FormatFactory & factory);
void registerOutputFormatProcessorAvro(FormatFactory & factory); void registerOutputFormatProcessorAvro(FormatFactory & factory);
void registerInputFormatProcessorRawBLOB(FormatFactory & factory);
void registerOutputFormatProcessorRawBLOB(FormatFactory & factory);
/// Output only (presentational) formats. /// Output only (presentational) formats.
@ -426,6 +428,9 @@ FormatFactory::FormatFactory()
registerOutputFormatProcessorTemplate(*this); registerOutputFormatProcessorTemplate(*this);
registerInputFormatProcessorMsgPack(*this); registerInputFormatProcessorMsgPack(*this);
registerOutputFormatProcessorMsgPack(*this); registerOutputFormatProcessorMsgPack(*this);
registerInputFormatProcessorRawBLOB(*this);
registerOutputFormatProcessorRawBLOB(*this);
#if !defined(ARCADIA_BUILD) #if !defined(ARCADIA_BUILD)
registerInputFormatProcessorORC(*this); registerInputFormatProcessorORC(*this);
registerOutputFormatProcessorORC(*this); registerOutputFormatProcessorORC(*this);
@ -456,6 +461,7 @@ FormatFactory::FormatFactory()
registerInputFormatProcessorRegexp(*this); registerInputFormatProcessorRegexp(*this);
registerInputFormatProcessorJSONAsString(*this); registerInputFormatProcessorJSONAsString(*this);
registerInputFormatProcessorLineAsString(*this); registerInputFormatProcessorLineAsString(*this);
#if !defined(ARCADIA_BUILD) #if !defined(ARCADIA_BUILD)
registerInputFormatProcessorCapnProto(*this); registerInputFormatProcessorCapnProto(*this);
#endif #endif

View File

@ -1,5 +1,7 @@
#include <Processors/Formats/Impl/JSONAsStringRowInputFormat.h> #include <Processors/Formats/Impl/JSONAsStringRowInputFormat.h>
#include <Formats/JSONEachRowUtils.h> #include <Formats/JSONEachRowUtils.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <common/find_symbols.h> #include <common/find_symbols.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
@ -8,17 +10,22 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int BAD_ARGUMENTS;
extern const int INCORRECT_DATA; extern const int INCORRECT_DATA;
} }
JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_) : JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_) :
IRowInputFormat(header_, in_, std::move(params_)), buf(in) IRowInputFormat(header_, in_, std::move(params_)), buf(in)
{ {
if (header_.columns() > 1 || header_.getDataTypes()[0]->getTypeId() != TypeIndex::String) if (header_.columns() > 1)
{ throw Exception(ErrorCodes::BAD_ARGUMENTS,
throw Exception("This input format is only suitable for tables with a single column of type String.", ErrorCodes::LOGICAL_ERROR); "This input format is only suitable for tables with a single column of type String but the number of columns is {}",
} header_.columns());
if (!isString(removeNullable(removeLowCardinality(header_.getByPosition(0).type))))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"This input format is only suitable for tables with a single column of type String but the column type is {}",
header_.getByPosition(0).type->getName());
} }
void JSONAsStringRowInputFormat::resetParser() void JSONAsStringRowInputFormat::resetParser()

View File

@ -0,0 +1,55 @@
#include <Formats/FormatFactory.h>
#include <Processors/Formats/Impl/RawBLOBRowInputFormat.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <IO/ReadHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
/// Creates a RawBLOB reader over `in_` for the given header.
///
/// The header must contain exactly one column, and its type must be String once
/// the LowCardinality and Nullable wrappers are stripped.
/// Throws Exception with code BAD_ARGUMENTS if either condition does not hold.
RawBLOBRowInputFormat::RawBLOBRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
    : IRowInputFormat(header_, in_, std::move(params_))
{
    /// Exactly one column is required. An empty header has to be rejected here too,
    /// because the type check below inspects the column at position 0.
    if (header_.columns() != 1)
        throw Exception(ErrorCodes::BAD_ARGUMENTS,
            "This input format is only suitable for tables with a single column of type String but the number of columns is {}",
            header_.columns());

    if (!isString(removeNullable(removeLowCardinality(header_.getByPosition(0).type))))
        throw Exception(ErrorCodes::BAD_ARGUMENTS,
            "This input format is only suitable for tables with a single column of type String but the column type is {}",
            header_.getByPosition(0).type->getName());
}
/// Reads everything that remains in the input as one value and appends it to the first column.
///
/// Always returns false: either the input was already at EOF and nothing is appended,
/// or the input has just been consumed up to EOF, so no further value can follow.
/// NOTE(review): this relies on the caller keeping the appended value even though
/// false is returned - confirm against IRowInputFormat.
bool RawBLOBRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
    if (!in.eof())
    {
        /// The data goes through a temporary string before it reaches the column (one excessive copy).
        String whole_input;
        readStringUntilEOF(whole_input, in);
        columns.at(0)->insertData(whole_input.data(), whole_input.size());
    }

    return false;
}
/// Registers the input side of the format in the factory under the name "RawBLOB".
void registerInputFormatProcessorRawBLOB(FormatFactory & factory)
{
    /// The format settings argument is accepted but not used by this format.
    auto create_input = [](
        ReadBuffer & in,
        const Block & header,
        const RowInputFormatParams & row_params,
        const FormatSettings &)
    {
        return std::make_shared<RawBLOBRowInputFormat>(header, in, row_params);
    };

    factory.registerInputFormatProcessor("RawBLOB", create_input);
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Processors/Formats/IRowInputFormat.h>
namespace DB
{
class ReadBuffer;
/// This format slurps all input data into a single value.
/// This format can only parse a table with a single field of type String or similar.
class RawBLOBRowInputFormat : public IRowInputFormat
{
public:
    /// Takes the header of the destination, the buffer to read from and the row input parameters.
    RawBLOBRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_);

    /// Name of this format implementation.
    String getName() const override { return "RawBLOBRowInputFormat"; }

    /// Row-reading hook of the base class; the extension argument is left unnamed.
    bool readRow(MutableColumns & columns, RowReadExtension &) override;
};
}

View File

@ -0,0 +1,38 @@
#include <Processors/Formats/Impl/RawBLOBRowOutputFormat.h>
#include <Formats/FormatFactory.h>
#include <IO/WriteBuffer.h>
namespace DB
{
/// Forwards the header, the output buffer and the write callback to the base class.
/// The format keeps no state of its own, so the body is empty.
RawBLOBRowOutputFormat::RawBLOBRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback)
    : IRowOutputFormat(header_, out_, callback)
{
}
void RawBLOBRowOutputFormat::writeField(const IColumn & column, const IDataType &, size_t row_num)
{
StringRef value = column.getDataAt(row_num);
out.write(value.data, value.size);
}
/// Registers the output side of the format in the factory under the name "RawBLOB".
void registerOutputFormatProcessorRawBLOB(FormatFactory & factory)
{
    /// The format settings argument is accepted but not used by this format.
    auto create_output = [](
        WriteBuffer & out,
        const Block & header,
        FormatFactory::WriteCallback write_callback,
        const FormatSettings &)
    {
        return std::make_shared<RawBLOBRowOutputFormat>(out, header, write_callback);
    };

    factory.registerOutputFormatProcessor("RawBLOB", create_output);
}
}

View File

@ -0,0 +1,41 @@
#pragma once
#include <Core/Block.h>
#include <Processors/Formats/IRowOutputFormat.h>
namespace DB
{
class WriteBuffer;
/** This format only allows to output one column of type String or similar.
* It is output as raw bytes without any delimiters or escaping.
*
* The difference between RawBLOB and TSVRaw:
* - only single column dataset is supported;
* - data is output in binary;
* - no newline at the end of each value.
*
* The difference between RawBLOB and RowBinary:
* - only single column dataset is supported;
* - strings are output without their lengths.
*
* If you output more than one value, the output format is ambiguous and you may not be able to read the data back.
*/
class RawBLOBRowOutputFormat : public IRowOutputFormat
{
public:
    /// Takes the buffer to write to, the header of the data and the write callback.
    RawBLOBRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback);

    /// Per-field output hook of the base class; the data type argument is left unnamed.
    void writeField(const IColumn & column, const IDataType &, size_t row_num) override;

    /// Name of this format implementation.
    String getName() const override { return "RawBLOBRowOutputFormat"; }
};
}

View File

@ -0,0 +1,2 @@
96b229180107fd2d23fd0a2ef9326701 -
96b229180107fd2d23fd0a2ef9326701 -

View File

@ -0,0 +1,18 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} -n --query "
DROP TABLE IF EXISTS t;
CREATE TABLE t (a LowCardinality(Nullable(String))) ENGINE = Memory;
"
${CLICKHOUSE_CLIENT} --query "INSERT INTO t FORMAT RawBLOB" < ${BASH_SOURCE[0]}
cat ${BASH_SOURCE[0]} | md5sum
${CLICKHOUSE_CLIENT} -n --query "SELECT * FROM t FORMAT RawBLOB" | md5sum
${CLICKHOUSE_CLIENT} --query "
DROP TABLE t;
"