From 93301b8f9f3b07b2d1630ce063fcf2fcbf5f0095 Mon Sep 17 00:00:00 2001
From: Avogar
Date: Fri, 14 Feb 2020 22:48:45 +0300
Subject: [PATCH] Addition of new input format: RegexpRowInputFormat.

---
 dbms/src/Core/Settings.h                      |   4 +
 dbms/src/Formats/FormatFactory.cpp            |   5 +
 dbms/src/Formats/FormatFactory.h              |   2 +
 dbms/src/Formats/FormatSettings.h             |   9 +
 .../Formats/Impl/RegexpRowInputFormat.cpp     | 163 ++++++++++++++++++
 .../Formats/Impl/RegexpRowInputFormat.h       |  50 +++++
 .../01079_regexp_input_format.reference       |  12 ++
 .../0_stateless/01079_regexp_input_format.sh  |  27 +++
 ...gexp_input_format_skip_unmatched.reference |   3 +
 ...1080_regexp_input_format_skip_unmatched.sh |  16 ++
 10 files changed, 291 insertions(+)
 create mode 100644 dbms/src/Processors/Formats/Impl/RegexpRowInputFormat.cpp
 create mode 100644 dbms/src/Processors/Formats/Impl/RegexpRowInputFormat.h
 create mode 100644 dbms/tests/queries/0_stateless/01079_regexp_input_format.reference
 create mode 100755 dbms/tests/queries/0_stateless/01079_regexp_input_format.sh
 create mode 100644 dbms/tests/queries/0_stateless/01080_regexp_input_format_skip_unmatched.reference
 create mode 100755 dbms/tests/queries/0_stateless/01080_regexp_input_format_skip_unmatched.sh

diff --git a/dbms/src/Core/Settings.h b/dbms/src/Core/Settings.h
index 08c555beb03..13e7f2ae70e 100644
--- a/dbms/src/Core/Settings.h
+++ b/dbms/src/Core/Settings.h
@@ -247,6 +247,10 @@ struct Settings : public SettingsCollection
     M(SettingString, format_custom_result_before_delimiter, "", "Prefix before result set (for CustomSeparated format)", 0) \
     M(SettingString, format_custom_result_after_delimiter, "", "Suffix after result set (for CustomSeparated format)", 0) \
     \
+    M(SettingString, format_regexp, "", "Regular expression (for Regexp format)", 0) \
+    M(SettingString, format_regexp_escaping_rule, "Escaped", "Field escaping rule (for Regexp format)", 0) \
+    M(SettingBool, format_regexp_skip_unmatched, false, "Skip lines unmatched by regular expression (for Regexp format)", 0) \
+    \
     M(SettingBool, insert_allow_materialized_columns, 0, "If setting is enabled, Allow materialized columns in INSERT.", 0) \
     M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.", 0) \
     M(SettingSeconds, http_send_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP send timeout", 0) \
diff --git a/dbms/src/Formats/FormatFactory.cpp b/dbms/src/Formats/FormatFactory.cpp
index a8e27054704..c2b890ec631 100644
--- a/dbms/src/Formats/FormatFactory.cpp
+++ b/dbms/src/Formats/FormatFactory.cpp
@@ -68,6 +68,9 @@ static FormatSettings getInputFormatSetting(const Settings & settings, const Con
     format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
     format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
     format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
+    format_settings.regexp.regexp = settings.format_regexp;
+    format_settings.regexp.escaping_rule = settings.format_regexp_escaping_rule;
+    format_settings.regexp.skip_unmatched = settings.format_regexp_skip_unmatched;
 
     /// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context
     if (context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER))
@@ -352,10 +355,12 @@ FormatFactory::FormatFactory()
     registerOutputFormatProcessorAvro(*this);
     registerInputFormatProcessorTemplate(*this);
     registerOutputFormatProcessorTemplate(*this);
