Addition of a new input format: RegexpRowInputFormat.

This commit is contained in:
Avogar 2020-02-14 22:48:45 +03:00
parent 1ca4b7b8da
commit 93301b8f9f
10 changed files with 276 additions and 0 deletions

View File

@ -247,6 +247,10 @@ struct Settings : public SettingsCollection<Settings>
M(SettingString, format_custom_result_before_delimiter, "", "Prefix before result set (for CustomSeparated format)", 0) \
M(SettingString, format_custom_result_after_delimiter, "", "Suffix after result set (for CustomSeparated format)", 0) \
\
M(SettingString, format_regexp, "", "Regular expression (for Regexp format)", 0) \
M(SettingString, format_regexp_escaping_rule, "Escaped", "Field escaping rule (for Regexp format)", 0) \
M(SettingBool, format_regexp_skip_unmatched, false, "Skip lines unmatched by regular expression (for Regexp format)", 0) \
\
M(SettingBool, insert_allow_materialized_columns, 0, "If setting is enabled, Allow materialized columns in INSERT.", 0) \
M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.", 0) \
M(SettingSeconds, http_send_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP send timeout", 0) \

View File

@ -68,6 +68,9 @@ static FormatSettings getInputFormatSetting(const Settings & settings, const Con
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
format_settings.regexp.regexp = settings.format_regexp;
format_settings.regexp.escaping_rule = settings.format_regexp_escaping_rule;
format_settings.regexp.skip_unmatched = settings.format_regexp_skip_unmatched;
/// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context
if (context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER))
@ -352,10 +355,12 @@ FormatFactory::FormatFactory()
registerOutputFormatProcessorAvro(*this);
registerInputFormatProcessorTemplate(*this);
registerOutputFormatProcessorTemplate(*this);
registerInputFormatProcessorRegexp(*this);
registerFileSegmentationEngineTabSeparated(*this);
registerFileSegmentationEngineCSV(*this);
registerFileSegmentationEngineJSONEachRow(*this);
registerFileSegmentationEngineRegexp(*this);
registerOutputFormatNull(*this);

View File

@ -178,6 +178,7 @@ void registerOutputFormatProcessorTemplate(FormatFactory &factory);
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory);
void registerFileSegmentationEngineCSV(FormatFactory & factory);
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory);
void registerFileSegmentationEngineRegexp(FormatFactory & factory);
/// Output only (presentational) formats.
@ -198,5 +199,6 @@ void registerOutputFormatProcessorMySQLWrite(FormatFactory & factory);
/// Input only formats.
void registerInputFormatProcessorCapnProto(FormatFactory & factory);
void registerInputFormatProcessorRegexp(FormatFactory & factory);
}

View File

@ -122,6 +122,15 @@ struct FormatSettings
Avro avro;
/// Settings of the Regexp input format.
struct Regexp
{
    /// Regular expression that every input line is matched against.
    std::string regexp;

    /// Name of the escaping rule used to parse each matched field.
    std::string escaping_rule;

    /// When true, lines that do not match the expression are skipped instead of raising an error.
    bool skip_unmatched{false};
};

Regexp regexp;
};
}

View File

