Merge remote-tracking branch 'origin/24.3' into backport/24.3/64801

This commit is contained in:
Dmitry Novik 2024-07-30 17:25:14 +02:00
commit d478e0e16d
211 changed files with 3243 additions and 621 deletions

View File

@ -3,7 +3,7 @@
#include <base/defines.h>
#include <fstream>
#include <sstream>
#include <string>
bool cgroupsV2Enabled()
@ -40,7 +40,7 @@ bool cgroupsV2MemoryControllerEnabled()
#endif
}
std::string cgroupV2OfProcess()
std::filesystem::path cgroupV2PathOfProcess()
{
#if defined(OS_LINUX)
chassert(cgroupsV2Enabled());
@ -48,17 +48,18 @@ std::string cgroupV2OfProcess()
/// A simpler way to get the membership is:
std::ifstream cgroup_name_file("/proc/self/cgroup");
if (!cgroup_name_file.is_open())
return "";
return {};
/// With cgroups v2, there will be a *single* line with prefix "0::/"
/// (see https://docs.kernel.org/admin-guide/cgroup-v2.html)
std::string cgroup;
std::getline(cgroup_name_file, cgroup);
static const std::string v2_prefix = "0::/";
if (!cgroup.starts_with(v2_prefix))
return "";
return {};
cgroup = cgroup.substr(v2_prefix.length());
return cgroup;
/// Note: The 'root' cgroup can have an empty cgroup name, this is valid
return default_cgroups_mount / cgroup;
#else
return "";
return {};
#endif
}

View File

@ -1,7 +1,6 @@
#pragma once
#include <filesystem>
#include <string>
#if defined(OS_LINUX)
/// I think it is possible to mount the cgroups hierarchy somewhere else (e.g. when in containers).
@ -16,7 +15,7 @@ bool cgroupsV2Enabled();
/// Assumes that cgroupsV2Enabled() is enabled.
bool cgroupsV2MemoryControllerEnabled();
/// Which cgroup does the process belong to?
/// Returns an empty string if the cgroup cannot be determined.
/// Detects which cgroup v2 the process belongs to and returns the filesystem path to the cgroup.
/// Returns an empty path the cgroup cannot be determined.
/// Assumes that cgroupsV2Enabled() is enabled.
std::string cgroupV2OfProcess();
std::filesystem::path cgroupV2PathOfProcess();

View File

@ -23,8 +23,9 @@ std::optional<uint64_t> getCgroupsV2MemoryLimit()
if (!cgroupsV2MemoryControllerEnabled())
return {};
std::string cgroup = cgroupV2OfProcess();
auto current_cgroup = cgroup.empty() ? default_cgroups_mount : (default_cgroups_mount / cgroup);
std::filesystem::path current_cgroup = cgroupV2PathOfProcess();
if (current_cgroup.empty())
return {};
/// Open the bottom-most nested memory limit setting file. If there is no such file at the current
/// level, try again at the parent level as memory settings are inherited.

View File

@ -1,12 +1,12 @@
# This variables autochanged by tests/ci/version_helper.py:
# NOTE: has nothing common with DBMS_TCP_PROTOCOL_VERSION,
# NOTE: VERSION_REVISION has nothing common with DBMS_TCP_PROTOCOL_VERSION,
# only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes.
SET(VERSION_REVISION 54484)
SET(VERSION_MAJOR 24)
SET(VERSION_MINOR 3)
SET(VERSION_PATCH 4)
SET(VERSION_GITHASH 7e7f3bdd9be3ced03925d1d602037db8687e6401)
SET(VERSION_DESCRIBE v24.3.4.1-lts)
SET(VERSION_STRING 24.3.4.1)
SET(VERSION_PATCH 6)
SET(VERSION_GITHASH fe54cead6b6eaa09f22cee3ceb9f607eecadc859)
SET(VERSION_DESCRIBE v24.3.6.1-lts)
SET(VERSION_STRING 24.3.6.1)
# end of autochange

View File

@ -41,8 +41,7 @@
"docker/test/stateless": {
"name": "clickhouse/stateless-test",
"dependent": [
"docker/test/stateful",
"docker/test/unit"
"docker/test/stateful"
]
},
"docker/test/stateful": {
@ -122,15 +121,16 @@
"docker/test/base": {
"name": "clickhouse/test-base",
"dependent": [
"docker/test/clickbench",
"docker/test/fuzzer",
"docker/test/libfuzzer",
"docker/test/integration/base",
"docker/test/keeper-jepsen",
"docker/test/libfuzzer",
"docker/test/server-jepsen",
"docker/test/sqllogic",
"docker/test/sqltest",
"docker/test/clickbench",
"docker/test/stateless"
"docker/test/stateless",
"docker/test/unit"
]
},
"docker/test/integration/kerberized_hadoop": {

View File

@ -1,9 +1,7 @@
# rebuild in #33610
# docker build -t clickhouse/unit-test .
ARG FROM_TAG=latest
FROM clickhouse/stateless-test:$FROM_TAG
RUN apt-get install gdb
FROM clickhouse/test-base:$FROM_TAG
COPY run.sh /
CMD ["/bin/bash", "/run.sh"]

View File

@ -366,9 +366,10 @@ try
}
GlobalThreadPool::initialize(
config().getUInt("max_thread_pool_size", 100),
config().getUInt("max_thread_pool_free_size", 1000),
config().getUInt("thread_pool_queue_size", 10000)
/// We need to have sufficient amount of threads for connections + nuraft workers + keeper workers, 1000 is an estimation
std::min(1000U, config().getUInt("max_thread_pool_size", 1000)),
config().getUInt("max_thread_pool_free_size", 100),
config().getUInt("thread_pool_queue_size", 1000)
);
/// Wait for all threads to avoid possible use-after-free (for example logging objects can be already destroyed).
SCOPE_EXIT({

View File

@ -261,7 +261,24 @@ AccessControl::AccessControl()
}
AccessControl::~AccessControl() = default;
AccessControl::~AccessControl()
{
try
{
AccessControl::shutdown();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void AccessControl::shutdown()
{
MultipleAccessStorage::shutdown();
removeAllStorages();
}
void AccessControl::setUpFromMainConfig(const Poco::Util::AbstractConfiguration & config_, const String & config_path_,

View File

@ -53,6 +53,9 @@ public:
AccessControl();
~AccessControl() override;
/// Shutdown the access control and stops all background activity.
void shutdown() override;
/// Initializes access storage (user directories).
void setUpFromMainConfig(const Poco::Util::AbstractConfiguration & config_, const String & config_path_,
const zkutil::GetZooKeeper & get_zookeeper_function_);

View File

@ -314,10 +314,13 @@ void ContextAccess::setUser(const UserPtr & user_) const
subscription_for_roles_changes.reset();
enabled_roles = access_control->getEnabledRoles(current_roles, current_roles_with_admin_option);
subscription_for_roles_changes = enabled_roles->subscribeForChanges([this](const std::shared_ptr<const EnabledRolesInfo> & roles_info_)
subscription_for_roles_changes = enabled_roles->subscribeForChanges([weak_ptr = weak_from_this()](const std::shared_ptr<const EnabledRolesInfo> & roles_info_)
{
std::lock_guard lock{mutex};
setRolesInfo(roles_info_);
auto ptr = weak_ptr.lock();
if (!ptr)
return;
std::lock_guard lock{ptr->mutex};
ptr->setRolesInfo(roles_info_);
});
setRolesInfo(enabled_roles->getRolesInfo());

View File

@ -194,11 +194,9 @@ DiskAccessStorage::DiskAccessStorage(const String & storage_name_, const String
DiskAccessStorage::~DiskAccessStorage()
{
stopListsWritingThread();
try
{
writeLists();
DiskAccessStorage::shutdown();
}
catch (...)
{
@ -207,6 +205,17 @@ DiskAccessStorage::~DiskAccessStorage()
}
void DiskAccessStorage::shutdown()
{
stopListsWritingThread();
{
std::lock_guard lock{mutex};
writeLists();
}
}
String DiskAccessStorage::getStorageParamsJSON() const
{
std::lock_guard lock{mutex};

View File

@ -18,6 +18,8 @@ public:
DiskAccessStorage(const String & storage_name_, const String & directory_path_, AccessChangesNotifier & changes_notifier_, bool readonly_, bool allow_backup_);
~DiskAccessStorage() override;
void shutdown() override;
const char * getStorageType() const override { return STORAGE_TYPE; }
String getStorageParamsJSON() const override;

View File

@ -44,6 +44,11 @@ public:
explicit IAccessStorage(const String & storage_name_) : storage_name(storage_name_) {}
virtual ~IAccessStorage() = default;
/// If the AccessStorage has to do some complicated work when destroying - do it in advance.
/// For example, if the AccessStorage contains any threads for background work - ask them to complete and wait for completion.
/// By default, does nothing.
virtual void shutdown() {}
/// Returns the name of this storage.
const String & getStorageName() const { return storage_name; }
virtual const char * getStorageType() const = 0;

View File

@ -34,11 +34,23 @@ MultipleAccessStorage::MultipleAccessStorage(const String & storage_name_)
MultipleAccessStorage::~MultipleAccessStorage()
{
/// It's better to remove the storages in the reverse order because they could depend on each other somehow.
try
{
MultipleAccessStorage::shutdown();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void MultipleAccessStorage::shutdown()
{
/// It's better to shutdown the storages in the reverse order because they could depend on each other somehow.
const auto storages = getStoragesPtr();
for (const auto & storage : *storages | boost::adaptors::reversed)
{
removeStorage(storage);
storage->shutdown();
}
}
@ -72,6 +84,16 @@ void MultipleAccessStorage::removeStorage(const StoragePtr & storage_to_remove)
ids_cache.clear();
}
void MultipleAccessStorage::removeAllStorages()
{
/// It's better to remove the storages in the reverse order because they could depend on each other somehow.
const auto storages = getStoragesPtr();
for (const auto & storage : *storages | boost::adaptors::reversed)
{
removeStorage(storage);
}
}
std::vector<StoragePtr> MultipleAccessStorage::getStorages()
{
return *getStoragesPtr();

View File

@ -21,6 +21,8 @@ public:
explicit MultipleAccessStorage(const String & storage_name_ = STORAGE_TYPE);
~MultipleAccessStorage() override;
void shutdown() override;
const char * getStorageType() const override { return STORAGE_TYPE; }
bool isReadOnly() const override;
bool isReadOnly(const UUID & id) const override;
@ -32,6 +34,7 @@ public:
void setStorages(const std::vector<StoragePtr> & storages);
void addStorage(const StoragePtr & new_storage);
void removeStorage(const StoragePtr & storage_to_remove);
void removeAllStorages();
std::vector<StoragePtr> getStorages();
std::vector<ConstStoragePtr> getStorages() const;
std::shared_ptr<const std::vector<StoragePtr>> getStoragesPtr();

View File

@ -66,6 +66,18 @@ ReplicatedAccessStorage::ReplicatedAccessStorage(
}
ReplicatedAccessStorage::~ReplicatedAccessStorage()
{
try
{
ReplicatedAccessStorage::shutdown();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void ReplicatedAccessStorage::shutdown()
{
stopWatchingThread();
}

View File

@ -23,6 +23,8 @@ public:
ReplicatedAccessStorage(const String & storage_name, const String & zookeeper_path, zkutil::GetZooKeeper get_zookeeper, AccessChangesNotifier & changes_notifier_, bool allow_backup);
~ReplicatedAccessStorage() override;
void shutdown() override;
const char * getStorageType() const override { return STORAGE_TYPE; }
void startPeriodicReloading() override { startWatchingThread(); }

View File

@ -91,7 +91,8 @@ public:
return std::make_shared<DataTypeNumber<PointType>>();
}
bool allocatesMemoryInArena() const override { return false; }
/// MaxIntersectionsData::Allocator uses the arena
bool allocatesMemoryInArena() const override { return true; }
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{

View File

@ -228,6 +228,11 @@ public:
return prefix_size + nested_func->sizeOfData();
}
size_t alignOfData() const override
{
return std::max(alignof(Data), nested_func->alignOfData());
}
void create(AggregateDataPtr __restrict place) const override
{
new (place) Data;

View File

@ -10,9 +10,12 @@
namespace DB
{
InterpolateNode::InterpolateNode(QueryTreeNodePtr expression_, QueryTreeNodePtr interpolate_expression_)
InterpolateNode::InterpolateNode(std::shared_ptr<IdentifierNode> expression_, QueryTreeNodePtr interpolate_expression_)
: IQueryTreeNode(children_size)
{
if (expression_)
expression_name = expression_->getIdentifier().getFullName();
children[expression_child_index] = std::move(expression_);
children[interpolate_expression_child_index] = std::move(interpolate_expression_);
}
@ -41,13 +44,23 @@ void InterpolateNode::updateTreeHashImpl(HashState &, CompareOptions) const
QueryTreeNodePtr InterpolateNode::cloneImpl() const
{
return std::make_shared<InterpolateNode>(nullptr /*expression*/, nullptr /*interpolate_expression*/);
auto cloned = std::make_shared<InterpolateNode>(nullptr /*expression*/, nullptr /*interpolate_expression*/);
cloned->expression_name = expression_name;
return cloned;
}
ASTPtr InterpolateNode::toASTImpl(const ConvertToASTOptions & options) const
{
auto result = std::make_shared<ASTInterpolateElement>();
result->column = getExpression()->toAST(options)->getColumnName();
/// Interpolate parser supports only identifier node.
/// In case of alias, identifier is replaced to expression, which can't be parsed.
/// In this case, keep original alias name.
if (const auto * identifier = getExpression()->as<IdentifierNode>())
result->column = identifier->toAST(options)->getColumnName();
else
result->column = expression_name;
result->children.push_back(getInterpolateExpression()->toAST(options));
result->expr = result->children.back();

View File

@ -1,6 +1,6 @@
#pragma once
#include <Analyzer/IQueryTreeNode.h>
#include <Analyzer/IdentifierNode.h>
#include <Analyzer/ListNode.h>
namespace DB
@ -19,7 +19,7 @@ class InterpolateNode final : public IQueryTreeNode
{
public:
/// Initialize interpolate node with expression and interpolate expression
explicit InterpolateNode(QueryTreeNodePtr expression_, QueryTreeNodePtr interpolate_expression_);
explicit InterpolateNode(std::shared_ptr<IdentifierNode> expression_, QueryTreeNodePtr interpolate_expression_);
/// Get expression to interpolate
const QueryTreeNodePtr & getExpression() const
@ -61,6 +61,9 @@ protected:
ASTPtr toASTImpl(const ConvertToASTOptions & options) const override;
/// Initial name from column identifier.
std::string expression_name;
private:
static constexpr size_t expression_child_index = 0;
static constexpr size_t interpolate_expression_child_index = 1;

View File

@ -51,7 +51,7 @@ public:
using Base = InDepthQueryTreeVisitorWithContext<AggregateFunctionsArithmericOperationsVisitor>;
using Base::Base;
void leaveImpl(QueryTreeNodePtr & node)
void enterImpl(QueryTreeNodePtr & node)
{
if (!getSettings().optimize_arithmetic_operations_in_aggregate_functions)
return;

View File

@ -22,6 +22,7 @@ public:
if (query_node->hasOrderBy())
{
QueryTreeNodeConstRawPtrWithHashSet unique_expressions_nodes_set;
QueryTreeNodes result_nodes;
auto & query_order_by_nodes = query_node->getOrderBy().getNodes();
@ -45,10 +46,9 @@ public:
query_order_by_nodes = std::move(result_nodes);
}
unique_expressions_nodes_set.clear();
if (query_node->hasLimitBy())
{
QueryTreeNodeConstRawPtrWithHashSet unique_expressions_nodes_set;
QueryTreeNodes result_nodes;
auto & query_limit_by_nodes = query_node->getLimitBy().getNodes();
@ -63,9 +63,6 @@ public:
query_limit_by_nodes = std::move(result_nodes);
}
}
private:
QueryTreeNodeConstRawPtrWithHashSet unique_expressions_nodes_set;
};
}

View File

@ -2563,18 +2563,18 @@ std::string QueryAnalyzer::rewriteAggregateFunctionNameIfNeeded(
{
result_aggregate_function_name = settings.count_distinct_implementation;
}
else if (aggregate_function_name_lowercase == "countdistinctif" || aggregate_function_name_lowercase == "countifdistinct")
else if (aggregate_function_name_lowercase == "countifdistinct" ||
(settings.rewrite_count_distinct_if_with_count_distinct_implementation && aggregate_function_name_lowercase == "countdistinctif"))
{
result_aggregate_function_name = settings.count_distinct_implementation;
result_aggregate_function_name += "If";
}
/// Replace aggregateFunctionIfDistinct into aggregateFunctionDistinctIf to make execution more optimal
if (result_aggregate_function_name.ends_with("ifdistinct"))
else if (aggregate_function_name_lowercase.ends_with("ifdistinct"))
{
/// Replace aggregateFunctionIfDistinct into aggregateFunctionDistinctIf to make execution more optimal
size_t prefix_length = result_aggregate_function_name.size() - strlen("ifdistinct");
result_aggregate_function_name = result_aggregate_function_name.substr(0, prefix_length) + "DistinctIf";
}
}
bool need_add_or_null = settings.aggregate_functions_null_for_empty && !result_aggregate_function_name.ends_with("OrNull");
if (need_add_or_null)
@ -5654,6 +5654,17 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
resolveExpressionNode(in_second_argument, scope, false /*allow_lambda_expression*/, true /*allow_table_expression*/);
}
/// Edge case when the first argument of IN is scalar subquery.
auto & in_first_argument = function_in_arguments_nodes[0];
auto first_argument_type = in_first_argument->getNodeType();
if (first_argument_type == QueryTreeNodeType::QUERY || first_argument_type == QueryTreeNodeType::UNION)
{
IdentifierResolveScope subquery_scope(in_first_argument, &scope /*parent_scope*/);
subquery_scope.subquery_depth = scope.subquery_depth + 1;
evaluateScalarSubqueryIfNeeded(in_first_argument, subquery_scope);
}
}
/// Initialize function argument columns
@ -6152,14 +6163,14 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
function_base = function->build(argument_columns);
/// Do not constant fold get scalar functions
bool disable_constant_folding = function_name == "__getScalar" || function_name == "shardNum" ||
function_name == "shardCount" || function_name == "hostName" || function_name == "tcpPort";
// bool disable_constant_folding = function_name == "__getScalar" || function_name == "shardNum" ||
// function_name == "shardCount" || function_name == "hostName" || function_name == "tcpPort";
/** If function is suitable for constant folding try to convert it to constant.
* Example: SELECT plus(1, 1);
* Result: SELECT 2;
*/
if (function_base->isSuitableForConstantFolding() && !disable_constant_folding)
if (function_base->isSuitableForConstantFolding()) // && !disable_constant_folding)
{
auto result_type = function_base->getResultType();
auto executable_function = function_base->prepare(argument_columns);
@ -6568,6 +6579,10 @@ ProjectionNames QueryAnalyzer::resolveExpressionNode(QueryTreeNodePtr & node, Id
node->convertToNullable();
break;
}
/// Check parent scopes until find current query scope.
if (scope_ptr->scope_node->getNodeType() == QueryTreeNodeType::QUERY)
break;
}
}
@ -6842,7 +6857,9 @@ void QueryAnalyzer::resolveInterpolateColumnsNodeList(QueryTreeNodePtr & interpo
auto * column_to_interpolate = interpolate_node_typed.getExpression()->as<IdentifierNode>();
if (!column_to_interpolate)
throw Exception(ErrorCodes::LOGICAL_ERROR, "INTERPOLATE can work only for indentifiers, but {} is found",
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"INTERPOLATE can work only for identifiers, but {} is found",
interpolate_node_typed.getExpression()->formatASTForErrorMessage());
auto column_to_interpolate_name = column_to_interpolate->getIdentifier().getFullName();

View File

@ -32,7 +32,7 @@ public:
return;
auto * function_node = node->as<FunctionNode>();
if (!function_node || !function_node->isAggregateFunction())
if (!function_node || !function_node->isAggregateFunction() || !function_node->getResultType()->equals(DataTypeUInt64()))
return;
auto function_name = function_node->getFunctionName();

View File

@ -41,49 +41,58 @@ public:
return;
bool replaced_argument = false;
auto & uniq_function_arguments_nodes = function_node->getArguments().getNodes();
auto replaced_uniq_function_arguments_nodes = function_node->getArguments().getNodes();
for (auto & uniq_function_argument_node : uniq_function_arguments_nodes)
/// Replace injective function with its single argument
auto remove_injective_function = [&replaced_argument](QueryTreeNodePtr & arg) -> bool
{
auto * uniq_function_argument_node_typed = uniq_function_argument_node->as<FunctionNode>();
if (!uniq_function_argument_node_typed || !uniq_function_argument_node_typed->isOrdinaryFunction())
continue;
auto & uniq_function_argument_node_argument_nodes = uniq_function_argument_node_typed->getArguments().getNodes();
auto * arg_typed = arg->as<FunctionNode>();
if (!arg_typed || !arg_typed->isOrdinaryFunction())
return false;
/// Do not apply optimization if injective function contains multiple arguments
if (uniq_function_argument_node_argument_nodes.size() != 1)
continue;
auto & arg_arguments_nodes = arg_typed->getArguments().getNodes();
if (arg_arguments_nodes.size() != 1)
return false;
const auto & uniq_function_argument_node_function = uniq_function_argument_node_typed->getFunction();
if (!uniq_function_argument_node_function->isInjective({}))
continue;
const auto & arg_function = arg_typed->getFunction();
if (!arg_function->isInjective({}))
return false;
/// Replace injective function with its single argument
uniq_function_argument_node = uniq_function_argument_node_argument_nodes[0];
replaced_argument = true;
arg = arg_arguments_nodes[0];
return replaced_argument = true;
};
for (auto & uniq_function_argument_node : replaced_uniq_function_arguments_nodes)
{
while (remove_injective_function(uniq_function_argument_node))
;
}
if (!replaced_argument)
return;
const auto & function_node_argument_nodes = function_node->getArguments().getNodes();
DataTypes replaced_argument_types;
replaced_argument_types.reserve(replaced_uniq_function_arguments_nodes.size());
DataTypes argument_types;
argument_types.reserve(function_node_argument_nodes.size());
for (const auto & function_node_argument : function_node_argument_nodes)
argument_types.emplace_back(function_node_argument->getResultType());
for (const auto & function_node_argument : replaced_uniq_function_arguments_nodes)
replaced_argument_types.emplace_back(function_node_argument->getResultType());
auto current_aggregate_function = function_node->getAggregateFunction();
AggregateFunctionProperties properties;
auto aggregate_function = AggregateFunctionFactory::instance().get(
auto replaced_aggregate_function = AggregateFunctionFactory::instance().get(
function_node->getFunctionName(),
NullsAction::EMPTY,
argument_types,
function_node->getAggregateFunction()->getParameters(),
replaced_argument_types,
current_aggregate_function->getParameters(),
properties);
function_node->resolveAsAggregateFunction(std::move(aggregate_function));
/// uniqCombined returns nullable with nullable arguments so the result type might change which breaks the pass
if (!replaced_aggregate_function->getResultType()->equals(*current_aggregate_function->getResultType()))
return;
function_node->getArguments().getNodes() = std::move(replaced_uniq_function_arguments_nodes);
function_node->resolveAsAggregateFunction(std::move(replaced_aggregate_function));
}
};

View File

@ -355,13 +355,10 @@ void ColumnAggregateFunction::updateHashWithValue(size_t n, SipHash & hash) cons
hash.update(wbuf.str().c_str(), wbuf.str().size());
}
void ColumnAggregateFunction::updateWeakHash32(WeakHash32 & hash) const
WeakHash32 ColumnAggregateFunction::getWeakHash32() const
{
auto s = data.size();
if (hash.getData().size() != data.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
"column size is {}, hash size is {}", std::to_string(s), hash.getData().size());
WeakHash32 hash(s);
auto & hash_data = hash.getData();
std::vector<UInt8> v;
@ -372,6 +369,8 @@ void ColumnAggregateFunction::updateWeakHash32(WeakHash32 & hash) const
wbuf.finalize();
hash_data[i] = ::updateWeakHash32(v.data(), v.size(), hash_data[i]);
}
return hash;
}
void ColumnAggregateFunction::updateHashFast(SipHash & hash) const

View File

@ -172,7 +172,7 @@ public:
void updateHashWithValue(size_t n, SipHash & hash) const override;
void updateWeakHash32(WeakHash32 & hash) const override;
WeakHash32 getWeakHash32() const override;
void updateHashFast(SipHash & hash) const override;

View File

@ -271,15 +271,12 @@ void ColumnArray::updateHashWithValue(size_t n, SipHash & hash) const
getData().updateHashWithValue(offset + i, hash);
}
void ColumnArray::updateWeakHash32(WeakHash32 & hash) const
WeakHash32 ColumnArray::getWeakHash32() const
{
auto s = offsets->size();
if (hash.getData().size() != s)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
"column size is {}, hash size is {}", s, hash.getData().size());
WeakHash32 hash(s);
WeakHash32 internal_hash(data->size());
data->updateWeakHash32(internal_hash);
WeakHash32 internal_hash = data->getWeakHash32();
Offset prev_offset = 0;
const auto & offsets_data = getOffsets();
@ -300,6 +297,8 @@ void ColumnArray::updateWeakHash32(WeakHash32 & hash) const
prev_offset = offsets_data[i];
}
return hash;
}
void ColumnArray::updateHashFast(SipHash & hash) const

View File

@ -82,7 +82,7 @@ public:
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
void updateWeakHash32(WeakHash32 & hash) const override;
WeakHash32 getWeakHash32() const override;
void updateHashFast(SipHash & hash) const override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
void insert(const Field & x) override;

View File

@ -3,6 +3,7 @@
#include <optional>
#include <Core/Field.h>
#include <Columns/IColumn.h>
#include <Common/WeakHash.h>
#include <IO/BufferWithOwnMemory.h>
@ -94,7 +95,7 @@ public:
const char * deserializeAndInsertFromArena(const char *) override { throwMustBeDecompressed(); }
const char * skipSerializedInArena(const char *) const override { throwMustBeDecompressed(); }
void updateHashWithValue(size_t, SipHash &) const override { throwMustBeDecompressed(); }
void updateWeakHash32(WeakHash32 &) const override { throwMustBeDecompressed(); }
WeakHash32 getWeakHash32() const override { throwMustBeDecompressed(); }
void updateHashFast(SipHash &) const override { throwMustBeDecompressed(); }
ColumnPtr filter(const Filter &, ssize_t) const override { throwMustBeDecompressed(); }
void expand(const Filter &, bool) override { throwMustBeDecompressed(); }

View File

@ -137,18 +137,10 @@ void ColumnConst::updatePermutation(PermutationSortDirection /*direction*/, Perm
{
}
void ColumnConst::updateWeakHash32(WeakHash32 & hash) const
WeakHash32 ColumnConst::getWeakHash32() const
{
if (hash.getData().size() != s)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
"column size is {}, hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
WeakHash32 element_hash(1);
data->updateWeakHash32(element_hash);
size_t data_hash = element_hash.getData()[0];
for (auto & value : hash.getData())
value = static_cast<UInt32>(intHashCRC32(data_hash, value));
WeakHash32 element_hash = data->getWeakHash32();
return WeakHash32(s, element_hash.getData()[0]);
}
void ColumnConst::compareColumn(

View File

@ -190,7 +190,7 @@ public:
data->updateHashWithValue(0, hash);
}
void updateWeakHash32(WeakHash32 & hash) const override;
WeakHash32 getWeakHash32() const override;
void updateHashFast(SipHash & hash) const override
{

View File

@ -27,7 +27,6 @@ namespace ErrorCodes
extern const int PARAMETER_OUT_OF_BOUND;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
}
template <is_decimal T>
@ -71,13 +70,10 @@ void ColumnDecimal<T>::updateHashWithValue(size_t n, SipHash & hash) const
}
template <is_decimal T>
void ColumnDecimal<T>::updateWeakHash32(WeakHash32 & hash) const
WeakHash32 ColumnDecimal<T>::getWeakHash32() const
{
auto s = data.size();
if (hash.getData().size() != s)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
"column size is {}, hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
WeakHash32 hash(s);
const T * begin = data.data();
const T * end = begin + s;
@ -89,6 +85,8 @@ void ColumnDecimal<T>::updateWeakHash32(WeakHash32 & hash) const
++begin;
++hash_data;
}
return hash;
}
template <is_decimal T>

View File

@ -90,7 +90,7 @@ public:
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
void updateWeakHash32(WeakHash32 & hash) const override;
WeakHash32 getWeakHash32() const override;
void updateHashFast(SipHash & hash) const override;
int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override;
void getPermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,

377
src/Columns/ColumnDynamic.h Normal file
View File

@ -0,0 +1,377 @@
#pragma once
#include <Columns/IColumn.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnVariant.h>
#include <DataTypes/IDataType.h>
#include <Common/WeakHash.h>
namespace DB
{
/**
* Column for storing Dynamic type values.
* Dynamic column allows to insert and store values of any data types inside.
* Inside it stores:
* - Variant column with all inserted values of different types.
* - Information about currently stored variants.
*
* When new values are inserted into Dynamic column, the internal Variant
* type and column are extended if the inserted value has new type.
*/
class ColumnDynamic final : public COWHelper<IColumnHelper<ColumnDynamic>, ColumnDynamic>
{
public:
///
struct Statistics
{
enum class Source
{
READ, /// Statistics were loaded into column during reading from MergeTree.
MERGE, /// Statistics were calculated during merge of several MergeTree parts.
};
/// Source of the statistics.
Source source;
/// Statistics data: (variant name) -> (total variant size in data part).
std::unordered_map<String, size_t> data;
};
private:
friend class COWHelper<IColumnHelper<ColumnDynamic>, ColumnDynamic>;
struct VariantInfo
{
DataTypePtr variant_type;
/// Name of the whole variant to not call getName() every time.
String variant_name;
/// Names of variants to not call getName() every time on variants.
Names variant_names;
/// Mapping (variant name) -> (global discriminator).
/// It's used during variant extension.
std::unordered_map<String, UInt8> variant_name_to_discriminator;
};
explicit ColumnDynamic(size_t max_dynamic_types_);
ColumnDynamic(MutableColumnPtr variant_column_, const VariantInfo & variant_info_, size_t max_dynamic_types_, const Statistics & statistics_ = {});
public:
/** Create immutable column using immutable arguments. This arguments may be shared with other columns.
* Use IColumn::mutate in order to make mutable column and mutate shared nested columns.
*/
using Base = COWHelper<IColumnHelper<ColumnDynamic>, ColumnDynamic>;
static Ptr create(const ColumnPtr & variant_column_, const VariantInfo & variant_info_, size_t max_dynamic_types_, const Statistics & statistics_ = {})
{
return ColumnDynamic::create(variant_column_->assumeMutable(), variant_info_, max_dynamic_types_, statistics_);
}
static MutablePtr create(MutableColumnPtr variant_column_, const VariantInfo & variant_info_, size_t max_dynamic_types_, const Statistics & statistics_ = {})
{
return Base::create(std::move(variant_column_), variant_info_, max_dynamic_types_, statistics_);
}
static MutablePtr create(MutableColumnPtr variant_column_, const DataTypePtr & variant_type, size_t max_dynamic_types_, const Statistics & statistics_ = {});
static ColumnPtr create(ColumnPtr variant_column_, const DataTypePtr & variant_type, size_t max_dynamic_types_, const Statistics & statistics_ = {})
{
return create(variant_column_->assumeMutable(), variant_type, max_dynamic_types_, statistics_);
}
static MutablePtr create(size_t max_dynamic_types_)
{
return Base::create(max_dynamic_types_);
}
std::string getName() const override { return "Dynamic(max_types=" + std::to_string(max_dynamic_types) + ")"; }
const char * getFamilyName() const override
{
return "Dynamic";
}
TypeIndex getDataType() const override
{
return TypeIndex::Dynamic;
}
MutableColumnPtr cloneEmpty() const override
{
/// Keep current dynamic structure
return Base::create(variant_column->cloneEmpty(), variant_info, max_dynamic_types, statistics);
}
MutableColumnPtr cloneResized(size_t size) const override
{
return Base::create(variant_column->cloneResized(size), variant_info, max_dynamic_types, statistics);
}
size_t size() const override
{
return variant_column->size();
}
Field operator[](size_t n) const override
{
return (*variant_column)[n];
}
void get(size_t n, Field & res) const override
{
variant_column->get(n, res);
}
bool isDefaultAt(size_t n) const override
{
return variant_column->isDefaultAt(n);
}
bool isNullAt(size_t n) const override
{
return variant_column->isNullAt(n);
}
StringRef getDataAt(size_t n) const override
{
return variant_column->getDataAt(n);
}
void insertData(const char * pos, size_t length) override
{
variant_column->insertData(pos, length);
}
void insert(const Field & x) override;
bool tryInsert(const Field & x) override;
#if !defined(ABORT_ON_LOGICAL_ERROR)
void insertFrom(const IColumn & src_, size_t n) override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
void insertManyFrom(const IColumn & src, size_t position, size_t length) override;
#else
void doInsertFrom(const IColumn & src_, size_t n) override;
void doInsertRangeFrom(const IColumn & src, size_t start, size_t length) override;
void doInsertManyFrom(const IColumn & src, size_t position, size_t length) override;
#endif
void insertDefault() override
{
variant_column->insertDefault();
}
void insertManyDefaults(size_t length) override
{
variant_column->insertManyDefaults(length);
}
void popBack(size_t n) override
{
variant_column->popBack(n);
}
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
WeakHash32 getWeakHash32() const override
{
return variant_column->getWeakHash32();
}
void updateHashFast(SipHash & hash) const override
{
variant_column->updateHashFast(hash);
}
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override
{
return create(variant_column->filter(filt, result_size_hint), variant_info, max_dynamic_types);
}
void expand(const Filter & mask, bool inverted) override
{
variant_column->expand(mask, inverted);
}
ColumnPtr permute(const Permutation & perm, size_t limit) const override
{
return create(variant_column->permute(perm, limit), variant_info, max_dynamic_types);
}
ColumnPtr index(const IColumn & indexes, size_t limit) const override
{
return create(variant_column->index(indexes, limit), variant_info, max_dynamic_types);
}
ColumnPtr replicate(const Offsets & replicate_offsets) const override
{
return create(variant_column->replicate(replicate_offsets), variant_info, max_dynamic_types);
}
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override
{
MutableColumns scattered_variant_columns = variant_column->scatter(num_columns, selector);
MutableColumns scattered_columns;
scattered_columns.reserve(num_columns);
for (auto & scattered_variant_column : scattered_variant_columns)
scattered_columns.emplace_back(create(std::move(scattered_variant_column), variant_info, max_dynamic_types));
return scattered_columns;
}
#if !defined(ABORT_ON_LOGICAL_ERROR)
int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override;
#else
int doCompareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override;
#endif
bool hasEqualValues() const override
{
return variant_column->hasEqualValues();
}
void getExtremes(Field & min, Field & max) const override
{
variant_column->getExtremes(min, max);
}
void getPermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
size_t limit, int nan_direction_hint, IColumn::Permutation & res) const override
{
variant_column->getPermutation(direction, stability, limit, nan_direction_hint, res);
}
void updatePermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
size_t limit, int nan_direction_hint, IColumn::Permutation & res, EqualRanges & equal_ranges) const override
{
variant_column->updatePermutation(direction, stability, limit, nan_direction_hint, res, equal_ranges);
}
void reserve(size_t n) override
{
variant_column->reserve(n);
}
void ensureOwnership() override
{
variant_column->ensureOwnership();
}
size_t byteSize() const override
{
return variant_column->byteSize();
}
size_t byteSizeAt(size_t n) const override
{
return variant_column->byteSizeAt(n);
}
size_t allocatedBytes() const override
{
return variant_column->allocatedBytes();
}
void protect() override
{
variant_column->protect();
}
void forEachSubcolumn(MutableColumnCallback callback) override
{
callback(variant_column);
}
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override
{
callback(*variant_column);
variant_column->forEachSubcolumnRecursively(callback);
}
bool structureEquals(const IColumn & rhs) const override
{
if (const auto * rhs_concrete = typeid_cast<const ColumnDynamic *>(&rhs))
return max_dynamic_types == rhs_concrete->max_dynamic_types;
return false;
}
ColumnPtr compress() const override;
double getRatioOfDefaultRows(double sample_ratio) const override
{
return variant_column->getRatioOfDefaultRows(sample_ratio);
}
UInt64 getNumberOfDefaultRows() const override
{
return variant_column->getNumberOfDefaultRows();
}
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override
{
variant_column->getIndicesOfNonDefaultRows(indices, from, limit);
}
void finalize() override
{
variant_column->finalize();
}
bool isFinalized() const override
{
return variant_column->isFinalized();
}
/// Apply null map to a nested Variant column.
void applyNullMap(const ColumnVector<UInt8>::Container & null_map);
void applyNegatedNullMap(const ColumnVector<UInt8>::Container & null_map);
const VariantInfo & getVariantInfo() const { return variant_info; }
const ColumnPtr & getVariantColumnPtr() const { return variant_column; }
ColumnPtr & getVariantColumnPtr() { return variant_column; }
const ColumnVariant & getVariantColumn() const { return assert_cast<const ColumnVariant &>(*variant_column); }
ColumnVariant & getVariantColumn() { return assert_cast<ColumnVariant &>(*variant_column); }
bool addNewVariant(const DataTypePtr & new_variant);
void addStringVariant();
bool hasDynamicStructure() const override { return true; }
void takeDynamicStructureFromSourceColumns(const Columns & source_columns) override;
const Statistics & getStatistics() const { return statistics; }
size_t getMaxDynamicTypes() const { return max_dynamic_types; }
private:
/// Combine current variant with the other variant and return global discriminators mapping
/// from other variant to the combined one. It's used for inserting from
/// different variants.
/// Returns nullptr if maximum number of variants is reached and the new variant cannot be created.
std::vector<UInt8> * combineVariants(const VariantInfo & other_variant_info);
void updateVariantInfoAndExpandVariantColumn(const DataTypePtr & new_variant_type);
WrappedPtr variant_column;
/// Store the type of current variant with some additional information.
VariantInfo variant_info;
/// The maximum number of different types that can be stored in this Dynamic column.
/// If exceeded, all new variants will be converted to String.
size_t max_dynamic_types;
/// Size statistics of each variants from MergeTree data part.
/// Used in takeDynamicStructureFromSourceColumns and set during deserialization.
Statistics statistics;
/// Cache (Variant name) -> (global discriminators mapping from this variant to current variant in Dynamic column).
/// Used to avoid mappings recalculation in combineVariants for the same Variant types.
std::unordered_map<String, std::vector<UInt8>> variant_mappings_cache;
/// Cache of Variant types that couldn't be combined with current variant in Dynamic column.
/// Used to avoid checking if combination is possible for the same Variant types.
std::unordered_set<String> variants_with_failed_combination;
};
}

View File

@ -128,14 +128,10 @@ void ColumnFixedString::updateHashWithValue(size_t index, SipHash & hash) const
hash.update(reinterpret_cast<const char *>(&chars[n * index]), n);
}
void ColumnFixedString::updateWeakHash32(WeakHash32 & hash) const
WeakHash32 ColumnFixedString::getWeakHash32() const
{
auto s = size();
if (hash.getData().size() != s)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
"column size is {}, "
"hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
WeakHash32 hash(s);
const UInt8 * pos = chars.data();
UInt32 * hash_data = hash.getData().data();
@ -147,6 +143,8 @@ void ColumnFixedString::updateWeakHash32(WeakHash32 & hash) const
pos += n;
++hash_data;
}
return hash;
}
void ColumnFixedString::updateHashFast(SipHash & hash) const

View File

@ -125,7 +125,7 @@ public:
void updateHashWithValue(size_t index, SipHash & hash) const override;
void updateWeakHash32(WeakHash32 & hash) const override;
WeakHash32 getWeakHash32() const override;
void updateHashFast(SipHash & hash) const override;

View File

@ -4,6 +4,7 @@
#include <Core/NamesAndTypes.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Columns/IColumn.h>
#include <Common/WeakHash.h>
namespace DB
@ -122,9 +123,9 @@ public:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "updateHashWithValue is not implemented for {}", getName());
}
void updateWeakHash32(WeakHash32 &) const override
WeakHash32 getWeakHash32() const override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "updateWeakHash32 is not implemented for {}", getName());
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getWeakHash32 is not implemented for {}", getName());
}
void updateHashFast(SipHash &) const override

View File

@ -6,6 +6,7 @@
#include <Common/HashTable/HashMap.h>
#include <Common/WeakHash.h>
#include <Common/assert_cast.h>
#include <base/types.h>
#include <base/sort.h>
#include <base/scope_guard.h>
@ -309,19 +310,10 @@ const char * ColumnLowCardinality::skipSerializedInArena(const char * pos) const
return getDictionary().skipSerializedInArena(pos);
}
void ColumnLowCardinality::updateWeakHash32(WeakHash32 & hash) const
WeakHash32 ColumnLowCardinality::getWeakHash32() const
{
auto s = size();
if (hash.getData().size() != s)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
"column size is {}, hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
const auto & dict = getDictionary().getNestedColumn();
WeakHash32 dict_hash(dict->size());
dict->updateWeakHash32(dict_hash);
idx.updateWeakHash(hash, dict_hash);
WeakHash32 dict_hash = getDictionary().getNestedColumn()->getWeakHash32();
return idx.getWeakHash(dict_hash);
}
void ColumnLowCardinality::updateHashFast(SipHash & hash) const
@ -802,10 +794,11 @@ bool ColumnLowCardinality::Index::containsDefault() const
return contains;
}
void ColumnLowCardinality::Index::updateWeakHash(WeakHash32 & hash, WeakHash32 & dict_hash) const
WeakHash32 ColumnLowCardinality::Index::getWeakHash(const WeakHash32 & dict_hash) const
{
WeakHash32 hash(positions->size());
auto & hash_data = hash.getData();
auto & dict_hash_data = dict_hash.getData();
const auto & dict_hash_data = dict_hash.getData();
auto update_weak_hash = [&](auto x)
{
@ -814,10 +807,11 @@ void ColumnLowCardinality::Index::updateWeakHash(WeakHash32 & hash, WeakHash32 &
auto size = data.size();
for (size_t i = 0; i < size; ++i)
hash_data[i] = static_cast<UInt32>(intHashCRC32(dict_hash_data[data[i]], hash_data[i]));
hash_data[i] = dict_hash_data[data[i]];
};
callForType(std::move(update_weak_hash), size_of_type);
return hash;
}
void ColumnLowCardinality::Index::collectSerializedValueSizes(

View File

@ -103,7 +103,7 @@ public:
return getDictionary().updateHashWithValue(getIndexes().getUInt(n), hash);
}
void updateWeakHash32(WeakHash32 & hash) const override;
WeakHash32 getWeakHash32() const override;
void updateHashFast(SipHash &) const override;
@ -311,7 +311,7 @@ public:
bool containsDefault() const;
void updateWeakHash(WeakHash32 & hash, WeakHash32 & dict_hash) const;
WeakHash32 getWeakHash(const WeakHash32 & dict_hash) const;
void collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const PaddedPODArray<UInt64> & dict_sizes) const;

View File

@ -143,9 +143,9 @@ void ColumnMap::updateHashWithValue(size_t n, SipHash & hash) const
nested->updateHashWithValue(n, hash);
}
void ColumnMap::updateWeakHash32(WeakHash32 & hash) const
WeakHash32 ColumnMap::getWeakHash32() const
{
nested->updateWeakHash32(hash);
return nested->getWeakHash32();
}
void ColumnMap::updateHashFast(SipHash & hash) const

View File

@ -64,7 +64,7 @@ public:
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
void updateWeakHash32(WeakHash32 & hash) const override;
WeakHash32 getWeakHash32() const override;
void updateHashFast(SipHash & hash) const override;
void insertFrom(const IColumn & src_, size_t n) override;
void insertManyFrom(const IColumn & src, size_t position, size_t length) override;

View File

@ -54,25 +54,21 @@ void ColumnNullable::updateHashWithValue(size_t n, SipHash & hash) const
getNestedColumn().updateHashWithValue(n, hash);
}
void ColumnNullable::updateWeakHash32(WeakHash32 & hash) const
WeakHash32 ColumnNullable::getWeakHash32() const
{
auto s = size();
if (hash.getData().size() != s)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
"column size is {}, hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
WeakHash32 old_hash = hash;
nested_column->updateWeakHash32(hash);
WeakHash32 hash = nested_column->getWeakHash32();
const auto & null_map_data = getNullMapData();
auto & hash_data = hash.getData();
auto & old_hash_data = old_hash.getData();
/// Use old data for nulls.
/// Use default for nulls.
for (size_t row = 0; row < s; ++row)
if (null_map_data[row])
hash_data[row] = old_hash_data[row];
hash_data[row] = WeakHash32::kDefaultInitialValue;
return hash;
}
void ColumnNullable::updateHashFast(SipHash & hash) const

View File

@ -114,7 +114,7 @@ public:
void protect() override;
ColumnPtr replicate(const Offsets & replicate_offsets) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
void updateWeakHash32(WeakHash32 & hash) const override;
WeakHash32 getWeakHash32() const override;
void updateHashFast(SipHash & hash) const override;
void getExtremes(Field & min, Field & max) const override;
// Special function for nullable minmax index

View File

@ -4,6 +4,7 @@
#include <Core/Names.h>
#include <Columns/IColumn.h>
#include <Common/PODArray.h>
#include <Common/WeakHash.h>
#include <Common/HashTable/HashMap.h>
#include <DataTypes/Serializations/JSONDataParser.h>
#include <DataTypes/Serializations/SubcolumnsTree.h>
@ -243,7 +244,7 @@ public:
const char * deserializeAndInsertFromArena(const char *) override { throwMustBeConcrete(); }
const char * skipSerializedInArena(const char *) const override { throwMustBeConcrete(); }
void updateHashWithValue(size_t, SipHash &) const override { throwMustBeConcrete(); }
void updateWeakHash32(WeakHash32 &) const override { throwMustBeConcrete(); }
WeakHash32 getWeakHash32() const override { throwMustBeConcrete(); }
void updateHashFast(SipHash &) const override { throwMustBeConcrete(); }
void expand(const Filter &, bool) override { throwMustBeConcrete(); }
bool hasEqualValues() const override { throwMustBeConcrete(); }

View File

@ -664,20 +664,22 @@ void ColumnSparse::updateHashWithValue(size_t n, SipHash & hash) const
values->updateHashWithValue(getValueIndex(n), hash);
}
void ColumnSparse::updateWeakHash32(WeakHash32 & hash) const
WeakHash32 ColumnSparse::getWeakHash32() const
{
if (hash.getData().size() != _size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
"column size is {}, hash size is {}", _size, hash.getData().size());
WeakHash32 values_hash = values->getWeakHash32();
WeakHash32 hash(size());
auto & hash_data = hash.getData();
auto & values_hash_data = values_hash.getData();
auto offset_it = begin();
auto & hash_data = hash.getData();
for (size_t i = 0; i < _size; ++i, ++offset_it)
{
size_t value_index = offset_it.getValueIndex();
auto data_ref = values->getDataAt(value_index);
hash_data[i] = ::updateWeakHash32(reinterpret_cast<const UInt8 *>(data_ref.data), data_ref.size, hash_data[i]);
hash_data[i] = values_hash_data[value_index];
}
return hash;
}
void ColumnSparse::updateHashFast(SipHash & hash) const

View File

@ -127,7 +127,7 @@ public:
void protect() override;
ColumnPtr replicate(const Offsets & replicate_offsets) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
void updateWeakHash32(WeakHash32 & hash) const override;
WeakHash32 getWeakHash32() const override;
void updateHashFast(SipHash & hash) const override;
void getExtremes(Field & min, Field & max) const override;

View File

@ -103,13 +103,10 @@ MutableColumnPtr ColumnString::cloneResized(size_t to_size) const
return res;
}
void ColumnString::updateWeakHash32(WeakHash32 & hash) const
WeakHash32 ColumnString::getWeakHash32() const
{
auto s = offsets.size();
if (hash.getData().size() != s)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
"column size is {}, hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
WeakHash32 hash(s);
const UInt8 * pos = chars.data();
UInt32 * hash_data = hash.getData().data();
@ -125,6 +122,8 @@ void ColumnString::updateWeakHash32(WeakHash32 & hash) const
prev_offset = offset;
++hash_data;
}
return hash;
}

View File

@ -199,7 +199,7 @@ public:
hash.update(reinterpret_cast<const char *>(&chars[offset]), string_size);
}
void updateWeakHash32(WeakHash32 & hash) const override;
WeakHash32 getWeakHash32() const override;
void updateHashFast(SipHash & hash) const override
{

View File

@ -252,16 +252,15 @@ void ColumnTuple::updateHashWithValue(size_t n, SipHash & hash) const
column->updateHashWithValue(n, hash);
}
void ColumnTuple::updateWeakHash32(WeakHash32 & hash) const
WeakHash32 ColumnTuple::getWeakHash32() const
{
auto s = size();
if (hash.getData().size() != s)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
"column size is {}, hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
WeakHash32 hash(s);
for (const auto & column : columns)
column->updateWeakHash32(hash);
hash.update(column->getWeakHash32());
return hash;
}
void ColumnTuple::updateHashFast(SipHash & hash) const

View File

@ -68,7 +68,7 @@ public:
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
void updateWeakHash32(WeakHash32 & hash) const override;
WeakHash32 getWeakHash32() const override;
void updateHashFast(SipHash & hash) const override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;

View File

@ -696,36 +696,26 @@ void ColumnVariant::updateHashWithValue(size_t n, SipHash & hash) const
variants[localDiscriminatorByGlobal(global_discr)]->updateHashWithValue(offsetAt(n), hash);
}
void ColumnVariant::updateWeakHash32(WeakHash32 & hash) const
WeakHash32 ColumnVariant::getWeakHash32() const
{
auto s = size();
if (hash.getData().size() != s)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
"column size is {}, hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
/// If we have only NULLs, keep hash unchanged.
if (hasOnlyNulls())
return;
return WeakHash32(s);
/// Optimization for case when there is only 1 non-empty variant and no NULLs.
/// In this case we can just calculate weak hash for this variant.
if (auto non_empty_local_discr = getLocalDiscriminatorOfOneNoneEmptyVariantNoNulls())
{
variants[*non_empty_local_discr]->updateWeakHash32(hash);
return;
}
return variants[*non_empty_local_discr]->getWeakHash32();
/// Calculate weak hash for all variants.
std::vector<WeakHash32> nested_hashes;
for (const auto & variant : variants)
{
WeakHash32 nested_hash(variant->size());
variant->updateWeakHash32(nested_hash);
nested_hashes.emplace_back(std::move(nested_hash));
}
nested_hashes.emplace_back(variant->getWeakHash32());
/// For each row hash is a hash of corresponding row from corresponding variant.
WeakHash32 hash(s);
auto & hash_data = hash.getData();
const auto & local_discriminators_data = getLocalDiscriminators();
const auto & offsets_data = getOffsets();
@ -734,11 +724,10 @@ void ColumnVariant::updateWeakHash32(WeakHash32 & hash) const
Discriminator discr = local_discriminators_data[i];
/// Update hash only for non-NULL values
if (discr != NULL_DISCRIMINATOR)
{
auto nested_hash = nested_hashes[local_discriminators_data[i]].getData()[offsets_data[i]];
hash_data[i] = static_cast<UInt32>(hashCRC32(nested_hash, hash_data[i]));
}
hash_data[i] = nested_hashes[discr].getData()[offsets_data[i]];
}
return hash;
}
void ColumnVariant::updateHashFast(SipHash & hash) const

View File

@ -189,7 +189,7 @@ public:
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
void updateWeakHash32(WeakHash32 & hash) const override;
WeakHash32 getWeakHash32() const override;
void updateHashFast(SipHash & hash) const override;
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
void expand(const Filter & mask, bool inverted) override;

View File

@ -72,13 +72,10 @@ void ColumnVector<T>::updateHashWithValue(size_t n, SipHash & hash) const
}
template <typename T>
void ColumnVector<T>::updateWeakHash32(WeakHash32 & hash) const
WeakHash32 ColumnVector<T>::getWeakHash32() const
{
auto s = data.size();
if (hash.getData().size() != s)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match size of column: "
"column size is {}, hash size is {}", std::to_string(s), std::to_string(hash.getData().size()));
WeakHash32 hash(s);
const T * begin = data.data();
const T * end = begin + s;
@ -90,6 +87,8 @@ void ColumnVector<T>::updateWeakHash32(WeakHash32 & hash) const
++begin;
++hash_data;
}
return hash;
}
template <typename T>

View File

@ -106,7 +106,7 @@ public:
void updateHashWithValue(size_t n, SipHash & hash) const override;
void updateWeakHash32(WeakHash32 & hash) const override;
WeakHash32 getWeakHash32() const override;
void updateHashFast(SipHash & hash) const override;

View File

@ -269,10 +269,10 @@ public:
/// passed bytes to hash must identify sequence of values unambiguously.
virtual void updateHashWithValue(size_t n, SipHash & hash) const = 0;
/// Update hash function value. Hash is calculated for each element.
/// Get hash function value. Hash is calculated for each element.
/// It's a fast weak hash function. Mainly need to scatter data between threads.
/// WeakHash32 must have the same size as column.
virtual void updateWeakHash32(WeakHash32 & hash) const = 0;
virtual WeakHash32 getWeakHash32() const = 0;
/// Update state of hash with all column.
virtual void updateHashFast(SipHash & hash) const = 0;

View File

@ -1,6 +1,7 @@
#pragma once
#include <Columns/IColumn.h>
#include <Common/WeakHash.h>
namespace DB
@ -59,8 +60,9 @@ public:
{
}
void updateWeakHash32(WeakHash32 & /*hash*/) const override
WeakHash32 getWeakHash32() const override
{
return WeakHash32(s);
}
void updateHashFast(SipHash & /*hash*/) const override

View File

@ -1,6 +1,7 @@
#pragma once
#include <optional>
#include <Columns/IColumn.h>
#include <Common/WeakHash.h>
namespace DB
{
@ -162,9 +163,9 @@ public:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method scatter is not supported for ColumnUnique.");
}
void updateWeakHash32(WeakHash32 &) const override
WeakHash32 getWeakHash32() const override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method updateWeakHash32 is not supported for ColumnUnique.");
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getWeakHash32 is not supported for ColumnUnique.");
}
void updateHashFast(SipHash &) const override

View File

@ -60,8 +60,7 @@ TEST(WeakHash32, ColumnVectorU8)
data.push_back(i);
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), col->getData());
}
@ -77,8 +76,7 @@ TEST(WeakHash32, ColumnVectorI8)
data.push_back(i);
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), col->getData());
}
@ -94,8 +92,7 @@ TEST(WeakHash32, ColumnVectorU16)
data.push_back(i);
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), col->getData());
}
@ -111,8 +108,7 @@ TEST(WeakHash32, ColumnVectorI16)
data.push_back(i);
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), col->getData());
}
@ -128,8 +124,7 @@ TEST(WeakHash32, ColumnVectorU32)
data.push_back(i << 16u);
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), col->getData());
}
@ -145,8 +140,7 @@ TEST(WeakHash32, ColumnVectorI32)
data.push_back(i << 16);
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), col->getData());
}
@ -162,8 +156,7 @@ TEST(WeakHash32, ColumnVectorU64)
data.push_back(i << 32u);
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), col->getData());
}
@ -179,8 +172,7 @@ TEST(WeakHash32, ColumnVectorI64)
data.push_back(i << 32);
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), col->getData());
}
@ -204,8 +196,7 @@ TEST(WeakHash32, ColumnVectorU128)
}
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), eq_data);
}
@ -221,8 +212,7 @@ TEST(WeakHash32, ColumnVectorI128)
data.push_back(i << 32);
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), col->getData());
}
@ -238,8 +228,7 @@ TEST(WeakHash32, ColumnDecimal32)
data.push_back(i << 16);
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), col->getData());
}
@ -255,8 +244,7 @@ TEST(WeakHash32, ColumnDecimal64)
data.push_back(i << 32);
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), col->getData());
}
@ -272,8 +260,7 @@ TEST(WeakHash32, ColumnDecimal128)
data.push_back(i << 32);
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), col->getData());
}
@ -294,8 +281,7 @@ TEST(WeakHash32, ColumnString1)
}
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), data);
}
@ -331,8 +317,7 @@ TEST(WeakHash32, ColumnString2)
}
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), data);
}
@ -369,8 +354,7 @@ TEST(WeakHash32, ColumnString3)
}
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), data);
}
@ -397,8 +381,7 @@ TEST(WeakHash32, ColumnFixedString)
}
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), data);
}
@ -444,8 +427,7 @@ TEST(WeakHash32, ColumnArray)
auto col_arr = ColumnArray::create(std::move(val), std::move(off));
WeakHash32 hash(col_arr->size());
col_arr->updateWeakHash32(hash);
WeakHash32 hash = col_arr->getWeakHash32();
checkColumn(hash.getData(), eq_data);
}
@ -479,8 +461,7 @@ TEST(WeakHash32, ColumnArray2)
auto col_arr = ColumnArray::create(std::move(val), std::move(off));
WeakHash32 hash(col_arr->size());
col_arr->updateWeakHash32(hash);
WeakHash32 hash = col_arr->getWeakHash32();
checkColumn(hash.getData(), eq_data);
}
@ -536,8 +517,7 @@ TEST(WeakHash32, ColumnArrayArray)
auto col_arr = ColumnArray::create(std::move(val), std::move(off));
auto col_arr_arr = ColumnArray::create(std::move(col_arr), std::move(off2));
WeakHash32 hash(col_arr_arr->size());
col_arr_arr->updateWeakHash32(hash);
WeakHash32 hash = col_arr_arr->getWeakHash32();
checkColumn(hash.getData(), eq_data);
}
@ -555,8 +535,7 @@ TEST(WeakHash32, ColumnConst)
auto col_const = ColumnConst::create(std::move(inner_col), 256);
WeakHash32 hash(col_const->size());
col_const->updateWeakHash32(hash);
WeakHash32 hash = col_const->getWeakHash32();
checkColumn(hash.getData(), data);
}
@ -576,8 +555,7 @@ TEST(WeakHash32, ColumnLowcardinality)
}
}
WeakHash32 hash(col->size());
col->updateWeakHash32(hash);
WeakHash32 hash = col->getWeakHash32();
checkColumn(hash.getData(), data);
}
@ -602,8 +580,7 @@ TEST(WeakHash32, ColumnNullable)
auto col_null = ColumnNullable::create(std::move(col), std::move(mask));
WeakHash32 hash(col_null->size());
col_null->updateWeakHash32(hash);
WeakHash32 hash = col_null->getWeakHash32();
checkColumn(hash.getData(), eq);
}
@ -633,8 +610,7 @@ TEST(WeakHash32, ColumnTupleUInt64UInt64)
columns.emplace_back(std::move(col2));
auto col_tuple = ColumnTuple::create(std::move(columns));
WeakHash32 hash(col_tuple->size());
col_tuple->updateWeakHash32(hash);
WeakHash32 hash = col_tuple->getWeakHash32();
checkColumn(hash.getData(), eq);
}
@ -671,8 +647,7 @@ TEST(WeakHash32, ColumnTupleUInt64String)
columns.emplace_back(std::move(col2));
auto col_tuple = ColumnTuple::create(std::move(columns));
WeakHash32 hash(col_tuple->size());
col_tuple->updateWeakHash32(hash);
WeakHash32 hash = col_tuple->getWeakHash32();
checkColumn(hash.getData(), eq);
}
@ -709,8 +684,7 @@ TEST(WeakHash32, ColumnTupleUInt64FixedString)
columns.emplace_back(std::move(col2));
auto col_tuple = ColumnTuple::create(std::move(columns));
WeakHash32 hash(col_tuple->size());
col_tuple->updateWeakHash32(hash);
WeakHash32 hash = col_tuple->getWeakHash32();
checkColumn(hash.getData(), eq);
}
@ -756,8 +730,7 @@ TEST(WeakHash32, ColumnTupleUInt64Array)
columns.emplace_back(ColumnArray::create(std::move(val), std::move(off)));
auto col_tuple = ColumnTuple::create(std::move(columns));
WeakHash32 hash(col_tuple->size());
col_tuple->updateWeakHash32(hash);
WeakHash32 hash = col_tuple->getWeakHash32();
checkColumn(hash.getData(), eq_data);
}

View File

@ -148,13 +148,17 @@ void * Allocator<clear_memory_, populate>::realloc(void * buf, size_t old_size,
else if (alignment <= MALLOC_MIN_ALIGNMENT)
{
/// Resize malloc'd memory region with no special alignment requirement.
auto trace_free = CurrentMemoryTracker::free(old_size);
/// Realloc can do 2 possible things:
/// - expand existing memory region
/// - allocate new memory block and free the old one
/// Because we don't know which option will be picked we need to make sure there is enough
/// memory for all options
auto trace_alloc = CurrentMemoryTracker::alloc(new_size);
trace_free.onFree(buf, old_size);
void * new_buf = ::realloc(buf, new_size);
if (nullptr == new_buf)
{
[[maybe_unused]] auto trace_free = CurrentMemoryTracker::free(new_size);
throw DB::ErrnoException(
DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY,
"Allocator: Cannot realloc from {} to {}",
@ -163,6 +167,8 @@ void * Allocator<clear_memory_, populate>::realloc(void * buf, size_t old_size,
}
buf = new_buf;
auto trace_free = CurrentMemoryTracker::free(old_size);
trace_free.onFree(buf, old_size);
trace_alloc.onAlloc(buf, new_size);
if constexpr (clear_memory)

View File

@ -5,7 +5,7 @@
namespace DB
{
static void inline hexStringDecode(const char * pos, const char * end, char *& out, size_t word_size = 2)
static void inline hexStringDecode(const char * pos, const char * end, char *& out, size_t word_size)
{
if ((end - pos) & 1)
{
@ -23,7 +23,7 @@ static void inline hexStringDecode(const char * pos, const char * end, char *& o
++out;
}
static void inline binStringDecode(const char * pos, const char * end, char *& out)
static void inline binStringDecode(const char * pos, const char * end, char *& out, size_t word_size)
{
if (pos == end)
{
@ -53,7 +53,7 @@ static void inline binStringDecode(const char * pos, const char * end, char *& o
++out;
}
assert((end - pos) % 8 == 0);
chassert((end - pos) % word_size == 0);
while (end - pos != 0)
{

View File

@ -125,8 +125,9 @@ std::optional<std::string> getCgroupsV2FileName()
if (!cgroupsV2MemoryControllerEnabled())
return {};
String cgroup = cgroupV2OfProcess();
auto current_cgroup = cgroup.empty() ? default_cgroups_mount : (default_cgroups_mount / cgroup);
std::filesystem::path current_cgroup = cgroupV2PathOfProcess();
if (current_cgroup.empty())
return {};
/// Return the bottom-most nested current memory file. If there is no such file at the current
/// level, try again at the parent level as memory settings are inherited.

View File

@ -85,7 +85,18 @@ StatusFile::StatusFile(std::string path_, FillFunction fill_)
/// Write information about current server instance to the file.
WriteBufferFromFileDescriptor out(fd, 1024);
fill(out);
try
{
fill(out);
/// Finalize here to avoid throwing exceptions in destructor.
out.finalize();
}
catch (...)
{
/// Finalize in case of exception to avoid throwing exceptions in destructor
out.finalize();
throw;
}
}
catch (...)
{

View File

@ -1,2 +1,24 @@
#include <Common/WeakHash.h>
#include <Common/Exception.h>
#include <Common/HashTable/Hash.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
void WeakHash32::update(const WeakHash32 & other)
{
size_t size = data.size();
if (size != other.data.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of WeakHash32 does not match:"
"left size is {}, right size is {}", size, other.data.size());
for (size_t i = 0; i < size; ++i)
data[i] = static_cast<UInt32>(intHashCRC32(other.data[i], data[i]));
}
}

View File

@ -11,9 +11,8 @@ namespace DB
/// The main purpose why this class needed is to support data initialization. Initially, every bit is 1.
class WeakHash32
{
static constexpr UInt32 kDefaultInitialValue = ~UInt32(0);
public:
static constexpr UInt32 kDefaultInitialValue = ~UInt32(0);
using Container = PaddedPODArray<UInt32>;
@ -22,6 +21,8 @@ public:
void reset(size_t size, UInt32 initial_value = kDefaultInitialValue) { data.assign(size, initial_value); }
void update(const WeakHash32 & other);
const Container & getData() const { return data; }
Container & getData() { return data; }

View File

@ -969,6 +969,10 @@ void ZooKeeper::receiveEvent()
if (request_info.callback)
request_info.callback(*response);
/// Finalize current session if we receive a hardware error from ZooKeeper
if (err != Error::ZOK && isHardwareError(err))
finalize(/*error_send*/ false, /*error_receive*/ true, fmt::format("Hardware error: {}", err));
}

View File

@ -37,12 +37,12 @@ uint32_t getCGroupLimitedCPUCores(unsigned default_cpu_count)
/// cgroupsv2
if (cgroupsV2Enabled())
{
/// First, we identify the cgroup the process belongs
std::string cgroup = cgroupV2OfProcess();
if (cgroup.empty())
/// First, we identify the path of the cgroup the process belongs
std::filesystem::path cgroup_path = cgroupV2PathOfProcess();
if (cgroup_path.empty())
return default_cpu_count;
auto current_cgroup = cgroup.empty() ? default_cgroups_mount : (default_cgroups_mount / cgroup);
auto current_cgroup = cgroup_path;
// Looking for cpu.max in directories from the current cgroup to the top level
// It does not stop on the first time since the child could have a greater value than parent
@ -62,7 +62,7 @@ uint32_t getCGroupLimitedCPUCores(unsigned default_cpu_count)
}
current_cgroup = current_cgroup.parent_path();
}
current_cgroup = default_cgroups_mount / cgroup;
current_cgroup = cgroup_path;
// Looking for cpuset.cpus.effective in directories from the current cgroup to the top level
while (current_cgroup != default_cgroups_mount.parent_path())
{

View File

@ -808,7 +808,11 @@ void LogEntryStorage::startCommitLogsPrefetch(uint64_t last_committed_index) con
for (; current_index <= max_index_for_prefetch; ++current_index)
{
const auto & [changelog_description, position, size] = logs_location.at(current_index);
auto location_it = logs_location.find(current_index);
if (location_it == logs_location.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Location of log entry with index {} is missing", current_index);
const auto & [changelog_description, position, size] = location_it->second;
if (total_size == 0)
current_file_info = &file_infos.emplace_back(changelog_description, position, /* count */ 1);
else if (total_size + size > commit_logs_cache.size_threshold)
@ -1416,7 +1420,11 @@ LogEntriesPtr LogEntryStorage::getLogEntriesBetween(uint64_t start, uint64_t end
}
else
{
const auto & log_location = logs_location.at(i);
auto location_it = logs_location.find(i);
if (location_it == logs_location.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Location of log entry with index {} is missing", i);
const auto & log_location = location_it->second;
if (!read_info)
set_new_file(log_location);

View File

@ -7,11 +7,12 @@
#include <mutex>
#include <string>
#include <Coordination/KeeperLogStore.h>
#include <Coordination/KeeperSnapshotManagerS3.h>
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/KeeperStateManager.h>
#include <Coordination/KeeperSnapshotManagerS3.h>
#include <Coordination/LoggerWrapper.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <Disks/DiskLocal.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <boost/algorithm/string.hpp>
@ -27,7 +28,7 @@
#include <Common/LockMemoryExceptionInThread.h>
#include <Common/Stopwatch.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Disks/DiskLocal.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#include <fmt/chrono.h>
@ -365,6 +366,11 @@ void KeeperServer::launchRaftServer(const Poco::Util::AbstractConfiguration & co
LockMemoryExceptionInThread::removeUniqueLock();
};
/// At least 16 threads for network communication in asio.
/// asio is async framework, so even with 1 thread it should be ok, but
/// still as safeguard it's better to have some redundant capacity here
asio_opts.thread_pool_size_ = std::max(16U, getNumberOfPhysicalCPUCores());
if (state_manager->isSecure())
{
#if USE_SSL

View File

@ -532,6 +532,10 @@ bool KeeperStorage::UncommittedState::hasACL(int64_t session_id, bool is_local,
if (is_local)
return check_auth(storage.session_and_auth[session_id]);
/// we want to close the session and with that we will remove all the auth related to the session
if (closed_sessions.contains(session_id))
return false;
if (check_auth(storage.session_and_auth[session_id]))
return true;
@ -557,6 +561,10 @@ void KeeperStorage::UncommittedState::addDelta(Delta new_delta)
auto & uncommitted_auth = session_and_auth[auth_delta->session_id];
uncommitted_auth.emplace_back(&auth_delta->auth_id);
}
else if (const auto * close_session_delta = std::get_if<CloseSessionDelta>(&added_delta.operation))
{
closed_sessions.insert(close_session_delta->session_id);
}
}
void KeeperStorage::UncommittedState::addDeltas(std::vector<Delta> new_deltas)
@ -607,7 +615,10 @@ void KeeperStorage::UncommittedState::commit(int64_t commit_zxid)
uncommitted_auth.pop_front();
if (uncommitted_auth.empty())
session_and_auth.erase(add_auth->session_id);
}
else if (auto * close_session = std::get_if<CloseSessionDelta>(&front_delta.operation))
{
closed_sessions.erase(close_session->session_id);
}
deltas.pop_front();
@ -680,6 +691,10 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
session_and_auth.erase(add_auth->session_id);
}
}
else if (auto * close_session = std::get_if<CloseSessionDelta>(&delta_it->operation))
{
closed_sessions.erase(close_session->session_id);
}
}
if (delta_it == deltas.rend())
@ -876,6 +891,10 @@ Coordination::Error KeeperStorage::commit(int64_t commit_zxid)
session_and_auth[operation.session_id].emplace_back(std::move(operation.auth_id));
return Coordination::Error::ZOK;
}
else if constexpr (std::same_as<DeltaType, KeeperStorage::CloseSessionDelta>)
{
return Coordination::Error::ZOK;
}
else
{
// shouldn't be called in any process functions
@ -1000,9 +1019,11 @@ struct KeeperStorageHeartbeatRequestProcessor final : public KeeperStorageReques
{
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
Coordination::ZooKeeperResponsePtr
process(KeeperStorage & /* storage */, int64_t /* zxid */) const override
process(KeeperStorage & storage, int64_t zxid) const override
{
return zk_request->makeResponse();
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
response_ptr->error = storage.commit(zxid);
return response_ptr;
}
};
@ -2367,6 +2388,7 @@ void KeeperStorage::preprocessRequest(
ephemerals.erase(session_ephemerals);
}
new_deltas.emplace_back(transaction.zxid, CloseSessionDelta{session_id});
new_digest = calculateNodesDigest(new_digest, new_deltas);
return;
}
@ -2428,8 +2450,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
}
}
uncommitted_state.commit(zxid);
clearDeadWatches(session_id);
auto auth_it = session_and_auth.find(session_id);
if (auth_it != session_and_auth.end())
@ -2474,7 +2494,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
else
{
response = request_processor->process(*this, zxid);
uncommitted_state.commit(zxid);
}
/// Watches for this requests are added to the watches lists
@ -2514,6 +2533,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
results.push_back(ResponseForSession{session_id, response});
}
uncommitted_state.commit(zxid);
return results;
}

View File

@ -314,8 +314,13 @@ public:
AuthID auth_id;
};
struct CloseSessionDelta
{
int64_t session_id;
};
using Operation = std::
variant<CreateNodeDelta, RemoveNodeDelta, UpdateNodeDelta, SetACLDelta, AddAuthDelta, ErrorDelta, SubDeltaEnd, FailedMultiDelta>;
variant<CreateNodeDelta, RemoveNodeDelta, UpdateNodeDelta, SetACLDelta, AddAuthDelta, ErrorDelta, SubDeltaEnd, FailedMultiDelta, CloseSessionDelta>;
struct Delta
{
@ -351,6 +356,7 @@ public:
std::shared_ptr<Node> tryGetNodeFromStorage(StringRef path) const;
std::unordered_map<int64_t, std::list<const AuthID *>> session_and_auth;
std::unordered_set<int64_t> closed_sessions;
struct UncommittedNode
{

View File

@ -2019,6 +2019,186 @@ TEST_P(CoordinationTest, TestCreateNodeWithAuthSchemeForAclWhenAuthIsPrecommitte
EXPECT_EQ(acls[0].permissions, 31);
}
TEST_P(CoordinationTest, TestPreprocessWhenCloseSessionIsPrecommitted)
{
using namespace Coordination;
using namespace DB;
ChangelogDirTest snapshots("./snapshots");
setSnapshotDirectory("./snapshots");
ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1};
int64_t session_without_auth = 1;
int64_t session_with_auth = 2;
size_t term = 0;
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, keeper_context, nullptr);
state_machine->init();
auto & storage = state_machine->getStorageUnsafe();
const auto & uncommitted_state = storage.uncommitted_state;
auto auth_req = std::make_shared<ZooKeeperAuthRequest>();
auth_req->scheme = "digest";
auth_req->data = "test_user:test_password";
// Add auth data to the session
auto auth_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), auth_req);
state_machine->pre_commit(1, auth_entry->get_buf());
state_machine->commit(1, auth_entry->get_buf());
std::string node_without_acl = "/node_without_acl";
{
auto create_req = std::make_shared<ZooKeeperCreateRequest>();
create_req->path = node_without_acl;
create_req->data = "notmodified";
auto create_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), create_req);
state_machine->pre_commit(2, create_entry->get_buf());
state_machine->commit(2, create_entry->get_buf());
ASSERT_TRUE(storage.container.contains(node_without_acl));
}
std::string node_with_acl = "/node_with_acl";
{
auto create_req = std::make_shared<ZooKeeperCreateRequest>();
create_req->path = node_with_acl;
create_req->data = "notmodified";
create_req->acls = {{.permissions = ACL::All, .scheme = "auth", .id = ""}};
auto create_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), create_req);
state_machine->pre_commit(3, create_entry->get_buf());
state_machine->commit(3, create_entry->get_buf());
ASSERT_TRUE(storage.container.contains(node_with_acl));
}
auto set_req_with_acl = std::make_shared<ZooKeeperSetRequest>();
set_req_with_acl->path = node_with_acl;
set_req_with_acl->data = "modified";
auto set_req_without_acl = std::make_shared<ZooKeeperSetRequest>();
set_req_without_acl->path = node_without_acl;
set_req_without_acl->data = "modified";
const auto reset_node_value
= [&](const auto & path) { storage.container.updateValue(path, [](auto & node) { node.setData("notmodified"); }); };
auto close_req = std::make_shared<ZooKeeperCloseRequest>();
{
SCOPED_TRACE("Session with Auth");
// test we can modify both nodes
auto set_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), set_req_with_acl);
state_machine->pre_commit(5, set_entry->get_buf());
state_machine->commit(5, set_entry->get_buf());
ASSERT_TRUE(storage.container.find(node_with_acl)->value.getData() == "modified");
reset_node_value(node_with_acl);
set_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), set_req_without_acl);
state_machine->pre_commit(6, set_entry->get_buf());
state_machine->commit(6, set_entry->get_buf());
ASSERT_TRUE(storage.container.find(node_without_acl)->value.getData() == "modified");
reset_node_value(node_without_acl);
auto close_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), close_req);
// Pre-commit close session
state_machine->pre_commit(7, close_entry->get_buf());
/// will be rejected because we don't have required auth
auto set_entry_with_acl = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), set_req_with_acl);
state_machine->pre_commit(8, set_entry_with_acl->get_buf());
/// will be accepted because no ACL
auto set_entry_without_acl = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), set_req_without_acl);
state_machine->pre_commit(9, set_entry_without_acl->get_buf());
ASSERT_TRUE(uncommitted_state.getNode(node_with_acl)->getData() == "notmodified");
ASSERT_TRUE(uncommitted_state.getNode(node_without_acl)->getData() == "modified");
state_machine->rollback(9, set_entry_without_acl->get_buf());
state_machine->rollback(8, set_entry_with_acl->get_buf());
// let's commit close and verify we get same outcome
state_machine->commit(7, close_entry->get_buf());
/// will be rejected because we don't have required auth
set_entry_with_acl = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), set_req_with_acl);
state_machine->pre_commit(8, set_entry_with_acl->get_buf());
/// will be accepted because no ACL
set_entry_without_acl = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), set_req_without_acl);
state_machine->pre_commit(9, set_entry_without_acl->get_buf());
ASSERT_TRUE(uncommitted_state.getNode(node_with_acl)->getData() == "notmodified");
ASSERT_TRUE(uncommitted_state.getNode(node_without_acl)->getData() == "modified");
state_machine->commit(8, set_entry_with_acl->get_buf());
state_machine->commit(9, set_entry_without_acl->get_buf());
ASSERT_TRUE(storage.container.find(node_with_acl)->value.getData() == "notmodified");
ASSERT_TRUE(storage.container.find(node_without_acl)->value.getData() == "modified");
reset_node_value(node_without_acl);
}
{
SCOPED_TRACE("Session without Auth");
// test we can modify only node without acl
auto set_entry = getLogEntryFromZKRequest(term, session_without_auth, state_machine->getNextZxid(), set_req_with_acl);
state_machine->pre_commit(10, set_entry->get_buf());
state_machine->commit(10, set_entry->get_buf());
ASSERT_TRUE(storage.container.find(node_with_acl)->value.getData() == "notmodified");
set_entry = getLogEntryFromZKRequest(term, session_without_auth, state_machine->getNextZxid(), set_req_without_acl);
state_machine->pre_commit(11, set_entry->get_buf());
state_machine->commit(11, set_entry->get_buf());
ASSERT_TRUE(storage.container.find(node_without_acl)->value.getData() == "modified");
reset_node_value(node_without_acl);
auto close_entry = getLogEntryFromZKRequest(term, session_without_auth, state_machine->getNextZxid(), close_req);
// Pre-commit close session
state_machine->pre_commit(12, close_entry->get_buf());
/// will be rejected because we don't have required auth
auto set_entry_with_acl = getLogEntryFromZKRequest(term, session_without_auth, state_machine->getNextZxid(), set_req_with_acl);
state_machine->pre_commit(13, set_entry_with_acl->get_buf());
/// will be accepted because no ACL
auto set_entry_without_acl = getLogEntryFromZKRequest(term, session_without_auth, state_machine->getNextZxid(), set_req_without_acl);
state_machine->pre_commit(14, set_entry_without_acl->get_buf());
ASSERT_TRUE(uncommitted_state.getNode(node_with_acl)->getData() == "notmodified");
ASSERT_TRUE(uncommitted_state.getNode(node_without_acl)->getData() == "modified");
state_machine->rollback(14, set_entry_without_acl->get_buf());
state_machine->rollback(13, set_entry_with_acl->get_buf());
// let's commit close and verify we get same outcome
state_machine->commit(12, close_entry->get_buf());
/// will be rejected because we don't have required auth
set_entry_with_acl = getLogEntryFromZKRequest(term, session_without_auth, state_machine->getNextZxid(), set_req_with_acl);
state_machine->pre_commit(13, set_entry_with_acl->get_buf());
/// will be accepted because no ACL
set_entry_without_acl = getLogEntryFromZKRequest(term, session_without_auth, state_machine->getNextZxid(), set_req_without_acl);
state_machine->pre_commit(14, set_entry_without_acl->get_buf());
ASSERT_TRUE(uncommitted_state.getNode(node_with_acl)->getData() == "notmodified");
ASSERT_TRUE(uncommitted_state.getNode(node_without_acl)->getData() == "modified");
state_machine->commit(13, set_entry_with_acl->get_buf());
state_machine->commit(14, set_entry_without_acl->get_buf());
ASSERT_TRUE(storage.container.find(node_with_acl)->value.getData() == "notmodified");
ASSERT_TRUE(storage.container.find(node_without_acl)->value.getData() == "modified");
reset_node_value(node_without_acl);
}
}
TEST_P(CoordinationTest, TestSetACLWithAuthSchemeForAclWhenAuthIsPrecommitted)
{
using namespace Coordination;

View File

@ -981,6 +981,8 @@ class IColumn;
M(Char, format_csv_delimiter, ',', "The character to be considered as a delimiter in CSV data. If setting with a string, a string has to have a length of 1.", 0) \
M(Bool, format_csv_allow_single_quotes, false, "If it is set to true, allow strings in single quotes.", 0) \
M(Bool, format_csv_allow_double_quotes, true, "If it is set to true, allow strings in double quotes.", 0) \
M(Bool, output_format_csv_serialize_tuple_into_separate_columns, true, "If it set to true, then Tuples in CSV format are serialized as separate columns (that is, their nesting in the tuple is lost)", 0) \
M(Bool, input_format_csv_deserialize_separate_columns_into_tuple, true, "If it set to true, then separate columns written in CSV format can be deserialized to Tuple column.", 0) \
M(Bool, output_format_csv_crlf_end_of_line, false, "If it is set true, end of line in CSV format will be \\r\\n instead of \\n.", 0) \
M(Bool, input_format_csv_allow_cr_end_of_line, false, "If it is set true, \\r will be allowed at end of line not followed by \\n", 0) \
M(Bool, input_format_csv_enum_as_number, false, "Treat inserted enum values in CSV formats as enum indices", 0) \
@ -1016,6 +1018,7 @@ class IColumn;
M(UInt64, input_format_max_bytes_to_read_for_schema_inference, 32 * 1024 * 1024, "The maximum bytes of data to read for automatic schema inference", 0) \
M(Bool, input_format_csv_use_best_effort_in_schema_inference, true, "Use some tweaks and heuristics to infer schema in CSV format", 0) \
M(Bool, input_format_csv_try_infer_numbers_from_strings, false, "Try to infer numbers from string fields while schema inference in CSV format", 0) \
M(Bool, input_format_csv_try_infer_strings_from_quoted_tuples, true, "Interpret quoted tuples in the input data as a value of type String.", 0) \
M(Bool, input_format_tsv_use_best_effort_in_schema_inference, true, "Use some tweaks and heuristics to infer schema in TSV format", 0) \
M(Bool, input_format_csv_detect_header, true, "Automatically detect header with names and types in CSV format", 0) \
M(Bool, input_format_csv_allow_whitespace_or_tab_as_delimiter, false, "Allow to use spaces and tabs(\\t) as field delimiter in the CSV strings", 0) \

View File

@ -124,6 +124,9 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"azure_max_upload_part_size", 5ull*1024*1024*1024, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to Azure blob storage."},
{"azure_upload_part_size_multiply_factor", 2, 2, "Multiply azure_min_upload_part_size by this factor each time azure_multiply_parts_count_threshold parts were uploaded from a single write to Azure blob storage."},
{"azure_upload_part_size_multiply_parts_count_threshold", 500, 500, "Each time this number of parts was uploaded to Azure blob storage, azure_min_upload_part_size is multiplied by azure_upload_part_size_multiply_factor."},
{"output_format_csv_serialize_tuple_into_separate_columns", true, true, "A new way of how interpret tuples in CSV format was added."},
{"input_format_csv_deserialize_separate_columns_into_tuple", true, true, "A new way of how interpret tuples in CSV format was added."},
{"input_format_csv_try_infer_strings_from_quoted_tuples", true, true, "A new way of how interpret tuples in CSV format was added."},
{"temporary_data_in_cache_reserve_space_wait_lock_timeout_milliseconds", (10 * 60 * 1000), (10 * 60 * 1000), "Wait time to lock cache for sapce reservation in temporary data in filesystem cache"},
}},
{"24.2", {{"allow_suspicious_variant_types", true, false, "Don't allow creating Variant type with suspicious variants by default"},

View File

@ -527,26 +527,98 @@ void SerializationTuple::serializeTextXML(const IColumn & column, size_t row_num
void SerializationTuple::serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
WriteBufferFromOwnString wb;
serializeText(column, row_num, wb, settings);
writeCSV(wb.str(), ostr);
if (settings.csv.serialize_tuple_into_separate_columns)
{
for (size_t i = 0; i < elems.size(); ++i)
{
if (i != 0)
writeChar(settings.csv.tuple_delimiter, ostr);
elems[i]->serializeTextCSV(extractElementColumn(column, i), row_num, ostr, settings);
}
}
else
{
WriteBufferFromOwnString wb;
serializeText(column, row_num, wb, settings);
writeCSV(wb.str(), ostr);
}
}
void SerializationTuple::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
String s;
readCSV(s, istr, settings.csv);
ReadBufferFromString rb(s);
deserializeText(column, rb, settings, true);
if (settings.csv.deserialize_separate_columns_into_tuple)
{
addElementSafe<void>(elems.size(), column, [&]
{
const size_t size = elems.size();
for (size_t i = 0; i < size; ++i)
{
if (i != 0)
{
skipWhitespaceIfAny(istr);
assertChar(settings.csv.tuple_delimiter, istr);
skipWhitespaceIfAny(istr);
}
auto & element_column = extractElementColumn(column, i);
if (settings.null_as_default && !isColumnNullableOrLowCardinalityNullable(element_column))
SerializationNullable::deserializeNullAsDefaultOrNestedTextCSV(element_column, istr, settings, elems[i]);
else
elems[i]->deserializeTextCSV(element_column, istr, settings);
}
return true;
});
}
else
{
String s;
readCSV(s, istr, settings.csv);
ReadBufferFromString rb(s);
deserializeText(column, rb, settings, true);
}
}
bool SerializationTuple::tryDeserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
String s;
if (!tryReadCSV(s, istr, settings.csv))
return false;
ReadBufferFromString rb(s);
return tryDeserializeText(column, rb, settings, true);
if (settings.csv.deserialize_separate_columns_into_tuple)
{
return addElementSafe<bool>(elems.size(), column, [&]
{
const size_t size = elems.size();
for (size_t i = 0; i < size; ++i)
{
if (i != 0)
{
skipWhitespaceIfAny(istr);
if (!checkChar(settings.csv.tuple_delimiter, istr))
return false;
skipWhitespaceIfAny(istr);
}
auto & element_column = extractElementColumn(column, i);
if (settings.null_as_default && !isColumnNullableOrLowCardinalityNullable(element_column))
{
if (!SerializationNullable::tryDeserializeNullAsDefaultOrNestedTextCSV(element_column, istr, settings, elems[i]))
return false;
}
else
{
if (!elems[i]->tryDeserializeTextCSV(element_column, istr, settings))
return false;
}
}
return true;
});
}
else
{
String s;
if (!tryReadCSV(s, istr, settings.csv))
return false;
ReadBufferFromString rb(s);
return tryDeserializeText(column, rb, settings, true);
}
}
void SerializationTuple::enumerateStreams(

View File

@ -11,6 +11,7 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTTTLElement.h>
#include <Poco/String.h>
@ -200,6 +201,13 @@ void DDLLoadingDependencyVisitor::extractTableNameFromArgument(const ASTFunction
qualified_name.database = table_identifier->getDatabaseName();
qualified_name.table = table_identifier->shortName();
}
else if (arg->as<ASTSubquery>())
{
/// Allow IN subquery.
/// Do not add tables from the subquery into dependencies,
/// because CREATE will succeed anyway.
return;
}
else
{
assert(false);

View File

@ -44,11 +44,11 @@ void applyMetadataChangesToCreateQuery(const ASTPtr & query, const StorageInMemo
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot alter table {} because it was created AS table function"
" and doesn't have structure in metadata", backQuote(ast_create_query.getTable()));
if (!has_structure && !ast_create_query.is_dictionary)
if (!has_structure && !ast_create_query.is_dictionary && !ast_create_query.isParameterizedView())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot alter table {} metadata doesn't have structure",
backQuote(ast_create_query.getTable()));
if (!ast_create_query.is_dictionary)
if (!ast_create_query.is_dictionary && !ast_create_query.isParameterizedView())
{
ASTPtr new_columns = InterpreterCreateQuery::formatColumns(metadata.columns);
ASTPtr new_indices = InterpreterCreateQuery::formatIndices(metadata.secondary_indices);

View File

@ -861,9 +861,9 @@ size_t HashedArrayDictionary<dictionary_key_type, sharded>::getItemsShortCircuit
const auto & element = attribute_container[element_index];
if constexpr (is_nullable)
set_value(key_index, element, (*attribute.is_index_null)[shard][element_index]);
set_value(keys_found, element, (*attribute.is_index_null)[shard][element_index]);
else
set_value(key_index, element, false);
set_value(keys_found, element, false);
default_mask[key_index] = 0;
@ -962,9 +962,9 @@ size_t HashedArrayDictionary<dictionary_key_type, sharded>::getItemsShortCircuit
const auto & element = attribute_container[found_element_index];
if constexpr (is_nullable)
set_value(key_index, element, (*attribute.is_index_null)[shard][found_element_index]);
set_value(keys_found, element, (*attribute.is_index_null)[shard][found_element_index]);
else
set_value(key_index, element, false);
set_value(keys_found, element, false);
}
}

View File

@ -1135,7 +1135,7 @@ size_t HashedDictionary<dictionary_key_type, sparse, sharded>::getItemsShortCirc
if (it != container.end())
{
set_value(key_index, getValueFromCell(it));
set_value(keys_found, getValueFromCell(it));
default_mask[key_index] = 0;
++keys_found;
@ -1143,7 +1143,7 @@ size_t HashedDictionary<dictionary_key_type, sparse, sharded>::getItemsShortCirc
// Need to consider items in is_nullable_sets as well, see blockToAttributes()
else if (is_nullable && (*attribute.is_nullable_sets)[shard].find(key) != nullptr)
{
set_null(key_index);
set_null(keys_found);
default_mask[key_index] = 0;
++keys_found;

View File

@ -304,7 +304,7 @@ DataTypePtr tryInferDataTypeByEscapingRule(const String & field, const FormatSet
auto type = tryInferDataTypeForSingleField(data, format_settings);
/// If we couldn't infer any type or it's a number and csv.try_infer_numbers_from_strings = 0, we determine it as a string.
if (!type || (isNumber(type) && !format_settings.csv.try_infer_numbers_from_strings))
if (!type || (format_settings.csv.try_infer_strings_from_quoted_tuples && isTuple(type)) || (!format_settings.csv.try_infer_numbers_from_strings && isNumber(type)))
return std::make_shared<DataTypeString>();
return type;
@ -440,13 +440,15 @@ String getAdditionalFormatInfoByEscapingRule(const FormatSettings & settings, Fo
case FormatSettings::EscapingRule::CSV:
result += fmt::format(
", use_best_effort_in_schema_inference={}, bool_true_representation={}, bool_false_representation={},"
" null_representation={}, delimiter={}, tuple_delimiter={}",
" null_representation={}, delimiter={}, tuple_delimiter={}, try_infer_numbers_from_strings={}, try_infer_strings_from_quoted_tuples={}",
settings.csv.use_best_effort_in_schema_inference,
settings.bool_true_representation,
settings.bool_false_representation,
settings.csv.null_representation,
settings.csv.delimiter,
settings.csv.tuple_delimiter);
settings.csv.tuple_delimiter,
settings.csv.try_infer_numbers_from_strings,
settings.csv.try_infer_strings_from_quoted_tuples);
break;
case FormatSettings::EscapingRule::JSON:
result += fmt::format(

View File

@ -76,6 +76,8 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.avro.output_rows_in_file = settings.output_format_avro_rows_in_file;
format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
format_settings.csv.serialize_tuple_into_separate_columns = settings.output_format_csv_serialize_tuple_into_separate_columns;
format_settings.csv.deserialize_separate_columns_into_tuple = settings.input_format_csv_deserialize_separate_columns_into_tuple;
format_settings.csv.crlf_end_of_line = settings.output_format_csv_crlf_end_of_line;
format_settings.csv.allow_cr_end_of_line = settings.input_format_csv_allow_cr_end_of_line;
format_settings.csv.delimiter = settings.format_csv_delimiter;
@ -93,6 +95,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.csv.allow_variable_number_of_columns = settings.input_format_csv_allow_variable_number_of_columns;
format_settings.csv.use_default_on_bad_values = settings.input_format_csv_use_default_on_bad_values;
format_settings.csv.try_infer_numbers_from_strings = settings.input_format_csv_try_infer_numbers_from_strings;
format_settings.csv.try_infer_strings_from_quoted_tuples = settings.input_format_csv_try_infer_strings_from_quoted_tuples;
format_settings.hive_text.fields_delimiter = settings.input_format_hive_text_fields_delimiter;
format_settings.hive_text.collection_items_delimiter = settings.input_format_hive_text_collection_items_delimiter;
format_settings.hive_text.map_keys_delimiter = settings.input_format_hive_text_map_keys_delimiter;

View File

@ -152,6 +152,8 @@ struct FormatSettings
char delimiter = ',';
bool allow_single_quotes = true;
bool allow_double_quotes = true;
bool serialize_tuple_into_separate_columns = true;
bool deserialize_separate_columns_into_tuple = true;
bool empty_as_default = false;
bool crlf_end_of_line = false;
bool allow_cr_end_of_line = false;
@ -169,6 +171,7 @@ struct FormatSettings
bool allow_variable_number_of_columns = false;
bool use_default_on_bad_values = false;
bool try_infer_numbers_from_strings = true;
bool try_infer_strings_from_quoted_tuples = true;
} csv{};
struct HiveText

View File

@ -3,14 +3,14 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnsNumber.h>
#include <Common/BitHelpers.h>
#include <Common/BinStringDecodeHelper.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/castColumn.h>
#include <Common/BinStringDecodeHelper.h>
#include <Common/BitHelpers.h>
namespace DB
{
@ -218,10 +218,7 @@ struct UnbinImpl
static constexpr auto name = "unbin";
static constexpr size_t word_size = 8;
static void decode(const char * pos, const char * end, char *& out)
{
binStringDecode(pos, end, out);
}
static void decode(const char * pos, const char * end, char *& out) { binStringDecode(pos, end, out, word_size); }
};
/// Encode number or string to string with binary or hexadecimal representation
@ -651,7 +648,15 @@ public:
size_t size = in_offsets.size();
out_offsets.resize(size);
out_vec.resize(in_vec.size() / word_size + size);
size_t max_out_len = 0;
for (size_t i = 0; i < in_offsets.size(); ++i)
{
const size_t len = in_offsets[i] - (i == 0 ? 0 : in_offsets[i - 1])
- /* trailing zero symbol that is always added in ColumnString and that is ignored while decoding */ 1;
max_out_len += (len + word_size - 1) / word_size + /* trailing zero symbol that is always added by Impl::decode */ 1;
}
out_vec.resize(max_out_len);
char * begin = reinterpret_cast<char *>(out_vec.data());
char * pos = begin;
@ -661,6 +666,7 @@ public:
{
size_t new_offset = in_offsets[i];
/// `new_offset - 1` because in ColumnString each string is stored with trailing zero byte
Impl::decode(reinterpret_cast<const char *>(&in_vec[prev_offset]), reinterpret_cast<const char *>(&in_vec[new_offset - 1]), pos);
out_offsets[i] = pos - begin;
@ -668,6 +674,9 @@ public:
prev_offset = new_offset;
}
chassert(
static_cast<size_t>(pos - begin) <= out_vec.size(),
fmt::format("too small amount of memory was preallocated: needed {}, but have only {}", pos - begin, out_vec.size()));
out_vec.resize(pos - begin);
return col_res;
@ -680,11 +689,11 @@ public:
ColumnString::Offsets & out_offsets = col_res->getOffsets();
const ColumnString::Chars & in_vec = col_fix_string->getChars();
size_t n = col_fix_string->getN();
const size_t n = col_fix_string->getN();
size_t size = col_fix_string->size();
out_offsets.resize(size);
out_vec.resize(in_vec.size() / word_size + size);
out_vec.resize(((n + word_size - 1) / word_size + /* trailing zero symbol that is always added by Impl::decode */ 1) * size);
char * begin = reinterpret_cast<char *>(out_vec.data());
char * pos = begin;
@ -694,6 +703,7 @@ public:
{
size_t new_offset = prev_offset + n;
/// here we don't subtract 1 from `new_offset` because in ColumnFixedString strings are stored without trailing zero byte
Impl::decode(reinterpret_cast<const char *>(&in_vec[prev_offset]), reinterpret_cast<const char *>(&in_vec[new_offset]), pos);
out_offsets[i] = pos - begin;
@ -701,6 +711,9 @@ public:
prev_offset = new_offset;
}
chassert(
static_cast<size_t>(pos - begin) <= out_vec.size(),
fmt::format("too small amount of memory was preallocated: needed {}, but have only {}", pos - begin, out_vec.size()));
out_vec.resize(pos - begin);
return col_res;

View File

@ -47,55 +47,86 @@ bool allArgumentsAreConstants(const ColumnsWithTypeAndName & args)
return true;
}
/// Replaces single low cardinality column in a function call by its dictionary
/// This can only happen after the arguments have been adapted in IFunctionOverloadResolver::getReturnType
/// as it's only possible if there is one low cardinality column and, optionally, const columns
ColumnPtr replaceLowCardinalityColumnsByNestedAndGetDictionaryIndexes(
ColumnsWithTypeAndName & args, bool can_be_executed_on_default_arguments, size_t input_rows_count)
{
size_t num_rows = input_rows_count;
/// We return the LC indexes so the LC can be reconstructed with the function result
ColumnPtr indexes;
/// Find first LowCardinality column and replace it to nested dictionary.
for (auto & column : args)
size_t number_low_cardinality_columns = 0;
size_t last_low_cardinality = 0;
size_t number_const_columns = 0;
size_t number_full_columns = 0;
for (size_t i = 0; i < args.size(); i++)
{
if (const auto * low_cardinality_column = checkAndGetColumn<ColumnLowCardinality>(column.column.get()))
auto const & arg = args[i];
if (checkAndGetColumn<ColumnLowCardinality>(arg.column.get()))
{
/// Single LowCardinality column is supported now.
if (indexes)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected single dictionary argument for function.");
const auto * low_cardinality_type = checkAndGetDataType<DataTypeLowCardinality>(column.type.get());
if (!low_cardinality_type)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Incompatible type for LowCardinality column: {}",
column.type->getName());
if (can_be_executed_on_default_arguments)
{
/// Normal case, when function can be executed on values' default.
column.column = low_cardinality_column->getDictionary().getNestedColumn();
indexes = low_cardinality_column->getIndexesPtr();
}
else
{
/// Special case when default value can't be used. Example: 1 % LowCardinality(Int).
/// LowCardinality always contains default, so 1 % 0 will throw exception in normal case.
auto dict_encoded = low_cardinality_column->getMinimalDictionaryEncodedColumn(0, low_cardinality_column->size());
column.column = dict_encoded.dictionary;
indexes = dict_encoded.indexes;
}
num_rows = column.column->size();
column.type = low_cardinality_type->getDictionaryType();
number_low_cardinality_columns++;
last_low_cardinality = i;
}
else if (checkAndGetColumn<ColumnConst>(arg.column.get()))
number_const_columns++;
else
number_full_columns++;
}
/// Change size of constants.
if (!number_low_cardinality_columns && !number_const_columns)
return nullptr;
if (number_full_columns > 0 || number_low_cardinality_columns > 1)
{
/// This should not be possible but currently there are multiple tests in CI failing because of it
/// TODO: Fix those cases, then enable this exception
#if 0
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected low cardinality types found. Low cardinality: {}. Full {}. Const {}",
number_low_cardinality_columns, number_full_columns, number_const_columns);
#else
return nullptr;
#endif
}
else if (number_low_cardinality_columns == 1)
{
auto & lc_arg = args[last_low_cardinality];
const auto * low_cardinality_type = checkAndGetDataType<DataTypeLowCardinality>(lc_arg.type.get());
if (!low_cardinality_type)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incompatible type for LowCardinality column: {}", lc_arg.type->getName());
const auto * low_cardinality_column = checkAndGetColumn<ColumnLowCardinality>(lc_arg.column.get());
chassert(low_cardinality_column);
if (can_be_executed_on_default_arguments)
{
/// Normal case, when function can be executed on values' default.
lc_arg.column = low_cardinality_column->getDictionary().getNestedColumn();
indexes = low_cardinality_column->getIndexesPtr();
}
else
{
/// Special case when default value can't be used. Example: 1 % LowCardinality(Int).
/// LowCardinality always contains default, so 1 % 0 will throw exception in normal case.
auto dict_encoded = low_cardinality_column->getMinimalDictionaryEncodedColumn(0, low_cardinality_column->size());
lc_arg.column = dict_encoded.dictionary;
indexes = dict_encoded.indexes;
}
/// The new column will have a different number of rows, normally less but occasionally it might be more (NULL)
input_rows_count = lc_arg.column->size();
lc_arg.type = low_cardinality_type->getDictionaryType();
}
/// Change size of constants
for (auto & column : args)
{
if (const auto * column_const = checkAndGetColumn<ColumnConst>(column.column.get()))
{
column.column = column_const->removeLowCardinality()->cloneResized(num_rows);
column.type = removeLowCardinality(column.type);
column.column = ColumnConst::create(recursiveRemoveLowCardinality(column_const->getDataColumnPtr()), input_rows_count);
column.type = recursiveRemoveLowCardinality(column.type);
}
}
@ -271,6 +302,8 @@ ColumnPtr IExecutableFunction::executeWithoutSparseColumns(const ColumnsWithType
bool can_be_executed_on_default_arguments = canBeExecutedOnDefaultArguments();
const auto & dictionary_type = res_low_cardinality_type->getDictionaryType();
/// The arguments should have been adapted in IFunctionOverloadResolver::getReturnType
/// So there is only one low cardinality column (and optionally some const columns) and no full column
ColumnPtr indexes = replaceLowCardinalityColumnsByNestedAndGetDictionaryIndexes(
columns_without_low_cardinality, can_be_executed_on_default_arguments, input_rows_count);

View File

@ -225,6 +225,17 @@ public:
virtual bool isDeterministicInScopeOfQuery() const { return true; }
/** This is a special flags for functions which return constant value for the server,
* but the result could be different for different servers in distributed query.
*
* This functions can't support constant folding on the initiator, but can on the follower.
* We can't apply some optimizations as well (e.g. can't remove constant result from GROUP BY key).
* So, it is convenient to have a special flag for them.
*
* Examples are: "__getScalar" and every function from serverConstants.cpp
*/
virtual bool isServerConstant() const { return false; }
/** Lets you know if the function is monotonic in a range of values.
* This is used to work with the index in a sorted chunk of data.
* And allows to use the index not only when it is written, for example `date >= const`, but also, for example, `toMonth(date) >= 11`.
@ -483,6 +494,7 @@ public:
virtual bool isInjective(const ColumnsWithTypeAndName & /*sample_columns*/) const { return false; }
virtual bool isDeterministic() const { return true; }
virtual bool isDeterministicInScopeOfQuery() const { return true; }
virtual bool isServerConstant() const { return false; }
virtual bool isStateful() const { return false; }
using ShortCircuitSettings = IFunctionBase::ShortCircuitSettings;

View File

@ -84,6 +84,8 @@ public:
bool isDeterministicInScopeOfQuery() const override { return function->isDeterministicInScopeOfQuery(); }
bool isServerConstant() const override { return function->isServerConstant(); }
bool isShortCircuit(ShortCircuitSettings & settings, size_t number_of_arguments) const override { return function->isShortCircuit(settings, number_of_arguments); }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & args) const override { return function->isSuitableForShortCircuitArgumentsExecution(args); }

View File

@ -53,6 +53,8 @@ public:
/// getMacro may return different values on different shards/replicas, so it's not constant for distributed query
bool isSuitableForConstantFolding() const override { return !is_distributed; }
bool isServerConstant() const override { return true; }
size_t getNumberOfArguments() const override
{
return 1;

View File

@ -49,6 +49,8 @@ public:
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
bool isServerConstant() const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() != 1 || !isString(arguments[0].type) || !arguments[0].column || !isColumnConst(*arguments[0].column))
@ -105,6 +107,8 @@ public:
bool isDeterministic() const override { return false; }
bool isServerConstant() const override { return true; }
bool isSuitableForConstantFolding() const override { return !is_distributed; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }

View File

@ -20,117 +20,125 @@ namespace DB
namespace
{
template<typename Derived, typename T, typename ColumnT>
class FunctionServerConstantBase : public FunctionConstantBase<Derived, T, ColumnT>
{
public:
using FunctionConstantBase<Derived, T, ColumnT>::FunctionConstantBase;
bool isServerConstant() const override { return true; }
};
#if defined(__ELF__) && !defined(OS_FREEBSD)
/// buildId() - returns the compiler build id of the running binary.
class FunctionBuildId : public FunctionConstantBase<FunctionBuildId, String, DataTypeString>
class FunctionBuildId : public FunctionServerConstantBase<FunctionBuildId, String, DataTypeString>
{
public:
static constexpr auto name = "buildId";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionBuildId>(context); }
explicit FunctionBuildId(ContextPtr context) : FunctionConstantBase(SymbolIndex::instance().getBuildIDHex(), context->isDistributed()) {}
explicit FunctionBuildId(ContextPtr context) : FunctionServerConstantBase(SymbolIndex::instance().getBuildIDHex(), context->isDistributed()) {}
};
#endif
/// Get the host name. Is is constant on single server, but is not constant in distributed queries.
class FunctionHostName : public FunctionConstantBase<FunctionHostName, String, DataTypeString>
/// Get the host name. It is constant on single server, but is not constant in distributed queries.
class FunctionHostName : public FunctionServerConstantBase<FunctionHostName, String, DataTypeString>
{
public:
static constexpr auto name = "hostName";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionHostName>(context); }
explicit FunctionHostName(ContextPtr context) : FunctionConstantBase(DNSResolver::instance().getHostName(), context->isDistributed()) {}
explicit FunctionHostName(ContextPtr context) : FunctionServerConstantBase(DNSResolver::instance().getHostName(), context->isDistributed()) {}
};
class FunctionServerUUID : public FunctionConstantBase<FunctionServerUUID, UUID, DataTypeUUID>
class FunctionServerUUID : public FunctionServerConstantBase<FunctionServerUUID, UUID, DataTypeUUID>
{
public:
static constexpr auto name = "serverUUID";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionServerUUID>(context); }
explicit FunctionServerUUID(ContextPtr context) : FunctionConstantBase(ServerUUID::get(), context->isDistributed()) {}
explicit FunctionServerUUID(ContextPtr context) : FunctionServerConstantBase(ServerUUID::get(), context->isDistributed()) {}
};
class FunctionTCPPort : public FunctionConstantBase<FunctionTCPPort, UInt16, DataTypeUInt16>
class FunctionTCPPort : public FunctionServerConstantBase<FunctionTCPPort, UInt16, DataTypeUInt16>
{
public:
static constexpr auto name = "tcpPort";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionTCPPort>(context); }
explicit FunctionTCPPort(ContextPtr context) : FunctionConstantBase(context->getTCPPort(), context->isDistributed()) {}
explicit FunctionTCPPort(ContextPtr context) : FunctionServerConstantBase(context->getTCPPort(), context->isDistributed()) {}
};
/// Returns timezone for current session.
class FunctionTimezone : public FunctionConstantBase<FunctionTimezone, String, DataTypeString>
class FunctionTimezone : public FunctionServerConstantBase<FunctionTimezone, String, DataTypeString>
{
public:
static constexpr auto name = "timezone";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionTimezone>(context); }
explicit FunctionTimezone(ContextPtr context) : FunctionConstantBase(DateLUT::instance().getTimeZone(), context->isDistributed()) {}
explicit FunctionTimezone(ContextPtr context) : FunctionServerConstantBase(DateLUT::instance().getTimeZone(), context->isDistributed()) {}
};
/// Returns the server time zone (timezone in which server runs).
class FunctionServerTimezone : public FunctionConstantBase<FunctionServerTimezone, String, DataTypeString>
class FunctionServerTimezone : public FunctionServerConstantBase<FunctionServerTimezone, String, DataTypeString>
{
public:
static constexpr auto name = "serverTimezone";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionServerTimezone>(context); }
explicit FunctionServerTimezone(ContextPtr context) : FunctionConstantBase(DateLUT::serverTimezoneInstance().getTimeZone(), context->isDistributed()) {}
explicit FunctionServerTimezone(ContextPtr context) : FunctionServerConstantBase(DateLUT::serverTimezoneInstance().getTimeZone(), context->isDistributed()) {}
};
/// Returns server uptime in seconds.
class FunctionUptime : public FunctionConstantBase<FunctionUptime, UInt32, DataTypeUInt32>
class FunctionUptime : public FunctionServerConstantBase<FunctionUptime, UInt32, DataTypeUInt32>
{
public:
static constexpr auto name = "uptime";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionUptime>(context); }
explicit FunctionUptime(ContextPtr context) : FunctionConstantBase(context->getUptimeSeconds(), context->isDistributed()) {}
explicit FunctionUptime(ContextPtr context) : FunctionServerConstantBase(context->getUptimeSeconds(), context->isDistributed()) {}
};
/// version() - returns the current version as a string.
class FunctionVersion : public FunctionConstantBase<FunctionVersion, String, DataTypeString>
class FunctionVersion : public FunctionServerConstantBase<FunctionVersion, String, DataTypeString>
{
public:
static constexpr auto name = "version";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionVersion>(context); }
explicit FunctionVersion(ContextPtr context) : FunctionConstantBase(VERSION_STRING, context->isDistributed()) {}
explicit FunctionVersion(ContextPtr context) : FunctionServerConstantBase(VERSION_STRING, context->isDistributed()) {}
};
/// revision() - returns the current revision.
class FunctionRevision : public FunctionConstantBase<FunctionRevision, UInt32, DataTypeUInt32>
class FunctionRevision : public FunctionServerConstantBase<FunctionRevision, UInt32, DataTypeUInt32>
{
public:
static constexpr auto name = "revision";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionRevision>(context); }
explicit FunctionRevision(ContextPtr context) : FunctionConstantBase(ClickHouseRevision::getVersionRevision(), context->isDistributed()) {}
explicit FunctionRevision(ContextPtr context) : FunctionServerConstantBase(ClickHouseRevision::getVersionRevision(), context->isDistributed()) {}
};
class FunctionZooKeeperSessionUptime : public FunctionConstantBase<FunctionZooKeeperSessionUptime, UInt32, DataTypeUInt32>
class FunctionZooKeeperSessionUptime : public FunctionServerConstantBase<FunctionZooKeeperSessionUptime, UInt32, DataTypeUInt32>
{
public:
static constexpr auto name = "zookeeperSessionUptime";
explicit FunctionZooKeeperSessionUptime(ContextPtr context)
: FunctionConstantBase(context->getZooKeeperSessionUptime(), context->isDistributed())
: FunctionServerConstantBase(context->getZooKeeperSessionUptime(), context->isDistributed())
{
}
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionZooKeeperSessionUptime>(context); }
};
class FunctionGetOSKernelVersion : public FunctionConstantBase<FunctionGetOSKernelVersion, String, DataTypeString>
class FunctionGetOSKernelVersion : public FunctionServerConstantBase<FunctionGetOSKernelVersion, String, DataTypeString>
{
public:
static constexpr auto name = "getOSKernelVersion";
explicit FunctionGetOSKernelVersion(ContextPtr context) : FunctionConstantBase(Poco::Environment::osName() + " " + Poco::Environment::osVersion(), context->isDistributed()) {}
explicit FunctionGetOSKernelVersion(ContextPtr context) : FunctionServerConstantBase(Poco::Environment::osName() + " " + Poco::Environment::osVersion(), context->isDistributed()) {}
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionGetOSKernelVersion>(context); }
};
class FunctionDisplayName : public FunctionConstantBase<FunctionDisplayName, String, DataTypeString>
class FunctionDisplayName : public FunctionServerConstantBase<FunctionDisplayName, String, DataTypeString>
{
public:
static constexpr auto name = "displayName";
explicit FunctionDisplayName(ContextPtr context) : FunctionConstantBase(context->getConfigRef().getString("display_name", getFQDNOrHostName()), context->isDistributed()) {}
explicit FunctionDisplayName(ContextPtr context) : FunctionServerConstantBase(context->getConfigRef().getString("display_name", getFQDNOrHostName()), context->isDistributed()) {}
static FunctionPtr create(ContextPtr context) {return std::make_shared<FunctionDisplayName>(context); }
};
}

View File

@ -804,6 +804,17 @@ void Client::updateURIForBucket(const std::string & bucket, S3::URI new_uri) con
cache->uri_for_bucket_cache.emplace(bucket, std::move(new_uri));
}
ClientCache::ClientCache(const ClientCache & other)
{
{
std::lock_guard lock(other.region_cache_mutex);
region_for_bucket_cache = other.region_for_bucket_cache;
}
{
std::lock_guard lock(other.uri_cache_mutex);
uri_for_bucket_cache = other.uri_for_bucket_cache;
}
}
void ClientCache::clearCache()
{

View File

@ -54,10 +54,7 @@ struct ClientCache
{
ClientCache() = default;
ClientCache(const ClientCache & other)
: region_for_bucket_cache(other.region_for_bucket_cache)
, uri_for_bucket_cache(other.uri_for_bucket_cache)
{}
ClientCache(const ClientCache & other);
ClientCache(ClientCache && other) = delete;
@ -66,11 +63,11 @@ struct ClientCache
void clearCache();
std::mutex region_cache_mutex;
std::unordered_map<std::string, std::string> region_for_bucket_cache;
mutable std::mutex region_cache_mutex;
std::unordered_map<std::string, std::string> region_for_bucket_cache TSA_GUARDED_BY(region_cache_mutex);
std::mutex uri_cache_mutex;
std::unordered_map<std::string, URI> uri_for_bucket_cache;
mutable std::mutex uri_cache_mutex;
std::unordered_map<std::string, URI> uri_for_bucket_cache TSA_GUARDED_BY(uri_cache_mutex);
};
class ClientCacheRegistry
@ -89,7 +86,7 @@ private:
ClientCacheRegistry() = default;
std::mutex clients_mutex;
std::unordered_map<ClientCache *, std::weak_ptr<ClientCache>> client_caches;
std::unordered_map<ClientCache *, std::weak_ptr<ClientCache>> client_caches TSA_GUARDED_BY(clients_mutex);
};
bool isS3ExpressEndpoint(const std::string & endpoint);

View File

@ -105,7 +105,14 @@ WriteBufferFromFileDescriptor::WriteBufferFromFileDescriptor(
WriteBufferFromFileDescriptor::~WriteBufferFromFileDescriptor()
{
finalize();
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void WriteBufferFromFileDescriptor::finalizeImpl()

View File

@ -114,6 +114,34 @@ namespace
else if (query.grantees)
user.grantees = *query.grantees;
}
time_t getValidUntilFromAST(ASTPtr valid_until, ContextPtr context)
{
if (context)
valid_until = evaluateConstantExpressionAsLiteral(valid_until, context);
const String valid_until_str = checkAndGetLiteralArgument<String>(valid_until, "valid_until");
if (valid_until_str == "infinity")
return 0;
time_t time = 0;
ReadBufferFromString in(valid_until_str);
if (context)
{
const auto & time_zone = DateLUT::instance("");
const auto & utc_time_zone = DateLUT::instance("UTC");
parseDateTimeBestEffort(time, in, time_zone, utc_time_zone);
}
else
{
readDateTimeText(time, in);
}
return time;
}
}
BlockIO InterpreterCreateUserQuery::execute()
@ -134,23 +162,7 @@ BlockIO InterpreterCreateUserQuery::execute()
std::optional<time_t> valid_until;
if (query.valid_until)
{
const ASTPtr valid_until_literal = evaluateConstantExpressionAsLiteral(query.valid_until, getContext());
const String valid_until_str = checkAndGetLiteralArgument<String>(valid_until_literal, "valid_until");
time_t time = 0;
if (valid_until_str != "infinity")
{
const auto & time_zone = DateLUT::instance("");
const auto & utc_time_zone = DateLUT::instance("UTC");
ReadBufferFromString in(valid_until_str);
parseDateTimeBestEffort(time, in, time_zone, utc_time_zone);
}
valid_until = time;
}
valid_until = getValidUntilFromAST(query.valid_until, getContext());
std::optional<RolesOrUsersSet> default_roles_from_query;
if (query.default_roles)
@ -259,7 +271,11 @@ void InterpreterCreateUserQuery::updateUserFromQuery(User & user, const ASTCreat
if (query.auth_data)
auth_data = AuthenticationData::fromAST(*query.auth_data, {}, !query.attach);
updateUserFromQueryImpl(user, query, auth_data, {}, {}, {}, {}, {}, allow_no_password, allow_plaintext_password, true);
std::optional<time_t> valid_until;
if (query.valid_until)
valid_until = getValidUntilFromAST(query.valid_until, {});
updateUserFromQueryImpl(user, query, auth_data, {}, {}, {}, {}, valid_until, allow_no_password, allow_plaintext_password, true);
}
void registerInterpreterCreateUserQuery(InterpreterFactory & factory)

View File

@ -438,6 +438,12 @@ BlockIO InterpreterGrantQuery::execute()
RolesOrUsersSet roles_to_revoke;
collectRolesToGrantOrRevoke(access_control, query, roles_to_grant, roles_to_revoke);
/// Replacing empty database with the default. This step must be done before replication to avoid privilege escalation.
String current_database = getContext()->getCurrentDatabase();
elements_to_grant.replaceEmptyDatabase(current_database);
elements_to_revoke.replaceEmptyDatabase(current_database);
query.access_rights_elements.replaceEmptyDatabase(current_database);
/// Executing on cluster.
if (!query.cluster.empty())
{
@ -453,9 +459,6 @@ BlockIO InterpreterGrantQuery::execute()
}
/// Check if the current user has corresponding access rights granted with grant option.
String current_database = getContext()->getCurrentDatabase();
elements_to_grant.replaceEmptyDatabase(current_database);
elements_to_revoke.replaceEmptyDatabase(current_database);
bool need_check_grantees_are_allowed = true;
if (!query.current_grants)
checkGrantOption(access_control, *current_user_access, grantees, need_check_grantees_are_allowed, elements_to_grant, elements_to_revoke);

View File

@ -1689,7 +1689,7 @@ void ActionsDAG::mergeNodes(ActionsDAG && second)
}
}
ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split_nodes, bool create_split_nodes_mapping) const
ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split_nodes, bool create_split_nodes_mapping, bool avoid_duplicate_inputs) const
{
/// Split DAG into two parts.
/// (first_nodes, first_outputs) is a part which will have split_list in result.
@ -1703,6 +1703,14 @@ ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split
/// List of nodes from current actions which are not inputs, but will be in second part.
NodeRawConstPtrs new_inputs;
/// Avoid new inputs to have the same name as existing inputs.
/// It's allowed for DAG but may break Block invariant 'columns with identical name must have identical structure'.
std::unordered_set<std::string_view> duplicate_inputs;
size_t duplicate_counter = 0;
if (avoid_duplicate_inputs)
for (const auto * input : inputs)
duplicate_inputs.insert(input->result_name);
struct Frame
{
const Node * node = nullptr;
@ -1815,7 +1823,8 @@ ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split
input_node.result_name = child->result_name;
child_data.to_second = &second_nodes.emplace_back(std::move(input_node));
new_inputs.push_back(child);
if (child->type != ActionType::INPUT)
new_inputs.push_back(child);
}
}
@ -1871,7 +1880,32 @@ ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split
for (const auto * input : new_inputs)
{
const auto & cur = data[input];
auto & cur = data[input];
if (avoid_duplicate_inputs)
{
bool is_name_updated = false;
while (!duplicate_inputs.insert(cur.to_first->result_name).second)
{
is_name_updated = true;
cur.to_first->result_name = fmt::format("{}_{}", input->result_name, duplicate_counter);
++duplicate_counter;
}
if (is_name_updated)
{
Node input_node;
input_node.type = ActionType::INPUT;
input_node.result_type = cur.to_first->result_type;
input_node.result_name = cur.to_first->result_name;
auto * new_input = &second_nodes.emplace_back(std::move(input_node));
cur.to_second->type = ActionType::ALIAS;
cur.to_second->children = {new_input};
cur.to_second = new_input;
}
}
second_inputs.push_back(cur.to_second);
first_outputs.push_back(cur.to_first);
}

View File

@ -342,7 +342,7 @@ public:
/// initial DAG : (a, b, c, d, e) -> (w, x, y, z) | 1 a 2 b 3 c 4 d 5 e 6 -> 1 2 3 4 5 6 w x y z
/// split (first) : (a, c, d) -> (i, j, k, w, y) | 1 a 2 b 3 c 4 d 5 e 6 -> 1 2 b 3 4 5 e 6 i j k w y
/// split (second) : (i, j, k, y, b, e) -> (x, y, z) | 1 2 b 3 4 5 e 6 i j k w y -> 1 2 3 4 5 6 w x y z
SplitResult split(std::unordered_set<const Node *> split_nodes, bool create_split_nodes_mapping = false) const;
SplitResult split(std::unordered_set<const Node *> split_nodes, bool create_split_nodes_mapping = false, bool avoid_duplicate_inputs = false) const;
/// Splits actions into two parts. Returned first half may be swapped with ARRAY JOIN.
SplitResult splitActionsBeforeArrayJoin(const NameSet & array_joined_columns) const;

View File

@ -200,7 +200,7 @@ IColumn::Selector ConcurrentHashJoin::selectDispatchBlock(const Strings & key_co
{
const auto & key_col = from_block.getByName(key_name).column->convertToFullColumnIfConst();
const auto & key_col_no_lc = recursiveRemoveLowCardinality(recursiveRemoveSparse(key_col));
key_col_no_lc->updateWeakHash32(hash);
hash.update(key_col_no_lc->getWeakHash32());
}
return hashToSelector(hash, num_shards);
}

View File

@ -669,6 +669,9 @@ struct ContextSharedPart : boost::noncopyable
}
}
LOG_TRACE(log, "Shutting down AccessControl");
access_control->shutdown();
{
std::lock_guard lock(mutex);

View File

@ -194,6 +194,10 @@ static void setLazyExecutionInfo(
}
lazy_execution_info.short_circuit_ancestors_info[parent].insert(indexes.begin(), indexes.end());
/// After checking arguments_with_disabled_lazy_execution, if there is no relation with parent,
/// disable the current node.
if (indexes.empty())
lazy_execution_info.can_be_lazy_executed = false;
}
else
/// If lazy execution is disabled for one of parents, we should disable it for current node.
@ -291,9 +295,9 @@ static std::unordered_set<const ActionsDAG::Node *> processShortCircuitFunctions
/// Firstly, find all short-circuit functions and get their settings.
std::unordered_map<const ActionsDAG::Node *, IFunctionBase::ShortCircuitSettings> short_circuit_nodes;
IFunctionBase::ShortCircuitSettings short_circuit_settings;
for (const auto & node : nodes)
{
IFunctionBase::ShortCircuitSettings short_circuit_settings;
if (node.type == ActionsDAG::ActionType::FUNCTION && node.function_base->isShortCircuit(short_circuit_settings, node.children.size()) && !node.children.empty())
short_circuit_nodes[&node] = short_circuit_settings;
}

Some files were not shown because too many files have changed in this diff Show More