Support SETTINGS clause for File engine

Accept the usual user settings related to file formats.

Most of the diff are the mechanistic code changes required to allow
providing the required FormatSettings to the format factory. The File
engine then extracts these settings from the `CREATE` query, and specifies
them when creating the format parser.
This commit is contained in:
Alexander Kuzmenkov 2020-11-02 10:50:38 +03:00
parent e06f65ee73
commit 99ee127620
15 changed files with 345 additions and 276 deletions

View File

@ -41,96 +41,75 @@ const FormatFactory::Creators & FormatFactory::getCreators(const String & name)
}
static FormatSettings getInputFormatSetting(const Settings & settings, const Context & context)
FormatSettings getFormatSettings(const Context & context)
{
const auto & settings = context.getSettingsRef();
FormatSettings format_settings;
format_settings.csv.delimiter = settings.format_csv_delimiter;
format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
format_settings.avro.allow_missing_fields = settings.input_format_avro_allow_missing_fields;
format_settings.avro.output_codec = settings.output_format_avro_codec;
format_settings.avro.output_sync_interval = settings.output_format_avro_sync_interval;
format_settings.avro.schema_registry_url = settings.format_avro_schema_registry_url.toString();
format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
format_settings.csv.unquoted_null_literal_as_null = settings.input_format_csv_unquoted_null_literal_as_null;
format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
format_settings.csv.crlf_end_of_line = settings.output_format_csv_crlf_end_of_line;
format_settings.csv.delimiter = settings.format_csv_delimiter;
format_settings.csv.empty_as_default = settings.input_format_defaults_for_omitted_fields;
format_settings.csv.input_format_enum_as_number = settings.input_format_csv_enum_as_number;
format_settings.null_as_default = settings.input_format_null_as_default;
format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;
format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions;
format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals;
format_settings.with_names_use_header = settings.input_format_with_names_use_header;
format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
format_settings.import_nested_json = settings.input_format_import_nested_json;
format_settings.csv.unquoted_null_literal_as_null = settings.input_format_csv_unquoted_null_literal_as_null;
format_settings.custom.escaping_rule = settings.format_custom_escaping_rule;
format_settings.custom.field_delimiter = settings.format_custom_field_delimiter;
format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
format_settings.date_time_input_format = settings.date_time_input_format;
format_settings.date_time_output_format = settings.date_time_output_format;
format_settings.enable_streaming = settings.output_format_enable_streaming;
format_settings.import_nested_json = settings.input_format_import_nested_json;
format_settings.input_allow_errors_num = settings.input_format_allow_errors_num;
format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio;
format_settings.template_settings.resultset_format = settings.format_template_resultset;
format_settings.template_settings.row_format = settings.format_template_row;
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
format_settings.tsv.empty_as_default = settings.input_format_tsv_empty_as_default;
format_settings.tsv.input_format_enum_as_number = settings.input_format_tsv_enum_as_number;
format_settings.json.escape_forward_slashes = settings.output_format_json_escape_forward_slashes;
format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers;
format_settings.json.quote_denormals = settings.output_format_json_quote_denormals;
format_settings.null_as_default = settings.input_format_null_as_default;
format_settings.parquet.row_group_size = settings.output_format_parquet_row_group_size;
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
format_settings.pretty.color = settings.output_format_pretty_color;
format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width;
format_settings.pretty.max_rows = settings.output_format_pretty_max_rows;
format_settings.pretty.max_value_width = settings.output_format_pretty_max_value_width;
format_settings.pretty.output_format_pretty_row_numbers = settings.output_format_pretty_row_numbers;
format_settings.regexp.escaping_rule = settings.format_regexp_escaping_rule;
format_settings.regexp.regexp = settings.format_regexp;
format_settings.regexp.skip_unmatched = settings.format_regexp_skip_unmatched;
format_settings.schema.format_schema = settings.format_schema;
format_settings.schema.format_schema_path = context.getFormatSchemaPath();
format_settings.schema.is_server = context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER);
format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter;
format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
format_settings.custom.escaping_rule = settings.format_custom_escaping_rule;
format_settings.custom.field_delimiter = settings.format_custom_field_delimiter;
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
format_settings.regexp.regexp = settings.format_regexp;
format_settings.regexp.escaping_rule = settings.format_regexp_escaping_rule;
format_settings.regexp.skip_unmatched = settings.format_regexp_skip_unmatched;
format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
format_settings.template_settings.resultset_format = settings.format_template_resultset;
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
format_settings.template_settings.row_format = settings.format_template_row;
format_settings.tsv.crlf_end_of_line = settings.output_format_tsv_crlf_end_of_line;
format_settings.tsv.empty_as_default = settings.input_format_tsv_empty_as_default;
format_settings.tsv.input_format_enum_as_number = settings.input_format_tsv_enum_as_number;
format_settings.tsv.null_representation = settings.output_format_tsv_null_representation;
format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals;
format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions;
format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;
format_settings.with_names_use_header = settings.input_format_with_names_use_header;
format_settings.write_statistics = settings.output_format_write_statistics;
/// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context
if (context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER))
if (format_settings.schema.is_server)
{
const Poco::URI & avro_schema_registry_url = settings.format_avro_schema_registry_url;
if (!avro_schema_registry_url.empty())
context.getRemoteHostFilter().checkURL(avro_schema_registry_url);
}
format_settings.avro.schema_registry_url = settings.format_avro_schema_registry_url.toString();
format_settings.avro.allow_missing_fields = settings.input_format_avro_allow_missing_fields;
return format_settings;
}
static FormatSettings getOutputFormatSetting(const Settings & settings, const Context & context)
{
FormatSettings format_settings;
format_settings.enable_streaming = settings.output_format_enable_streaming;
format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers;
format_settings.json.quote_denormals = settings.output_format_json_quote_denormals;
format_settings.json.escape_forward_slashes = settings.output_format_json_escape_forward_slashes;
format_settings.csv.delimiter = settings.format_csv_delimiter;
format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
format_settings.csv.crlf_end_of_line = settings.output_format_csv_crlf_end_of_line;
format_settings.pretty.max_rows = settings.output_format_pretty_max_rows;
format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width;
format_settings.pretty.max_value_width = settings.output_format_pretty_max_value_width;
format_settings.pretty.color = settings.output_format_pretty_color;
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ?
FormatSettings::Pretty::Charset::ASCII :
FormatSettings::Pretty::Charset::UTF8;
format_settings.pretty.output_format_pretty_row_numbers = settings.output_format_pretty_row_numbers;
format_settings.template_settings.resultset_format = settings.format_template_resultset;
format_settings.template_settings.row_format = settings.format_template_row;
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
format_settings.tsv.crlf_end_of_line = settings.output_format_tsv_crlf_end_of_line;
format_settings.tsv.null_representation = settings.output_format_tsv_null_representation;
format_settings.write_statistics = settings.output_format_write_statistics;
format_settings.parquet.row_group_size = settings.output_format_parquet_row_group_size;
format_settings.schema.format_schema = settings.format_schema;
format_settings.schema.format_schema_path = context.getFormatSchemaPath();
format_settings.schema.is_server = context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER);
format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter;
format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
format_settings.custom.escaping_rule = settings.format_custom_escaping_rule;
format_settings.custom.field_delimiter = settings.format_custom_field_delimiter;
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
format_settings.avro.output_codec = settings.output_format_avro_codec;
format_settings.avro.output_sync_interval = settings.output_format_avro_sync_interval;
format_settings.date_time_output_format = settings.date_time_output_format;
return format_settings;
}
@ -142,7 +121,7 @@ BlockInputStreamPtr FormatFactory::getInput(
const Block & sample,
const Context & context,
UInt64 max_block_size,
ReadCallback callback) const
std::optional<FormatSettings> format_settings) const
{
if (name == "Native")
return std::make_shared<NativeBlockInputStream>(buf, sample, 0);
@ -153,10 +132,12 @@ BlockInputStreamPtr FormatFactory::getInput(
if (!input_getter)
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getInputFormatSetting(settings, context);
if (!format_settings)
{
format_settings = getFormatSettings(context);
}
return input_getter(buf, sample, max_block_size, callback ? callback : ReadCallback(), format_settings);
return input_getter(buf, sample, max_block_size, {}, *format_settings);
}
const Settings & settings = context.getSettingsRef();
@ -182,17 +163,21 @@ BlockInputStreamPtr FormatFactory::getInput(
if (!input_getter)
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
FormatSettings format_settings = getInputFormatSetting(settings, context);
if (!format_settings)
{
format_settings = getFormatSettings(context);
}
RowInputFormatParams row_input_format_params;
row_input_format_params.max_block_size = max_block_size;
row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num;
row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
row_input_format_params.callback = std::move(callback);
row_input_format_params.allow_errors_num = format_settings->input_allow_errors_num;
row_input_format_params.allow_errors_ratio = format_settings->input_allow_errors_ratio;
row_input_format_params.max_execution_time = settings.max_execution_time;
row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;
auto input_creator_params = ParallelParsingBlockInputStream::InputCreatorParams{sample, row_input_format_params, format_settings};
auto input_creator_params =
ParallelParsingBlockInputStream::InputCreatorParams{sample,
row_input_format_params, *format_settings};
ParallelParsingBlockInputStream::Params params{buf, input_getter,
input_creator_params, file_segmentation_engine,
static_cast<int>(settings.max_threads),
@ -200,13 +185,15 @@ BlockInputStreamPtr FormatFactory::getInput(
return std::make_shared<ParallelParsingBlockInputStream>(params);
}
auto format = getInputFormat(name, buf, sample, context, max_block_size, std::move(callback));
auto format = getInputFormat(name, buf, sample, context, max_block_size,
format_settings);
return std::make_shared<InputStreamFromInputFormat>(std::move(format));
}
BlockOutputStreamPtr FormatFactory::getOutput(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback, const bool ignore_no_row_delimiter) const
BlockOutputStreamPtr FormatFactory::getOutput(const String & name,
WriteBuffer & buf, const Block & sample, const Context & context,
WriteCallback callback, std::optional<FormatSettings> format_settings) const
{
if (!getCreators(name).output_processor_creator)
{
@ -214,18 +201,23 @@ BlockOutputStreamPtr FormatFactory::getOutput(
if (!output_getter)
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getOutputFormatSetting(settings, context);
if (!format_settings)
{
format_settings = getFormatSettings(context);
}
/** Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
return std::make_shared<MaterializingBlockOutputStream>(
output_getter(buf, sample, std::move(callback), format_settings), sample);
output_getter(buf, sample, std::move(callback), *format_settings),
sample);
}
auto format = getOutputFormat(name, buf, sample, context, std::move(callback), ignore_no_row_delimiter);
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
auto format = getOutputFormat(name, buf, sample, context, std::move(callback),
format_settings);
return std::make_shared<MaterializingBlockOutputStream>(
std::make_shared<OutputStreamToOutputFormat>(format), sample);
}
@ -235,24 +227,27 @@ InputFormatPtr FormatFactory::getInputFormat(
const Block & sample,
const Context & context,
UInt64 max_block_size,
ReadCallback callback) const
std::optional<FormatSettings> format_settings) const
{
const auto & input_getter = getCreators(name).input_processor_creator;
if (!input_getter)
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getInputFormatSetting(settings, context);
if (!format_settings)
{
format_settings = getFormatSettings(context);
}
RowInputFormatParams params;
params.max_block_size = max_block_size;
params.allow_errors_num = format_settings.input_allow_errors_num;
params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
params.callback = std::move(callback);
params.allow_errors_num = format_settings->input_allow_errors_num;
params.allow_errors_ratio = format_settings->input_allow_errors_ratio;
params.max_execution_time = settings.max_execution_time;
params.timeout_overflow_mode = settings.timeout_overflow_mode;
auto format = input_getter(buf, sample, params, format_settings);
auto format = input_getter(buf, sample, params, *format_settings);
/// It's a kludge. Because I cannot remove context from values format.
if (auto * values = typeid_cast<ValuesBlockInputFormat *>(format.get()))
@ -263,26 +258,29 @@ InputFormatPtr FormatFactory::getInputFormat(
OutputFormatPtr FormatFactory::getOutputFormat(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback, const bool ignore_no_row_delimiter) const
const String & name, WriteBuffer & buf, const Block & sample,
const Context & context, WriteCallback callback,
std::optional<FormatSettings> format_settings) const
{
const auto & output_getter = getCreators(name).output_processor_creator;
if (!output_getter)
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getOutputFormatSetting(settings, context);
if (!format_settings)
{
format_settings = getFormatSettings(context);
}
RowOutputFormatParams params;
params.ignore_no_row_delimiter = ignore_no_row_delimiter;
params.callback = std::move(callback);
/** TODO: Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
auto format = output_getter(buf, sample, params, format_settings);
auto format = output_getter(buf, sample, params, *format_settings);
/// Enable auto-flush for streaming mode. Currently it is needed by INSERT WATCH query.
if (format_settings.enable_streaming)
if (format_settings->enable_streaming)
format->setAutoFlush();
/// It's a kludge. Because I cannot remove context from MySQL format.

View File

@ -3,6 +3,7 @@
#include <common/types.h>
#include <Columns/IColumn.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Formats/FormatSettings.h>
#include <IO/BufferWithOwnMemory.h>
#include <functional>
@ -16,6 +17,7 @@ namespace DB
class Block;
class Context;
struct FormatSettings;
struct Settings;
class ReadBuffer;
class WriteBuffer;
@ -32,6 +34,7 @@ struct RowOutputFormatParams;
using InputFormatPtr = std::shared_ptr<IInputFormat>;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
FormatSettings getFormatSettings(const Context & context);
/** Allows to create an IBlockInputStream or IBlockOutputStream by the name of the format.
* Note: format and compression are independent things.
@ -105,10 +108,11 @@ public:
const Block & sample,
const Context & context,
UInt64 max_block_size,
ReadCallback callback = {}) const;
std::optional<FormatSettings> format_settings = std::nullopt) const;
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context, WriteCallback callback = {}, const bool ignore_no_row_delimiter = false) const;
const Block & sample, const Context & context, WriteCallback callback = {},
std::optional<FormatSettings> format_settings = std::nullopt) const;
InputFormatPtr getInputFormat(
const String & name,
@ -116,10 +120,12 @@ public:
const Block & sample,
const Context & context,
UInt64 max_block_size,
ReadCallback callback = {}) const;
std::optional<FormatSettings> format_settings = std::nullopt) const;
OutputFormatPtr getOutputFormat(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback = {}, const bool ignore_no_row_delimiter = false) const;
const String & name, WriteBuffer & buf, const Block & sample,
const Context & context, WriteCallback callback = {},
std::optional<FormatSettings> format_settings = std::nullopt) const;
/// Register format by its name.
void registerInputFormat(const String & name, InputCreator input_creator);

View File

@ -17,76 +17,6 @@ struct FormatSettings
/// Option means that each chunk of data need to be formatted independently. Also each chunk will be flushed at the end of processing.
bool enable_streaming = false;
struct JSON
{
bool quote_64bit_integers = true;
bool quote_denormals = true;
bool escape_forward_slashes = true;
};
JSON json;
struct CSV
{
char delimiter = ',';
bool allow_single_quotes = true;
bool allow_double_quotes = true;
bool unquoted_null_literal_as_null = false;
bool empty_as_default = false;
bool crlf_end_of_line = false;
bool input_format_enum_as_number = false;
};
CSV csv;
struct Pretty
{
UInt64 max_rows = 10000;
UInt64 max_column_pad_width = 250;
UInt64 max_value_width = 10000;
bool color = true;
bool output_format_pretty_row_numbers = false;
enum class Charset
{
UTF8,
ASCII,
};
Charset charset = Charset::UTF8;
};
Pretty pretty;
struct Values
{
bool interpret_expressions = true;
bool deduce_templates_of_expressions = true;
bool accurate_types_of_literals = true;
};
Values values;
struct Template
{
String resultset_format;
String row_format;
String row_between_delimiter;
};
Template template_settings;
struct TSV
{
bool empty_as_default = false;
bool crlf_end_of_line = false;
String null_representation = "\\N";
bool input_format_enum_as_number = false;
};
TSV tsv;
bool skip_unknown_fields = false;
bool with_names_use_header = false;
bool write_statistics = true;
@ -113,24 +43,29 @@ struct FormatSettings
UInt64 input_allow_errors_num = 0;
Float32 input_allow_errors_ratio = 0;
struct Arrow
struct
{
UInt64 row_group_size = 1000000;
} arrow;
struct Parquet
struct
{
UInt64 row_group_size = 1000000;
} parquet;
String schema_registry_url;
String output_codec;
UInt64 output_sync_interval = 16 * 1024;
bool allow_missing_fields = false;
} avro;
struct Schema
struct CSV
{
std::string format_schema;
std::string format_schema_path;
bool is_server = false;
};
Schema schema;
char delimiter = ',';
bool allow_single_quotes = true;
bool allow_double_quotes = true;
bool unquoted_null_literal_as_null = false;
bool empty_as_default = false;
bool crlf_end_of_line = false;
bool input_format_enum_as_number = false;
} csv;
struct Custom
{
@ -141,29 +76,89 @@ struct FormatSettings
std::string row_between_delimiter;
std::string field_delimiter;
std::string escaping_rule;
};
} custom;
Custom custom;
struct Avro
struct
{
String schema_registry_url;
String output_codec;
UInt64 output_sync_interval = 16 * 1024;
bool allow_missing_fields = false;
};
bool quote_64bit_integers = true;
bool quote_denormals = true;
bool escape_forward_slashes = true;
bool serialize_as_strings = false;
} json;
Avro avro;
struct
{
UInt64 row_group_size = 1000000;
} parquet;
struct Regexp
struct Pretty
{
UInt64 max_rows = 10000;
UInt64 max_column_pad_width = 250;
UInt64 max_value_width = 10000;
bool color = true;
bool output_format_pretty_row_numbers = false;
enum class Charset
{
UTF8,
ASCII,
};
Charset charset = Charset::UTF8;
} pretty;
struct
{
bool write_row_delimiters = true;
/**
* Some buffers (kafka / rabbit) split the rows internally using callback
* so we can push there formats without framing / delimiters (like
* ProtobufSingle). In other cases you can't write more than single row
* in unframed format.
* Not sure we need this parameter at all, it only serves as an additional
* safety check in ProtobufSingle format, but exporting constant-size
* records w/o delimiters might be generally useful, not only for Kafka.
*/
bool allow_many_rows_no_delimiters = false;
} protobuf;
struct
{
std::string regexp;
std::string escaping_rule;
bool skip_unmatched = false;
};
} regexp;
Regexp regexp;
struct
{
std::string format_schema;
std::string format_schema_path;
bool is_server = false;
} schema;
struct
{
String resultset_format;
String row_format;
String row_between_delimiter;
} template_settings;
struct
{
bool empty_as_default = false;
bool crlf_end_of_line = false;
String null_representation = "\\N";
bool input_format_enum_as_number = false;
} tsv;
struct
{
bool interpret_expressions = true;
bool deduce_templates_of_expressions = true;
bool accurate_types_of_literals = true;
} values;
};
}

View File

@ -63,8 +63,6 @@ Chunk IRowInputFormat::generate()
info.read_columns.clear();
if (!readRow(columns, info))
break;
if (params.callback)
params.callback();
for (size_t column_idx = 0; column_idx < info.read_columns.size(); ++column_idx)
{

View File

@ -15,14 +15,6 @@ struct RowOutputFormatParams
// Callback used to indicate that another row is written.
WriteCallback callback;
/**
* some buffers (kafka / rabbit) split the rows internally using callback
* so we can push there formats without framing / delimiters
* (like ProtobufSingle). In other cases you can't write more than single row
* in unframed format.
*/
bool ignore_no_row_delimiter = false;
};
class WriteBuffer;

View File

@ -23,18 +23,22 @@ ProtobufRowOutputFormat::ProtobufRowOutputFormat(
const Block & header,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & format_schema,
const bool use_length_delimiters_)
const FormatSettings & settings)
: IRowOutputFormat(header, out_, params_)
, data_types(header.getDataTypes())
, writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames(), use_length_delimiters_)
, throw_on_multiple_rows_undelimited(!use_length_delimiters_ && !params_.ignore_no_row_delimiter)
, writer(out,
ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema),
header.getNames(), settings.protobuf.write_row_delimiters)
, allow_only_one_row(
!settings.protobuf.write_row_delimiters
&& !settings.protobuf.allow_many_rows_no_delimiters)
{
value_indices.resize(header.columns());
}
void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num)
{
if (throw_on_multiple_rows_undelimited && !first_row)
if (allow_only_one_row && !first_row)
{
throw Exception("The ProtobufSingle format can't be used to write multiple rows because this format doesn't have any row delimiter.", ErrorCodes::NO_ROW_DELIMITER);
}
@ -51,19 +55,23 @@ void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num)
void registerOutputFormatProcessorProtobuf(FormatFactory & factory)
{
for (bool use_length_delimiters : {false, true})
for (bool write_row_delimiters : {false, true})
{
factory.registerOutputFormatProcessor(
use_length_delimiters ? "Protobuf" : "ProtobufSingle",
[use_length_delimiters](WriteBuffer & buf,
write_row_delimiters ? "Protobuf" : "ProtobufSingle",
[write_row_delimiters](WriteBuffer & buf,
const Block & header,
const RowOutputFormatParams & params,
const FormatSettings & settings)
const FormatSettings & _settings)
{
return std::make_shared<ProtobufRowOutputFormat>(buf, header, params,
FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true,
settings.schema.is_server, settings.schema.format_schema_path),
use_length_delimiters);
FormatSettings settings = _settings;
settings.protobuf.write_row_delimiters = write_row_delimiters;
return std::make_shared<ProtobufRowOutputFormat>(
buf, header, params,
FormatSchemaInfo(settings.schema.format_schema, "Protobuf",
true, settings.schema.is_server,
settings.schema.format_schema_path),
settings);
});
}
}