@ -0,0 +1,155 @@
#include <string>
#include <common/find_symbols.h>
#include <Processors/Formats/Impl/RegexpRowInputFormat.h>
#include <DataTypes/DataTypeNullable.h>
#include <IO/ReadHelpers.h>
#include <iostream>
namespace DB
{
/// Error codes used by this file; they are only declared here and defined elsewhere.
namespace ErrorCodes
{
/// Thrown when an input line does not match the regexp or yields a wrong number of fields.
extern const int INCORRECT_DATA;
/// Thrown when the escaping rule name is not recognised.
extern const int BAD_ARGUMENTS;
}
/// Creates the format: keeps a copy of the format settings, compiles the regular
/// expression taken from them and resolves the escaping rule name to a FieldFormat.
/// Members are listed in their declaration order, so the expression is compiled
/// before the escaping rule name is validated.
RegexpRowInputFormat::RegexpRowInputFormat(
    ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
    : IRowInputFormat(header_, in_, std::move(params_))
    , format_settings(format_settings_)
    , regexp(format_settings_.regexp.regexp)
    , field_format(stringToFormat(format_settings_.regexp.escaping_rule))
{
}
/// Translates the name of an escaping rule ("Escaped", "Quoted", "CSV" or "JSON")
/// into the corresponding FieldFormat value. The comparison is case-sensitive.
/// Throws Exception with BAD_ARGUMENTS for any other name.
RegexpRowInputFormat::FieldFormat RegexpRowInputFormat::stringToFormat(const String & format)
{
    struct NamedFormat
    {
        const char * name;
        FieldFormat value;
    };

    static const NamedFormat known_formats[] = {
        {"Escaped", FieldFormat::Escaped},
        {"Quoted", FieldFormat::Quoted},
        {"CSV", FieldFormat::Csv},
        {"JSON", FieldFormat::Json},
    };

    for (const auto & candidate : known_formats)
    {
        if (format == candidate.name)
            return candidate.value;
    }

    throw Exception("Unknown field format \"" + format + "\".", ErrorCodes::BAD_ARGUMENTS);
}
bool RegexpRowInputFormat::readField(size_t index, MutableColumns & columns)
{
const auto & type = getPort().getHeader().getByPosition(index).type;
bool parse_as_nullable = format_settings.null_as_default && !type->isNullable();
bool read = true;
ReadBuffer field_buf(matched_fields[index + 1].first, matched_fields[index + 1].length(), 0);
try
{
switch (field_format)
{
case FieldFormat::Escaped:
if (parse_as_nullable)
read = DataTypeNullable::deserializeTextEscaped(*columns[index], field_buf, format_settings, type);
else
type->deserializeAsTextEscaped(*columns[index], field_buf, format_settings);
break;
case FieldFormat::Quoted:
if (parse_as_nullable)
read = DataTypeNullable::deserializeTextQuoted(*columns[index], field_buf, format_settings, type);
else
type->deserializeAsTextQuoted(*columns[index], field_buf, format_settings);
break;
case FieldFormat::Csv:
if (parse_as_nullable)
read = DataTypeNullable::deserializeTextCSV(*columns[index], field_buf, format_settings, type);
else
type->deserializeAsTextCSV(*columns[index], field_buf, format_settings);
break;
case FieldFormat::Json:
if (parse_as_nullable)
read = DataTypeNullable::deserializeTextJSON(*columns[index], field_buf, format_settings, type);
else
type->deserializeAsTextJSON(*columns[index], field_buf, format_settings);
break;
default:
__builtin_unreachable();
}
}
catch (Exception & e)
{
throw;
}
return read;
}
/// Fills one row from the last successful match: capture group N + 1 goes to column N.
/// `ext.read_columns` is resized to the number of columns and receives the flag returned
/// by readField for each of them.
/// Throws Exception with INCORRECT_DATA when the number of capture groups differs from
/// the number of columns.
void RegexpRowInputFormat::readFieldsFromMatch(MutableColumns & columns, RowReadExtension & ext)
{
    const size_t num_columns = columns.size();

    /// Sub-match 0 is the whole line, so a usable match holds exactly one extra entry.
    if (matched_fields.size() != num_columns + 1)
        throw Exception("The number of matched fields in line doesn't match the number of columns.", ErrorCodes::INCORRECT_DATA);

    ext.read_columns.assign(num_columns, false);

    for (size_t position = 0; position != num_columns; ++position)
        ext.read_columns[position] = readField(position, columns);
}
/// Reads one line from the input, matches it against the regular expression and, on
/// success, appends one value to every column.
///
/// Returns false when the input is exhausted and true otherwise. A line that does not
/// match is skipped without touching `columns` when `format_settings.regexp.skip_unmatched`
/// is set; otherwise an Exception with INCORRECT_DATA is thrown.
///
/// A line ends at '\n', '\r', a "\r\n" pair, or at the end of the currently loaded buffer.
/// NOTE(review): a line is only searched for inside the current buffer; if a line (or a
/// "\r\n" pair) can be split across two buffer refills it will be seen as two lines.
/// Confirm that callers always supply whole lines.
bool RegexpRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
{
    if (in.eof())
        return false;

    char * const buffer_end = in.buffer().end();
    char * const line_end = find_first_symbols<'\n', '\r'>(in.position(), buffer_end);

    const bool match = std::regex_match(in.position(), line_end, matched_fields, regexp);

    if (!match && !format_settings.regexp.skip_unmatched)
        throw Exception("Line \"" + std::string(in.position(), line_end) + "\" doesn't match the regexp.", ErrorCodes::INCORRECT_DATA);

    if (match)
        readFieldsFromMatch(columns, ext);

    /// Step over the line terminator. The position must never move past the end of the
    /// buffer (the last line may have no terminator), and "\r\n" counts as one terminator.
    char * next_line = line_end;
    if (next_line != buffer_end && *next_line == '\r')
        ++next_line;
    if (next_line != buffer_end && *next_line == '\n')
        ++next_line;
    in.position() = next_line;

    return true;
}
/// Registers the "Regexp" input format in the factory. The registered creator builds a
/// RegexpRowInputFormat from the buffer, the sample block, the row-input parameters and
/// the format settings it is given.
void registerInputFormatProcessorRegexp(FormatFactory & factory)
{
    auto create_regexp_format = [](
        ReadBuffer & buf,
        const Block & sample,
        IRowInputFormat::Params params,
        const FormatSettings & settings)
    {
        return std::make_shared<RegexpRowInputFormat>(buf, sample, std::move(params), settings);
    };

    factory.registerInputFormatProcessor("Regexp", create_regexp_format);
}
/// Segmentation engine for the Regexp format: walks the input line by line ('\n' or '\r'
/// ends a line) and stops right after the first line terminator at which the accumulated
/// size (`memory.size()` plus the bytes passed in the current buffer) reaches `min_chunk_size`.
/// NOTE(review): loadAtPosition and saveUpToPosition are defined outside this file; presumably
/// the former refills the buffer when `pos` reaches its end and the latter appends the bytes
/// up to `pos` to `memory` - confirm against their definitions.
/// Returns the result of the final loadAtPosition call (presumably "more data is available").
/// NOTE(review): a chunk may end between '\r' and '\n' of a "\r\n" pair, because every
/// terminator character is treated as a possible chunk boundary.
static bool fileSegmentationEngineRegexpImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
char * pos = in.position();
bool need_more_data = true;
/// loadAtPosition is evaluated first on every iteration, even once need_more_data is false.
while (loadAtPosition(in, memory, pos) && need_more_data)
{
pos = find_first_symbols<'\n', '\r'>(pos, in.buffer().end());
/// No terminator in the rest of this buffer: go round again so that loadAtPosition runs.
if (pos == in.buffer().end())
continue;
if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
need_more_data = false;
/// Step over the terminator so that it stays in the current chunk.
++pos;
}
saveUpToPosition(in, memory, pos);
return loadAtPosition(in, memory, pos);
}
/// Registers the line-based segmentation engine under the "Regexp" format name.
void registerFileSegmentationEngineRegexp(FormatFactory & factory)
{
    /// A function name decays to a pointer to the function, same as taking its address.
    auto * const segmentation_engine = fileSegmentationEngineRegexpImpl;
    factory.registerFileSegmentationEngine("Regexp", segmentation_engine);
}
}

