diff --git a/contrib/googletest b/contrib/googletest
index e47544ad31c..a7f443b80b1 160000
--- a/contrib/googletest
+++ b/contrib/googletest
@@ -1 +1 @@
-Subproject commit e47544ad31cb3ceecd04cc13e8fe556f8df9fe0b
+Subproject commit a7f443b80b105f940225332ed3c31f2790092f47
diff --git a/docs/en/interfaces/formats.md b/docs/en/interfaces/formats.md
index 66d5bd2e574..3de416ae64d 100644
--- a/docs/en/interfaces/formats.md
+++ b/docs/en/interfaces/formats.md
@@ -2165,6 +2165,8 @@ To exchange data with Hadoop, you can use [HDFS table engine](/docs/en/engines/t
- [output_format_parquet_fixed_string_as_fixed_byte_array](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_fixed_string_as_fixed_byte_array) - use Parquet FIXED_LENGTH_BYTE_ARRAY type instead of Binary/String for FixedString columns. Default value - `true`.
- [output_format_parquet_version](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_version) - The version of Parquet format used in output format. Default value - `2.latest`.
- [output_format_parquet_compression_method](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_compression_method) - compression method used in output Parquet format. Default value - `lz4`.
+- [input_format_parquet_max_block_size](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_max_block_size) - Max block row size for parquet reader. Default value - `65409`.
+- [input_format_parquet_prefer_block_bytes](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_prefer_block_bytes) - Average block bytes output by parquet reader. Default value - `16744704`.
## ParquetMetadata {data-format-parquet-metadata}
diff --git a/docs/en/operations/named-collections.md b/docs/en/operations/named-collections.md
index 1d261d935af..91438cfb675 100644
--- a/docs/en/operations/named-collections.md
+++ b/docs/en/operations/named-collections.md
@@ -67,6 +67,23 @@ To manage named collections with DDL a user must have the `named_control_collect
In the above example the `password_sha256_hex` value is the hexadecimal representation of the SHA256 hash of the password. This configuration for the user `default` has the attribute `replace=true` as in the default configuration has a plain text `password` set, and it is not possible to have both plain text and sha256 hex passwords set for a user.
:::
+### Storage for named collections
+
+Named collections can either be stored on local disk or in zookeeper/keeper. By default local storage is used.
+
+To configure named collections storage in keeper, add a `type` (equal to either `keeper` or `zookeeper`) and a `path` (the path in keeper where named collections will be stored) to the `named_collections_storage` section of the configuration file:
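+```
+<clickhouse>
+  <named_collections_storage>
+    <type>zookeeper</type>
+    <path>/named_collections_path/</path>
+    <update_timeout_ms>1000</update_timeout_ms>
+  </named_collections_storage>
+</clickhouse>
+```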
+
+The optional configuration parameter `update_timeout_ms` defaults to `5000`.
+
## Storing named collections in configuration files
### XML example
diff --git a/docs/en/operations/settings/settings-formats.md b/docs/en/operations/settings/settings-formats.md
index 1a27b350652..6aae1ea62e5 100644
--- a/docs/en/operations/settings/settings-formats.md
+++ b/docs/en/operations/settings/settings-formats.md
@@ -1417,6 +1417,17 @@ Compression method used in output Parquet format. Supported codecs: `snappy`, `l
Default value: `lz4`.
+### input_format_parquet_max_block_size {#input_format_parquet_max_block_size}
+Maximum number of rows in a block produced by the Parquet reader. By controlling the number of rows in each block, you can control memory usage,
+and in operators that cache blocks, you can improve the accuracy of the operator's memory control.
+
+Default value: `65409`.
+
+### input_format_parquet_prefer_block_bytes {#input_format_parquet_prefer_block_bytes}
+Average block size in bytes output by the Parquet reader. Lowering this value when reading highly compressed Parquet files relieves memory pressure.
+
+Default value: `65409 * 256 = 16744704`
+
## Hive format settings {#hive-format-settings}
### input_format_hive_text_fields_delimiter {#input_format_hive_text_fields_delimiter}
diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp
index 84ff0e94cef..00994b39a40 100644
--- a/programs/server/Server.cpp
+++ b/programs/server/Server.cpp
@@ -48,6 +48,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -70,7 +71,6 @@
#include
#include
#include
-#include
#include
#include
#include
@@ -1378,7 +1378,7 @@ try
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_max_size_in_bytes, compiled_expression_cache_max_elements);
#endif
- NamedCollectionUtils::loadIfNot();
+ NamedCollectionFactory::instance().loadIfNot();
/// Initialize main config reloader.
std::string include_from_path = config().getString("include_from", "/etc/metrika.xml");
@@ -1647,7 +1647,7 @@ try
#if USE_SSL
CertificateReloader::instance().tryLoad(*config);
#endif
- NamedCollectionUtils::reloadFromConfig(*config);
+ NamedCollectionFactory::instance().reloadFromConfig(*config);
FileCacheFactory::instance().updateSettingsFromConfig(*config);
diff --git a/src/Analyzer/FunctionNode.cpp b/src/Analyzer/FunctionNode.cpp
index f13842cf67c..e98b04fe9a9 100644
--- a/src/Analyzer/FunctionNode.cpp
+++ b/src/Analyzer/FunctionNode.cpp
@@ -1,5 +1,7 @@
#include
+#include
+
#include
#include
@@ -58,12 +60,20 @@ ColumnsWithTypeAndName FunctionNode::getArgumentColumns() const
ColumnWithTypeAndName argument_column;
+ auto * constant = argument->as();
if (isNameOfInFunction(function_name) && i == 1)
+ {
argument_column.type = std::make_shared();
+ if (constant)
+ {
+ /// Created but not filled for the analysis during function resolution.
+ FutureSetPtr empty_set;
+ argument_column.column = ColumnConst::create(ColumnSet::create(1, empty_set), 1);
+ }
+ }
else
argument_column.type = argument->getResultType();
- auto * constant = argument->as();
if (constant && !isNotCreatable(argument_column.type))
argument_column.column = argument_column.type->createColumnConst(1, constant->getValue());
diff --git a/src/Analyzer/Passes/LogicalExpressionOptimizerPass.cpp b/src/Analyzer/Passes/LogicalExpressionOptimizerPass.cpp
index 11811ae4f2d..ac221bd66e7 100644
--- a/src/Analyzer/Passes/LogicalExpressionOptimizerPass.cpp
+++ b/src/Analyzer/Passes/LogicalExpressionOptimizerPass.cpp
@@ -551,14 +551,25 @@ private:
in_function->getArguments().getNodes() = std::move(in_arguments);
in_function->resolveAsFunction(in_function_resolver);
+
+ DataTypePtr result_type = in_function->getResultType();
+ const auto * type_low_cardinality = typeid_cast(result_type.get());
+ if (type_low_cardinality)
+ result_type = type_low_cardinality->getDictionaryType();
/** For `k :: UInt8`, expression `k = 1 OR k = NULL` with result type Nullable(UInt8)
* is replaced with `k IN (1, NULL)` with result type UInt8.
* Convert it back to Nullable(UInt8).
+ * And for `k :: LowCardinality(UInt8)`, the transformation of `k IN (1, NULL)` results in type LowCardinality(UInt8).
+ * Convert it to LowCardinality(Nullable(UInt8)).
*/
- if (is_any_nullable && !in_function->getResultType()->isNullable())
+ if (is_any_nullable && !result_type->isNullable())
{
- auto nullable_result_type = std::make_shared(in_function->getResultType());
- auto in_function_nullable = createCastFunction(std::move(in_function), std::move(nullable_result_type), getContext());
+ DataTypePtr new_result_type = std::make_shared(result_type);
+ if (type_low_cardinality)
+ {
+ new_result_type = std::make_shared(new_result_type);
+ }
+ auto in_function_nullable = createCastFunction(std::move(in_function), std::move(new_result_type), getContext());
or_operands.push_back(std::move(in_function_nullable));
}
else
diff --git a/src/Backups/BackupIO_S3.cpp b/src/Backups/BackupIO_S3.cpp
index 92f086295a0..1ea59c1d38b 100644
--- a/src/Backups/BackupIO_S3.cpp
+++ b/src/Backups/BackupIO_S3.cpp
@@ -54,9 +54,9 @@ namespace
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
settings.auth_settings.region,
context->getRemoteHostFilter(),
- static_cast(global_settings.s3_max_redirects),
- static_cast(global_settings.s3_retry_attempts),
- global_settings.enable_s3_requests_logging,
+ static_cast(local_settings.s3_max_redirects),
+ static_cast(local_settings.backup_restore_s3_retry_attempts),
+ local_settings.enable_s3_requests_logging,
/* for_disk_s3 = */ false,
request_settings.get_request_throttler,
request_settings.put_request_throttler,
diff --git a/src/Common/AsynchronousMetrics.h b/src/Common/AsynchronousMetrics.h
index b62529a08e7..10a972d2458 100644
--- a/src/Common/AsynchronousMetrics.h
+++ b/src/Common/AsynchronousMetrics.h
@@ -45,14 +45,17 @@ struct ProtocolServerMetrics
};
/** Periodically (by default, each second)
- * calculates and updates some metrics,
- * that are not updated automatically (so, need to be asynchronously calculated).
+ * calculates and updates some metrics,
+ * that are not updated automatically (so, need to be asynchronously calculated).
*
- * This includes both ClickHouse-related metrics (like memory usage of ClickHouse process)
- * and common OS-related metrics (like total memory usage on the server).
+ * This includes both general process metrics (like memory usage)
+ * and common OS-related metrics (like total memory usage on the server).
*
* All the values are either gauge type (like the total number of tables, the current memory usage).
* Or delta-counters representing some accumulation during the interval of time.
+ *
+ * Server and Keeper specific metrics are contained inside
+ * ServerAsynchronousMetrics and KeeperAsynchronousMetrics respectively.
*/
class AsynchronousMetrics
{
diff --git a/src/Common/NamedCollections/NamedCollectionUtils.cpp b/src/Common/NamedCollections/NamedCollectionUtils.cpp
deleted file mode 100644
index 5dbdeb10795..00000000000
--- a/src/Common/NamedCollections/NamedCollectionUtils.cpp
+++ /dev/null
@@ -1,484 +0,0 @@
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-
-#include
-
-namespace fs = std::filesystem;
-
-namespace DB
-{
-
-namespace ErrorCodes
-{
- extern const int NAMED_COLLECTION_ALREADY_EXISTS;
- extern const int NAMED_COLLECTION_DOESNT_EXIST;
- extern const int BAD_ARGUMENTS;
-}
-
-namespace NamedCollectionUtils
-{
-
-static std::atomic is_loaded_from_config = false;
-static std::atomic is_loaded_from_sql = false;
-
-class LoadFromConfig
-{
-private:
- const Poco::Util::AbstractConfiguration & config;
-
-public:
- explicit LoadFromConfig(const Poco::Util::AbstractConfiguration & config_)
- : config(config_) {}
-
- std::vector listCollections() const
- {
- Poco::Util::AbstractConfiguration::Keys collections_names;
- config.keys(NAMED_COLLECTIONS_CONFIG_PREFIX, collections_names);
- return collections_names;
- }
-
- NamedCollectionsMap getAll() const
- {
- NamedCollectionsMap result;
- for (const auto & collection_name : listCollections())
- {
- if (result.contains(collection_name))
- {
- throw Exception(
- ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
- "Found duplicate named collection `{}`",
- collection_name);
- }
- result.emplace(collection_name, get(collection_name));
- }
- return result;
- }
-
- MutableNamedCollectionPtr get(const std::string & collection_name) const
- {
- const auto collection_prefix = getCollectionPrefix(collection_name);
- std::queue enumerate_input;
- std::set> enumerate_result;
-
- enumerate_input.push(collection_prefix);
- NamedCollectionConfiguration::listKeys(config, std::move(enumerate_input), enumerate_result, -1);
-
- /// Collection does not have any keys.
- /// (`enumerate_result` == ).
- const bool collection_is_empty = enumerate_result.size() == 1
- && *enumerate_result.begin() == collection_prefix;
- std::set> keys;
- if (!collection_is_empty)
- {
- /// Skip collection prefix and add +1 to avoid '.' in the beginning.
- for (const auto & path : enumerate_result)
- keys.emplace(path.substr(collection_prefix.size() + 1));
- }
-
- return NamedCollection::create(
- config, collection_name, collection_prefix, keys, SourceId::CONFIG, /* is_mutable */false);
- }
-
-private:
- static constexpr auto NAMED_COLLECTIONS_CONFIG_PREFIX = "named_collections";
-
- static std::string getCollectionPrefix(const std::string & collection_name)
- {
- return fmt::format("{}.{}", NAMED_COLLECTIONS_CONFIG_PREFIX, collection_name);
- }
-};
-
-
-class LoadFromSQL : private WithContext
-{
-private:
- const std::string metadata_path;
-
-public:
- explicit LoadFromSQL(ContextPtr context_)
- : WithContext(context_)
- , metadata_path(fs::weakly_canonical(context_->getPath()) / NAMED_COLLECTIONS_METADATA_DIRECTORY)
- {
- if (fs::exists(metadata_path))
- cleanup();
- }
-
- std::vector listCollections() const
- {
- if (!fs::exists(metadata_path))
- return {};
-
- std::vector collection_names;
- fs::directory_iterator it{metadata_path};
- for (; it != fs::directory_iterator{}; ++it)
- {
- const auto & current_path = it->path();
- if (current_path.extension() == ".sql")
- {
- collection_names.push_back(it->path().stem());
- }
- else
- {
- LOG_WARNING(
- getLogger("NamedCollectionsLoadFromSQL"),
- "Unexpected file {} in named collections directory",
- current_path.filename().string());
- }
- }
- return collection_names;
- }
-
- NamedCollectionsMap getAll() const
- {
- NamedCollectionsMap result;
- for (const auto & collection_name : listCollections())
- {
- if (result.contains(collection_name))
- {
- throw Exception(
- ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
- "Found duplicate named collection `{}`",
- collection_name);
- }
- result.emplace(collection_name, get(collection_name));
- }
- return result;
- }
-
- MutableNamedCollectionPtr get(const std::string & collection_name) const
- {
- const auto query = readCreateQueryFromMetadata(
- getMetadataPath(collection_name),
- getContext()->getSettingsRef());
- return createNamedCollectionFromAST(query);
- }
-
- MutableNamedCollectionPtr create(const ASTCreateNamedCollectionQuery & query)
- {
- writeCreateQueryToMetadata(
- query,
- getMetadataPath(query.collection_name),
- getContext()->getSettingsRef());
-
- return createNamedCollectionFromAST(query);
- }
-
- void update(const ASTAlterNamedCollectionQuery & query)
- {
- const auto path = getMetadataPath(query.collection_name);
- auto create_query = readCreateQueryFromMetadata(path, getContext()->getSettings());
-
- std::unordered_map result_changes_map;
- for (const auto & [name, value] : query.changes)
- {
- auto [it, inserted] = result_changes_map.emplace(name, value);
- if (!inserted)
- {
- throw Exception(
- ErrorCodes::BAD_ARGUMENTS,
- "Value with key `{}` is used twice in the SET query (collection name: {})",
- name, query.collection_name);
- }
- }
-
- for (const auto & [name, value] : create_query.changes)
- result_changes_map.emplace(name, value);
-
- std::unordered_map result_overridability_map;
- for (const auto & [name, value] : query.overridability)
- result_overridability_map.emplace(name, value);
- for (const auto & [name, value] : create_query.overridability)
- result_overridability_map.emplace(name, value);
-
- for (const auto & delete_key : query.delete_keys)
- {
- auto it = result_changes_map.find(delete_key);
- if (it == result_changes_map.end())
- {
- throw Exception(
- ErrorCodes::BAD_ARGUMENTS,
- "Cannot delete key `{}` because it does not exist in collection",
- delete_key);
- }
- else
- {
- result_changes_map.erase(it);
- auto it_override = result_overridability_map.find(delete_key);
- if (it_override != result_overridability_map.end())
- result_overridability_map.erase(it_override);
- }
- }
-
- create_query.changes.clear();
- for (const auto & [name, value] : result_changes_map)
- create_query.changes.emplace_back(name, value);
- create_query.overridability = std::move(result_overridability_map);
-
- if (create_query.changes.empty())
- throw Exception(
- ErrorCodes::BAD_ARGUMENTS,
- "Named collection cannot be empty (collection name: {})",
- query.collection_name);
-
- writeCreateQueryToMetadata(
- create_query,
- getMetadataPath(query.collection_name),
- getContext()->getSettingsRef(),
- true);
- }
-
- void remove(const std::string & collection_name)
- {
- auto collection_path = getMetadataPath(collection_name);
- if (!fs::exists(collection_path))
- {
- throw Exception(
- ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
- "Cannot remove collection `{}`, because it doesn't exist",
- collection_name);
- }
- (void)fs::remove(collection_path);
- }
-
-private:
- static constexpr auto NAMED_COLLECTIONS_METADATA_DIRECTORY = "named_collections";
-
- static MutableNamedCollectionPtr createNamedCollectionFromAST(
- const ASTCreateNamedCollectionQuery & query)
- {
- const auto & collection_name = query.collection_name;
- const auto config = NamedCollectionConfiguration::createConfiguration(collection_name, query.changes, query.overridability);
-
- std::set> keys;
- for (const auto & [name, _] : query.changes)
- keys.insert(name);
-
- return NamedCollection::create(
- *config, collection_name, "", keys, SourceId::SQL, /* is_mutable */true);
- }
-
- std::string getMetadataPath(const std::string & collection_name) const
- {
- return fs::path(metadata_path) / (escapeForFileName(collection_name) + ".sql");
- }
-
- /// Delete .tmp files. They could be left undeleted in case of
- /// some exception or abrupt server restart.
- void cleanup()
- {
- fs::directory_iterator it{metadata_path};
- std::vector files_to_remove;
- for (; it != fs::directory_iterator{}; ++it)
- {
- const auto & current_path = it->path();
- if (current_path.extension() == ".tmp")
- files_to_remove.push_back(current_path);
- }
- for (const auto & file : files_to_remove)
- (void)fs::remove(file);
- }
-
- static ASTCreateNamedCollectionQuery readCreateQueryFromMetadata(
- const std::string & path,
- const Settings & settings)
- {
- ReadBufferFromFile in(path);
- std::string query;
- readStringUntilEOF(query, in);
-
- ParserCreateNamedCollectionQuery parser;
- auto ast = parseQuery(parser, query, "in file " + path, 0, settings.max_parser_depth, settings.max_parser_backtracks);
- const auto & create_query = ast->as();
- return create_query;
- }
-
- void writeCreateQueryToMetadata(
- const ASTCreateNamedCollectionQuery & query,
- const std::string & path,
- const Settings & settings,
- bool replace = false) const
- {
- if (!replace && fs::exists(path))
- {
- throw Exception(
- ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
- "Metadata file {} for named collection already exists",
- path);
- }
-
- fs::create_directories(metadata_path);
-
- auto tmp_path = path + ".tmp";
- String formatted_query = serializeAST(query);
- WriteBufferFromFile out(tmp_path, formatted_query.size(), O_WRONLY | O_CREAT | O_EXCL);
- writeString(formatted_query, out);
-
- out.next();
- if (settings.fsync_metadata)
- out.sync();
- out.close();
-
- fs::rename(tmp_path, path);
- }
-};
-
-std::unique_lock lockNamedCollectionsTransaction()
-{
- static std::mutex transaction_lock;
- return std::unique_lock(transaction_lock);
-}
-
-void loadFromConfigUnlocked(const Poco::Util::AbstractConfiguration & config, std::unique_lock &)
-{
- auto named_collections = LoadFromConfig(config).getAll();
- LOG_TRACE(
- getLogger("NamedCollectionsUtils"),
- "Loaded {} collections from config", named_collections.size());
-
- NamedCollectionFactory::instance().add(std::move(named_collections));
- is_loaded_from_config = true;
-}
-
-void loadFromConfig(const Poco::Util::AbstractConfiguration & config)
-{
- auto lock = lockNamedCollectionsTransaction();
- loadFromConfigUnlocked(config, lock);
-}
-
-void reloadFromConfig(const Poco::Util::AbstractConfiguration & config)
-{
- auto lock = lockNamedCollectionsTransaction();
- auto collections = LoadFromConfig(config).getAll();
- auto & instance = NamedCollectionFactory::instance();
- instance.removeById(SourceId::CONFIG);
- instance.add(collections);
- is_loaded_from_config = true;
-}
-
-void loadFromSQLUnlocked(ContextPtr context, std::unique_lock &)
-{
- auto named_collections = LoadFromSQL(context).getAll();
- LOG_TRACE(
- getLogger("NamedCollectionsUtils"),
- "Loaded {} collections from SQL", named_collections.size());
-
- NamedCollectionFactory::instance().add(std::move(named_collections));
- is_loaded_from_sql = true;
-}
-
-void loadFromSQL(ContextPtr context)
-{
- auto lock = lockNamedCollectionsTransaction();
- loadFromSQLUnlocked(context, lock);
-}
-
-void loadIfNotUnlocked(std::unique_lock & lock)
-{
- auto global_context = Context::getGlobalContextInstance();
- if (!is_loaded_from_config)
- loadFromConfigUnlocked(global_context->getConfigRef(), lock);
- if (!is_loaded_from_sql)
- loadFromSQLUnlocked(global_context, lock);
-}
-
-void loadIfNot()
-{
- if (is_loaded_from_sql && is_loaded_from_config)
- return;
- auto lock = lockNamedCollectionsTransaction();
- loadIfNotUnlocked(lock);
-}
-
-void removeFromSQL(const ASTDropNamedCollectionQuery & query, ContextPtr context)
-{
- auto lock = lockNamedCollectionsTransaction();
- loadIfNotUnlocked(lock);
- auto & instance = NamedCollectionFactory::instance();
- if (!instance.exists(query.collection_name))
- {
- if (!query.if_exists)
- {
- throw Exception(
- ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
- "Cannot remove collection `{}`, because it doesn't exist",
- query.collection_name);
- }
- return;
- }
- LoadFromSQL(context).remove(query.collection_name);
- instance.remove(query.collection_name);
-}
-
-void createFromSQL(const ASTCreateNamedCollectionQuery & query, ContextPtr context)
-{
- auto lock = lockNamedCollectionsTransaction();
- loadIfNotUnlocked(lock);
- auto & instance = NamedCollectionFactory::instance();
- if (instance.exists(query.collection_name))
- {
- if (!query.if_not_exists)
- {
- throw Exception(
- ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
- "A named collection `{}` already exists",
- query.collection_name);
- }
- return;
- }
- instance.add(query.collection_name, LoadFromSQL(context).create(query));
-}
-
-void updateFromSQL(const ASTAlterNamedCollectionQuery & query, ContextPtr context)
-{
- auto lock = lockNamedCollectionsTransaction();
- loadIfNotUnlocked(lock);
- auto & instance = NamedCollectionFactory::instance();
- if (!instance.exists(query.collection_name))
- {
- if (!query.if_exists)
- {
- throw Exception(
- ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
- "Cannot remove collection `{}`, because it doesn't exist",
- query.collection_name);
- }
- return;
- }
- LoadFromSQL(context).update(query);
-
- auto collection = instance.getMutable(query.collection_name);
- auto collection_lock = collection->lock();
-
- for (const auto & [name, value] : query.changes)
- {
- auto it_override = query.overridability.find(name);
- if (it_override != query.overridability.end())
- collection->setOrUpdate(name, convertFieldToString(value), it_override->second);
- else
- collection->setOrUpdate(name, convertFieldToString(value), {});
- }
-
- for (const auto & key : query.delete_keys)
- collection->remove(key);
-}
-
-}
-
-}
diff --git a/src/Common/NamedCollections/NamedCollectionUtils.h b/src/Common/NamedCollections/NamedCollectionUtils.h
deleted file mode 100644
index 293b3ea659d..00000000000
--- a/src/Common/NamedCollections/NamedCollectionUtils.h
+++ /dev/null
@@ -1,42 +0,0 @@
-#pragma once
-#include
-
-namespace Poco { namespace Util { class AbstractConfiguration; } }
-
-namespace DB
-{
-
-class ASTCreateNamedCollectionQuery;
-class ASTAlterNamedCollectionQuery;
-class ASTDropNamedCollectionQuery;
-
-namespace NamedCollectionUtils
-{
-
-enum class SourceId : uint8_t
-{
- NONE = 0,
- CONFIG = 1,
- SQL = 2,
-};
-
-void loadFromConfig(const Poco::Util::AbstractConfiguration & config);
-void reloadFromConfig(const Poco::Util::AbstractConfiguration & config);
-
-/// Load named collections from `context->getPath() / named_collections /`.
-void loadFromSQL(ContextPtr context);
-
-/// Remove collection as well as its metadata from `context->getPath() / named_collections /`.
-void removeFromSQL(const ASTDropNamedCollectionQuery & query, ContextPtr context);
-
-/// Create a new collection from AST and put it to `context->getPath() / named_collections /`.
-void createFromSQL(const ASTCreateNamedCollectionQuery & query, ContextPtr context);
-
-/// Update definition of already existing collection from AST and update result in `context->getPath() / named_collections /`.
-void updateFromSQL(const ASTAlterNamedCollectionQuery & query, ContextPtr context);
-
-void loadIfNot();
-
-}
-
-}
diff --git a/src/Common/NamedCollections/NamedCollections.cpp b/src/Common/NamedCollections/NamedCollections.cpp
index 04d2099f4df..74ce405f71d 100644
--- a/src/Common/NamedCollections/NamedCollections.cpp
+++ b/src/Common/NamedCollections/NamedCollections.cpp
@@ -4,7 +4,6 @@
#include
#include
#include
-#include
#include
@@ -297,7 +296,7 @@ MutableNamedCollectionPtr NamedCollection::duplicate() const
auto impl = pimpl->createCopy(collection_name);
return std::unique_ptr(
new NamedCollection(
- std::move(impl), collection_name, NamedCollectionUtils::SourceId::NONE, true));
+ std::move(impl), collection_name, SourceId::NONE, true));
}
NamedCollection::Keys NamedCollection::getKeys(ssize_t depth, const std::string & prefix) const
diff --git a/src/Common/NamedCollections/NamedCollections.h b/src/Common/NamedCollections/NamedCollections.h
index c253c56594f..23862c4515a 100644
--- a/src/Common/NamedCollections/NamedCollections.h
+++ b/src/Common/NamedCollections/NamedCollections.h
@@ -1,7 +1,6 @@
#pragma once
#include
#include
-#include
namespace Poco { namespace Util { class AbstractConfiguration; } }
@@ -23,7 +22,12 @@ class NamedCollection
public:
using Key = std::string;
using Keys = std::set>;
- using SourceId = NamedCollectionUtils::SourceId;
+ enum class SourceId : uint8_t
+ {
+ NONE = 0,
+ CONFIG = 1,
+ SQL = 2,
+ };
static MutableNamedCollectionPtr create(
const Poco::Util::AbstractConfiguration & config,
diff --git a/src/Common/NamedCollections/NamedCollectionsFactory.cpp b/src/Common/NamedCollections/NamedCollectionsFactory.cpp
index dd69952429f..14105a8651d 100644
--- a/src/Common/NamedCollections/NamedCollectionsFactory.cpp
+++ b/src/Common/NamedCollections/NamedCollectionsFactory.cpp
@@ -1,5 +1,7 @@
#include
-#include
+#include
+#include
+#include
namespace DB
{
@@ -17,23 +19,29 @@ NamedCollectionFactory & NamedCollectionFactory::instance()
return instance;
}
+NamedCollectionFactory::~NamedCollectionFactory()
+{
+ shutdown();
+}
+
+void NamedCollectionFactory::shutdown()
+{
+ shutdown_called = true;
+ if (update_task)
+ update_task->deactivate();
+ metadata_storage.reset();
+}
+
bool NamedCollectionFactory::exists(const std::string & collection_name) const
{
std::lock_guard lock(mutex);
- return existsUnlocked(collection_name, lock);
-}
-
-bool NamedCollectionFactory::existsUnlocked(
- const std::string & collection_name,
- std::lock_guard & /* lock */) const
-{
- return loaded_named_collections.contains(collection_name);
+ return exists(collection_name, lock);
}
NamedCollectionPtr NamedCollectionFactory::get(const std::string & collection_name) const
{
std::lock_guard lock(mutex);
- auto collection = tryGetUnlocked(collection_name, lock);
+ auto collection = tryGet(collection_name, lock);
if (!collection)
{
throw Exception(
@@ -47,14 +55,35 @@ NamedCollectionPtr NamedCollectionFactory::get(const std::string & collection_na
NamedCollectionPtr NamedCollectionFactory::tryGet(const std::string & collection_name) const
{
std::lock_guard lock(mutex);
- return tryGetUnlocked(collection_name, lock);
+ return tryGet(collection_name, lock);
+}
+
+NamedCollectionsMap NamedCollectionFactory::getAll() const
+{
+ std::lock_guard lock(mutex);
+ return loaded_named_collections;
+}
+
+bool NamedCollectionFactory::exists(const std::string & collection_name, std::lock_guard &) const
+{
+ return loaded_named_collections.contains(collection_name);
+}
+
+MutableNamedCollectionPtr NamedCollectionFactory::tryGet(
+ const std::string & collection_name,
+ std::lock_guard &) const
+{
+ auto it = loaded_named_collections.find(collection_name);
+ if (it == loaded_named_collections.end())
+ return nullptr;
+ return it->second;
}
MutableNamedCollectionPtr NamedCollectionFactory::getMutable(
- const std::string & collection_name) const
+ const std::string & collection_name,
+ std::lock_guard & lock) const
{
- std::lock_guard lock(mutex);
- auto collection = tryGetUnlocked(collection_name, lock);
+ auto collection = tryGet(collection_name, lock);
if (!collection)
{
throw Exception(
@@ -73,35 +102,10 @@ MutableNamedCollectionPtr NamedCollectionFactory::getMutable(
return collection;
}
-MutableNamedCollectionPtr NamedCollectionFactory::tryGetUnlocked(
- const std::string & collection_name,
- std::lock_guard & /* lock */) const
-{
- auto it = loaded_named_collections.find(collection_name);
- if (it == loaded_named_collections.end())
- return nullptr;
- return it->second;
-}
-
void NamedCollectionFactory::add(
- const std::string & collection_name,
- MutableNamedCollectionPtr collection)
-{
- std::lock_guard lock(mutex);
- addUnlocked(collection_name, collection, lock);
-}
-
-void NamedCollectionFactory::add(NamedCollectionsMap collections)
-{
- std::lock_guard lock(mutex);
- for (const auto & [collection_name, collection] : collections)
- addUnlocked(collection_name, collection, lock);
-}
-
-void NamedCollectionFactory::addUnlocked(
const std::string & collection_name,
MutableNamedCollectionPtr collection,
- std::lock_guard & /* lock */)
+ std::lock_guard &)
{
auto [it, inserted] = loaded_named_collections.emplace(collection_name, collection);
if (!inserted)
@@ -113,10 +117,15 @@ void NamedCollectionFactory::addUnlocked(
}
}
-void NamedCollectionFactory::remove(const std::string & collection_name)
+void NamedCollectionFactory::add(NamedCollectionsMap collections, std::lock_guard & lock)
{
- std::lock_guard lock(mutex);
- bool removed = removeIfExistsUnlocked(collection_name, lock);
+ for (const auto & [collection_name, collection] : collections)
+ add(collection_name, collection, lock);
+}
+
+void NamedCollectionFactory::remove(const std::string & collection_name, std::lock_guard & lock)
+{
+ bool removed = removeIfExists(collection_name, lock);
if (!removed)
{
throw Exception(
@@ -126,17 +135,11 @@ void NamedCollectionFactory::remove(const std::string & collection_name)
}
}
-void NamedCollectionFactory::removeIfExists(const std::string & collection_name)
-{
- std::lock_guard lock(mutex);
- removeIfExistsUnlocked(collection_name, lock); // NOLINT
-}
-
-bool NamedCollectionFactory::removeIfExistsUnlocked(
+bool NamedCollectionFactory::removeIfExists(
const std::string & collection_name,
std::lock_guard & lock)
{
- auto collection = tryGetUnlocked(collection_name, lock);
+ auto collection = tryGet(collection_name, lock);
if (!collection)
return false;
@@ -152,18 +155,246 @@ bool NamedCollectionFactory::removeIfExistsUnlocked(
return true;
}
-void NamedCollectionFactory::removeById(NamedCollectionUtils::SourceId id)
+void NamedCollectionFactory::removeById(NamedCollection::SourceId id, std::lock_guard &)
{
- std::lock_guard lock(mutex);
std::erase_if(
loaded_named_collections,
[&](const auto & value) { return value.second->getSourceId() == id; });
}
-NamedCollectionsMap NamedCollectionFactory::getAll() const
+namespace
+{
+ constexpr auto NAMED_COLLECTIONS_CONFIG_PREFIX = "named_collections";
+
+ std::vector listCollections(const Poco::Util::AbstractConfiguration & config)
+ {
+ Poco::Util::AbstractConfiguration::Keys collections_names;
+ config.keys(NAMED_COLLECTIONS_CONFIG_PREFIX, collections_names);
+ return collections_names;
+ }
+
+ MutableNamedCollectionPtr getCollection(
+ const Poco::Util::AbstractConfiguration & config,
+ const std::string & collection_name)
+ {
+ const auto collection_prefix = fmt::format("{}.{}", NAMED_COLLECTIONS_CONFIG_PREFIX, collection_name);
+ std::queue enumerate_input;
+ std::set> enumerate_result;
+
+ enumerate_input.push(collection_prefix);
+ NamedCollectionConfiguration::listKeys(config, std::move(enumerate_input), enumerate_result, -1);
+
+ /// The collection has no keys when `enumerate_result` holds only the collection prefix itself.
+ const bool collection_is_empty = enumerate_result.size() == 1
+ && *enumerate_result.begin() == collection_prefix;
+
+ std::set> keys;
+ if (!collection_is_empty)
+ {
+ /// Strip the collection prefix plus the '.' separator that follows it (hence the +1).
+ for (const auto & path : enumerate_result)
+ keys.emplace(path.substr(collection_prefix.size() + 1));
+ }
+
+ return NamedCollection::create(
+ config, collection_name, collection_prefix, keys, NamedCollection::SourceId::CONFIG, /* is_mutable */false);
+ }
+
+ NamedCollectionsMap getNamedCollections(const Poco::Util::AbstractConfiguration & config)
+ {
+ NamedCollectionsMap result;
+ for (const auto & collection_name : listCollections(config))
+ {
+ if (result.contains(collection_name))
+ {
+ throw Exception(
+ ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
+ "Found duplicate named collection `{}`",
+ collection_name);
+ }
+ result.emplace(collection_name, getCollection(config, collection_name));
+ }
+ return result;
+ }
+}
+
+void NamedCollectionFactory::loadIfNot()
{
std::lock_guard lock(mutex);
- return loaded_named_collections;
+ loadIfNot(lock);
+}
+
+bool NamedCollectionFactory::loadIfNot(std::lock_guard & lock)
+{
+ if (loaded)
+ return false;
+
+ auto context = Context::getGlobalContextInstance();
+ metadata_storage = NamedCollectionsMetadataStorage::create(context);
+
+ loadFromConfig(context->getConfigRef(), lock);
+ loadFromSQL(lock);
+
+ if (metadata_storage->supportsPeriodicUpdate())
+ {
+ update_task = context->getSchedulePool().createTask("NamedCollectionsMetadataStorage", [this]{ updateFunc(); });
+ update_task->activate();
+ update_task->schedule();
+ }
+
+ loaded = true;
+ return true;
+}
+
+void NamedCollectionFactory::loadFromConfig(const Poco::Util::AbstractConfiguration & config, std::lock_guard & lock)
+{
+ auto collections = getNamedCollections(config);
+ LOG_TEST(log, "Loaded {} collections from config", collections.size());
+ add(std::move(collections), lock);
+}
+
+void NamedCollectionFactory::reloadFromConfig(const Poco::Util::AbstractConfiguration & config)
+{
+ std::lock_guard lock(mutex);
+ if (loadIfNot(lock))
+ return;
+
+ auto collections = getNamedCollections(config);
+ LOG_TEST(log, "Loaded {} collections from config", collections.size());
+
+ removeById(NamedCollection::SourceId::CONFIG, lock);
+ add(std::move(collections), lock);
+}
+
+void NamedCollectionFactory::loadFromSQL(std::lock_guard & lock)
+{
+ auto collections = metadata_storage->getAll();
+ LOG_TEST(log, "Loaded {} collections from sql", collections.size());
+ add(std::move(collections), lock);
+}
+
+void NamedCollectionFactory::createFromSQL(const ASTCreateNamedCollectionQuery & query)
+{
+ std::lock_guard lock(mutex);
+ loadIfNot(lock);
+
+ if (exists(query.collection_name, lock))
+ {
+ if (query.if_not_exists)
+ return;
+
+ throw Exception(
+ ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
+ "A named collection `{}` already exists",
+ query.collection_name);
+ }
+
+ add(query.collection_name, metadata_storage->create(query), lock);
+}
+
+void NamedCollectionFactory::removeFromSQL(const ASTDropNamedCollectionQuery & query)
+{
+ std::lock_guard lock(mutex);
+ loadIfNot(lock);
+
+ if (!exists(query.collection_name, lock))
+ {
+ if (query.if_exists)
+ return;
+
+ throw Exception(
+ ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
+ "Cannot remove collection `{}`, because it doesn't exist",
+ query.collection_name);
+ }
+
+ metadata_storage->remove(query.collection_name);
+ remove(query.collection_name, lock);
+}
+
+void NamedCollectionFactory::updateFromSQL(const ASTAlterNamedCollectionQuery & query)
+{
+ std::lock_guard lock(mutex);
+ loadIfNot(lock);
+
+ if (!exists(query.collection_name, lock))
+ {
+ if (query.if_exists)
+ return;
+
+ throw Exception(
+ ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
+ "Cannot update collection `{}`, because it doesn't exist",
+ query.collection_name);
+ }
+ /// Persist the change first: if the storage update throws, the loaded collection is left untouched.
+ metadata_storage->update(query);
+
+ auto collection = getMutable(query.collection_name, lock);
+ auto collection_lock = collection->lock();
+
+ for (const auto & [name, value] : query.changes)
+ {
+ auto it_override = query.overridability.find(name);
+ if (it_override != query.overridability.end())
+ collection->setOrUpdate(name, convertFieldToString(value), it_override->second);
+ else
+ collection->setOrUpdate(name, convertFieldToString(value), {});
+ }
+
+ for (const auto & key : query.delete_keys)
+ collection->remove(key);
+}
+
+void NamedCollectionFactory::reloadFromSQL()
+{
+ std::lock_guard lock(mutex);
+ if (loadIfNot(lock))
+ return;
+
+ auto collections = metadata_storage->getAll();
+ removeById(NamedCollection::SourceId::SQL, lock);
+ add(std::move(collections), lock);
+}
+
+void NamedCollectionFactory::updateFunc()
+{
+ LOG_TRACE(log, "Named collections background updating thread started");
+
+ while (!shutdown_called.load())
+ {
+ if (metadata_storage->waitUpdate())
+ {
+ try
+ {
+ reloadFromSQL();
+ }
+ catch (const Coordination::Exception & e)
+ {
+ if (Coordination::isHardwareError(e.code))
+ {
+ LOG_INFO(log, "Lost ZooKeeper connection, will try to connect again: {}",
+ DB::getCurrentExceptionMessage(true));
+
+ sleepForSeconds(1);
+ }
+ else
+ {
+ tryLogCurrentException(__PRETTY_FUNCTION__);
+ chassert(false);
+ }
+ continue;
+ }
+ catch (...)
+ {
+ DB::tryLogCurrentException(__PRETTY_FUNCTION__);
+ chassert(false);
+ continue;
+ }
+ }
+ }
+
+ LOG_TRACE(log, "Named collections background updating thread finished");
}
}
diff --git a/src/Common/NamedCollections/NamedCollectionsFactory.h b/src/Common/NamedCollections/NamedCollectionsFactory.h
index 2d64a03bde3..6ee5940e686 100644
--- a/src/Common/NamedCollections/NamedCollectionsFactory.h
+++ b/src/Common/NamedCollections/NamedCollectionsFactory.h
@@ -1,58 +1,83 @@
#pragma once
#include
+#include
+#include
namespace DB
{
+class ASTCreateNamedCollectionQuery;
+class ASTDropNamedCollectionQuery;
+class ASTAlterNamedCollectionQuery;
class NamedCollectionFactory : boost::noncopyable
{
public:
static NamedCollectionFactory & instance();
+ ~NamedCollectionFactory();
+
bool exists(const std::string & collection_name) const;
NamedCollectionPtr get(const std::string & collection_name) const;
NamedCollectionPtr tryGet(const std::string & collection_name) const;
- MutableNamedCollectionPtr getMutable(const std::string & collection_name) const;
-
- void add(const std::string & collection_name, MutableNamedCollectionPtr collection);
-
- void add(NamedCollectionsMap collections);
-
- void update(NamedCollectionsMap collections);
-
- void remove(const std::string & collection_name);
-
- void removeIfExists(const std::string & collection_name);
-
- void removeById(NamedCollectionUtils::SourceId id);
-
NamedCollectionsMap getAll() const;
-private:
- bool existsUnlocked(
- const std::string & collection_name,
- std::lock_guard & lock) const;
+ void reloadFromConfig(const Poco::Util::AbstractConfiguration & config);
- MutableNamedCollectionPtr tryGetUnlocked(
- const std::string & collection_name,
- std::lock_guard & lock) const;
+ void reloadFromSQL();
- void addUnlocked(
- const std::string & collection_name,
- MutableNamedCollectionPtr collection,
- std::lock_guard & lock);
+ void createFromSQL(const ASTCreateNamedCollectionQuery & query);
- bool removeIfExistsUnlocked(
- const std::string & collection_name,
- std::lock_guard & lock);
+ void removeFromSQL(const ASTDropNamedCollectionQuery & query);
+ void updateFromSQL(const ASTAlterNamedCollectionQuery & query);
+
+ void loadIfNot();
+
+ void shutdown();
+
+protected:
mutable NamedCollectionsMap loaded_named_collections;
-
mutable std::mutex mutex;
- bool is_initialized = false;
+
+ const LoggerPtr log = getLogger("NamedCollectionFactory");
+
+ bool loaded = false;
+ std::atomic shutdown_called = false;
+ std::unique_ptr metadata_storage;
+ BackgroundSchedulePool::TaskHolder update_task;
+
+ bool loadIfNot(std::lock_guard & lock);
+
+ bool exists(
+ const std::string & collection_name,
+ std::lock_guard & lock) const;
+
+ MutableNamedCollectionPtr getMutable(const std::string & collection_name, std::lock_guard & lock) const;
+
+ void add(const std::string & collection_name, MutableNamedCollectionPtr collection, std::lock_guard & lock);
+
+ void add(NamedCollectionsMap collections, std::lock_guard & lock);
+
+ void update(NamedCollectionsMap collections, std::lock_guard & lock);
+
+ void remove(const std::string & collection_name, std::lock_guard & lock);
+
+ bool removeIfExists(const std::string & collection_name, std::lock_guard & lock);
+
+ MutableNamedCollectionPtr tryGet(const std::string & collection_name, std::lock_guard & lock) const;
+
+ void removeById(NamedCollection::SourceId id, std::lock_guard & lock);
+
+ void loadFromConfig(
+ const Poco::Util::AbstractConfiguration & config,
+ std::lock_guard & lock);
+
+ void loadFromSQL(std::lock_guard & lock);
+
+ void updateFunc();
};
}
diff --git a/src/Common/NamedCollections/NamedCollectionsMetadataStorage.cpp b/src/Common/NamedCollections/NamedCollectionsMetadataStorage.cpp
new file mode 100644
index 00000000000..32fdb25abd3
--- /dev/null
+++ b/src/Common/NamedCollections/NamedCollectionsMetadataStorage.cpp
@@ -0,0 +1,519 @@
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace fs = std::filesystem;
+
+namespace DB
+{
+namespace ErrorCodes
+{
+ extern const int NAMED_COLLECTION_ALREADY_EXISTS;
+ extern const int NAMED_COLLECTION_DOESNT_EXIST;
+ extern const int INVALID_CONFIG_PARAMETER;
+ extern const int BAD_ARGUMENTS;
+ extern const int LOGICAL_ERROR;
+}
+
+static const std::string named_collections_storage_config_path = "named_collections_storage";
+
+namespace
+{
+ MutableNamedCollectionPtr createNamedCollectionFromAST(const ASTCreateNamedCollectionQuery & query)
+ {
+ const auto & collection_name = query.collection_name;
+ const auto config = NamedCollectionConfiguration::createConfiguration(collection_name, query.changes, query.overridability);
+
+ std::set> keys;
+ for (const auto & [name, _] : query.changes)
+ keys.insert(name);
+
+ return NamedCollection::create(
+ *config, collection_name, "", keys, NamedCollection::SourceId::SQL, /* is_mutable */true);
+ }
+
+ std::string getFileName(const std::string & collection_name)
+ {
+ return escapeForFileName(collection_name) + ".sql";
+ }
+}
+
+class NamedCollectionsMetadataStorage::INamedCollectionsStorage
+{
+public:
+ virtual ~INamedCollectionsStorage() = default;
+
+ virtual bool exists(const std::string & path) const = 0;
+
+ virtual std::vector list() const = 0;
+
+ virtual std::string read(const std::string & path) const = 0;
+
+ virtual void write(const std::string & path, const std::string & data, bool replace) = 0;
+
+ virtual void remove(const std::string & path) = 0;
+
+ virtual bool removeIfExists(const std::string & path) = 0;
+
+ virtual bool supportsPeriodicUpdate() const = 0;
+
+ virtual bool waitUpdate(size_t /* timeout */) { return false; }
+};
+
+
+class NamedCollectionsMetadataStorage::LocalStorage : public INamedCollectionsStorage, private WithContext
+{
+private:
+ std::string root_path;
+
+public:
+ LocalStorage(ContextPtr context_, const std::string & path_)
+ : WithContext(context_)
+ , root_path(path_)
+ {
+ if (fs::exists(root_path))
+ cleanup();
+ }
+
+ ~LocalStorage() override = default;
+
+ bool supportsPeriodicUpdate() const override { return false; }
+
+ std::vector list() const override
+ {
+ if (!fs::exists(root_path))
+ return {};
+
+ std::vector elements;
+ for (fs::directory_iterator it{root_path}; it != fs::directory_iterator{}; ++it)
+ {
+ const auto & current_path = it->path();
+ if (current_path.extension() == ".sql")
+ {
+ elements.push_back(it->path());
+ }
+ else
+ {
+ LOG_WARNING(
+ getLogger("LocalStorage"),
+ "Unexpected file {} in named collections directory",
+ current_path.filename().string());
+ }
+ }
+ return elements;
+ }
+
+ bool exists(const std::string & path) const override
+ {
+ return fs::exists(getPath(path));
+ }
+
+ std::string read(const std::string & path) const override
+ {
+ ReadBufferFromFile in(getPath(path));
+ std::string data;
+ readStringUntilEOF(data, in);
+ return data;
+ }
+
+ void write(const std::string & path, const std::string & data, bool replace) override
+ {
+ if (!replace && fs::exists(getPath(path))) /// `path` is relative to root_path, so resolve it before checking
+ {
+ throw Exception(
+ ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
+ "Metadata file {} for named collection already exists",
+ path);
+ }
+
+ fs::create_directories(root_path);
+ /// Write into a `.tmp` sibling first and rename it into place once the data is flushed.
+ auto tmp_path = getPath(path + ".tmp");
+ WriteBufferFromFile out(tmp_path, data.size(), O_WRONLY | O_CREAT | O_EXCL);
+ writeString(data, out);
+
+ out.next();
+ if (getContext()->getSettingsRef().fsync_metadata)
+ out.sync();
+ out.close();
+
+ fs::rename(tmp_path, getPath(path));
+ }
+
+ void remove(const std::string & path) override
+ {
+ if (!removeIfExists(path)) /// removeIfExists() resolves `path` against root_path itself
+ {
+ throw Exception(
+ ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
+ "Cannot remove `{}`, because it doesn't exist", path);
+ }
+ }
+
+ bool removeIfExists(const std::string & path) override
+ {
+ return fs::remove(getPath(path));
+ }
+
+private:
+ std::string getPath(const std::string & path) const
+ {
+ return fs::path(root_path) / path;
+ }
+
+ /// Delete .tmp files. They could be left undeleted in case of
+ /// some exception or abrupt server restart.
+ void cleanup()
+ {
+ std::vector files_to_remove;
+ for (fs::directory_iterator it{root_path}; it != fs::directory_iterator{}; ++it)
+ {
+ const auto & current_path = it->path();
+ if (current_path.extension() == ".tmp")
+ files_to_remove.push_back(current_path);
+ }
+ for (const auto & file : files_to_remove)
+ fs::remove(file);
+ }
+};
+
+
+class NamedCollectionsMetadataStorage::ZooKeeperStorage : public INamedCollectionsStorage, private WithContext
+{
+private:
+ std::string root_path;
+ mutable zkutil::ZooKeeperPtr zookeeper_client{nullptr};
+ mutable zkutil::EventPtr wait_event;
+ mutable Int32 collections_node_cversion = 0;
+
+public:
+ ZooKeeperStorage(ContextPtr context_, const std::string & path_)
+ : WithContext(context_)
+ , root_path(path_)
+ {
+ if (root_path.empty())
+ throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Collections path cannot be empty");
+
+ if (root_path != "/" && root_path.back() == '/')
+ root_path.resize(root_path.size() - 1);
+ if (root_path.front() != '/')
+ root_path = "/" + root_path;
+
+ auto client = getClient();
+ if (root_path != "/" && !client->exists(root_path))
+ {
+ client->createAncestors(root_path);
+ client->createIfNotExists(root_path, "");
+ }
+ }
+
+ ~ZooKeeperStorage() override = default;
+
+ bool supportsPeriodicUpdate() const override { return true; }
+
+ /// Returns true if the children of root_path may have changed (or were never listed) and should be re-read.
+ bool waitUpdate(size_t timeout) override
+ {
+ if (!wait_event)
+ {
+ /// list() has not been called yet (it is what creates wait_event), so ask the caller to do it.
+ return true;
+ }
+
+ if (wait_event->tryWait(timeout))
+ {
+ /// The event passed to getChildren() in list() was signalled before the timeout.
+ return true;
+ }
+
+ std::string res;
+ Coordination::Stat stat;
+
+ if (!getClient()->tryGet(root_path, res, &stat))
+ {
+ /// root_path is created in the constructor of this class,
+ /// so this case is not expected to happen.
+ chassert(false);
+ return false;
+ }
+ /// The event did not fire: fall back to comparing cversion with the value recorded by the last list().
+ return stat.cversion != collections_node_cversion;
+ }
+
+ std::vector list() const override
+ {
+ if (!wait_event)
+ wait_event = std::make_shared();
+
+ Coordination::Stat stat;
+ auto children = getClient()->getChildren(root_path, &stat, wait_event);
+ collections_node_cversion = stat.cversion;
+ return children;
+ }
+
+ bool exists(const std::string & path) const override
+ {
+ return getClient()->exists(getPath(path));
+ }
+
+ std::string read(const std::string & path) const override
+ {
+ return getClient()->get(getPath(path));
+ }
+
+ void write(const std::string & path, const std::string & data, bool replace) override
+ {
+ if (replace)
+ {
+ getClient()->createOrUpdate(getPath(path), data, zkutil::CreateMode::Persistent);
+ }
+ else
+ {
+ auto code = getClient()->tryCreate(getPath(path), data, zkutil::CreateMode::Persistent);
+
+ if (code == Coordination::Error::ZNODEEXISTS)
+ {
+ throw Exception(
+ ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
+ "Metadata file {} for named collection already exists",
+ path);
+ }
+ }
+ }
+
+ void remove(const std::string & path) override
+ {
+ getClient()->remove(getPath(path));
+ }
+
+ bool removeIfExists(const std::string & path) override
+ {
+ auto code = getClient()->tryRemove(getPath(path));
+ if (code == Coordination::Error::ZOK)
+ return true;
+ if (code == Coordination::Error::ZNONODE)
+ return false;
+ throw Coordination::Exception::fromPath(code, getPath(path));
+ }
+
+private:
+ zkutil::ZooKeeperPtr getClient() const
+ {
+ if (!zookeeper_client || zookeeper_client->expired())
+ {
+ zookeeper_client = getContext()->getZooKeeper();
+ zookeeper_client->sync(root_path);
+ }
+ return zookeeper_client;
+ }
+
+ std::string getPath(const std::string & path) const
+ {
+ return fs::path(root_path) / path;
+ }
+};
+
+NamedCollectionsMetadataStorage::NamedCollectionsMetadataStorage(
+ std::shared_ptr storage_,
+ ContextPtr context_)
+ : WithContext(context_)
+ , storage(std::move(storage_))
+{
+}
+
+MutableNamedCollectionPtr NamedCollectionsMetadataStorage::get(const std::string & collection_name) const
+{
+ const auto query = readCreateQuery(collection_name);
+ return createNamedCollectionFromAST(query);
+}
+
+NamedCollectionsMap NamedCollectionsMetadataStorage::getAll() const
+{
+ NamedCollectionsMap result;
+ for (const auto & collection_name : listCollections())
+ {
+ if (result.contains(collection_name))
+ {
+ throw Exception(
+ ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
+ "Found duplicate named collection `{}`",
+ collection_name);
+ }
+ result.emplace(collection_name, get(collection_name));
+ }
+ return result;
+}
+
+MutableNamedCollectionPtr NamedCollectionsMetadataStorage::create(const ASTCreateNamedCollectionQuery & query)
+{
+ writeCreateQuery(query);
+ return createNamedCollectionFromAST(query);
+}
+
+void NamedCollectionsMetadataStorage::remove(const std::string & collection_name)
+{
+ storage->remove(getFileName(collection_name));
+}
+
+bool NamedCollectionsMetadataStorage::removeIfExists(const std::string & collection_name)
+{
+ return storage->removeIfExists(getFileName(collection_name));
+}
+
+void NamedCollectionsMetadataStorage::update(const ASTAlterNamedCollectionQuery & query)
+{
+ auto create_query = readCreateQuery(query.collection_name);
+
+ std::unordered_map result_changes_map;
+ for (const auto & [name, value] : query.changes)
+ {
+ auto [it, inserted] = result_changes_map.emplace(name, value);
+ if (!inserted)
+ {
+ throw Exception(
+ ErrorCodes::BAD_ARGUMENTS,
+ "Value with key `{}` is used twice in the SET query (collection name: {})",
+ name, query.collection_name);
+ }
+ }
+
+ for (const auto & [name, value] : create_query.changes)
+ result_changes_map.emplace(name, value);
+
+ std::unordered_map result_overridability_map;
+ for (const auto & [name, value] : query.overridability)
+ result_overridability_map.emplace(name, value);
+ for (const auto & [name, value] : create_query.overridability)
+ result_overridability_map.emplace(name, value);
+
+ for (const auto & delete_key : query.delete_keys)
+ {
+ auto it = result_changes_map.find(delete_key);
+ if (it == result_changes_map.end())
+ {
+ throw Exception(
+ ErrorCodes::BAD_ARGUMENTS,
+ "Cannot delete key `{}` because it does not exist in collection",
+ delete_key);
+ }
+ else
+ {
+ result_changes_map.erase(it);
+ auto it_override = result_overridability_map.find(delete_key);
+ if (it_override != result_overridability_map.end())
+ result_overridability_map.erase(it_override);
+ }
+ }
+
+ create_query.changes.clear();
+ for (const auto & [name, value] : result_changes_map)
+ create_query.changes.emplace_back(name, value);
+ create_query.overridability = std::move(result_overridability_map);
+
+ if (create_query.changes.empty())
+ throw Exception(
+ ErrorCodes::BAD_ARGUMENTS,
+ "Named collection cannot be empty (collection name: {})",
+ query.collection_name);
+
+ chassert(create_query.collection_name == query.collection_name);
+ writeCreateQuery(create_query, true);
+}
+
+std::vector NamedCollectionsMetadataStorage::listCollections() const
+{
+ auto paths = storage->list();
+ std::vector collections;
+ collections.reserve(paths.size());
+ for (const auto & path : paths)
+ collections.push_back(unescapeForFileName(std::filesystem::path(path).stem())); /// undo the escaping applied by getFileName()
+ return collections;
+}
+
+ASTCreateNamedCollectionQuery NamedCollectionsMetadataStorage::readCreateQuery(const std::string & collection_name) const
+{
+ const auto path = getFileName(collection_name);
+ auto query = storage->read(path);
+ const auto & settings = getContext()->getSettingsRef();
+
+ ParserCreateNamedCollectionQuery parser;
+ auto ast = parseQuery(parser, query, "in file " + path, 0, settings.max_parser_depth, settings.max_parser_backtracks);
+ const auto & create_query = ast->as();
+ return create_query;
+}
+
+void NamedCollectionsMetadataStorage::writeCreateQuery(const ASTCreateNamedCollectionQuery & query, bool replace)
+{
+ auto normalized_query = query.clone();
+ auto & changes = typeid_cast(normalized_query.get())->changes;
+ ::sort(
+ changes.begin(), changes.end(),
+ [](const SettingChange & lhs, const SettingChange & rhs) { return lhs.name < rhs.name; });
+
+ storage->write(getFileName(query.collection_name), serializeAST(*normalized_query), replace);
+}
+
+bool NamedCollectionsMetadataStorage::supportsPeriodicUpdate() const
+{
+ return storage->supportsPeriodicUpdate();
+}
+
+bool NamedCollectionsMetadataStorage::waitUpdate()
+{
+ if (!storage->supportsPeriodicUpdate())
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Periodic updates are not supported");
+
+ const auto & config = Context::getGlobalContextInstance()->getConfigRef();
+ const size_t timeout = config.getUInt(named_collections_storage_config_path + ".update_timeout_ms", 5000);
+
+ return storage->waitUpdate(timeout);
+}
+
+std::unique_ptr NamedCollectionsMetadataStorage::create(const ContextPtr & context_)
+{
+ const auto & config = context_->getConfigRef();
+ const auto storage_type = config.getString(named_collections_storage_config_path + ".type", "local");
+
+ if (storage_type == "local")
+ {
+ const auto path = config.getString(
+ named_collections_storage_config_path + ".path",
+ std::filesystem::path(context_->getPath()) / "named_collections");
+
+ LOG_TRACE(getLogger("NamedCollectionsMetadataStorage"),
+ "Using local storage for named collections at path: {}", path);
+
+ auto local_storage = std::make_unique(context_, path);
+ return std::unique_ptr(
+ new NamedCollectionsMetadataStorage(std::move(local_storage), context_));
+ }
+ if (storage_type == "zookeeper" || storage_type == "keeper")
+ {
+ const auto path = config.getString(named_collections_storage_config_path + ".path");
+ auto zk_storage = std::make_unique(context_, path);
+
+ LOG_TRACE(getLogger("NamedCollectionsMetadataStorage"),
+ "Using zookeeper storage for named collections at path: {}", path);
+
+ return std::unique_ptr(
+ new NamedCollectionsMetadataStorage(std::move(zk_storage), context_));
+ }
+
+ throw Exception(
+ ErrorCodes::INVALID_CONFIG_PARAMETER,
+ "Unknown storage for named collections: {}", storage_type);
+}
+
+}
diff --git a/src/Common/NamedCollections/NamedCollectionsMetadataStorage.h b/src/Common/NamedCollections/NamedCollectionsMetadataStorage.h
new file mode 100644
index 00000000000..3c089fe2fa2
--- /dev/null
+++ b/src/Common/NamedCollections/NamedCollectionsMetadataStorage.h
@@ -0,0 +1,52 @@
+#pragma once
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+class NamedCollectionsMetadataStorage : private WithContext
+{
+public:
+ static std::unique_ptr create(const ContextPtr & context);
+
+ NamedCollectionsMap getAll() const;
+
+ MutableNamedCollectionPtr get(const std::string & collection_name) const;
+
+ MutableNamedCollectionPtr create(const ASTCreateNamedCollectionQuery & query);
+
+ void remove(const std::string & collection_name);
+
+ bool removeIfExists(const std::string & collection_name);
+
+ void update(const ASTAlterNamedCollectionQuery & query);
+
+ void shutdown();
+
+ /// Returns true if the stored collections may have changed and should be re-read; throws LOGICAL_ERROR if periodic updates are unsupported.
+ bool waitUpdate();
+
+ bool supportsPeriodicUpdate() const;
+
+private:
+ class INamedCollectionsStorage;
+ class LocalStorage;
+ class ZooKeeperStorage;
+
+ std::shared_ptr storage;
+
+ NamedCollectionsMetadataStorage(std::shared_ptr storage_, ContextPtr context_);
+
+ std::vector listCollections() const;
+
+ ASTCreateNamedCollectionQuery readCreateQuery(const std::string & collection_name) const;
+
+ void writeCreateQuery(const ASTCreateNamedCollectionQuery & query, bool replace = false);
+};
+
+
+}
diff --git a/src/Common/tests/gtest_named_collections.cpp b/src/Common/tests/gtest_named_collections.cpp
index 8a8a364961b..8d9aa2bc213 100644
--- a/src/Common/tests/gtest_named_collections.cpp
+++ b/src/Common/tests/gtest_named_collections.cpp
@@ -1,12 +1,40 @@
#include
#include
-#include
#include
#include
#include
using namespace DB;
+/// Test-only subclass exposing protected methods of NamedCollectionFactory; each wrapper takes the factory mutex itself.
+class NamedCollectionFactoryFriend : public NamedCollectionFactory
+{
+public:
+ static NamedCollectionFactoryFriend & instance()
+ {
+ static NamedCollectionFactoryFriend instance;
+ return instance;
+ }
+
+ void loadFromConfig(const Poco::Util::AbstractConfiguration & config)
+ {
+ std::lock_guard lock(mutex);
+ NamedCollectionFactory::loadFromConfig(config, lock);
+ }
+
+ void add(const std::string & collection_name, MutableNamedCollectionPtr collection)
+ {
+ std::lock_guard lock(mutex);
+ NamedCollectionFactory::add(collection_name, collection, lock);
+ }
+
+ void remove(const std::string & collection_name)
+ {
+ std::lock_guard lock(mutex);
+ NamedCollectionFactory::remove(collection_name, lock);
+ }
+};
+
TEST(NamedCollections, SimpleConfig)
{
std::string xml(R"CONFIG(
@@ -29,13 +57,13 @@ TEST(NamedCollections, SimpleConfig)
Poco::AutoPtr document = dom_parser.parseString(xml);
Poco::AutoPtr config = new Poco::Util::XMLConfiguration(document);
- NamedCollectionUtils::loadFromConfig(*config);
+ NamedCollectionFactoryFriend::instance().loadFromConfig(*config);
- ASSERT_TRUE(NamedCollectionFactory::instance().exists("collection1"));
- ASSERT_TRUE(NamedCollectionFactory::instance().exists("collection2"));
- ASSERT_TRUE(NamedCollectionFactory::instance().tryGet("collection3") == nullptr);
+ ASSERT_TRUE(NamedCollectionFactoryFriend::instance().exists("collection1"));
+ ASSERT_TRUE(NamedCollectionFactoryFriend::instance().exists("collection2"));
+ ASSERT_TRUE(NamedCollectionFactoryFriend::instance().tryGet("collection3") == nullptr);
- auto collections = NamedCollectionFactory::instance().getAll();
+ auto collections = NamedCollectionFactoryFriend::instance().getAll();
ASSERT_EQ(collections.size(), 2);
ASSERT_TRUE(collections.contains("collection1"));
ASSERT_TRUE(collections.contains("collection2"));
@@ -47,7 +75,7 @@ key3: 3.3
key4: -4
)CONFIG");
- auto collection1 = NamedCollectionFactory::instance().get("collection1");
+ auto collection1 = NamedCollectionFactoryFriend::instance().get("collection1");
ASSERT_TRUE(collection1 != nullptr);
ASSERT_TRUE(collection1->get("key1") == "value1");
@@ -61,7 +89,7 @@ key5: 5
key6: 6.6
)CONFIG");
- auto collection2 = NamedCollectionFactory::instance().get("collection2");
+ auto collection2 = NamedCollectionFactoryFriend::instance().get("collection2");
ASSERT_TRUE(collection2 != nullptr);
ASSERT_TRUE(collection2->get("key4") == "value4");
@@ -69,9 +97,9 @@ key6: 6.6
ASSERT_TRUE(collection2->get("key6") == 6.6);
auto collection2_copy = collections["collection2"]->duplicate();
- NamedCollectionFactory::instance().add("collection2_copy", collection2_copy);
- ASSERT_TRUE(NamedCollectionFactory::instance().exists("collection2_copy"));
- ASSERT_EQ(NamedCollectionFactory::instance().get("collection2_copy")->dumpStructure(),
+ NamedCollectionFactoryFriend::instance().add("collection2_copy", collection2_copy);
+ ASSERT_TRUE(NamedCollectionFactoryFriend::instance().exists("collection2_copy"));
+ ASSERT_EQ(NamedCollectionFactoryFriend::instance().get("collection2_copy")->dumpStructure(),
R"CONFIG(key4: value4
key5: 5
key6: 6.6
@@ -88,8 +116,8 @@ key6: 6.6
collection2_copy->setOrUpdate("key4", "value45", {});
ASSERT_EQ(collection2_copy->getOrDefault("key4", "N"), "value45");
- NamedCollectionFactory::instance().remove("collection2_copy");
- ASSERT_FALSE(NamedCollectionFactory::instance().exists("collection2_copy"));
+ NamedCollectionFactoryFriend::instance().remove("collection2_copy");
+ ASSERT_FALSE(NamedCollectionFactoryFriend::instance().exists("collection2_copy"));
config.reset();
}
@@ -119,11 +147,11 @@ TEST(NamedCollections, NestedConfig)
Poco::AutoPtr document = dom_parser.parseString(xml);
Poco::AutoPtr config = new Poco::Util::XMLConfiguration(document);
- NamedCollectionUtils::loadFromConfig(*config);
+ NamedCollectionFactoryFriend::instance().loadFromConfig(*config);
- ASSERT_TRUE(NamedCollectionFactory::instance().exists("collection3"));
+ ASSERT_TRUE(NamedCollectionFactoryFriend::instance().exists("collection3"));
- auto collection = NamedCollectionFactory::instance().get("collection3");
+ auto collection = NamedCollectionFactoryFriend::instance().get("collection3");
ASSERT_TRUE(collection != nullptr);
ASSERT_EQ(collection->dumpStructure(),
@@ -171,8 +199,8 @@ TEST(NamedCollections, NestedConfigDuplicateKeys)
Poco::AutoPtr document = dom_parser.parseString(xml);
Poco::AutoPtr config = new Poco::Util::XMLConfiguration(document);
- NamedCollectionUtils::loadFromConfig(*config);
- auto collection = NamedCollectionFactory::instance().get("collection");
+ NamedCollectionFactoryFriend::instance().loadFromConfig(*config);
+ auto collection = NamedCollectionFactoryFriend::instance().get("collection");
auto keys = collection->getKeys();
ASSERT_EQ(keys.size(), 6);
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index cf572459cbc..7f99243e285 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -334,7 +334,7 @@ class IColumn;
M(Bool, fsync_metadata, true, "Do fsync after changing metadata for tables and databases (.sql files). Could be disabled in case of poor latency on server with high load of DDL queries and high load of disk subsystem.", 0) \
\
M(Bool, join_use_nulls, false, "Use NULLs for non-joined rows of outer JOINs for types that can be inside Nullable. If false, use default value of corresponding columns data type.", IMPORTANT) \
- M(Bool, allow_experimental_join_condition, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y.", IMPORTANT) \
+ M(Bool, allow_experimental_join_condition, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y.", 0) \
\
M(JoinStrictness, join_default_strictness, JoinStrictness::All, "Set default strictness in JOIN query. Possible values: empty string, 'ANY', 'ALL'. If empty, query without strictness will throw exception.", 0) \
M(Bool, any_join_distinct_right_table_keys, false, "Enable old ANY JOIN logic with many-to-one left-to-right table keys mapping for all ANY JOINs. It leads to confusing not equal results for 't1 ANY LEFT JOIN t2' and 't2 ANY RIGHT JOIN t1'. ANY RIGHT JOIN needs one-to-many keys mapping to be consistent with LEFT one.", IMPORTANT) \
@@ -517,6 +517,7 @@ class IColumn;
M(UInt64, backup_restore_keeper_value_max_size, 1048576, "Maximum size of data of a [Zoo]Keeper's node during backup", 0) \
M(UInt64, backup_restore_batch_size_for_keeper_multiread, 10000, "Maximum size of batch for multiread request to [Zoo]Keeper during backup or restore", 0) \
M(UInt64, backup_restore_batch_size_for_keeper_multi, 1000, "Maximum size of batch for multi request to [Zoo]Keeper during backup or restore", 0) \
+ M(UInt64, backup_restore_s3_retry_attempts, 1000, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries. It takes place only for backup/restore.", 0) \
M(UInt64, max_backup_bandwidth, 0, "The maximum read speed in bytes per second for particular backup on server. Zero means unlimited.", 0) \
\
M(Bool, log_profile_events, true, "Log query performance statistics into the query_log, query_thread_log and query_views_log.", 0) \
@@ -1059,7 +1060,8 @@ class IColumn;
M(Bool, input_format_tsv_detect_header, true, "Automatically detect header with names and types in TSV format", 0) \
M(Bool, input_format_custom_detect_header, true, "Automatically detect header with names and types in CustomSeparated format", 0) \
M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \
- M(UInt64, input_format_parquet_max_block_size, 8192, "Max block size for parquet reader.", 0) \
+ M(UInt64, input_format_parquet_max_block_size, DEFAULT_BLOCK_SIZE, "Max block size for parquet reader.", 0) \
+ M(UInt64, input_format_parquet_prefer_block_bytes, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader", 0) \
M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \
M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \
M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format ORC", 0) \
diff --git a/src/Core/SettingsChangesHistory.h b/src/Core/SettingsChangesHistory.h
index 7eb14047ace..b447421671e 100644
--- a/src/Core/SettingsChangesHistory.h
+++ b/src/Core/SettingsChangesHistory.h
@@ -96,6 +96,8 @@ static const std::map
#include
#include
+#include
#include
#include
#include
@@ -174,4 +175,9 @@ DataTypePtr removeNullableOrLowCardinalityNullable(const DataTypePtr & type)
}
+bool canContainNull(const IDataType & type) /// True when `type` is Nullable, LowCardinality(Nullable), Dynamic or Variant.
+{
+ return type.isNullable() || type.isLowCardinalityNullable() || isDynamic(type) || isVariant(type); /// decided from the type alone, no column data is inspected
+}
+
}
diff --git a/src/DataTypes/DataTypeNullable.h b/src/DataTypes/DataTypeNullable.h
index 71abe48c151..7a8a54fdf3a 100644
--- a/src/DataTypes/DataTypeNullable.h
+++ b/src/DataTypes/DataTypeNullable.h
@@ -62,4 +62,6 @@ DataTypePtr makeNullableOrLowCardinalityNullableSafe(const DataTypePtr & type);
/// Nullable(T) -> T, LowCardinality(Nullable(T)) -> T
DataTypePtr removeNullableOrLowCardinalityNullable(const DataTypePtr & type);
+bool canContainNull(const IDataType & type); /// True for Nullable, LowCardinality(Nullable), Dynamic and Variant types.
+
}
diff --git a/src/Databases/DatabasesCommon.cpp b/src/Databases/DatabasesCommon.cpp
index 5fee14ecc2a..fd38a31da5c 100644
--- a/src/Databases/DatabasesCommon.cpp
+++ b/src/Databases/DatabasesCommon.cpp
@@ -41,11 +41,11 @@ void applyMetadataChangesToCreateQuery(const ASTPtr & query, const StorageInMemo
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot alter table {} because it was created AS table function"
" and doesn't have structure in metadata", backQuote(ast_create_query.getTable()));
- if (!has_structure && !ast_create_query.is_dictionary)
+ if (!has_structure && !ast_create_query.is_dictionary && !ast_create_query.isParameterizedView())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot alter table {} metadata doesn't have structure",
backQuote(ast_create_query.getTable()));
- if (!ast_create_query.is_dictionary)
+ if (!ast_create_query.is_dictionary && !ast_create_query.isParameterizedView())
{
ASTPtr new_columns = InterpreterCreateQuery::formatColumns(metadata.columns);
ASTPtr new_indices = InterpreterCreateQuery::formatIndices(metadata.secondary_indices);
diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp
index a7883919c4c..e90986f2236 100644
--- a/src/Formats/FormatFactory.cpp
+++ b/src/Formats/FormatFactory.cpp
@@ -161,6 +161,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array;
format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
+ format_settings.parquet.prefer_block_bytes = settings.input_format_parquet_prefer_block_bytes;
format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
format_settings.parquet.output_compliant_nested_types = settings.output_format_parquet_compliant_nested_types;
format_settings.parquet.use_custom_encoder = settings.output_format_parquet_use_custom_encoder;
diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h
index b296928e4d4..337aafbbe9c 100644
--- a/src/Formats/FormatSettings.h
+++ b/src/Formats/FormatSettings.h
@@ -265,7 +265,8 @@ struct FormatSettings
bool preserve_order = false;
bool use_custom_encoder = true;
bool parallel_encoding = true;
- UInt64 max_block_size = 8192;
+ UInt64 max_block_size = DEFAULT_BLOCK_SIZE;
+ size_t prefer_block_bytes = DEFAULT_BLOCK_SIZE * 256;
ParquetVersion output_version;
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
bool output_compliant_nested_types = true;
diff --git a/src/Functions/LeastGreatestGeneric.h b/src/Functions/LeastGreatestGeneric.h
index 9073f14d679..bbab001b00d 100644
--- a/src/Functions/LeastGreatestGeneric.h
+++ b/src/Functions/LeastGreatestGeneric.h
@@ -111,7 +111,7 @@ public:
argument_types.push_back(argument.type);
/// More efficient specialization for two numeric arguments.
- if (arguments.size() == 2 && isNumber(arguments[0].type) && isNumber(arguments[1].type))
+ if (arguments.size() == 2 && isNumber(removeNullable(arguments[0].type)) && isNumber(removeNullable(arguments[1].type)))
return std::make_unique(SpecializedFunction::create(context), argument_types, return_type);
return std::make_unique(
@@ -123,7 +123,7 @@ public:
if (types.empty())
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} cannot be called without arguments", getName());
- if (types.size() == 2 && isNumber(types[0]) && isNumber(types[1]))
+ if (types.size() == 2 && isNumber(removeNullable(types[0])) && isNumber(removeNullable(types[1])))
return SpecializedFunction::create(context)->getReturnTypeImpl(types);
return getLeastSupertype(types);
diff --git a/src/Functions/isNotNull.cpp b/src/Functions/isNotNull.cpp
index ea95a5c2b1c..a10e7ebd40c 100644
--- a/src/Functions/isNotNull.cpp
+++ b/src/Functions/isNotNull.cpp
@@ -29,6 +29,18 @@ public:
return name;
}
+ ColumnPtr getConstantResultForNonConstArguments(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type) const override /// Tries to answer isNotNull from the argument type alone; returns nullptr when the type does not decide it.
+ {
+ const ColumnWithTypeAndName & elem = arguments[0]; /// the function has exactly one argument (see getNumberOfArguments)
+ if (elem.type->onlyNull())
+ return result_type->createColumnConst(1, UInt8(0)); /// type holds only NULL: constant 0
+
+ if (canContainNull(*elem.type))
+ return nullptr; /// NULLs are possible, so no constant can be produced here
+
+ return result_type->createColumnConst(1, UInt8(1)); /// canContainNull() is false for this type: constant 1
+ }
+
size_t getNumberOfArguments() const override { return 1; }
bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForConstants() const override { return true; }
diff --git a/src/Functions/isNull.cpp b/src/Functions/isNull.cpp
index a98ff2ab8e8..95d659b103b 100644
--- a/src/Functions/isNull.cpp
+++ b/src/Functions/isNull.cpp
@@ -31,6 +31,18 @@ public:
return name;
}
+ ColumnPtr getConstantResultForNonConstArguments(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type) const override /// Tries to answer isNull from the argument type alone; returns nullptr when the type does not decide it.
+ {
+ const ColumnWithTypeAndName & elem = arguments[0]; /// the function has exactly one argument (see getNumberOfArguments)
+ if (elem.type->onlyNull())
+ return result_type->createColumnConst(1, UInt8(1)); /// type holds only NULL: constant 1
+
+ if (canContainNull(*elem.type))
+ return nullptr; /// NULLs are possible, so no constant can be produced here
+
+ return result_type->createColumnConst(1, UInt8(0)); /// canContainNull() is false for this type: constant 0
+ }
+
size_t getNumberOfArguments() const override { return 1; }
bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForLowCardinalityColumns() const override { return false; }
diff --git a/src/Functions/isNullable.cpp b/src/Functions/isNullable.cpp
index 14874487f40..b24ee4f5e73 100644
--- a/src/Functions/isNullable.cpp
+++ b/src/Functions/isNullable.cpp
@@ -2,6 +2,7 @@
#include
#include
#include
+#include
namespace DB
{
@@ -23,6 +24,15 @@ public:
return name;
}
+ ColumnPtr getConstantResultForNonConstArguments(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type) const override /// Always returns a constant: the result is computed from the type of the first argument only.
+ {
+ const ColumnWithTypeAndName & elem = arguments[0]; /// only the first argument's type is examined
+ if (elem.type->onlyNull() || canContainNull(*elem.type))
+ return result_type->createColumnConst(1, UInt8(1)); /// only-NULL type, or a type for which canContainNull() holds
+
+ return result_type->createColumnConst(1, UInt8(0)); /// every other type
+ }
+
bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForNothing() const override { return false; }
diff --git a/src/IO/S3/Client.h b/src/IO/S3/Client.h
index bd281846343..1e90acb7f7b 100644
--- a/src/IO/S3/Client.h
+++ b/src/IO/S3/Client.h
@@ -162,7 +162,7 @@ public:
class RetryStrategy : public Aws::Client::RetryStrategy
{
public:
- explicit RetryStrategy(uint32_t maxRetries_ = 10, uint32_t scaleFactor_ = 25, uint32_t maxDelayMs_ = 90000);
+ explicit RetryStrategy(uint32_t maxRetries_ = 10, uint32_t scaleFactor_ = 25, uint32_t maxDelayMs_ = 5000);
/// NOLINTNEXTLINE(google-runtime-int)
bool ShouldRetry(const Aws::Client::AWSError& error, long attemptedRetries) const override;
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index b1fb6a68618..67ea069d46d 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -18,6 +18,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -610,6 +611,8 @@ struct ContextSharedPart : boost::noncopyable
LOG_TRACE(log, "Shutting down database catalog");
DatabaseCatalog::shutdown();
+ NamedCollectionFactory::instance().shutdown();
+
delete_async_insert_queue.reset();
SHUTDOWN(log, "merges executor", merge_mutate_executor, wait());
diff --git a/src/Interpreters/InterpreterAlterNamedCollectionQuery.cpp b/src/Interpreters/InterpreterAlterNamedCollectionQuery.cpp
index a4e86879596..79a17fd1844 100644
--- a/src/Interpreters/InterpreterAlterNamedCollectionQuery.cpp
+++ b/src/Interpreters/InterpreterAlterNamedCollectionQuery.cpp
@@ -4,7 +4,7 @@
#include
#include
#include
-#include
+#include
namespace DB
@@ -23,7 +23,7 @@ BlockIO InterpreterAlterNamedCollectionQuery::execute()
return executeDDLQueryOnCluster(query_ptr, current_context, params);
}
- NamedCollectionUtils::updateFromSQL(query, current_context);
+ NamedCollectionFactory::instance().updateFromSQL(query);
return {};
}
diff --git a/src/Interpreters/InterpreterCreateNamedCollectionQuery.cpp b/src/Interpreters/InterpreterCreateNamedCollectionQuery.cpp
index 41e87bb73dd..c71441daa8c 100644
--- a/src/Interpreters/InterpreterCreateNamedCollectionQuery.cpp
+++ b/src/Interpreters/InterpreterCreateNamedCollectionQuery.cpp
@@ -4,7 +4,7 @@
#include
#include
#include
-#include
+#include
namespace DB
@@ -23,7 +23,7 @@ BlockIO InterpreterCreateNamedCollectionQuery::execute()
return executeDDLQueryOnCluster(query_ptr, current_context, params);
}
- NamedCollectionUtils::createFromSQL(query, current_context);
+ NamedCollectionFactory::instance().createFromSQL(query);
return {};
}
diff --git a/src/Interpreters/InterpreterDropNamedCollectionQuery.cpp b/src/Interpreters/InterpreterDropNamedCollectionQuery.cpp
index baadc85f443..2edaef1b2f2 100644
--- a/src/Interpreters/InterpreterDropNamedCollectionQuery.cpp
+++ b/src/Interpreters/InterpreterDropNamedCollectionQuery.cpp
@@ -4,7 +4,7 @@
#include
#include
#include
-#include
+#include
namespace DB
@@ -23,7 +23,7 @@ BlockIO InterpreterDropNamedCollectionQuery::execute()
return executeDDLQueryOnCluster(query_ptr, current_context, params);
}
- NamedCollectionUtils::removeFromSQL(query, current_context);
+ NamedCollectionFactory::instance().removeFromSQL(query);
return {};
}
diff --git a/src/Planner/PlannerExpressionAnalysis.cpp b/src/Planner/PlannerExpressionAnalysis.cpp
index f0a2845c3e8..060bbba1c05 100644
--- a/src/Planner/PlannerExpressionAnalysis.cpp
+++ b/src/Planner/PlannerExpressionAnalysis.cpp
@@ -1,6 +1,7 @@
#include
#include
+#include
#include
#include
@@ -37,7 +38,7 @@ namespace
* Actions before filter are added into into actions chain.
* It is client responsibility to update filter analysis result if filter column must be removed after chain is finalized.
*/
-FilterAnalysisResult analyzeFilter(const QueryTreeNodePtr & filter_expression_node,
+std::optional analyzeFilter(const QueryTreeNodePtr & filter_expression_node,
const ColumnsWithTypeAndName & input_columns,
const PlannerContextPtr & planner_context,
ActionsChain & actions_chain)
@@ -45,7 +46,11 @@ FilterAnalysisResult analyzeFilter(const QueryTreeNodePtr & filter_expression_no
FilterAnalysisResult result;
result.filter_actions = buildActionsDAGFromExpressionNode(filter_expression_node, input_columns, planner_context);
- result.filter_column_name = result.filter_actions->getOutputs().at(0)->result_name;
+ const auto * output = result.filter_actions->getOutputs().at(0);
+ if (output->column && ConstantFilterDescription(*output->column).always_true)
+ return {};
+
+ result.filter_column_name = output->result_name;
actions_chain.addStep(std::make_unique(result.filter_actions));
return result;
@@ -534,8 +539,11 @@ PlannerExpressionsAnalysisResult buildExpressionAnalysisResult(const QueryTreeNo
if (query_node.hasWhere())
{
where_analysis_result_optional = analyzeFilter(query_node.getWhere(), current_output_columns, planner_context, actions_chain);
- where_action_step_index_optional = actions_chain.getLastStepIndex();
- current_output_columns = actions_chain.getLastStepAvailableOutputColumns();
+ if (where_analysis_result_optional)
+ {
+ where_action_step_index_optional = actions_chain.getLastStepIndex();
+ current_output_columns = actions_chain.getLastStepAvailableOutputColumns();
+ }
}
auto aggregation_analysis_result_optional = analyzeAggregation(query_tree, current_output_columns, planner_context, actions_chain);
@@ -548,8 +556,11 @@ PlannerExpressionsAnalysisResult buildExpressionAnalysisResult(const QueryTreeNo
if (query_node.hasHaving())
{
having_analysis_result_optional = analyzeFilter(query_node.getHaving(), current_output_columns, planner_context, actions_chain);
- having_action_step_index_optional = actions_chain.getLastStepIndex();
- current_output_columns = actions_chain.getLastStepAvailableOutputColumns();
+ if (having_analysis_result_optional)
+ {
+ having_action_step_index_optional = actions_chain.getLastStepIndex();
+ current_output_columns = actions_chain.getLastStepAvailableOutputColumns();
+ }
}
auto window_analysis_result_optional = analyzeWindow(query_tree, current_output_columns, planner_context, actions_chain);
@@ -562,8 +573,11 @@ PlannerExpressionsAnalysisResult buildExpressionAnalysisResult(const QueryTreeNo
if (query_node.hasQualify())
{
qualify_analysis_result_optional = analyzeFilter(query_node.getQualify(), current_output_columns, planner_context, actions_chain);
- qualify_action_step_index_optional = actions_chain.getLastStepIndex();
- current_output_columns = actions_chain.getLastStepAvailableOutputColumns();
+ if (qualify_analysis_result_optional)
+ {
+ qualify_action_step_index_optional = actions_chain.getLastStepIndex();
+ current_output_columns = actions_chain.getLastStepAvailableOutputColumns();
+ }
}
auto projection_analysis_result = analyzeProjection(query_node, current_output_columns, planner_context, actions_chain);
diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp
index d86ad745bac..9d1301305b8 100644
--- a/src/Planner/PlannerJoinTree.cpp
+++ b/src/Planner/PlannerJoinTree.cpp
@@ -49,7 +49,6 @@
#include
#include
-#include
#include
#include
@@ -874,8 +873,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
if (!table->isMergeTree())
return false;
- if (std::dynamic_pointer_cast(table)
- && !query_settings.parallel_replicas_for_non_replicated_merge_tree)
+ if (!table->supportsReplication() && !query_settings.parallel_replicas_for_non_replicated_merge_tree)
return false;
return true;
diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
index 04b3a64b6cb..e837d4d5e20 100644
--- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
@@ -420,6 +420,24 @@ void ParquetBlockInputFormat::initializeIfNeeded()
int num_row_groups = metadata->num_row_groups();
row_group_batches.reserve(num_row_groups);
+ auto adative_chunk_size = [&](int row_group_idx) -> size_t /// Row count per batch for this row group, aimed at prefer_block_bytes per block; 0 means no estimate (the caller then uses max_block_size). NOTE(review): "adative" looks like a typo for "adaptive".
+ {
+ size_t total_size = 0; /// sum of total_uncompressed_size() over the columns listed in column_indices
+ auto row_group_meta = metadata->RowGroup(row_group_idx);
+ for (int column_index : column_indices)
+ {
+ total_size += row_group_meta->ColumnChunk(column_index)->total_uncompressed_size();
+ }
+ if (!total_size || !format_settings.parquet.prefer_block_bytes) return 0; /// no size information, or prefer_block_bytes is 0
+ auto average_row_bytes = floor(static_cast(total_size) / row_group_meta->num_rows());
+ // an average below 1 would make the division below produce inf (or a huge value) for preferred_num_rows
+ if (average_row_bytes < 1) return 0;
+ const size_t preferred_num_rows = static_cast(floor(format_settings.parquet.prefer_block_bytes/average_row_bytes));
+ const size_t MIN_ROW_NUM = 128; /// lower bound applied to preferred_num_rows
+ // explicit cast on max_block_size: size_t != UInt64 in darwin
+ return std::min(std::max(preferred_num_rows, MIN_ROW_NUM), static_cast(format_settings.parquet.max_block_size)); /// at least MIN_ROW_NUM, but never more than max_block_size
+ };
+
for (int row_group = 0; row_group < num_row_groups; ++row_group)
{
if (skip_row_groups.contains(row_group))
@@ -439,6 +457,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
row_group_batches.back().row_groups_idxs.push_back(row_group);
row_group_batches.back().total_rows += metadata->RowGroup(row_group)->num_rows();
row_group_batches.back().total_bytes_compressed += metadata->RowGroup(row_group)->total_compressed_size();
+ auto rows = adative_chunk_size(row_group);
+ row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;
}
}
@@ -449,7 +469,7 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_bat
parquet::ArrowReaderProperties arrow_properties;
parquet::ReaderProperties reader_properties(ArrowMemoryPool::instance());
arrow_properties.set_use_threads(false);
- arrow_properties.set_batch_size(format_settings.parquet.max_block_size);
+ arrow_properties.set_batch_size(row_group_batch.adaptive_chunk_size);
// When reading a row group, arrow will:
// 1. Look at `metadata` to get all byte ranges it'll need to read from the file (typically one
diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
index d6591f5c0a3..24735ee4371 100644
--- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
+++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
@@ -208,6 +208,8 @@ private:
size_t total_rows = 0;
size_t total_bytes_compressed = 0;
+ size_t adaptive_chunk_size = 0;
+
std::vector row_groups_idxs;
// These are only used by the decoding thread, so don't require locking the mutex.
diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp
index 4017670ad14..713f2f35fc8 100644
--- a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp
+++ b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp
@@ -421,6 +421,9 @@ struct AggregateProjectionCandidates
/// This flag means that DAG for projection candidate should be used in FilterStep.
bool has_filter = false;
+
+ /// If not empty, try to find exact ranges from parts to speed up trivial count queries.
+ String only_count_column;
};
AggregateProjectionCandidates getAggregateProjectionCandidates(
@@ -502,6 +505,12 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
candidates.minmax_projection.emplace(std::move(minmax));
}
}
+ else
+ {
+ /// Trivial count optimization only applies after @can_use_minmax_projection.
+ if (keys.empty() && aggregates.size() == 1 && typeid_cast(aggregates[0].function.get()))
+ candidates.only_count_column = aggregates[0].column_name;
+ }
}
if (!candidates.minmax_projection)
@@ -584,13 +593,21 @@ std::optional optimizeUseAggregateProjections(QueryPlan::Node & node, Qu
ContextPtr context = reading->getContext();
MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
AggregateProjectionCandidate * best_candidate = nullptr;
+
+ /// Stores row count from exact ranges of parts.
+ size_t exact_count = 0;
+
if (candidates.minmax_projection)
{
best_candidate = &candidates.minmax_projection->candidate;
}
- else if (!candidates.real.empty())
+ else if (!candidates.real.empty() || !candidates.only_count_column.empty())
{
- auto ordinary_reading_select_result = reading->selectRangesToRead();
+ auto ordinary_reading_select_result = reading->getAnalyzedResult();
+ bool find_exact_ranges = !candidates.only_count_column.empty();
+ if (!ordinary_reading_select_result || (!ordinary_reading_select_result->has_exact_ranges && find_exact_ranges))
+ ordinary_reading_select_result = reading->selectRangesToRead(find_exact_ranges);
+
size_t ordinary_reading_marks = ordinary_reading_select_result->selected_marks;
/// Nothing to read. Ignore projections.
@@ -600,7 +617,49 @@ std::optional optimizeUseAggregateProjections(QueryPlan::Node & node, Qu
return {};
}
- const auto & parts_with_ranges = ordinary_reading_select_result->parts_with_ranges;
+ auto & parts_with_ranges = ordinary_reading_select_result->parts_with_ranges;
+
+ if (!candidates.only_count_column.empty())
+ {
+ for (auto & part_with_ranges : parts_with_ranges)
+ {
+ MarkRanges new_ranges;
+ auto & ranges = part_with_ranges.ranges;
+ const auto & exact_ranges = part_with_ranges.exact_ranges;
+ if (exact_ranges.empty())
+ continue;
+
+ size_t i = 0;
+ size_t len = exact_ranges.size();
+ for (auto & range : ranges)
+ {
+ while (i < len && exact_ranges[i].begin < range.end)
+ {
+ chassert(exact_ranges[i].begin >= range.begin);
+ chassert(exact_ranges[i].end <= range.end);
+
+ /// Found some marks which are not exact
+ if (range.begin < exact_ranges[i].begin)
+ new_ranges.emplace_back(range.begin, exact_ranges[i].begin);
+
+ range.begin = exact_ranges[i].end;
+ ordinary_reading_marks -= exact_ranges[i].end - exact_ranges[i].begin;
+ exact_count += part_with_ranges.data_part->index_granularity.getRowsCountInRange(exact_ranges[i]);
+ ++i;
+ }
+
+ /// Current range still contains some marks which are not exact
+ if (range.begin < range.end)
+ new_ranges.emplace_back(range);
+ }
+ chassert(i == len);
+ part_with_ranges.ranges = std::move(new_ranges);
+ }
+
+ std::erase_if(parts_with_ranges, [&](const auto & part_with_ranges) { return part_with_ranges.ranges.empty(); });
+ if (parts_with_ranges.empty())
+ chassert(ordinary_reading_marks == 0);
+ }
/// Selecting best candidate.
for (auto & candidate : candidates.real)
@@ -630,8 +689,20 @@ std::optional optimizeUseAggregateProjections(QueryPlan::Node & node, Qu
if (!best_candidate)
{
- reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
- return {};
+ if (exact_count > 0)
+ {
+ if (ordinary_reading_marks > 0)
+ {
+ ordinary_reading_select_result->selected_marks = ordinary_reading_marks;
+ ordinary_reading_select_result->selected_rows -= exact_count;
+ reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
+ }
+ }
+ else
+ {
+ reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
+ return {};
+ }
}
}
else
@@ -639,10 +710,11 @@ std::optional optimizeUseAggregateProjections(QueryPlan::Node & node, Qu
return {};
}
- chassert(best_candidate != nullptr);
-
QueryPlanStepPtr projection_reading;
bool has_ordinary_parts;
+ String selected_projection_name;
+ if (best_candidate)
+ selected_projection_name = best_candidate->projection->name;
/// Add reading from projection step.
if (candidates.minmax_projection)
@@ -654,6 +726,32 @@ std::optional optimizeUseAggregateProjections(QueryPlan::Node & node, Qu
projection_reading = std::make_unique(std::move(pipe));
has_ordinary_parts = false;
}
+ else if (best_candidate == nullptr)
+ {
+ chassert(exact_count > 0);
+
+ auto agg_count = std::make_shared(DataTypes{});
+
+ std::vector state(agg_count->sizeOfData());
+ AggregateDataPtr place = state.data();
+ agg_count->create(place);
+ SCOPE_EXIT_MEMORY_SAFE(agg_count->destroy(place));
+ agg_count->set(place, exact_count);
+
+ auto column = ColumnAggregateFunction::create(agg_count);
+ column->insertFrom(place);
+
+ Block block_with_count{
+ {std::move(column),
+ std::make_shared(agg_count, DataTypes{}, Array{}),
+ candidates.only_count_column}};
+
+ Pipe pipe(std::make_shared(std::move(block_with_count)));
+ projection_reading = std::make_unique(std::move(pipe));
+
+ selected_projection_name = "Optimized trivial count";
+ has_ordinary_parts = reading->getAnalyzedResult() != nullptr;
+ }
else
{
auto storage_snapshot = reading->getStorageSnapshot();
@@ -694,46 +792,54 @@ std::optional