View File

@ -41,7 +41,7 @@ public:
const Block & header,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & format_schema,
const bool use_length_delimiters_);
const FormatSettings & settings);
String getName() const override { return "ProtobufRowOutputFormat"; }
@ -53,7 +53,7 @@ private:
DataTypes data_types;
ProtobufWriter writer;
std::vector<size_t> value_indices;
const bool throw_on_multiple_rows_undelimited;
const bool allow_only_one_row;
};
}

View File

@ -54,8 +54,6 @@ Chunk ValuesBlockInputFormat::generate()
if (buf.eof() || *buf.position() == ';')
break;
readRow(columns, rows_in_block);
if (params.callback)
params.callback();
}
catch (Exception & e)
{

View File

@ -32,13 +32,16 @@ void KafkaBlockOutputStream::writePrefix()
if (!buffer)
throw Exception("Failed to create Kafka producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
child = FormatFactory::instance().getOutput(
storage.getFormatName(), *buffer, getHeader(), *context, [this](const Columns & columns, size_t row)
{
buffer->countRow(columns, row);
},
/* ignore_no_row_delimiter = */ true
);
auto format_settings = getFormatSettings(*context);
format_settings.protobuf.allow_many_rows_no_delimiters = true;
child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer,
getHeader(), *context,
[this](const Columns & columns, size_t row)
{
buffer->countRow(columns, row);
},
format_settings);
}
void KafkaBlockOutputStream::write(const Block & block)

