2020-08-17 10:20:23 +00:00
|
|
|
#include <Processors/Formats/Impl/LineAsStringRowInputFormat.h>
|
|
|
|
#include <Formats/JSONEachRowUtils.h>
|
|
|
|
#include <common/find_symbols.h>
|
|
|
|
#include <IO/ReadHelpers.h>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
namespace ErrorCodes
{
    /// Error codes referenced by this translation unit; they are defined elsewhere.
    extern const int LOGICAL_ERROR;
    extern const int INCORRECT_DATA;
}
|
|
|
|
|
|
|
|
/** Constructs the format and validates the header.
  *
  * The header, input buffer and params are forwarded to IRowInputFormat, and `buf` is initialized from `in`.
  *
  * Throws Exception with ErrorCodes::LOGICAL_ERROR unless the header consists of
  * exactly one column whose type id is TypeIndex::String.
  */
LineAsStringRowInputFormat::LineAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_) :
    IRowInputFormat(header_, in_, std::move(params_)), buf(in)
{
    /// The column count is tested first (and for exact equality) so that an empty header
    /// is rejected here instead of indexing element 0 of an empty list of data types.
    if (header_.columns() != 1 || header_.getDataTypes()[0]->getTypeId() != TypeIndex::String)
    {
        throw Exception("This input format is only suitable for tables with a single column of type String.", ErrorCodes::LOGICAL_ERROR);
    }
}
|
|
|
|
|
|
|
|
void LineAsStringRowInputFormat::resetParser()
|
|
|
|
{
|
|
|
|
IRowInputFormat::resetParser();
|
|
|
|
buf.reset();
|
|
|
|
}
|
|
|
|
|
|
|
|
/** Reads one line from `buf` and appends it to `column` as a single value.
  *
  * A line ends at the first '\n' that is not preceded by a backslash, or at the end of the input.
  * The terminating '\n' is consumed but not stored. A backslash and the character following it
  * are skipped while searching for the line end (so "\\\n" does not terminate the line);
  * both characters are stored verbatim, no unescaping is performed.
  */
void LineAsStringRowInputFormat::readLineObject(IColumn & column)
{
    /// Remember where the line starts so that the whole line can be re-read after the scan.
    PeekableReadBufferCheckpoint checkpoint{buf};
    bool reached_end_of_input = false;

    while (true)
    {
        buf.position() = find_first_symbols<'\n', '\\'>(buf.position(), buf.buffer().end());

        if (buf.position() == buf.buffer().end())
        {
            /// Only the currently loaded part of the buffer is exhausted.
            /// eof() tries to load more data; the line is over only if there is none,
            /// otherwise a line crossing a buffer boundary would be cut in two.
            if (buf.eof())
            {
                reached_end_of_input = true;
                break;
            }
            continue;
        }

        if (*buf.position() == '\n')
            break;

        /// Here *buf.position() == '\\': step over the backslash and, if present, the escaped character.
        ++buf.position();
        if (!buf.eof())
            ++buf.position();
    }

    /// The scan may have crossed several buffer refills; make [checkpoint, position) one contiguous range.
    buf.makeContinuousMemoryFromCheckpointToPos();

    /// `end` is the position from which the next line starts (just past '\n', if there was one).
    if (!reached_end_of_input)
        ++buf.position();
    char * end = buf.position();

    buf.rollbackToCheckpoint();

    /// Store the line without its terminating '\n'.
    column.insertData(buf.position(), end - (reached_end_of_input ? 0 : 1) - buf.position());
    buf.position() = end;
}
|
|
|
|
|
|
|
|
/** Reads the next line into the first (and only) column.
  *
  * Returns true if a row was appended to `columns`, false if the input was already exhausted
  * and nothing was read. The result must describe the row just read, not whether more input
  * follows: otherwise the last line would be appended and yet reported as "no row".
  */
bool LineAsStringRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
    if (buf.eof())
        return false;

    readLineObject(*columns[0]);
    return true;
}
|
|
|
|
|
|
|
|
/// Registers the "LineAsString" input format in `factory`.
/// The registered creator ignores the format settings and builds a LineAsStringRowInputFormat
/// from the sample block, the read buffer and the row input params.
void registerInputFormatProcessorLineAsString(FormatFactory & factory)
{
    auto create_line_as_string = [](
        ReadBuffer & buf,
        const Block & sample,
        const RowInputFormatParams & params,
        const FormatSettings &)
    {
        return std::make_shared<LineAsStringRowInputFormat>(sample, buf, params);
    };

    factory.registerInputFormatProcessor("LineAsString", create_line_as_string);
}
|
|
|
|
|
|
|
|
}
|