ClickHouse/src/Processors/Formats/Impl/RegexpRowInputFormat.cpp

176 lines
5.5 KiB
C++
Raw Normal View History

2020-03-12 14:18:39 +00:00
#include <stdlib.h>
2021-10-02 07:13:14 +00:00
#include <base/find_symbols.h>
#include <Processors/Formats/Impl/RegexpRowInputFormat.h>
2021-03-09 14:46:52 +00:00
#include <DataTypes/Serializations/SerializationNullable.h>
2021-11-09 13:14:07 +00:00
#include <Formats/EscapingRuleUtils.h>
#include <IO/ReadHelpers.h>
namespace DB
{
// Error codes thrown from this file. They are only declared here (extern).
namespace ErrorCodes
{
// Used when a line does not match the regexp, when the number of matched fields differs
// from the number of columns, and when a line ending is malformed.
extern const int INCORRECT_DATA;
// Used by the segmentation engine when the search position ends up past the end of the buffer.
extern const int LOGICAL_ERROR;
}
// Constructs the format on top of a plain ReadBuffer: `in_` is wrapped into a PeekableReadBuffer
// and the work is delegated to the constructor that takes ownership of that wrapper.
//
// `params_` is received by value, so it is moved (not copied a second time) into the delegated constructor.
RegexpRowInputFormat::RegexpRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
    : RegexpRowInputFormat(std::make_unique<PeekableReadBuffer>(in_), header_, std::move(params_), format_settings_)
{
}
// Constructs the format from an already created PeekableReadBuffer and takes ownership of it.
//
// The regular expression and the escaping rule are taken from `format_settings_.regexp`.
// One StringPiece slot per capturing group is prepared, together with the RE2 argument
// objects (and the array of pointers to them) that RE2::FullMatchN fills on every match.
//
// Throws Exception(INCORRECT_DATA) if the regular expression cannot be compiled.
RegexpRowInputFormat::RegexpRowInputFormat(
    std::unique_ptr<PeekableReadBuffer> buf_, const Block & header_, Params params_, const FormatSettings & format_settings_)
    // The base class is initialized before any member, so `*buf_` is evaluated before `buf_` is moved from.
    : IRowInputFormat(header_, *buf_, std::move(params_))
    , buf(std::move(buf_))
    , format_settings(format_settings_)
    , escaping_rule(format_settings_.regexp.escaping_rule)
    , regexp(format_settings_.regexp.regexp)
{
    // RE2 reports -1 capturing groups for a pattern that failed to compile. Converted to size_t
    // that value would make the resize() calls below request an absurd amount of memory,
    // so report the compilation error explicitly instead.
    if (!regexp.ok())
        throw Exception("Cannot compile the regular expression for format Regexp: " + regexp.error(), ErrorCodes::INCORRECT_DATA);

    const size_t fields_count = static_cast<size_t>(regexp.NumberOfCapturingGroups());

    matched_fields.resize(fields_count);
    re2_arguments.resize(fields_count);
    re2_arguments_ptrs.resize(fields_count);

    for (size_t i = 0; i != fields_count; ++i)
    {
        // Bind an argument to a matched field.
        re2_arguments[i] = &matched_fields[i];
        // Save pointer to argument.
        re2_arguments_ptrs[i] = &re2_arguments[i];
    }
}
// Resets the parser: first the state kept by the base class, then the owned peekable buffer.
void RegexpRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
buf->reset();
}
bool RegexpRowInputFormat::readField(size_t index, MutableColumns & columns)
{
const auto & type = getPort().getHeader().getByPosition(index).type;
2020-03-27 20:10:03 +00:00
ReadBuffer field_buf(const_cast<char *>(matched_fields[index].data()), matched_fields[index].size(), 0);
try
{
2021-11-09 13:14:07 +00:00
return deserializeFieldByEscapingRule(type, serializations[index], *columns[index], field_buf, escaping_rule, format_settings);
}
catch (Exception & e)
{
e.addMessage("(while reading the value of column " + getPort().getHeader().getByPosition(index).name + ")");
throw;
}
}
// Fills one row from the last successful match: column i receives capture group i.
//
// `ext.read_columns` is resized to the number of columns and element i stores what readField returned for column i.
// Throws Exception(INCORRECT_DATA) if the number of capture groups differs from the number of columns.
void RegexpRowInputFormat::readFieldsFromMatch(MutableColumns & columns, RowReadExtension & ext)
{
    const size_t columns_count = columns.size();

    if (matched_fields.size() != columns_count)
        throw Exception("The number of matched fields in line doesn't match the number of columns.", ErrorCodes::INCORRECT_DATA);

    ext.read_columns.assign(columns_count, false);

    for (size_t i = 0; i != columns_count; ++i)
        ext.read_columns[i] = readField(i, columns);
}
// Reads one line from the buffer, matches it against the regexp and, on success, fills `columns`.
//
// Returns false if the buffer is at EOF before anything is read, true otherwise. True is also
// returned when the line did not match and `skip_unmatched` is set; nothing is inserted then.
// Throws Exception(INCORRECT_DATA) if the line does not match and `skip_unmatched` is not set,
// or if the character following the line is not a valid line ending.
bool RegexpRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
{
    if (buf->eof())
        return false;

    // Checkpoint at the beginning of the line; we roll back to it once the end of the line is found.
    PeekableReadBufferCheckpoint checkpoint{*buf};

    // Advance to the first '\n' or '\r' (or to EOF), accumulating the length of the line.
    // The line may not fit into the currently available data, in which case the search continues.
    size_t line_size = 0;
    do
    {
        char * line_break = find_first_symbols<'\n', '\r'>(buf->position(), buf->buffer().end());
        line_size += line_break - buf->position();
        buf->position() = line_break;
    } while (buf->position() == buf->buffer().end() && !buf->eof());

    // Make the whole line addressable as one range starting at the checkpoint, then go back to its beginning.
    buf->makeContinuousMemoryFromCheckpointToPos();
    buf->rollbackToCheckpoint();

    const re2::StringPiece line(buf->position(), line_size);
    const bool matched = RE2::FullMatchN(line, regexp, re2_arguments_ptrs.data(), re2_arguments_ptrs.size());

    if (matched)
        readFieldsFromMatch(columns, ext);
    else if (!format_settings.regexp.skip_unmatched)
        throw Exception("Line \"" + std::string(buf->position(), line_size) + "\" doesn't match the regexp.", ErrorCodes::INCORRECT_DATA);

    // Step over the line itself, whether it was parsed or skipped.
    buf->position() += line_size;

    // Consume the line ending: an optional '\r', then '\n' unless the input ends here.
    checkChar('\r', *buf);
    if (!buf->eof() && !checkChar('\n', *buf))
        throw Exception("No \\n after \\r at the end of line.", ErrorCodes::INCORRECT_DATA);

    return true;
}
// Switches the format to a new input buffer: `in_` is wrapped into a fresh PeekableReadBuffer
// (replacing the previously owned one) and the wrapper is passed on to IInputFormat::setReadBuffer.
void RegexpRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
}
2021-10-11 16:11:50 +00:00
// Registers the input format in `factory` under the name "Regexp".
// The registered creator builds a RegexpRowInputFormat from the buffer, sample block, params and settings it is given.
void registerInputFormatRegexp(FormatFactory & factory)
{
    auto create_regexp_input = [](
        ReadBuffer & buf,
        const Block & sample,
        IRowInputFormat::Params params,
        const FormatSettings & settings)
    {
        return std::make_shared<RegexpRowInputFormat>(buf, sample, std::move(params), settings);
    };

    factory.registerInputFormat("Regexp", std::move(create_regexp_input));
}
2020-11-30 16:42:41 +00:00
// Segmentation engine for the Regexp format: walks over `in` line by line, stopping after the
// first line at which the accumulated size (bytes already in `memory` plus bytes scanned in the
// current buffer) reaches `min_chunk_size`, and saves everything up to that position into `memory`.
//
// Both "\n" and "\r\n" line endings are handled.
// Returns {result of the final loadAtPosition call, number of line breaks counted}.
// NOTE(review): the first element presumably tells the caller whether more data is available - confirm against loadAtPosition.
// Throws Exception(LOGICAL_ERROR) if the search position ends up past the end of the buffer.
static std::pair<bool, size_t> fileSegmentationEngineRegexpImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
    char * cursor = in.position();
    bool chunk_is_small = true;
    size_t rows_in_chunk = 0;

    // loadAtPosition is deliberately evaluated first, exactly once per iteration.
    while (loadAtPosition(in, memory, cursor) && chunk_is_small)
    {
        cursor = find_first_symbols<'\n', '\r'>(cursor, in.buffer().end());

        if (cursor > in.buffer().end())
            throw Exception("Position in buffer is out of bounds. There must be a bug.", ErrorCodes::LOGICAL_ERROR);

        // No line break in the data available so far: go to the next iteration, which calls loadAtPosition again.
        if (cursor == in.buffer().end())
            continue;

        // Support DOS-style newline ("\r\n")
        if (*cursor == '\r')
        {
            ++cursor;
            if (cursor == in.buffer().end())
                loadAtPosition(in, memory, cursor);
        }

        const size_t accumulated_size = memory.size() + static_cast<size_t>(cursor - in.position());
        if (accumulated_size >= min_chunk_size)
            chunk_is_small = false;

        // Step over the line break and count the finished line.
        ++cursor;
        ++rows_in_chunk;
    }

    saveUpToPosition(in, memory, cursor);

    return {loadAtPosition(in, memory, cursor), rows_in_chunk};
}
// Registers fileSegmentationEngineRegexpImpl in `factory` as the file segmentation engine for the "Regexp" format.
void registerFileSegmentationEngineRegexp(FormatFactory & factory)
{
factory.registerFileSegmentationEngine("Regexp", &fileSegmentationEngineRegexpImpl);
}
}