View File

@ -42,13 +42,16 @@ void RabbitMQBlockOutputStream::writePrefix()
buffer->activateWriting();
child = FormatFactory::instance().getOutput(
storage.getFormatName(), *buffer, getHeader(), context, [this](const Columns & /* columns */, size_t /* rows */)
{
buffer->countRow();
},
/* ignore_no_row_delimiter = */ true
);
auto format_settings = getFormatSettings(context);
format_settings.protobuf.allow_many_rows_no_delimiters = true;
child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer,
getHeader(), context,
[this](const Columns & /* columns */, size_t /* rows */)
{
buffer->countRow();
},
format_settings);
}

View File

@ -4,6 +4,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
@ -202,6 +203,7 @@ StorageFile::StorageFile(const std::string & relative_table_dir_path, CommonArgu
StorageFile::StorageFile(CommonArguments args)
: IStorage(args.table_id)
, format_name(args.format_name)
, format_settings(args.format_settings)
, compression_method(args.compression_method)
, base_path(args.context.getPath())
{
@ -324,9 +326,11 @@ public:
method = chooseCompressionMethod(current_path, storage->compression_method);
}
read_buf = wrapReadBufferWithCompressionMethod(std::move(nested_buffer), method);
reader = FormatFactory::instance().getInput(
storage->format_name, *read_buf, metadata_snapshot->getSampleBlock(), context, max_block_size);
read_buf = wrapReadBufferWithCompressionMethod(
std::move(nested_buffer), method);
reader = FormatFactory::instance().getInput(storage->format_name,
*read_buf, metadata_snapshot->getSampleBlock(), context,
max_block_size, storage->format_settings);
if (columns_description.hasDefaults())
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns_description, context);
@ -430,8 +434,11 @@ Pipe StorageFile::read(
pipes.reserve(num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageFileSource>(
this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns()));
this_ptr, metadata_snapshot, context, max_block_size, files_info,
metadata_snapshot->getColumns()));
}
return Pipe::unitePipes(std::move(pipes));
}
@ -444,7 +451,8 @@ public:
StorageFile & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const CompressionMethod compression_method,
const Context & context)
const Context & context,
const FormatSettings & format_settings)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, lock(storage.rwlock)
@ -472,7 +480,9 @@ public:
write_buf = wrapWriteBufferWithCompressionMethod(std::move(naked_buffer), compression_method, 3);
writer = FormatFactory::instance().getOutput(storage.format_name, *write_buf, metadata_snapshot->getSampleBlock(), context);
writer = FormatFactory::instance().getOutput(storage.format_name,
*write_buf, metadata_snapshot->getSampleBlock(), context,
{}, format_settings);
}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
@ -521,7 +531,8 @@ BlockOutputStreamPtr StorageFile::write(
path = paths[0];
return std::make_shared<StorageFileBlockOutputStream>(*this, metadata_snapshot,
chooseCompressionMethod(path, compression_method), context);
chooseCompressionMethod(path, compression_method), context,
format_settings);
}
Strings StorageFile::getDataPaths() const
@ -581,32 +592,54 @@ void StorageFile::truncate(
void registerStorageFile(StorageFactory & factory)
{
StorageFactory::StorageFeatures storage_features{
.supports_settings = true,
.source_access_type = AccessType::FILE
};
factory.registerStorage(
"File",
[](const StorageFactory::Arguments & args)
[](const StorageFactory::Arguments & factory_args)
{
ASTs & engine_args = args.engine_args;
StorageFile::CommonArguments storage_args{
.table_id = factory_args.table_id,
.columns = factory_args.columns,
.constraints = factory_args.constraints,
.context = factory_args.context
};
if (!(engine_args.size() >= 1 && engine_args.size() <= 3)) // NOLINT
ASTs & engine_args_ast = factory_args.engine_args;
if (!(engine_args_ast.size() >= 1 && engine_args_ast.size() <= 3)) // NOLINT
throw Exception(
"Storage File requires from 1 to 3 arguments: name of used format, source and compression_method.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
String format_name = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
engine_args_ast[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args_ast[0], factory_args.local_context);
storage_args.format_name = engine_args_ast[0]->as<ASTLiteral &>().value.safeGet<String>();
String compression_method;
StorageFile::CommonArguments common_args{
args.table_id, format_name, compression_method, args.columns, args.constraints, args.context};
if (factory_args.storage_def->settings)
{
Context local_context_copy = factory_args.local_context;
local_context_copy.applySettingsChanges(
factory_args.storage_def->settings->changes);
storage_args.format_settings = getFormatSettings(
local_context_copy);
}
else
{
storage_args.format_settings = getFormatSettings(
factory_args.local_context);
}
if (engine_args.size() == 1) /// Table in database
return StorageFile::create(args.relative_data_path, common_args);
if (engine_args_ast.size() == 1) /// Table in database
return StorageFile::create(factory_args.relative_data_path, storage_args);
/// Will use FD if engine_args[1] is int literal or identifier with std* name
int source_fd = -1;
String source_path;
if (auto opt_name = tryGetIdentifierName(engine_args[1]))
if (auto opt_name = tryGetIdentifierName(engine_args_ast[1]))
{
if (*opt_name == "stdin")
source_fd = STDIN_FILENO;
@ -618,7 +651,7 @@ void registerStorageFile(StorageFactory & factory)
throw Exception(
"Unknown identifier '" + *opt_name + "' in second arg of File storage constructor", ErrorCodes::UNKNOWN_IDENTIFIER);
}
else if (const auto * literal = engine_args[1]->as<ASTLiteral>())
else if (const auto * literal = engine_args_ast[1]->as<ASTLiteral>())
{
auto type = literal->value.getType();
if (type == Field::Types::Int64)
@ -631,23 +664,23 @@ void registerStorageFile(StorageFactory & factory)
throw Exception("Second argument must be path or file descriptor", ErrorCodes::BAD_ARGUMENTS);
}
if (engine_args.size() == 3)
if (engine_args_ast.size() == 3)
{
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
engine_args_ast[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args_ast[2], factory_args.local_context);
storage_args.compression_method = engine_args_ast[2]->as<ASTLiteral &>().value.safeGet<String>();
}
else
compression_method = "auto";
storage_args.compression_method = "auto";
if (0 <= source_fd) /// File descriptor
return StorageFile::create(source_fd, common_args);
return StorageFile::create(source_fd, storage_args);
else /// User's file
return StorageFile::create(source_path, args.context.getUserFilesPath(), common_args);
return StorageFile::create(source_path, factory_args.context.getUserFilesPath(), storage_args);
},
{
.source_access_type = AccessType::FILE,
});
storage_features);
}
NamesAndTypesList StorageFile::getVirtuals() const
{
return NamesAndTypesList{

View File

@ -50,9 +50,10 @@ public:
struct CommonArguments
{
const StorageID & table_id;
const std::string & format_name;
const std::string & compression_method;
StorageID table_id;
std::string format_name;
FormatSettings format_settings;
std::string compression_method;
const ColumnsDescription & columns;
const ConstraintsDescription & constraints;
const Context & context;
@ -79,6 +80,7 @@ private:
explicit StorageFile(CommonArguments args);
std::string format_name;
FormatSettings format_settings;
int table_fd = -1;
String compression_method;

View File

@ -1,10 +1,12 @@
#include <Storages/StorageFile.h>
#include <Storages/ColumnsDescription.h>
#include <Access/AccessFlags.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionFile.h>
#include <Interpreters/Context.h>
#include "registerTableFunctions.h"
#include <Access/AccessFlags.h>
#include <Formats/FormatFactory.h>
#include <Interpreters/Context.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageFile.h>
#include <TableFunctions/TableFunctionFactory.h>
namespace DB
{
@ -12,7 +14,13 @@ StoragePtr TableFunctionFile::getStorage(
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const std::string & compression_method_) const
{
StorageFile::CommonArguments args{StorageID(getDatabaseName(), table_name), format_, compression_method_, columns, ConstraintsDescription{}, global_context};
StorageFile::CommonArguments args{StorageID(getDatabaseName(), table_name),
format_,
getFormatSettings(global_context),
compression_method_,
columns,
ConstraintsDescription{},
global_context};
return StorageFile::create(source, global_context.getUserFilesPath(), args);
}

View File

@ -0,0 +1,2 @@
1|1
1 1

View File

@ -0,0 +1,23 @@
#!/usr/bin/env bash
set -eu
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
the_file="$CLICKHOUSE_TMP/01544-t.csv"
rm -f -- "$the_file"
# We are going to check that format settings work for File engine,
# by creating a table with a non-default delimiter, and reading from it.
${CLICKHOUSE_LOCAL} --query "
create table t(a int, b int) engine File(CSV, '$the_file') settings format_csv_delimiter = '|';
insert into t select 1 a, 1 b;
"
# See what's in the file
cat "$the_file"
${CLICKHOUSE_LOCAL} --query "
create table t(a int, b int) engine File(CSV, '$the_file') settings format_csv_delimiter = '|';
select * from t;
"