mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 07:01:59 +00:00
make a separate settings collection + some cleanup
This commit is contained in:
parent
2efbcbaa14
commit
3c60f6cec2
@ -130,4 +130,6 @@ void Settings::checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfigura
|
||||
}
|
||||
}
|
||||
|
||||
IMPLEMENT_SETTINGS_TRAITS(FormatFactorySettingsTraits, FORMAT_FACTORY_SETTINGS)
|
||||
|
||||
}
|
||||
|
@ -518,4 +518,13 @@ struct Settings : public BaseSettings<SettingsTraits>
|
||||
static void checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfiguration & config, const String & config_path);
|
||||
};
|
||||
|
||||
/*
|
||||
* User-specified file format settings for File and ULR engines.
|
||||
*/
|
||||
DECLARE_SETTINGS_TRAITS(FormatFactorySettingsTraits, FORMAT_FACTORY_SETTINGS)
|
||||
|
||||
struct FormatFactorySettings : public BaseSettings<FormatFactorySettingsTraits>
|
||||
{
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -40,11 +40,17 @@ const FormatFactory::Creators & FormatFactory::getCreators(const String & name)
|
||||
throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT);
|
||||
}
|
||||
|
||||
|
||||
FormatSettings getFormatSettings(const Context & context)
|
||||
{
|
||||
const auto & settings = context.getSettingsRef();
|
||||
|
||||
return getFormatSettings(context, settings);
|
||||
}
|
||||
|
||||
template <typename Settings>
|
||||
FormatSettings getFormatSettings(const Context & context,
|
||||
const Settings & settings)
|
||||
{
|
||||
FormatSettings format_settings;
|
||||
|
||||
format_settings.avro.allow_missing_fields = settings.input_format_avro_allow_missing_fields;
|
||||
@ -114,6 +120,14 @@ FormatSettings getFormatSettings(const Context & context)
|
||||
return format_settings;
|
||||
}
|
||||
|
||||
template
|
||||
FormatSettings getFormatSettings<FormatFactorySettings>(const Context & context,
|
||||
const FormatFactorySettings & settings);
|
||||
|
||||
template
|
||||
FormatSettings getFormatSettings<Settings>(const Context & context,
|
||||
const Settings & settings);
|
||||
|
||||
|
||||
BlockInputStreamPtr FormatFactory::getInput(
|
||||
const String & name,
|
||||
@ -121,23 +135,22 @@ BlockInputStreamPtr FormatFactory::getInput(
|
||||
const Block & sample,
|
||||
const Context & context,
|
||||
UInt64 max_block_size,
|
||||
std::optional<FormatSettings> format_settings) const
|
||||
const std::optional<FormatSettings> & _format_settings) const
|
||||
{
|
||||
if (name == "Native")
|
||||
return std::make_shared<NativeBlockInputStream>(buf, sample, 0);
|
||||
|
||||
auto format_settings = _format_settings
|
||||
? *_format_settings : getFormatSettings(context);
|
||||
|
||||
if (!getCreators(name).input_processor_creator)
|
||||
{
|
||||
const auto & input_getter = getCreators(name).input_creator;
|
||||
if (!input_getter)
|
||||
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
|
||||
|
||||
if (!format_settings)
|
||||
{
|
||||
format_settings = getFormatSettings(context);
|
||||
}
|
||||
|
||||
return input_getter(buf, sample, max_block_size, {}, *format_settings);
|
||||
return input_getter(buf, sample, max_block_size, {}, format_settings);
|
||||
}
|
||||
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
@ -163,21 +176,16 @@ BlockInputStreamPtr FormatFactory::getInput(
|
||||
if (!input_getter)
|
||||
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
|
||||
|
||||
if (!format_settings)
|
||||
{
|
||||
format_settings = getFormatSettings(context);
|
||||
}
|
||||
|
||||
RowInputFormatParams row_input_format_params;
|
||||
row_input_format_params.max_block_size = max_block_size;
|
||||
row_input_format_params.allow_errors_num = format_settings->input_allow_errors_num;
|
||||
row_input_format_params.allow_errors_ratio = format_settings->input_allow_errors_ratio;
|
||||
row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num;
|
||||
row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
|
||||
row_input_format_params.max_execution_time = settings.max_execution_time;
|
||||
row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;
|
||||
|
||||
auto input_creator_params =
|
||||
ParallelParsingBlockInputStream::InputCreatorParams{sample,
|
||||
row_input_format_params, *format_settings};
|
||||
row_input_format_params, format_settings};
|
||||
ParallelParsingBlockInputStream::Params params{buf, input_getter,
|
||||
input_creator_params, file_segmentation_engine,
|
||||
static_cast<int>(settings.max_threads),
|
||||
@ -193,24 +201,22 @@ BlockInputStreamPtr FormatFactory::getInput(
|
||||
|
||||
BlockOutputStreamPtr FormatFactory::getOutput(const String & name,
|
||||
WriteBuffer & buf, const Block & sample, const Context & context,
|
||||
WriteCallback callback, std::optional<FormatSettings> format_settings) const
|
||||
WriteCallback callback, const std::optional<FormatSettings> & _format_settings) const
|
||||
{
|
||||
auto format_settings = _format_settings
|
||||
? *_format_settings : getFormatSettings(context);
|
||||
|
||||
if (!getCreators(name).output_processor_creator)
|
||||
{
|
||||
const auto & output_getter = getCreators(name).output_creator;
|
||||
if (!output_getter)
|
||||
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
|
||||
|
||||
if (!format_settings)
|
||||
{
|
||||
format_settings = getFormatSettings(context);
|
||||
}
|
||||
|
||||
/** Materialization is needed, because formats can use the functions `IDataType`,
|
||||
* which only work with full columns.
|
||||
*/
|
||||
return std::make_shared<MaterializingBlockOutputStream>(
|
||||
output_getter(buf, sample, std::move(callback), *format_settings),
|
||||
output_getter(buf, sample, std::move(callback), format_settings),
|
||||
sample);
|
||||
}
|
||||
|
||||
@ -227,7 +233,7 @@ InputFormatPtr FormatFactory::getInputFormat(
|
||||
const Block & sample,
|
||||
const Context & context,
|
||||
UInt64 max_block_size,
|
||||
std::optional<FormatSettings> format_settings) const
|
||||
const std::optional<FormatSettings> & _format_settings) const
|
||||
{
|
||||
const auto & input_getter = getCreators(name).input_processor_creator;
|
||||
if (!input_getter)
|
||||
@ -235,19 +241,18 @@ InputFormatPtr FormatFactory::getInputFormat(
|
||||
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
|
||||
if (!format_settings)
|
||||
{
|
||||
format_settings = getFormatSettings(context);
|
||||
}
|
||||
auto format_settings = _format_settings
|
||||
? *_format_settings : getFormatSettings(context);
|
||||
|
||||
RowInputFormatParams params;
|
||||
params.max_block_size = max_block_size;
|
||||
params.allow_errors_num = format_settings->input_allow_errors_num;
|
||||
params.allow_errors_ratio = format_settings->input_allow_errors_ratio;
|
||||
params.allow_errors_num = format_settings.input_allow_errors_num;
|
||||
params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
|
||||
params.max_execution_time = settings.max_execution_time;
|
||||
params.timeout_overflow_mode = settings.timeout_overflow_mode;
|
||||
|
||||
auto format = input_getter(buf, sample, params, *format_settings);
|
||||
auto format = input_getter(buf, sample, params, format_settings);
|
||||
|
||||
|
||||
/// It's a kludge. Because I cannot remove context from values format.
|
||||
if (auto * values = typeid_cast<ValuesBlockInputFormat *>(format.get()))
|
||||
@ -260,27 +265,25 @@ InputFormatPtr FormatFactory::getInputFormat(
|
||||
OutputFormatPtr FormatFactory::getOutputFormat(
|
||||
const String & name, WriteBuffer & buf, const Block & sample,
|
||||
const Context & context, WriteCallback callback,
|
||||
std::optional<FormatSettings> format_settings) const
|
||||
const std::optional<FormatSettings> & _format_settings) const
|
||||
{
|
||||
const auto & output_getter = getCreators(name).output_processor_creator;
|
||||
if (!output_getter)
|
||||
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
|
||||
|
||||
if (!format_settings)
|
||||
{
|
||||
format_settings = getFormatSettings(context);
|
||||
}
|
||||
|
||||
RowOutputFormatParams params;
|
||||
params.callback = std::move(callback);
|
||||
|
||||
auto format_settings = _format_settings
|
||||
? *_format_settings : getFormatSettings(context);
|
||||
|
||||
/** TODO: Materialization is needed, because formats can use the functions `IDataType`,
|
||||
* which only work with full columns.
|
||||
*/
|
||||
auto format = output_getter(buf, sample, params, *format_settings);
|
||||
auto format = output_getter(buf, sample, params, format_settings);
|
||||
|
||||
/// Enable auto-flush for streaming mode. Currently it is needed by INSERT WATCH query.
|
||||
if (format_settings->enable_streaming)
|
||||
if (format_settings.enable_streaming)
|
||||
format->setAutoFlush();
|
||||
|
||||
/// It's a kludge. Because I cannot remove context from MySQL format.
|
||||
|
@ -18,6 +18,7 @@ class Block;
|
||||
class Context;
|
||||
struct FormatSettings;
|
||||
struct Settings;
|
||||
struct FormatFactorySettings;
|
||||
|
||||
class ReadBuffer;
|
||||
class WriteBuffer;
|
||||
@ -36,6 +37,10 @@ using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
|
||||
|
||||
FormatSettings getFormatSettings(const Context & context);
|
||||
|
||||
template <typename T>
|
||||
FormatSettings getFormatSettings(const Context & context,
|
||||
const T & settings);
|
||||
|
||||
/** Allows to create an IBlockInputStream or IBlockOutputStream by the name of the format.
|
||||
* Note: format and compression are independent things.
|
||||
*/
|
||||
@ -108,11 +113,11 @@ public:
|
||||
const Block & sample,
|
||||
const Context & context,
|
||||
UInt64 max_block_size,
|
||||
std::optional<FormatSettings> format_settings = std::nullopt) const;
|
||||
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
|
||||
|
||||
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
|
||||
const Block & sample, const Context & context, WriteCallback callback = {},
|
||||
std::optional<FormatSettings> format_settings = std::nullopt) const;
|
||||
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
|
||||
|
||||
InputFormatPtr getInputFormat(
|
||||
const String & name,
|
||||
@ -120,12 +125,12 @@ public:
|
||||
const Block & sample,
|
||||
const Context & context,
|
||||
UInt64 max_block_size,
|
||||
std::optional<FormatSettings> format_settings = std::nullopt) const;
|
||||
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
|
||||
|
||||
OutputFormatPtr getOutputFormat(
|
||||
const String & name, WriteBuffer & buf, const Block & sample,
|
||||
const Context & context, WriteCallback callback = {},
|
||||
std::optional<FormatSettings> format_settings = std::nullopt) const;
|
||||
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
|
||||
|
||||
/// Register format by its name.
|
||||
void registerInputFormat(const String & name, InputCreator input_creator);
|
||||
|
@ -6,10 +6,16 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Various tweaks for input/output formats.
|
||||
* Text serialization/deserialization of data types also depend on some of these settings.
|
||||
* NOTE Parameters for unrelated formats and unrelated data types
|
||||
* are collected in this struct - it prevents modularity, but they are difficult to separate.
|
||||
/**
|
||||
* Various tweaks for input/output formats. Text serialization/deserialization
|
||||
* of data types also depend on some of these settings. It is different from
|
||||
* FormatFactorySettings in that it has all necessary user-provided settings
|
||||
* combined with information from context etc, that we can use directly during
|
||||
* serialization. In contrast, FormatFactorySettings' job is to reflect the
|
||||
* changes made to user-visible format settings, such as when tweaking the
|
||||
* the format for File engine.
|
||||
* NOTE Parameters for unrelated formats and unrelated data types are collected
|
||||
* in this struct - it prevents modularity, but they are difficult to separate.
|
||||
*/
|
||||
struct FormatSettings
|
||||
{
|
||||
@ -113,13 +119,11 @@ struct FormatSettings
|
||||
{
|
||||
bool write_row_delimiters = true;
|
||||
/**
|
||||
* Some buffers (kafka / rabbit) split the rows internally using callback
|
||||
* so we can push there formats without framing / delimiters (like
|
||||
* ProtobufSingle). In other cases you can't write more than single row
|
||||
* in unframed format.
|
||||
* Not sure we need this parameter at all, it only serves as an additional
|
||||
* safety check in ProtobufSingle format, but exporting constant-size
|
||||
* records w/o delimiters might be generally useful, not only for Kafka.
|
||||
* Some buffers (kafka / rabbit) split the rows internally using callback,
|
||||
* and always send one row per message, so we can push there formats
|
||||
* without framing / delimiters (like ProtobufSingle). In other cases,
|
||||
* we have to enforce exporting at most one row in the format output,
|
||||
* because Protobuf without delimiters is not generally useful.
|
||||
*/
|
||||
bool allow_many_rows_no_delimiters = false;
|
||||
} protobuf;
|
||||
|
@ -452,7 +452,7 @@ public:
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const CompressionMethod compression_method,
|
||||
const Context & context,
|
||||
std::optional<FormatSettings> format_settings)
|
||||
const std::optional<FormatSettings> & format_settings)
|
||||
: storage(storage_)
|
||||
, metadata_snapshot(metadata_snapshot_)
|
||||
, lock(storage.rwlock)
|
||||
@ -628,11 +628,25 @@ void registerStorageFile(StorageFactory & factory)
|
||||
// session and user are ignored.
|
||||
if (factory_args.storage_def->settings)
|
||||
{
|
||||
Context global_context_copy = factory_args.context;
|
||||
global_context_copy.applySettingsChanges(
|
||||
FormatFactorySettings user_format_settings;
|
||||
|
||||
// Apply changed settings from global context, but ignore the
|
||||
// unknown ones, because we only have the format settings here.
|
||||
const auto & changes = factory_args.context.getSettingsRef().changes();
|
||||
for (const auto & change : changes)
|
||||
{
|
||||
if (user_format_settings.has(change.name))
|
||||
{
|
||||
user_format_settings.set(change.name, change.value);
|
||||
}
|
||||
}
|
||||
|
||||
// Apply changes from SETTINGS clause, with validation.
|
||||
user_format_settings.applyChanges(
|
||||
factory_args.storage_def->settings->changes);
|
||||
|
||||
storage_args.format_settings = getFormatSettings(
|
||||
global_context_copy);
|
||||
factory_args.context, user_format_settings);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -33,7 +33,7 @@ IStorageURLBase::IStorageURLBase(
|
||||
const Context & context_,
|
||||
const StorageID & table_id_,
|
||||
const String & format_name_,
|
||||
std::optional<FormatSettings> format_settings_,
|
||||
const std::optional<FormatSettings> & format_settings_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & compression_method_)
|
||||
@ -61,7 +61,7 @@ namespace
|
||||
const std::string & method,
|
||||
std::function<void(std::ostream &)> callback,
|
||||
const String & format,
|
||||
std::optional<FormatSettings> format_settings,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
String name_,
|
||||
const Block & sample_block,
|
||||
const Context & context,
|
||||
@ -140,7 +140,7 @@ namespace
|
||||
|
||||
StorageURLBlockOutputStream::StorageURLBlockOutputStream(const Poco::URI & uri,
|
||||
const String & format,
|
||||
std::optional<FormatSettings> format_settings,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
const Block & sample_block_,
|
||||
const Context & context,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
@ -276,10 +276,24 @@ void registerStorageURL(StorageFactory & factory)
|
||||
FormatSettings format_settings;
|
||||
if (args.storage_def->settings)
|
||||
{
|
||||
Context global_context_copy = args.context;
|
||||
global_context_copy.applySettingsChanges(
|
||||
args.storage_def->settings->changes);
|
||||
format_settings = getFormatSettings(global_context_copy);
|
||||
FormatFactorySettings user_format_settings;
|
||||
|
||||
// Apply changed settings from global context, but ignore the
|
||||
// unknown ones, because we only have the format settings here.
|
||||
const auto & changes = args.context.getSettingsRef().changes();
|
||||
for (const auto & change : changes)
|
||||
{
|
||||
if (user_format_settings.has(change.name))
|
||||
{
|
||||
user_format_settings.set(change.name, change.value);
|
||||
}
|
||||
}
|
||||
|
||||
// Apply changes from SETTINGS clause, with validation.
|
||||
user_format_settings.applyChanges(args.storage_def->settings->changes);
|
||||
|
||||
format_settings = getFormatSettings(args.context,
|
||||
user_format_settings);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -36,7 +36,7 @@ protected:
|
||||
const Context & context_,
|
||||
const StorageID & id_,
|
||||
const String & format_name_,
|
||||
std::optional<FormatSettings> format_settings_,
|
||||
const std::optional<FormatSettings> & format_settings_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & compression_method_);
|
||||
@ -79,7 +79,7 @@ public:
|
||||
StorageURLBlockOutputStream(
|
||||
const Poco::URI & uri,
|
||||
const String & format,
|
||||
std::optional<FormatSettings> format_settings,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
const Block & sample_block_,
|
||||
const Context & context,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
@ -107,7 +107,7 @@ public:
|
||||
StorageURL(const Poco::URI & uri_,
|
||||
const StorageID & table_id_,
|
||||
const String & format_name_,
|
||||
std::optional<FormatSettings> format_settings_,
|
||||
const std::optional<FormatSettings> & format_settings_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
Context & context_,
|
||||
|
@ -1,2 +1,3 @@
|
||||
1|1
|
||||
1 1
|
||||
1 2
|
||||
|
@ -20,4 +20,8 @@ cat "$the_file"
|
||||
${CLICKHOUSE_LOCAL} --query "
|
||||
create table t(a int, b int) engine File(CSV, '$the_file') settings format_csv_delimiter = '|';
|
||||
select * from t;
|
||||
"
|
||||
"
|
||||
|
||||
# Also check that the File engine emplicitly created by clickhouse-local
|
||||
# uses the modified settings.
|
||||
${CLICKHOUSE_LOCAL} --structure="a int, b int" --input-format=CSV --format_csv_delimiter="|" --query="select * from table" <<<"1|2"
|
||||
|
Loading…
Reference in New Issue
Block a user