New feature: LineAsString format. #13630

This commit is contained in:
hexiaoting 2020-08-17 18:20:23 +08:00
parent 17eb8d24a7
commit 405a6fb08f
5 changed files with 135 additions and 0 deletions

View File

@ -365,6 +365,7 @@ FormatFactory::FormatFactory()
registerInputFormatProcessorMsgPack(*this); registerInputFormatProcessorMsgPack(*this);
registerOutputFormatProcessorMsgPack(*this); registerOutputFormatProcessorMsgPack(*this);
registerInputFormatProcessorJSONAsString(*this); registerInputFormatProcessorJSONAsString(*this);
registerInputFormatProcessorLineAsString(*this);
registerFileSegmentationEngineTabSeparated(*this); registerFileSegmentationEngineTabSeparated(*this);
registerFileSegmentationEngineCSV(*this); registerFileSegmentationEngineCSV(*this);

View File

@ -210,5 +210,6 @@ void registerOutputFormatProcessorPostgreSQLWire(FormatFactory & factory);
void registerInputFormatProcessorCapnProto(FormatFactory & factory); void registerInputFormatProcessorCapnProto(FormatFactory & factory);
void registerInputFormatProcessorRegexp(FormatFactory & factory); void registerInputFormatProcessorRegexp(FormatFactory & factory);
void registerInputFormatProcessorJSONAsString(FormatFactory & factory); void registerInputFormatProcessorJSONAsString(FormatFactory & factory);
void registerInputFormatProcessorLineAsString(FormatFactory & factory);
} }

View File

@ -0,0 +1,101 @@
#include <Processors/Formats/Impl/LineAsStringRowInputFormat.h>
#include <Formats/JSONEachRowUtils.h>
#include <common/find_symbols.h>
#include <IO/ReadHelpers.h>
namespace DB
{
/// Error codes thrown by this file. They are only declared here (`extern`);
/// the definitions live elsewhere in the project.
namespace ErrorCodes
{
/// Used by the constructor when the table structure does not fit this format.
extern const int LOGICAL_ERROR;
/// Used by readLineObject() for malformed or truncated input.
extern const int INCORRECT_DATA;
}
/// Builds the format on top of `in_`; the stream is wrapped into the peekable buffer `buf`
/// so that readLineObject() can return to a checkpoint and re-read a whole object.
///
/// Throws Exception(LOGICAL_ERROR) unless the header consists of exactly one column
/// and that column has type String.
LineAsStringRowInputFormat::LineAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_) :
    IRowInputFormat(header_, in_, std::move(params_)), buf(in)
{
    /// The column count is compared with `!= 1` (not `> 1`): an empty header must be rejected
    /// here, otherwise getDataTypes()[0] below would index an empty vector.
    /// NOTE(review): LOGICAL_ERROR is kept for compatibility, although the condition is caused
    /// by the user-supplied table structure - consider a user-facing error code.
    if (header_.columns() != 1 || header_.getDataTypes()[0]->getTypeId() != TypeIndex::String)
    {
        throw Exception("This input format is only suitable for tables with a single column of type String.", ErrorCodes::LOGICAL_ERROR);
    }
}
void LineAsStringRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
buf.reset();
}
/// Reads one Line object from `buf` and appends it to `column` as a single value.
///
/// A Line object starts with '"' and ends at the next '"' that is not preceded by a backslash.
/// The bytes are inserted as they are: the surrounding quotes are included and escape
/// sequences are not decoded.
///
/// Precondition: `buf` is not at EOF (readRow() checks this before calling).
/// Throws Exception(INCORRECT_DATA) if the object does not start with '"' or if the input
/// ends before the closing quote.
void LineAsStringRowInputFormat::readLineObject(IColumn & column)
{
    /// Remember where the object starts: the buffer may be refilled while we scan,
    /// and the whole object is needed at the end.
    PeekableReadBufferCheckpoint checkpoint{buf};

    if (*buf.position() != '"')
        throw Exception("Line object must begin with '\"'.", ErrorCodes::INCORRECT_DATA);
    ++buf.position();

    bool closing_quote_found = false;
    while (!closing_quote_found)
    {
        if (buf.eof())
            throw Exception("Unexpected end of file while parsing Line object.", ErrorCodes::INCORRECT_DATA);

        /// Jump to the nearest quote or backslash. find_first_symbols returns the end of the
        /// range when nothing is found, which is exactly what the check below expects.
        /// (The "last ... or null" variant would skip over intermediate quotes and could
        /// assign a null pointer to the buffer position.)
        buf.position() = find_first_symbols<'"', '\\'>(buf.position(), buf.buffer().end());

        if (buf.position() == buf.buffer().end())
        {
            /// Nothing interesting in the current chunk; eof() above will fetch the next one.
            continue;
        }
        else if (*buf.position() == '"')
        {
            closing_quote_found = true;
            ++buf.position();
        }
        else if (*buf.position() == '\\')
        {
            /// Skip the backslash and the character it escapes, so that an escaped quote
            /// does not terminate the object.
            ++buf.position();
            if (!buf.eof())
            {
                ++buf.position();
            }
        }
    }

    /// Make [checkpoint, current position) one contiguous range, copy it into the column
    /// and then restore the position to just after the object.
    buf.makeContinuousMemoryFromCheckpointToPos();
    char * end = buf.position();
    buf.rollbackToCheckpoint();
    column.insertData(buf.position(), end - buf.position());
    buf.position() = end;
}
/// Reads at most one Line object into columns[0].
///
/// Leading whitespace is skipped; if any input remains, one object is read. After the object,
/// whitespace, a single optional ',' and more whitespace are consumed.
///
/// Returns true if the buffer still has data after that, false if it is exhausted.
/// NOTE(review): the result is false also when the object just read was the last one in the
/// input - confirm that the caller still emits that row.
bool LineAsStringRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
    skipWhitespaceIfAny(buf);

    const bool object_available = !buf.eof();
    if (object_available)
        readLineObject(*columns[0]);

    /// Optional separator between objects.
    skipWhitespaceIfAny(buf);
    const bool at_comma = !buf.eof() && *buf.position() == ',';
    if (at_comma)
        ++buf.position();
    skipWhitespaceIfAny(buf);

    const bool input_remains = !buf.eof();
    return input_remains;
}
/// Registers the input format in `factory` under the name "LineAsString".
void registerInputFormatProcessorLineAsString(FormatFactory & factory)
{
    /// Creator callback: builds the format from the stream, the header and the row-input params.
    /// The format settings argument is accepted but not used.
    auto create_format = [](
        ReadBuffer & in,
        const Block & header,
        const RowInputFormatParams & row_params,
        const FormatSettings & /* settings */)
    {
        return std::make_shared<LineAsStringRowInputFormat>(header, in, row_params);
    };

    factory.registerInputFormatProcessor("LineAsString", create_format);
}
}

