Merge branch 'master' into tighten-limits-functional-tests

Alexey Milovidov 2024-07-31 17:41:14 +02:00
commit 3381c02413
93 changed files with 575 additions and 586 deletions

contrib/azure vendored

@ -1 +1 @@
Subproject commit ea3e19a7be08519134c643177d56c7484dfec884
Subproject commit 67272b7ee0adff6b69921b26eb071ba1a353062c

contrib/rocksdb vendored

@ -1 +1 @@
Subproject commit be366233921293bd07a84dc4ea6991858665f202
Subproject commit 01e43568fa9f3f7bf107b2b66c00b286b456f33e

View File

@ -5,6 +5,9 @@ if (NOT ENABLE_ROCKSDB)
return()
endif()
# Not in the original build system; otherwise xxHash.cc fails to compile with ClickHouse's C++23 default
set (CMAKE_CXX_STANDARD 20)
# Always disable jemalloc for rocksdb by default because it introduces non-standard jemalloc APIs
option(WITH_JEMALLOC "build with JeMalloc" OFF)
@ -16,14 +19,6 @@ option(WITH_LZ4 "build with lz4" ON)
option(WITH_ZLIB "build with zlib" ON)
option(WITH_ZSTD "build with zstd" ON)
# third-party/folly is only validated to work on Linux and Windows for now.
# So only turn it on there by default.
if(CMAKE_SYSTEM_NAME MATCHES "Linux|Windows")
option(WITH_FOLLY_DISTRIBUTED_MUTEX "build with folly::DistributedMutex" ON)
else()
option(WITH_FOLLY_DISTRIBUTED_MUTEX "build with folly::DistributedMutex" OFF)
endif()
if(WITH_SNAPPY)
add_definitions(-DSNAPPY)
list(APPEND THIRDPARTY_LIBS ch_contrib::snappy)
@ -44,7 +39,7 @@ if(WITH_ZSTD)
list(APPEND THIRDPARTY_LIBS ch_contrib::zstd)
endif()
option(PORTABLE "build a portable binary" ON)
add_definitions(-DROCKSDB_PORTABLE)
if(ENABLE_SSE42 AND ENABLE_PCLMULQDQ)
add_definitions(-DHAVE_SSE42)
@ -59,11 +54,6 @@ if(CMAKE_SYSTEM_PROCESSOR MATCHES "arm64|aarch64|AARCH64")
# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=armv8-a+crc+crypto -Wno-unused-function")
endif()
set (HAVE_THREAD_LOCAL 1)
if(HAVE_THREAD_LOCAL)
add_definitions(-DROCKSDB_SUPPORT_THREAD_LOCAL)
endif()
if(CMAKE_SYSTEM_NAME MATCHES "Darwin")
add_definitions(-DOS_MACOSX)
elseif(CMAKE_SYSTEM_NAME MATCHES "Linux")
@ -89,19 +79,21 @@ set(ROCKSDB_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/rocksdb")
include_directories(${ROCKSDB_SOURCE_DIR})
include_directories("${ROCKSDB_SOURCE_DIR}/include")
if(WITH_FOLLY_DISTRIBUTED_MUTEX)
include_directories("${ROCKSDB_SOURCE_DIR}/third-party/folly")
endif()
set(SOURCES
${ROCKSDB_SOURCE_DIR}/cache/cache.cc
${ROCKSDB_SOURCE_DIR}/cache/cache_entry_roles.cc
${ROCKSDB_SOURCE_DIR}/cache/cache_key.cc
${ROCKSDB_SOURCE_DIR}/cache/cache_helpers.cc
${ROCKSDB_SOURCE_DIR}/cache/cache_reservation_manager.cc
${ROCKSDB_SOURCE_DIR}/cache/charged_cache.cc
${ROCKSDB_SOURCE_DIR}/cache/clock_cache.cc
${ROCKSDB_SOURCE_DIR}/cache/compressed_secondary_cache.cc
${ROCKSDB_SOURCE_DIR}/cache/lru_cache.cc
${ROCKSDB_SOURCE_DIR}/cache/secondary_cache.cc
${ROCKSDB_SOURCE_DIR}/cache/sharded_cache.cc
${ROCKSDB_SOURCE_DIR}/db/arena_wrapped_db_iter.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_contents.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_fetcher.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_file_addition.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_file_builder.cc
@ -113,6 +105,7 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/db/blob/blob_log_format.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_log_sequential_reader.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_log_writer.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_source.cc
${ROCKSDB_SOURCE_DIR}/db/blob/prefetch_buffer_collection.cc
${ROCKSDB_SOURCE_DIR}/db/builder.cc
${ROCKSDB_SOURCE_DIR}/db/c.cc
@ -124,7 +117,11 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_picker_fifo.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_picker_level.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_picker_universal.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_service_job.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_state.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_outputs.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/sst_partitioner.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/subcompaction_state.cc
${ROCKSDB_SOURCE_DIR}/db/convenience.cc
${ROCKSDB_SOURCE_DIR}/db/db_filesnapshot.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/compacted_db_impl.cc
@ -159,10 +156,11 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/db/merge_helper.cc
${ROCKSDB_SOURCE_DIR}/db/merge_operator.cc
${ROCKSDB_SOURCE_DIR}/db/output_validator.cc
${ROCKSDB_SOURCE_DIR}/db/periodic_work_scheduler.cc
${ROCKSDB_SOURCE_DIR}/db/periodic_task_scheduler.cc
${ROCKSDB_SOURCE_DIR}/db/range_del_aggregator.cc
${ROCKSDB_SOURCE_DIR}/db/range_tombstone_fragmenter.cc
${ROCKSDB_SOURCE_DIR}/db/repair.cc
${ROCKSDB_SOURCE_DIR}/db/seqno_to_time_mapping.cc
${ROCKSDB_SOURCE_DIR}/db/snapshot_impl.cc
${ROCKSDB_SOURCE_DIR}/db/table_cache.cc
${ROCKSDB_SOURCE_DIR}/db/table_properties_collector.cc
@ -174,6 +172,8 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/db/version_set.cc
${ROCKSDB_SOURCE_DIR}/db/wal_edit.cc
${ROCKSDB_SOURCE_DIR}/db/wal_manager.cc
${ROCKSDB_SOURCE_DIR}/db/wide/wide_column_serialization.cc
${ROCKSDB_SOURCE_DIR}/db/wide/wide_columns.cc
${ROCKSDB_SOURCE_DIR}/db/write_batch.cc
${ROCKSDB_SOURCE_DIR}/db/write_batch_base.cc
${ROCKSDB_SOURCE_DIR}/db/write_controller.cc
@ -182,7 +182,6 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/env/env.cc
${ROCKSDB_SOURCE_DIR}/env/env_chroot.cc
${ROCKSDB_SOURCE_DIR}/env/env_encryption.cc
${ROCKSDB_SOURCE_DIR}/env/env_hdfs.cc
${ROCKSDB_SOURCE_DIR}/env/file_system.cc
${ROCKSDB_SOURCE_DIR}/env/file_system_tracer.cc
${ROCKSDB_SOURCE_DIR}/env/fs_remap.cc
@ -233,16 +232,17 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/options/options.cc
${ROCKSDB_SOURCE_DIR}/options/options_helper.cc
${ROCKSDB_SOURCE_DIR}/options/options_parser.cc
${ROCKSDB_SOURCE_DIR}/port/mmap.cc
${ROCKSDB_SOURCE_DIR}/port/stack_trace.cc
${ROCKSDB_SOURCE_DIR}/table/adaptive/adaptive_table_factory.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/binary_search_index_reader.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_based_filter_block.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_based_table_builder.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_based_table_factory.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_based_table_iterator.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_based_table_reader.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_builder.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_cache.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_prefetcher.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_prefix_index.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/data_block_hash_index.cc
@ -300,9 +300,12 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/trace_replay/trace_record_result.cc
${ROCKSDB_SOURCE_DIR}/trace_replay/trace_record.cc
${ROCKSDB_SOURCE_DIR}/trace_replay/trace_replay.cc
${ROCKSDB_SOURCE_DIR}/util/async_file_reader.cc
${ROCKSDB_SOURCE_DIR}/util/cleanable.cc
${ROCKSDB_SOURCE_DIR}/util/coding.cc
${ROCKSDB_SOURCE_DIR}/util/compaction_job_stats_impl.cc
${ROCKSDB_SOURCE_DIR}/util/comparator.cc
${ROCKSDB_SOURCE_DIR}/util/compression.cc
${ROCKSDB_SOURCE_DIR}/util/compression_context_cache.cc
${ROCKSDB_SOURCE_DIR}/util/concurrent_task_limiter_impl.cc
${ROCKSDB_SOURCE_DIR}/util/crc32c.cc
@ -311,16 +314,17 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/util/murmurhash.cc
${ROCKSDB_SOURCE_DIR}/util/random.cc
${ROCKSDB_SOURCE_DIR}/util/rate_limiter.cc
${ROCKSDB_SOURCE_DIR}/util/regex.cc
${ROCKSDB_SOURCE_DIR}/util/ribbon_config.cc
${ROCKSDB_SOURCE_DIR}/util/slice.cc
${ROCKSDB_SOURCE_DIR}/util/file_checksum_helper.cc
${ROCKSDB_SOURCE_DIR}/util/status.cc
${ROCKSDB_SOURCE_DIR}/util/stderr_logger.cc
${ROCKSDB_SOURCE_DIR}/util/string_util.cc
${ROCKSDB_SOURCE_DIR}/util/thread_local.cc
${ROCKSDB_SOURCE_DIR}/util/threadpool_imp.cc
${ROCKSDB_SOURCE_DIR}/util/xxhash.cc
${ROCKSDB_SOURCE_DIR}/utilities/backupable/backupable_db.cc
${ROCKSDB_SOURCE_DIR}/utilities/agg_merge/agg_merge.cc
${ROCKSDB_SOURCE_DIR}/utilities/backup/backup_engine.cc
${ROCKSDB_SOURCE_DIR}/utilities/blob_db/blob_compaction_filter.cc
${ROCKSDB_SOURCE_DIR}/utilities/blob_db/blob_db.cc
${ROCKSDB_SOURCE_DIR}/utilities/blob_db/blob_db_impl.cc
@ -335,6 +339,7 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/utilities/checkpoint/checkpoint_impl.cc
${ROCKSDB_SOURCE_DIR}/utilities/compaction_filters.cc
${ROCKSDB_SOURCE_DIR}/utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc
${ROCKSDB_SOURCE_DIR}/utilities/counted_fs.cc
${ROCKSDB_SOURCE_DIR}/utilities/debug.cc
${ROCKSDB_SOURCE_DIR}/utilities/env_mirror.cc
${ROCKSDB_SOURCE_DIR}/utilities/env_timed.cc
@ -422,15 +427,6 @@ list(APPEND SOURCES
"${ROCKSDB_SOURCE_DIR}/env/fs_posix.cc"
"${ROCKSDB_SOURCE_DIR}/env/io_posix.cc")
if(WITH_FOLLY_DISTRIBUTED_MUTEX)
list(APPEND SOURCES
"${ROCKSDB_SOURCE_DIR}/third-party/folly/folly/detail/Futex.cpp"
"${ROCKSDB_SOURCE_DIR}/third-party/folly/folly/synchronization/AtomicNotification.cpp"
"${ROCKSDB_SOURCE_DIR}/third-party/folly/folly/synchronization/DistributedMutex.cpp"
"${ROCKSDB_SOURCE_DIR}/third-party/folly/folly/synchronization/ParkingLot.cpp"
"${ROCKSDB_SOURCE_DIR}/third-party/folly/folly/synchronization/WaitOptions.cpp")
endif()
add_library(_rocksdb ${SOURCES})
add_library(ch_contrib::rocksdb ALIAS _rocksdb)
target_link_libraries(_rocksdb PRIVATE ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})

View File

@ -1,16 +1,33 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
/// This file was edited for ClickHouse.
#include <memory>
#include "rocksdb/version.h"
#include "rocksdb/utilities/object_registry.h"
#include "util/string_util.h"
// The build script may replace these values with real values based
// on whether or not GIT is available and the platform settings
static const std::string rocksdb_build_git_sha = "rocksdb_build_git_sha:0";
static const std::string rocksdb_build_git_tag = "rocksdb_build_git_tag:master";
static const std::string rocksdb_build_date = "rocksdb_build_date:2000-01-01";
static const std::string rocksdb_build_git_sha = "rocksdb_build_git_sha:72438a678872544809393b831c7273794c074215";
static const std::string rocksdb_build_git_tag = "rocksdb_build_git_tag:main";
#define HAS_GIT_CHANGES 0
#if HAS_GIT_CHANGES == 0
// If HAS_GIT_CHANGES is 0, the GIT date is used.
// Use the time the branch/tag was last modified
static const std::string rocksdb_build_date = "rocksdb_build_date:2024-07-12 16:01:57";
#else
// If HAS_GIT_CHANGES is > 0, the branch/tag has modifications.
// Use the time the build was created.
static const std::string rocksdb_build_date = "rocksdb_build_date:2024-07-13 17:15:50";
#endif
extern "C" {
} // extern "C"
std::unordered_map<std::string, ROCKSDB_NAMESPACE::RegistrarFunc> ROCKSDB_NAMESPACE::ObjectRegistry::builtins_ = {
};
namespace ROCKSDB_NAMESPACE {
static void AddProperty(std::unordered_map<std::string, std::string> *props, const std::string& name) {
@ -39,12 +56,12 @@ const std::unordered_map<std::string, std::string>& GetRocksBuildProperties() {
}
std::string GetRocksVersionAsString(bool with_patch) {
std::string version = ToString(ROCKSDB_MAJOR) + "." + ToString(ROCKSDB_MINOR);
std::string version = std::to_string(ROCKSDB_MAJOR) + "." + std::to_string(ROCKSDB_MINOR);
if (with_patch) {
return version + "." + ToString(ROCKSDB_PATCH);
return version + "." + std::to_string(ROCKSDB_PATCH);
} else {
return version;
}
}
}
std::string GetRocksBuildInfoAsString(const std::string& program, bool verbose) {

View File

@ -5608,15 +5608,3 @@ Default value: `10000000`.
Minimal size of a block to compress in CROSS JOIN. A zero value disables this threshold. A block is compressed when either of the two thresholds (by rows or by bytes) is reached.
Default value: `1GiB`.
## restore_replace_external_engines_to_null
For testing purposes. Replaces all external engines to Null to not initiate external connections.
Default value: `False`
## restore_replace_external_table_functions_to_null
For testing purposes. Replaces all external table functions to Null to not initiate external connections.
Default value: `False`

View File

@ -23,7 +23,7 @@ For more detail on window function syntax see: [Window Functions - Syntax](./ind
**Parameters**
- `x` — Column name.
- `offset` — Offset to apply. [(U)Int*](../data-types/int-uint.md). (Optional - `1` by default).
- `default` — Value to return if calculated row exceeds the boundaries of the window frame. (Optional - `null` by default).
- `default` — Value to return if calculated row exceeds the boundaries of the window frame. (Optional - default value of column type when omitted).
**Returned value**

View File

@ -23,7 +23,7 @@ For more detail on window function syntax see: [Window Functions - Syntax](./ind
**Parameters**
- `x` — Column name.
- `offset` — Offset to apply. [(U)Int*](../data-types/int-uint.md). (Optional - `1` by default).
- `default` — Value to return if calculated row exceeds the boundaries of the window frame. (Optional - `null` by default).
- `default` — Value to return if calculated row exceeds the boundaries of the window frame. (Optional - default value of column type when omitted).
**Returned value**

View File

@ -80,7 +80,7 @@ namespace ErrorCodes
void applySettingsOverridesForLocal(ContextMutablePtr context)
{
Settings settings = context->getSettings();
Settings settings = context->getSettingsCopy();
settings.allow_introspection_functions = true;
settings.storage_file_read_method = LocalFSReadMethod::mmap;
@ -184,6 +184,11 @@ void LocalServer::initialize(Poco::Util::Application & self)
cleanup_threads,
0, // We don't need any threads once all the parts are deleted
cleanup_threads);
getDatabaseCatalogDropTablesThreadPool().initialize(
server_settings.database_catalog_drop_table_concurrency,
0, // We don't need any threads if there are no DROP queries.
server_settings.database_catalog_drop_table_concurrency);
}

View File

@ -1043,6 +1043,11 @@ try
0, // We don't need any threads once all the tables are created
max_database_replicated_create_table_thread_pool_size);
getDatabaseCatalogDropTablesThreadPool().initialize(
server_settings.database_catalog_drop_table_concurrency,
0, // We don't need any threads if there are no DROP queries.
server_settings.database_catalog_drop_table_concurrency);
/// Initialize global local cache for remote filesystem.
if (config().has("local_cache_for_remote_fs"))
{

View File

@ -2,6 +2,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/IAggregateFunction.h>
@ -42,7 +43,7 @@ public:
if (lower_name.ends_with("if"))
return;
auto & function_arguments_nodes = function_node->getArguments().getNodes();
const auto & function_arguments_nodes = function_node->getArguments().getNodes();
if (function_arguments_nodes.size() != 1)
return;
@ -50,6 +51,8 @@ public:
if (!if_node || if_node->getFunctionName() != "if")
return;
FunctionNodePtr replaced_node;
auto if_arguments_nodes = if_node->getArguments().getNodes();
auto * first_const_node = if_arguments_nodes[1]->as<ConstantNode>();
auto * second_const_node = if_arguments_nodes[2]->as<ConstantNode>();
@ -75,8 +78,11 @@ public:
new_arguments[0] = std::move(if_arguments_nodes[1]);
new_arguments[1] = std::move(if_arguments_nodes[0]);
function_arguments_nodes = std::move(new_arguments);
resolveAggregateFunctionNodeByName(*function_node, function_node->getFunctionName() + "If");
replaced_node = std::make_shared<FunctionNode>(function_node->getFunctionName() + "If");
replaced_node->getArguments().getNodes() = std::move(new_arguments);
replaced_node->getParameters().getNodes() = function_node->getParameters().getNodes();
resolveAggregateFunctionNodeByName(*replaced_node, replaced_node->getFunctionName());
}
}
else if (first_const_node)
@ -104,10 +110,26 @@ public:
FunctionFactory::instance().get("not", getContext())->build(not_function->getArgumentColumns()));
new_arguments[1] = std::move(not_function);
function_arguments_nodes = std::move(new_arguments);
resolveAggregateFunctionNodeByName(*function_node, function_node->getFunctionName() + "If");
replaced_node = std::make_shared<FunctionNode>(function_node->getFunctionName() + "If");
replaced_node->getArguments().getNodes() = std::move(new_arguments);
replaced_node->getParameters().getNodes() = function_node->getParameters().getNodes();
resolveAggregateFunctionNodeByName(*replaced_node, replaced_node->getFunctionName());
}
}
if (!replaced_node)
return;
auto prev_type = function_node->getResultType();
auto curr_type = replaced_node->getResultType();
if (!prev_type->equals(*curr_type))
return;
/// Just in case, CAST compatible aggregate function states.
if (WhichDataType(prev_type).isAggregateFunction() && !DataTypeAggregateFunction::strictEquals(prev_type, curr_type))
node = createCastFunction(std::move(replaced_node), prev_type, getContext());
else
node = std::move(replaced_node);
}
};

View File

@ -237,7 +237,7 @@ QueryTreeNodePtr QueryTreeBuilder::buildSelectExpression(const ASTPtr & select_q
/// Remove global settings limit and offset
if (const auto & settings_ref = updated_context->getSettingsRef(); settings_ref.limit || settings_ref.offset)
{
Settings settings = updated_context->getSettings();
Settings settings = updated_context->getSettingsCopy();
limit = settings.limit;
offset = settings.offset;
settings.limit = 0;

View File

@ -503,7 +503,7 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesCacheMiss);
auto subquery_context = Context::createCopy(context);
Settings subquery_settings = context->getSettings();
Settings subquery_settings = context->getSettingsCopy();
subquery_settings.max_result_rows = 1;
subquery_settings.extremes = false;
subquery_context->setSettings(subquery_settings);

View File

@ -867,7 +867,7 @@ void updateContextForSubqueryExecution(ContextMutablePtr & mutable_context)
* max_rows_in_join, max_bytes_in_join, join_overflow_mode,
* which are checked separately (in the Set, Join objects).
*/
Settings subquery_settings = mutable_context->getSettings();
Settings subquery_settings = mutable_context->getSettingsCopy();
subquery_settings.max_result_rows = 0;
subquery_settings.max_result_bytes = 0;
/// The calculation of extremes does not make sense and is not necessary (if you do it, then the extremes of the subquery can be taken for whole query).

View File

@ -232,7 +232,7 @@ int IBridge::main(const std::vector<std::string> & /*args*/)
auto context = Context::createGlobal(shared_context.get());
context->makeGlobalContext();
auto settings = context->getSettings();
auto settings = context->getSettingsCopy();
settings.set("http_max_field_value_size", http_max_field_value_size);
context->setSettings(settings);

View File

@ -656,7 +656,7 @@ void ClientBase::initLogsOutputStream()
void ClientBase::adjustSettings()
{
Settings settings = global_context->getSettings();
Settings settings = global_context->getSettingsCopy();
/// NOTE: Do not forget to set changed=false to avoid sending it to the server (to avoid breakage read only profiles)
@ -865,7 +865,7 @@ bool ClientBase::isSyncInsertWithData(const ASTInsertQuery & insert_query, const
if (!insert_query.data)
return false;
auto settings = context->getSettings();
auto settings = context->getSettingsCopy();
if (insert_query.settings_ast)
settings.applyChanges(insert_query.settings_ast->as<ASTSetQuery>()->changes);
@ -2671,7 +2671,7 @@ bool ClientBase::processMultiQueryFromFile(const String & file_name)
if (!getClientConfiguration().has("log_comment"))
{
Settings settings = client_context->getSettings();
Settings settings = client_context->getSettingsCopy();
/// NOTE: cannot use even weakly_canonical() since it fails for /dev/stdin due to resolving of "pipe:[X]"
settings.log_comment = fs::absolute(fs::path(file_name));
client_context->setSettings(settings);

View File

@ -42,6 +42,7 @@ public:
size_t max_error_cap = DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT);
using Entry = IConnectionPool::Entry;
using PoolWithFailoverBase<IConnectionPool>::isTryResultInvalid;
/** Allocates connection to work. */
Entry get(const ConnectionTimeouts & timeouts) override;

View File

@ -116,6 +116,12 @@ public:
const TryGetEntryFunc & try_get_entry,
const GetPriorityFunc & get_priority);
// Returns whether the provided TryResult is invalid and cannot be used. Used to prevent logical errors.
bool isTryResultInvalid(const TryResult & result, bool skip_read_only_replicas) const
{
return result.entry.isNull() || !result.is_usable || (skip_read_only_replicas && result.is_readonly);
}
size_t getPoolSize() const { return nested_pools.size(); }
protected:
@ -300,7 +306,7 @@ PoolWithFailoverBase<TNestedPool>::getMany(
throw DB::NetException(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED,
"All connection tries failed. Log: \n\n{}\n", fail_messages);
std::erase_if(try_results, [&](const TryResult & r) { return r.entry.isNull() || !r.is_usable || (skip_read_only_replicas && r.is_readonly); });
std::erase_if(try_results, [&](const TryResult & r) { return isTryResultInvalid(r, skip_read_only_replicas); });
/// Sort so that preferred items are near the beginning.
std::stable_sort(
@ -321,6 +327,9 @@ PoolWithFailoverBase<TNestedPool>::getMany(
}
else if (up_to_date_count >= min_entries)
{
if (try_results.size() < up_to_date_count)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Could not find enough connections for up-to-date results. Got: {}, needed: {}", try_results.size(), up_to_date_count);
/// There is enough up-to-date entries.
try_results.resize(up_to_date_count);
}
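
The new isTryResultInvalid() helper centralizes the validity check that getMany() previously spelled out inline in the erase_if lambda. Below is a self-contained sketch with mocked TryResult fields (the real TryResult holds a connection entry), only to show the consolidated predicate in use:

#include <vector>

/// Mock of the fields the helper inspects; names are illustrative, not the real struct.
struct TryResult
{
    bool entry_is_null = false;   /// stands in for result.entry.isNull()
    bool is_usable = true;
    bool is_readonly = false;
};

bool isTryResultInvalid(const TryResult & result, bool skip_read_only_replicas)
{
    return result.entry_is_null || !result.is_usable || (skip_read_only_replicas && result.is_readonly);
}

int main()
{
    std::vector<TryResult> try_results =
    {
        {false, true, false},   /// usable writable replica: kept
        {true,  true, false},   /// null entry: dropped
        {false, true, true},    /// read-only replica: dropped only when skipping read-only ones
    };

    std::erase_if(try_results, [](const TryResult & r) { return isTryResultInvalid(r, /*skip_read_only_replicas=*/ true); });
    return try_results.size() == 1 ? 0 : 1;
}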

View File

@ -2,9 +2,11 @@
#include <Common/TimerDescriptor.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <sys/timerfd.h>
#include <unistd.h>
#include <fmt/format.h>
namespace DB
@ -89,9 +91,29 @@ void TimerDescriptor::drain() const
/// A signal happened, need to retry.
if (errno == EINTR)
continue;
{
/** This is to help with debugging.
*
* Sometimes reading from timer_fd blocks, which should not happen, because we opened it in a non-blocking mode.
* But it could be possible if a rogue 3rd-party library closed our file descriptor by mistake
* (for example by double closing due to the lack of exception safety or if it is crappy code in plain C)
* and then another file descriptor is opened in its place.
*
* Let's try to get a name of this file descriptor and log it.
*/
LoggerPtr log = getLogger("TimerDescriptor");
throw ErrnoException(ErrorCodes::CANNOT_READ_FROM_SOCKET, "Cannot drain timer_fd");
static constexpr ssize_t max_link_path_length = 256;
char link_path[max_link_path_length];
ssize_t link_path_length = readlink(fmt::format("/proc/self/fd/{}", timer_fd).c_str(), link_path, max_link_path_length);
if (-1 == link_path_length)
throw ErrnoException(ErrorCodes::CANNOT_READ_FROM_SOCKET, "Cannot readlink for a timer_fd {}", timer_fd);
LOG_TRACE(log, "Received EINTR while trying to drain a TimerDescriptor, fd {}: {}", timer_fd, std::string_view(link_path, link_path_length));
continue;
}
throw ErrnoException(ErrorCodes::CANNOT_READ_FROM_SOCKET, "Cannot drain timer_fd {}", timer_fd);
}
chassert(res == sizeof(buf));
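
As an aside, the diagnostic added above leans on the /proc/self/fd symlink table to find out what a suspicious descriptor currently points to. A minimal standalone sketch of the same technique (not part of this commit; Linux-only, function name is illustrative):

#include <fcntl.h>
#include <unistd.h>
#include <cstdio>
#include <string>

/// Resolve a file descriptor to the path recorded in /proc/self/fd.
/// readlink() does not null-terminate, so the returned length is used explicitly.
std::string describe_fd(int fd)
{
    char link_path[256];
    std::string proc_path = "/proc/self/fd/" + std::to_string(fd);
    ssize_t len = readlink(proc_path.c_str(), link_path, sizeof(link_path));
    if (len == -1)
        return "unknown (readlink failed)";
    return std::string(link_path, static_cast<size_t>(len));
}

int main()
{
    int fd = open("/dev/null", O_RDONLY);
    std::printf("fd %d -> %s\n", fd, describe_fd(fd).c_str());
    close(fd);
    return 0;
}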

View File

@ -1,4 +1,4 @@
#include "ServerSettings.h"
#include <Core/ServerSettings.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB

View File

@ -66,6 +66,15 @@ namespace DB
M(Bool, async_insert_queue_flush_on_shutdown, true, "If true, the queue of asynchronous inserts is flushed on graceful shutdown", 0) \
M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \
\
/* Database Catalog */ \
M(UInt64, database_atomic_delay_before_drop_table_sec, 8 * 60, "The delay during which a dropped table can be restored using the UNDROP statement. If DROP TABLE ran with a SYNC modifier, the setting is ignored.", 0) \
M(UInt64, database_catalog_unused_dir_hide_timeout_sec, 60 * 60, "Parameter of a task that cleans up garbage from store/ directory. If some subdirectory is not used by clickhouse-server and this directory was not modified for last database_catalog_unused_dir_hide_timeout_sec seconds, the task will 'hide' this directory by removing all access rights. It also works for directories that clickhouse-server does not expect to see inside store/. Zero means 'immediately'.", 0) \
M(UInt64, database_catalog_unused_dir_rm_timeout_sec, 30 * 24 * 60 * 60, "Parameter of a task that cleans up garbage from store/ directory. If some subdirectory is not used by clickhouse-server and it was previously 'hidden' (see database_catalog_unused_dir_hide_timeout_sec) and this directory was not modified for last database_catalog_unused_dir_rm_timeout_sec seconds, the task will remove this directory. It also works for directories that clickhouse-server does not expect to see inside store/. Zero means 'never'.", 0) \
M(UInt64, database_catalog_unused_dir_cleanup_period_sec, 24 * 60 * 60, "Parameter of a task that cleans up garbage from store/ directory. Sets scheduling period of the task. Zero means 'never'.", 0) \
M(UInt64, database_catalog_drop_error_cooldown_sec, 5, "If dropping a table fails, ClickHouse will wait for this timeout before retrying the operation.", 0) \
M(UInt64, database_catalog_drop_table_concurrency, 16, "The size of the threadpool used for dropping tables.", 0) \
\
\
M(UInt64, max_concurrent_queries, 0, "Maximum number of concurrently executed queries. Zero means unlimited.", 0) \
M(UInt64, max_concurrent_insert_queries, 0, "Maximum number of concurrently executed INSERT queries. Zero means unlimited.", 0) \
M(UInt64, max_concurrent_select_queries, 0, "Maximum number of concurrently executed SELECT queries. Zero means unlimited.", 0) \

View File

@ -893,8 +893,6 @@ class IColumn;
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \
M(UInt64, extract_key_value_pairs_max_pairs_per_row, 1000, "Max number of pairs that can be produced by the `extractKeyValuePairs` function. Used as a safeguard against consuming too much memory.", 0) ALIAS(extract_kvp_max_pairs_per_row) \
M(Bool, restore_replace_external_engines_to_null, false, "Replace all the external table engines to Null on restore. Useful for testing purposes", 0) \
M(Bool, restore_replace_external_table_functions_to_null, false, "Replace all table functions to Null on restore. Useful for testing purposes", 0) \
\
\
/* ###################################### */ \
@ -1075,7 +1073,7 @@ class IColumn;
M(Bool, input_format_orc_allow_missing_columns, true, "Allow missing columns while reading ORC input formats", 0) \
M(Bool, input_format_orc_use_fast_decoder, true, "Use a faster ORC decoder implementation.", 0) \
M(Bool, input_format_orc_filter_push_down, true, "When reading ORC files, skip whole stripes or row groups based on the WHERE/PREWHERE expressions, min/max statistics or bloom filter in the ORC metadata.", 0) \
M(Bool, input_format_orc_read_use_writer_time_zone, false, "Whether use the writer's time zone in ORC stripe for ORC row reader, the default ORC row reader's time zone is GMT.", 0) \
M(String, input_format_orc_reader_time_zone_name, "GMT", "The time zone name for ORC row reader, the default ORC row reader's time zone is GMT.", 0) \
M(Bool, input_format_parquet_allow_missing_columns, true, "Allow missing columns while reading Parquet input formats", 0) \
M(UInt64, input_format_parquet_local_file_min_bytes_for_seek, 8192, "Min bytes required for local read (file) to do seek, instead of read with ignore in Parquet input format", 0) \
M(Bool, input_format_arrow_allow_missing_columns, true, "Allow missing columns while reading Arrow input formats", 0) \

View File

@ -69,7 +69,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"dictionary_validate_primary_key_type", false, false, "Validate primary key type for dictionaries. By default id type for simple layouts will be implicitly converted to UInt64."},
{"collect_hash_table_stats_during_joins", false, true, "New setting."},
{"max_size_to_preallocate_for_joins", 0, 100'000'000, "New setting."},
{"input_format_orc_read_use_writer_time_zone", false, false, "Whether use the writer's time zone in ORC stripe for ORC row reader, the default ORC row reader's time zone is GMT."},
{"input_format_orc_reader_time_zone_name", "GMT", "GMT", "The time zone name for ORC row reader, the default ORC row reader's time zone is GMT."},
{"lightweight_mutation_projection_mode", "throw", "throw", "When lightweight delete happens on a table with projection(s), the possible operations include throw the exception as projection exists, or drop all projection related to this table then do lightweight delete."},
{"database_replicated_allow_heavy_create", true, false, "Long-running DDL queries (CREATE AS SELECT and POPULATE) for Replicated database engine was forbidden"},
{"query_plan_merge_filters", false, false, "Allow to merge filters in the query plan"},
@ -80,9 +80,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"ignore_on_cluster_for_replicated_named_collections_queries", false, false, "Ignore ON CLUSTER clause for replicated named collections management queries."},
{"backup_restore_s3_retry_attempts", 1000,1000, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries. It takes place only for backup/restore."},
{"postgresql_connection_attempt_timeout", 2, 2, "Allow to control 'connect_timeout' parameter of PostgreSQL connection."},
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."},
{"restore_replace_external_table_functions_to_null", false, false, "New setting."},
{"restore_replace_external_engines_to_null", false, false, "New setting."}
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."}
}},
{"24.6", {{"materialize_skip_indexes_on_insert", true, true, "Added new setting to allow to disable materialization of skip indexes on insert"},
{"materialize_statistics_on_insert", true, true, "Added new setting to allow to disable materialization of statistics on insert"},

View File

@ -111,7 +111,7 @@ ASTPtr DatabaseDictionary::getCreateTableQueryImpl(const String & table_name, Co
buffer << ") Engine = Dictionary(" << backQuoteIfNeed(table_name) << ")";
}
auto settings = getContext()->getSettingsRef();
const auto & settings = getContext()->getSettingsRef();
ParserCreateQuery parser;
const char * pos = query.data();
std::string error_message;
@ -133,7 +133,7 @@ ASTPtr DatabaseDictionary::getCreateDatabaseQuery() const
if (const auto comment_value = getDatabaseComment(); !comment_value.empty())
buffer << " COMMENT " << backQuote(comment_value);
}
auto settings = getContext()->getSettingsRef();
const auto & settings = getContext()->getSettingsRef();
ParserCreateQuery parser;
return parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth, settings.max_parser_backtracks);
}

View File

@ -313,7 +313,7 @@ void DatabaseOnDisk::detachTablePermanently(ContextPtr query_context, const Stri
std::lock_guard lock(mutex);
if (const auto it = snapshot_detached_tables.find(table_name); it == snapshot_detached_tables.end())
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Snapshot doesn't contain info about detached table={}", table_name);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Snapshot doesn't contain info about detached table `{}`", table_name);
}
else
{
@ -534,7 +534,7 @@ ASTPtr DatabaseOnDisk::getCreateDatabaseQuery() const
{
ASTPtr ast;
auto settings = getContext()->getSettingsRef();
const auto & settings = getContext()->getSettingsRef();
{
std::lock_guard lock(mutex);
auto database_metadata_path = getContext()->getPath() + "metadata/" + escapeForFileName(database_name) + ".sql";
@ -733,7 +733,7 @@ ASTPtr DatabaseOnDisk::parseQueryFromMetadata(
return nullptr;
}
auto settings = local_context->getSettingsRef();
const auto & settings = local_context->getSettingsRef();
ParserCreateQuery parser;
const char * pos = query.data();
std::string error_message;

View File

@ -89,7 +89,7 @@ static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync";
static ContextMutablePtr createQueryContext(ContextPtr context)
{
Settings new_query_settings = context->getSettings();
Settings new_query_settings = context->getSettingsCopy();
new_query_settings.insert_allow_materialized_columns = true;
/// To avoid calling AST::format

View File

@ -243,7 +243,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.orc.output_row_index_stride = settings.output_format_orc_row_index_stride;
format_settings.orc.use_fast_decoder = settings.input_format_orc_use_fast_decoder;
format_settings.orc.filter_push_down = settings.input_format_orc_filter_push_down;
format_settings.orc.read_use_writer_time_zone = settings.input_format_orc_read_use_writer_time_zone;
format_settings.orc.reader_time_zone_name = settings.input_format_orc_reader_time_zone_name;
format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields;
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
format_settings.capn_proto.skip_fields_with_unsupported_types_in_schema_inference = settings.input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference;

View File

@ -409,7 +409,7 @@ struct FormatSettings
bool use_fast_decoder = true;
bool filter_push_down = true;
UInt64 output_row_index_stride = 10'000;
bool read_use_writer_time_zone = false;
String reader_time_zone_name = "GMT";
} orc{};
/// For capnProto format we should determine how to

View File

@ -39,7 +39,7 @@ public:
FunctionFormatQuery(ContextPtr context, String name_, OutputFormatting output_formatting_, ErrorHandling error_handling_)
: name(name_), output_formatting(output_formatting_), error_handling(error_handling_)
{
const Settings & settings = context->getSettings();
const Settings & settings = context->getSettingsRef();
max_query_size = settings.max_query_size;
max_parser_depth = settings.max_parser_depth;
max_parser_backtracks = settings.max_parser_backtracks;

View File

@ -143,7 +143,7 @@ ColumnPtr FunctionHasColumnInTable::executeImpl(const ColumnsWithTypeAndName & a
/* cluster_name= */ "",
/* password= */ ""
};
auto cluster = std::make_shared<Cluster>(getContext()->getSettings(), host_names, params);
auto cluster = std::make_shared<Cluster>(getContext()->getSettingsRef(), host_names, params);
// FIXME this (probably) needs a non-constant access to query context,
// because it might initialize a storage. Ideally, the tables required

View File

@ -3,8 +3,6 @@
#include <sys/stat.h>
#include <unistd.h>
#include <fmt/format.h>
#include <Common/formatReadable.h>
#include <Common/Exception.h>
#include <base/getPageSize.h>

View File

@ -23,6 +23,9 @@ namespace CurrentMetrics
extern const Metric MergeTreeUnexpectedPartsLoaderThreads;
extern const Metric MergeTreeUnexpectedPartsLoaderThreadsActive;
extern const Metric MergeTreeUnexpectedPartsLoaderThreadsScheduled;
extern const Metric DatabaseCatalogThreads;
extern const Metric DatabaseCatalogThreadsActive;
extern const Metric DatabaseCatalogThreadsScheduled;
extern const Metric DatabaseReplicatedCreateTablesThreads;
extern const Metric DatabaseReplicatedCreateTablesThreadsActive;
extern const Metric DatabaseReplicatedCreateTablesThreadsScheduled;
@ -166,4 +169,11 @@ StaticThreadPool & getDatabaseReplicatedCreateTablesThreadPool()
return instance;
}
/// ThreadPool used for dropping tables.
StaticThreadPool & getDatabaseCatalogDropTablesThreadPool()
{
static StaticThreadPool instance("DropTablesThreadPool", CurrentMetrics::DatabaseCatalogThreads, CurrentMetrics::DatabaseCatalogThreadsActive, CurrentMetrics::DatabaseCatalogThreadsScheduled);
return instance;
}
}

View File

@ -69,4 +69,7 @@ StaticThreadPool & getUnexpectedPartsLoadingThreadPool();
/// ThreadPool used for creating tables in DatabaseReplicated.
StaticThreadPool & getDatabaseReplicatedCreateTablesThreadPool();
/// ThreadPool used for dropping tables.
StaticThreadPool & getDatabaseCatalogDropTablesThreadPool();
}

View File

@ -2270,7 +2270,7 @@ bool Context::displaySecretsInShowAndSelect() const
return shared->server_settings.display_secrets_in_show_and_select;
}
Settings Context::getSettings() const
Settings Context::getSettingsCopy() const
{
SharedLockGuard lock(mutex);
return *settings;

View File

@ -829,7 +829,8 @@ public:
void setMacros(std::unique_ptr<Macros> && macros);
bool displaySecretsInShowAndSelect() const;
Settings getSettings() const;
Settings getSettingsCopy() const;
const Settings & getSettingsRef() const { return *settings; }
void setSettings(const Settings & settings_);
/// Set settings by name.
@ -954,8 +955,6 @@ public:
void makeSessionContext();
void makeGlobalContext();
const Settings & getSettingsRef() const { return *settings; }
void setProgressCallback(ProgressCallback callback);
/// Used in executeQuery() to pass it to the QueryPipeline.
ProgressCallback getProgressCallback() const;
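
Much of this commit is the mechanical rename of Context::getSettings() to getSettingsCopy(), with read-only call sites switched to getSettingsRef(). A small self-contained sketch of the intended split, using mock Settings/Context types (the real ClickHouse classes are far larger; field names here are placeholders):

#include <memory>

/// Mock types, only to make the access pattern concrete.
struct Settings
{
    unsigned max_threads = 8;
    unsigned long max_result_rows = 0;
};

struct Context
{
    Settings settings;
    /// Cheap read-only access: no copy of the (large) Settings struct.
    const Settings & getSettingsRef() const { return settings; }
    /// The renamed accessor: the name now advertises that a full copy is made.
    Settings getSettingsCopy() const { return settings; }
    void setSettings(const Settings & new_settings) { settings = new_settings; }
};

int main()
{
    auto context = std::make_shared<Context>();

    /// Read-only path: take a const reference.
    const Settings & ref = context->getSettingsRef();
    (void)ref.max_threads;

    /// Mutating path: copy, adjust, push back - the pattern used for subquery contexts in this diff.
    Settings copy = context->getSettingsCopy();
    copy.max_result_rows = 1;
    context->setSettings(copy);
    return 0;
}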

View File

@ -19,6 +19,8 @@
#include <IO/ReadHelpers.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Core/ServerSettings.h>
#include <IO/SharedThreadPools.h>
#include <Common/Exception.h>
#include <Common/quoteString.h>
#include <Common/atomicRename.h>
@ -48,9 +50,6 @@
namespace CurrentMetrics
{
extern const Metric TablesToDropQueueSize;
extern const Metric DatabaseCatalogThreads;
extern const Metric DatabaseCatalogThreadsActive;
extern const Metric DatabaseCatalogThreadsScheduled;
}
namespace DB
@ -189,13 +188,6 @@ StoragePtr TemporaryTableHolder::getTable() const
void DatabaseCatalog::initializeAndLoadTemporaryDatabase()
{
drop_delay_sec = getContext()->getConfigRef().getInt("database_atomic_delay_before_drop_table_sec", default_drop_delay_sec);
unused_dir_hide_timeout_sec = getContext()->getConfigRef().getInt64("database_catalog_unused_dir_hide_timeout_sec", unused_dir_hide_timeout_sec);
unused_dir_rm_timeout_sec = getContext()->getConfigRef().getInt64("database_catalog_unused_dir_rm_timeout_sec", unused_dir_rm_timeout_sec);
unused_dir_cleanup_period_sec = getContext()->getConfigRef().getInt64("database_catalog_unused_dir_cleanup_period_sec", unused_dir_cleanup_period_sec);
drop_error_cooldown_sec = getContext()->getConfigRef().getInt64("database_catalog_drop_error_cooldown_sec", drop_error_cooldown_sec);
drop_table_concurrency = getContext()->getConfigRef().getInt64("database_catalog_drop_table_concurrency", drop_table_concurrency);
auto db_for_temporary_and_external_tables = std::make_shared<DatabaseMemory>(TEMPORARY_DATABASE, getContext());
attachDatabase(TEMPORARY_DATABASE, db_for_temporary_and_external_tables);
}
@ -203,7 +195,7 @@ void DatabaseCatalog::initializeAndLoadTemporaryDatabase()
void DatabaseCatalog::createBackgroundTasks()
{
/// It has to be done before databases are loaded (to avoid a race condition on initialization)
if (Context::getGlobalContextInstance()->getApplicationType() == Context::ApplicationType::SERVER && unused_dir_cleanup_period_sec)
if (Context::getGlobalContextInstance()->getApplicationType() == Context::ApplicationType::SERVER && getContext()->getServerSettings().database_catalog_unused_dir_cleanup_period_sec)
{
auto cleanup_task_holder
= getContext()->getSchedulePool().createTask("DatabaseCatalogCleanupStoreDirectoryTask", [this]() { this->cleanupStoreDirectoryTask(); });
@ -224,7 +216,7 @@ void DatabaseCatalog::startupBackgroundTasks()
{
(*cleanup_task)->activate();
/// Do not start task immediately on server startup, it's not urgent.
(*cleanup_task)->scheduleAfter(unused_dir_hide_timeout_sec * 1000);
(*cleanup_task)->scheduleAfter(static_cast<time_t>(getContext()->getServerSettings().database_catalog_unused_dir_hide_timeout_sec) * 1000);
}
(*drop_task)->activate();
@ -1038,15 +1030,12 @@ void DatabaseCatalog::loadMarkedAsDroppedTables()
LOG_INFO(log, "Found {} partially dropped tables. Will load them and retry removal.", dropped_metadata.size());
ThreadPool pool(CurrentMetrics::DatabaseCatalogThreads, CurrentMetrics::DatabaseCatalogThreadsActive, CurrentMetrics::DatabaseCatalogThreadsScheduled);
ThreadPoolCallbackRunnerLocal<void> runner(getDatabaseCatalogDropTablesThreadPool().get(), "DropTables");
for (const auto & elem : dropped_metadata)
{
pool.scheduleOrThrowOnError([&]()
{
this->enqueueDroppedTableCleanup(elem.second, nullptr, elem.first);
});
runner([this, &elem](){ this->enqueueDroppedTableCleanup(elem.second, nullptr, elem.first); });
}
pool.wait();
runner.waitForAllToFinishAndRethrowFirstError();
}
String DatabaseCatalog::getPathForDroppedMetadata(const StorageID & table_id) const
@ -1135,7 +1124,13 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
}
else
{
tables_marked_dropped.push_back({table_id, table, dropped_metadata_path, drop_time + drop_delay_sec});
tables_marked_dropped.push_back
({
table_id,
table,
dropped_metadata_path,
drop_time + static_cast<time_t>(getContext()->getServerSettings().database_atomic_delay_before_drop_table_sec)
});
if (first_async_drop_in_queue == tables_marked_dropped.end())
--first_async_drop_in_queue;
}
@ -1289,13 +1284,7 @@ void DatabaseCatalog::dropTablesParallel(std::vector<DatabaseCatalog::TablesMark
if (tables_to_drop.empty())
return;
ThreadPool pool(
CurrentMetrics::DatabaseCatalogThreads,
CurrentMetrics::DatabaseCatalogThreadsActive,
CurrentMetrics::DatabaseCatalogThreadsScheduled,
/* max_threads */drop_table_concurrency,
/* max_free_threads */0,
/* queue_size */tables_to_drop.size());
ThreadPoolCallbackRunnerLocal<void> runner(getDatabaseCatalogDropTablesThreadPool().get(), "DropTables");
for (const auto & item : tables_to_drop)
{
@ -1332,7 +1321,7 @@ void DatabaseCatalog::dropTablesParallel(std::vector<DatabaseCatalog::TablesMark
++first_async_drop_in_queue;
tables_marked_dropped.splice(tables_marked_dropped.end(), tables_marked_dropped, table_iterator);
table_iterator->drop_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) + drop_error_cooldown_sec;
table_iterator->drop_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) + getContext()->getServerSettings().database_catalog_drop_error_cooldown_sec;
if (first_async_drop_in_queue == tables_marked_dropped.end())
--first_async_drop_in_queue;
@ -1340,25 +1329,10 @@ void DatabaseCatalog::dropTablesParallel(std::vector<DatabaseCatalog::TablesMark
}
};
try
{
pool.scheduleOrThrowOnError(std::move(job));
}
catch (...)
{
tryLogCurrentException(log, "Cannot drop tables. Will retry later.");
break;
}
runner(std::move(job));
}
try
{
pool.wait();
}
catch (...)
{
tryLogCurrentException(log, "Cannot drop tables. Will retry later.");
}
runner.waitForAllToFinishAndRethrowFirstError();
}
void DatabaseCatalog::dropTableDataTask()
@ -1375,7 +1349,15 @@ void DatabaseCatalog::dropTableDataTask()
LOG_INFO(log, "Have {} tables in drop queue ({} of them are in use), will try drop {} tables",
drop_tables_count, drop_tables_in_use_count, tables_to_drop.size());
dropTablesParallel(tables_to_drop);
try
{
dropTablesParallel(tables_to_drop);
}
catch (...)
{
/// We don't re-throw the exception because we are in a background pool.
tryLogCurrentException(log, "Cannot drop tables. Will retry later.");
}
}
rescheduleDropTableTask();
@ -1718,7 +1700,7 @@ void DatabaseCatalog::cleanupStoreDirectoryTask()
LOG_TEST(log, "Nothing to clean up from store/ on disk {}", disk_name);
}
(*cleanup_task)->scheduleAfter(unused_dir_cleanup_period_sec * 1000);
(*cleanup_task)->scheduleAfter(static_cast<time_t>(getContext()->getServerSettings().database_catalog_unused_dir_cleanup_period_sec) * 1000);
}
bool DatabaseCatalog::maybeRemoveDirectory(const String & disk_name, const DiskPtr & disk, const String & unused_dir)
@ -1742,7 +1724,7 @@ bool DatabaseCatalog::maybeRemoveDirectory(const String & disk_name, const DiskP
time_t current_time = time(nullptr);
if (st.st_mode & (S_IRWXU | S_IRWXG | S_IRWXO))
{
if (current_time <= max_modification_time + unused_dir_hide_timeout_sec)
if (current_time <= max_modification_time + static_cast<time_t>(getContext()->getServerSettings().database_catalog_unused_dir_hide_timeout_sec))
return false;
LOG_INFO(log, "Removing access rights for unused directory {} from disk {} (will remove it when timeout exceed)", unused_dir, disk_name);
@ -1758,6 +1740,8 @@ bool DatabaseCatalog::maybeRemoveDirectory(const String & disk_name, const DiskP
}
else
{
auto unused_dir_rm_timeout_sec = static_cast<time_t>(getContext()->getServerSettings().database_catalog_unused_dir_rm_timeout_sec);
if (!unused_dir_rm_timeout_sec)
return false;

View File

@ -354,23 +354,8 @@ private:
mutable std::mutex tables_marked_dropped_mutex;
std::unique_ptr<BackgroundSchedulePoolTaskHolder> drop_task;
static constexpr time_t default_drop_delay_sec = 8 * 60;
time_t drop_delay_sec = default_drop_delay_sec;
std::condition_variable wait_table_finally_dropped;
std::unique_ptr<BackgroundSchedulePoolTaskHolder> cleanup_task;
static constexpr time_t default_unused_dir_hide_timeout_sec = 60 * 60; /// 1 hour
time_t unused_dir_hide_timeout_sec = default_unused_dir_hide_timeout_sec;
static constexpr time_t default_unused_dir_rm_timeout_sec = 30 * 24 * 60 * 60; /// 30 days
time_t unused_dir_rm_timeout_sec = default_unused_dir_rm_timeout_sec;
static constexpr time_t default_unused_dir_cleanup_period_sec = 24 * 60 * 60; /// 1 day
time_t unused_dir_cleanup_period_sec = default_unused_dir_cleanup_period_sec;
static constexpr time_t default_drop_error_cooldown_sec = 5;
time_t drop_error_cooldown_sec = default_drop_error_cooldown_sec;
static constexpr size_t default_drop_table_concurrency = 10;
size_t drop_table_concurrency = default_drop_table_concurrency;
std::unique_ptr<BackgroundSchedulePoolTaskHolder> reload_disks_task;
std::mutex reload_disks_mutex;

View File

@ -74,7 +74,7 @@ void ExecuteScalarSubqueriesMatcher::visit(ASTPtr & ast, Data & data)
static auto getQueryInterpreter(const ASTSubquery & subquery, ExecuteScalarSubqueriesMatcher::Data & data)
{
auto subquery_context = Context::createCopy(data.getContext());
Settings subquery_settings = data.getContext()->getSettings();
Settings subquery_settings = data.getContext()->getSettingsCopy();
subquery_settings.max_result_rows = 1;
subquery_settings.extremes = false;
subquery_context->setSettings(subquery_settings);

View File

@ -171,7 +171,7 @@ ExpressionAnalyzer::ExpressionAnalyzer(
PreparedSetsPtr prepared_sets_,
bool is_create_parameterized_view_)
: WithContext(context_)
, query(query_), settings(getContext()->getSettings())
, query(query_), settings(getContext()->getSettingsRef())
, subquery_depth(subquery_depth_)
, syntax(syntax_analyzer_result_)
, is_create_parameterized_view(is_create_parameterized_view_)
@ -984,7 +984,7 @@ static std::shared_ptr<IJoin> tryCreateJoin(
algorithm == JoinAlgorithm::PARALLEL_HASH ||
algorithm == JoinAlgorithm::DEFAULT)
{
const auto & settings = context->getSettings();
const auto & settings = context->getSettingsRef();
if (analyzed_join->allowParallelHashJoin())
return std::make_shared<ConcurrentHashJoin>(

View File

@ -959,40 +959,12 @@ namespace
engine_ast->no_empty_args = true;
storage.set(storage.engine, engine_ast);
}
void setNullTableEngine(ASTStorage & storage)
{
auto engine_ast = std::make_shared<ASTFunction>();
engine_ast->name = "Null";
engine_ast->no_empty_args = true;
storage.set(storage.engine, engine_ast);
}
}
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
if (create.as_table_function)
{
if (getContext()->getSettingsRef().restore_replace_external_table_functions_to_null)
{
const auto & factory = TableFunctionFactory::instance();
auto properties = factory.tryGetProperties(create.as_table_function->as<ASTFunction>()->name);
if (properties && properties->allow_readonly)
return;
if (!create.storage)
{
auto storage_ast = std::make_shared<ASTStorage>();
create.set(create.storage, storage_ast);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Storage should not be created yet, it's a bug.");
create.as_table_function = nullptr;
setNullTableEngine(*create.storage);
}
return;
}
if (create.is_dictionary || create.is_ordinary_view || create.is_live_view || create.is_window_view)
return;
@ -1043,13 +1015,6 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
/// Some part of storage definition (such as PARTITION BY) is specified, but ENGINE is not: just set default one.
setDefaultTableEngine(*create.storage, getContext()->getSettingsRef().default_table_engine.value);
}
/// For external tables with restore_replace_external_engine_to_null setting we replace external engines to
/// Null table engine.
else if (getContext()->getSettingsRef().restore_replace_external_engines_to_null)
{
if (StorageFactory::instance().getStorageFeatures(create.storage->engine->name).source_access_type != AccessType::NONE)
setNullTableEngine(*create.storage);
}
return;
}

View File

@ -6,6 +6,7 @@
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/QueryLog.h>
#include <IO/SharedThreadPools.h>
#include <Access/Common/AccessRightsElement.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTIdentifier.h>
@ -424,18 +425,29 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
auto table_context = Context::createCopy(getContext());
table_context->setInternalQuery(true);
/// Do not hold extra shared pointers to tables
std::vector<std::pair<String, bool>> tables_to_drop;
std::vector<std::pair<StorageID, bool>> tables_to_drop;
// NOTE: This means we wait for all tables to be loaded inside getTablesIterator() call in case of `async_load_databases = true`.
for (auto iterator = database->getTablesIterator(table_context); iterator->isValid(); iterator->next())
{
auto table_ptr = iterator->table();
table_ptr->flushAndPrepareForShutdown();
tables_to_drop.push_back({iterator->name(), table_ptr->isDictionary()});
tables_to_drop.push_back({table_ptr->getStorageID(), table_ptr->isDictionary()});
}
/// Prepare tables for shutdown in parallel.
ThreadPoolCallbackRunnerLocal<void> runner(getDatabaseCatalogDropTablesThreadPool().get(), "DropTables");
for (const auto & [name, _] : tables_to_drop)
{
auto table_ptr = DatabaseCatalog::instance().getTable(name, table_context);
runner([my_table_ptr = std::move(table_ptr)]()
{
my_table_ptr->flushAndPrepareForShutdown();
});
}
runner.waitForAllToFinishAndRethrowFirstError();
for (const auto & table : tables_to_drop)
{
query_for_table.setTable(table.first);
query_for_table.setTable(table.first.getTableName());
query_for_table.is_dictionary = table.second;
DatabasePtr db;
UUID table_to_wait = UUIDHelpers::Nil;
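
The parallel flushAndPrepareForShutdown() above follows a simple scatter/wait pattern: schedule one job per table on the shared drop-tables pool, then wait for all of them and rethrow the first error. A rough standalone analogy of that pattern (this is not the ClickHouse ThreadPoolCallbackRunnerLocal API; std::async merely stands in for the pool):

#include <future>
#include <vector>
#include <cstdio>

int main()
{
    std::vector<std::future<void>> jobs;
    for (int table = 0; table < 4; ++table)
    {
        /// One job per table; flushAndPrepareForShutdown() would run inside the lambda.
        jobs.push_back(std::async(std::launch::async, [table]
        {
            std::printf("preparing table %d for shutdown\n", table);
        }));
    }

    /// Analogue of waitForAllToFinishAndRethrowFirstError():
    /// get() blocks and rethrows the exception of a failed job (first in iteration order).
    for (auto & job : jobs)
        job.get();
    return 0;
}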

View File

@ -464,7 +464,7 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery &
* to avoid unnecessary squashing.
*/
Settings new_settings = select_context->getSettings();
Settings new_settings = select_context->getSettingsCopy();
new_settings.max_threads = std::max<UInt64>(1, settings.max_insert_threads);

View File

@ -249,7 +249,7 @@ namespace
ContextPtr getSubqueryContext(const ContextPtr & context)
{
auto subquery_context = Context::createCopy(context);
Settings subquery_settings = context->getSettings();
Settings subquery_settings = context->getSettingsCopy();
subquery_settings.max_result_rows = 0;
subquery_settings.max_result_bytes = 0;
/// The calculation of extremes does not make sense and is not necessary (if you do it, then the extremes of the subquery can be taken for whole query).

View File

@ -308,7 +308,7 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
if (tables_with_columns.size() < 2)
return {};
auto settings = context->getSettingsRef();
const auto & settings = context->getSettingsRef();
MultiEnum<JoinAlgorithm> join_algorithm = settings.join_algorithm;
bool try_use_direct_join = join_algorithm.isSet(JoinAlgorithm::DIRECT) || join_algorithm.isSet(JoinAlgorithm::DEFAULT);
auto table_join = std::make_shared<TableJoin>(settings, context->getGlobalTemporaryVolume(), context->getTempDataOnDisk());

View File

@ -657,7 +657,7 @@ QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_even
{
if (auto ctx = context.lock())
{
res.query_settings = std::make_shared<Settings>(ctx->getSettings());
res.query_settings = std::make_shared<Settings>(ctx->getSettingsRef());
res.current_database = ctx->getCurrentDatabase();
}
}

View File

@ -62,7 +62,7 @@ std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
* which are checked separately (in the Set, Join objects).
*/
auto subquery_context = Context::createCopy(context);
Settings subquery_settings = context->getSettings();
Settings subquery_settings = context->getSettingsCopy();
subquery_settings.max_result_rows = 0;
subquery_settings.max_result_bytes = 0;
/// The calculation of `extremes` does not make sense and is not necessary (if you do it, then the `extremes` of the subquery can be taken instead of the whole query).

View File

@ -285,6 +285,8 @@ static bool formatNamedArgWithHiddenValue(IAST * arg, const IAST::FormatSettings
void ASTFunction::formatImplWithoutAlias(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
frame.expression_list_prepend_whitespace = false;
if (kind == Kind::CODEC || kind == Kind::STATISTICS || kind == Kind::BACKUP_NAME)
frame.allow_operators = false;
FormatStateStacked nested_need_parens = frame;
FormatStateStacked nested_dont_need_parens = frame;
nested_need_parens.need_parens = true;
@ -308,7 +310,7 @@ void ASTFunction::formatImplWithoutAlias(const FormatSettings & settings, Format
/// Should this function be written as an operator?
bool written = false;
if (arguments && !parameters && nulls_action == NullsAction::EMPTY)
if (arguments && !parameters && frame.allow_operators && nulls_action == NullsAction::EMPTY)
{
/// Unary prefix operators.
if (arguments->children.size() == 1)

View File

@ -58,6 +58,8 @@ public:
TABLE_ENGINE,
DATABASE_ENGINE,
BACKUP_NAME,
CODEC,
STATISTICS,
};
Kind kind = Kind::ORDINARY_FUNCTION;

View File

@ -696,6 +696,7 @@ bool ParserCodec::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
auto function_node = std::make_shared<ASTFunction>();
function_node->name = "CODEC";
function_node->kind = ASTFunction::Kind::CODEC;
function_node->arguments = expr_list_args;
function_node->children.push_back(function_node->arguments);
@ -723,6 +724,7 @@ bool ParserStatisticsType::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
auto function_node = std::make_shared<ASTFunction>();
function_node->name = "STATISTICS";
function_node->kind = ASTFunction::Kind::STATISTICS;
function_node->arguments = stat_type;
function_node->children.push_back(function_node->arguments);
node = function_node;

View File

@ -33,7 +33,9 @@ public:
{
case ASTFunction::Kind::ORDINARY_FUNCTION: findOrdinaryFunctionSecretArguments(); break;
case ASTFunction::Kind::WINDOW_FUNCTION: break;
case ASTFunction::Kind::LAMBDA_FUNCTION: break;
case ASTFunction::Kind::LAMBDA_FUNCTION: break;
case ASTFunction::Kind::CODEC: break;
case ASTFunction::Kind::STATISTICS: break;
case ASTFunction::Kind::TABLE_ENGINE: findTableEngineSecretArguments(); break;
case ASTFunction::Kind::DATABASE_ENGINE: findDatabaseEngineSecretArguments(); break;
case ASTFunction::Kind::BACKUP_NAME: findBackupNameSecretArguments(); break;

View File

@ -256,6 +256,7 @@ public:
bool expression_list_always_start_on_new_line = false; /// Line feed and indent before expression list even if it's of single element.
bool expression_list_prepend_whitespace = false; /// Prepend whitespace (if it is required)
bool surround_each_list_element_with_parens = false;
bool allow_operators = true; /// Format some functions, such as "plus", "in", etc. as operators.
size_t list_element_index = 0;
const IAST * current_select = nullptr;
};
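
The new allow_operators flag (set to false for CODEC, STATISTICS and BACKUP_NAME in ASTFunction::formatImplWithoutAlias above) controls whether binary functions such as "plus" are printed in operator form. A toy illustration of the effect, not the real formatter (formatCall and its arguments are made up for the example):

#include <string>
#include <cstdio>

struct FormatStateStacked
{
    bool allow_operators = true;   /// mirrors the flag added above
};

/// Toy formatter: a two-argument "plus" call is printed as an operator only when allowed.
std::string formatCall(const std::string & name, const std::string & lhs,
                       const std::string & rhs, const FormatStateStacked & frame)
{
    if (frame.allow_operators && name == "plus")
        return lhs + " + " + rhs;                    /// operator form
    return name + "(" + lhs + ", " + rhs + ")";      /// plain function-call form
}

int main()
{
    FormatStateStacked normal;                       /// e.g. an ordinary SELECT expression
    FormatStateStacked codec;
    codec.allow_operators = false;                   /// e.g. while printing CODEC(...) arguments
    std::printf("%s\n", formatCall("plus", "a", "b", normal).c_str());
    std::printf("%s\n", formatCall("plus", "a", "b", codec).c_str());
    return 0;
}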

View File

@ -900,11 +900,7 @@ bool NativeORCBlockInputFormat::prepareStripeReader()
orc::RowReaderOptions row_reader_options;
row_reader_options.includeTypes(include_indices);
if (format_settings.orc.read_use_writer_time_zone)
{
String writer_time_zone = current_stripe_info->getWriterTimezone();
row_reader_options.setTimezoneName(writer_time_zone);
}
row_reader_options.setTimezoneName(format_settings.orc.reader_time_zone_name);
row_reader_options.range(current_stripe_info->getOffset(), current_stripe_info->getLength());
if (format_settings.orc.filter_push_down && sarg)
{

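This hunk stops using the time zone embedded by the ORC writer and instead always applies an explicitly configured reader time zone. A minimal usage sketch (the setting name is taken from the updated test later in this diff; table and file names are placeholders):
-- Interpret ORC timestamps in an explicitly chosen reader time zone:
INSERT INTO orc_table FROM INFILE 'data.orc'
SETTINGS input_format_orc_reader_time_zone_name = 'Asia/Shanghai'
FORMAT ORC;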
View File

@ -406,7 +406,7 @@ bool ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx
{
const Block & header = getPort().getHeader();
const IDataType & type = *header.getByPosition(column_idx).type;
auto settings = context->getSettingsRef();
const auto & settings = context->getSettingsRef();
/// Advance the token iterator until the start of the column expression
readUntilTheEndOfRowAndReTokenize(column_idx);

View File

@ -134,7 +134,6 @@ AggregatingStep::AggregatingStep(
{
output_stream->sort_description = group_by_sort_description;
output_stream->sort_scope = DataStream::SortScope::Global;
output_stream->has_single_port = true;
}
}
@ -147,7 +146,6 @@ void AggregatingStep::applyOrder(SortDescription sort_description_for_merging_,
{
output_stream->sort_description = group_by_sort_description;
output_stream->sort_scope = DataStream::SortScope::Global;
output_stream->has_single_port = true;
}
explicit_sorting_required_for_aggregation_in_order = false;

View File

@ -10,6 +10,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static ITransformingStep::Traits getTraits(bool pre_distinct)
{
const bool preserves_number_of_streams = pre_distinct;
@ -90,7 +95,8 @@ void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const Buil
/// final distinct for sorted stream (sorting inside and among chunks)
if (input_stream.sort_scope == DataStream::SortScope::Global)
{
assert(input_stream.has_single_port);
if (pipeline.getNumStreams() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "DistinctStep with in-order expects single input");
if (distinct_sort_desc.size() < columns.size())
{

View File

@ -39,12 +39,13 @@ FillingStep::FillingStep(
, interpolate_description(interpolate_description_)
, use_with_fill_by_sorting_prefix(use_with_fill_by_sorting_prefix_)
{
if (!input_stream_.has_single_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FillingStep expects single input");
}
void FillingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
if (pipeline.getNumStreams() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FillingStep expects single input");
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipelineBuilder::StreamType::Totals)
@ -69,9 +70,6 @@ void FillingStep::describeActions(JSONBuilder::JSONMap & map) const
void FillingStep::updateOutputStream()
{
if (!input_streams.front().has_single_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FillingStep expects single input");
output_stream = createOutputStream(
input_streams.front(), FillingTransform::transformHeader(input_streams.front().header, sort_description), getDataStreamTraits());
}

View File

@ -28,9 +28,6 @@ class DataStream
public:
Block header;
/// QueryPipeline has single port. Totals or extremes ports are not counted.
bool has_single_port = false;
/// Sorting scope. Please keep the mutual order (more strong mode should have greater value).
enum class SortScope : uint8_t
{
@ -51,8 +48,7 @@ public:
bool hasEqualPropertiesWith(const DataStream & other) const
{
return has_single_port == other.has_single_port
&& sort_description == other.sort_description
return sort_description == other.sort_description
&& (sort_description.empty() || sort_scope == other.sort_scope);
}

View File

@ -20,9 +20,6 @@ DataStream ITransformingStep::createOutputStream(
{
DataStream output_stream{.header = std::move(output_header)};
output_stream.has_single_port = stream_traits.returns_single_stream
|| (input_stream.has_single_port && stream_traits.preserves_number_of_streams);
if (stream_traits.preserves_sorting)
{
output_stream.sort_description = input_stream.sort_description;

View File

@ -1055,7 +1055,7 @@ size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node,
}
auto context = read_from_merge_tree->getContext();
const auto & settings = context->getSettings();
const auto & settings = context->getSettingsRef();
if (!settings.optimize_read_in_window_order || (settings.optimize_read_in_order && settings.query_plan_read_in_order) || context->getSettingsRef().allow_experimental_analyzer)
{
return 0;

View File

@ -6,7 +6,7 @@ namespace DB
{
ReadNothingStep::ReadNothingStep(Block output_header)
: ISourceStep(DataStream{.header = std::move(output_header), .has_single_port = true})
: ISourceStep(DataStream{.header = std::move(output_header)})
{
}

View File

@ -16,6 +16,9 @@
#include <Common/Arena.h>
#include <Common/FieldVisitorConvertToNumber.h>
#include <Common/FieldVisitorsAccurateComparison.h>
#include <Functions/CastOverloadResolver.h>
#include <Functions/IFunction.h>
#include <DataTypes/DataTypeString.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
@ -78,6 +81,8 @@ public:
virtual std::optional<WindowFrame> getDefaultFrame() const { return {}; }
virtual ColumnPtr castColumn(const Columns &, const std::vector<size_t> &) { return nullptr; }
/// Is the frame type supported by this function.
virtual bool checkWindowFrameType(const WindowTransform * /*transform*/) const { return true; }
};
@ -1174,6 +1179,9 @@ void WindowTransform::appendChunk(Chunk & chunk)
// Initialize output columns.
for (auto & ws : workspaces)
{
if (ws.window_function_impl)
block.casted_columns.push_back(ws.window_function_impl->castColumn(block.input_columns, ws.argument_column_indices));
block.output_columns.push_back(ws.aggregate_function->getResultType()
->createColumn());
block.output_columns.back()->reserve(block.rows);
@ -2361,6 +2369,8 @@ public:
template <bool is_lead>
struct WindowFunctionLagLeadInFrame final : public WindowFunction
{
FunctionBasePtr func_cast = nullptr;
WindowFunctionLagLeadInFrame(const std::string & name_,
const DataTypes & argument_types_, const Array & parameters_)
: WindowFunction(name_, argument_types_, parameters_, createResultType(argument_types_, name_))
@ -2388,7 +2398,17 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction
return;
}
const auto supertype = getLeastSupertype(DataTypes{argument_types[0], argument_types[2]});
if (argument_types.size() > 3)
{
throw Exception(ErrorCodes::TOO_MANY_ARGUMENTS_FOR_FUNCTION,
"Function '{}' accepts at most 3 arguments, {} given",
name, argument_types.size());
}
if (argument_types[0]->equals(*argument_types[2]))
return;
const auto supertype = tryGetLeastSupertype(DataTypes{argument_types[0], argument_types[2]});
if (!supertype)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -2405,12 +2425,44 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction
argument_types[2]->getName());
}
if (argument_types.size() > 3)
const auto from_name = argument_types[2]->getName();
const auto to_name = argument_types[0]->getName();
ColumnsWithTypeAndName arguments
{
throw Exception(ErrorCodes::TOO_MANY_ARGUMENTS_FOR_FUNCTION,
"Function '{}' accepts at most 3 arguments, {} given",
name, argument_types.size());
}
{ argument_types[2], "" },
{
DataTypeString().createColumnConst(0, to_name),
std::make_shared<DataTypeString>(),
""
}
};
auto get_cast_func = [&arguments]
{
FunctionOverloadResolverPtr func_builder_cast = createInternalCastOverloadResolver(CastType::accurate, {});
return func_builder_cast->build(arguments);
};
func_cast = get_cast_func();
}
ColumnPtr castColumn(const Columns & columns, const std::vector<size_t> & idx) override
{
if (!func_cast)
return nullptr;
ColumnsWithTypeAndName arguments
{
{ columns[idx[2]], argument_types[2], "" },
{
DataTypeString().createColumnConst(columns[idx[2]]->size(), argument_types[0]->getName()),
std::make_shared<DataTypeString>(),
""
}
};
return func_cast->execute(arguments, argument_types[0], columns[idx[2]]->size());
}
static DataTypePtr createResultType(const DataTypes & argument_types_, const std::string & name_)
@ -2460,12 +2512,11 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction
if (argument_types.size() > 2)
{
// Column with default values is specified.
// The conversion through Field is inefficient, but we accept
// subtypes of the argument type as a default value (for convenience),
// and it's a pain to write conversion that respects ColumnNothing
// and ColumnConst and so on.
const IColumn & default_column = *current_block.input_columns[
workspace.argument_column_indices[2]].get();
const IColumn & default_column =
current_block.casted_columns[function_index] ?
*current_block.casted_columns[function_index].get() :
*current_block.input_columns[workspace.argument_column_indices[2]].get();
to.insert(default_column[transform->current_row.row]);
}
else

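With the cast machinery above, the default-value (third) argument of lagInFrame/leadInFrame no longer has to match the first argument's type exactly; it only needs a common supertype with it and is cast to the result type. A minimal SQL illustration (the same shape as the new test later in this diff):
-- A UInt64 default for a UInt128 first argument now works; the default is cast to UInt128:
SELECT lagInFrame(2::UInt128, 2, number) OVER w
FROM numbers(10)
WINDOW w AS (ORDER BY number);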
View File

@ -50,6 +50,7 @@ struct WindowTransformBlock
{
Columns original_input_columns;
Columns input_columns;
Columns casted_columns;
MutableColumns output_columns;
size_t rows = 0;

View File

@ -473,7 +473,7 @@ void MySQLHandler::comQuery(ReadBuffer & payload, bool binary_protocol)
query_context->setCurrentQueryId(fmt::format("mysql:{}:{}", connection_id, toString(UUIDHelpers::generateV4())));
/// --- Workaround for Bug 56173. Can be removed when the analyzer is on by default.
auto settings = query_context->getSettings();
auto settings = query_context->getSettingsCopy();
settings.prefer_column_name_to_alias = true;
query_context->setSettings(settings);

View File

@ -28,6 +28,7 @@ namespace ErrorCodes
extern const int TOO_MANY_PARTITIONS;
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int LOGICAL_ERROR;
}
/// Can the batch be split and the files from it be sent one by one instead?
@ -241,8 +242,12 @@ void DistributedAsyncInsertBatch::sendBatch(const SettingsChanges & settings_cha
insert_settings.applyChanges(settings_changes);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto result = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
connection = std::move(result.front().entry);
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
auto result = results.front();
if (parent.pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
connection = std::move(result.entry);
compression_expected = connection->getCompression() == Protocol::Compression::Enable;
LOG_DEBUG(parent.log, "Sending a batch of {} files to {} ({} rows, {} bytes).",
@ -299,8 +304,12 @@ void DistributedAsyncInsertBatch::sendSeparateFiles(const SettingsChanges & sett
parent.storage.getContext()->getOpenTelemetrySpanLog());
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto result = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
auto connection = std::move(result.front().entry);
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
auto result = results.front();
if (parent.pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
auto connection = std::move(result.entry);
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
RemoteInserter remote(*connection, timeouts,

View File

@ -283,7 +283,7 @@ ConnectionPoolWithFailoverPtr DistributedAsyncInsertDirectoryQueue::createPool(c
auto pools = createPoolsForAddresses(addresses, pool_factory, storage.log);
const auto settings = storage.getContext()->getSettings();
const auto & settings = storage.getContext()->getSettingsRef();
return std::make_shared<ConnectionPoolWithFailover>(std::move(pools),
settings.load_balancing,
settings.distributed_replica_error_half_life.totalSeconds(),
@ -412,8 +412,12 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path,
insert_settings.applyChanges(settings_changes);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto result = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
auto connection = std::move(result.front().entry);
auto results = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
auto result = results.front();
if (pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
auto connection = std::move(result.entry);
LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",
file_path,

View File

@ -377,7 +377,11 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
/// NOTE: INSERT will also take into account max_replica_delay_for_distributed_queries
/// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default)
auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
job.connection_entry = std::move(results.front().entry);
auto result = results.front();
if (shard_info.pool->isTryResultInvalid(result, settings.distributed_insert_skip_read_only_replicas))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
job.connection_entry = std::move(result.entry);
}
else
{

View File

@ -341,15 +341,19 @@ void MergeTreeDeduplicationLog::shutdown()
stopped = true;
if (current_writer)
{
/// If an error has occurred during finalize, we want the exception to be set before the reset.
/// Otherwise we would end up in a state where finalization didn't happen and no error was
/// reported either, causing a logical error (see ~MemoryBuffer()).
try
{
current_writer->finalize();
current_writer.reset();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
current_writer.reset();
}
current_writer.reset();
}
}

View File

@ -34,7 +34,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
, write_settings(write_settings_)
{
MergeTreeWriterSettings writer_settings(
data_part->storage.getContext()->getSettings(),
data_part->storage.getContext()->getSettingsRef(),
write_settings,
storage_settings,
data_part->index_granularity_info.mark_type.adaptive,

View File

@ -23,7 +23,7 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
const MergeTreeIndexGranularityInfo * index_granularity_info)
: IMergedBlockOutputStream(data_part->storage.getSettings(), data_part->getDataPartStoragePtr(), metadata_snapshot_, columns_list_, /*reset_columns=*/ true)
{
const auto & global_settings = data_part->storage.getContext()->getSettings();
const auto & global_settings = data_part->storage.getContext()->getSettingsRef();
MergeTreeWriterSettings writer_settings(
global_settings,

View File

@ -344,7 +344,7 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown(bool part_of_full_shut
void ReplicatedMergeTreeRestartingThread::shutdown(bool part_of_full_shutdown)
{
/// Stop restarting_thread before stopping other tasks - so that it won't restart them again.
need_stop = true;
need_stop = part_of_full_shutdown;
task->deactivate();
/// Explicitly set the event, because the restarting thread will not set it again

View File

@ -193,6 +193,7 @@ ASTPtr ColumnStatisticsDescription::getAST() const
{
auto function_node = std::make_shared<ASTFunction>();
function_node->name = "STATISTICS";
function_node->kind = ASTFunction::Kind::STATISTICS;
function_node->arguments = std::make_shared<ASTExpressionList>();
for (const auto & [type, desc] : types_to_desc)
{

View File

@ -464,7 +464,7 @@ std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> StorageURLSource:
setCredentials(credentials, request_uri);
const auto settings = context_->getSettings();
const auto & settings = context_->getSettingsRef();
auto proxy_config = getProxyConfiguration(request_uri.getScheme());
@ -1328,7 +1328,7 @@ std::optional<time_t> IStorageURLBase::tryGetLastModificationTime(
const Poco::Net::HTTPBasicCredentials & credentials,
const ContextPtr & context)
{
auto settings = context->getSettingsRef();
const auto & settings = context->getSettingsRef();
auto uri = Poco::URI(url);

View File

@ -228,12 +228,12 @@ private:
bool need_only_count;
size_t total_rows_in_file = 0;
Poco::Net::HTTPBasicCredentials credentials;
std::unique_ptr<ReadBuffer> read_buf;
std::shared_ptr<IInputFormat> input_format;
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
Poco::Net::HTTPBasicCredentials credentials;
};
class StorageURLSink : public SinkToStorage

View File

@ -97,7 +97,7 @@ bool hasJoin(const ASTSelectWithUnionQuery & ast)
ContextPtr getViewContext(ContextPtr context, const StorageSnapshotPtr & storage_snapshot)
{
auto view_context = storage_snapshot->metadata->getSQLSecurityOverriddenContext(context);
Settings view_settings = view_context->getSettings();
Settings view_settings = view_context->getSettingsCopy();
view_settings.max_result_rows = 0;
view_settings.max_result_bytes = 0;
view_settings.extremes = false;

View File

@ -65,7 +65,7 @@ ColumnsDescription getStructureOfRemoteTableInShard(
/// Ignore the limit on the number of result rows (it could have been set while handling CSE/CTE),
/// since this is a service query and should not lead to a query failure.
{
Settings new_settings = new_context->getSettings();
Settings new_settings = new_context->getSettingsCopy();
new_settings.max_result_rows = 0;
new_settings.max_result_bytes = 0;
new_context->setSettings(new_settings);

View File

@ -93,7 +93,7 @@ StoragePtr TableFunctionHive::executeImpl(
ColumnsDescription /*cached_columns_*/,
bool /*is_insert_query*/) const
{
const Settings & settings = context_->getSettings();
const Settings & settings = context_->getSettingsRef();
ParserExpression partition_by_parser;
ASTPtr partition_by_ast = parseQuery(
partition_by_parser,

View File

@ -0,0 +1,7 @@
<clickhouse>
<zookeeper>
<enable_fault_injections_during_startup>1</enable_fault_injections_during_startup>
<send_fault_probability>0.001</send_fault_probability>
<recv_fault_probability>0.001</recv_fault_probability>
</zookeeper>
</clickhouse>

View File

@ -3,8 +3,11 @@ import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
import os
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
node = cluster.add_instance(
"node",
@ -24,7 +27,21 @@ def started_cluster():
cluster.shutdown()
def start_clean_clickhouse():
# remove fault injection if present
if "fault_injection.xml" in node.exec_in_container(
["bash", "-c", "ls /etc/clickhouse-server/config.d"]
):
print("Removing fault injection")
node.exec_in_container(
["bash", "-c", "rm /etc/clickhouse-server/config.d/fault_injection.xml"]
)
node.restart_clickhouse()
def test_startup_with_small_bg_pool(started_cluster):
start_clean_clickhouse()
node.query("DROP TABLE IF EXISTS replicated_table SYNC")
node.query(
"CREATE TABLE replicated_table (k UInt64, i32 Int32) ENGINE=ReplicatedMergeTree('/clickhouse/replicated_table', 'r1') ORDER BY k"
)
@ -38,10 +55,10 @@ def test_startup_with_small_bg_pool(started_cluster):
node.restart_clickhouse(stop_start_wait_sec=10)
assert_values()
node.query("DROP TABLE replicated_table SYNC")
def test_startup_with_small_bg_pool_partitioned(started_cluster):
start_clean_clickhouse()
node.query("DROP TABLE IF EXISTS replicated_table_partitioned SYNC")
node.query(
"CREATE TABLE replicated_table_partitioned (k UInt64, i32 Int32) ENGINE=ReplicatedMergeTree('/clickhouse/replicated_table_partitioned', 'r1') ORDER BY k"
)
@ -54,10 +71,13 @@ def test_startup_with_small_bg_pool_partitioned(started_cluster):
assert_values()
with PartitionManager() as pm:
pm.drop_instance_zk_connections(node)
node.restart_clickhouse(stop_start_wait_sec=300)
node.stop_clickhouse(stop_wait_sec=150)
node.copy_file_to_container(
os.path.join(CONFIG_DIR, "fault_injection.xml"),
"/etc/clickhouse-server/config.d/fault_injection.xml",
)
node.start_clickhouse(start_wait_sec=150)
assert_values()
# check that we activate it in the end
node.query_with_retry("INSERT INTO replicated_table_partitioned VALUES(20, 30)")
node.query("DROP TABLE replicated_table_partitioned SYNC")

View File

@ -1,14 +0,0 @@
<clickhouse>
<storage_configuration>
<disks>
<backups>
<type>local</type>
<path>/backups/</path>
</backups>
</disks>
</storage_configuration>
<backups>
<allowed_disk>backups</allowed_disk>
<allowed_path>/backups/</allowed_path>
</backups>
</clickhouse>

View File

@ -1,21 +0,0 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>replica1</host>
<port>9000</port>
</replica>
<replica>
<host>replica2</host>
<port>9000</port>
</replica>
<replica>
<host>replica3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</clickhouse>

View File

@ -1,218 +0,0 @@
import pytest
import pymysql.cursors
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
configs = ["configs/remote_servers.xml", "configs/backups_disk.xml"]
node1 = cluster.add_instance(
"replica1",
with_zookeeper=True,
with_mysql8=True,
main_configs=configs,
external_dirs=["/backups/"],
)
node2 = cluster.add_instance(
"replica2",
with_zookeeper=True,
with_mysql8=True,
main_configs=configs,
external_dirs=["/backups/"],
)
node3 = cluster.add_instance(
"replica3",
with_zookeeper=True,
with_mysql8=True,
main_configs=configs,
external_dirs=["/backups/"],
)
nodes = [node1, node2, node3]
backup_id_counter = 0
def new_backup_name():
global backup_id_counter
backup_id_counter += 1
return f"Disk('backups', '{backup_id_counter}/')"
def cleanup_nodes(nodes, dbname):
for node in nodes:
node.query(f"DROP DATABASE IF EXISTS {dbname} SYNC")
def fill_nodes(nodes, dbname):
cleanup_nodes(nodes, dbname)
for node in nodes:
node.query(
f"CREATE DATABASE {dbname} ENGINE = Replicated('/clickhouse/databases/{dbname}', 'default', '{node.name}')"
)
def drop_mysql_table(conn, tableName):
with conn.cursor() as cursor:
cursor.execute(f"DROP TABLE IF EXISTS `clickhouse`.`{tableName}`")
def get_mysql_conn(cluster):
conn = pymysql.connect(
user="root",
password="clickhouse",
host=cluster.mysql8_ip,
port=cluster.mysql8_port,
)
return conn
def fill_tables(cluster, dbname):
fill_nodes(nodes, dbname)
conn = get_mysql_conn(cluster)
with conn.cursor() as cursor:
cursor.execute("DROP DATABASE IF EXISTS clickhouse")
cursor.execute("CREATE DATABASE clickhouse")
cursor.execute("DROP TABLE IF EXISTS clickhouse.inference_table")
cursor.execute(
"CREATE TABLE clickhouse.inference_table (id INT PRIMARY KEY, data BINARY(16) NOT NULL)"
)
cursor.execute(
"INSERT INTO clickhouse.inference_table VALUES (100, X'9fad5e9eefdfb449')"
)
conn.commit()
parameters = "'mysql80:3306', 'clickhouse', 'inference_table', 'root', 'clickhouse'"
node1.query(
f"CREATE TABLE {dbname}.mysql_schema_inference_engine ENGINE=MySQL({parameters})"
)
node1.query(
f"CREATE TABLE {dbname}.mysql_schema_inference_function AS mysql({parameters})"
)
node1.query(f"CREATE TABLE {dbname}.merge_tree (id UInt64, b String) ORDER BY id")
node1.query(f"INSERT INTO {dbname}.merge_tree VALUES (100, 'abc')")
expected = "id\tInt32\t\t\t\t\t\ndata\tFixedString(16)\t\t\t\t\t\n"
assert (
node1.query(f"DESCRIBE TABLE {dbname}.mysql_schema_inference_engine")
== expected
)
assert (
node1.query(f"DESCRIBE TABLE {dbname}.mysql_schema_inference_function")
== expected
)
assert node1.query(f"SELECT id FROM mysql({parameters})") == "100\n"
assert (
node1.query(f"SELECT id FROM {dbname}.mysql_schema_inference_engine") == "100\n"
)
assert (
node1.query(f"SELECT id FROM {dbname}.mysql_schema_inference_function")
== "100\n"
)
assert node1.query(f"SELECT id FROM {dbname}.merge_tree") == "100\n"
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
except Exception as ex:
print(ex)
finally:
cluster.shutdown()
def test_restore_table(start_cluster):
fill_tables(cluster, "replicated")
backup_name = new_backup_name()
node2.query(f"SYSTEM SYNC DATABASE REPLICA replicated")
node2.query(f"BACKUP DATABASE replicated TO {backup_name}")
node2.query("DROP TABLE replicated.mysql_schema_inference_engine")
node2.query("DROP TABLE replicated.mysql_schema_inference_function")
node3.query(f"SYSTEM SYNC DATABASE REPLICA replicated")
assert node3.query("EXISTS replicated.mysql_schema_inference_engine") == "0\n"
assert node3.query("EXISTS replicated.mysql_schema_inference_function") == "0\n"
node3.query(
f"RESTORE DATABASE replicated FROM {backup_name} SETTINGS allow_different_database_def=true"
)
node1.query(f"SYSTEM SYNC DATABASE REPLICA replicated")
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated.mysql_schema_inference_engine"
)
== "1\t100\n"
)
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated.mysql_schema_inference_function"
)
== "1\t100\n"
)
assert (
node1.query("SELECT count(), sum(id) FROM replicated.merge_tree") == "1\t100\n"
)
cleanup_nodes(nodes, "replicated")
def test_restore_table_null(start_cluster):
fill_tables(cluster, "replicated2")
backup_name = new_backup_name()
node2.query(f"SYSTEM SYNC DATABASE REPLICA replicated2")
node2.query(f"BACKUP DATABASE replicated2 TO {backup_name}")
node2.query("DROP TABLE replicated2.mysql_schema_inference_engine")
node2.query("DROP TABLE replicated2.mysql_schema_inference_function")
node3.query(f"SYSTEM SYNC DATABASE REPLICA replicated2")
assert node3.query("EXISTS replicated2.mysql_schema_inference_engine") == "0\n"
assert node3.query("EXISTS replicated2.mysql_schema_inference_function") == "0\n"
node3.query(
f"RESTORE DATABASE replicated2 FROM {backup_name} SETTINGS allow_different_database_def=1, allow_different_table_def=1 SETTINGS restore_replace_external_engines_to_null=1, restore_replace_external_table_functions_to_null=1"
)
node1.query(f"SYSTEM SYNC DATABASE REPLICA replicated2")
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated2.mysql_schema_inference_engine"
)
== "0\t0\n"
)
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated2.mysql_schema_inference_function"
)
== "0\t0\n"
)
assert (
node1.query("SELECT count(), sum(id) FROM replicated2.merge_tree") == "1\t100\n"
)
assert (
node1.query(
"SELECT engine FROM system.tables where database = 'replicated2' and name like '%mysql%'"
)
== "Null\nNull\n"
)
assert (
node1.query(
"SELECT engine FROM system.tables where database = 'replicated2' and name like '%merge_tree%'"
)
== "MergeTree\n"
)
cleanup_nodes(nodes, "replicated2")

View File

@ -16,18 +16,39 @@ ${CLICKHOUSE_CLIENT} --query="CREATE TABLE buffer_00763_1 (s String) ENGINE = Bu
${CLICKHOUSE_CLIENT} --query="CREATE TABLE mt_00763_1 (x UInt32, s String) ENGINE = MergeTree ORDER BY x"
${CLICKHOUSE_CLIENT} --query="INSERT INTO mt_00763_1 VALUES (1, '1'), (2, '2'), (3, '3')"
function thread1()
function thread_alter()
{
seq 1 300 | sed -r -e 's/.+/ALTER TABLE mt_00763_1 MODIFY column s UInt32; ALTER TABLE mt_00763_1 MODIFY column s String;/' | ${CLICKHOUSE_CLIENT} --multiquery --ignore-error ||:
local TIMELIMIT=$((SECONDS+$1))
local it=0
while [ $SECONDS -lt "$TIMELIMIT" ] && [ $it -lt 300 ];
do
it=$((it+1))
$CLICKHOUSE_CLIENT --multiquery --ignore-error -q "
ALTER TABLE mt_00763_1 MODIFY column s UInt32;
ALTER TABLE mt_00763_1 MODIFY column s String;
" ||:
done
}
function thread2()
function thread_query()
{
seq 1 2000 | sed -r -e 's/.+/SELECT sum(length(s)) FROM buffer_00763_1;/' | ${CLICKHOUSE_CLIENT} --multiquery --ignore-error 2>&1 | grep -vP '(^3$|^Received exception from server|^Code: 473)'
local TIMELIMIT=$((SECONDS+$1))
local it=0
while [ $SECONDS -lt "$TIMELIMIT" ] && [ $it -lt 2000 ];
do
it=$((it+1))
$CLICKHOUSE_CLIENT --multiquery --ignore-error -q "
SELECT sum(length(s)) FROM buffer_00763_1;
" 2>&1 | grep -vP '(^3$|^Received exception from server|^Code: 473)'
done
}
thread1 &
thread2 &
export -f thread_alter
export -f thread_query
TIMEOUT=30
thread_alter $TIMEOUT &
thread_query $TIMEOUT &
wait

View File

@ -193,7 +193,7 @@ def main():
url = os.environ["CLICKHOUSE_URL"] + "&max_threads=1"
default_index_granularity = 10
total_rows = 8 * default_index_granularity
total_rows = 7 * default_index_granularity
step = default_index_granularity
session = requests.Session()
for index_granularity in [

View File

@ -1,8 +1,7 @@
test_alter_profile case: max_session_count 1 alter_sessions_count 1
test_alter_profile case: max_session_count 2 alter_sessions_count 1
test_alter_profile case: max_sessions_for_user 1
USER_SESSION_LIMIT_EXCEEDED
test_alter_profile case: max_sessions_for_user 2
USER_SESSION_LIMIT_EXCEEDED
test_alter_profile case: max_session_count 1 alter_sessions_count 2
test_alter_profile case: max_session_count 2 alter_sessions_count 2
READONLY
READONLY
READONLY

View File

@ -5,7 +5,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
SESSION_ID_PREFIX="02832_alter_max_sessions_session_$$"
QUERY_ID_PREFIX="02832_alter_max_sessions_query_$$"
PROFILE="02832_alter_max_sessions_profile_$$"
USER="02832_alter_max_sessions_user_$$"
@ -17,48 +16,41 @@ ${CLICKHOUSE_CLIENT} -q $"DROP PROFILE IF EXISTS ${PROFILE}"
${CLICKHOUSE_CLIENT} -q $"CREATE SETTINGS PROFILE ${PROFILE}"
${CLICKHOUSE_CLIENT} -q $"CREATE USER '${USER}' SETTINGS PROFILE '${PROFILE}'"
function run_sessions_set()
function wait_for_query_to_start()
{
local sessions_count="$1"
local session_check="$2"
for ((i = 1 ; i <= ${sessions_count} ; i++)); do
local session_id="${SESSION_ID_PREFIX}_${i}"
local query_id="${QUERY_ID_PREFIX}_${i}"
# Write only expected error text
# More than alter_sessions_count queries will not start.
${CLICKHOUSE_CURL} -sS -X POST "${CLICKHOUSE_URL}&user=${USER}&query_id=${query_id}&session_id=${session_id}&session_check=${session_check}&session_timeout=600&function_sleep_max_microseconds_per_block=120000000" --data-binary "SELECT sleep(120)" | grep -o -m 1 'USER_SESSION_LIMIT_EXCEEDED' &
while [[ $($CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes WHERE query_id = '$1'") == 0 ]]; do sleep 0.2; done
}
function test_alter_max_sessions_for_user()
{
local max_sessions_for_user="$1"
echo $"test_alter_profile case: max_sessions_for_user ${max_sessions_for_user}"
# Step 0: Set max_sessions_for_user.
${CLICKHOUSE_CLIENT} -q $"ALTER SETTINGS PROFILE ${PROFILE} SETTINGS max_sessions_for_user = ${max_sessions_for_user}"
# Step 1: Simultaneously run `max_sessions_for_user` queries. These queries should run without any problems.
for ((i = 1 ; i <= max_sessions_for_user ; i++)); do
local query_id="${QUERY_ID_PREFIX}_${i}_${max_sessions_for_user}"
${CLICKHOUSE_CLIENT} --max_block_size 1 --query_id $query_id --user $USER --function_sleep_max_microseconds_per_block=120000000 -q "SELECT sleepEachRow(0.1) FROM numbers(1200)" &>/dev/null &
wait_for_query_to_start $query_id
done
for ((i = 1 ; i <= ${sessions_count} ; i++)); do
local query_id="${QUERY_ID_PREFIX}_${i}"
$CLICKHOUSE_CLIENT --query "KILL QUERY WHERE query_id='$query_id' SYNC" >/dev/null
# Step 2: Run one more query, the (`max_sessions_for_user` + 1)-th. That query should fail.
local query_id="${QUERY_ID_PREFIX}_should_fail"
${CLICKHOUSE_CLIENT} --query_id $query_id --user $USER -q "SELECT 1" 2>&1 | grep -o -m 1 'USER_SESSION_LIMIT_EXCEEDED'
# Step 3: Stop running queries launched at step 1.
for ((i = 1 ; i <= max_sessions_for_user ; i++)); do
local query_id="${QUERY_ID_PREFIX}_${i}_${max_sessions_for_user}"
$CLICKHOUSE_CLIENT --query "KILL QUERY WHERE query_id='$query_id' ASYNC" >/dev/null
done
wait
}
function test_alter_profile()
{
local max_session_count="$1"
local alter_sessions_count="$2"
echo $"test_alter_profile case: max_session_count ${max_session_count} alter_sessions_count ${alter_sessions_count}"
${CLICKHOUSE_CLIENT} -q $"ALTER SETTINGS PROFILE ${PROFILE} SETTINGS max_sessions_for_user = ${max_session_count}"
# Create sessions with $max_session_count restriction
run_sessions_set $max_session_count 0
# Update restriction to $alter_sessions_count
${CLICKHOUSE_CLIENT} -q $"ALTER SETTINGS PROFILE ${PROFILE} SETTINGS max_sessions_for_user = ${alter_sessions_count}"
# Simultaneous sessions should use max settings from profile ($alter_sessions_count)
run_sessions_set $max_session_count 1
}
test_alter_profile 1 1
test_alter_profile 2 1
test_alter_profile 1 2
test_alter_profile 2 2
test_alter_max_sessions_for_user 1
test_alter_max_sessions_for_user 2
${CLICKHOUSE_CLIENT} -q "SELECT 1 SETTINGS max_sessions_for_user = 1" 2>&1 | grep -m 1 -o 'READONLY' | head -1
${CLICKHOUSE_CLIENT} -q $"SET max_sessions_for_user = 1 " 2>&1 | grep -o -m 1 'READONLY' | head -1

View File

@ -30,14 +30,14 @@ QUERY id: 0
FUNCTION id: 15, function_name: toTypeName, function_type: ordinary, result_type: String
ARGUMENTS
LIST id: 16, nodes: 1
FUNCTION id: 2, function_name: anyIf, function_type: aggregate, result_type: Nullable(Int128)
FUNCTION id: 17, function_name: anyIf, function_type: aggregate, result_type: Nullable(Int128)
ARGUMENTS
LIST id: 3, nodes: 2
FUNCTION id: 4, function_name: _CAST, function_type: ordinary, result_type: Nullable(Int128)
LIST id: 18, nodes: 2
FUNCTION id: 19, function_name: _CAST, function_type: ordinary, result_type: Nullable(Int128)
ARGUMENTS
LIST id: 5, nodes: 2
LIST id: 20, nodes: 2
COLUMN id: 6, column_name: number, result_type: UInt64, source_id: 7
CONSTANT id: 8, constant_value: \'Nullable(Int128)\', constant_value_type: String
CONSTANT id: 21, constant_value: \'Nullable(Int128)\', constant_value_type: String
FUNCTION id: 9, function_name: equals, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 10, nodes: 2
@ -50,8 +50,8 @@ QUERY id: 0
JOIN TREE
TABLE_FUNCTION id: 7, alias: __table1, table_function_name: numbers
ARGUMENTS
LIST id: 17, nodes: 1
CONSTANT id: 18, constant_value: UInt64_100, constant_value_type: UInt8
LIST id: 22, nodes: 1
CONSTANT id: 23, constant_value: UInt64_100, constant_value_type: UInt8
SELECT any(if((number % 10) = 5, CAST(NULL, 'Nullable(Int128)'), number)) AS a, toTypeName(a) FROM numbers(100) AS a;
0 Nullable(Int128)
EXPLAIN QUERY TREE SELECT any(if((number % 10) = 5, CAST(NULL, 'Nullable(Int128)'), number)) AS a, toTypeName(a) FROM numbers(100);
@ -84,17 +84,17 @@ QUERY id: 0
FUNCTION id: 17, function_name: toTypeName, function_type: ordinary, result_type: String
ARGUMENTS
LIST id: 18, nodes: 1
FUNCTION id: 2, function_name: anyIf, function_type: aggregate, result_type: Nullable(Int128)
FUNCTION id: 19, function_name: anyIf, function_type: aggregate, result_type: Nullable(Int128)
ARGUMENTS
LIST id: 3, nodes: 2
FUNCTION id: 4, function_name: _CAST, function_type: ordinary, result_type: Nullable(Int128)
LIST id: 20, nodes: 2
FUNCTION id: 21, function_name: _CAST, function_type: ordinary, result_type: Nullable(Int128)
ARGUMENTS
LIST id: 5, nodes: 2
LIST id: 22, nodes: 2
COLUMN id: 6, column_name: number, result_type: UInt64, source_id: 7
CONSTANT id: 8, constant_value: \'Nullable(Int128)\', constant_value_type: String
FUNCTION id: 9, function_name: not, function_type: ordinary, result_type: UInt8
CONSTANT id: 23, constant_value: \'Nullable(Int128)\', constant_value_type: String
FUNCTION id: 24, function_name: not, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 10, nodes: 1
LIST id: 25, nodes: 1
FUNCTION id: 11, function_name: equals, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 12, nodes: 2
@ -107,5 +107,5 @@ QUERY id: 0
JOIN TREE
TABLE_FUNCTION id: 7, alias: __table1, table_function_name: numbers
ARGUMENTS
LIST id: 19, nodes: 1
CONSTANT id: 20, constant_value: UInt64_100, constant_value_type: UInt8
LIST id: 26, nodes: 1
CONSTANT id: 27, constant_value: UInt64_100, constant_value_type: UInt8

View File

@ -5,8 +5,8 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "drop table if exists test"
$CLICKHOUSE_CLIENT -q "create table test(id UInt64, t DateTime64) Engine=MergeTree order by id"
$CLICKHOUSE_CLIENT -q "insert into test from infile '$CURDIR/data_orc/test_reader_time_zone.snappy.orc' SETTINGS input_format_orc_read_use_writer_time_zone=true FORMAT ORC"
$CLICKHOUSE_CLIENT -q "select * from test SETTINGS session_timezone='Asia/Shanghai'"
$CLICKHOUSE_CLIENT -q "drop table test"
$CLICKHOUSE_CLIENT -q "drop table if exists test_orc_read_timezone"
$CLICKHOUSE_CLIENT -q "create table test_orc_read_timezone(id UInt64, t DateTime64) Engine=MergeTree order by id"
$CLICKHOUSE_CLIENT -q "insert into test_orc_read_timezone from infile '$CURDIR/data_orc/test_reader_time_zone.snappy.orc' SETTINGS input_format_orc_reader_time_zone_name='Asia/Shanghai' FORMAT ORC"
$CLICKHOUSE_CLIENT -q "select * from test_orc_read_timezone SETTINGS session_timezone='Asia/Shanghai'"
$CLICKHOUSE_CLIENT -q "drop table test_orc_read_timezone"

View File

@ -0,0 +1,40 @@
0
1
2
2
2
2
2
2
2
2
0
1
2
3
4
5
6
7
8
9
0
1
2
2
2
2
2
2
2
2
0
1
2
3
4
5
6
7
8
9

View File

@ -0,0 +1,4 @@
SELECT lagInFrame(2::UInt128, 2, number) OVER w FROM numbers(10) WINDOW w AS (ORDER BY number);
SELECT leadInFrame(2::UInt128, 2, number) OVER w FROM numbers(10) WINDOW w AS (ORDER BY number);
SELECT lagInFrame(2::UInt64, 2, number) OVER w FROM numbers(10) WINDOW w AS (ORDER BY number);
SELECT leadInFrame(2::UInt64, 2, number) OVER w FROM numbers(10) WINDOW w AS (ORDER BY number);

View File

@ -0,0 +1,9 @@
03
AggregateFunction(count, Nullable(UInt64))
function_name: _CAST, function_type: ordinary, result_type: AggregateFunction(count, Nullable(UInt64))
function_name: countStateIf, function_type: aggregate, result_type: AggregateFunction(count, UInt64)
constant_value: \'AggregateFunction(count, Nullable(UInt64))\', constant_value_type: String
AggregateFunction(uniq, Nullable(UInt64))
010003000000007518F0A8E7830665
function_name: uniqState, function_type: aggregate, result_type: AggregateFunction(uniq, Nullable(UInt64))
----

View File

@ -0,0 +1,39 @@
SET allow_experimental_analyzer = 1;
-- For the count function, rewriting countState to countStateIf changes the type from AggregateFunction(count, Nullable(UInt64)) to AggregateFunction(count, UInt64)
-- We can cast AggregateFunction(count, UInt64) back to AggregateFunction(count, Nullable(UInt64)) with an additional _CAST
select hex(countState(if(toNullable(number % 2 = 0), number, null))) from numbers(5) settings optimize_rewrite_aggregate_function_with_if=1;
select toTypeName(countState(if(toNullable(number % 2 = 0), number, null))) from numbers(5) settings optimize_rewrite_aggregate_function_with_if=1;
select arrayStringConcat(arraySlice(splitByString(', ', trimLeft(explain)), 2), ', ') from (explain query tree select hex(countState(if(toNullable(number % 2 = 0), number, null))) from numbers(5) settings optimize_rewrite_aggregate_function_with_if=1) where explain like '%AggregateFunction%';
-- For the uniq function, rewriting uniqState to uniqStateIf changes the type from AggregateFunction(uniq, Nullable(UInt64)) to AggregateFunction(uniq, UInt64)
-- We can't cast AggregateFunction(uniq, UInt64) back to AggregateFunction(uniq, Nullable(UInt64)), so the rewrite does not happen.
select toTypeName(uniqState(if(toNullable(number % 2 = 0), number, null))) from numbers(5) settings optimize_rewrite_aggregate_function_with_if=1;
select hex(uniqState(if(toNullable(number % 2 = 0), number, null))) from numbers(5) settings optimize_rewrite_aggregate_function_with_if=1;
select arrayStringConcat(arraySlice(splitByString(', ', trimLeft(explain)), 2), ', ') from (explain query tree select hex(uniqState(if(toNullable(number % 2 = 0), number, null))) from numbers(5) settings optimize_rewrite_aggregate_function_with_if=1) where explain like '%AggregateFunction%';
select '----';
CREATE TABLE a
(
`a_id` String
)
ENGINE = MergeTree
PARTITION BY tuple()
ORDER BY tuple();
CREATE TABLE b
(
`b_id` AggregateFunction(uniq, Nullable(String))
)
ENGINE = AggregatingMergeTree
PARTITION BY tuple()
ORDER BY tuple();
CREATE MATERIALIZED VIEW mv TO b
(
`b_id` AggregateFunction(uniq, Nullable(String))
)
AS SELECT uniqState(if(a_id != '', a_id, NULL)) AS b_id
FROM a;

View File

@ -0,0 +1,2 @@
ALTER TABLE t MODIFY COLUMN `c` CODEC(in(1, 2))
ALTER TABLE t MODIFY COLUMN `c` STATISTICS(plus(1, 2))

View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# Ensure that these (possibly incorrect) queries can at least be parsed back after formatting.
$CLICKHOUSE_FORMAT --oneline --query "ALTER TABLE t MODIFY COLUMN c CODEC(in(1, 2))" | $CLICKHOUSE_FORMAT --oneline
$CLICKHOUSE_FORMAT --oneline --query "ALTER TABLE t MODIFY COLUMN c STATISTICS(plus(1, 2))" | $CLICKHOUSE_FORMAT --oneline