2020-03-12 14:18:39 +00:00
|
|
|
#include <stdlib.h>
|
2021-10-02 07:13:14 +00:00
|
|
|
#include <base/find_symbols.h>
|
2020-02-14 19:48:45 +00:00
|
|
|
#include <Processors/Formats/Impl/RegexpRowInputFormat.h>
|
2021-03-09 14:46:52 +00:00
|
|
|
#include <DataTypes/Serializations/SerializationNullable.h>
|
2021-11-09 13:14:07 +00:00
|
|
|
#include <Formats/EscapingRuleUtils.h>
|
2020-02-14 19:48:45 +00:00
|
|
|
#include <IO/ReadHelpers.h>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes used in this file. They are only declared here (extern); the definitions live elsewhere.
namespace ErrorCodes
{
    /// Thrown when an input line does not match the regexp or is otherwise malformed.
    extern const int INCORRECT_DATA;
    /// Thrown when an internal invariant is broken (buffer position out of bounds).
    extern const int LOGICAL_ERROR;
}
|
|
|
|
|
2021-12-15 11:30:57 +00:00
|
|
|
/// Compiles the regular expression taken from the format settings and prepares the output
/// arguments that parseRow() passes to RE2.
///
/// One slot per capturing group is allocated in `matched_fields`. `re2_arguments[i]` is bound
/// to `matched_fields[i]`, and `re2_arguments_ptrs[i]` points to `re2_arguments[i]`, so that
/// `re2_arguments_ptrs.data()` can be handed to `RE2::FullMatchN` as an array of argument pointers.
///
/// Throws INCORRECT_DATA if the regular expression cannot be compiled.
RegexpFieldExtractor::RegexpFieldExtractor(const FormatSettings & format_settings)
    : regexp(format_settings.regexp.regexp)
    , skip_unmatched(format_settings.regexp.skip_unmatched)
{
    /// For an invalid pattern NumberOfCapturingGroups() returns -1, which would silently turn
    /// into a huge size_t and blow up in resize(). Report the real problem instead.
    if (!regexp.ok())
        throw Exception(
            "Cannot compile regular expression \"" + regexp.pattern() + "\" for format Regexp: " + regexp.error(),
            ErrorCodes::INCORRECT_DATA);

    const size_t fields_count = static_cast<size_t>(regexp.NumberOfCapturingGroups());

    matched_fields.resize(fields_count);
    re2_arguments.resize(fields_count);
    re2_arguments_ptrs.resize(fields_count);

    for (size_t i = 0; i != fields_count; ++i)
    {
        /// Bind an argument to a matched field.
        re2_arguments[i] = &matched_fields[i];
        /// Save pointer to argument.
        re2_arguments_ptrs[i] = &re2_arguments[i];
    }
}
|
|
|
|
|
2021-12-15 11:30:57 +00:00
|
|
|
/// Reads one line from `buf` and matches the whole line against the regexp.
///
/// The line ends at the first '\n' or '\r', or at the end of the input. After the match attempt
/// the buffer position is moved past the line; then an optional '\r' and, unless the input has
/// ended, a mandatory '\n' are checked with checkChar().
///
/// Returns the result of the full match. If the line does not match and `skip_unmatched`
/// is not set, throws INCORRECT_DATA. Also throws INCORRECT_DATA when the final checkChar('\n')
/// fails while the input has not ended.
bool RegexpFieldExtractor::parseRow(PeekableReadBuffer & buf)
{
    /// The checkpoint lets us scan forward for the line end and then return to the line start.
    PeekableReadBufferCheckpoint checkpoint{buf};

    size_t line_length = 0;
    while (true)
    {
        char * terminator = find_first_symbols<'\n', '\r'>(buf.position(), buf.buffer().end());
        line_length += terminator - buf.position();
        buf.position() = terminator;

        /// Stop as soon as a terminator was found inside the current buffer or the input is exhausted.
        if (buf.position() != buf.buffer().end() || buf.eof())
            break;
    }

    /// The line may have been collected from several underlying buffers; make it one contiguous range
    /// and go back to its first byte.
    buf.makeContinuousMemoryFromCheckpointToPos();
    buf.rollbackToCheckpoint();

    const re2_st::StringPiece line(buf.position(), line_length);
    const bool matched = re2_st::RE2::FullMatchN(line, regexp, re2_arguments_ptrs.data(), re2_arguments_ptrs.size());

    if (!(matched || skip_unmatched))
        throw Exception("Line \"" + std::string(buf.position(), line_length) + "\" doesn't match the regexp.", ErrorCodes::INCORRECT_DATA);

    /// Move past the line itself, then past its terminator.
    buf.position() += line_length;
    checkChar('\r', buf);

    const bool input_finished = buf.eof();
    if (!input_finished && !checkChar('\n', buf))
        throw Exception("No \\n after \\r at the end of line.", ErrorCodes::INCORRECT_DATA);

    return matched;
}
|
|
|
|
|
|
|
|
/// Convenience constructor: wraps `in_` into a newly allocated PeekableReadBuffer and delegates
/// to the constructor that takes ownership of that buffer.
/// `params_` is taken by value, so it is moved (not copied) into the delegated constructor.
RegexpRowInputFormat::RegexpRowInputFormat(
    ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
    : RegexpRowInputFormat(std::make_unique<PeekableReadBuffer>(in_), header_, std::move(params_), format_settings_)
{
}
|
|
|
|
|
|
|
|
/// Main constructor: takes ownership of an already created PeekableReadBuffer.
///
/// The base class is initialized with a reference to the pointee (`*buf_`) before the
/// owning pointer is moved into `buf`; moving the unique_ptr does not move the pointee,
/// so that reference stays valid.
RegexpRowInputFormat::RegexpRowInputFormat(
    std::unique_ptr<PeekableReadBuffer> buf_,
    const Block & header_,
    Params params_,
    const FormatSettings & format_settings_)
    : IRowInputFormat(header_, *buf_, std::move(params_))
    , buf(std::move(buf_))
    , format_settings(format_settings_)
    , escaping_rule(format_settings_.regexp.escaping_rule)
    , field_extractor(format_settings_)
{
}
|
2020-02-21 15:21:31 +00:00
|
|
|
|
2020-06-11 00:51:27 +00:00
|
|
|
void RegexpRowInputFormat::resetParser()
|
|
|
|
{
|
|
|
|
IRowInputFormat::resetParser();
|
2021-12-10 17:54:08 +00:00
|
|
|
buf->reset();
|
2020-02-14 19:48:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bool RegexpRowInputFormat::readField(size_t index, MutableColumns & columns)
|
|
|
|
{
|
|
|
|
const auto & type = getPort().getHeader().getByPosition(index).type;
|
2021-12-15 11:30:57 +00:00
|
|
|
auto matched_field = field_extractor.getField(index);
|
|
|
|
ReadBuffer field_buf(const_cast<char *>(matched_field.data()), matched_field.size(), 0);
|
2020-02-14 19:48:45 +00:00
|
|
|
try
|
|
|
|
{
|
2021-11-09 13:14:07 +00:00
|
|
|
return deserializeFieldByEscapingRule(type, serializations[index], *columns[index], field_buf, escaping_rule, format_settings);
|
2020-02-14 19:48:45 +00:00
|
|
|
}
|
|
|
|
catch (Exception & e)
|
|
|
|
{
|
2020-11-19 14:44:58 +00:00
|
|
|
e.addMessage("(while reading the value of column " + getPort().getHeader().getByPosition(index).name + ")");
|
2020-02-14 19:48:45 +00:00
|
|
|
throw;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Fills every column from the fields captured by the last successful match.
///
/// `ext.read_columns` is resized to the number of columns and receives, per column,
/// the value returned by readField().
/// Throws INCORRECT_DATA if the number of captured fields differs from the number of columns.
void RegexpRowInputFormat::readFieldsFromMatch(MutableColumns & columns, RowReadExtension & ext)
{
    const size_t columns_count = columns.size();

    if (columns_count != field_extractor.getMatchedFieldsSize())
        throw Exception("The number of matched fields in line doesn't match the number of columns.", ErrorCodes::INCORRECT_DATA);

    ext.read_columns.assign(columns_count, false);
    for (size_t i = 0; i != columns_count; ++i)
        ext.read_columns[i] = readField(i, columns);
}
|
|
|
|
|
|
|
|
/// Reads the next line of the input.
///
/// Returns false only when the input is exhausted. Otherwise one line is consumed and true
/// is returned; the columns are filled only if the line matched the regexp (parseRow() returned
/// true), so for a skipped line `columns` and `ext` are left untouched.
bool RegexpRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
{
    if (buf->eof())
        return false;

    const bool line_matched = field_extractor.parseRow(*buf);
    if (line_matched)
        readFieldsFromMatch(columns, ext);

    return true;
}
|
|
|
|
|
2021-12-10 17:54:08 +00:00
|
|
|
/// Switches the format to a new input: a fresh PeekableReadBuffer is created around `in_`,
/// replaces the previously owned one, and is then passed to the base class.
void RegexpRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
    auto peekable = std::make_unique<PeekableReadBuffer>(in_);
    buf = std::move(peekable);
    IInputFormat::setReadBuffer(*buf);
}
|
|
|
|
|
2021-12-16 18:48:38 +00:00
|
|
|
/// Creates a schema reader for the Regexp format.
///
/// The base class receives a reference to the member `buf` before that member is constructed
/// (base classes are initialized first); NOTE(review): this is only safe as long as the base
/// constructor merely stores the reference - confirm against IRowSchemaReader.
///
/// `field_extractor` is built from the constructor argument `format_settings_` rather than from
/// the member `format_settings`, so the initialization does not depend on the order in which
/// the members are declared in the header.
RegexpSchemaReader::RegexpSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_, ContextPtr context_)
    : IRowSchemaReader(
        buf,
        format_settings_.max_rows_to_read_for_schema_inference,
        getDefaultDataTypeForEscapingRule(format_settings_.regexp.escaping_rule))
    , format_settings(format_settings_)
    , field_extractor(format_settings_)
    , buf(in_)
    , context(context_)
{
}
|
|
|
|
|
|
|
|
/// Reads the next matching line and determines a data type for each captured field.
///
/// Returns an empty list when the input is exhausted before a matching line is found.
/// Lines for which parseRow() returns false (possible only when unmatched lines are skipped
/// instead of raising an error) are passed over: after a failed match the captured fields
/// do not describe the current line, so no types must be inferred from them.
/// Exceptions thrown by parseRow() propagate to the caller.
DataTypes RegexpSchemaReader::readRowAndGetDataTypes()
{
    do
    {
        if (buf.eof())
            return {};
    } while (!field_extractor.parseRow(buf));

    const size_t fields_count = field_extractor.getMatchedFieldsSize();

    DataTypes data_types;
    data_types.reserve(fields_count);
    for (size_t i = 0; i != fields_count; ++i)
    {
        /// Copy the captured text into an owning string before handing it to type inference.
        String field(field_extractor.getField(i));
        data_types.push_back(determineDataTypeByEscapingRule(field, format_settings, format_settings.regexp.escaping_rule, context));
    }

    return data_types;
}
|
|
|
|
|
2021-10-11 16:11:50 +00:00
|
|
|
/// Registers the row input format under the name "Regexp" in the given factory.
/// The registered creator builds a RegexpRowInputFormat from the arguments it is called with.
void registerInputFormatRegexp(FormatFactory & factory)
{
    auto create_regexp_format = [](
        ReadBuffer & buf,
        const Block & sample,
        IRowInputFormat::Params params,
        const FormatSettings & settings)
    {
        return std::make_shared<RegexpRowInputFormat>(buf, sample, std::move(params), settings);
    };

    factory.registerInputFormat("Regexp", std::move(create_regexp_format));
}
|
|
|
|
|
2020-11-30 16:42:41 +00:00
|
|
|
static std::pair<bool, size_t> fileSegmentationEngineRegexpImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
|
2020-02-14 19:48:45 +00:00
|
|
|
{
|
|
|
|
char * pos = in.position();
|
2020-03-27 20:10:03 +00:00
|
|
|
bool need_more_data = true;
|
2020-11-30 16:42:41 +00:00
|
|
|
size_t number_of_rows = 0;
|
2020-02-14 19:48:45 +00:00
|
|
|
|
2020-03-27 20:10:03 +00:00
|
|
|
while (loadAtPosition(in, memory, pos) && need_more_data)
|
2020-02-14 19:48:45 +00:00
|
|
|
{
|
|
|
|
pos = find_first_symbols<'\n', '\r'>(pos, in.buffer().end());
|
2021-02-15 13:10:14 +00:00
|
|
|
if (pos > in.buffer().end())
|
|
|
|
throw Exception("Position in buffer is out of bounds. There must be a bug.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
else if (pos == in.buffer().end())
|
2020-02-14 19:48:45 +00:00
|
|
|
continue;
|
|
|
|
|
2020-02-21 18:49:18 +00:00
|
|
|
// Support DOS-style newline ("\r\n")
|
2020-03-27 20:10:03 +00:00
|
|
|
if (*pos == '\r')
|
2020-02-21 18:49:18 +00:00
|
|
|
{
|
2020-03-27 20:10:03 +00:00
|
|
|
++pos;
|
2020-02-21 18:49:18 +00:00
|
|
|
if (pos == in.buffer().end())
|
|
|
|
loadAtPosition(in, memory, pos);
|
|
|
|
}
|
2020-03-27 20:10:03 +00:00
|
|
|
|
|
|
|
if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
|
|
|
|
need_more_data = false;
|
|
|
|
|
|
|
|
++pos;
|
2020-11-30 16:42:41 +00:00
|
|
|
++number_of_rows;
|
2020-02-14 19:48:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
saveUpToPosition(in, memory, pos);
|
|
|
|
|
2020-11-30 16:42:41 +00:00
|
|
|
return {loadAtPosition(in, memory, pos), number_of_rows};
|
2020-02-14 19:48:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Registers fileSegmentationEngineRegexpImpl as the file segmentation engine for the "Regexp" format.
void registerFileSegmentationEngineRegexp(FormatFactory & factory)
{
    auto * const segmentation_engine = &fileSegmentationEngineRegexpImpl;
    factory.registerFileSegmentationEngine("Regexp", segmentation_engine);
}
|
|
|
|
|
2021-12-15 11:30:57 +00:00
|
|
|
/// Registers the schema reader under the name "Regexp" in the given factory.
/// The registered creator builds a RegexpSchemaReader from the arguments it is called with.
void registerRegexpSchemaReader(FormatFactory & factory)
{
    auto create_schema_reader = [](ReadBuffer & buf, const FormatSettings & settings, ContextPtr context)
    {
        return std::make_shared<RegexpSchemaReader>(buf, settings, context);
    };

    factory.registerSchemaReader("Regexp", std::move(create_schema_reader));
}
|
|
|
|
|
2020-02-14 19:48:45 +00:00
|
|
|
}
|