View File

@ -0,0 +1,43 @@
#pragma once
#include <regex>
#include <Core/Block.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Formats/FormatSettings.h>
#include <Formats/FormatFactory.h>
namespace DB
{
class ReadBuffer;
/// Row input format that matches every input line against a regular expression and
/// parses each capture group as the value of the column with the same ordinal position.
/// The expression, the escaping rule of the fields and the handling of unmatched lines
/// are taken from FormatSettings::regexp.
class RegexpRowInputFormat : public IRowInputFormat
{
public:
RegexpRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_);
String getName() const override { return "RegexpRowInputFormat"; }
/// Reads one line; returns false when the input is exhausted.
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
private:
/// Escaping rule used to deserialize the text of a matched field.
enum class FieldFormat
{
Escaped,
Quoted,
Csv,
Json,
};
/// Parses capture group `index + 1` into `columns[index]`.
bool readField(size_t index, MutableColumns & columns);
/// Parses all capture groups of the last match; throws if their number differs from the number of columns.
void readFieldsFromMatch(MutableColumns & columns, RowReadExtension & ext);
/// Converts an escaping rule name to FieldFormat; throws on an unknown name.
FieldFormat stringToFormat(const String & format);
/// Own copy of the settings passed to the constructor.
const FormatSettings format_settings;
/// Compiled form of format_settings.regexp.regexp.
std::regex regexp;
/// Result of the last match; its sub-matches point into the input buffer.
std::match_results<char *> matched_fields;
/// Escaping rule resolved from format_settings.regexp.escaping_rule.
FieldFormat field_format;
};
}

View File

@ -0,0 +1,12 @@
1 [1,2,3] str1 2020-01-01
2 [1,2,3] str2 2020-01-02
3 [1,2,3] str3 2020-01-03
4 [1,2,3] str4 2020-01-04
5 [1,2,3] str5 2020-01-05
6 [1,2,3] str6 2020-01-06
7 [1,2,3] str7 2020-01-07
8 [1,2,3] str8 2020-01-08
9 [1,2,3] str9 2020-01-09
10 [1,2,3] str10 2020-01-10
11 [1,2,3] str11 2020-01-11
12 [1,2,3] str12 2020-01-12

View File

@ -0,0 +1,27 @@
#!/usr/bin/env bash

# Functional test for the Regexp input format: inserts three rows with each of the
# supported escaping rules (Escaped, CSV, Quoted, JSON) and prints the table contents.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh

$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS regexp";
$CLICKHOUSE_CLIENT --query="CREATE TABLE regexp (id UInt32, array Array(UInt32), string String, date Date) ENGINE = Memory";

# Escaped: fields are written without any quoting.
echo 'id: 1 array: [1,2,3] string: str1 date: 2020-01-01
id: 2 array: [1,2,3] string: str2 date: 2020-01-02
id: 3 array: [1,2,3] string: str3 date: 2020-01-03' | $CLICKHOUSE_CLIENT --query="INSERT INTO regexp FORMAT Regexp SETTINGS format_regexp='id: (.+?) array: (.+?) string: (.+?) date: (.+?)', format_regexp_escaping_rule='Escaped'";

# CSV: array, string and date are double-quoted.
echo 'id: 4 array: "[1,2,3]" string: "str4" date: "2020-01-04"
id: 5 array: "[1,2,3]" string: "str5" date: "2020-01-05"
id: 6 array: "[1,2,3]" string: "str6" date: "2020-01-06"' | $CLICKHOUSE_CLIENT --query="INSERT INTO regexp FORMAT Regexp SETTINGS format_regexp='id: (.+?) array: (.+?) string: (.+?) date: (.+?)', format_regexp_escaping_rule='CSV'";

# Quoted: string and date are single-quoted.
echo "id: 7 array: [1,2,3] string: 'str7' date: '2020-01-07'
id: 8 array: [1,2,3] string: 'str8' date: '2020-01-08'
id: 9 array: [1,2,3] string: 'str9' date: '2020-01-09'" | $CLICKHOUSE_CLIENT --query="INSERT INTO regexp FORMAT Regexp SETTINGS format_regexp='id: (.+?) array: (.+?) string: (.+?) date: (.+?)', format_regexp_escaping_rule='Quoted'";

# JSON: string and date are double-quoted.
echo 'id: 10 array: [1,2,3] string: "str10" date: "2020-01-10"
id: 11 array: [1,2,3] string: "str11" date: "2020-01-11"
id: 12 array: [1,2,3] string: "str12" date: "2020-01-12"' | $CLICKHOUSE_CLIENT --query="INSERT INTO regexp FORMAT Regexp SETTINGS format_regexp='id: (.+?) array: (.+?) string: (.+?) date: (.+?)', format_regexp_escaping_rule='JSON'";

$CLICKHOUSE_CLIENT --query="SELECT * FROM regexp ORDER BY id";
$CLICKHOUSE_CLIENT --query="DROP TABLE regexp";

View File

@ -0,0 +1,3 @@
1 str1
2 str2
4 str4

View File

@ -0,0 +1,16 @@
#!/usr/bin/env bash

# Functional test for format_regexp_skip_unmatched: the third input line does not match
# the expression and must be skipped, so only rows 1, 2 and 4 are inserted.

# Load the shared test configuration, as the sibling Regexp test does.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh

$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS regexp";
$CLICKHOUSE_CLIENT --query="CREATE TABLE regexp (id UInt32, string String) ENGINE = Memory";

echo 'id: 1 string: str1
id: 2 string: str2
id=3, string=str3
id: 4 string: str4' | $CLICKHOUSE_CLIENT --query="INSERT INTO regexp FORMAT Regexp SETTINGS format_regexp='id: (.+?) string: (.+?)', format_regexp_escaping_rule='Escaped', format_regexp_skip_unmatched=1";

# ORDER BY keeps the output independent of the storage order.
$CLICKHOUSE_CLIENT --query="SELECT * FROM regexp ORDER BY id";
$CLICKHOUSE_CLIENT --query="DROP TABLE regexp";