+    registerInputFormatProcessorRegexp(*this);
 
     registerFileSegmentationEngineTabSeparated(*this);
     registerFileSegmentationEngineCSV(*this);
     registerFileSegmentationEngineJSONEachRow(*this);
+    registerFileSegmentationEngineRegexp(*this);
 
     registerOutputFormatNull(*this);
 
diff --git a/dbms/src/Formats/FormatFactory.h b/dbms/src/Formats/FormatFactory.h
index 7c18971e0eb..7c515dbce90 100644
--- a/dbms/src/Formats/FormatFactory.h
+++ b/dbms/src/Formats/FormatFactory.h
@@ -178,6 +178,7 @@ void registerOutputFormatProcessorTemplate(FormatFactory &factory);
 void registerFileSegmentationEngineTabSeparated(FormatFactory & factory);
 void registerFileSegmentationEngineCSV(FormatFactory & factory);
 void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory);
+void registerFileSegmentationEngineRegexp(FormatFactory & factory);
 
 /// Output only (presentational) formats.
 
@@ -198,5 +199,6 @@ void registerOutputFormatProcessorMySQLWrite(FormatFactory & factory);
 
 /// Input only formats.
 void registerInputFormatProcessorCapnProto(FormatFactory & factory);
+void registerInputFormatProcessorRegexp(FormatFactory & factory);
 
 }
diff --git a/dbms/src/Formats/FormatSettings.h b/dbms/src/Formats/FormatSettings.h
index 610768e5d08..1eb95ce2dbf 100644
--- a/dbms/src/Formats/FormatSettings.h
+++ b/dbms/src/Formats/FormatSettings.h
@@ -122,6 +122,15 @@ struct FormatSettings
 
     Avro avro;
 
+    struct Regexp
+    {
+        std::string regexp;
+        std::string escaping_rule;
+        bool skip_unmatched = false;
+    };
+
+    Regexp regexp;
+
 };
 
 }