View File

@ -0,0 +1,31 @@
#pragma once
#include <Processors/Formats/IRowInputFormat.h>
#include <Formats/FormatFactory.h>
#include <IO/PeekableReadBuffer.h>
namespace DB
{
class ReadBuffer;
/// This format parses a sequence of Line objects separated by whitespace and, optionally, a comma.
/// A Line object is a piece of text that begins with a double quote. Each Line object is stored
/// as a whole into a String value: the surrounding quotes are kept and escape sequences are not decoded.
/// This format can only parse a table with a single column of type String.
class LineAsStringRowInputFormat : public IRowInputFormat
{
public:
/// Throws if the header has more than one column or its column is not of type String.
LineAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_);
/// Reads at most one Line object into columns[0] and consumes the separator after it.
/// Returns whether any input remains in the buffer afterwards.
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
String getName() const override { return "LineAsStringRowInputFormat"; }
/// Resets the base-class parser state and the peekable buffer.
void resetParser() override;
private:
/// Reads a single Line object starting at the current buffer position and appends it to `column`.
void readLineObject(IColumn & column);
/// Peekable wrapper over the input stream; lets readLineObject() set a checkpoint at the start
/// of an object and roll back to it once the end of the object has been found.
PeekableReadBuffer buf;
};
}

View File

@ -23,6 +23,7 @@ SRCS(
Formats/Impl/ConstantExpressionTemplate.cpp Formats/Impl/ConstantExpressionTemplate.cpp
Formats/Impl/CSVRowInputFormat.cpp Formats/Impl/CSVRowInputFormat.cpp
Formats/Impl/CSVRowOutputFormat.cpp Formats/Impl/CSVRowOutputFormat.cpp
Formats/Impl/LineAsStringRowInputFormat.cpp
Formats/Impl/JSONAsStringRowInputFormat.cpp Formats/Impl/JSONAsStringRowInputFormat.cpp
Formats/Impl/JSONCompactEachRowRowInputFormat.cpp Formats/Impl/JSONCompactEachRowRowInputFormat.cpp
Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp