mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #16591 from ClickHouse/aku/create-file
Support `SETTINGS` clause for File engine
This commit is contained in:
commit
67099f28ac
@ -130,4 +130,6 @@ void Settings::checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfigura
|
||||
}
|
||||
}
|
||||
|
||||
IMPLEMENT_SETTINGS_TRAITS(FormatFactorySettingsTraits, FORMAT_FACTORY_SETTINGS)
|
||||
|
||||
}
|
||||
|
@ -514,4 +514,13 @@ struct Settings : public BaseSettings<SettingsTraits>
|
||||
static void checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfiguration & config, const String & config_path);
|
||||
};
|
||||
|
||||
/*
|
||||
* User-specified file format settings for File and ULR engines.
|
||||
*/
|
||||
DECLARE_SETTINGS_TRAITS(FormatFactorySettingsTraits, FORMAT_FACTORY_SETTINGS)
|
||||
|
||||
struct FormatFactorySettings : public BaseSettings<FormatFactorySettingsTraits>
|
||||
{
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -40,100 +40,93 @@ const FormatFactory::Creators & FormatFactory::getCreators(const String & name)
|
||||
throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT);
|
||||
}
|
||||
|
||||
FormatSettings getFormatSettings(const Context & context)
|
||||
{
|
||||
const auto & settings = context.getSettingsRef();
|
||||
|
||||
static FormatSettings getInputFormatSetting(const Settings & settings, const Context & context)
|
||||
return getFormatSettings(context, settings);
|
||||
}
|
||||
|
||||
template <typename Settings>
|
||||
FormatSettings getFormatSettings(const Context & context,
|
||||
const Settings & settings)
|
||||
{
|
||||
FormatSettings format_settings;
|
||||
format_settings.csv.delimiter = settings.format_csv_delimiter;
|
||||
format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
|
||||
|
||||
format_settings.avro.allow_missing_fields = settings.input_format_avro_allow_missing_fields;
|
||||
format_settings.avro.output_codec = settings.output_format_avro_codec;
|
||||
format_settings.avro.output_sync_interval = settings.output_format_avro_sync_interval;
|
||||
format_settings.avro.schema_registry_url = settings.format_avro_schema_registry_url.toString();
|
||||
format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
|
||||
format_settings.csv.unquoted_null_literal_as_null = settings.input_format_csv_unquoted_null_literal_as_null;
|
||||
format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
|
||||
format_settings.csv.crlf_end_of_line = settings.output_format_csv_crlf_end_of_line;
|
||||
format_settings.csv.delimiter = settings.format_csv_delimiter;
|
||||
format_settings.csv.empty_as_default = settings.input_format_defaults_for_omitted_fields;
|
||||
format_settings.csv.input_format_enum_as_number = settings.input_format_csv_enum_as_number;
|
||||
format_settings.null_as_default = settings.input_format_null_as_default;
|
||||
format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;
|
||||
format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions;
|
||||
format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals;
|
||||
format_settings.with_names_use_header = settings.input_format_with_names_use_header;
|
||||
format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
|
||||
format_settings.import_nested_json = settings.input_format_import_nested_json;
|
||||
format_settings.csv.unquoted_null_literal_as_null = settings.input_format_csv_unquoted_null_literal_as_null;
|
||||
format_settings.custom.escaping_rule = settings.format_custom_escaping_rule;
|
||||
format_settings.custom.field_delimiter = settings.format_custom_field_delimiter;
|
||||
format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
|
||||
format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
|
||||
format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter;
|
||||
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
|
||||
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
|
||||
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
|
||||
format_settings.date_time_input_format = settings.date_time_input_format;
|
||||
format_settings.date_time_output_format = settings.date_time_output_format;
|
||||
format_settings.enable_streaming = settings.output_format_enable_streaming;
|
||||
format_settings.import_nested_json = settings.input_format_import_nested_json;
|
||||
format_settings.input_allow_errors_num = settings.input_format_allow_errors_num;
|
||||
format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio;
|
||||
format_settings.template_settings.resultset_format = settings.format_template_resultset;
|
||||
format_settings.template_settings.row_format = settings.format_template_row;
|
||||
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
|
||||
format_settings.tsv.empty_as_default = settings.input_format_tsv_empty_as_default;
|
||||
format_settings.tsv.input_format_enum_as_number = settings.input_format_tsv_enum_as_number;
|
||||
format_settings.json.escape_forward_slashes = settings.output_format_json_escape_forward_slashes;
|
||||
format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers;
|
||||
format_settings.json.quote_denormals = settings.output_format_json_quote_denormals;
|
||||
format_settings.null_as_default = settings.input_format_null_as_default;
|
||||
format_settings.parquet.row_group_size = settings.output_format_parquet_row_group_size;
|
||||
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
|
||||
format_settings.pretty.color = settings.output_format_pretty_color;
|
||||
format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width;
|
||||
format_settings.pretty.max_rows = settings.output_format_pretty_max_rows;
|
||||
format_settings.pretty.max_value_width = settings.output_format_pretty_max_value_width;
|
||||
format_settings.pretty.output_format_pretty_row_numbers = settings.output_format_pretty_row_numbers;
|
||||
format_settings.regexp.escaping_rule = settings.format_regexp_escaping_rule;
|
||||
format_settings.regexp.regexp = settings.format_regexp;
|
||||
format_settings.regexp.skip_unmatched = settings.format_regexp_skip_unmatched;
|
||||
format_settings.schema.format_schema = settings.format_schema;
|
||||
format_settings.schema.format_schema_path = context.getFormatSchemaPath();
|
||||
format_settings.schema.is_server = context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER);
|
||||
format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter;
|
||||
format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
|
||||
format_settings.custom.escaping_rule = settings.format_custom_escaping_rule;
|
||||
format_settings.custom.field_delimiter = settings.format_custom_field_delimiter;
|
||||
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
|
||||
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
|
||||
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
|
||||
format_settings.regexp.regexp = settings.format_regexp;
|
||||
format_settings.regexp.escaping_rule = settings.format_regexp_escaping_rule;
|
||||
format_settings.regexp.skip_unmatched = settings.format_regexp_skip_unmatched;
|
||||
format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
|
||||
format_settings.template_settings.resultset_format = settings.format_template_resultset;
|
||||
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
|
||||
format_settings.template_settings.row_format = settings.format_template_row;
|
||||
format_settings.tsv.crlf_end_of_line = settings.output_format_tsv_crlf_end_of_line;
|
||||
format_settings.tsv.empty_as_default = settings.input_format_tsv_empty_as_default;
|
||||
format_settings.tsv.input_format_enum_as_number = settings.input_format_tsv_enum_as_number;
|
||||
format_settings.tsv.null_representation = settings.output_format_tsv_null_representation;
|
||||
format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals;
|
||||
format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions;
|
||||
format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;
|
||||
format_settings.with_names_use_header = settings.input_format_with_names_use_header;
|
||||
format_settings.write_statistics = settings.output_format_write_statistics;
|
||||
|
||||
/// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context
|
||||
if (context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER))
|
||||
if (format_settings.schema.is_server)
|
||||
{
|
||||
const Poco::URI & avro_schema_registry_url = settings.format_avro_schema_registry_url;
|
||||
if (!avro_schema_registry_url.empty())
|
||||
context.getRemoteHostFilter().checkURL(avro_schema_registry_url);
|
||||
}
|
||||
format_settings.avro.schema_registry_url = settings.format_avro_schema_registry_url.toString();
|
||||
format_settings.avro.allow_missing_fields = settings.input_format_avro_allow_missing_fields;
|
||||
|
||||
return format_settings;
|
||||
}
|
||||
|
||||
static FormatSettings getOutputFormatSetting(const Settings & settings, const Context & context)
|
||||
{
|
||||
FormatSettings format_settings;
|
||||
format_settings.enable_streaming = settings.output_format_enable_streaming;
|
||||
format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers;
|
||||
format_settings.json.quote_denormals = settings.output_format_json_quote_denormals;
|
||||
format_settings.json.escape_forward_slashes = settings.output_format_json_escape_forward_slashes;
|
||||
format_settings.csv.delimiter = settings.format_csv_delimiter;
|
||||
format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
|
||||
format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
|
||||
format_settings.csv.crlf_end_of_line = settings.output_format_csv_crlf_end_of_line;
|
||||
format_settings.pretty.max_rows = settings.output_format_pretty_max_rows;
|
||||
format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width;
|
||||
format_settings.pretty.max_value_width = settings.output_format_pretty_max_value_width;
|
||||
format_settings.pretty.color = settings.output_format_pretty_color;
|
||||
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ?
|
||||
FormatSettings::Pretty::Charset::ASCII :
|
||||
FormatSettings::Pretty::Charset::UTF8;
|
||||
format_settings.pretty.output_format_pretty_row_numbers = settings.output_format_pretty_row_numbers;
|
||||
format_settings.template_settings.resultset_format = settings.format_template_resultset;
|
||||
format_settings.template_settings.row_format = settings.format_template_row;
|
||||
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
|
||||
format_settings.tsv.crlf_end_of_line = settings.output_format_tsv_crlf_end_of_line;
|
||||
format_settings.tsv.null_representation = settings.output_format_tsv_null_representation;
|
||||
format_settings.write_statistics = settings.output_format_write_statistics;
|
||||
format_settings.parquet.row_group_size = settings.output_format_parquet_row_group_size;
|
||||
format_settings.schema.format_schema = settings.format_schema;
|
||||
format_settings.schema.format_schema_path = context.getFormatSchemaPath();
|
||||
format_settings.schema.is_server = context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER);
|
||||
format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter;
|
||||
format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
|
||||
format_settings.custom.escaping_rule = settings.format_custom_escaping_rule;
|
||||
format_settings.custom.field_delimiter = settings.format_custom_field_delimiter;
|
||||
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
|
||||
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
|
||||
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
|
||||
format_settings.avro.output_codec = settings.output_format_avro_codec;
|
||||
format_settings.avro.output_sync_interval = settings.output_format_avro_sync_interval;
|
||||
format_settings.date_time_output_format = settings.date_time_output_format;
|
||||
template
|
||||
FormatSettings getFormatSettings<FormatFactorySettings>(const Context & context,
|
||||
const FormatFactorySettings & settings);
|
||||
|
||||
return format_settings;
|
||||
}
|
||||
template
|
||||
FormatSettings getFormatSettings<Settings>(const Context & context,
|
||||
const Settings & settings);
|
||||
|
||||
|
||||
BlockInputStreamPtr FormatFactory::getInput(
|
||||
@ -142,21 +135,22 @@ BlockInputStreamPtr FormatFactory::getInput(
|
||||
const Block & sample,
|
||||
const Context & context,
|
||||
UInt64 max_block_size,
|
||||
ReadCallback callback) const
|
||||
const std::optional<FormatSettings> & _format_settings) const
|
||||
{
|
||||
if (name == "Native")
|
||||
return std::make_shared<NativeBlockInputStream>(buf, sample, 0);
|
||||
|
||||
auto format_settings = _format_settings
|
||||
? *_format_settings : getFormatSettings(context);
|
||||
|
||||
if (!getCreators(name).input_processor_creator)
|
||||
{
|
||||
const auto & input_getter = getCreators(name).input_creator;
|
||||
if (!input_getter)
|
||||
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
|
||||
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
FormatSettings format_settings = getInputFormatSetting(settings, context);
|
||||
|
||||
return input_getter(buf, sample, max_block_size, callback ? callback : ReadCallback(), format_settings);
|
||||
return input_getter(buf, sample, max_block_size, {}, format_settings);
|
||||
}
|
||||
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
@ -182,17 +176,16 @@ BlockInputStreamPtr FormatFactory::getInput(
|
||||
if (!input_getter)
|
||||
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
|
||||
|
||||
FormatSettings format_settings = getInputFormatSetting(settings, context);
|
||||
|
||||
RowInputFormatParams row_input_format_params;
|
||||
row_input_format_params.max_block_size = max_block_size;
|
||||
row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num;
|
||||
row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
|
||||
row_input_format_params.callback = std::move(callback);
|
||||
row_input_format_params.max_execution_time = settings.max_execution_time;
|
||||
row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;
|
||||
|
||||
auto input_creator_params = ParallelParsingBlockInputStream::InputCreatorParams{sample, row_input_format_params, format_settings};
|
||||
auto input_creator_params =
|
||||
ParallelParsingBlockInputStream::InputCreatorParams{sample,
|
||||
row_input_format_params, format_settings};
|
||||
ParallelParsingBlockInputStream::Params params{buf, input_getter,
|
||||
input_creator_params, file_segmentation_engine,
|
||||
static_cast<int>(settings.max_threads),
|
||||
@ -200,32 +193,37 @@ BlockInputStreamPtr FormatFactory::getInput(
|
||||
return std::make_shared<ParallelParsingBlockInputStream>(params);
|
||||
}
|
||||
|
||||
auto format = getInputFormat(name, buf, sample, context, max_block_size, std::move(callback));
|
||||
auto format = getInputFormat(name, buf, sample, context, max_block_size,
|
||||
format_settings);
|
||||
return std::make_shared<InputStreamFromInputFormat>(std::move(format));
|
||||
}
|
||||
|
||||
|
||||
BlockOutputStreamPtr FormatFactory::getOutput(
|
||||
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback, const bool ignore_no_row_delimiter) const
|
||||
BlockOutputStreamPtr FormatFactory::getOutput(const String & name,
|
||||
WriteBuffer & buf, const Block & sample, const Context & context,
|
||||
WriteCallback callback, const std::optional<FormatSettings> & _format_settings) const
|
||||
{
|
||||
auto format_settings = _format_settings
|
||||
? *_format_settings : getFormatSettings(context);
|
||||
|
||||
if (!getCreators(name).output_processor_creator)
|
||||
{
|
||||
const auto & output_getter = getCreators(name).output_creator;
|
||||
if (!output_getter)
|
||||
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
|
||||
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
FormatSettings format_settings = getOutputFormatSetting(settings, context);
|
||||
|
||||
/** Materialization is needed, because formats can use the functions `IDataType`,
|
||||
* which only work with full columns.
|
||||
*/
|
||||
return std::make_shared<MaterializingBlockOutputStream>(
|
||||
output_getter(buf, sample, std::move(callback), format_settings), sample);
|
||||
output_getter(buf, sample, std::move(callback), format_settings),
|
||||
sample);
|
||||
}
|
||||
|
||||
auto format = getOutputFormat(name, buf, sample, context, std::move(callback), ignore_no_row_delimiter);
|
||||
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
|
||||
auto format = getOutputFormat(name, buf, sample, context, std::move(callback),
|
||||
format_settings);
|
||||
return std::make_shared<MaterializingBlockOutputStream>(
|
||||
std::make_shared<OutputStreamToOutputFormat>(format), sample);
|
||||
}
|
||||
|
||||
|
||||
@ -235,25 +233,27 @@ InputFormatPtr FormatFactory::getInputFormat(
|
||||
const Block & sample,
|
||||
const Context & context,
|
||||
UInt64 max_block_size,
|
||||
ReadCallback callback) const
|
||||
const std::optional<FormatSettings> & _format_settings) const
|
||||
{
|
||||
const auto & input_getter = getCreators(name).input_processor_creator;
|
||||
if (!input_getter)
|
||||
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
|
||||
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
FormatSettings format_settings = getInputFormatSetting(settings, context);
|
||||
|
||||
auto format_settings = _format_settings
|
||||
? *_format_settings : getFormatSettings(context);
|
||||
|
||||
RowInputFormatParams params;
|
||||
params.max_block_size = max_block_size;
|
||||
params.allow_errors_num = format_settings.input_allow_errors_num;
|
||||
params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
|
||||
params.callback = std::move(callback);
|
||||
params.max_execution_time = settings.max_execution_time;
|
||||
params.timeout_overflow_mode = settings.timeout_overflow_mode;
|
||||
|
||||
auto format = input_getter(buf, sample, params, format_settings);
|
||||
|
||||
|
||||
/// It's a kludge. Because I cannot remove context from values format.
|
||||
if (auto * values = typeid_cast<ValuesBlockInputFormat *>(format.get()))
|
||||
values->setContext(context);
|
||||
@ -263,19 +263,20 @@ InputFormatPtr FormatFactory::getInputFormat(
|
||||
|
||||
|
||||
OutputFormatPtr FormatFactory::getOutputFormat(
|
||||
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback, const bool ignore_no_row_delimiter) const
|
||||
const String & name, WriteBuffer & buf, const Block & sample,
|
||||
const Context & context, WriteCallback callback,
|
||||
const std::optional<FormatSettings> & _format_settings) const
|
||||
{
|
||||
const auto & output_getter = getCreators(name).output_processor_creator;
|
||||
if (!output_getter)
|
||||
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
|
||||
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
FormatSettings format_settings = getOutputFormatSetting(settings, context);
|
||||
|
||||
RowOutputFormatParams params;
|
||||
params.ignore_no_row_delimiter = ignore_no_row_delimiter;
|
||||
params.callback = std::move(callback);
|
||||
|
||||
auto format_settings = _format_settings
|
||||
? *_format_settings : getFormatSettings(context);
|
||||
|
||||
/** TODO: Materialization is needed, because formats can use the functions `IDataType`,
|
||||
* which only work with full columns.
|
||||
*/
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <common/types.h>
|
||||
#include <Columns/IColumn.h>
|
||||
#include <DataStreams/IBlockStream_fwd.h>
|
||||
#include <Formats/FormatSettings.h>
|
||||
#include <IO/BufferWithOwnMemory.h>
|
||||
|
||||
#include <functional>
|
||||
@ -16,6 +17,8 @@ namespace DB
|
||||
class Block;
|
||||
class Context;
|
||||
struct FormatSettings;
|
||||
struct Settings;
|
||||
struct FormatFactorySettings;
|
||||
|
||||
class ReadBuffer;
|
||||
class WriteBuffer;
|
||||
@ -32,6 +35,11 @@ struct RowOutputFormatParams;
|
||||
using InputFormatPtr = std::shared_ptr<IInputFormat>;
|
||||
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
|
||||
|
||||
FormatSettings getFormatSettings(const Context & context);
|
||||
|
||||
template <typename T>
|
||||
FormatSettings getFormatSettings(const Context & context,
|
||||
const T & settings);
|
||||
|
||||
/** Allows to create an IBlockInputStream or IBlockOutputStream by the name of the format.
|
||||
* Note: format and compression are independent things.
|
||||
@ -104,10 +112,11 @@ public:
|
||||
const Block & sample,
|
||||
const Context & context,
|
||||
UInt64 max_block_size,
|
||||
ReadCallback callback = {}) const;
|
||||
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
|
||||
|
||||
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
|
||||
const Block & sample, const Context & context, WriteCallback callback = {}, const bool ignore_no_row_delimiter = false) const;
|
||||
const Block & sample, const Context & context, WriteCallback callback = {},
|
||||
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
|
||||
|
||||
InputFormatPtr getInputFormat(
|
||||
const String & name,
|
||||
@ -115,10 +124,12 @@ public:
|
||||
const Block & sample,
|
||||
const Context & context,
|
||||
UInt64 max_block_size,
|
||||
ReadCallback callback = {}) const;
|
||||
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
|
||||
|
||||
OutputFormatPtr getOutputFormat(
|
||||
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback = {}, const bool ignore_no_row_delimiter = false) const;
|
||||
const String & name, WriteBuffer & buf, const Block & sample,
|
||||
const Context & context, WriteCallback callback = {},
|
||||
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
|
||||
|
||||
/// Register format by its name.
|
||||
void registerInputFormat(const String & name, InputCreator input_creator);
|
||||
|
@ -6,10 +6,16 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Various tweaks for input/output formats.
|
||||
* Text serialization/deserialization of data types also depend on some of these settings.
|
||||
* NOTE Parameters for unrelated formats and unrelated data types
|
||||
* are collected in this struct - it prevents modularity, but they are difficult to separate.
|
||||
/**
|
||||
* Various tweaks for input/output formats. Text serialization/deserialization
|
||||
* of data types also depend on some of these settings. It is different from
|
||||
* FormatFactorySettings in that it has all necessary user-provided settings
|
||||
* combined with information from context etc, that we can use directly during
|
||||
* serialization. In contrast, FormatFactorySettings' job is to reflect the
|
||||
* changes made to user-visible format settings, such as when tweaking the
|
||||
* the format for File engine.
|
||||
* NOTE Parameters for unrelated formats and unrelated data types are collected
|
||||
* in this struct - it prevents modularity, but they are difficult to separate.
|
||||
*/
|
||||
struct FormatSettings
|
||||
{
|
||||
@ -17,76 +23,6 @@ struct FormatSettings
|
||||
/// Option means that each chunk of data need to be formatted independently. Also each chunk will be flushed at the end of processing.
|
||||
bool enable_streaming = false;
|
||||
|
||||
struct JSON
|
||||
{
|
||||
bool quote_64bit_integers = true;
|
||||
bool quote_denormals = true;
|
||||
bool escape_forward_slashes = true;
|
||||
};
|
||||
|
||||
JSON json;
|
||||
|
||||
struct CSV
|
||||
{
|
||||
char delimiter = ',';
|
||||
bool allow_single_quotes = true;
|
||||
bool allow_double_quotes = true;
|
||||
bool unquoted_null_literal_as_null = false;
|
||||
bool empty_as_default = false;
|
||||
bool crlf_end_of_line = false;
|
||||
bool input_format_enum_as_number = false;
|
||||
};
|
||||
|
||||
CSV csv;
|
||||
|
||||
struct Pretty
|
||||
{
|
||||
UInt64 max_rows = 10000;
|
||||
UInt64 max_column_pad_width = 250;
|
||||
UInt64 max_value_width = 10000;
|
||||
bool color = true;
|
||||
|
||||
bool output_format_pretty_row_numbers = false;
|
||||
|
||||
enum class Charset
|
||||
{
|
||||
UTF8,
|
||||
ASCII,
|
||||
};
|
||||
|
||||
Charset charset = Charset::UTF8;
|
||||
};
|
||||
|
||||
Pretty pretty;
|
||||
|
||||
struct Values
|
||||
{
|
||||
bool interpret_expressions = true;
|
||||
bool deduce_templates_of_expressions = true;
|
||||
bool accurate_types_of_literals = true;
|
||||
};
|
||||
|
||||
Values values;
|
||||
|
||||
struct Template
|
||||
{
|
||||
String resultset_format;
|
||||
String row_format;
|
||||
String row_between_delimiter;
|
||||
};
|
||||
|
||||
Template template_settings;
|
||||
|
||||
struct TSV
|
||||
{
|
||||
bool empty_as_default = false;
|
||||
bool crlf_end_of_line = false;
|
||||
String null_representation = "\\N";
|
||||
bool input_format_enum_as_number = false;
|
||||
};
|
||||
|
||||
TSV tsv;
|
||||
|
||||
bool skip_unknown_fields = false;
|
||||
bool with_names_use_header = false;
|
||||
bool write_statistics = true;
|
||||
@ -113,24 +49,29 @@ struct FormatSettings
|
||||
UInt64 input_allow_errors_num = 0;
|
||||
Float32 input_allow_errors_ratio = 0;
|
||||
|
||||
struct Arrow
|
||||
struct
|
||||
{
|
||||
UInt64 row_group_size = 1000000;
|
||||
} arrow;
|
||||
|
||||
struct Parquet
|
||||
struct
|
||||
{
|
||||
UInt64 row_group_size = 1000000;
|
||||
} parquet;
|
||||
String schema_registry_url;
|
||||
String output_codec;
|
||||
UInt64 output_sync_interval = 16 * 1024;
|
||||
bool allow_missing_fields = false;
|
||||
} avro;
|
||||
|
||||
struct Schema
|
||||
struct CSV
|
||||
{
|
||||
std::string format_schema;
|
||||
std::string format_schema_path;
|
||||
bool is_server = false;
|
||||
};
|
||||
|
||||
Schema schema;
|
||||
char delimiter = ',';
|
||||
bool allow_single_quotes = true;
|
||||
bool allow_double_quotes = true;
|
||||
bool unquoted_null_literal_as_null = false;
|
||||
bool empty_as_default = false;
|
||||
bool crlf_end_of_line = false;
|
||||
bool input_format_enum_as_number = false;
|
||||
} csv;
|
||||
|
||||
struct Custom
|
||||
{
|
||||
@ -141,29 +82,87 @@ struct FormatSettings
|
||||
std::string row_between_delimiter;
|
||||
std::string field_delimiter;
|
||||
std::string escaping_rule;
|
||||
};
|
||||
} custom;
|
||||
|
||||
Custom custom;
|
||||
|
||||
struct Avro
|
||||
struct
|
||||
{
|
||||
String schema_registry_url;
|
||||
String output_codec;
|
||||
UInt64 output_sync_interval = 16 * 1024;
|
||||
bool allow_missing_fields = false;
|
||||
};
|
||||
bool quote_64bit_integers = true;
|
||||
bool quote_denormals = true;
|
||||
bool escape_forward_slashes = true;
|
||||
bool serialize_as_strings = false;
|
||||
} json;
|
||||
|
||||
Avro avro;
|
||||
struct
|
||||
{
|
||||
UInt64 row_group_size = 1000000;
|
||||
} parquet;
|
||||
|
||||
struct Regexp
|
||||
struct Pretty
|
||||
{
|
||||
UInt64 max_rows = 10000;
|
||||
UInt64 max_column_pad_width = 250;
|
||||
UInt64 max_value_width = 10000;
|
||||
bool color = true;
|
||||
|
||||
bool output_format_pretty_row_numbers = false;
|
||||
|
||||
enum class Charset
|
||||
{
|
||||
UTF8,
|
||||
ASCII,
|
||||
};
|
||||
|
||||
Charset charset = Charset::UTF8;
|
||||
} pretty;
|
||||
|
||||
struct
|
||||
{
|
||||
bool write_row_delimiters = true;
|
||||
/**
|
||||
* Some buffers (kafka / rabbit) split the rows internally using callback,
|
||||
* and always send one row per message, so we can push there formats
|
||||
* without framing / delimiters (like ProtobufSingle). In other cases,
|
||||
* we have to enforce exporting at most one row in the format output,
|
||||
* because Protobuf without delimiters is not generally useful.
|
||||
*/
|
||||
bool allow_many_rows_no_delimiters = false;
|
||||
} protobuf;
|
||||
|
||||
struct
|
||||
{
|
||||
std::string regexp;
|
||||
std::string escaping_rule;
|
||||
bool skip_unmatched = false;
|
||||
};
|
||||
} regexp;
|
||||
|
||||
Regexp regexp;
|
||||
struct
|
||||
{
|
||||
std::string format_schema;
|
||||
std::string format_schema_path;
|
||||
bool is_server = false;
|
||||
} schema;
|
||||
|
||||
struct
|
||||
{
|
||||
String resultset_format;
|
||||
String row_format;
|
||||
String row_between_delimiter;
|
||||
} template_settings;
|
||||
|
||||
struct
|
||||
{
|
||||
bool empty_as_default = false;
|
||||
bool crlf_end_of_line = false;
|
||||
String null_representation = "\\N";
|
||||
bool input_format_enum_as_number = false;
|
||||
} tsv;
|
||||
|
||||
struct
|
||||
{
|
||||
bool interpret_expressions = true;
|
||||
bool deduce_templates_of_expressions = true;
|
||||
bool accurate_types_of_literals = true;
|
||||
} values;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -38,8 +38,8 @@ try
|
||||
|
||||
FormatSettings format_settings;
|
||||
|
||||
RowInputFormatParams in_params{DEFAULT_INSERT_BLOCK_SIZE, 0, 0, []{}};
|
||||
RowOutputFormatParams out_params{[](const Columns & /* columns */, size_t /* row */){},false};
|
||||
RowInputFormatParams in_params{DEFAULT_INSERT_BLOCK_SIZE, 0, 0};
|
||||
RowOutputFormatParams out_params{[](const Columns & /* columns */, size_t /* row */){}};
|
||||
|
||||
InputFormatPtr input_format = std::make_shared<TabSeparatedRowInputFormat>(sample, in_buf, in_params, false, false, format_settings);
|
||||
BlockInputStreamPtr block_input = std::make_shared<InputStreamFromInputFormat>(std::move(input_format));
|
||||
|
@ -65,8 +65,6 @@ Chunk IRowInputFormat::generate()
|
||||
info.read_columns.clear();
|
||||
if (!readRow(columns, info))
|
||||
break;
|
||||
if (params.callback)
|
||||
params.callback();
|
||||
|
||||
for (size_t column_idx = 0; column_idx < info.read_columns.size(); ++column_idx)
|
||||
{
|
||||
|
@ -27,9 +27,6 @@ struct RowInputFormatParams
|
||||
UInt64 allow_errors_num;
|
||||
Float64 allow_errors_ratio;
|
||||
|
||||
using ReadCallback = std::function<void()>;
|
||||
ReadCallback callback;
|
||||
|
||||
Poco::Timespan max_execution_time = 0;
|
||||
OverflowMode timeout_overflow_mode = OverflowMode::THROW;
|
||||
};
|
||||
|
@ -15,14 +15,6 @@ struct RowOutputFormatParams
|
||||
|
||||
// Callback used to indicate that another row is written.
|
||||
WriteCallback callback;
|
||||
|
||||
/**
|
||||
* some buffers (kafka / rabbit) split the rows internally using callback
|
||||
* so we can push there formats without framing / delimiters
|
||||
* (like ProtobufSingle). In other cases you can't write more than single row
|
||||
* in unframed format.
|
||||
*/
|
||||
bool ignore_no_row_delimiter = false;
|
||||
};
|
||||
|
||||
class WriteBuffer;
|
||||
|
@ -23,18 +23,22 @@ ProtobufRowOutputFormat::ProtobufRowOutputFormat(
|
||||
const Block & header,
|
||||
const RowOutputFormatParams & params_,
|
||||
const FormatSchemaInfo & format_schema,
|
||||
const bool use_length_delimiters_)
|
||||
const FormatSettings & settings)
|
||||
: IRowOutputFormat(header, out_, params_)
|
||||
, data_types(header.getDataTypes())
|
||||
, writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames(), use_length_delimiters_)
|
||||
, throw_on_multiple_rows_undelimited(!use_length_delimiters_ && !params_.ignore_no_row_delimiter)
|
||||
, writer(out,
|
||||
ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema),
|
||||
header.getNames(), settings.protobuf.write_row_delimiters)
|
||||
, allow_only_one_row(
|
||||
!settings.protobuf.write_row_delimiters
|
||||
&& !settings.protobuf.allow_many_rows_no_delimiters)
|
||||
{
|
||||
value_indices.resize(header.columns());
|
||||
}
|
||||
|
||||
void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num)
|
||||
{
|
||||
if (throw_on_multiple_rows_undelimited && !first_row)
|
||||
if (allow_only_one_row && !first_row)
|
||||
{
|
||||
throw Exception("The ProtobufSingle format can't be used to write multiple rows because this format doesn't have any row delimiter.", ErrorCodes::NO_ROW_DELIMITER);
|
||||
}
|
||||
@ -51,19 +55,23 @@ void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num)
|
||||
|
||||
void registerOutputFormatProcessorProtobuf(FormatFactory & factory)
|
||||
{
|
||||
for (bool use_length_delimiters : {false, true})
|
||||
for (bool write_row_delimiters : {false, true})
|
||||
{
|
||||
factory.registerOutputFormatProcessor(
|
||||
use_length_delimiters ? "Protobuf" : "ProtobufSingle",
|
||||
[use_length_delimiters](WriteBuffer & buf,
|
||||
write_row_delimiters ? "Protobuf" : "ProtobufSingle",
|
||||
[write_row_delimiters](WriteBuffer & buf,
|
||||
const Block & header,
|
||||
const RowOutputFormatParams & params,
|
||||
const FormatSettings & settings)
|
||||
const FormatSettings & _settings)
|
||||
{
|
||||
return std::make_shared<ProtobufRowOutputFormat>(buf, header, params,
|
||||
FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true,
|
||||
settings.schema.is_server, settings.schema.format_schema_path),
|
||||
use_length_delimiters);
|
||||
FormatSettings settings = _settings;
|
||||
settings.protobuf.write_row_delimiters = write_row_delimiters;
|
||||
return std::make_shared<ProtobufRowOutputFormat>(
|
||||
buf, header, params,
|
||||
FormatSchemaInfo(settings.schema.format_schema, "Protobuf",
|
||||
true, settings.schema.is_server,
|
||||
settings.schema.format_schema_path),
|
||||
settings);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ public:
|
||||
const Block & header,
|
||||
const RowOutputFormatParams & params_,
|
||||
const FormatSchemaInfo & format_schema,
|
||||
const bool use_length_delimiters_);
|
||||
const FormatSettings & settings);
|
||||
|
||||
String getName() const override { return "ProtobufRowOutputFormat"; }
|
||||
|
||||
@ -53,7 +53,7 @@ private:
|
||||
DataTypes data_types;
|
||||
ProtobufWriter writer;
|
||||
std::vector<size_t> value_indices;
|
||||
const bool throw_on_multiple_rows_undelimited;
|
||||
const bool allow_only_one_row;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -54,8 +54,6 @@ Chunk ValuesBlockInputFormat::generate()
|
||||
if (buf.eof() || *buf.position() == ';')
|
||||
break;
|
||||
readRow(columns, rows_in_block);
|
||||
if (params.callback)
|
||||
params.callback();
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
|
@ -32,13 +32,16 @@ void KafkaBlockOutputStream::writePrefix()
|
||||
if (!buffer)
|
||||
throw Exception("Failed to create Kafka producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
|
||||
|
||||
child = FormatFactory::instance().getOutput(
|
||||
storage.getFormatName(), *buffer, getHeader(), *context, [this](const Columns & columns, size_t row)
|
||||
{
|
||||
buffer->countRow(columns, row);
|
||||
},
|
||||
/* ignore_no_row_delimiter = */ true
|
||||
);
|
||||
auto format_settings = getFormatSettings(*context);
|
||||
format_settings.protobuf.allow_many_rows_no_delimiters = true;
|
||||
|
||||
child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer,
|
||||
getHeader(), *context,
|
||||
[this](const Columns & columns, size_t row)
|
||||
{
|
||||
buffer->countRow(columns, row);
|
||||
},
|
||||
format_settings);
|
||||
}
|
||||
|
||||
void KafkaBlockOutputStream::write(const Block & block)
|
||||
|
@ -42,13 +42,16 @@ void RabbitMQBlockOutputStream::writePrefix()
|
||||
|
||||
buffer->activateWriting();
|
||||
|
||||
child = FormatFactory::instance().getOutput(
|
||||
storage.getFormatName(), *buffer, getHeader(), context, [this](const Columns & /* columns */, size_t /* rows */)
|
||||
{
|
||||
buffer->countRow();
|
||||
},
|
||||
/* ignore_no_row_delimiter = */ true
|
||||
);
|
||||
auto format_settings = getFormatSettings(context);
|
||||
format_settings.protobuf.allow_many_rows_no_delimiters = true;
|
||||
|
||||
child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer,
|
||||
getHeader(), context,
|
||||
[this](const Columns & /* columns */, size_t /* rows */)
|
||||
{
|
||||
buffer->countRow();
|
||||
},
|
||||
format_settings);
|
||||
}
|
||||
|
||||
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
|
||||
@ -202,6 +203,7 @@ StorageFile::StorageFile(const std::string & relative_table_dir_path, CommonArgu
|
||||
StorageFile::StorageFile(CommonArguments args)
|
||||
: IStorage(args.table_id)
|
||||
, format_name(args.format_name)
|
||||
, format_settings(args.format_settings)
|
||||
, compression_method(args.compression_method)
|
||||
, base_path(args.context.getPath())
|
||||
{
|
||||
@ -324,9 +326,11 @@ public:
|
||||
method = chooseCompressionMethod(current_path, storage->compression_method);
|
||||
}
|
||||
|
||||
read_buf = wrapReadBufferWithCompressionMethod(std::move(nested_buffer), method);
|
||||
reader = FormatFactory::instance().getInput(
|
||||
storage->format_name, *read_buf, metadata_snapshot->getSampleBlock(), context, max_block_size);
|
||||
read_buf = wrapReadBufferWithCompressionMethod(
|
||||
std::move(nested_buffer), method);
|
||||
reader = FormatFactory::instance().getInput(storage->format_name,
|
||||
*read_buf, metadata_snapshot->getSampleBlock(), context,
|
||||
max_block_size, storage->format_settings);
|
||||
|
||||
if (columns_description.hasDefaults())
|
||||
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns_description, context);
|
||||
@ -430,8 +434,11 @@ Pipe StorageFile::read(
|
||||
pipes.reserve(num_streams);
|
||||
|
||||
for (size_t i = 0; i < num_streams; ++i)
|
||||
{
|
||||
pipes.emplace_back(std::make_shared<StorageFileSource>(
|
||||
this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns()));
|
||||
this_ptr, metadata_snapshot, context, max_block_size, files_info,
|
||||
metadata_snapshot->getColumns()));
|
||||
}
|
||||
|
||||
return Pipe::unitePipes(std::move(pipes));
|
||||
}
|
||||
@ -444,7 +451,8 @@ public:
|
||||
StorageFile & storage_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const CompressionMethod compression_method,
|
||||
const Context & context)
|
||||
const Context & context,
|
||||
const std::optional<FormatSettings> & format_settings)
|
||||
: storage(storage_)
|
||||
, metadata_snapshot(metadata_snapshot_)
|
||||
, lock(storage.rwlock)
|
||||
@ -472,7 +480,9 @@ public:
|
||||
|
||||
write_buf = wrapWriteBufferWithCompressionMethod(std::move(naked_buffer), compression_method, 3);
|
||||
|
||||
writer = FormatFactory::instance().getOutput(storage.format_name, *write_buf, metadata_snapshot->getSampleBlock(), context);
|
||||
writer = FormatFactory::instance().getOutput(storage.format_name,
|
||||
*write_buf, metadata_snapshot->getSampleBlock(), context,
|
||||
{}, format_settings);
|
||||
}
|
||||
|
||||
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
|
||||
@ -521,7 +531,8 @@ BlockOutputStreamPtr StorageFile::write(
|
||||
path = paths[0];
|
||||
|
||||
return std::make_shared<StorageFileBlockOutputStream>(*this, metadata_snapshot,
|
||||
chooseCompressionMethod(path, compression_method), context);
|
||||
chooseCompressionMethod(path, compression_method), context,
|
||||
format_settings);
|
||||
}
|
||||
|
||||
bool StorageFile::storesDataOnDisk() const
|
||||
@ -586,32 +597,71 @@ void StorageFile::truncate(
|
||||
|
||||
void registerStorageFile(StorageFactory & factory)
|
||||
{
|
||||
StorageFactory::StorageFeatures storage_features{
|
||||
.supports_settings = true,
|
||||
.source_access_type = AccessType::FILE
|
||||
};
|
||||
|
||||
factory.registerStorage(
|
||||
"File",
|
||||
[](const StorageFactory::Arguments & args)
|
||||
[](const StorageFactory::Arguments & factory_args)
|
||||
{
|
||||
ASTs & engine_args = args.engine_args;
|
||||
StorageFile::CommonArguments storage_args{
|
||||
.table_id = factory_args.table_id,
|
||||
.columns = factory_args.columns,
|
||||
.constraints = factory_args.constraints,
|
||||
.context = factory_args.context
|
||||
};
|
||||
|
||||
if (!(engine_args.size() >= 1 && engine_args.size() <= 3)) // NOLINT
|
||||
ASTs & engine_args_ast = factory_args.engine_args;
|
||||
|
||||
if (!(engine_args_ast.size() >= 1 && engine_args_ast.size() <= 3)) // NOLINT
|
||||
throw Exception(
|
||||
"Storage File requires from 1 to 3 arguments: name of used format, source and compression_method.",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
|
||||
String format_name = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
engine_args_ast[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args_ast[0], factory_args.local_context);
|
||||
storage_args.format_name = engine_args_ast[0]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
|
||||
String compression_method;
|
||||
StorageFile::CommonArguments common_args{
|
||||
args.table_id, format_name, compression_method, args.columns, args.constraints, args.context};
|
||||
// Use format settings from global server context + settings from
|
||||
// the SETTINGS clause of the create query. Settings from current
|
||||
// session and user are ignored.
|
||||
if (factory_args.storage_def->settings)
|
||||
{
|
||||
FormatFactorySettings user_format_settings;
|
||||
|
||||
if (engine_args.size() == 1) /// Table in database
|
||||
return StorageFile::create(args.relative_data_path, common_args);
|
||||
// Apply changed settings from global context, but ignore the
|
||||
// unknown ones, because we only have the format settings here.
|
||||
const auto & changes = factory_args.context.getSettingsRef().changes();
|
||||
for (const auto & change : changes)
|
||||
{
|
||||
if (user_format_settings.has(change.name))
|
||||
{
|
||||
user_format_settings.set(change.name, change.value);
|
||||
}
|
||||
}
|
||||
|
||||
// Apply changes from SETTINGS clause, with validation.
|
||||
user_format_settings.applyChanges(
|
||||
factory_args.storage_def->settings->changes);
|
||||
|
||||
storage_args.format_settings = getFormatSettings(
|
||||
factory_args.context, user_format_settings);
|
||||
}
|
||||
else
|
||||
{
|
||||
storage_args.format_settings = getFormatSettings(
|
||||
factory_args.context);
|
||||
}
|
||||
|
||||
if (engine_args_ast.size() == 1) /// Table in database
|
||||
return StorageFile::create(factory_args.relative_data_path, storage_args);
|
||||
|
||||
/// Will use FD if engine_args[1] is int literal or identifier with std* name
|
||||
int source_fd = -1;
|
||||
String source_path;
|
||||
|
||||
if (auto opt_name = tryGetIdentifierName(engine_args[1]))
|
||||
if (auto opt_name = tryGetIdentifierName(engine_args_ast[1]))
|
||||
{
|
||||
if (*opt_name == "stdin")
|
||||
source_fd = STDIN_FILENO;
|
||||
@ -623,7 +673,7 @@ void registerStorageFile(StorageFactory & factory)
|
||||
throw Exception(
|
||||
"Unknown identifier '" + *opt_name + "' in second arg of File storage constructor", ErrorCodes::UNKNOWN_IDENTIFIER);
|
||||
}
|
||||
else if (const auto * literal = engine_args[1]->as<ASTLiteral>())
|
||||
else if (const auto * literal = engine_args_ast[1]->as<ASTLiteral>())
|
||||
{
|
||||
auto type = literal->value.getType();
|
||||
if (type == Field::Types::Int64)
|
||||
@ -636,23 +686,23 @@ void registerStorageFile(StorageFactory & factory)
|
||||
throw Exception("Second argument must be path or file descriptor", ErrorCodes::BAD_ARGUMENTS);
|
||||
}
|
||||
|
||||
if (engine_args.size() == 3)
|
||||
if (engine_args_ast.size() == 3)
|
||||
{
|
||||
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
|
||||
compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
engine_args_ast[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args_ast[2], factory_args.local_context);
|
||||
storage_args.compression_method = engine_args_ast[2]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
}
|
||||
else
|
||||
compression_method = "auto";
|
||||
storage_args.compression_method = "auto";
|
||||
|
||||
if (0 <= source_fd) /// File descriptor
|
||||
return StorageFile::create(source_fd, common_args);
|
||||
return StorageFile::create(source_fd, storage_args);
|
||||
else /// User's file
|
||||
return StorageFile::create(source_path, args.context.getUserFilesPath(), common_args);
|
||||
return StorageFile::create(source_path, factory_args.context.getUserFilesPath(), storage_args);
|
||||
},
|
||||
{
|
||||
.source_access_type = AccessType::FILE,
|
||||
});
|
||||
storage_features);
|
||||
}
|
||||
|
||||
|
||||
NamesAndTypesList StorageFile::getVirtuals() const
|
||||
{
|
||||
return NamesAndTypesList{
|
||||
|
@ -51,9 +51,10 @@ public:
|
||||
|
||||
struct CommonArguments
|
||||
{
|
||||
const StorageID & table_id;
|
||||
const std::string & format_name;
|
||||
const std::string & compression_method;
|
||||
StorageID table_id;
|
||||
std::string format_name;
|
||||
std::optional<FormatSettings> format_settings;
|
||||
std::string compression_method;
|
||||
const ColumnsDescription & columns;
|
||||
const ConstraintsDescription & constraints;
|
||||
const Context & context;
|
||||
@ -80,6 +81,11 @@ private:
|
||||
explicit StorageFile(CommonArguments args);
|
||||
|
||||
std::string format_name;
|
||||
// We use format settings from global context + CREATE query for File table
|
||||
// function -- in this case, format_settings is set.
|
||||
// For `file` table function, we use format settings from current user context,
|
||||
// in this case, format_settings is not set.
|
||||
std::optional<FormatSettings> format_settings;
|
||||
|
||||
int table_fd = -1;
|
||||
String compression_method;
|
||||
|
@ -3,6 +3,7 @@
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
|
||||
#include <IO/ReadHelpers.h>
|
||||
@ -32,6 +33,7 @@ IStorageURLBase::IStorageURLBase(
|
||||
const Context & context_,
|
||||
const StorageID & table_id_,
|
||||
const String & format_name_,
|
||||
const std::optional<FormatSettings> & format_settings_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & compression_method_)
|
||||
@ -40,6 +42,7 @@ IStorageURLBase::IStorageURLBase(
|
||||
, context_global(context_)
|
||||
, compression_method(compression_method_)
|
||||
, format_name(format_name_)
|
||||
, format_settings(format_settings_)
|
||||
{
|
||||
context_global.getRemoteHostFilter().checkURL(uri);
|
||||
|
||||
@ -58,6 +61,7 @@ namespace
|
||||
const std::string & method,
|
||||
std::function<void(std::ostream &)> callback,
|
||||
const String & format,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
String name_,
|
||||
const Block & sample_block,
|
||||
const Context & context,
|
||||
@ -96,8 +100,10 @@ namespace
|
||||
context.getRemoteHostFilter()),
|
||||
compression_method);
|
||||
|
||||
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
|
||||
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns, context);
|
||||
reader = FormatFactory::instance().getInput(format, *read_buf,
|
||||
sample_block, context, max_block_size, format_settings);
|
||||
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader,
|
||||
columns, context);
|
||||
}
|
||||
|
||||
String getName() const override
|
||||
@ -134,6 +140,7 @@ namespace
|
||||
|
||||
StorageURLBlockOutputStream::StorageURLBlockOutputStream(const Poco::URI & uri,
|
||||
const String & format,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
const Block & sample_block_,
|
||||
const Context & context,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
@ -143,7 +150,8 @@ StorageURLBlockOutputStream::StorageURLBlockOutputStream(const Poco::URI & uri,
|
||||
write_buf = wrapWriteBufferWithCompressionMethod(
|
||||
std::make_unique<WriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts),
|
||||
compression_method, 3);
|
||||
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
|
||||
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block,
|
||||
context, {} /* write callback */, format_settings);
|
||||
}
|
||||
|
||||
|
||||
@ -214,6 +222,7 @@ Pipe IStorageURLBase::read(
|
||||
column_names, metadata_snapshot, query_info,
|
||||
context, processed_stage, max_block_size),
|
||||
format_name,
|
||||
format_settings,
|
||||
getName(),
|
||||
getHeaderBlock(column_names, metadata_snapshot),
|
||||
context,
|
||||
@ -225,8 +234,8 @@ Pipe IStorageURLBase::read(
|
||||
|
||||
BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
|
||||
{
|
||||
return std::make_shared<StorageURLBlockOutputStream>(
|
||||
uri, format_name, metadata_snapshot->getSampleBlock(), context_global,
|
||||
return std::make_shared<StorageURLBlockOutputStream>(uri, format_name,
|
||||
format_settings, metadata_snapshot->getSampleBlock(), context_global,
|
||||
ConnectionTimeouts::getHTTPTimeouts(context_global),
|
||||
chooseCompressionMethod(uri.toString(), compression_method));
|
||||
}
|
||||
@ -255,16 +264,52 @@ void registerStorageURL(StorageFactory & factory)
|
||||
{
|
||||
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
|
||||
compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
} else compression_method = "auto";
|
||||
}
|
||||
else
|
||||
{
|
||||
compression_method = "auto";
|
||||
}
|
||||
|
||||
// Use format settings from global server context + settings from
|
||||
// the SETTINGS clause of the create query. Settings from current
|
||||
// session and user are ignored.
|
||||
FormatSettings format_settings;
|
||||
if (args.storage_def->settings)
|
||||
{
|
||||
FormatFactorySettings user_format_settings;
|
||||
|
||||
// Apply changed settings from global context, but ignore the
|
||||
// unknown ones, because we only have the format settings here.
|
||||
const auto & changes = args.context.getSettingsRef().changes();
|
||||
for (const auto & change : changes)
|
||||
{
|
||||
if (user_format_settings.has(change.name))
|
||||
{
|
||||
user_format_settings.set(change.name, change.value);
|
||||
}
|
||||
}
|
||||
|
||||
// Apply changes from SETTINGS clause, with validation.
|
||||
user_format_settings.applyChanges(args.storage_def->settings->changes);
|
||||
|
||||
format_settings = getFormatSettings(args.context,
|
||||
user_format_settings);
|
||||
}
|
||||
else
|
||||
{
|
||||
format_settings = getFormatSettings(args.context);
|
||||
}
|
||||
|
||||
return StorageURL::create(
|
||||
uri,
|
||||
args.table_id,
|
||||
format_name,
|
||||
format_settings,
|
||||
args.columns, args.constraints, args.context,
|
||||
compression_method);
|
||||
},
|
||||
{
|
||||
.supports_settings = true,
|
||||
.source_access_type = AccessType::URL,
|
||||
});
|
||||
}
|
||||
|
@ -36,6 +36,7 @@ protected:
|
||||
const Context & context_,
|
||||
const StorageID & id_,
|
||||
const String & format_name_,
|
||||
const std::optional<FormatSettings> & format_settings_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & compression_method_);
|
||||
@ -44,6 +45,11 @@ protected:
|
||||
const Context & context_global;
|
||||
String compression_method;
|
||||
String format_name;
|
||||
// For URL engine, we use format settings from server context + `SETTINGS`
|
||||
// clause of the `CREATE` query. In this case, format_settings is set.
|
||||
// For `url` table function, we use settings from current query context.
|
||||
// In this case, format_settings is not set.
|
||||
std::optional<FormatSettings> format_settings;
|
||||
|
||||
private:
|
||||
virtual std::string getReadMethod() const;
|
||||
@ -73,6 +79,7 @@ public:
|
||||
StorageURLBlockOutputStream(
|
||||
const Poco::URI & uri,
|
||||
const String & format,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
const Block & sample_block_,
|
||||
const Context & context,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
@ -97,15 +104,16 @@ class StorageURL final : public ext::shared_ptr_helper<StorageURL>, public IStor
|
||||
{
|
||||
friend struct ext::shared_ptr_helper<StorageURL>;
|
||||
public:
|
||||
StorageURL(
|
||||
const Poco::URI & uri_,
|
||||
const StorageID & table_id_,
|
||||
const String & format_name_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
Context & context_,
|
||||
const String & compression_method_)
|
||||
: IStorageURLBase(uri_, context_, table_id_, format_name_, columns_, constraints_, compression_method_)
|
||||
StorageURL(const Poco::URI & uri_,
|
||||
const StorageID & table_id_,
|
||||
const String & format_name_,
|
||||
const std::optional<FormatSettings> & format_settings_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
Context & context_,
|
||||
const String & compression_method_)
|
||||
: IStorageURLBase(uri_, context_, table_id_, format_name_,
|
||||
format_settings_, columns_, constraints_, compression_method_)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -1,17 +1,18 @@
|
||||
#include "StorageXDBC.h"
|
||||
|
||||
#include <DataStreams/IBlockOutputStream.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Poco/Net/HTTPRequest.h>
|
||||
#include <Poco/Path.h>
|
||||
#include <Processors/Pipe.h>
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Storages/StorageURL.h>
|
||||
#include <Storages/transformQueryForExternalDatabase.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <Poco/Net/HTTPRequest.h>
|
||||
#include <Poco/Path.h>
|
||||
#include <DataStreams/IBlockOutputStream.h>
|
||||
|
||||
#include <Processors/Pipe.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -33,6 +34,7 @@ StorageXDBC::StorageXDBC(
|
||||
context_,
|
||||
table_id_,
|
||||
IXDBCBridgeHelper::DEFAULT_FORMAT,
|
||||
getFormatSettings(context_),
|
||||
columns_,
|
||||
ConstraintsDescription{},
|
||||
"" /* CompressionMethod */)
|
||||
@ -121,6 +123,7 @@ BlockOutputStreamPtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageM
|
||||
return std::make_shared<StorageURLBlockOutputStream>(
|
||||
request_uri,
|
||||
format_name,
|
||||
getFormatSettings(context),
|
||||
metadata_snapshot->getSampleBlock(),
|
||||
context,
|
||||
ConnectionTimeouts::getHTTPTimeouts(context),
|
||||
|
@ -1,18 +1,29 @@
|
||||
#include <Storages/StorageFile.h>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <Access/AccessFlags.h>
|
||||
#include <TableFunctions/TableFunctionFactory.h>
|
||||
#include <TableFunctions/TableFunctionFile.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
#include "registerTableFunctions.h"
|
||||
#include <Access/AccessFlags.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <Storages/StorageFile.h>
|
||||
#include <TableFunctions/TableFunctionFactory.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
StoragePtr TableFunctionFile::getStorage(
|
||||
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
|
||||
const std::string & table_name, const std::string & compression_method_) const
|
||||
StoragePtr TableFunctionFile::getStorage(const String & source,
|
||||
const String & format_, const ColumnsDescription & columns,
|
||||
Context & global_context, const std::string & table_name,
|
||||
const std::string & compression_method_) const
|
||||
{
|
||||
StorageFile::CommonArguments args{StorageID(getDatabaseName(), table_name), format_, compression_method_, columns, ConstraintsDescription{}, global_context};
|
||||
// For `file` table function, we are going to use format settings from the
|
||||
// query context.
|
||||
StorageFile::CommonArguments args{StorageID(getDatabaseName(), table_name),
|
||||
format_,
|
||||
std::nullopt /*format settings*/,
|
||||
compression_method_,
|
||||
columns,
|
||||
ConstraintsDescription{},
|
||||
global_context};
|
||||
|
||||
return StorageFile::create(source, global_context.getUserFilesPath(), args);
|
||||
}
|
||||
|
@ -1,10 +1,12 @@
|
||||
#include <Storages/StorageURL.h>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <Access/AccessFlags.h>
|
||||
#include <TableFunctions/TableFunctionFactory.h>
|
||||
#include <TableFunctions/TableFunctionURL.h>
|
||||
#include <Poco/URI.h>
|
||||
|
||||
#include "registerTableFunctions.h"
|
||||
#include <Access/AccessFlags.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Poco/URI.h>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <Storages/StorageURL.h>
|
||||
#include <TableFunctions/TableFunctionFactory.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -14,8 +16,9 @@ StoragePtr TableFunctionURL::getStorage(
|
||||
const std::string & table_name, const String & compression_method_) const
|
||||
{
|
||||
Poco::URI uri(source);
|
||||
return StorageURL::create(uri, StorageID(getDatabaseName(), table_name), format_, columns, ConstraintsDescription{},
|
||||
global_context, compression_method_);
|
||||
return StorageURL::create(uri, StorageID(getDatabaseName(), table_name),
|
||||
format_, std::nullopt /*format settings*/, columns,
|
||||
ConstraintsDescription{}, global_context, compression_method_);
|
||||
}
|
||||
|
||||
void registerTableFunctionURL(TableFunctionFactory & factory)
|
||||
|
@ -0,0 +1,3 @@
|
||||
1|1
|
||||
1 1
|
||||
1 2
|
27
tests/queries/0_stateless/01544_file_engine_settings.sh
Executable file
27
tests/queries/0_stateless/01544_file_engine_settings.sh
Executable file
@ -0,0 +1,27 @@
|
||||
#!/usr/bin/env bash
|
||||
set -eu
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
the_file="$CLICKHOUSE_TMP/01544-t.csv"
|
||||
rm -f -- "$the_file"
|
||||
|
||||
# We are going to check that format settings work for File engine,
|
||||
# by creating a table with a non-default delimiter, and reading from it.
|
||||
${CLICKHOUSE_LOCAL} --query "
|
||||
create table t(a int, b int) engine File(CSV, '$the_file') settings format_csv_delimiter = '|';
|
||||
insert into t select 1 a, 1 b;
|
||||
"
|
||||
|
||||
# See what's in the file
|
||||
cat "$the_file"
|
||||
|
||||
${CLICKHOUSE_LOCAL} --query "
|
||||
create table t(a int, b int) engine File(CSV, '$the_file') settings format_csv_delimiter = '|';
|
||||
select * from t;
|
||||
"
|
||||
|
||||
# Also check that the File engine emplicitly created by clickhouse-local
|
||||
# uses the modified settings.
|
||||
${CLICKHOUSE_LOCAL} --structure="a int, b int" --input-format=CSV --format_csv_delimiter="|" --query="select * from table" <<<"1|2"
|
@ -0,0 +1,4 @@
|
||||
1 2
|
||||
1 2
|
||||
1 2
|
||||
1 2
|
19
tests/queries/0_stateless/01545_url_file_format_settings.sql
Normal file
19
tests/queries/0_stateless/01545_url_file_format_settings.sql
Normal file
@ -0,0 +1,19 @@
|
||||
create table file_delim(a int, b int) engine File(CSV, '01545_url_file_format_settings.csv') settings format_csv_delimiter = '|';
|
||||
|
||||
truncate table file_delim;
|
||||
|
||||
insert into file_delim select 1, 2;
|
||||
|
||||
-- select 1, 2 format CSV settings format_csv_delimiter='/';
|
||||
create table url_delim(a int, b int) engine URL('http://127.0.0.1:8123/?query=select%201%2C%202%20format%20CSV%20settings%20format_csv_delimiter%3D%27/%27%3B%0A', CSV) settings format_csv_delimiter = '/';
|
||||
|
||||
select * from file_delim;
|
||||
|
||||
select * from url_delim;
|
||||
|
||||
select * from file('01545_url_file_format_settings.csv', CSV, 'a int, b int') settings format_csv_delimiter = '|';
|
||||
|
||||
select * from url('http://127.0.0.1:8123/?query=select%201%2C%202%20format%20CSV%20settings%20format_csv_delimiter%3D%27/%27%3B%0A', CSV, 'a int, b int') settings format_csv_delimiter = '/';
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user