diff --git a/dbms/src/Processors/Formats/Impl/RegexpRowInputFormat.cpp b/dbms/src/Processors/Formats/Impl/RegexpRowInputFormat.cpp
new file mode 100644
index 00000000000..d53c6819c91
--- /dev/null
+++ b/dbms/src/Processors/Formats/Impl/RegexpRowInputFormat.cpp
@@ -0,0 +1,163 @@
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int INCORRECT_DATA;
+    extern const int BAD_ARGUMENTS;
+}
+
+/// Compiles the regexp and resolves the escaping rule from the format settings.
+RegexpRowInputFormat::RegexpRowInputFormat(
+        ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
+    : IRowInputFormat(header_, in_, std::move(params_)), format_settings(format_settings_), regexp(format_settings_.regexp.regexp)
+{
+    field_format = stringToFormat(format_settings_.regexp.escaping_rule);
+}
+
+/// Converts the escaping rule name into FieldFormat; throws BAD_ARGUMENTS for an unknown name.
+RegexpRowInputFormat::FieldFormat RegexpRowInputFormat::stringToFormat(const String & format)
+{
+    if (format == "Escaped")
+        return FieldFormat::Escaped;
+    if (format == "Quoted")
+        return FieldFormat::Quoted;
+    if (format == "CSV")
+        return FieldFormat::Csv;
+    if (format == "JSON")
+        return FieldFormat::Json;
+    throw Exception("Unknown field format \"" + format + "\".", ErrorCodes::BAD_ARGUMENTS);
+}
+
+/// Deserializes capture group (index + 1) of the current match into columns[index] using
+/// field_format. When null_as_default is set and the column type is not Nullable, returns the
+/// result of the DataTypeNullable deserializer; otherwise returns true.
+bool RegexpRowInputFormat::readField(size_t index, MutableColumns & columns)
+{
+    const auto & type = getPort().getHeader().getByPosition(index).type;
+    bool parse_as_nullable = format_settings.null_as_default && !type->isNullable();
+    bool read = true;
+    /// Group 0 is the whole match, so column `index` corresponds to group `index + 1`.
+    ReadBuffer field_buf(matched_fields[index + 1].first, matched_fields[index + 1].length(), 0);
+    switch (field_format)
+    {
+        case FieldFormat::Escaped:
+            if (parse_as_nullable)
+                read = DataTypeNullable::deserializeTextEscaped(*columns[index], field_buf, format_settings, type);
+            else
+                type->deserializeAsTextEscaped(*columns[index], field_buf, format_settings);
+            break;
+        case FieldFormat::Quoted:
+            if (parse_as_nullable)
+                read = DataTypeNullable::deserializeTextQuoted(*columns[index], field_buf, format_settings, type);
+            else
+                type->deserializeAsTextQuoted(*columns[index], field_buf, format_settings);
+            break;
+        case FieldFormat::Csv:
+            if (parse_as_nullable)
+                read = DataTypeNullable::deserializeTextCSV(*columns[index], field_buf, format_settings, type);
+            else
+                type->deserializeAsTextCSV(*columns[index], field_buf, format_settings);
+            break;
+        case FieldFormat::Json:
+            if (parse_as_nullable)
+                read = DataTypeNullable::deserializeTextJSON(*columns[index], field_buf, format_settings, type);
+            else
+                type->deserializeAsTextJSON(*columns[index], field_buf, format_settings);
+            break;
+        default:
+            __builtin_unreachable();
+    }
+    return read;
+}
+
+/// Parses every capture group of the current match into the corresponding column.
+/// Throws INCORRECT_DATA if the number of capture groups differs from the number of columns.
+void RegexpRowInputFormat::readFieldsFromMatch(MutableColumns & columns, RowReadExtension & ext)
+{
+    if (matched_fields.size() != columns.size() + 1)
+        throw Exception("The number of matched fields in line doesn't match the number of columns.", ErrorCodes::INCORRECT_DATA);
+
+    ext.read_columns.assign(columns.size(), false);
+    for (size_t columns_index = 0; columns_index < columns.size(); ++columns_index)
+    {
+        ext.read_columns[columns_index] = readField(columns_index, columns);
+    }
+}
+
+/// Consumes one line (terminated by '\n' or '\r', or by the end of the buffer) and, when it
+/// matches the regexp, parses it into `columns`. An unmatched line is skipped if
+/// format_regexp_skip_unmatched is set, otherwise INCORRECT_DATA is thrown.
+/// NOTE(review): the line end is searched for only inside the current buffer, so a line that
+/// continues into the next buffer is not handled here.
+bool RegexpRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
+{
+    if (in.eof())
+        return false;
+
+    char * line_end = find_first_symbols<'\n', '\r'>(in.position(), in.buffer().end());
+    bool match = std::regex_match(in.position(), line_end, matched_fields, regexp);
+
+    if (match)
+        readFieldsFromMatch(columns, ext);
+    else if (!format_settings.regexp.skip_unmatched)
+        throw Exception("Line \"" + std::string(in.position(), line_end) + "\" doesn't match the regexp.", ErrorCodes::INCORRECT_DATA);
+
+    /// Step over the line terminator only if there is one: the last line may end at the
+    /// end of the buffer, and the position must never move past it.
+    in.position() = line_end;
+    if (line_end != in.buffer().end())
+        ++in.position();
+    return true;
+}
+
+void registerInputFormatProcessorRegexp(FormatFactory & factory)
+{
+    factory.registerInputFormatProcessor("Regexp", [](
+            ReadBuffer & buf,
+            const Block & sample,
+            IRowInputFormat::Params params,
+            const FormatSettings & settings)
+    {
+        return std::make_shared<RegexpRowInputFormat>(buf, sample, std::move(params), settings);
+    });
+}
+
+/// Scans whole lines (terminated by '\n' or '\r') until the scanned size reaches
+/// min_chunk_size, then passes the scanned range to saveUpToPosition. The return value is
+/// that of the final loadAtPosition call (presumably: whether more input remains).
+static bool fileSegmentationEngineRegexpImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
+{
+    char * pos = in.position();
+    bool need_more_data = true;
+
+    while (loadAtPosition(in, memory, pos) && need_more_data)
+    {
+        pos = find_first_symbols<'\n', '\r'>(pos, in.buffer().end());
+        if (pos == in.buffer().end())
+            continue;
+
+        if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
+            need_more_data = false;
+        ++pos;
+    }
+
+    saveUpToPosition(in, memory, pos);
+
+    return loadAtPosition(in, memory, pos);
+}
+
+void registerFileSegmentationEngineRegexp(FormatFactory & factory)
+{
+    factory.registerFileSegmentationEngine("Regexp", &fileSegmentationEngineRegexpImpl);
+}
+
+}
diff --git a/dbms/src/Processors/Formats/Impl/RegexpRowInputFormat.h b/dbms/src/Processors/Formats/Impl/RegexpRowInputFormat.h
new file mode 100644
index 00000000000..fe920f26fed
--- /dev/null
+++ b/dbms/src/Processors/Formats/Impl/RegexpRowInputFormat.h
@@ -0,0 +1,50 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+class ReadBuffer;
+
+
+/// Input format that matches each input line as a whole against the regular expression
+/// from the format settings (std::regex_match) and parses capture group i + 1 as the
+/// value of column i.
+class RegexpRowInputFormat : public IRowInputFormat
+{
+public:
+    RegexpRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_);
+
+    String getName() const override { return "RegexpRowInputFormat"; }
+
+    bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
+
+private:
+    /// How the text of a capture group is deserialized into a column.
+    enum class FieldFormat
+    {
+        Escaped,
+        Quoted,
+        Csv,
+        Json,
+    };
+
+    /// Parses capture group (index + 1) of the current match into columns[index].
+    bool readField(size_t index, MutableColumns & columns);
+    /// Parses all capture groups of the current match; throws on a group/column count mismatch.
+    void readFieldsFromMatch(MutableColumns & columns, RowReadExtension & ext);
+    /// Maps an escaping rule name ("Escaped", "Quoted", "CSV", "JSON") to FieldFormat.
+    FieldFormat stringToFormat(const String & format);
+
+    const FormatSettings format_settings;
+    std::regex regexp;
+    std::match_results<char *> matched_fields;
+    FieldFormat field_format;
+};
+
+}
diff --git a/dbms/tests/queries/0_stateless/01079_regexp_input_format.reference b/dbms/tests/queries/0_stateless/01079_regexp_input_format.reference
new file mode 100644
index 00000000000..61435bccefc
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/01079_regexp_input_format.reference
@@ -0,0 +1,12 @@
+1	[1,2,3]	str1	2020-01-01
+2	[1,2,3]	str2	2020-01-02
+3	[1,2,3]	str3	2020-01-03
+4	[1,2,3]	str4	2020-01-04
+5	[1,2,3]	str5	2020-01-05
+6	[1,2,3]	str6	2020-01-06
+7	[1,2,3]	str7	2020-01-07
+8	[1,2,3]	str8	2020-01-08
+9	[1,2,3]	str9	2020-01-09
+10	[1,2,3]	str10	2020-01-10
+11	[1,2,3]	str11	2020-01-11
+12	[1,2,3]	str12	2020-01-12
diff --git a/dbms/tests/queries/0_stateless/01079_regexp_input_format.sh b/dbms/tests/queries/0_stateless/01079_regexp_input_format.sh
new file mode 100755
index 00000000000..cbaa1bd2162
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/01079_regexp_input_format.sh
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. $CURDIR/../shell_config.sh
+
+$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS regexp";
+$CLICKHOUSE_CLIENT --query="CREATE TABLE regexp (id UInt32, array Array(UInt32), string String, date Date) ENGINE = Memory";
+
+echo 'id: 1 array: [1,2,3] string: str1 date: 2020-01-01
+id: 2 array: [1,2,3] string: str2 date: 2020-01-02
+id: 3 array: [1,2,3] string: str3 date: 2020-01-03' | $CLICKHOUSE_CLIENT --query="INSERT INTO regexp FORMAT Regexp SETTINGS format_regexp='id: (.+?) array: (.+?) string: (.+?) date: (.+?)', format_regexp_escaping_rule='Escaped'";
+
+echo 'id: 4 array: "[1,2,3]" string: "str4" date: "2020-01-04"
+id: 5 array: "[1,2,3]" string: "str5" date: "2020-01-05"
+id: 6 array: "[1,2,3]" string: "str6" date: "2020-01-06"' | $CLICKHOUSE_CLIENT --query="INSERT INTO regexp FORMAT Regexp SETTINGS format_regexp='id: (.+?) array: (.+?) string: (.+?) date: (.+?)', format_regexp_escaping_rule='CSV'";
+
+echo "id: 7 array: [1,2,3] string: 'str7' date: '2020-01-07'
+id: 8 array: [1,2,3] string: 'str8' date: '2020-01-08'
+id: 9 array: [1,2,3] string: 'str9' date: '2020-01-09'" | $CLICKHOUSE_CLIENT --query="INSERT INTO regexp FORMAT Regexp SETTINGS format_regexp='id: (.+?) array: (.+?) string: (.+?) date: (.+?)', format_regexp_escaping_rule='Quoted'";
+
+echo 'id: 10 array: [1,2,3] string: "str10" date: "2020-01-10"
+id: 11 array: [1,2,3] string: "str11" date: "2020-01-11"
+id: 12 array: [1,2,3] string: "str12" date: "2020-01-12"' | $CLICKHOUSE_CLIENT --query="INSERT INTO regexp FORMAT Regexp SETTINGS format_regexp='id: (.+?) array: (.+?) string: (.+?) date: (.+?)', format_regexp_escaping_rule='JSON'";
+
+$CLICKHOUSE_CLIENT --query="SELECT * FROM regexp ORDER BY id";
+$CLICKHOUSE_CLIENT --query="DROP TABLE regexp";
+
diff --git a/dbms/tests/queries/0_stateless/01080_regexp_input_format_skip_unmatched.reference b/dbms/tests/queries/0_stateless/01080_regexp_input_format_skip_unmatched.reference
new file mode 100644
index 00000000000..ff57df7316e
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/01080_regexp_input_format_skip_unmatched.reference
@@ -0,0 +1,3 @@
+1	str1
+2	str2
+4	str4
diff --git a/dbms/tests/queries/0_stateless/01080_regexp_input_format_skip_unmatched.sh b/dbms/tests/queries/0_stateless/01080_regexp_input_format_skip_unmatched.sh
new file mode 100755
index 00000000000..98bcb0a256a
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/01080_regexp_input_format_skip_unmatched.sh
@@ -0,0 +1,16 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. $CURDIR/../shell_config.sh
+
+$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS regexp";
+$CLICKHOUSE_CLIENT --query="CREATE TABLE regexp (id UInt32, string String) ENGINE = Memory";
+
+echo 'id: 1 string: str1
+id: 2 string: str2
+id=3, string=str3
+id: 4 string: str4' | $CLICKHOUSE_CLIENT --query="INSERT INTO regexp FORMAT Regexp SETTINGS format_regexp='id: (.+?) string: (.+?)', format_regexp_escaping_rule='Escaped', format_regexp_skip_unmatched=1";
+
+$CLICKHOUSE_CLIENT --query="SELECT * FROM regexp";
+$CLICKHOUSE_CLIENT --query="DROP TABLE regexp";
+