Merge remote-tracking branch 'origin/master' into pr-local-plan

Igor Nikonov 2024-06-11 14:38:21 +00:00
commit a473ecd004
81 changed files with 1915 additions and 847 deletions

contrib/googletest (vendored)

@ -1 +1 @@
Subproject commit e47544ad31cb3ceecd04cc13e8fe556f8df9fe0b
Subproject commit a7f443b80b105f940225332ed3c31f2790092f47

View File

@ -2165,6 +2165,8 @@ To exchange data with Hadoop, you can use [HDFS table engine](/docs/en/engines/t
- [output_format_parquet_fixed_string_as_fixed_byte_array](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_fixed_string_as_fixed_byte_array) - use Parquet FIXED_LENGTH_BYTE_ARRAY type instead of Binary/String for FixedString columns. Default value - `true`.
- [output_format_parquet_version](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_version) - The version of Parquet format used in output format. Default value - `2.latest`.
- [output_format_parquet_compression_method](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_compression_method) - compression method used in output Parquet format. Default value - `lz4`.
- [input_format_parquet_max_block_size](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_max_block_size) - the maximum block size in rows for the Parquet reader. Default value - `65409`.
- [input_format_parquet_prefer_block_bytes](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_prefer_block_bytes) - the average block size in bytes produced by the Parquet reader. Default value - `16744704`.
## ParquetMetadata {#data-format-parquet-metadata}

View File

@ -67,6 +67,23 @@ To manage named collections with DDL a user must have the `named_control_collect
In the above example the `password_sha256_hex` value is the hexadecimal representation of the SHA256 hash of the password. The configuration for the user `default` has the attribute `replace=true` because the default configuration has a plain-text `password` set, and it is not possible to have both a plain-text and a SHA256 hex password set for a user.
:::
### Storage for named collections
Named collections can be stored either on local disk or in ZooKeeper/Keeper. By default, local storage is used.
To configure storage of named collections in Keeper, add a `type` (equal to either `keeper` or `zookeeper`) and a `path` (the path in Keeper where named collections will be stored) to the `named_collections_storage` section of the configuration file:
```xml
<clickhouse>
<named_collections_storage>
<type>zookeeper</type>
<path>/named_collections_path/</path>
<update_timeout_ms>1000</update_timeout_ms>
</named_collections_storage>
</clickhouse>
```
There is also an optional configuration parameter `update_timeout_ms`, the timeout (in milliseconds) for watching for updates in Keeper; its default value is `5000`.
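For comparison, a hedged sketch of the local-disk variant (the `path` value below is illustrative; if omitted, the `named_collections` directory under the server path is used, as the storage implementation further down in this diff shows):

```xml
<clickhouse>
    <named_collections_storage>
        <type>local</type>
        <path>/var/lib/clickhouse/named_collections/</path>
    </named_collections_storage>
</clickhouse>
```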
## Storing named collections in configuration files
### XML example

View File

@ -1417,6 +1417,17 @@ Compression method used in output Parquet format. Supported codecs: `snappy`, `l
Default value: `lz4`.
### input_format_parquet_max_block_size {#input_format_parquet_max_block_size}
The maximum block size in rows for the Parquet reader. By controlling the number of rows in each block you can control memory usage,
and for operators that cache blocks you can improve the accuracy of the operator's memory control.
Default value: `65409`.
### input_format_parquet_prefer_block_bytes {#input_format_parquet_prefer_block_bytes}
The average block size in bytes produced by the Parquet reader. Lowering this setting when reading highly compressed Parquet data relieves memory pressure.
Default value: `65409 * 256 = 16744704`.
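As a rough illustration of how `input_format_parquet_prefer_block_bytes` and `input_format_parquet_max_block_size` interact, a reader can derive a per-block row limit from the byte target. This is a hedged sketch of such a heuristic, not the actual ClickHouse code; `avg_row_bytes` is a hypothetical input:

```cpp
#include <algorithm>
#include <cstddef>

/// Pick rows per block so that rows * avg_row_bytes stays near
/// prefer_block_bytes, never exceeding max_block_size.
static size_t chooseBlockRows(size_t avg_row_bytes, size_t prefer_block_bytes, size_t max_block_size)
{
    if (avg_row_bytes == 0)
        return max_block_size; /// row width unknown yet, fall back to the cap
    return std::clamp<size_t>(prefer_block_bytes / avg_row_bytes, 1, max_block_size);
}
```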
## Hive format settings {#hive-format-settings}
### input_format_hive_text_fields_delimiter {#input_format_hive_text_fields_delimiter}

View File

@ -48,6 +48,7 @@
#include <Common/FailPoint.h>
#include <Common/CPUID.h>
#include <Common/HTTPConnectionPool.h>
#include <Common/NamedCollections/NamedCollectionsFactory.h>
#include <Server/waitServersToFinish.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Core/ServerUUID.h>
@ -70,7 +71,6 @@
#include <Storages/System/attachInformationSchemaTables.h>
#include <Storages/Cache/ExternalDataSourceCache.h>
#include <Storages/Cache/registerRemoteFileMetadatas.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/registerFunctions.h>
@ -1378,7 +1378,7 @@ try
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_max_size_in_bytes, compiled_expression_cache_max_elements);
#endif
NamedCollectionUtils::loadIfNot();
NamedCollectionFactory::instance().loadIfNot();
/// Initialize main config reloader.
std::string include_from_path = config().getString("include_from", "/etc/metrika.xml");
@ -1647,7 +1647,7 @@ try
#if USE_SSL
CertificateReloader::instance().tryLoad(*config);
#endif
NamedCollectionUtils::reloadFromConfig(*config);
NamedCollectionFactory::instance().reloadFromConfig(*config);
FileCacheFactory::instance().updateSettingsFromConfig(*config);

View File

@ -1,5 +1,7 @@
#include <Analyzer/FunctionNode.h>
#include <Columns/ColumnConst.h>
#include <Common/SipHash.h>
#include <Common/FieldVisitorToString.h>
@ -58,12 +60,20 @@ ColumnsWithTypeAndName FunctionNode::getArgumentColumns() const
ColumnWithTypeAndName argument_column;
auto * constant = argument->as<ConstantNode>();
if (isNameOfInFunction(function_name) && i == 1)
{
argument_column.type = std::make_shared<DataTypeSet>();
if (constant)
{
/// Created but not filled for the analysis during function resolution.
FutureSetPtr empty_set;
argument_column.column = ColumnConst::create(ColumnSet::create(1, empty_set), 1);
}
}
else
argument_column.type = argument->getResultType();
auto * constant = argument->as<ConstantNode>();
if (constant && !isNotCreatable(argument_column.type))
argument_column.column = argument_column.type->createColumnConst(1, constant->getValue());

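Extracted from the change above: the second argument of an `IN` function is typed as `Set`, and when it is a constant, a placeholder column is built from an unfilled `FutureSetPtr` so that function resolution can proceed during analysis before any set is materialized. A sketch of just the column construction (assumes ClickHouse internals; `makePlaceholderSetColumn` is a name invented for illustration):

```cpp
#include <Columns/ColumnConst.h>
#include <Columns/ColumnSet.h>
#include <Core/ColumnWithTypeAndName.h>
#include <DataTypes/DataTypeSet.h>
#include <Interpreters/PreparedSets.h>

/// A constant column of type Set holding a created-but-not-filled set:
/// sufficient for type analysis, never meant to be executed.
static DB::ColumnWithTypeAndName makePlaceholderSetColumn()
{
    DB::ColumnWithTypeAndName res;
    res.type = std::make_shared<DB::DataTypeSet>();
    DB::FutureSetPtr empty_set; /// created but not filled
    res.column = DB::ColumnConst::create(DB::ColumnSet::create(1, empty_set), 1);
    return res;
}
```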
View File

@ -551,14 +551,25 @@ private:
in_function->getArguments().getNodes() = std::move(in_arguments);
in_function->resolveAsFunction(in_function_resolver);
DataTypePtr result_type = in_function->getResultType();
const auto * type_low_cardinality = typeid_cast<const DataTypeLowCardinality *>(result_type.get());
if (type_low_cardinality)
result_type = type_low_cardinality->getDictionaryType();
/** For `k :: UInt8`, expression `k = 1 OR k = NULL` with result type Nullable(UInt8)
* is replaced with `k IN (1, NULL)` with result type UInt8.
* Convert it back to Nullable(UInt8).
* And for `k :: LowCardinality(UInt8)`, the transformation of `k IN (1, NULL)` results in type LowCardinality(UInt8).
* Convert it to LowCardinality(Nullable(UInt8)).
*/
if (is_any_nullable && !in_function->getResultType()->isNullable())
if (is_any_nullable && !result_type->isNullable())
{
auto nullable_result_type = std::make_shared<DataTypeNullable>(in_function->getResultType());
auto in_function_nullable = createCastFunction(std::move(in_function), std::move(nullable_result_type), getContext());
DataTypePtr new_result_type = std::make_shared<DataTypeNullable>(result_type);
if (type_low_cardinality)
{
new_result_type = std::make_shared<DataTypeLowCardinality>(new_result_type);
}
auto in_function_nullable = createCastFunction(std::move(in_function), std::move(new_result_type), getContext());
or_operands.push_back(std::move(in_function_nullable));
}
else

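In isolation, the result-type adjustment above amounts to the following helper (a sketch; `toNullableKeepingLowCardinality` is a hypothetical name, not part of the patch):

```cpp
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>

/// UInt8                 -> Nullable(UInt8)
/// LowCardinality(UInt8) -> LowCardinality(Nullable(UInt8))
static DB::DataTypePtr toNullableKeepingLowCardinality(const DB::DataTypePtr & type)
{
    if (const auto * low_cardinality = typeid_cast<const DB::DataTypeLowCardinality *>(type.get()))
        return std::make_shared<DB::DataTypeLowCardinality>(
            std::make_shared<DB::DataTypeNullable>(low_cardinality->getDictionaryType()));
    return std::make_shared<DB::DataTypeNullable>(type);
}
```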
View File

@ -54,9 +54,9 @@ namespace
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
settings.auth_settings.region,
context->getRemoteHostFilter(),
static_cast<unsigned>(global_settings.s3_max_redirects),
static_cast<unsigned>(global_settings.s3_retry_attempts),
global_settings.enable_s3_requests_logging,
static_cast<unsigned>(local_settings.s3_max_redirects),
static_cast<unsigned>(local_settings.backup_restore_s3_retry_attempts),
local_settings.enable_s3_requests_logging,
/* for_disk_s3 = */ false,
request_settings.get_request_throttler,
request_settings.put_request_throttler,

View File

@ -45,14 +45,17 @@ struct ProtocolServerMetrics
};
/** Periodically (by default, each second)
* calculates and updates some metrics,
* that are not updated automatically (so, need to be asynchronously calculated).
*
* This includes both ClickHouse-related metrics (like memory usage of ClickHouse process)
* and common OS-related metrics (like total memory usage on the server).
* This includes both general process metrics (like memory usage)
* and common OS-related metrics (like total memory usage on the server).
*
All the values are either gauge type (like the total number of tables, the current memory usage),
or delta-counters representing some accumulation during the interval of time.
*
* Server and Keeper specific metrics are contained inside
* ServerAsynchronousMetrics and KeeperAsynchronousMetrics respectively.
*/
class AsynchronousMetrics
{

View File

@ -1,484 +0,0 @@
#include <Common/NamedCollections/NamedCollectionUtils.h>
#include <Common/escapeForFileName.h>
#include <Common/FieldVisitorToString.h>
#include <Common/logger_useful.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTCreateNamedCollectionQuery.h>
#include <Parsers/ASTAlterNamedCollectionQuery.h>
#include <Parsers/ASTDropNamedCollectionQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Interpreters/Context.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Common/NamedCollections/NamedCollectionConfiguration.h>
#include <Common/NamedCollections/NamedCollectionsFactory.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int NAMED_COLLECTION_ALREADY_EXISTS;
extern const int NAMED_COLLECTION_DOESNT_EXIST;
extern const int BAD_ARGUMENTS;
}
namespace NamedCollectionUtils
{
static std::atomic<bool> is_loaded_from_config = false;
static std::atomic<bool> is_loaded_from_sql = false;
class LoadFromConfig
{
private:
const Poco::Util::AbstractConfiguration & config;
public:
explicit LoadFromConfig(const Poco::Util::AbstractConfiguration & config_)
: config(config_) {}
std::vector<std::string> listCollections() const
{
Poco::Util::AbstractConfiguration::Keys collections_names;
config.keys(NAMED_COLLECTIONS_CONFIG_PREFIX, collections_names);
return collections_names;
}
NamedCollectionsMap getAll() const
{
NamedCollectionsMap result;
for (const auto & collection_name : listCollections())
{
if (result.contains(collection_name))
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"Found duplicate named collection `{}`",
collection_name);
}
result.emplace(collection_name, get(collection_name));
}
return result;
}
MutableNamedCollectionPtr get(const std::string & collection_name) const
{
const auto collection_prefix = getCollectionPrefix(collection_name);
std::queue<std::string> enumerate_input;
std::set<std::string, std::less<>> enumerate_result;
enumerate_input.push(collection_prefix);
NamedCollectionConfiguration::listKeys(config, std::move(enumerate_input), enumerate_result, -1);
/// Collection does not have any keys.
/// (`enumerate_result` == <collection_path>).
const bool collection_is_empty = enumerate_result.size() == 1
&& *enumerate_result.begin() == collection_prefix;
std::set<std::string, std::less<>> keys;
if (!collection_is_empty)
{
/// Skip collection prefix and add +1 to avoid '.' in the beginning.
for (const auto & path : enumerate_result)
keys.emplace(path.substr(collection_prefix.size() + 1));
}
return NamedCollection::create(
config, collection_name, collection_prefix, keys, SourceId::CONFIG, /* is_mutable */false);
}
private:
static constexpr auto NAMED_COLLECTIONS_CONFIG_PREFIX = "named_collections";
static std::string getCollectionPrefix(const std::string & collection_name)
{
return fmt::format("{}.{}", NAMED_COLLECTIONS_CONFIG_PREFIX, collection_name);
}
};
class LoadFromSQL : private WithContext
{
private:
const std::string metadata_path;
public:
explicit LoadFromSQL(ContextPtr context_)
: WithContext(context_)
, metadata_path(fs::weakly_canonical(context_->getPath()) / NAMED_COLLECTIONS_METADATA_DIRECTORY)
{
if (fs::exists(metadata_path))
cleanup();
}
std::vector<std::string> listCollections() const
{
if (!fs::exists(metadata_path))
return {};
std::vector<std::string> collection_names;
fs::directory_iterator it{metadata_path};
for (; it != fs::directory_iterator{}; ++it)
{
const auto & current_path = it->path();
if (current_path.extension() == ".sql")
{
collection_names.push_back(it->path().stem());
}
else
{
LOG_WARNING(
getLogger("NamedCollectionsLoadFromSQL"),
"Unexpected file {} in named collections directory",
current_path.filename().string());
}
}
return collection_names;
}
NamedCollectionsMap getAll() const
{
NamedCollectionsMap result;
for (const auto & collection_name : listCollections())
{
if (result.contains(collection_name))
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"Found duplicate named collection `{}`",
collection_name);
}
result.emplace(collection_name, get(collection_name));
}
return result;
}
MutableNamedCollectionPtr get(const std::string & collection_name) const
{
const auto query = readCreateQueryFromMetadata(
getMetadataPath(collection_name),
getContext()->getSettingsRef());
return createNamedCollectionFromAST(query);
}
MutableNamedCollectionPtr create(const ASTCreateNamedCollectionQuery & query)
{
writeCreateQueryToMetadata(
query,
getMetadataPath(query.collection_name),
getContext()->getSettingsRef());
return createNamedCollectionFromAST(query);
}
void update(const ASTAlterNamedCollectionQuery & query)
{
const auto path = getMetadataPath(query.collection_name);
auto create_query = readCreateQueryFromMetadata(path, getContext()->getSettings());
std::unordered_map<std::string, Field> result_changes_map;
for (const auto & [name, value] : query.changes)
{
auto [it, inserted] = result_changes_map.emplace(name, value);
if (!inserted)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Value with key `{}` is used twice in the SET query (collection name: {})",
name, query.collection_name);
}
}
for (const auto & [name, value] : create_query.changes)
result_changes_map.emplace(name, value);
std::unordered_map<std::string, bool> result_overridability_map;
for (const auto & [name, value] : query.overridability)
result_overridability_map.emplace(name, value);
for (const auto & [name, value] : create_query.overridability)
result_overridability_map.emplace(name, value);
for (const auto & delete_key : query.delete_keys)
{
auto it = result_changes_map.find(delete_key);
if (it == result_changes_map.end())
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Cannot delete key `{}` because it does not exist in collection",
delete_key);
}
else
{
result_changes_map.erase(it);
auto it_override = result_overridability_map.find(delete_key);
if (it_override != result_overridability_map.end())
result_overridability_map.erase(it_override);
}
}
create_query.changes.clear();
for (const auto & [name, value] : result_changes_map)
create_query.changes.emplace_back(name, value);
create_query.overridability = std::move(result_overridability_map);
if (create_query.changes.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Named collection cannot be empty (collection name: {})",
query.collection_name);
writeCreateQueryToMetadata(
create_query,
getMetadataPath(query.collection_name),
getContext()->getSettingsRef(),
true);
}
void remove(const std::string & collection_name)
{
auto collection_path = getMetadataPath(collection_name);
if (!fs::exists(collection_path))
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove collection `{}`, because it doesn't exist",
collection_name);
}
(void)fs::remove(collection_path);
}
private:
static constexpr auto NAMED_COLLECTIONS_METADATA_DIRECTORY = "named_collections";
static MutableNamedCollectionPtr createNamedCollectionFromAST(
const ASTCreateNamedCollectionQuery & query)
{
const auto & collection_name = query.collection_name;
const auto config = NamedCollectionConfiguration::createConfiguration(collection_name, query.changes, query.overridability);
std::set<std::string, std::less<>> keys;
for (const auto & [name, _] : query.changes)
keys.insert(name);
return NamedCollection::create(
*config, collection_name, "", keys, SourceId::SQL, /* is_mutable */true);
}
std::string getMetadataPath(const std::string & collection_name) const
{
return fs::path(metadata_path) / (escapeForFileName(collection_name) + ".sql");
}
/// Delete .tmp files. They could be left undeleted in case of
/// some exception or abrupt server restart.
void cleanup()
{
fs::directory_iterator it{metadata_path};
std::vector<std::string> files_to_remove;
for (; it != fs::directory_iterator{}; ++it)
{
const auto & current_path = it->path();
if (current_path.extension() == ".tmp")
files_to_remove.push_back(current_path);
}
for (const auto & file : files_to_remove)
(void)fs::remove(file);
}
static ASTCreateNamedCollectionQuery readCreateQueryFromMetadata(
const std::string & path,
const Settings & settings)
{
ReadBufferFromFile in(path);
std::string query;
readStringUntilEOF(query, in);
ParserCreateNamedCollectionQuery parser;
auto ast = parseQuery(parser, query, "in file " + path, 0, settings.max_parser_depth, settings.max_parser_backtracks);
const auto & create_query = ast->as<const ASTCreateNamedCollectionQuery &>();
return create_query;
}
void writeCreateQueryToMetadata(
const ASTCreateNamedCollectionQuery & query,
const std::string & path,
const Settings & settings,
bool replace = false) const
{
if (!replace && fs::exists(path))
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"Metadata file {} for named collection already exists",
path);
}
fs::create_directories(metadata_path);
auto tmp_path = path + ".tmp";
String formatted_query = serializeAST(query);
WriteBufferFromFile out(tmp_path, formatted_query.size(), O_WRONLY | O_CREAT | O_EXCL);
writeString(formatted_query, out);
out.next();
if (settings.fsync_metadata)
out.sync();
out.close();
fs::rename(tmp_path, path);
}
};
std::unique_lock<std::mutex> lockNamedCollectionsTransaction()
{
static std::mutex transaction_lock;
return std::unique_lock(transaction_lock);
}
void loadFromConfigUnlocked(const Poco::Util::AbstractConfiguration & config, std::unique_lock<std::mutex> &)
{
auto named_collections = LoadFromConfig(config).getAll();
LOG_TRACE(
getLogger("NamedCollectionsUtils"),
"Loaded {} collections from config", named_collections.size());
NamedCollectionFactory::instance().add(std::move(named_collections));
is_loaded_from_config = true;
}
void loadFromConfig(const Poco::Util::AbstractConfiguration & config)
{
auto lock = lockNamedCollectionsTransaction();
loadFromConfigUnlocked(config, lock);
}
void reloadFromConfig(const Poco::Util::AbstractConfiguration & config)
{
auto lock = lockNamedCollectionsTransaction();
auto collections = LoadFromConfig(config).getAll();
auto & instance = NamedCollectionFactory::instance();
instance.removeById(SourceId::CONFIG);
instance.add(collections);
is_loaded_from_config = true;
}
void loadFromSQLUnlocked(ContextPtr context, std::unique_lock<std::mutex> &)
{
auto named_collections = LoadFromSQL(context).getAll();
LOG_TRACE(
getLogger("NamedCollectionsUtils"),
"Loaded {} collections from SQL", named_collections.size());
NamedCollectionFactory::instance().add(std::move(named_collections));
is_loaded_from_sql = true;
}
void loadFromSQL(ContextPtr context)
{
auto lock = lockNamedCollectionsTransaction();
loadFromSQLUnlocked(context, lock);
}
void loadIfNotUnlocked(std::unique_lock<std::mutex> & lock)
{
auto global_context = Context::getGlobalContextInstance();
if (!is_loaded_from_config)
loadFromConfigUnlocked(global_context->getConfigRef(), lock);
if (!is_loaded_from_sql)
loadFromSQLUnlocked(global_context, lock);
}
void loadIfNot()
{
if (is_loaded_from_sql && is_loaded_from_config)
return;
auto lock = lockNamedCollectionsTransaction();
loadIfNotUnlocked(lock);
}
void removeFromSQL(const ASTDropNamedCollectionQuery & query, ContextPtr context)
{
auto lock = lockNamedCollectionsTransaction();
loadIfNotUnlocked(lock);
auto & instance = NamedCollectionFactory::instance();
if (!instance.exists(query.collection_name))
{
if (!query.if_exists)
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove collection `{}`, because it doesn't exist",
query.collection_name);
}
return;
}
LoadFromSQL(context).remove(query.collection_name);
instance.remove(query.collection_name);
}
void createFromSQL(const ASTCreateNamedCollectionQuery & query, ContextPtr context)
{
auto lock = lockNamedCollectionsTransaction();
loadIfNotUnlocked(lock);
auto & instance = NamedCollectionFactory::instance();
if (instance.exists(query.collection_name))
{
if (!query.if_not_exists)
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"A named collection `{}` already exists",
query.collection_name);
}
return;
}
instance.add(query.collection_name, LoadFromSQL(context).create(query));
}
void updateFromSQL(const ASTAlterNamedCollectionQuery & query, ContextPtr context)
{
auto lock = lockNamedCollectionsTransaction();
loadIfNotUnlocked(lock);
auto & instance = NamedCollectionFactory::instance();
if (!instance.exists(query.collection_name))
{
if (!query.if_exists)
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove collection `{}`, because it doesn't exist",
query.collection_name);
}
return;
}
LoadFromSQL(context).update(query);
auto collection = instance.getMutable(query.collection_name);
auto collection_lock = collection->lock();
for (const auto & [name, value] : query.changes)
{
auto it_override = query.overridability.find(name);
if (it_override != query.overridability.end())
collection->setOrUpdate<String, true>(name, convertFieldToString(value), it_override->second);
else
collection->setOrUpdate<String, true>(name, convertFieldToString(value), {});
}
for (const auto & key : query.delete_keys)
collection->remove<true>(key);
}
}
}

View File

@ -1,42 +0,0 @@
#pragma once
#include <Interpreters/Context_fwd.h>
namespace Poco { namespace Util { class AbstractConfiguration; } }
namespace DB
{
class ASTCreateNamedCollectionQuery;
class ASTAlterNamedCollectionQuery;
class ASTDropNamedCollectionQuery;
namespace NamedCollectionUtils
{
enum class SourceId : uint8_t
{
NONE = 0,
CONFIG = 1,
SQL = 2,
};
void loadFromConfig(const Poco::Util::AbstractConfiguration & config);
void reloadFromConfig(const Poco::Util::AbstractConfiguration & config);
/// Load named collections from `context->getPath() / named_collections /`.
void loadFromSQL(ContextPtr context);
/// Remove collection as well as its metadata from `context->getPath() / named_collections /`.
void removeFromSQL(const ASTDropNamedCollectionQuery & query, ContextPtr context);
/// Create a new collection from AST and put it to `context->getPath() / named_collections /`.
void createFromSQL(const ASTCreateNamedCollectionQuery & query, ContextPtr context);
/// Update definition of already existing collection from AST and update result in `context->getPath() / named_collections /`.
void updateFromSQL(const ASTAlterNamedCollectionQuery & query, ContextPtr context);
void loadIfNot();
}
}

View File

@ -4,7 +4,6 @@
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Common/NamedCollections/NamedCollectionConfiguration.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -297,7 +296,7 @@ MutableNamedCollectionPtr NamedCollection::duplicate() const
auto impl = pimpl->createCopy(collection_name);
return std::unique_ptr<NamedCollection>(
new NamedCollection(
std::move(impl), collection_name, NamedCollectionUtils::SourceId::NONE, true));
std::move(impl), collection_name, SourceId::NONE, true));
}
NamedCollection::Keys NamedCollection::getKeys(ssize_t depth, const std::string & prefix) const

View File

@ -1,7 +1,6 @@
#pragma once
#include <Interpreters/Context.h>
#include <Common/NamedCollections/NamedCollections_fwd.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
namespace Poco { namespace Util { class AbstractConfiguration; } }
@ -23,7 +22,12 @@ class NamedCollection
public:
using Key = std::string;
using Keys = std::set<Key, std::less<>>;
using SourceId = NamedCollectionUtils::SourceId;
enum class SourceId : uint8_t
{
NONE = 0,
CONFIG = 1,
SQL = 2,
};
static MutableNamedCollectionPtr create(
const Poco::Util::AbstractConfiguration & config,

View File

@ -1,5 +1,7 @@
#include <Common/NamedCollections/NamedCollectionsFactory.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
#include <Common/NamedCollections/NamedCollectionConfiguration.h>
#include <Common/NamedCollections/NamedCollectionsMetadataStorage.h>
#include <base/sleep.h>
namespace DB
{
@ -17,23 +19,29 @@ NamedCollectionFactory & NamedCollectionFactory::instance()
return instance;
}
NamedCollectionFactory::~NamedCollectionFactory()
{
shutdown();
}
void NamedCollectionFactory::shutdown()
{
shutdown_called = true;
if (update_task)
update_task->deactivate();
metadata_storage.reset();
}
bool NamedCollectionFactory::exists(const std::string & collection_name) const
{
std::lock_guard lock(mutex);
return existsUnlocked(collection_name, lock);
}
bool NamedCollectionFactory::existsUnlocked(
const std::string & collection_name,
std::lock_guard<std::mutex> & /* lock */) const
{
return loaded_named_collections.contains(collection_name);
return exists(collection_name, lock);
}
NamedCollectionPtr NamedCollectionFactory::get(const std::string & collection_name) const
{
std::lock_guard lock(mutex);
auto collection = tryGetUnlocked(collection_name, lock);
auto collection = tryGet(collection_name, lock);
if (!collection)
{
throw Exception(
@ -47,14 +55,35 @@ NamedCollectionPtr NamedCollectionFactory::get(const std::string & collection_na
NamedCollectionPtr NamedCollectionFactory::tryGet(const std::string & collection_name) const
{
std::lock_guard lock(mutex);
return tryGetUnlocked(collection_name, lock);
return tryGet(collection_name, lock);
}
NamedCollectionsMap NamedCollectionFactory::getAll() const
{
std::lock_guard lock(mutex);
return loaded_named_collections;
}
bool NamedCollectionFactory::exists(const std::string & collection_name, std::lock_guard<std::mutex> &) const
{
return loaded_named_collections.contains(collection_name);
}
MutableNamedCollectionPtr NamedCollectionFactory::tryGet(
const std::string & collection_name,
std::lock_guard<std::mutex> &) const
{
auto it = loaded_named_collections.find(collection_name);
if (it == loaded_named_collections.end())
return nullptr;
return it->second;
}
MutableNamedCollectionPtr NamedCollectionFactory::getMutable(
const std::string & collection_name) const
const std::string & collection_name,
std::lock_guard<std::mutex> & lock) const
{
std::lock_guard lock(mutex);
auto collection = tryGetUnlocked(collection_name, lock);
auto collection = tryGet(collection_name, lock);
if (!collection)
{
throw Exception(
@ -73,35 +102,10 @@ MutableNamedCollectionPtr NamedCollectionFactory::getMutable(
return collection;
}
MutableNamedCollectionPtr NamedCollectionFactory::tryGetUnlocked(
const std::string & collection_name,
std::lock_guard<std::mutex> & /* lock */) const
{
auto it = loaded_named_collections.find(collection_name);
if (it == loaded_named_collections.end())
return nullptr;
return it->second;
}
void NamedCollectionFactory::add(
const std::string & collection_name,
MutableNamedCollectionPtr collection)
{
std::lock_guard lock(mutex);
addUnlocked(collection_name, collection, lock);
}
void NamedCollectionFactory::add(NamedCollectionsMap collections)
{
std::lock_guard lock(mutex);
for (const auto & [collection_name, collection] : collections)
addUnlocked(collection_name, collection, lock);
}
void NamedCollectionFactory::addUnlocked(
const std::string & collection_name,
MutableNamedCollectionPtr collection,
std::lock_guard<std::mutex> & /* lock */)
std::lock_guard<std::mutex> &)
{
auto [it, inserted] = loaded_named_collections.emplace(collection_name, collection);
if (!inserted)
@ -113,10 +117,15 @@ void NamedCollectionFactory::addUnlocked(
}
}
void NamedCollectionFactory::remove(const std::string & collection_name)
void NamedCollectionFactory::add(NamedCollectionsMap collections, std::lock_guard<std::mutex> & lock)
{
std::lock_guard lock(mutex);
bool removed = removeIfExistsUnlocked(collection_name, lock);
for (const auto & [collection_name, collection] : collections)
add(collection_name, collection, lock);
}
void NamedCollectionFactory::remove(const std::string & collection_name, std::lock_guard<std::mutex> & lock)
{
bool removed = removeIfExists(collection_name, lock);
if (!removed)
{
throw Exception(
@ -126,17 +135,11 @@ void NamedCollectionFactory::remove(const std::string & collection_name)
}
}
void NamedCollectionFactory::removeIfExists(const std::string & collection_name)
{
std::lock_guard lock(mutex);
removeIfExistsUnlocked(collection_name, lock); // NOLINT
}
bool NamedCollectionFactory::removeIfExistsUnlocked(
bool NamedCollectionFactory::removeIfExists(
const std::string & collection_name,
std::lock_guard<std::mutex> & lock)
{
auto collection = tryGetUnlocked(collection_name, lock);
auto collection = tryGet(collection_name, lock);
if (!collection)
return false;
@ -152,18 +155,246 @@ bool NamedCollectionFactory::removeIfExistsUnlocked(
return true;
}
void NamedCollectionFactory::removeById(NamedCollectionUtils::SourceId id)
void NamedCollectionFactory::removeById(NamedCollection::SourceId id, std::lock_guard<std::mutex> &)
{
std::lock_guard lock(mutex);
std::erase_if(
loaded_named_collections,
[&](const auto & value) { return value.second->getSourceId() == id; });
}
NamedCollectionsMap NamedCollectionFactory::getAll() const
namespace
{
constexpr auto NAMED_COLLECTIONS_CONFIG_PREFIX = "named_collections";
std::vector<std::string> listCollections(const Poco::Util::AbstractConfiguration & config)
{
Poco::Util::AbstractConfiguration::Keys collections_names;
config.keys(NAMED_COLLECTIONS_CONFIG_PREFIX, collections_names);
return collections_names;
}
MutableNamedCollectionPtr getCollection(
const Poco::Util::AbstractConfiguration & config,
const std::string & collection_name)
{
const auto collection_prefix = fmt::format("{}.{}", NAMED_COLLECTIONS_CONFIG_PREFIX, collection_name);
std::queue<std::string> enumerate_input;
std::set<std::string, std::less<>> enumerate_result;
enumerate_input.push(collection_prefix);
NamedCollectionConfiguration::listKeys(config, std::move(enumerate_input), enumerate_result, -1);
/// Collection does not have any keys. (`enumerate_result` == <collection_path>).
const bool collection_is_empty = enumerate_result.size() == 1
&& *enumerate_result.begin() == collection_prefix;
std::set<std::string, std::less<>> keys;
if (!collection_is_empty)
{
/// Skip collection prefix and add +1 to avoid '.' in the beginning.
for (const auto & path : enumerate_result)
keys.emplace(path.substr(collection_prefix.size() + 1));
}
return NamedCollection::create(
config, collection_name, collection_prefix, keys, NamedCollection::SourceId::CONFIG, /* is_mutable */false);
}
NamedCollectionsMap getNamedCollections(const Poco::Util::AbstractConfiguration & config)
{
NamedCollectionsMap result;
for (const auto & collection_name : listCollections(config))
{
if (result.contains(collection_name))
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"Found duplicate named collection `{}`",
collection_name);
}
result.emplace(collection_name, getCollection(config, collection_name));
}
return result;
}
}
void NamedCollectionFactory::loadIfNot()
{
std::lock_guard lock(mutex);
return loaded_named_collections;
loadIfNot(lock);
}
bool NamedCollectionFactory::loadIfNot(std::lock_guard<std::mutex> & lock)
{
if (loaded)
return false;
auto context = Context::getGlobalContextInstance();
metadata_storage = NamedCollectionsMetadataStorage::create(context);
loadFromConfig(context->getConfigRef(), lock);
loadFromSQL(lock);
if (metadata_storage->supportsPeriodicUpdate())
{
update_task = context->getSchedulePool().createTask("NamedCollectionsMetadataStorage", [this]{ updateFunc(); });
update_task->activate();
update_task->schedule();
}
loaded = true;
return true;
}
void NamedCollectionFactory::loadFromConfig(const Poco::Util::AbstractConfiguration & config, std::lock_guard<std::mutex> & lock)
{
auto collections = getNamedCollections(config);
LOG_TEST(log, "Loaded {} collections from config", collections.size());
add(std::move(collections), lock);
}
void NamedCollectionFactory::reloadFromConfig(const Poco::Util::AbstractConfiguration & config)
{
std::lock_guard lock(mutex);
if (loadIfNot(lock))
return;
auto collections = getNamedCollections(config);
LOG_TEST(log, "Loaded {} collections from config", collections.size());
removeById(NamedCollection::SourceId::CONFIG, lock);
add(std::move(collections), lock);
}
void NamedCollectionFactory::loadFromSQL(std::lock_guard<std::mutex> & lock)
{
auto collections = metadata_storage->getAll();
LOG_TEST(log, "Loaded {} collections from sql", collections.size());
add(std::move(collections), lock);
}
void NamedCollectionFactory::createFromSQL(const ASTCreateNamedCollectionQuery & query)
{
std::lock_guard lock(mutex);
loadIfNot(lock);
if (exists(query.collection_name, lock))
{
if (query.if_not_exists)
return;
throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"A named collection `{}` already exists",
query.collection_name);
}
add(query.collection_name, metadata_storage->create(query), lock);
}
void NamedCollectionFactory::removeFromSQL(const ASTDropNamedCollectionQuery & query)
{
std::lock_guard lock(mutex);
loadIfNot(lock);
if (!exists(query.collection_name, lock))
{
if (query.if_exists)
return;
throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove collection `{}`, because it doesn't exist",
query.collection_name);
}
metadata_storage->remove(query.collection_name);
remove(query.collection_name, lock);
}
void NamedCollectionFactory::updateFromSQL(const ASTAlterNamedCollectionQuery & query)
{
std::lock_guard lock(mutex);
loadIfNot(lock);
if (!exists(query.collection_name, lock))
{
if (query.if_exists)
return;
throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove collection `{}`, because it doesn't exist",
query.collection_name);
}
metadata_storage->update(query);
auto collection = getMutable(query.collection_name, lock);
auto collection_lock = collection->lock();
for (const auto & [name, value] : query.changes)
{
auto it_override = query.overridability.find(name);
if (it_override != query.overridability.end())
collection->setOrUpdate<String, true>(name, convertFieldToString(value), it_override->second);
else
collection->setOrUpdate<String, true>(name, convertFieldToString(value), {});
}
for (const auto & key : query.delete_keys)
collection->remove<true>(key);
}
void NamedCollectionFactory::reloadFromSQL()
{
std::lock_guard lock(mutex);
if (loadIfNot(lock))
return;
auto collections = metadata_storage->getAll();
removeById(NamedCollection::SourceId::SQL, lock);
add(std::move(collections), lock);
}
void NamedCollectionFactory::updateFunc()
{
LOG_TRACE(log, "Named collections background updating thread started");
while (!shutdown_called.load())
{
if (metadata_storage->waitUpdate())
{
try
{
reloadFromSQL();
}
catch (const Coordination::Exception & e)
{
if (Coordination::isHardwareError(e.code))
{
LOG_INFO(log, "Lost ZooKeeper connection, will try to connect again: {}",
DB::getCurrentExceptionMessage(true));
sleepForSeconds(1);
}
else
{
tryLogCurrentException(__PRETTY_FUNCTION__);
chassert(false);
}
continue;
}
catch (...)
{
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
chassert(false);
continue;
}
}
}
LOG_TRACE(log, "Named collections background updating thread finished");
}
}
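A side note on the refactoring above: the old `*Unlocked` naming convention is replaced by overloads that take a `std::lock_guard<std::mutex> &` parameter. The guard reference is never used at runtime; it merely forces the caller to prove that the mutex is already held. A minimal sketch of the pattern, independent of ClickHouse types:

```cpp
#include <map>
#include <mutex>
#include <string>

class Registry
{
public:
    bool exists(const std::string & name) const
    {
        std::lock_guard lock(mutex);
        return exists(name, lock); /// dispatch to the locked overload
    }

private:
    /// The lock_guard parameter only documents (and enforces at the call
    /// site) that the mutex is held; it is unused at runtime.
    bool exists(const std::string & name, std::lock_guard<std::mutex> &) const
    {
        return entries.contains(name);
    }

    mutable std::mutex mutex;
    std::map<std::string, int> entries;
};
```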

View File

@ -1,58 +1,83 @@
#pragma once
#include <Common/NamedCollections/NamedCollections.h>
#include <Common/NamedCollections/NamedCollectionsMetadataStorage.h>
#include <Common/logger_useful.h>
namespace DB
{
class ASTCreateNamedCollectionQuery;
class ASTDropNamedCollectionQuery;
class ASTAlterNamedCollectionQuery;
class NamedCollectionFactory : boost::noncopyable
{
public:
static NamedCollectionFactory & instance();
~NamedCollectionFactory();
bool exists(const std::string & collection_name) const;
NamedCollectionPtr get(const std::string & collection_name) const;
NamedCollectionPtr tryGet(const std::string & collection_name) const;
MutableNamedCollectionPtr getMutable(const std::string & collection_name) const;
void add(const std::string & collection_name, MutableNamedCollectionPtr collection);
void add(NamedCollectionsMap collections);
void update(NamedCollectionsMap collections);
void remove(const std::string & collection_name);
void removeIfExists(const std::string & collection_name);
void removeById(NamedCollectionUtils::SourceId id);
NamedCollectionsMap getAll() const;
private:
bool existsUnlocked(
const std::string & collection_name,
std::lock_guard<std::mutex> & lock) const;
void reloadFromConfig(const Poco::Util::AbstractConfiguration & config);
MutableNamedCollectionPtr tryGetUnlocked(
const std::string & collection_name,
std::lock_guard<std::mutex> & lock) const;
void reloadFromSQL();
void addUnlocked(
const std::string & collection_name,
MutableNamedCollectionPtr collection,
std::lock_guard<std::mutex> & lock);
void createFromSQL(const ASTCreateNamedCollectionQuery & query);
bool removeIfExistsUnlocked(
const std::string & collection_name,
std::lock_guard<std::mutex> & lock);
void removeFromSQL(const ASTDropNamedCollectionQuery & query);
void updateFromSQL(const ASTAlterNamedCollectionQuery & query);
void loadIfNot();
void shutdown();
protected:
mutable NamedCollectionsMap loaded_named_collections;
mutable std::mutex mutex;
bool is_initialized = false;
const LoggerPtr log = getLogger("NamedCollectionFactory");
bool loaded = false;
std::atomic<bool> shutdown_called = false;
std::unique_ptr<NamedCollectionsMetadataStorage> metadata_storage;
BackgroundSchedulePool::TaskHolder update_task;
bool loadIfNot(std::lock_guard<std::mutex> & lock);
bool exists(
const std::string & collection_name,
std::lock_guard<std::mutex> & lock) const;
MutableNamedCollectionPtr getMutable(const std::string & collection_name, std::lock_guard<std::mutex> & lock) const;
void add(const std::string & collection_name, MutableNamedCollectionPtr collection, std::lock_guard<std::mutex> & lock);
void add(NamedCollectionsMap collections, std::lock_guard<std::mutex> & lock);
void update(NamedCollectionsMap collections, std::lock_guard<std::mutex> & lock);
void remove(const std::string & collection_name, std::lock_guard<std::mutex> & lock);
bool removeIfExists(const std::string & collection_name, std::lock_guard<std::mutex> & lock);
MutableNamedCollectionPtr tryGet(const std::string & collection_name, std::lock_guard<std::mutex> & lock) const;
void removeById(NamedCollection::SourceId id, std::lock_guard<std::mutex> & lock);
void loadFromConfig(
const Poco::Util::AbstractConfiguration & config,
std::lock_guard<std::mutex> & lock);
void loadFromSQL(std::lock_guard<std::mutex> & lock);
void updateFunc();
};
}

View File

@ -0,0 +1,519 @@
#include <Common/NamedCollections/NamedCollectionsMetadataStorage.h>
#include <Common/NamedCollections/NamedCollectionConfiguration.h>
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/formatAST.h>
#include <Interpreters/Context.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int NAMED_COLLECTION_ALREADY_EXISTS;
extern const int NAMED_COLLECTION_DOESNT_EXIST;
extern const int INVALID_CONFIG_PARAMETER;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
static const std::string named_collections_storage_config_path = "named_collections_storage";
namespace
{
MutableNamedCollectionPtr createNamedCollectionFromAST(const ASTCreateNamedCollectionQuery & query)
{
const auto & collection_name = query.collection_name;
const auto config = NamedCollectionConfiguration::createConfiguration(collection_name, query.changes, query.overridability);
std::set<std::string, std::less<>> keys;
for (const auto & [name, _] : query.changes)
keys.insert(name);
return NamedCollection::create(
*config, collection_name, "", keys, NamedCollection::SourceId::SQL, /* is_mutable */true);
}
std::string getFileName(const std::string & collection_name)
{
return escapeForFileName(collection_name) + ".sql";
}
}
class NamedCollectionsMetadataStorage::INamedCollectionsStorage
{
public:
virtual ~INamedCollectionsStorage() = default;
virtual bool exists(const std::string & path) const = 0;
virtual std::vector<std::string> list() const = 0;
virtual std::string read(const std::string & path) const = 0;
virtual void write(const std::string & path, const std::string & data, bool replace) = 0;
virtual void remove(const std::string & path) = 0;
virtual bool removeIfExists(const std::string & path) = 0;
virtual bool supportsPeriodicUpdate() const = 0;
virtual bool waitUpdate(size_t /* timeout */) { return false; }
};
class NamedCollectionsMetadataStorage::LocalStorage : public INamedCollectionsStorage, private WithContext
{
private:
std::string root_path;
public:
LocalStorage(ContextPtr context_, const std::string & path_)
: WithContext(context_)
, root_path(path_)
{
if (fs::exists(root_path))
cleanup();
}
~LocalStorage() override = default;
bool supportsPeriodicUpdate() const override { return false; }
std::vector<std::string> list() const override
{
if (!fs::exists(root_path))
return {};
std::vector<std::string> elements;
for (fs::directory_iterator it{root_path}; it != fs::directory_iterator{}; ++it)
{
const auto & current_path = it->path();
if (current_path.extension() == ".sql")
{
elements.push_back(it->path());
}
else
{
LOG_WARNING(
getLogger("LocalStorage"),
"Unexpected file {} in named collections directory",
current_path.filename().string());
}
}
return elements;
}
bool exists(const std::string & path) const override
{
return fs::exists(getPath(path));
}
std::string read(const std::string & path) const override
{
ReadBufferFromFile in(getPath(path));
std::string data;
readStringUntilEOF(data, in);
return data;
}
void write(const std::string & path, const std::string & data, bool replace) override
{
if (!replace && fs::exists(path))
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"Metadata file {} for named collection already exists",
path);
}
fs::create_directories(root_path);
auto tmp_path = getPath(path + ".tmp");
WriteBufferFromFile out(tmp_path, data.size(), O_WRONLY | O_CREAT | O_EXCL);
writeString(data, out);
out.next();
if (getContext()->getSettingsRef().fsync_metadata)
out.sync();
out.close();
fs::rename(tmp_path, getPath(path));
}
void remove(const std::string & path) override
{
if (!removeIfExists(getPath(path)))
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove `{}`, because it doesn't exist", path);
}
}
bool removeIfExists(const std::string & path) override
{
return fs::remove(getPath(path));
}
private:
std::string getPath(const std::string & path) const
{
return fs::path(root_path) / path;
}
/// Delete .tmp files. They could be left undeleted in case of
/// some exception or abrupt server restart.
void cleanup()
{
std::vector<std::string> files_to_remove;
for (fs::directory_iterator it{root_path}; it != fs::directory_iterator{}; ++it)
{
const auto & current_path = it->path();
if (current_path.extension() == ".tmp")
files_to_remove.push_back(current_path);
}
for (const auto & file : files_to_remove)
fs::remove(file);
}
};
class NamedCollectionsMetadataStorage::ZooKeeperStorage : public INamedCollectionsStorage, private WithContext
{
private:
std::string root_path;
mutable zkutil::ZooKeeperPtr zookeeper_client{nullptr};
mutable zkutil::EventPtr wait_event;
mutable Int32 collections_node_cversion = 0;
public:
ZooKeeperStorage(ContextPtr context_, const std::string & path_)
: WithContext(context_)
, root_path(path_)
{
if (root_path.empty())
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Collections path cannot be empty");
if (root_path != "/" && root_path.back() == '/')
root_path.resize(root_path.size() - 1);
if (root_path.front() != '/')
root_path = "/" + root_path;
auto client = getClient();
if (root_path != "/" && !client->exists(root_path))
{
client->createAncestors(root_path);
client->createIfNotExists(root_path, "");
}
}
~ZooKeeperStorage() override = default;
bool supportsPeriodicUpdate() const override { return true; }
/// Return true if children changed.
bool waitUpdate(size_t timeout) override
{
if (!wait_event)
{
/// We have not yet made any list() attempt, so do that.
return true;
}
if (wait_event->tryWait(timeout))
{
/// Children changed before timeout.
return true;
}
std::string res;
Coordination::Stat stat;
if (!getClient()->tryGet(root_path, res, &stat))
{
/// We do create root_path in constructor of this class,
/// so this case is not really possible.
chassert(false);
return false;
}
return stat.cversion != collections_node_cversion;
}
std::vector<std::string> list() const override
{
if (!wait_event)
wait_event = std::make_shared<Poco::Event>();
Coordination::Stat stat;
auto children = getClient()->getChildren(root_path, &stat, wait_event);
collections_node_cversion = stat.cversion;
return children;
}
bool exists(const std::string & path) const override
{
return getClient()->exists(getPath(path));
}
std::string read(const std::string & path) const override
{
return getClient()->get(getPath(path));
}
void write(const std::string & path, const std::string & data, bool replace) override
{
if (replace)
{
getClient()->createOrUpdate(getPath(path), data, zkutil::CreateMode::Persistent);
}
else
{
auto code = getClient()->tryCreate(getPath(path), data, zkutil::CreateMode::Persistent);
if (code == Coordination::Error::ZNODEEXISTS)
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"Metadata file {} for named collection already exists",
path);
}
}
}
void remove(const std::string & path) override
{
getClient()->remove(getPath(path));
}
bool removeIfExists(const std::string & path) override
{
auto code = getClient()->tryRemove(getPath(path));
if (code == Coordination::Error::ZOK)
return true;
if (code == Coordination::Error::ZNONODE)
return false;
throw Coordination::Exception::fromPath(code, getPath(path));
}
private:
zkutil::ZooKeeperPtr getClient() const
{
if (!zookeeper_client || zookeeper_client->expired())
{
zookeeper_client = getContext()->getZooKeeper();
zookeeper_client->sync(root_path);
}
return zookeeper_client;
}
std::string getPath(const std::string & path) const
{
return fs::path(root_path) / path;
}
};
NamedCollectionsMetadataStorage::NamedCollectionsMetadataStorage(
std::shared_ptr<INamedCollectionsStorage> storage_,
ContextPtr context_)
: WithContext(context_)
, storage(std::move(storage_))
{
}
MutableNamedCollectionPtr NamedCollectionsMetadataStorage::get(const std::string & collection_name) const
{
const auto query = readCreateQuery(collection_name);
return createNamedCollectionFromAST(query);
}
NamedCollectionsMap NamedCollectionsMetadataStorage::getAll() const
{
NamedCollectionsMap result;
for (const auto & collection_name : listCollections())
{
if (result.contains(collection_name))
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"Found duplicate named collection `{}`",
collection_name);
}
result.emplace(collection_name, get(collection_name));
}
return result;
}
MutableNamedCollectionPtr NamedCollectionsMetadataStorage::create(const ASTCreateNamedCollectionQuery & query)
{
writeCreateQuery(query);
return createNamedCollectionFromAST(query);
}
void NamedCollectionsMetadataStorage::remove(const std::string & collection_name)
{
storage->remove(getFileName(collection_name));
}
bool NamedCollectionsMetadataStorage::removeIfExists(const std::string & collection_name)
{
return storage->removeIfExists(getFileName(collection_name));
}
void NamedCollectionsMetadataStorage::update(const ASTAlterNamedCollectionQuery & query)
{
auto create_query = readCreateQuery(query.collection_name);
std::unordered_map<std::string, Field> result_changes_map;
for (const auto & [name, value] : query.changes)
{
auto [it, inserted] = result_changes_map.emplace(name, value);
if (!inserted)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Value with key `{}` is used twice in the SET query (collection name: {})",
name, query.collection_name);
}
}
for (const auto & [name, value] : create_query.changes)
result_changes_map.emplace(name, value);
std::unordered_map<std::string, bool> result_overridability_map;
for (const auto & [name, value] : query.overridability)
result_overridability_map.emplace(name, value);
for (const auto & [name, value] : create_query.overridability)
result_overridability_map.emplace(name, value);
for (const auto & delete_key : query.delete_keys)
{
auto it = result_changes_map.find(delete_key);
if (it == result_changes_map.end())
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Cannot delete key `{}` because it does not exist in collection",
delete_key);
}
else
{
result_changes_map.erase(it);
auto it_override = result_overridability_map.find(delete_key);
if (it_override != result_overridability_map.end())
result_overridability_map.erase(it_override);
}
}
create_query.changes.clear();
for (const auto & [name, value] : result_changes_map)
create_query.changes.emplace_back(name, value);
create_query.overridability = std::move(result_overridability_map);
if (create_query.changes.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Named collection cannot be empty (collection name: {})",
query.collection_name);
chassert(create_query.collection_name == query.collection_name);
writeCreateQuery(create_query, true);
}
std::vector<std::string> NamedCollectionsMetadataStorage::listCollections() const
{
auto paths = storage->list();
std::vector<std::string> collections;
collections.reserve(paths.size());
for (const auto & path : paths)
collections.push_back(std::filesystem::path(path).stem());
return collections;
}
ASTCreateNamedCollectionQuery NamedCollectionsMetadataStorage::readCreateQuery(const std::string & collection_name) const
{
const auto path = getFileName(collection_name);
auto query = storage->read(path);
const auto & settings = getContext()->getSettingsRef();
ParserCreateNamedCollectionQuery parser;
auto ast = parseQuery(parser, query, "in file " + path, 0, settings.max_parser_depth, settings.max_parser_backtracks);
const auto & create_query = ast->as<const ASTCreateNamedCollectionQuery &>();
return create_query;
}
void NamedCollectionsMetadataStorage::writeCreateQuery(const ASTCreateNamedCollectionQuery & query, bool replace)
{
auto normalized_query = query.clone();
auto & changes = typeid_cast<ASTCreateNamedCollectionQuery *>(normalized_query.get())->changes;
::sort(
changes.begin(), changes.end(),
[](const SettingChange & lhs, const SettingChange & rhs) { return lhs.name < rhs.name; });
storage->write(getFileName(query.collection_name), serializeAST(*normalized_query), replace);
}
bool NamedCollectionsMetadataStorage::supportsPeriodicUpdate() const
{
return storage->supportsPeriodicUpdate();
}
bool NamedCollectionsMetadataStorage::waitUpdate()
{
if (!storage->supportsPeriodicUpdate())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Periodic updates are not supported");
const auto & config = Context::getGlobalContextInstance()->getConfigRef();
const size_t timeout = config.getUInt(named_collections_storage_config_path + ".update_timeout_ms", 5000);
return storage->waitUpdate(timeout);
}
std::unique_ptr<NamedCollectionsMetadataStorage> NamedCollectionsMetadataStorage::create(const ContextPtr & context_)
{
const auto & config = context_->getConfigRef();
const auto storage_type = config.getString(named_collections_storage_config_path + ".type", "local");
if (storage_type == "local")
{
const auto path = config.getString(
named_collections_storage_config_path + ".path",
std::filesystem::path(context_->getPath()) / "named_collections");
LOG_TRACE(getLogger("NamedCollectionsMetadataStorage"),
"Using local storage for named collections at path: {}", path);
auto local_storage = std::make_unique<NamedCollectionsMetadataStorage::LocalStorage>(context_, path);
return std::unique_ptr<NamedCollectionsMetadataStorage>(
new NamedCollectionsMetadataStorage(std::move(local_storage), context_));
}
if (storage_type == "zookeeper" || storage_type == "keeper")
{
const auto path = config.getString(named_collections_storage_config_path + ".path");
auto zk_storage = std::make_unique<NamedCollectionsMetadataStorage::ZooKeeperStorage>(context_, path);
LOG_TRACE(getLogger("NamedCollectionsMetadataStorage"),
"Using zookeeper storage for named collections at path: {}", path);
return std::unique_ptr<NamedCollectionsMetadataStorage>(
new NamedCollectionsMetadataStorage(std::move(zk_storage), context_));
}
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"Unknown storage for named collections: {}", storage_type);
}
}
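Putting the pieces together, the factory shown earlier is expected to drive this storage roughly as follows; a condensed sketch of the `loadIfNot`/`updateFunc` flow under the stated API, with error handling omitted and `driveMetadataStorage` being an illustrative name only:

```cpp
#include <Common/NamedCollections/NamedCollectionsMetadataStorage.h>
#include <atomic>

/// Sketch: initial load plus the background re-read loop used with the
/// ZooKeeper/Keeper backend (the local backend reports
/// supportsPeriodicUpdate() == false, so the loop is skipped).
void driveMetadataStorage(DB::ContextPtr context, std::atomic<bool> & shutdown_called)
{
    auto storage = DB::NamedCollectionsMetadataStorage::create(context);
    auto collections = storage->getAll(); /// initial load

    while (storage->supportsPeriodicUpdate() && !shutdown_called.load())
    {
        if (storage->waitUpdate()) /// blocks for up to update_timeout_ms
            collections = storage->getAll(); /// re-read on change
    }
}
```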

View File

@ -0,0 +1,52 @@
#pragma once
#include <Parsers/ASTCreateNamedCollectionQuery.h>
#include <Parsers/ASTAlterNamedCollectionQuery.h>
#include <Parsers/ASTDropNamedCollectionQuery.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Core/BackgroundSchedulePool.h>
namespace DB
{
class NamedCollectionsMetadataStorage : private WithContext
{
public:
static std::unique_ptr<NamedCollectionsMetadataStorage> create(const ContextPtr & context);
NamedCollectionsMap getAll() const;
MutableNamedCollectionPtr get(const std::string & collection_name) const;
MutableNamedCollectionPtr create(const ASTCreateNamedCollectionQuery & query);
void remove(const std::string & collection_name);
bool removeIfExists(const std::string & collection_name);
void update(const ASTAlterNamedCollectionQuery & query);
void shutdown();
/// Return true if update was made
bool waitUpdate();
bool supportsPeriodicUpdate() const;
private:
class INamedCollectionsStorage;
class LocalStorage;
class ZooKeeperStorage;
std::shared_ptr<INamedCollectionsStorage> storage;
NamedCollectionsMetadataStorage(std::shared_ptr<INamedCollectionsStorage> storage_, ContextPtr context_);
std::vector<std::string> listCollections() const;
ASTCreateNamedCollectionQuery readCreateQuery(const std::string & collection_name) const;
void writeCreateQuery(const ASTCreateNamedCollectionQuery & query, bool replace = false);
};
}

View File

@ -1,12 +1,40 @@
#include <Common/tests/gtest_global_context.h>
#include <Common/NamedCollections/NamedCollectionsFactory.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/DOM/DOMParser.h>
#include <gtest/gtest.h>
using namespace DB;
/// A class that allows testing private methods of NamedCollectionFactory.
class NamedCollectionFactoryFriend : public NamedCollectionFactory
{
public:
static NamedCollectionFactoryFriend & instance()
{
static NamedCollectionFactoryFriend instance;
return instance;
}
void loadFromConfig(const Poco::Util::AbstractConfiguration & config)
{
std::lock_guard lock(mutex);
NamedCollectionFactory::loadFromConfig(config, lock);
}
void add(const std::string & collection_name, MutableNamedCollectionPtr collection)
{
std::lock_guard lock(mutex);
NamedCollectionFactory::add(collection_name, collection, lock);
}
void remove(const std::string & collection_name)
{
std::lock_guard lock(mutex);
NamedCollectionFactory::remove(collection_name, lock);
}
};
TEST(NamedCollections, SimpleConfig)
{
std::string xml(R"CONFIG(<clickhouse>
@ -29,13 +57,13 @@ TEST(NamedCollections, SimpleConfig)
Poco::AutoPtr<Poco::XML::Document> document = dom_parser.parseString(xml);
Poco::AutoPtr<Poco::Util::XMLConfiguration> config = new Poco::Util::XMLConfiguration(document);
NamedCollectionUtils::loadFromConfig(*config);
NamedCollectionFactoryFriend::instance().loadFromConfig(*config);
ASSERT_TRUE(NamedCollectionFactory::instance().exists("collection1"));
ASSERT_TRUE(NamedCollectionFactory::instance().exists("collection2"));
ASSERT_TRUE(NamedCollectionFactory::instance().tryGet("collection3") == nullptr);
ASSERT_TRUE(NamedCollectionFactoryFriend::instance().exists("collection1"));
ASSERT_TRUE(NamedCollectionFactoryFriend::instance().exists("collection2"));
ASSERT_TRUE(NamedCollectionFactoryFriend::instance().tryGet("collection3") == nullptr);
auto collections = NamedCollectionFactory::instance().getAll();
auto collections = NamedCollectionFactoryFriend::instance().getAll();
ASSERT_EQ(collections.size(), 2);
ASSERT_TRUE(collections.contains("collection1"));
ASSERT_TRUE(collections.contains("collection2"));
@ -47,7 +75,7 @@ key3: 3.3
key4: -4
)CONFIG");
auto collection1 = NamedCollectionFactory::instance().get("collection1");
auto collection1 = NamedCollectionFactoryFriend::instance().get("collection1");
ASSERT_TRUE(collection1 != nullptr);
ASSERT_TRUE(collection1->get<String>("key1") == "value1");
@ -61,7 +89,7 @@ key5: 5
key6: 6.6
)CONFIG");
auto collection2 = NamedCollectionFactory::instance().get("collection2");
auto collection2 = NamedCollectionFactoryFriend::instance().get("collection2");
ASSERT_TRUE(collection2 != nullptr);
ASSERT_TRUE(collection2->get<String>("key4") == "value4");
@ -69,9 +97,9 @@ key6: 6.6
ASSERT_TRUE(collection2->get<Float64>("key6") == 6.6);
auto collection2_copy = collections["collection2"]->duplicate();
NamedCollectionFactory::instance().add("collection2_copy", collection2_copy);
ASSERT_TRUE(NamedCollectionFactory::instance().exists("collection2_copy"));
ASSERT_EQ(NamedCollectionFactory::instance().get("collection2_copy")->dumpStructure(),
NamedCollectionFactoryFriend::instance().add("collection2_copy", collection2_copy);
ASSERT_TRUE(NamedCollectionFactoryFriend::instance().exists("collection2_copy"));
ASSERT_EQ(NamedCollectionFactoryFriend::instance().get("collection2_copy")->dumpStructure(),
R"CONFIG(key4: value4
key5: 5
key6: 6.6
@ -88,8 +116,8 @@ key6: 6.6
collection2_copy->setOrUpdate<String>("key4", "value45", {});
ASSERT_EQ(collection2_copy->getOrDefault<String>("key4", "N"), "value45");
NamedCollectionFactory::instance().remove("collection2_copy");
ASSERT_FALSE(NamedCollectionFactory::instance().exists("collection2_copy"));
NamedCollectionFactoryFriend::instance().remove("collection2_copy");
ASSERT_FALSE(NamedCollectionFactoryFriend::instance().exists("collection2_copy"));
config.reset();
}
@ -119,11 +147,11 @@ TEST(NamedCollections, NestedConfig)
Poco::AutoPtr<Poco::XML::Document> document = dom_parser.parseString(xml);
Poco::AutoPtr<Poco::Util::XMLConfiguration> config = new Poco::Util::XMLConfiguration(document);
NamedCollectionUtils::loadFromConfig(*config);
NamedCollectionFactoryFriend::instance().loadFromConfig(*config);
ASSERT_TRUE(NamedCollectionFactory::instance().exists("collection3"));
ASSERT_TRUE(NamedCollectionFactoryFriend::instance().exists("collection3"));
auto collection = NamedCollectionFactory::instance().get("collection3");
auto collection = NamedCollectionFactoryFriend::instance().get("collection3");
ASSERT_TRUE(collection != nullptr);
ASSERT_EQ(collection->dumpStructure(),
@ -171,8 +199,8 @@ TEST(NamedCollections, NestedConfigDuplicateKeys)
Poco::AutoPtr<Poco::XML::Document> document = dom_parser.parseString(xml);
Poco::AutoPtr<Poco::Util::XMLConfiguration> config = new Poco::Util::XMLConfiguration(document);
NamedCollectionUtils::loadFromConfig(*config);
auto collection = NamedCollectionFactory::instance().get("collection");
NamedCollectionFactoryFriend::instance().loadFromConfig(*config);
auto collection = NamedCollectionFactoryFriend::instance().get("collection");
auto keys = collection->getKeys();
ASSERT_EQ(keys.size(), 6);
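`NamedCollectionFactoryFriend` is the usual test-friend trick: a subclass that takes the factory's mutex itself and forwards to the protected, lock-guarded methods so tests can drive them directly. The same shape in Python, with illustrative names:

```
import threading

# Illustrative analogue of NamedCollectionFactoryFriend: a test-only
# subclass re-exporting protected, lock-guarded operations.
class Factory:
    def __init__(self):
        self.mutex = threading.Lock()
        self._collections = {}

    def _add(self, name, collection):  # "protected": caller must hold mutex
        self._collections[name] = collection

    def _remove(self, name):
        del self._collections[name]

class FactoryFriend(Factory):
    def add(self, name, collection):
        with self.mutex:
            self._add(name, collection)

    def remove(self, name):
        with self.mutex:
            self._remove(name)

factory = FactoryFriend()
factory.add("collection1", {"key1": "value1"})
```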

View File

@ -334,7 +334,7 @@ class IColumn;
M(Bool, fsync_metadata, true, "Do fsync after changing metadata for tables and databases (.sql files). Could be disabled in case of poor latency on server with high load of DDL queries and high load of disk subsystem.", 0) \
\
M(Bool, join_use_nulls, false, "Use NULLs for non-joined rows of outer JOINs for types that can be inside Nullable. If false, use default value of corresponding columns data type.", IMPORTANT) \
M(Bool, allow_experimental_join_condition, false, "Support join with inequality conditions which involve columns from both left and right table. e.g. t1.y < t2.y.", IMPORTANT) \
M(Bool, allow_experimental_join_condition, false, "Support join with inequality conditions which involve columns from both left and right table. e.g. t1.y < t2.y.", 0) \
\
M(JoinStrictness, join_default_strictness, JoinStrictness::All, "Set default strictness in JOIN query. Possible values: empty string, 'ANY', 'ALL'. If empty, query without strictness will throw exception.", 0) \
M(Bool, any_join_distinct_right_table_keys, false, "Enable old ANY JOIN logic with many-to-one left-to-right table keys mapping for all ANY JOINs. It leads to confusing not equal results for 't1 ANY LEFT JOIN t2' and 't2 ANY RIGHT JOIN t1'. ANY RIGHT JOIN needs one-to-many keys mapping to be consistent with LEFT one.", IMPORTANT) \
@ -517,6 +517,7 @@ class IColumn;
M(UInt64, backup_restore_keeper_value_max_size, 1048576, "Maximum size of data of a [Zoo]Keeper's node during backup", 0) \
M(UInt64, backup_restore_batch_size_for_keeper_multiread, 10000, "Maximum size of batch for multiread request to [Zoo]Keeper during backup or restore", 0) \
M(UInt64, backup_restore_batch_size_for_keeper_multi, 1000, "Maximum size of batch for multi request to [Zoo]Keeper during backup or restore", 0) \
M(UInt64, backup_restore_s3_retry_attempts, 1000, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries. It takes place only for backup/restore.", 0) \
M(UInt64, max_backup_bandwidth, 0, "The maximum read speed in bytes per second for particular backup on server. Zero means unlimited.", 0) \
\
M(Bool, log_profile_events, true, "Log query performance statistics into the query_log, query_thread_log and query_views_log.", 0) \
@ -1059,7 +1060,8 @@ class IColumn;
M(Bool, input_format_tsv_detect_header, true, "Automatically detect header with names and types in TSV format", 0) \
M(Bool, input_format_custom_detect_header, true, "Automatically detect header with names and types in CustomSeparated format", 0) \
M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \
M(UInt64, input_format_parquet_max_block_size, 8192, "Max block size for parquet reader.", 0) \
M(UInt64, input_format_parquet_max_block_size, DEFAULT_BLOCK_SIZE, "Max block size for parquet reader.", 0) \
M(UInt64, input_format_parquet_prefer_block_bytes, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader", 0) \
M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \
M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \
M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format ORC", 0) \

View File

@ -96,6 +96,8 @@ static const std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges
{"hdfs_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in HDFS table engine"},
{"azure_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in AzureBlobStorage table engine"},
{"s3_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in S3 table engine"},
{"input_format_parquet_max_block_size", 8192, DEFAULT_BLOCK_SIZE, "Increase block size for parquet reader."},
{"input_format_parquet_prefer_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader."},
{"enable_blob_storage_log", true, true, "Write information about blob storage operations to system.blob_storage_log table"},
{"allow_statistic_optimize", false, false, "Old setting which popped up here being renamed."},
{"allow_experimental_statistic", false, false, "Old setting which popped up here being renamed."},
@ -113,6 +115,7 @@ static const std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges
{"http_max_chunk_size", 0, 0, "Internal limitation"},
{"prefer_external_sort_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Prefer maximum block bytes for external sort, reduce the memory usage during merging."},
{"input_format_force_null_for_omitted_fields", false, false, "Disable type-defaults for omitted fields when needed"},
{"backup_restore_s3_retry_attempts", 0, 1000, "A new setting."},
{"cast_string_to_dynamic_use_inference", false, false, "Add setting to allow converting String to Dynamic through parsing"},
{"allow_experimental_dynamic_type", false, false, "Add new experimental Dynamic type"},
{"azure_max_blocks_in_multipart_upload", 50000, 50000, "Maximum number of blocks in multipart upload for Azure."},

View File

@ -3,6 +3,7 @@
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/Serializations/SerializationNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeVariant.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnConst.h>
#include <Core/Field.h>
@ -174,4 +175,9 @@ DataTypePtr removeNullableOrLowCardinalityNullable(const DataTypePtr & type)
}
bool canContainNull(const IDataType & type)
{
return type.isNullable() || type.isLowCardinalityNullable() || isDynamic(type) || isVariant(type);
}
}

View File

@ -62,4 +62,6 @@ DataTypePtr makeNullableOrLowCardinalityNullableSafe(const DataTypePtr & type);
/// Nullable(T) -> T, LowCardinality(Nullable(T)) -> T
DataTypePtr removeNullableOrLowCardinalityNullable(const DataTypePtr & type);
bool canContainNull(const IDataType & type);
}

View File

@ -41,11 +41,11 @@ void applyMetadataChangesToCreateQuery(const ASTPtr & query, const StorageInMemo
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot alter table {} because it was created AS table function"
" and doesn't have structure in metadata", backQuote(ast_create_query.getTable()));
if (!has_structure && !ast_create_query.is_dictionary)
if (!has_structure && !ast_create_query.is_dictionary && !ast_create_query.isParameterizedView())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot alter table {} metadata doesn't have structure",
backQuote(ast_create_query.getTable()));
if (!ast_create_query.is_dictionary)
if (!ast_create_query.is_dictionary && !ast_create_query.isParameterizedView())
{
ASTPtr new_columns = InterpreterCreateQuery::formatColumns(metadata.columns);
ASTPtr new_indices = InterpreterCreateQuery::formatIndices(metadata.secondary_indices);

View File

@ -161,6 +161,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array;
format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
format_settings.parquet.prefer_block_bytes = settings.input_format_parquet_prefer_block_bytes;
format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
format_settings.parquet.output_compliant_nested_types = settings.output_format_parquet_compliant_nested_types;
format_settings.parquet.use_custom_encoder = settings.output_format_parquet_use_custom_encoder;

View File

@ -265,7 +265,8 @@ struct FormatSettings
bool preserve_order = false;
bool use_custom_encoder = true;
bool parallel_encoding = true;
UInt64 max_block_size = 8192;
UInt64 max_block_size = DEFAULT_BLOCK_SIZE;
size_t prefer_block_bytes = DEFAULT_BLOCK_SIZE * 256;
ParquetVersion output_version;
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
bool output_compliant_nested_types = true;

View File

@ -111,7 +111,7 @@ public:
argument_types.push_back(argument.type);
/// More efficient specialization for two numeric arguments.
if (arguments.size() == 2 && isNumber(arguments[0].type) && isNumber(arguments[1].type))
if (arguments.size() == 2 && isNumber(removeNullable(arguments[0].type)) && isNumber(removeNullable(arguments[1].type)))
return std::make_unique<FunctionToFunctionBaseAdaptor>(SpecializedFunction::create(context), argument_types, return_type);
return std::make_unique<FunctionToFunctionBaseAdaptor>(
@ -123,7 +123,7 @@ public:
if (types.empty())
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} cannot be called without arguments", getName());
if (types.size() == 2 && isNumber(types[0]) && isNumber(types[1]))
if (types.size() == 2 && isNumber(removeNullable(types[0])) && isNumber(removeNullable(types[1])))
return SpecializedFunction::create(context)->getReturnTypeImpl(types);
return getLeastSupertype(types);

View File

@ -29,6 +29,18 @@ public:
return name;
}
ColumnPtr getConstantResultForNonConstArguments(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type) const override
{
const ColumnWithTypeAndName & elem = arguments[0];
if (elem.type->onlyNull())
return result_type->createColumnConst(1, UInt8(0));
if (canContainNull(*elem.type))
return nullptr;
return result_type->createColumnConst(1, UInt8(1));
}
size_t getNumberOfArguments() const override { return 1; }
bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForConstants() const override { return true; }

View File

@ -31,6 +31,18 @@ public:
return name;
}
ColumnPtr getConstantResultForNonConstArguments(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type) const override
{
const ColumnWithTypeAndName & elem = arguments[0];
if (elem.type->onlyNull())
return result_type->createColumnConst(1, UInt8(1));
if (canContainNull(*elem.type))
return nullptr;
return result_type->createColumnConst(1, UInt8(0));
}
size_t getNumberOfArguments() const override { return 1; }
bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForLowCardinalityColumns() const override { return false; }

View File

@ -2,6 +2,7 @@
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
@ -23,6 +24,15 @@ public:
return name;
}
ColumnPtr getConstantResultForNonConstArguments(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type) const override
{
const ColumnWithTypeAndName & elem = arguments[0];
if (elem.type->onlyNull() || canContainNull(*elem.type))
return result_type->createColumnConst(1, UInt8(1));
return result_type->createColumnConst(1, UInt8(0));
}
bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForNothing() const override { return false; }
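Judging by the returned constants, the three `getConstantResultForNonConstArguments` overrides above belong to isNotNull, isNull, and isNullable. Together with `canContainNull` (Nullable, LowCardinality(Nullable), Dynamic, Variant) they form one small decision table: an `onlyNull` argument folds to a fixed answer, a type that may hold NULL defeats folding for the data-dependent functions, and any other type folds again. A sketch of that table:

```
# Constant-folding table shared by the three overrides above. Returns the
# folded constant, or None when the result depends on the actual data.
def fold(func, only_null, can_contain_null):
    if only_null:  # type is Nothing / Nullable(Nothing): value is always NULL
        return {"isNull": 1, "isNotNull": 0, "isNullable": 1}[func]
    if can_contain_null:  # NULLs are possible: isNull/isNotNull need the data
        return 1 if func == "isNullable" else None
    return {"isNull": 0, "isNotNull": 1, "isNullable": 0}[func]

assert fold("isNull", only_null=True, can_contain_null=True) == 1
assert fold("isNotNull", only_null=False, can_contain_null=False) == 1
assert fold("isNull", only_null=False, can_contain_null=True) is None
```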

View File

@ -162,7 +162,7 @@ public:
class RetryStrategy : public Aws::Client::RetryStrategy
{
public:
explicit RetryStrategy(uint32_t maxRetries_ = 10, uint32_t scaleFactor_ = 25, uint32_t maxDelayMs_ = 90000);
explicit RetryStrategy(uint32_t maxRetries_ = 10, uint32_t scaleFactor_ = 25, uint32_t maxDelayMs_ = 5000);
/// NOLINTNEXTLINE(google-runtime-int)
bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error, long attemptedRetries) const override;
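The only change here is the default delay cap, down from 90000 ms to 5000 ms. Assuming the usual AWS-SDK-style exponential backoff of `scaleFactor * 2^attempt` clamped to `maxDelayMs` (an assumption; the formula itself is not shown in this hunk), the cap changes only the tail of the schedule:

```
# Hypothetical illustration of the lowered cap; the backoff formula is an
# assumption, not code from this diff.
def delay_ms(attempt, scale_factor=25, max_delay_ms=5000):
    return min(scale_factor * (1 << attempt), max_delay_ms)

print([delay_ms(a) for a in range(10)])
# [25, 50, 100, 200, 400, 800, 1600, 3200, 5000, 5000]
# With the old default of 90000 the last values kept doubling to 6400, 12800.
```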

View File

@ -18,6 +18,7 @@
#include <Common/callOnce.h>
#include <Common/SharedLockGuard.h>
#include <Common/PageCache.h>
#include <Common/NamedCollections/NamedCollectionsFactory.h>
#include <Coordination/KeeperDispatcher.h>
#include <Core/BackgroundSchedulePool.h>
#include <Formats/FormatFactory.h>
@ -610,6 +611,8 @@ struct ContextSharedPart : boost::noncopyable
LOG_TRACE(log, "Shutting down database catalog");
DatabaseCatalog::shutdown();
NamedCollectionFactory::instance().shutdown();
delete_async_insert_queue.reset();
SHUTDOWN(log, "merges executor", merge_mutate_executor, wait());

View File

@ -4,7 +4,7 @@
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
#include <Common/NamedCollections/NamedCollectionsFactory.h>
namespace DB
@ -23,7 +23,7 @@ BlockIO InterpreterAlterNamedCollectionQuery::execute()
return executeDDLQueryOnCluster(query_ptr, current_context, params);
}
NamedCollectionUtils::updateFromSQL(query, current_context);
NamedCollectionFactory::instance().updateFromSQL(query);
return {};
}

View File

@ -4,7 +4,7 @@
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
#include <Common/NamedCollections/NamedCollectionsFactory.h>
namespace DB
@ -23,7 +23,7 @@ BlockIO InterpreterCreateNamedCollectionQuery::execute()
return executeDDLQueryOnCluster(query_ptr, current_context, params);
}
NamedCollectionUtils::createFromSQL(query, current_context);
NamedCollectionFactory::instance().createFromSQL(query);
return {};
}

View File

@ -4,7 +4,7 @@
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
#include <Common/NamedCollections/NamedCollectionsFactory.h>
namespace DB
@ -23,7 +23,7 @@ BlockIO InterpreterDropNamedCollectionQuery::execute()
return executeDDLQueryOnCluster(query_ptr, current_context, params);
}
NamedCollectionUtils::removeFromSQL(query, current_context);
NamedCollectionFactory::instance().removeFromSQL(query);
return {};
}

View File

@ -1,6 +1,7 @@
#include <Planner/PlannerExpressionAnalysis.h>
#include <Columns/ColumnNullable.h>
#include <Columns/FilterDescription.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
@ -37,7 +38,7 @@ namespace
* Actions before the filter are added into the actions chain.
* It is the client's responsibility to update the filter analysis result if the filter column must be removed after the chain is finalized.
*/
FilterAnalysisResult analyzeFilter(const QueryTreeNodePtr & filter_expression_node,
std::optional<FilterAnalysisResult> analyzeFilter(const QueryTreeNodePtr & filter_expression_node,
const ColumnsWithTypeAndName & input_columns,
const PlannerContextPtr & planner_context,
ActionsChain & actions_chain)
@ -45,7 +46,11 @@ FilterAnalysisResult analyzeFilter(const QueryTreeNodePtr & filter_expression_no
FilterAnalysisResult result;
result.filter_actions = buildActionsDAGFromExpressionNode(filter_expression_node, input_columns, planner_context);
result.filter_column_name = result.filter_actions->getOutputs().at(0)->result_name;
const auto * output = result.filter_actions->getOutputs().at(0);
if (output->column && ConstantFilterDescription(*output->column).always_true)
return {};
result.filter_column_name = output->result_name;
actions_chain.addStep(std::make_unique<ActionsChainStep>(result.filter_actions));
return result;
@ -534,8 +539,11 @@ PlannerExpressionsAnalysisResult buildExpressionAnalysisResult(const QueryTreeNo
if (query_node.hasWhere())
{
where_analysis_result_optional = analyzeFilter(query_node.getWhere(), current_output_columns, planner_context, actions_chain);
where_action_step_index_optional = actions_chain.getLastStepIndex();
current_output_columns = actions_chain.getLastStepAvailableOutputColumns();
if (where_analysis_result_optional)
{
where_action_step_index_optional = actions_chain.getLastStepIndex();
current_output_columns = actions_chain.getLastStepAvailableOutputColumns();
}
}
auto aggregation_analysis_result_optional = analyzeAggregation(query_tree, current_output_columns, planner_context, actions_chain);
@ -548,8 +556,11 @@ PlannerExpressionsAnalysisResult buildExpressionAnalysisResult(const QueryTreeNo
if (query_node.hasHaving())
{
having_analysis_result_optional = analyzeFilter(query_node.getHaving(), current_output_columns, planner_context, actions_chain);
having_action_step_index_optional = actions_chain.getLastStepIndex();
current_output_columns = actions_chain.getLastStepAvailableOutputColumns();
if (having_analysis_result_optional)
{
having_action_step_index_optional = actions_chain.getLastStepIndex();
current_output_columns = actions_chain.getLastStepAvailableOutputColumns();
}
}
auto window_analysis_result_optional = analyzeWindow(query_tree, current_output_columns, planner_context, actions_chain);
@ -562,8 +573,11 @@ PlannerExpressionsAnalysisResult buildExpressionAnalysisResult(const QueryTreeNo
if (query_node.hasQualify())
{
qualify_analysis_result_optional = analyzeFilter(query_node.getQualify(), current_output_columns, planner_context, actions_chain);
qualify_action_step_index_optional = actions_chain.getLastStepIndex();
current_output_columns = actions_chain.getLastStepAvailableOutputColumns();
if (qualify_analysis_result_optional)
{
qualify_action_step_index_optional = actions_chain.getLastStepIndex();
current_output_columns = actions_chain.getLastStepAvailableOutputColumns();
}
}
auto projection_analysis_result = analyzeProjection(query_node, current_output_columns, planner_context, actions_chain);
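The guard above now repeats for WHERE, HAVING, and QUALIFY: `analyzeFilter` returns an empty optional when the predicate folds to constant true, and the caller records a step index and refreshes the output columns only if a step was actually added. A runnable schematic with illustrative names:

```
# Schematic of the repeated guard above: a filter that folds to constant
# true adds no step to the actions chain at all.
class Chain:
    def __init__(self):
        self.steps = []

    def add(self, step):
        self.steps.append(step)
        return step

def analyze_filter(predicate_is_const_true, chain):
    if predicate_is_const_true:   # ConstantFilterDescription(...).always_true
        return None               # caller skips step index / column refresh
    return chain.add("FilterStep")

chain = Chain()
assert analyze_filter(True, chain) is None and chain.steps == []
assert analyze_filter(False, chain) == "FilterStep" and len(chain.steps) == 1
```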

View File

@ -49,7 +49,6 @@
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Storages/StorageDummy.h>
#include <Storages/StorageMergeTree.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/Context.h>
@ -874,8 +873,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
if (!table->isMergeTree())
return false;
if (std::dynamic_pointer_cast<StorageMergeTree>(table)
&& !query_settings.parallel_replicas_for_non_replicated_merge_tree)
if (!table->supportsReplication() && !query_settings.parallel_replicas_for_non_replicated_merge_tree)
return false;
return true;

View File

@ -420,6 +420,24 @@ void ParquetBlockInputFormat::initializeIfNeeded()
int num_row_groups = metadata->num_row_groups();
row_group_batches.reserve(num_row_groups);
auto adaptive_chunk_size = [&](int row_group_idx) -> size_t
{
size_t total_size = 0;
auto row_group_meta = metadata->RowGroup(row_group_idx);
for (int column_index : column_indices)
{
total_size += row_group_meta->ColumnChunk(column_index)->total_uncompressed_size();
}
if (!total_size || !format_settings.parquet.prefer_block_bytes) return 0;
auto average_row_bytes = floor(static_cast<double>(total_size) / row_group_meta->num_rows());
// Avoid an infinite preferred_num_rows.
if (average_row_bytes < 1) return 0;
const size_t preferred_num_rows = static_cast<size_t>(floor(format_settings.parquet.prefer_block_bytes/average_row_bytes));
const size_t MIN_ROW_NUM = 128;
// size_t != UInt64 in darwin
return std::min(std::max(preferred_num_rows, MIN_ROW_NUM), static_cast<size_t>(format_settings.parquet.max_block_size));
};
for (int row_group = 0; row_group < num_row_groups; ++row_group)
{
if (skip_row_groups.contains(row_group))
@ -439,6 +457,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
row_group_batches.back().row_groups_idxs.push_back(row_group);
row_group_batches.back().total_rows += metadata->RowGroup(row_group)->num_rows();
row_group_batches.back().total_bytes_compressed += metadata->RowGroup(row_group)->total_compressed_size();
auto rows = adaptive_chunk_size(row_group);
row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;
}
}
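In effect, `adaptive_chunk_size` targets `prefer_block_bytes` of output per block: estimate the average row width from the row group's uncompressed column sizes, derive a preferred row count, and clamp it to [128, `max_block_size`]. A worked Python rendering using the defaults introduced by this change (65409 rows, 65409 * 256 bytes):

```
import math

# Mirrors the adaptive_chunk_size lambda above; a result of 0 means
# "fall back to max_block_size" at the call site.
def adaptive_chunk_size(total_uncompressed_bytes, num_rows,
                        prefer_block_bytes=65409 * 256, max_block_size=65409):
    if not total_uncompressed_bytes or not prefer_block_bytes:
        return 0
    average_row_bytes = math.floor(total_uncompressed_bytes / num_rows)
    if average_row_bytes < 1:  # avoid a huge/infinite preferred row count
        return 0
    preferred = math.floor(prefer_block_bytes / average_row_bytes)
    return min(max(preferred, 128), max_block_size)

# A 1M-row row group with 4 GiB uncompressed is ~4294 bytes/row, so blocks
# shrink to 3899 rows instead of the 65409 ceiling.
print(adaptive_chunk_size(4 * 1024**3, 1_000_000))  # 3899
```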
@ -449,7 +469,7 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_bat
parquet::ArrowReaderProperties arrow_properties;
parquet::ReaderProperties reader_properties(ArrowMemoryPool::instance());
arrow_properties.set_use_threads(false);
arrow_properties.set_batch_size(format_settings.parquet.max_block_size);
arrow_properties.set_batch_size(row_group_batch.adaptive_chunk_size);
// When reading a row group, arrow will:
// 1. Look at `metadata` to get all byte ranges it'll need to read from the file (typically one

View File

@ -208,6 +208,8 @@ private:
size_t total_rows = 0;
size_t total_bytes_compressed = 0;
size_t adaptive_chunk_size = 0;
std::vector<int> row_groups_idxs;
// These are only used by the decoding thread, so don't require locking the mutex.

View File

@ -421,6 +421,9 @@ struct AggregateProjectionCandidates
/// This flag means that DAG for projection candidate should be used in FilterStep.
bool has_filter = false;
/// If not empty, try to find exact ranges from parts to speed up trivial count queries.
String only_count_column;
};
AggregateProjectionCandidates getAggregateProjectionCandidates(
@ -502,6 +505,12 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
candidates.minmax_projection.emplace(std::move(minmax));
}
}
else
{
/// Trivial count optimization only applies after @can_use_minmax_projection.
if (keys.empty() && aggregates.size() == 1 && typeid_cast<const AggregateFunctionCount *>(aggregates[0].function.get()))
candidates.only_count_column = aggregates[0].column_name;
}
}
if (!candidates.minmax_projection)
@ -584,13 +593,21 @@ std::optional<String> optimizeUseAggregateProjections(QueryPlan::Node & node, Qu
ContextPtr context = reading->getContext();
MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
AggregateProjectionCandidate * best_candidate = nullptr;
/// Stores row count from exact ranges of parts.
size_t exact_count = 0;
if (candidates.minmax_projection)
{
best_candidate = &candidates.minmax_projection->candidate;
}
else if (!candidates.real.empty())
else if (!candidates.real.empty() || !candidates.only_count_column.empty())
{
auto ordinary_reading_select_result = reading->selectRangesToRead();
auto ordinary_reading_select_result = reading->getAnalyzedResult();
bool find_exact_ranges = !candidates.only_count_column.empty();
if (!ordinary_reading_select_result || (!ordinary_reading_select_result->has_exact_ranges && find_exact_ranges))
ordinary_reading_select_result = reading->selectRangesToRead(find_exact_ranges);
size_t ordinary_reading_marks = ordinary_reading_select_result->selected_marks;
/// Nothing to read. Ignore projections.
@ -600,7 +617,49 @@ std::optional<String> optimizeUseAggregateProjections(QueryPlan::Node & node, Qu
return {};
}
const auto & parts_with_ranges = ordinary_reading_select_result->parts_with_ranges;
auto & parts_with_ranges = ordinary_reading_select_result->parts_with_ranges;
if (!candidates.only_count_column.empty())
{
for (auto & part_with_ranges : parts_with_ranges)
{
MarkRanges new_ranges;
auto & ranges = part_with_ranges.ranges;
const auto & exact_ranges = part_with_ranges.exact_ranges;
if (exact_ranges.empty())
continue;
size_t i = 0;
size_t len = exact_ranges.size();
for (auto & range : ranges)
{
while (i < len && exact_ranges[i].begin < range.end)
{
chassert(exact_ranges[i].begin >= range.begin);
chassert(exact_ranges[i].end <= range.end);
/// Found some marks which are not exact
if (range.begin < exact_ranges[i].begin)
new_ranges.emplace_back(range.begin, exact_ranges[i].begin);
range.begin = exact_ranges[i].end;
ordinary_reading_marks -= exact_ranges[i].end - exact_ranges[i].begin;
exact_count += part_with_ranges.data_part->index_granularity.getRowsCountInRange(exact_ranges[i]);
++i;
}
/// Current range still contains some marks which are not exact
if (range.begin < range.end)
new_ranges.emplace_back(range);
}
chassert(i == len);
part_with_ranges.ranges = std::move(new_ranges);
}
std::erase_if(parts_with_ranges, [&](const auto & part_with_ranges) { return part_with_ranges.ranges.empty(); });
if (parts_with_ranges.empty())
chassert(ordinary_reading_marks == 0);
}
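The loop above is interval subtraction: each exact sub-range is cut out of the part's mark ranges, its marks are deducted from `ordinary_reading_marks`, its rows go into `exact_count`, and only the non-exact remainders survive as ranges to read. The same arithmetic in Python over half-open mark intervals (row counting via index granularity omitted):

```
# Sketch of the exact-range subtraction above. Ranges are half-open
# (begin, end) mark intervals; exact ranges are sorted and each one is
# fully contained in some range.
def subtract_exact(ranges, exact_ranges):
    new_ranges, exact_marks = [], 0
    i = 0
    for begin, end in ranges:
        while i < len(exact_ranges) and exact_ranges[i][0] < end:
            eb, ee = exact_ranges[i]
            if begin < eb:                 # keep the non-exact prefix
                new_ranges.append((begin, eb))
            begin = ee                     # drop the exact middle
            exact_marks += ee - eb
            i += 1
        if begin < end:                    # keep the non-exact suffix
            new_ranges.append((begin, end))
    return new_ranges, exact_marks

# One range [0, 10) with exact sub-ranges [2, 4) and [7, 10):
print(subtract_exact([(0, 10)], [(2, 4), (7, 10)]))
# ([(0, 2), (4, 7)], 5)
```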
/// Selecting best candidate.
for (auto & candidate : candidates.real)
@ -630,8 +689,20 @@ std::optional<String> optimizeUseAggregateProjections(QueryPlan::Node & node, Qu
if (!best_candidate)
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return {};
if (exact_count > 0)
{
if (ordinary_reading_marks > 0)
{
ordinary_reading_select_result->selected_marks = ordinary_reading_marks;
ordinary_reading_select_result->selected_rows -= exact_count;
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
}
}
else
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return {};
}
}
}
else
@ -639,10 +710,11 @@ std::optional<String> optimizeUseAggregateProjections(QueryPlan::Node & node, Qu
return {};
}
chassert(best_candidate != nullptr);
QueryPlanStepPtr projection_reading;
bool has_ordinary_parts;
String selected_projection_name;
if (best_candidate)
selected_projection_name = best_candidate->projection->name;
/// Add reading from projection step.
if (candidates.minmax_projection)
@ -654,6 +726,32 @@ std::optional<String> optimizeUseAggregateProjections(QueryPlan::Node & node, Qu
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
has_ordinary_parts = false;
}
else if (best_candidate == nullptr)
{
chassert(exact_count > 0);
auto agg_count = std::make_shared<AggregateFunctionCount>(DataTypes{});
std::vector<char> state(agg_count->sizeOfData());
AggregateDataPtr place = state.data();
agg_count->create(place);
SCOPE_EXIT_MEMORY_SAFE(agg_count->destroy(place));
agg_count->set(place, exact_count);
auto column = ColumnAggregateFunction::create(agg_count);
column->insertFrom(place);
Block block_with_count{
{std::move(column),
std::make_shared<DataTypeAggregateFunction>(agg_count, DataTypes{}, Array{}),
candidates.only_count_column}};
Pipe pipe(std::make_shared<SourceFromSingleChunk>(std::move(block_with_count)));
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
selected_projection_name = "Optimized trivial count";
has_ordinary_parts = reading->getAnalyzedResult() != nullptr;
}
else
{
auto storage_snapshot = reading->getStorageSnapshot();
@ -694,46 +792,54 @@ std::optional<String> optimizeUseAggregateProjections(QueryPlan::Node & node, Qu
context->getQueryContext()->addQueryAccessInfo(Context::QualifiedProjectionName
{
.storage_id = reading->getMergeTreeData().getStorageID(),
.projection_name = best_candidate->projection->name,
.projection_name = selected_projection_name,
});
}
// LOG_TRACE(getLogger("optimizeUseProjections"), "Projection reading header {}",
// projection_reading->getOutputStream().header.dumpStructure());
projection_reading->setStepDescription(best_candidate->projection->name);
projection_reading->setStepDescription(selected_projection_name);
auto & projection_reading_node = nodes.emplace_back(QueryPlan::Node{.step = std::move(projection_reading)});
auto & expr_or_filter_node = nodes.emplace_back();
if (candidates.has_filter)
/// Root node of optimized child plan using @projection_name
QueryPlan::Node * aggregate_projection_node = nullptr;
if (best_candidate)
{
expr_or_filter_node.step = std::make_unique<FilterStep>(
projection_reading_node.step->getOutputStream(),
best_candidate->dag,
best_candidate->dag->getOutputs().front()->result_name,
true);
}
else
expr_or_filter_node.step = std::make_unique<ExpressionStep>(
projection_reading_node.step->getOutputStream(),
best_candidate->dag);
aggregate_projection_node = &nodes.emplace_back();
if (candidates.has_filter)
{
aggregate_projection_node->step = std::make_unique<FilterStep>(
projection_reading_node.step->getOutputStream(),
best_candidate->dag,
best_candidate->dag->getOutputs().front()->result_name,
true);
}
else
aggregate_projection_node->step
= std::make_unique<ExpressionStep>(projection_reading_node.step->getOutputStream(), best_candidate->dag);
expr_or_filter_node.children.push_back(&projection_reading_node);
aggregate_projection_node->children.push_back(&projection_reading_node);
}
else /// trivial count optimization
{
aggregate_projection_node = &projection_reading_node;
}
if (!has_ordinary_parts)
{
/// All parts are taken from projection
aggregating->requestOnlyMergeForAggregateProjection(expr_or_filter_node.step->getOutputStream());
node.children.front() = &expr_or_filter_node;
aggregating->requestOnlyMergeForAggregateProjection(aggregate_projection_node->step->getOutputStream());
node.children.front() = aggregate_projection_node;
}
else
{
node.step = aggregating->convertToAggregatingProjection(expr_or_filter_node.step->getOutputStream());
node.children.push_back(&expr_or_filter_node);
node.step = aggregating->convertToAggregatingProjection(aggregate_projection_node->step->getOutputStream());
node.children.push_back(aggregate_projection_node);
}
return best_candidate->projection->name;
return selected_projection_name;
}
}

View File

@ -139,7 +139,9 @@ std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nod
const auto & query_info = reading->getQueryInfo();
MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
auto ordinary_reading_select_result = reading->selectRangesToRead();
auto ordinary_reading_select_result = reading->getAnalyzedResult();
if (!ordinary_reading_select_result)
ordinary_reading_select_result = reading->selectRangesToRead();
size_t ordinary_reading_marks = ordinary_reading_select_result->selected_marks;
/// Nothing to read. Ignore projections.

View File

@ -25,8 +25,7 @@ namespace QueryPlanOptimizations
bool canUseProjectionForReadingStep(ReadFromMergeTree * reading)
{
/// Probably some projection was already applied.
if (reading->hasAnalyzedResult())
if (reading->getAnalyzedResult() && reading->getAnalyzedResult()->readFromProjection())
return false;
if (reading->isQueryWithFinal())

View File

@ -1383,9 +1383,9 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
return merging_pipes.empty() ? Pipe::unitePipes(std::move(no_merging_pipes)) : Pipe::unitePipes(std::move(merging_pipes));
}
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead() const
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(bool find_exact_ranges) const
{
return selectRangesToRead(prepared_parts, alter_conversions_for_parts, false /* find_exact_ranges */);
return selectRangesToRead(prepared_parts, alter_conversions_for_parts, find_exact_ranges);
}
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
@ -1686,6 +1686,7 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
result.selected_marks_pk = sum_marks_pk;
result.total_marks_pk = total_marks_pk;
result.selected_rows = sum_rows;
result.has_exact_ranges = result.selected_parts == 0 || find_exact_ranges;
if (query_info_.input_order_info)
result.read_type = (query_info_.input_order_info->direction > 0)

View File

@ -100,7 +100,9 @@ public:
UInt64 selected_marks_pk = 0;
UInt64 total_marks_pk = 0;
UInt64 selected_rows = 0;
bool has_exact_ranges = false;
bool readFromProjection() const { return !parts_with_ranges.empty() && parts_with_ranges.front().data_part->isProjectionPart(); }
void checkLimits(const Settings & settings, const SelectQueryInfo & query_info_) const;
};
@ -181,7 +183,7 @@ public:
AnalysisResultPtr selectRangesToRead(
MergeTreeData::DataPartsVector parts, std::vector<AlterConversionsPtr> alter_conversions, bool find_exact_ranges = false) const;
AnalysisResultPtr selectRangesToRead() const;
AnalysisResultPtr selectRangesToRead(bool find_exact_ranges = false) const;
StorageMetadataPtr getStorageMetadata() const { return metadata_for_reading; }
@ -196,7 +198,7 @@ public:
bool requestOutputEachPartitionThroughSeparatePort();
bool willOutputEachPartitionThroughSeparatePort() const { return output_each_partition_through_separate_port; }
bool hasAnalyzedResult() const { return analyzed_result_ptr != nullptr; }
AnalysisResultPtr getAnalyzedResult() const { return analyzed_result_ptr; }
void setAnalyzedResult(AnalysisResultPtr analyzed_result_ptr_) { analyzed_result_ptr = std::move(analyzed_result_ptr_); }
ReadFromMergeTree::AnalysisResult getAnalysisResult() const;

View File

@ -18,6 +18,7 @@
#include <Interpreters/inplaceBlockConversions.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Storages/StorageView.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Parsers/ASTConstraintDeclaration.h>
@ -1613,7 +1614,10 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
}
}
if (all_columns.empty())
/// Parameterized views do not have 'columns' in their metadata
bool is_parameterized_view = table->as<StorageView>() && table->as<StorageView>()->isParameterizedView();
if (!is_parameterized_view && all_columns.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot DROP or CLEAR all columns");
validateColumnsDefaultsAndGetSampleBlock(default_expr_list, all_columns.getAll(), context);

View File

@ -95,7 +95,7 @@ MutableNamedCollectionPtr tryGetNamedCollectionWithOverrides(
if (asts.empty())
return nullptr;
NamedCollectionUtils::loadIfNot();
NamedCollectionFactory::instance().loadIfNot();
auto collection_name = getCollectionName(asts);
if (!collection_name.has_value())

View File

@ -64,7 +64,6 @@ ProjectionDescription ProjectionDescription::clone() const
other.sample_block_for_keys = sample_block_for_keys;
other.metadata = metadata;
other.key_size = key_size;
other.is_minmax_count_projection = is_minmax_count_projection;
other.primary_key_max_column_name = primary_key_max_column_name;
other.partition_value_indices = partition_value_indices;
@ -195,7 +194,6 @@ ProjectionDescription ProjectionDescription::getMinMaxCountProjection(
ContextPtr query_context)
{
ProjectionDescription result;
result.is_minmax_count_projection = true;
auto select_query = std::make_shared<ASTProjectionSelectQuery>();
ASTPtr select_expression_list = std::make_shared<ASTExpressionList>();
@ -282,13 +280,11 @@ ProjectionDescription ProjectionDescription::getMinMaxCountProjection(
return result;
}
void ProjectionDescription::recalculateWithNewColumns(const ColumnsDescription & new_columns, ContextPtr query_context)
{
*this = getProjectionFromAST(definition_ast, new_columns, query_context);
}
Block ProjectionDescription::calculate(const Block & block, ContextPtr context) const
{
auto mut_context = Context::createCopy(context);

View File

@ -56,8 +56,6 @@ struct ProjectionDescription
size_t key_size = 0;
bool is_minmax_count_projection = false;
/// If a primary key expression is used in the minmax_count projection, store the name of max expression.
String primary_key_max_column_name;

View File

@ -33,7 +33,7 @@ void StorageSystemNamedCollections::fillData(MutableColumns & res_columns, Conte
{
const auto & access = context->getAccess();
NamedCollectionUtils::loadIfNot();
NamedCollectionFactory::instance().loadIfNot();
auto collections = NamedCollectionFactory::instance().getAll();
for (const auto & [name, collection] : collections)

View File

@ -368,17 +368,21 @@ TEST(TransformQueryForExternalDatabase, Null)
check(state, 1, {"field"},
"SELECT field FROM table WHERE field IS NULL",
R"(SELECT "field" FROM "test"."table" WHERE "field" IS NULL)");
R"(SELECT "field" FROM "test"."table" WHERE "field" IS NULL)",
R"(SELECT "field" FROM "test"."table" WHERE 1 = 0)");
check(state, 1, {"field"},
"SELECT field FROM table WHERE field IS NOT NULL",
R"(SELECT "field" FROM "test"."table" WHERE "field" IS NOT NULL)");
R"(SELECT "field" FROM "test"."table" WHERE "field" IS NOT NULL)",
R"(SELECT "field" FROM "test"."table")");
check(state, 1, {"field"},
"SELECT field FROM table WHERE isNull(field)",
R"(SELECT "field" FROM "test"."table" WHERE "field" IS NULL)");
R"(SELECT "field" FROM "test"."table" WHERE "field" IS NULL)",
R"(SELECT "field" FROM "test"."table" WHERE 1 = 0)");
check(state, 1, {"field"},
"SELECT field FROM table WHERE isNotNull(field)",
R"(SELECT "field" FROM "test"."table" WHERE "field" IS NOT NULL)");
R"(SELECT "field" FROM "test"."table" WHERE "field" IS NOT NULL)",
R"(SELECT "field" FROM "test"."table")");
}
TEST(TransformQueryForExternalDatabase, ToDate)

View File

@ -280,6 +280,7 @@ builds_job_config = JobConfig(
"./packages",
"./docker/packager/packager",
"./rust",
"./tests/ci/version_helper.py",
# FIXME: This is a WA to rebuild the CH and recreate the Performance.tar.zst artifact
# when there are changes in performance test scripts.
# Due to the current design of the perf test we need to rebuild CH when the performance test changes,

View File

@ -7,7 +7,7 @@ import os.path as p
import re
import subprocess
import tempfile
from typing import Any, List, Optional
from typing import Any, List, Literal, Optional
logger = logging.getLogger(__name__)
@ -15,7 +15,9 @@ logger = logging.getLogger(__name__)
# \A and \Z match only start and end of the whole string
RELEASE_BRANCH_REGEXP = r"\A\d+[.]\d+\Z"
TAG_REGEXP = (
r"\Av\d{2}[.][1-9]\d*[.][1-9]\d*[.][1-9]\d*-(testing|prestable|stable|lts)\Z"
r"\Av\d{2}" # First two digits of major part
r"([.][1-9]\d*){3}" # minor.patch.tweak parts
r"-(new|testing|prestable|stable|lts)\Z" # suffix with a version type
)
SHA_REGEXP = re.compile(r"\A([0-9]|[a-f]){40}\Z")
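A quick sanity check of the rewritten `TAG_REGEXP`: the repeated group covers the minor.patch.tweak parts, and `new` joins the accepted suffixes:

```
import re

# TAG_REGEXP exactly as rewritten above.
TAG_REGEXP = (
    r"\Av\d{2}"                               # first two digits of major part
    r"([.][1-9]\d*){3}"                       # minor.patch.tweak parts
    r"-(new|testing|prestable|stable|lts)\Z"  # suffix with a version type
)

for tag in ("v24.8.1.1-new", "v24.4.1.2088-stable", "v24.8.1.1-rc"):
    print(tag, bool(re.match(TAG_REGEXP, tag)))
# v24.8.1.1-new True / v24.4.1.2088-stable True / v24.8.1.1-rc False
```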
@ -122,17 +124,35 @@ class Git:
_tag_pattern = re.compile(TAG_REGEXP)
def __init__(self, ignore_no_tags: bool = False):
"""
new_tag is used for special v24.1.1.1-new tags where the previous version is moved to the release branch
* 66666666666 Some other commit with version 24.8.1.xxxxx-testing, tweak is counted from new_tag = v24.8.1.1-new
| * 55555555555 (tag: v24.7.1.123123123-stable, branch: 24.7) tweak counted from new_tag = v24.7.1.1-new
|/
* 44444444444 (tag: v24.8.1.1-new)
| * 33333333333 (tag: v24.6.1.123123123-stable, branch: 24.6) tweak counted from new_tag = v24.6.1.1-new
|/
* 22222222222 (tag: v24.7.1.1-new)
| * 11111111111 (tag: v24.5.1.123123123-stable, branch: 24.5) tweak counted from new_tag = v24.4.1.2088-stable
|/
* 00000000000 (tag: v24.6.1.1-new)
* 6d4b31322d1 (tag: v24.4.1.2088-stable)
* 2c5c589a882 (tag: v24.3.1.2672-lts)
* 891689a4150 (tag: v24.2.1.2248-stable)
* 5a024dfc093 (tag: v24.1.1.2048-stable)
* a2faa65b080 (tag: v23.12.1.1368-stable)
* 05bc8ef1e02 (tag: v23.11.1.2711-stable)
"""
self.root = git_runner.cwd
self._ignore_no_tags = ignore_no_tags
self.run = git_runner.run
self.latest_tag = ""
self.new_tag = ""
self.new_branch = ""
self.branch = ""
self.sha = ""
self.sha_short = ""
self.description = "shallow-checkout"
self.commits_since_tag = 0
self.commits_since_latest = 0
self.commits_since_new = 0
self.update()
def update(self):
@ -155,10 +175,20 @@ class Git:
stderr = subprocess.DEVNULL if suppress_stderr else None
self.latest_tag = self.run("git describe --tags --abbrev=0", stderr=stderr)
# Format should be: {latest_tag}-{commits_since_tag}-g{sha_short}
self.description = self.run("git describe --tags --long")
self.commits_since_tag = int(
self.commits_since_latest = int(
self.run(f"git rev-list {self.latest_tag}..HEAD --count")
)
if self.latest_tag.endswith("-new"):
# We won't change the behaviour of the "latest_tag"
# So here we set "new_tag" to the previous tag in the graph, which allows
# getting an alternative "tweak"
self.new_tag = self.run(
f"git describe --tags --abbrev=0 --exclude='{self.latest_tag}'",
stderr=stderr,
)
self.commits_since_new = int(
self.run(f"git rev-list {self.new_tag}..HEAD --count")
)
@staticmethod
def check_tag(value: str) -> None:
@ -187,19 +217,34 @@ class Git:
@property
def tweak(self) -> int:
if not self.latest_tag.endswith("-testing"):
return self._tweak("latest")
@property
def tweak_to_new(self) -> int:
return self._tweak("new")
def _tweak(self, tag_type: Literal["latest", "new"]) -> int:
"""Accepts latest or new as a tag_type and returns the tweak number to it"""
if tag_type == "latest":
commits = self.commits_since_latest
tag = self.latest_tag
else:
commits = self.commits_since_new
tag = self.new_tag
if not tag.endswith("-testing"):
# When we are on the tag, we still need to have tweak=1 to not
# break cmake with versions like 12.13.14.0
if not self.commits_since_tag:
# We are in a tagged commit. The tweak should match the
# current version's value
version = self.latest_tag.split("-", maxsplit=1)[0]
try:
return int(version.split(".")[-1])
except ValueError:
# There are no tags, or a wrong tag. Return default
return TWEAK
return self.commits_since_tag
if commits:
return commits
# We are in a tagged commit or shallow checkout. The tweak should match the
# current version's value
version = tag.split("-", maxsplit=1)[0]
try:
return int(version.split(".")[-1])
except ValueError:
# There are no tags (shallow checkout), or a wrong tag. Return default
return TWEAK
version = self.latest_tag.split("-", maxsplit=1)[0]
return int(version.split(".")[-1]) + self.commits_since_tag
version = tag.split("-", maxsplit=1)[0]
return int(version.split(".")[-1]) + commits
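Stripped of the class plumbing and diff noise, the new `_tweak` rule is a small pure function; the sketch below mirrors it and agrees with the table-driven test added further down:

```
TWEAK = 1  # default used by git_helper when no usable tag exists

def tweak(tag: str, commits: int) -> int:
    """Tweak relative to `tag` with `commits` commits on top of it."""
    if not tag.endswith("-testing"):
        if commits:
            return commits
        # On the tag itself (or a shallow checkout): reuse the tag's own tweak.
        version = tag.split("-", maxsplit=1)[0]
        try:
            return int(version.split(".")[-1])
        except ValueError:
            return TWEAK  # no tags, or a malformed tag
    # -testing tags: keep counting from the tag's own tweak
    version = tag.split("-", maxsplit=1)[0]
    return int(version.split(".")[-1]) + commits

assert tweak("v21.12.333.22222-stable", 0) == 22222
assert tweak("v21.12.333.22222-stable", 2) == 2
assert tweak("v21.12.333.22222-testing", 2) == 22224
assert tweak("", 0) == 1
```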

View File

@ -94,6 +94,7 @@ class Release:
self._version = get_version_from_repo(git=self._git)
self.release_version = self.version
self._release_branch = ""
self._version_new_tag = None # type: Optional[ClickHouseVersion]
self._rollback_stack = [] # type: List[str]
def run(
@ -180,7 +181,8 @@ class Release:
)
raise
self.check_commit_release_ready()
if self.release_type == self.PATCH:
self.check_commit_release_ready()
def do(
self, check_dirty: bool, check_run_from_master: bool, check_branch: bool
@ -328,10 +330,16 @@ class Release:
self.check_no_tags_after()
# Create release branch
self.read_version()
with self._create_branch(self.release_branch, self.release_commit):
with self._checkout(self.release_branch, True):
with self._bump_release_branch():
yield
assert self._version_new_tag is not None
with self._create_tag(
self._version_new_tag.describe,
self.release_commit,
f"Initial commit for release {self._version_new_tag.major}.{self._version_new_tag.minor}",
):
with self._create_branch(self.release_branch, self.release_commit):
with self._checkout(self.release_branch, True):
with self._bump_release_branch():
yield
@contextmanager
def patch_release(self):
@ -444,6 +452,11 @@ class Release:
self.version.with_description(VersionType.TESTING)
self._update_cmake_contributors(self.version)
self._commit_cmake_contributors(self.version)
# Create a version-new tag
self._version_new_tag = self.version.copy()
self._version_new_tag.tweak = 1
self._version_new_tag.with_description(VersionType.NEW)
with self._push(helper_branch):
body_file = get_abs_path(".github/PULL_REQUEST_TEMPLATE.md")
# The following command is rolled back by deleting branch in self._push
@ -458,10 +471,10 @@ class Release:
@contextmanager
def _checkout(self, ref: str, with_checkout_back: bool = False) -> Iterator[None]:
self._git.update()
orig_ref = self._git.branch or self._git.sha
need_rollback = False
rollback_cmd = ""
if ref not in (self._git.branch, self._git.sha):
need_rollback = True
self.run(f"git checkout {ref}")
# checkout is not put into rollback_stack intentionally
rollback_cmd = f"git checkout {orig_ref}"
@ -474,7 +487,7 @@ class Release:
self.run(f"git reset --hard; git checkout -f {orig_ref}")
raise
# Normal flow when we need to checkout back
if with_checkout_back and need_rollback:
if with_checkout_back and rollback_cmd:
self.run(rollback_cmd)
@contextmanager
@ -510,9 +523,9 @@ class Release:
@contextmanager
def _create_gh_release(self, as_prerelease: bool) -> Iterator[None]:
with self._create_tag():
tag = self.release_version.describe
with self._create_tag(tag, self.release_commit):
# Preserve tag if version is changed
tag = self.release_version.describe
prerelease = ""
if as_prerelease:
prerelease = "--prerelease"
@ -534,13 +547,13 @@ class Release:
raise
@contextmanager
def _create_tag(self):
tag = self.release_version.describe
self.run(
f"git tag -a -m 'Release {tag}' '{tag}' {self.release_commit}",
dry_run=self.dry_run,
)
rollback_cmd = f"{self.dry_run_prefix}git tag -d '{tag}'"
def _create_tag(
self, tag: str, commit: str, tag_message: str = ""
) -> Iterator[None]:
tag_message = tag_message or "Release {tag}"
# Create tag even in dry-run
self.run(f"git tag -a -m '{tag_message}' '{tag}' {commit}")
rollback_cmd = f"git tag -d '{tag}'"
self._rollback_stack.append(rollback_cmd)
try:
with self._push(tag):

View File

@ -1,10 +1,11 @@
#!/usr/bin/env python
from unittest.mock import patch
import os.path as p
import unittest
from dataclasses import dataclass
from unittest.mock import patch
from git_helper import Git, Runner, CWD
from git_helper import CWD, Git, Runner, git_runner
class TestRunner(unittest.TestCase):
@ -35,8 +36,10 @@ class TestRunner(unittest.TestCase):
class TestGit(unittest.TestCase):
def setUp(self):
"""we use dummy git object"""
# get the git_runner's cwd to set it properly before the Runner is patched
_ = git_runner.cwd
run_patcher = patch("git_helper.Runner.run", return_value="")
self.run_mock = run_patcher.start()
run_mock = run_patcher.start()
self.addCleanup(run_patcher.stop)
update_patcher = patch("git_helper.Git.update")
update_mock = update_patcher.start()
@ -44,15 +47,13 @@ class TestGit(unittest.TestCase):
self.git = Git()
update_mock.assert_called_once()
self.git.run("test")
self.run_mock.assert_called_once()
self.git.new_branch = "NEW_BRANCH_NAME"
self.git.new_tag = "v21.12.333.22222-stable"
run_mock.assert_called_once()
self.git.branch = "old_branch"
self.git.sha = ""
self.git.sha_short = ""
self.git.latest_tag = ""
self.git.description = ""
self.git.commits_since_tag = 0
self.git.commits_since_latest = 0
self.git.commits_since_new = 0
def test_tags(self):
self.git.new_tag = "v21.12.333.22222-stable"
@ -71,11 +72,30 @@ class TestGit(unittest.TestCase):
setattr(self.git, tag_attr, tag)
def test_tweak(self):
self.git.commits_since_tag = 0
self.assertEqual(self.git.tweak, 1)
self.git.commits_since_tag = 2
self.assertEqual(self.git.tweak, 2)
self.git.latest_tag = "v21.12.333.22222-testing"
self.assertEqual(self.git.tweak, 22224)
self.git.commits_since_tag = 0
self.assertEqual(self.git.tweak, 22222)
# tweak for the latest tag
@dataclass
class TestCase:
tag: str
commits: int
tweak: int
cases = (
TestCase("", 0, 1),
TestCase("", 2, 2),
TestCase("v21.12.333.22222-stable", 0, 22222),
TestCase("v21.12.333.22222-stable", 2, 2),
TestCase("v21.12.333.22222-testing", 0, 22222),
TestCase("v21.12.333.22222-testing", 2, 22224),
)
for tag, commits, tweak in (
("latest_tag", "commits_since_latest", "tweak"),
("new_tag", "commits_since_new", "tweak_to_new"),
):
for tc in cases:
setattr(self.git, tag, tc.tag)
setattr(self.git, commits, tc.commits)
self.assertEqual(
getattr(self.git, tweak),
tc.tweak,
f"Wrong tweak for tag {tc.tag} and commits {tc.commits} of {tag}",
)

View File

@ -2,8 +2,13 @@
import unittest
from argparse import ArgumentTypeError
from dataclasses import dataclass
from pathlib import Path
import version_helper as vh
from git_helper import Git
CHV = vh.ClickHouseVersion
class TestFunctions(unittest.TestCase):
@ -32,3 +37,55 @@ class TestFunctions(unittest.TestCase):
for error_case in error_cases:
with self.assertRaises(ArgumentTypeError):
version = vh.version_arg(error_case[0])
def test_get_version_from_repo(self):
@dataclass
class TestCase:
latest_tag: str
commits_since_latest: int
new_tag: str
commits_since_new: int
expected: CHV
cases = (
TestCase(
"v24.6.1.1-new",
15,
"v24.4.1.2088-stable",
415,
CHV(24, 5, 1, 54487, None, 415),
),
TestCase(
"v24.6.1.1-testing",
15,
"v24.4.1.2088-stable",
415,
CHV(24, 5, 1, 54487, None, 16),
),
TestCase(
"v24.6.1.1-stable",
15,
"v24.4.1.2088-stable",
415,
CHV(24, 5, 1, 54487, None, 15),
),
TestCase(
"v24.5.1.1-stable",
15,
"v24.4.1.2088-stable",
415,
CHV(24, 5, 1, 54487, None, 15),
),
)
git = Git(True)
for tc in cases:
git.latest_tag = tc.latest_tag
git.commits_since_latest = tc.commits_since_latest
git.new_tag = tc.new_tag
git.commits_since_new = tc.commits_since_new
self.assertEqual(
vh.get_version_from_repo(
Path("tests/ci/tests/autogenerated_versions.txt"), git
),
tc.expected,
)

View File

@ -0,0 +1,12 @@
# These variables are autochanged by tests/ci/version_helper.py:
# NOTE: has nothing in common with DBMS_TCP_PROTOCOL_VERSION,
# only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes.
SET(VERSION_REVISION 54487)
SET(VERSION_MAJOR 24)
SET(VERSION_MINOR 5)
SET(VERSION_PATCH 1)
SET(VERSION_GITHASH 70a1d3a63d47f0be077d67b8deb907230fc7cfb0)
SET(VERSION_DESCRIBE v24.5.1.1-testing)
SET(VERSION_STRING 24.5.1.1)
# end of autochange

1
tests/ci/tmp/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
*

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3
import logging
import os.path as p
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, ArgumentTypeError
from pathlib import Path
from typing import Any, Dict, Iterable, List, Literal, Optional, Set, Tuple, Union
from git_helper import TWEAK, Git, get_tags, git_runner, removeprefix
@ -22,7 +22,7 @@ VERSIONS = Dict[str, Union[int, str]]
VERSIONS_TEMPLATE = """# This variables autochanged by tests/ci/version_helper.py:
# NOTE: has nothing common with DBMS_TCP_PROTOCOL_VERSION,
# NOTE: VERSION_REVISION has nothing common with DBMS_TCP_PROTOCOL_VERSION,
# only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes.
SET(VERSION_REVISION {revision})
SET(VERSION_MAJOR {major})
@ -47,7 +47,7 @@ class ClickHouseVersion:
patch: Union[int, str],
revision: Union[int, str],
git: Optional[Git],
tweak: Optional[str] = None,
tweak: Optional[Union[int, str]] = None,
):
self._major = int(major)
self._minor = int(minor)
@ -95,7 +95,7 @@ class ClickHouseVersion:
if self._git is not None:
self._git.update()
return ClickHouseVersion(
self.major, self.minor, self.patch, self.revision, self._git, "1"
self.major, self.minor, self.patch, self.revision, self._git, 1
)
@property
@ -114,6 +114,10 @@ class ClickHouseVersion:
def tweak(self) -> int:
return self._tweak
@tweak.setter
def tweak(self, tweak: int) -> None:
self._tweak = tweak
@property
def revision(self) -> int:
return self._revision
@ -172,7 +176,7 @@ class ClickHouseVersion:
self.patch,
self.revision,
self._git,
str(self.tweak),
self.tweak,
)
try:
copy.with_description(self.description)
@ -190,7 +194,9 @@ class ClickHouseVersion:
and self.tweak == other.tweak
)
def __lt__(self, other: "ClickHouseVersion") -> bool:
def __lt__(self, other: Any) -> bool:
if not isinstance(self, type(other)):
return NotImplemented
for part in ("major", "minor", "patch", "tweak"):
if getattr(self, part) < getattr(other, part):
return True
@ -220,10 +226,11 @@ ClickHouseVersions = List[ClickHouseVersion]
class VersionType:
LTS = "lts"
NEW = "new"
PRESTABLE = "prestable"
STABLE = "stable"
TESTING = "testing"
VALID = (TESTING, PRESTABLE, STABLE, LTS)
VALID = (NEW, TESTING, PRESTABLE, STABLE, LTS)
def validate_version(version: str) -> None:
@ -234,43 +241,56 @@ def validate_version(version: str) -> None:
int(part)
def get_abs_path(path: str) -> str:
return p.abspath(p.join(git_runner.cwd, path))
def get_abs_path(path: Union[Path, str]) -> Path:
return (Path(git_runner.cwd) / path).absolute()
def read_versions(versions_path: str = FILE_WITH_VERSION_PATH) -> VERSIONS:
def read_versions(versions_path: Union[Path, str] = FILE_WITH_VERSION_PATH) -> VERSIONS:
versions = {}
path_to_file = get_abs_path(versions_path)
with open(path_to_file, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line.startswith("SET("):
continue
for line in get_abs_path(versions_path).read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line.startswith("SET("):
continue
value = 0 # type: Union[int, str]
name, value = line[4:-1].split(maxsplit=1)
name = removeprefix(name, "VERSION_").lower()
try:
value = int(value)
except ValueError:
pass
versions[name] = value
value = 0 # type: Union[int, str]
name, value = line[4:-1].split(maxsplit=1)
name = removeprefix(name, "VERSION_").lower()
try:
value = int(value)
except ValueError:
pass
versions[name] = value
return versions
def get_version_from_repo(
versions_path: str = FILE_WITH_VERSION_PATH,
versions_path: Union[Path, str] = FILE_WITH_VERSION_PATH,
git: Optional[Git] = None,
) -> ClickHouseVersion:
"""Get a ClickHouseVersion from FILE_WITH_VERSION_PATH. When the `git` parameter is
present, a proper `tweak` version part is calculated for case if the latest tag has
a `new` type and greater than version in `FILE_WITH_VERSION_PATH`"""
versions = read_versions(versions_path)
return ClickHouseVersion(
cmake_version = ClickHouseVersion(
versions["major"],
versions["minor"],
versions["patch"],
versions["revision"],
git,
)
# Since 24.5 we have tags like v24.6.1.1-new, and we must check if the release
# branch already has its own commit. It's necessary for a proper tweak version
if git is not None and git.latest_tag:
version_from_tag = get_version_from_tag(git.latest_tag)
if (
version_from_tag.description == VersionType.NEW
and cmake_version < version_from_tag
):
# We are in a new release branch without an existing release.
# We should change the tweak version to `tweak_to_new`
cmake_version.tweak = git.tweak_to_new
return cmake_version
def get_version_from_string(
@ -350,15 +370,15 @@ def get_supported_versions(
def update_cmake_version(
version: ClickHouseVersion,
versions_path: str = FILE_WITH_VERSION_PATH,
versions_path: Union[Path, str] = FILE_WITH_VERSION_PATH,
) -> None:
path_to_file = get_abs_path(versions_path)
with open(path_to_file, "w", encoding="utf-8") as f:
f.write(VERSIONS_TEMPLATE.format_map(version.as_dict()))
get_abs_path(versions_path).write_text(
VERSIONS_TEMPLATE.format_map(version.as_dict()), encoding="utf-8"
)
def update_contributors(
relative_contributors_path: str = GENERATED_CONTRIBUTORS,
relative_contributors_path: Union[Path, str] = GENERATED_CONTRIBUTORS,
force: bool = False,
raise_error: bool = False,
) -> None:
@ -378,13 +398,11 @@ def update_contributors(
)
contributors = [f' "{c}",' for c in contributors]
executer = p.relpath(p.realpath(__file__), git_runner.cwd)
executer = Path(__file__).relative_to(git_runner.cwd)
content = CONTRIBUTORS_TEMPLATE.format(
executer=executer, contributors="\n".join(contributors)
)
contributors_path = get_abs_path(relative_contributors_path)
with open(contributors_path, "w", encoding="utf-8") as cfd:
cfd.write(content)
get_abs_path(relative_contributors_path).write_text(content, encoding="utf-8")
def update_version_local(version, version_type="testing"):

View File

@ -2,6 +2,7 @@
<profiles>
<default>
<s3_retry_attempts>5</s3_retry_attempts>
<backup_restore_s3_retry_attempts>5</backup_restore_s3_retry_attempts>
</default>
</profiles>
<users>

View File

@ -0,0 +1,12 @@
<clickhouse>
<named_collections_storage>
<type>zookeeper</type>
<path>/named_collections_path/</path>
<update_timeout_ms>5000</update_timeout_ms>
</named_collections_storage>
<named_collections>
<collection1>
<key1>value1</key1>
</collection1>
</named_collections>
</clickhouse>
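
With this storage type, each named collection lives as a single `<name>.sql` znode under the configured path, holding its `CREATE NAMED COLLECTION` statement. A sketch of inspecting that layout with a plain kazoo client (the host address is an assumption), mirroring what the integration test below asserts:

```python
from kazoo.client import KazooClient

# Assumption: keeper is reachable at this address from the test network.
zk = KazooClient(hosts="localhost:2181")
zk.start()
# One znode per collection; its payload is the CREATE statement.
for name in zk.get_children("/named_collections_path"):
    payload, _stat = zk.get(f"/named_collections_path/{name}")
    print(name, payload.decode())
zk.stop()
```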

View File

@ -9,6 +9,8 @@ NAMED_COLLECTIONS_CONFIG = os.path.join(
SCRIPT_DIR, "./configs/config.d/named_collections.xml"
)
ZK_PATH = "/named_collections_path"
@pytest.fixture(scope="module")
def cluster():
@ -24,6 +26,28 @@ def cluster():
],
stay_alive=True,
)
cluster.add_instance(
"node_with_keeper",
main_configs=[
"configs/config.d/named_collections_with_zookeeper.xml",
],
user_configs=[
"configs/users.d/users.xml",
],
stay_alive=True,
with_zookeeper=True,
)
cluster.add_instance(
"node_with_keeper_2",
main_configs=[
"configs/config.d/named_collections_with_zookeeper.xml",
],
user_configs=[
"configs/users.d/users.xml",
],
stay_alive=True,
with_zookeeper=True,
)
cluster.add_instance(
"node_only_named_collection_control",
main_configs=[
@ -447,8 +471,16 @@ def test_config_reload(cluster):
)
def test_sql_commands(cluster):
node = cluster.instances["node"]
@pytest.mark.parametrize("with_keeper", [False, True])
def test_sql_commands(cluster, with_keeper):
zk = None
node = None
if with_keeper:
node = cluster.instances["node_with_keeper"]
zk = cluster.get_kazoo_client("zoo1")
else:
node = cluster.instances["node"]
assert "1" == node.query("select count() from system.named_collections").strip()
node.query("CREATE NAMED COLLECTION collection2 AS key1=1, key2='value2'")
@ -479,6 +511,14 @@ def test_sql_commands(cluster):
"select collection['key2'] from system.named_collections where name = 'collection2'"
).strip()
)
if zk is not None:
children = zk.get_children(ZK_PATH)
assert 1 == len(children)
assert "collection2.sql" in children
assert (
b"CREATE NAMED COLLECTION collection2 AS key1 = 1, key2 = 'value2'"
in zk.get(ZK_PATH + "/collection2.sql")[0]
)
check_created()
node.restart_clickhouse()
@ -508,6 +548,15 @@ def test_sql_commands(cluster):
).strip()
)
if zk is not None:
children = zk.get_children(ZK_PATH)
assert 1 == len(children)
assert "collection2.sql" in children
assert (
b"CREATE NAMED COLLECTION collection2 AS key1 = 4, key2 = 'value2', key3 = 'value3'"
in zk.get(ZK_PATH + "/collection2.sql")[0]
)
check_altered()
node.restart_clickhouse()
check_altered()
@ -522,6 +571,15 @@ def test_sql_commands(cluster):
).strip()
)
if zk is not None:
children = zk.get_children(ZK_PATH)
assert 1 == len(children)
assert "collection2.sql" in children
assert (
b"CREATE NAMED COLLECTION collection2 AS key1 = 4, key3 = 'value3'"
in zk.get(ZK_PATH + "/collection2.sql")[0]
)
check_deleted()
node.restart_clickhouse()
check_deleted()
@ -529,6 +587,7 @@ def test_sql_commands(cluster):
node.query(
"ALTER NAMED COLLECTION collection2 SET key3=3, key4='value4' DELETE key1"
)
time.sleep(2)
def check_altered_and_deleted():
assert (
@ -552,6 +611,15 @@ def test_sql_commands(cluster):
).strip()
)
if zk is not None:
children = zk.get_children(ZK_PATH)
assert 1 == len(children)
assert "collection2.sql" in children
assert (
b"CREATE NAMED COLLECTION collection2 AS key3 = 3, key4 = 'value4'"
in zk.get(ZK_PATH + "/collection2.sql")[0]
)
check_altered_and_deleted()
node.restart_clickhouse()
check_altered_and_deleted()
@ -564,7 +632,132 @@ def test_sql_commands(cluster):
"collection1"
== node.query("select name from system.named_collections").strip()
)
if zk is not None:
children = zk.get_children(ZK_PATH)
assert 0 == len(children)
check_dropped()
node.restart_clickhouse()
check_dropped()
def test_keeper_storage(cluster):
node1 = cluster.instances["node_with_keeper"]
node2 = cluster.instances["node_with_keeper_2"]
zk = cluster.get_kazoo_client("zoo1")
assert "1" == node1.query("select count() from system.named_collections").strip()
assert "1" == node2.query("select count() from system.named_collections").strip()
node1.query("CREATE NAMED COLLECTION collection2 AS key1=1, key2='value2'")
def check_created(node):
assert (
"collection1\ncollection2"
== node.query("select name from system.named_collections").strip()
)
assert (
"['key1','key2']"
== node.query(
"select mapKeys(collection) from system.named_collections where name = 'collection2'"
).strip()
)
assert (
"1"
== node.query(
"select collection['key1'] from system.named_collections where name = 'collection2'"
).strip()
)
assert (
"value2"
== node.query(
"select collection['key2'] from system.named_collections where name = 'collection2'"
).strip()
)
children = zk.get_children(ZK_PATH)
assert 1 == len(children)
assert "collection2.sql" in children
assert (
b"CREATE NAMED COLLECTION collection2 AS key1 = 1, key2 = 'value2'"
in zk.get(ZK_PATH + "/collection2.sql")[0]
)
check_created(node1)
check_created(node2)
node1.restart_clickhouse()
node2.restart_clickhouse()
check_created(node1)
check_created(node2)
node2.query("ALTER NAMED COLLECTION collection2 SET key1=4, key3='value3'")
time.sleep(5)
def check_altered(node):
assert (
"['key1','key2','key3']"
== node.query(
"select mapKeys(collection) from system.named_collections where name = 'collection2'"
).strip()
)
assert (
"4"
== node.query(
"select collection['key1'] from system.named_collections where name = 'collection2'"
).strip()
)
assert (
"value3"
== node.query(
"select collection['key3'] from system.named_collections where name = 'collection2'"
).strip()
)
if zk is not None:
children = zk.get_children(ZK_PATH)
assert 1 == len(children)
assert "collection2.sql" in children
assert (
b"CREATE NAMED COLLECTION collection2 AS key1 = 4, key2 = 'value2', key3 = 'value3'"
in zk.get(ZK_PATH + "/collection2.sql")[0]
)
check_altered(node2)
check_altered(node1)
node1.restart_clickhouse()
node2.restart_clickhouse()
check_altered(node1)
check_altered(node2)
node1.query("DROP NAMED COLLECTION collection2")
time.sleep(5)
def check_dropped(node):
assert "1" == node.query("select count() from system.named_collections").strip()
assert (
"collection1"
== node.query("select name from system.named_collections").strip()
)
if zk is not None:
children = zk.get_children(ZK_PATH)
assert 0 == len(children)
check_dropped(node1)
check_dropped(node2)
node1.restart_clickhouse()
node2.restart_clickhouse()
check_dropped(node1)
check_dropped(node2)
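
The fixed `time.sleep(5)` calls above give the keeper notification time to reach the other replica; a polling helper along these lines (a sketch, not part of the test) would make the test less timing-sensitive:

```python
import time

def wait_for(predicate, timeout=30.0, interval=0.5):
    """Poll until predicate() is truthy or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return False

# e.g. instead of time.sleep(5) after the DROP:
# assert wait_for(lambda: "collection2" not in
#                 node2.query("select name from system.named_collections"))
```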

View File

@ -0,0 +1,10 @@
<test>
<create_query>CREATE TABLE test (id Int32, x1 Nullable(Int32), x2 Nullable(Float32)) ENGINE = MergeTree() ORDER BY id</create_query>
<fill_query>INSERT INTO test SELECT number, number+1, number + 2 FROM numbers(1000000)</fill_query>
<query tag='LEAST'>SELECT COUNT(1) FROM test WHERE least(x1, x2) > 1</query>
<query tag='GREATEST'>SELECT COUNT(1) FROM test WHERE GREATEST(x1, x2) > 1</query>
<drop_query>DROP TABLE IF EXISTS test</drop_query>
</test>

View File

@ -11,9 +11,11 @@ ${CLICKHOUSE_CLIENT} --query="CREATE TABLE single_col_partition_key(x UInt32) EN
${CLICKHOUSE_CLIENT} --query="INSERT INTO single_col_partition_key VALUES (1), (2), (3), (4), (11), (12), (20)"
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM single_col_partition_key WHERE x < 3 FORMAT XML" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM single_col_partition_key WHERE x >= 11 FORMAT XML" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM single_col_partition_key WHERE x = 20 FORMAT XML" | grep -F rows_read | sed 's/^[ \t]*//g'
DISABLE_COUNT_OPTIMIZATION="SETTINGS optimize_trivial_count_query = 0, optimize_use_implicit_projections = 0"
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM single_col_partition_key WHERE x < 3 FORMAT XML $DISABLE_COUNT_OPTIMIZATION" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM single_col_partition_key WHERE x >= 11 FORMAT XML $DISABLE_COUNT_OPTIMIZATION" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM single_col_partition_key WHERE x = 20 FORMAT XML $DISABLE_COUNT_OPTIMIZATION" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="DROP TABLE single_col_partition_key"
@ -31,14 +33,14 @@ ${CLICKHOUSE_CLIENT} --query="INSERT INTO composite_partition_key VALUES \
${CLICKHOUSE_CLIENT} --query="INSERT INTO composite_partition_key VALUES \
(301, 20, 3), (302, 21, 3), (303, 22, 3)"
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM composite_partition_key WHERE a > 400 FORMAT XML SETTINGS optimize_trivial_count_query = 0" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM composite_partition_key WHERE b = 11 FORMAT XML SETTINGS optimize_trivial_count_query = 0" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM composite_partition_key WHERE c = 4 FORMAT XML SETTINGS optimize_trivial_count_query = 0" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM composite_partition_key WHERE a > 400 FORMAT XML $DISABLE_COUNT_OPTIMIZATION" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM composite_partition_key WHERE b = 11 FORMAT XML $DISABLE_COUNT_OPTIMIZATION" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM composite_partition_key WHERE c = 4 FORMAT XML $DISABLE_COUNT_OPTIMIZATION" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM composite_partition_key WHERE a < 200 AND c = 2 FORMAT XML SETTINGS optimize_trivial_count_query = 0" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM composite_partition_key WHERE a = 301 AND b < 20 FORMAT XML SETTINGS optimize_trivial_count_query = 0" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM composite_partition_key WHERE b >= 12 AND c = 2 FORMAT XML SETTINGS optimize_trivial_count_query = 0" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM composite_partition_key WHERE a < 200 AND c = 2 FORMAT XML $DISABLE_COUNT_OPTIMIZATION" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM composite_partition_key WHERE a = 301 AND b < 20 FORMAT XML $DISABLE_COUNT_OPTIMIZATION" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM composite_partition_key WHERE b >= 12 AND c = 2 FORMAT XML $DISABLE_COUNT_OPTIMIZATION" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM composite_partition_key WHERE a = 301 AND b = 21 AND c = 3 FORMAT XML SETTINGS optimize_trivial_count_query = 0" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM composite_partition_key WHERE a = 301 AND b = 21 AND c = 3 FORMAT XML $DISABLE_COUNT_OPTIMIZATION" | grep -F rows_read | sed 's/^[ \t]*//g'
${CLICKHOUSE_CLIENT} --query="DROP TABLE composite_partition_key"

View File

@ -45,6 +45,7 @@ ${CLICKHOUSE_CLIENT} --query="INSERT INTO enum_test_table VALUES ('hello'), ('wo
${CLICKHOUSE_CLIENT} --query="INSERT INTO date_test_table VALUES (1), (2), (2), (256), (257), (257);"
CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=debug/g')
CLICKHOUSE_CLIENT="${CLICKHOUSE_CLIENT} --optimize_use_implicit_projections 0"
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM string_test_table WHERE toUInt64(val) == 0;" 2>&1 |grep -q "3 marks to read from 1 ranges" && echo "no monotonic int case: String -> UInt64"
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM fixed_string_test_table WHERE toUInt64(val) == 0;" 2>&1 |grep -q "3 marks to read from 1 ranges" && echo "no monotonic int case: FixedString -> UInt64"

View File

@ -0,0 +1,3 @@
ReadFromMergeTree (default.x)
ReadFromPreparedSource (Optimized trivial count)
5

View File

@ -0,0 +1,8 @@
drop table if exists x;
create table x (i int) engine MergeTree order by i settings index_granularity = 3;
insert into x select * from numbers(10);
select * from (explain select count() from x where (i >= 3 and i <= 6) or i = 7) where explain like '%ReadFromPreparedSource%' or explain like '%ReadFromMergeTree%';
select count() from x where (i >= 3 and i <= 6) or i = 7;
drop table x;

View File

@ -15,6 +15,6 @@ SELECT isNull(t0.c0) OR COUNT('\n?pVa')
FROM t0
GROUP BY t0.c0
HAVING isNull(isNull(t0.c0))
SETTINGS aggregate_functions_null_for_empty = 1, enable_optimize_predicate_expression = 0;
SETTINGS aggregate_functions_null_for_empty = 1, enable_optimize_predicate_expression = 0 format Null;
drop table if exists t0;

View File

@ -10,11 +10,15 @@ QUERY id: 0
JOIN TREE
TABLE id: 3, alias: __table1, table_name: default.t_logical_expressions_optimizer_low_cardinality
WHERE
FUNCTION id: 4, function_name: in, function_type: ordinary, result_type: UInt8
FUNCTION id: 4, function_name: or, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 5, nodes: 2
COLUMN id: 2, column_name: a, result_type: LowCardinality(String), source_id: 3
CONSTANT id: 6, constant_value: Tuple_(\'x\', \'y\'), constant_value_type: Tuple(String, String)
FUNCTION id: 6, function_name: in, function_type: ordinary, result_type: LowCardinality(UInt8)
ARGUMENTS
LIST id: 7, nodes: 2
COLUMN id: 2, column_name: a, result_type: LowCardinality(String), source_id: 3
CONSTANT id: 8, constant_value: Tuple_(\'x\', \'y\'), constant_value_type: Tuple(String, String)
CONSTANT id: 9, constant_value: UInt64_0, constant_value_type: UInt8
SETTINGS allow_experimental_analyzer=1
SELECT a
FROM t_logical_expressions_optimizer_low_cardinality
@ -28,11 +32,15 @@ QUERY id: 0
JOIN TREE
TABLE id: 3, alias: __table1, table_name: default.t_logical_expressions_optimizer_low_cardinality
WHERE
FUNCTION id: 4, function_name: in, function_type: ordinary, result_type: UInt8
FUNCTION id: 4, function_name: or, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 5, nodes: 2
COLUMN id: 2, column_name: a, result_type: LowCardinality(String), source_id: 3
CONSTANT id: 6, constant_value: Tuple_(\'x\', \'y\'), constant_value_type: Tuple(String, String)
FUNCTION id: 6, function_name: in, function_type: ordinary, result_type: LowCardinality(UInt8)
ARGUMENTS
LIST id: 7, nodes: 2
COLUMN id: 2, column_name: a, result_type: LowCardinality(String), source_id: 3
CONSTANT id: 8, constant_value: Tuple_(\'x\', \'y\'), constant_value_type: Tuple(String, String)
CONSTANT id: 9, constant_value: UInt64_0, constant_value_type: UInt8
SETTINGS allow_experimental_analyzer=1
SELECT a
FROM t_logical_expressions_optimizer_low_cardinality
@ -46,11 +54,15 @@ QUERY id: 0
JOIN TREE
TABLE id: 3, alias: __table1, table_name: default.t_logical_expressions_optimizer_low_cardinality
WHERE
FUNCTION id: 4, function_name: notIn, function_type: ordinary, result_type: UInt8
FUNCTION id: 4, function_name: _CAST, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 5, nodes: 2
COLUMN id: 2, column_name: a, result_type: LowCardinality(String), source_id: 3
CONSTANT id: 6, constant_value: Tuple_(\'x\', \'y\'), constant_value_type: Tuple(String, String)
FUNCTION id: 6, function_name: notIn, function_type: ordinary, result_type: LowCardinality(UInt8)
ARGUMENTS
LIST id: 7, nodes: 2
COLUMN id: 2, column_name: a, result_type: LowCardinality(String), source_id: 3
CONSTANT id: 8, constant_value: Tuple_(\'x\', \'y\'), constant_value_type: Tuple(String, String)
CONSTANT id: 9, constant_value: \'UInt8\', constant_value_type: String
SETTINGS allow_experimental_analyzer=1
SELECT a
FROM t_logical_expressions_optimizer_low_cardinality
@ -64,11 +76,15 @@ QUERY id: 0
JOIN TREE
TABLE id: 3, alias: __table1, table_name: default.t_logical_expressions_optimizer_low_cardinality
WHERE
FUNCTION id: 4, function_name: notIn, function_type: ordinary, result_type: UInt8
FUNCTION id: 4, function_name: _CAST, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 5, nodes: 2
COLUMN id: 2, column_name: a, result_type: LowCardinality(String), source_id: 3
CONSTANT id: 6, constant_value: Tuple_(\'x\', \'y\'), constant_value_type: Tuple(String, String)
FUNCTION id: 6, function_name: notIn, function_type: ordinary, result_type: LowCardinality(UInt8)
ARGUMENTS
LIST id: 7, nodes: 2
COLUMN id: 2, column_name: a, result_type: LowCardinality(String), source_id: 3
CONSTANT id: 8, constant_value: Tuple_(\'x\', \'y\'), constant_value_type: Tuple(String, String)
CONSTANT id: 9, constant_value: \'UInt8\', constant_value_type: String
SETTINGS allow_experimental_analyzer=1
SELECT a
FROM t_logical_expressions_optimizer_low_cardinality

View File

@ -10,7 +10,7 @@ SELECT key, value FROM dict_flat ORDER BY key ASC;
2 Second
INSERT INTO table_for_update_field_dictionary VALUES (2, 'SecondUpdated', now());
INSERT INTO table_for_update_field_dictionary VALUES (3, 'Third', now());
SELECT sleepEachRow(1) FROM numbers(10) SETTINGS function_sleep_max_microseconds_per_block = 10000000 FORMAT Null;
SELECT sleepEachRow(1) FROM numbers(20) SETTINGS function_sleep_max_microseconds_per_block = 20000000 FORMAT Null;
SELECT key, value FROM dict_flat ORDER BY key ASC;
1 First
2 SecondUpdated
@ -27,7 +27,7 @@ SELECT key, value FROM dict_flat_custom ORDER BY key ASC;
2 Second
INSERT INTO table_for_update_field_dictionary VALUES (2, 'SecondUpdated', now());
INSERT INTO table_for_update_field_dictionary VALUES (3, 'Third', now());
SELECT sleepEachRow(1) FROM numbers(10) SETTINGS function_sleep_max_microseconds_per_block = 10000000 FORMAT Null;
SELECT sleepEachRow(1) FROM numbers(20) SETTINGS function_sleep_max_microseconds_per_block = 20000000 FORMAT Null;
SELECT key, value FROM dict_flat_custom ORDER BY key ASC;
1 First
2 SecondUpdated
@ -44,7 +44,7 @@ SELECT key, value FROM dict_hashed ORDER BY key ASC;
2 Second
INSERT INTO table_for_update_field_dictionary VALUES (2, 'SecondUpdated', now());
INSERT INTO table_for_update_field_dictionary VALUES (3, 'Third', now());
SELECT sleepEachRow(1) FROM numbers(10) SETTINGS function_sleep_max_microseconds_per_block = 10000000 FORMAT Null;
SELECT sleepEachRow(1) FROM numbers(20) SETTINGS function_sleep_max_microseconds_per_block = 20000000 FORMAT Null;
SELECT key, value FROM dict_hashed ORDER BY key ASC;
1 First
2 SecondUpdated
@ -61,7 +61,7 @@ SELECT key, value FROM dict_hashed_custom ORDER BY key ASC;
2 Second
INSERT INTO table_for_update_field_dictionary VALUES (2, 'SecondUpdated', now());
INSERT INTO table_for_update_field_dictionary VALUES (3, 'Third', now());
SELECT sleepEachRow(1) FROM numbers(10) SETTINGS function_sleep_max_microseconds_per_block = 10000000 FORMAT Null;
SELECT sleepEachRow(1) FROM numbers(20) SETTINGS function_sleep_max_microseconds_per_block = 20000000 FORMAT Null;
SELECT key, value FROM dict_hashed_custom ORDER BY key ASC;
1 First
2 SecondUpdated
@ -78,7 +78,7 @@ SELECT key, value FROM dict_complex_key_hashed ORDER BY key ASC;
2 Second
INSERT INTO table_for_update_field_dictionary VALUES (2, 'SecondUpdated', now());
INSERT INTO table_for_update_field_dictionary VALUES (3, 'Third', now());
SELECT sleepEachRow(1) FROM numbers(10) SETTINGS function_sleep_max_microseconds_per_block = 10000000 FORMAT Null;
SELECT sleepEachRow(1) FROM numbers(20) SETTINGS function_sleep_max_microseconds_per_block = 20000000 FORMAT Null;
SELECT key, value FROM dict_complex_key_hashed ORDER BY key ASC;
1 First
2 SecondUpdated
@ -95,7 +95,7 @@ SELECT key, value FROM dict_complex_key_hashed_custom ORDER BY key ASC;
2 Second
INSERT INTO table_for_update_field_dictionary VALUES (2, 'SecondUpdated', now());
INSERT INTO table_for_update_field_dictionary VALUES (3, 'Third', now());
SELECT sleepEachRow(1) FROM numbers(10) SETTINGS function_sleep_max_microseconds_per_block = 10000000 FORMAT Null;
SELECT sleepEachRow(1) FROM numbers(20) SETTINGS function_sleep_max_microseconds_per_block = 20000000 FORMAT Null;
SELECT key, value FROM dict_complex_key_hashed_custom ORDER BY key ASC;
1 First
2 SecondUpdated

View File

@ -60,7 +60,7 @@ for layout in "${layouts[@]}"; do
INSERT INTO table_for_update_field_dictionary VALUES (2, 'SecondUpdated', now());
INSERT INTO table_for_update_field_dictionary VALUES (3, 'Third', now());
SELECT sleepEachRow(1) FROM numbers(10) SETTINGS function_sleep_max_microseconds_per_block = 10000000 FORMAT Null;
SELECT sleepEachRow(1) FROM numbers(20) SETTINGS function_sleep_max_microseconds_per_block = 20000000 FORMAT Null;
SELECT key, value FROM $dictionary_name ORDER BY key ASC;
-- { echoOff }

View File

@ -206,7 +206,7 @@ select count(), sum(number) from file('02892.orc', ORC, 'number UInt64, negative
select count(), min(negative_or_null), max(negative_or_null) from file('02892.orc', ORC, 'number UInt64, negative_or_null Int64') where (negative_or_null < -500);
596 -1099 -501
select count(), sum(number) from file('02892.orc', ORC, 'number UInt64, negative_or_null Int64') where indexHint(negative_or_null is null);
1000 499500
0 0
select count(), min(negative_or_null), max(negative_or_null) from file('02892.orc', ORC, 'number UInt64, negative_or_null Int64') where (negative_or_null is null);
0 0 0
select count(), sum(number) from file('02892.orc', ORC, 'number UInt64, negative_or_null Int64') where indexHint(negative_or_null in (0, -1, -10, -100, -1000));

View File

@ -0,0 +1 @@
CREATE VIEW default.test_table_comment AS (SELECT toString({date_from:String})) COMMENT \'test comment\'

View File

@ -0,0 +1,5 @@
DROP TABLE IF EXISTS test_table_comment;
CREATE VIEW test_table_comment AS SELECT toString({date_from:String});
ALTER TABLE test_table_comment MODIFY COMMENT 'test comment';
SELECT create_table_query FROM system.tables WHERE name = 'test_table_comment' AND database = currentDatabase();
DROP TABLE test_table_comment;

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: no-random-merge-tree-settings
# Tags: no-random-settings, no-random-merge-tree-settings
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
@ -8,7 +8,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
${CLICKHOUSE_CLIENT} -nq "
DROP TABLE IF EXISTS t;
CREATE TABLE t(a UInt32, b UInt32, c UInt32, d UInt32) ENGINE=MergeTree ORDER BY a SETTINGS min_bytes_for_wide_part=1, min_rows_for_wide_part=1;
CREATE TABLE t(a UInt32, b UInt32, c UInt32, d UInt32) ENGINE=MergeTree ORDER BY a SETTINGS min_bytes_for_wide_part=0, min_rows_for_wide_part=0;
INSERT INTO t SELECT number, number, number, number FROM numbers_mt(1e7);

View File

@ -0,0 +1,4 @@
65409
16
128
2363

View File

@ -0,0 +1,25 @@
-- Tags: no-fasttest, no-parallel, no-random-settings
set max_insert_threads=1;
DROP TABLE IF EXISTS test_parquet;
CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet);
INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
SELECT max(blockSize()) FROM test_parquet;
DROP TABLE IF EXISTS test_parquet;
CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_max_block_size=16;
INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
SELECT max(blockSize()) FROM test_parquet;
DROP TABLE IF EXISTS test_parquet;
CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_prefer_block_bytes=30;
INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
SELECT max(blockSize()) FROM test_parquet;
DROP TABLE IF EXISTS test_parquet;
CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_prefer_block_bytes=30720;
INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
SELECT max(blockSize()) FROM test_parquet;
DROP TABLE IF EXISTS test_parquet;
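
For intuition, a back-of-envelope sketch of how the two settings interact: the reader targets roughly `input_format_parquet_prefer_block_bytes` per block, capped at `input_format_parquet_max_block_size` rows. The average row width below is an assumption for illustration; the real reader estimates it from the file and also enforces internal minimums, which this rough model does not capture:

```python
# Rough model of the block sizing: rows_per_block is the byte target divided
# by the estimated average row width, capped by the row-count limit.
prefer_block_bytes = 16744704  # default input_format_parquet_prefer_block_bytes
max_block_size = 65409         # default input_format_parquet_max_block_size
avg_row_bytes = 256            # assumption: illustrative estimate only

rows_per_block = min(max_block_size, prefer_block_bytes // avg_row_bytes)
print(rows_per_block)  # 65409 with the defaults above
```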

View File

@ -0,0 +1 @@
ReadFromPreparedSource (Optimized trivial count)

View File

@ -0,0 +1,30 @@
CREATE TABLE checks
(
`pull_request_number` UInt32,
`commit_sha` LowCardinality(String),
`check_name` LowCardinality(String),
`check_status` LowCardinality(String),
`check_duration_ms` UInt64,
`check_start_time` DateTime,
`test_name` LowCardinality(String),
`test_status` LowCardinality(String),
`test_duration_ms` UInt64,
`report_url` String,
`pull_request_url` String,
`commit_url` String,
`task_url` String,
`base_ref` String,
`base_repo` String,
`head_ref` String,
`head_repo` String,
`test_context_raw` String,
`instance_type` LowCardinality(String),
`instance_id` String,
`date` Date MATERIALIZED toDate(check_start_time)
)
ENGINE = MergeTree ORDER BY (date, pull_request_number, commit_sha, check_name, test_name, check_start_time);
insert into checks select * from generateRandom() limit 1;
select trimLeft(explain) from (explain SELECT count(1) FROM checks WHERE test_name IS NOT NULL) where explain like '%ReadFromPreparedSource%' SETTINGS allow_experimental_analyzer = 1, allow_experimental_parallel_reading_from_replicas = 0;

View File

@ -127,7 +127,9 @@ CREATE TABLE 03165_token_ft
INDEX idx_message message TYPE full_text() GRANULARITY 1
)
ENGINE = MergeTree
ORDER BY id;
ORDER BY id
-- Full text index works only with full parts.
SETTINGS min_bytes_for_full_part_storage=0;
INSERT INTO 03165_token_ft VALUES(1, 'Service is not ready');

View File

@ -1,6 +1,8 @@
-- Tags: no-replicated-database
-- Tag no-replicated-database: Requires investigation
SET optimize_use_implicit_projections = 0;
EXPLAIN ESTIMATE SELECT count() FROM test.hits WHERE CounterID = 29103473;
EXPLAIN ESTIMATE SELECT count() FROM test.hits WHERE CounterID != 29103473;
EXPLAIN ESTIMATE SELECT count() FROM test.hits WHERE CounterID > 29103473;

View File

@ -1,3 +1,5 @@
SET optimize_use_implicit_projections = 0;
-- the work for the scalar subquery is properly accounted for:
SET max_rows_to_read = 1000000;
SELECT 1 = (SELECT count() FROM test.hits WHERE NOT ignore(AdvEngineID)); -- { serverError TOO_MANY_ROWS }