Merge remote-tracking branch 'ClickHouse/master' into multiquery-followup

Robert Schulze 2024-07-31 15:36:37 +00:00
commit d0a030f03e
No known key found for this signature in database
GPG Key ID: 26703B55FB13728A
63 changed files with 272 additions and 451 deletions

contrib/azure vendored

@ -1 +1 @@
Subproject commit ea3e19a7be08519134c643177d56c7484dfec884
Subproject commit 67272b7ee0adff6b69921b26eb071ba1a353062c

contrib/rocksdb vendored

@ -1 +1 @@
Subproject commit be366233921293bd07a84dc4ea6991858665f202
Subproject commit 01e43568fa9f3f7bf107b2b66c00b286b456f33e

View File

@ -5,6 +5,9 @@ if (NOT ENABLE_ROCKSDB)
return()
endif()
# not in original build system, otherwise xxHash.cc fails to compile with ClickHouse C++23 default
set (CMAKE_CXX_STANDARD 20)
# Always disable jemalloc for rocksdb by default because it introduces non-standard jemalloc APIs
option(WITH_JEMALLOC "build with JeMalloc" OFF)
@ -16,14 +19,6 @@ option(WITH_LZ4 "build with lz4" ON)
option(WITH_ZLIB "build with zlib" ON)
option(WITH_ZSTD "build with zstd" ON)
# third-party/folly is only validated to work on Linux and Windows for now.
# So only turn it on there by default.
if(CMAKE_SYSTEM_NAME MATCHES "Linux|Windows")
option(WITH_FOLLY_DISTRIBUTED_MUTEX "build with folly::DistributedMutex" ON)
else()
option(WITH_FOLLY_DISTRIBUTED_MUTEX "build with folly::DistributedMutex" OFF)
endif()
if(WITH_SNAPPY)
add_definitions(-DSNAPPY)
list(APPEND THIRDPARTY_LIBS ch_contrib::snappy)
@ -44,7 +39,7 @@ if(WITH_ZSTD)
list(APPEND THIRDPARTY_LIBS ch_contrib::zstd)
endif()
option(PORTABLE "build a portable binary" ON)
add_definitions(-DROCKSDB_PORTABLE)
if(ENABLE_SSE42 AND ENABLE_PCLMULQDQ)
add_definitions(-DHAVE_SSE42)
@ -59,11 +54,6 @@ if(CMAKE_SYSTEM_PROCESSOR MATCHES "arm64|aarch64|AARCH64")
# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=armv8-a+crc+crypto -Wno-unused-function")
endif()
set (HAVE_THREAD_LOCAL 1)
if(HAVE_THREAD_LOCAL)
add_definitions(-DROCKSDB_SUPPORT_THREAD_LOCAL)
endif()
if(CMAKE_SYSTEM_NAME MATCHES "Darwin")
add_definitions(-DOS_MACOSX)
elseif(CMAKE_SYSTEM_NAME MATCHES "Linux")
@ -89,19 +79,21 @@ set(ROCKSDB_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/rocksdb")
include_directories(${ROCKSDB_SOURCE_DIR})
include_directories("${ROCKSDB_SOURCE_DIR}/include")
if(WITH_FOLLY_DISTRIBUTED_MUTEX)
include_directories("${ROCKSDB_SOURCE_DIR}/third-party/folly")
endif()
set(SOURCES
${ROCKSDB_SOURCE_DIR}/cache/cache.cc
${ROCKSDB_SOURCE_DIR}/cache/cache_entry_roles.cc
${ROCKSDB_SOURCE_DIR}/cache/cache_key.cc
${ROCKSDB_SOURCE_DIR}/cache/cache_helpers.cc
${ROCKSDB_SOURCE_DIR}/cache/cache_reservation_manager.cc
${ROCKSDB_SOURCE_DIR}/cache/charged_cache.cc
${ROCKSDB_SOURCE_DIR}/cache/clock_cache.cc
${ROCKSDB_SOURCE_DIR}/cache/compressed_secondary_cache.cc
${ROCKSDB_SOURCE_DIR}/cache/lru_cache.cc
${ROCKSDB_SOURCE_DIR}/cache/secondary_cache.cc
${ROCKSDB_SOURCE_DIR}/cache/sharded_cache.cc
${ROCKSDB_SOURCE_DIR}/db/arena_wrapped_db_iter.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_contents.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_fetcher.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_file_addition.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_file_builder.cc
@ -113,6 +105,7 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/db/blob/blob_log_format.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_log_sequential_reader.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_log_writer.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_source.cc
${ROCKSDB_SOURCE_DIR}/db/blob/prefetch_buffer_collection.cc
${ROCKSDB_SOURCE_DIR}/db/builder.cc
${ROCKSDB_SOURCE_DIR}/db/c.cc
@ -124,7 +117,11 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_picker_fifo.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_picker_level.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_picker_universal.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_service_job.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_state.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_outputs.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/sst_partitioner.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/subcompaction_state.cc
${ROCKSDB_SOURCE_DIR}/db/convenience.cc
${ROCKSDB_SOURCE_DIR}/db/db_filesnapshot.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/compacted_db_impl.cc
@ -159,10 +156,11 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/db/merge_helper.cc
${ROCKSDB_SOURCE_DIR}/db/merge_operator.cc
${ROCKSDB_SOURCE_DIR}/db/output_validator.cc
${ROCKSDB_SOURCE_DIR}/db/periodic_work_scheduler.cc
${ROCKSDB_SOURCE_DIR}/db/periodic_task_scheduler.cc
${ROCKSDB_SOURCE_DIR}/db/range_del_aggregator.cc
${ROCKSDB_SOURCE_DIR}/db/range_tombstone_fragmenter.cc
${ROCKSDB_SOURCE_DIR}/db/repair.cc
${ROCKSDB_SOURCE_DIR}/db/seqno_to_time_mapping.cc
${ROCKSDB_SOURCE_DIR}/db/snapshot_impl.cc
${ROCKSDB_SOURCE_DIR}/db/table_cache.cc
${ROCKSDB_SOURCE_DIR}/db/table_properties_collector.cc
@ -174,6 +172,8 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/db/version_set.cc
${ROCKSDB_SOURCE_DIR}/db/wal_edit.cc
${ROCKSDB_SOURCE_DIR}/db/wal_manager.cc
${ROCKSDB_SOURCE_DIR}/db/wide/wide_column_serialization.cc
${ROCKSDB_SOURCE_DIR}/db/wide/wide_columns.cc
${ROCKSDB_SOURCE_DIR}/db/write_batch.cc
${ROCKSDB_SOURCE_DIR}/db/write_batch_base.cc
${ROCKSDB_SOURCE_DIR}/db/write_controller.cc
@ -182,7 +182,6 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/env/env.cc
${ROCKSDB_SOURCE_DIR}/env/env_chroot.cc
${ROCKSDB_SOURCE_DIR}/env/env_encryption.cc
${ROCKSDB_SOURCE_DIR}/env/env_hdfs.cc
${ROCKSDB_SOURCE_DIR}/env/file_system.cc
${ROCKSDB_SOURCE_DIR}/env/file_system_tracer.cc
${ROCKSDB_SOURCE_DIR}/env/fs_remap.cc
@ -233,16 +232,17 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/options/options.cc
${ROCKSDB_SOURCE_DIR}/options/options_helper.cc
${ROCKSDB_SOURCE_DIR}/options/options_parser.cc
${ROCKSDB_SOURCE_DIR}/port/mmap.cc
${ROCKSDB_SOURCE_DIR}/port/stack_trace.cc
${ROCKSDB_SOURCE_DIR}/table/adaptive/adaptive_table_factory.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/binary_search_index_reader.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_based_filter_block.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_based_table_builder.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_based_table_factory.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_based_table_iterator.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_based_table_reader.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_builder.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_cache.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_prefetcher.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_prefix_index.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/data_block_hash_index.cc
@ -300,9 +300,12 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/trace_replay/trace_record_result.cc
${ROCKSDB_SOURCE_DIR}/trace_replay/trace_record.cc
${ROCKSDB_SOURCE_DIR}/trace_replay/trace_replay.cc
${ROCKSDB_SOURCE_DIR}/util/async_file_reader.cc
${ROCKSDB_SOURCE_DIR}/util/cleanable.cc
${ROCKSDB_SOURCE_DIR}/util/coding.cc
${ROCKSDB_SOURCE_DIR}/util/compaction_job_stats_impl.cc
${ROCKSDB_SOURCE_DIR}/util/comparator.cc
${ROCKSDB_SOURCE_DIR}/util/compression.cc
${ROCKSDB_SOURCE_DIR}/util/compression_context_cache.cc
${ROCKSDB_SOURCE_DIR}/util/concurrent_task_limiter_impl.cc
${ROCKSDB_SOURCE_DIR}/util/crc32c.cc
@ -311,16 +314,17 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/util/murmurhash.cc
${ROCKSDB_SOURCE_DIR}/util/random.cc
${ROCKSDB_SOURCE_DIR}/util/rate_limiter.cc
${ROCKSDB_SOURCE_DIR}/util/regex.cc
${ROCKSDB_SOURCE_DIR}/util/ribbon_config.cc
${ROCKSDB_SOURCE_DIR}/util/slice.cc
${ROCKSDB_SOURCE_DIR}/util/file_checksum_helper.cc
${ROCKSDB_SOURCE_DIR}/util/status.cc
${ROCKSDB_SOURCE_DIR}/util/stderr_logger.cc
${ROCKSDB_SOURCE_DIR}/util/string_util.cc
${ROCKSDB_SOURCE_DIR}/util/thread_local.cc
${ROCKSDB_SOURCE_DIR}/util/threadpool_imp.cc
${ROCKSDB_SOURCE_DIR}/util/xxhash.cc
${ROCKSDB_SOURCE_DIR}/utilities/backupable/backupable_db.cc
${ROCKSDB_SOURCE_DIR}/utilities/agg_merge/agg_merge.cc
${ROCKSDB_SOURCE_DIR}/utilities/backup/backup_engine.cc
${ROCKSDB_SOURCE_DIR}/utilities/blob_db/blob_compaction_filter.cc
${ROCKSDB_SOURCE_DIR}/utilities/blob_db/blob_db.cc
${ROCKSDB_SOURCE_DIR}/utilities/blob_db/blob_db_impl.cc
@ -335,6 +339,7 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/utilities/checkpoint/checkpoint_impl.cc
${ROCKSDB_SOURCE_DIR}/utilities/compaction_filters.cc
${ROCKSDB_SOURCE_DIR}/utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc
${ROCKSDB_SOURCE_DIR}/utilities/counted_fs.cc
${ROCKSDB_SOURCE_DIR}/utilities/debug.cc
${ROCKSDB_SOURCE_DIR}/utilities/env_mirror.cc
${ROCKSDB_SOURCE_DIR}/utilities/env_timed.cc
@ -422,15 +427,6 @@ list(APPEND SOURCES
"${ROCKSDB_SOURCE_DIR}/env/fs_posix.cc"
"${ROCKSDB_SOURCE_DIR}/env/io_posix.cc")
if(WITH_FOLLY_DISTRIBUTED_MUTEX)
list(APPEND SOURCES
"${ROCKSDB_SOURCE_DIR}/third-party/folly/folly/detail/Futex.cpp"
"${ROCKSDB_SOURCE_DIR}/third-party/folly/folly/synchronization/AtomicNotification.cpp"
"${ROCKSDB_SOURCE_DIR}/third-party/folly/folly/synchronization/DistributedMutex.cpp"
"${ROCKSDB_SOURCE_DIR}/third-party/folly/folly/synchronization/ParkingLot.cpp"
"${ROCKSDB_SOURCE_DIR}/third-party/folly/folly/synchronization/WaitOptions.cpp")
endif()
add_library(_rocksdb ${SOURCES})
add_library(ch_contrib::rocksdb ALIAS _rocksdb)
target_link_libraries(_rocksdb PRIVATE ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})

View File

@ -1,16 +1,33 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
/// This file was edited for ClickHouse.
#include <memory>
#include "rocksdb/version.h"
#include "rocksdb/utilities/object_registry.h"
#include "util/string_util.h"
// The build script may replace these values with real values based
// on whether or not GIT is available and the platform settings
static const std::string rocksdb_build_git_sha = "rocksdb_build_git_sha:0";
static const std::string rocksdb_build_git_tag = "rocksdb_build_git_tag:master";
static const std::string rocksdb_build_date = "rocksdb_build_date:2000-01-01";
static const std::string rocksdb_build_git_sha = "rocksdb_build_git_sha:72438a678872544809393b831c7273794c074215";
static const std::string rocksdb_build_git_tag = "rocksdb_build_git_tag:main";
#define HAS_GIT_CHANGES 0
#if HAS_GIT_CHANGES == 0
// If HAS_GIT_CHANGES is 0, the GIT date is used.
// Use the time the branch/tag was last modified
static const std::string rocksdb_build_date = "rocksdb_build_date:2024-07-12 16:01:57";
#else
// If HAS_GIT_CHANGES is > 0, the branch/tag has modifications.
// Use the time the build was created.
static const std::string rocksdb_build_date = "rocksdb_build_date:2024-07-13 17:15:50";
#endif
extern "C" {
} // extern "C"
std::unordered_map<std::string, ROCKSDB_NAMESPACE::RegistrarFunc> ROCKSDB_NAMESPACE::ObjectRegistry::builtins_ = {
};
namespace ROCKSDB_NAMESPACE {
static void AddProperty(std::unordered_map<std::string, std::string> *props, const std::string& name) {
@ -39,12 +56,12 @@ const std::unordered_map<std::string, std::string>& GetRocksBuildProperties() {
}
std::string GetRocksVersionAsString(bool with_patch) {
std::string version = ToString(ROCKSDB_MAJOR) + "." + ToString(ROCKSDB_MINOR);
std::string version = std::to_string(ROCKSDB_MAJOR) + "." + std::to_string(ROCKSDB_MINOR);
if (with_patch) {
return version + "." + ToString(ROCKSDB_PATCH);
return version + "." + std::to_string(ROCKSDB_PATCH);
} else {
return version;
}
}
}
std::string GetRocksBuildInfoAsString(const std::string& program, bool verbose) {

View File

@ -5608,15 +5608,3 @@ Default value: `10000000`.
Minimal size of a block to compress in CROSS JOIN. A zero value disables this threshold. A block is compressed when either of the two thresholds (by rows or by bytes) is reached.
Default value: `1GiB`.
## restore_replace_external_engines_to_null
For testing purposes. Replaces all external table engines with Null so that no external connections are initiated.
Default value: `False`
## restore_replace_external_table_functions_to_null
For testing purposes. Replaces all external table functions with Null so that no external connections are initiated.
Default value: `False`

View File

@ -80,7 +80,7 @@ namespace ErrorCodes
void applySettingsOverridesForLocal(ContextMutablePtr context)
{
Settings settings = context->getSettings();
Settings settings = context->getSettingsCopy();
settings.allow_introspection_functions = true;
settings.storage_file_read_method = LocalFSReadMethod::mmap;

View File

@ -2,6 +2,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/IAggregateFunction.h>
@ -42,7 +43,7 @@ public:
if (lower_name.ends_with("if"))
return;
auto & function_arguments_nodes = function_node->getArguments().getNodes();
const auto & function_arguments_nodes = function_node->getArguments().getNodes();
if (function_arguments_nodes.size() != 1)
return;
@ -50,6 +51,8 @@ public:
if (!if_node || if_node->getFunctionName() != "if")
return;
FunctionNodePtr replaced_node;
auto if_arguments_nodes = if_node->getArguments().getNodes();
auto * first_const_node = if_arguments_nodes[1]->as<ConstantNode>();
auto * second_const_node = if_arguments_nodes[2]->as<ConstantNode>();
@ -75,8 +78,11 @@ public:
new_arguments[0] = std::move(if_arguments_nodes[1]);
new_arguments[1] = std::move(if_arguments_nodes[0]);
function_arguments_nodes = std::move(new_arguments);
resolveAggregateFunctionNodeByName(*function_node, function_node->getFunctionName() + "If");
replaced_node = std::make_shared<FunctionNode>(function_node->getFunctionName() + "If");
replaced_node->getArguments().getNodes() = std::move(new_arguments);
replaced_node->getParameters().getNodes() = function_node->getParameters().getNodes();
resolveAggregateFunctionNodeByName(*replaced_node, replaced_node->getFunctionName());
}
}
else if (first_const_node)
@ -104,10 +110,26 @@ public:
FunctionFactory::instance().get("not", getContext())->build(not_function->getArgumentColumns()));
new_arguments[1] = std::move(not_function);
function_arguments_nodes = std::move(new_arguments);
resolveAggregateFunctionNodeByName(*function_node, function_node->getFunctionName() + "If");
replaced_node = std::make_shared<FunctionNode>(function_node->getFunctionName() + "If");
replaced_node->getArguments().getNodes() = std::move(new_arguments);
replaced_node->getParameters().getNodes() = function_node->getParameters().getNodes();
resolveAggregateFunctionNodeByName(*replaced_node, replaced_node->getFunctionName());
}
}
if (!replaced_node)
return;
auto prev_type = function_node->getResultType();
auto curr_type = replaced_node->getResultType();
if (!prev_type->equals(*curr_type))
return;
/// Just in case, CAST compatible aggregate function states.
if (WhichDataType(prev_type).isAggregateFunction() && !DataTypeAggregateFunction::strictEquals(prev_type, curr_type))
node = createCastFunction(std::move(replaced_node), prev_type, getContext());
else
node = std::move(replaced_node);
}
};
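
The pass now builds the rewritten `<agg>If` call as a separate `replaced_node` and only swaps it into the query tree when the result type is preserved; if both types are aggregate-function states that are equal but not strictly equal, the new node is wrapped in a `_CAST` back to the original state type. Below is a minimal, self-contained sketch of that decision logic, using mock type descriptors rather than ClickHouse's real `IDataType`/`DataTypeAggregateFunction` API (all names in the sketch are illustrative assumptions):

#include <iostream>
#include <string>

struct MockType
{
    std::string name;                 // e.g. "AggregateFunction(count, Nullable(UInt64))"
    bool is_aggregate_state = false;
};

// Stand-ins for IDataType::equals and DataTypeAggregateFunction::strictEquals.
static bool looselyEqual(const MockType & a, const MockType & b) { return a.is_aggregate_state == b.is_aggregate_state; }
static bool strictlyEqual(const MockType & a, const MockType & b) { return a.name == b.name; }

enum class Decision { KeepOriginal, UseRewritten, UseRewrittenWithCast };

static Decision chooseReplacement(const MockType & prev, const MockType & curr)
{
    if (!looselyEqual(prev, curr))
        return Decision::KeepOriginal;            // rewrite would change the result type: bail out
    if (prev.is_aggregate_state && !strictlyEqual(prev, curr))
        return Decision::UseRewrittenWithCast;    // compatible states: CAST back "just in case"
    return Decision::UseRewritten;
}

int main()
{
    MockType count_nullable{"AggregateFunction(count, Nullable(UInt64))", true};
    MockType count_plain{"AggregateFunction(count, UInt64)", true};
    // Prints 2: use the rewritten countStateIf node wrapped in a _CAST back to the original state type.
    std::cout << static_cast<int>(chooseReplacement(count_nullable, count_plain)) << '\n';
}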

View File

@ -237,7 +237,7 @@ QueryTreeNodePtr QueryTreeBuilder::buildSelectExpression(const ASTPtr & select_q
/// Remove global settings limit and offset
if (const auto & settings_ref = updated_context->getSettingsRef(); settings_ref.limit || settings_ref.offset)
{
Settings settings = updated_context->getSettings();
Settings settings = updated_context->getSettingsCopy();
limit = settings.limit;
offset = settings.offset;
settings.limit = 0;

View File

@ -503,7 +503,7 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesCacheMiss);
auto subquery_context = Context::createCopy(context);
Settings subquery_settings = context->getSettings();
Settings subquery_settings = context->getSettingsCopy();
subquery_settings.max_result_rows = 1;
subquery_settings.extremes = false;
subquery_context->setSettings(subquery_settings);

View File

@ -867,7 +867,7 @@ void updateContextForSubqueryExecution(ContextMutablePtr & mutable_context)
* max_rows_in_join, max_bytes_in_join, join_overflow_mode,
* which are checked separately (in the Set, Join objects).
*/
Settings subquery_settings = mutable_context->getSettings();
Settings subquery_settings = mutable_context->getSettingsCopy();
subquery_settings.max_result_rows = 0;
subquery_settings.max_result_bytes = 0;
/// The calculation of extremes does not make sense and is not necessary (if you do it, then the extremes of the subquery can be taken for whole query).

View File

@ -232,7 +232,7 @@ int IBridge::main(const std::vector<std::string> & /*args*/)
auto context = Context::createGlobal(shared_context.get());
context->makeGlobalContext();
auto settings = context->getSettings();
auto settings = context->getSettingsCopy();
settings.set("http_max_field_value_size", http_max_field_value_size);
context->setSettings(settings);

View File

@ -656,7 +656,7 @@ void ClientBase::initLogsOutputStream()
void ClientBase::adjustSettings()
{
Settings settings = global_context->getSettings();
Settings settings = global_context->getSettingsCopy();
/// NOTE: Do not forget to set changed=false to avoid sending it to the server (to avoid breakage read only profiles)
@ -865,7 +865,7 @@ bool ClientBase::isSyncInsertWithData(const ASTInsertQuery & insert_query, const
if (!insert_query.data)
return false;
auto settings = context->getSettings();
auto settings = context->getSettingsCopy();
if (insert_query.settings_ast)
settings.applyChanges(insert_query.settings_ast->as<ASTSetQuery>()->changes);
@ -2671,7 +2671,7 @@ bool ClientBase::processMultiQueryFromFile(const String & file_name)
if (!getClientConfiguration().has("log_comment"))
{
Settings settings = client_context->getSettings();
Settings settings = client_context->getSettingsCopy();
/// NOTE: cannot use even weakly_canonical() since it fails for /dev/stdin due to resolving of "pipe:[X]"
settings.log_comment = fs::absolute(fs::path(file_name));
client_context->setSettings(settings);

View File

@ -42,6 +42,7 @@ public:
size_t max_error_cap = DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT);
using Entry = IConnectionPool::Entry;
using PoolWithFailoverBase<IConnectionPool>::isTryResultInvalid;
/** Allocates connection to work. */
Entry get(const ConnectionTimeouts & timeouts) override;

View File

@ -116,6 +116,12 @@ public:
const TryGetEntryFunc & try_get_entry,
const GetPriorityFunc & get_priority);
// Returns whether the provided TryResult is invalid and cannot be used. Used to prevent logical errors.
bool isTryResultInvalid(const TryResult & result, bool skip_read_only_replicas) const
{
return result.entry.isNull() || !result.is_usable || (skip_read_only_replicas && result.is_readonly);
}
size_t getPoolSize() const { return nested_pools.size(); }
protected:
@ -300,7 +306,7 @@ PoolWithFailoverBase<TNestedPool>::getMany(
throw DB::NetException(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED,
"All connection tries failed. Log: \n\n{}\n", fail_messages);
std::erase_if(try_results, [&](const TryResult & r) { return r.entry.isNull() || !r.is_usable || (skip_read_only_replicas && r.is_readonly); });
std::erase_if(try_results, [&](const TryResult & r) { return isTryResultInvalid(r, skip_read_only_replicas); });
/// Sort so that preferred items are near the beginning.
std::stable_sort(
@ -321,6 +327,9 @@ PoolWithFailoverBase<TNestedPool>::getMany(
}
else if (up_to_date_count >= min_entries)
{
if (try_results.size() < up_to_date_count)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Could not find enough connections for up-to-date results. Got: {}, needed: {}", try_results.size(), up_to_date_count);
/// There is enough up-to-date entries.
try_results.resize(up_to_date_count);
}
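
The new `isTryResultInvalid` helper centralizes the "null entry, not usable, or read-only while read-only replicas are skipped" check that `getMany` already applies when filtering results. The distributed-insert call sites later in this diff use it to validate the first result of `getManyCheckedForInsert` before touching the connection entry, throwing a logical error instead of silently using a bad connection. Here is a self-contained sketch of that caller pattern with mock result and connection types standing in for the real ClickHouse classes (names are placeholders):

#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

// Mock of PoolWithFailoverBase::TryResult - only the fields the check needs.
struct TryResult
{
    bool entry_is_null = false;   // stands in for result.entry.isNull()
    bool is_usable = true;
    bool is_readonly = false;
    std::string entry = "connection-entry";
};

// Same predicate as the new isTryResultInvalid() member above.
static bool isTryResultInvalid(const TryResult & result, bool skip_read_only_replicas)
{
    return result.entry_is_null || !result.is_usable || (skip_read_only_replicas && result.is_readonly);
}

// Caller pattern applied to the distributed async-insert paths:
// take the first result, validate it, and only then use the entry.
static std::string pickConnection(const std::vector<TryResult> & results, bool skip_read_only_replicas)
{
    const TryResult & result = results.front();
    if (isTryResultInvalid(result, skip_read_only_replicas))
        throw std::logic_error("Got an invalid connection result");
    return result.entry;
}

int main()
{
    TryResult readonly_replica{.entry_is_null = false, .is_usable = true, .is_readonly = true};
    try
    {
        std::cout << pickConnection({readonly_replica}, /*skip_read_only_replicas=*/ true) << '\n';
    }
    catch (const std::logic_error & e)
    {
        std::cout << "rejected: " << e.what() << '\n';   // the read-only replica is refused
    }
}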

View File

@ -893,8 +893,6 @@ class IColumn;
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \
M(UInt64, extract_key_value_pairs_max_pairs_per_row, 1000, "Max number of pairs that can be produced by the `extractKeyValuePairs` function. Used as a safeguard against consuming too much memory.", 0) ALIAS(extract_kvp_max_pairs_per_row) \
M(Bool, restore_replace_external_engines_to_null, false, "Replace all the external table engines to Null on restore. Useful for testing purposes", 0) \
M(Bool, restore_replace_external_table_functions_to_null, false, "Replace all table functions to Null on restore. Useful for testing purposes", 0) \
\
\
/* ###################################### */ \
@ -1075,7 +1073,7 @@ class IColumn;
M(Bool, input_format_orc_allow_missing_columns, true, "Allow missing columns while reading ORC input formats", 0) \
M(Bool, input_format_orc_use_fast_decoder, true, "Use a faster ORC decoder implementation.", 0) \
M(Bool, input_format_orc_filter_push_down, true, "When reading ORC files, skip whole stripes or row groups based on the WHERE/PREWHERE expressions, min/max statistics or bloom filter in the ORC metadata.", 0) \
M(Bool, input_format_orc_read_use_writer_time_zone, false, "Whether use the writer's time zone in ORC stripe for ORC row reader, the default ORC row reader's time zone is GMT.", 0) \
M(String, input_format_orc_reader_time_zone_name, "GMT", "The time zone name for ORC row reader, the default ORC row reader's time zone is GMT.", 0) \
M(Bool, input_format_parquet_allow_missing_columns, true, "Allow missing columns while reading Parquet input formats", 0) \
M(UInt64, input_format_parquet_local_file_min_bytes_for_seek, 8192, "Min bytes required for local read (file) to do seek, instead of read with ignore in Parquet input format", 0) \
M(Bool, input_format_arrow_allow_missing_columns, true, "Allow missing columns while reading Arrow input formats", 0) \

View File

@ -69,7 +69,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"dictionary_validate_primary_key_type", false, false, "Validate primary key type for dictionaries. By default id type for simple layouts will be implicitly converted to UInt64."},
{"collect_hash_table_stats_during_joins", false, true, "New setting."},
{"max_size_to_preallocate_for_joins", 0, 100'000'000, "New setting."},
{"input_format_orc_read_use_writer_time_zone", false, false, "Whether use the writer's time zone in ORC stripe for ORC row reader, the default ORC row reader's time zone is GMT."},
{"input_format_orc_reader_time_zone_name", "GMT", "GMT", "The time zone name for ORC row reader, the default ORC row reader's time zone is GMT."},
{"lightweight_mutation_projection_mode", "throw", "throw", "When lightweight delete happens on a table with projection(s), the possible operations include throw the exception as projection exists, or drop all projection related to this table then do lightweight delete."},
{"database_replicated_allow_heavy_create", true, false, "Long-running DDL queries (CREATE AS SELECT and POPULATE) for Replicated database engine was forbidden"},
{"query_plan_merge_filters", false, false, "Allow to merge filters in the query plan"},
@ -80,9 +80,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"ignore_on_cluster_for_replicated_named_collections_queries", false, false, "Ignore ON CLUSTER clause for replicated named collections management queries."},
{"backup_restore_s3_retry_attempts", 1000,1000, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries. It takes place only for backup/restore."},
{"postgresql_connection_attempt_timeout", 2, 2, "Allow to control 'connect_timeout' parameter of PostgreSQL connection."},
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."},
{"restore_replace_external_table_functions_to_null", false, false, "New setting."},
{"restore_replace_external_engines_to_null", false, false, "New setting."}
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."}
}},
{"24.6", {{"materialize_skip_indexes_on_insert", true, true, "Added new setting to allow to disable materialization of skip indexes on insert"},
{"materialize_statistics_on_insert", true, true, "Added new setting to allow to disable materialization of statistics on insert"},

View File

@ -111,7 +111,7 @@ ASTPtr DatabaseDictionary::getCreateTableQueryImpl(const String & table_name, Co
buffer << ") Engine = Dictionary(" << backQuoteIfNeed(table_name) << ")";
}
auto settings = getContext()->getSettingsRef();
const auto & settings = getContext()->getSettingsRef();
ParserCreateQuery parser;
const char * pos = query.data();
std::string error_message;
@ -133,7 +133,7 @@ ASTPtr DatabaseDictionary::getCreateDatabaseQuery() const
if (const auto comment_value = getDatabaseComment(); !comment_value.empty())
buffer << " COMMENT " << backQuote(comment_value);
}
auto settings = getContext()->getSettingsRef();
const auto & settings = getContext()->getSettingsRef();
ParserCreateQuery parser;
return parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth, settings.max_parser_backtracks);
}

View File

@ -534,7 +534,7 @@ ASTPtr DatabaseOnDisk::getCreateDatabaseQuery() const
{
ASTPtr ast;
auto settings = getContext()->getSettingsRef();
const auto & settings = getContext()->getSettingsRef();
{
std::lock_guard lock(mutex);
auto database_metadata_path = getContext()->getPath() + "metadata/" + escapeForFileName(database_name) + ".sql";
@ -733,7 +733,7 @@ ASTPtr DatabaseOnDisk::parseQueryFromMetadata(
return nullptr;
}
auto settings = local_context->getSettingsRef();
const auto & settings = local_context->getSettingsRef();
ParserCreateQuery parser;
const char * pos = query.data();
std::string error_message;

View File

@ -89,7 +89,7 @@ static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync";
static ContextMutablePtr createQueryContext(ContextPtr context)
{
Settings new_query_settings = context->getSettings();
Settings new_query_settings = context->getSettingsCopy();
new_query_settings.insert_allow_materialized_columns = true;
/// To avoid call AST::format

View File

@ -243,7 +243,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.orc.output_row_index_stride = settings.output_format_orc_row_index_stride;
format_settings.orc.use_fast_decoder = settings.input_format_orc_use_fast_decoder;
format_settings.orc.filter_push_down = settings.input_format_orc_filter_push_down;
format_settings.orc.read_use_writer_time_zone = settings.input_format_orc_read_use_writer_time_zone;
format_settings.orc.reader_time_zone_name = settings.input_format_orc_reader_time_zone_name;
format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields;
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
format_settings.capn_proto.skip_fields_with_unsupported_types_in_schema_inference = settings.input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference;

View File

@ -409,7 +409,7 @@ struct FormatSettings
bool use_fast_decoder = true;
bool filter_push_down = true;
UInt64 output_row_index_stride = 10'000;
bool read_use_writer_time_zone = false;
String reader_time_zone_name = "GMT";
} orc{};
/// For capnProto format we should determine how to

View File

@ -39,7 +39,7 @@ public:
FunctionFormatQuery(ContextPtr context, String name_, OutputFormatting output_formatting_, ErrorHandling error_handling_)
: name(name_), output_formatting(output_formatting_), error_handling(error_handling_)
{
const Settings & settings = context->getSettings();
const Settings & settings = context->getSettingsRef();
max_query_size = settings.max_query_size;
max_parser_depth = settings.max_parser_depth;
max_parser_backtracks = settings.max_parser_backtracks;

View File

@ -143,7 +143,7 @@ ColumnPtr FunctionHasColumnInTable::executeImpl(const ColumnsWithTypeAndName & a
/* cluster_name= */ "",
/* password= */ ""
};
auto cluster = std::make_shared<Cluster>(getContext()->getSettings(), host_names, params);
auto cluster = std::make_shared<Cluster>(getContext()->getSettingsRef(), host_names, params);
// FIXME this (probably) needs a non-constant access to query context,
// because it might initialized a storage. Ideally, the tables required

View File

@ -2270,7 +2270,7 @@ bool Context::displaySecretsInShowAndSelect() const
return shared->server_settings.display_secrets_in_show_and_select;
}
Settings Context::getSettings() const
Settings Context::getSettingsCopy() const
{
SharedLockGuard lock(mutex);
return *settings;

View File

@ -829,7 +829,8 @@ public:
void setMacros(std::unique_ptr<Macros> && macros);
bool displaySecretsInShowAndSelect() const;
Settings getSettings() const;
Settings getSettingsCopy() const;
const Settings & getSettingsRef() const { return *settings; }
void setSettings(const Settings & settings_);
/// Set settings by name.
@ -954,8 +955,6 @@ public:
void makeSessionContext();
void makeGlobalContext();
const Settings & getSettingsRef() const { return *settings; }
void setProgressCallback(ProgressCallback callback);
/// Used in executeQuery() to pass it to the QueryPipeline.
ProgressCallback getProgressCallback() const;
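
Most of the churn in this merge comes from renaming `Context::getSettings()` to `getSettingsCopy()` and switching read-only call sites to `getSettingsRef()`, so every caller now states explicitly whether it pays for a copy of the large `Settings` object. Below is a minimal sketch of the two intended usage patterns, with a mock context standing in for `DB::Context` (class and member names here are illustrative only):

#include <iostream>

// Mock settings/context; real code uses DB::Settings and DB::Context.
struct Settings
{
    unsigned max_result_rows = 0;
    bool extremes = true;
};

class MockContext
{
public:
    // Read-only access: no copy, the caller must not mutate it.
    const Settings & getSettingsRef() const { return settings; }

    // Explicit copy: the new name makes the cost visible at the call site.
    Settings getSettingsCopy() const { return settings; }

    void setSettings(const Settings & new_settings) { settings = new_settings; }

private:
    Settings settings;
};

int main()
{
    MockContext context;

    // Pattern 1: reading values - take a const reference, as the converted call sites now do.
    const auto & settings = context.getSettingsRef();
    std::cout << settings.extremes << '\n';

    // Pattern 2: adjusting settings for a subquery-style scope - copy, modify, set back.
    Settings subquery_settings = context.getSettingsCopy();
    subquery_settings.max_result_rows = 1;
    subquery_settings.extremes = false;
    context.setSettings(subquery_settings);
    std::cout << context.getSettingsRef().max_result_rows << '\n';
}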

View File

@ -74,7 +74,7 @@ void ExecuteScalarSubqueriesMatcher::visit(ASTPtr & ast, Data & data)
static auto getQueryInterpreter(const ASTSubquery & subquery, ExecuteScalarSubqueriesMatcher::Data & data)
{
auto subquery_context = Context::createCopy(data.getContext());
Settings subquery_settings = data.getContext()->getSettings();
Settings subquery_settings = data.getContext()->getSettingsCopy();
subquery_settings.max_result_rows = 1;
subquery_settings.extremes = false;
subquery_context->setSettings(subquery_settings);

View File

@ -171,7 +171,7 @@ ExpressionAnalyzer::ExpressionAnalyzer(
PreparedSetsPtr prepared_sets_,
bool is_create_parameterized_view_)
: WithContext(context_)
, query(query_), settings(getContext()->getSettings())
, query(query_), settings(getContext()->getSettingsRef())
, subquery_depth(subquery_depth_)
, syntax(syntax_analyzer_result_)
, is_create_parameterized_view(is_create_parameterized_view_)
@ -984,7 +984,7 @@ static std::shared_ptr<IJoin> tryCreateJoin(
algorithm == JoinAlgorithm::PARALLEL_HASH ||
algorithm == JoinAlgorithm::DEFAULT)
{
const auto & settings = context->getSettings();
const auto & settings = context->getSettingsRef();
if (analyzed_join->allowParallelHashJoin())
return std::make_shared<ConcurrentHashJoin>(

View File

@ -959,40 +959,12 @@ namespace
engine_ast->no_empty_args = true;
storage.set(storage.engine, engine_ast);
}
void setNullTableEngine(ASTStorage & storage)
{
auto engine_ast = std::make_shared<ASTFunction>();
engine_ast->name = "Null";
engine_ast->no_empty_args = true;
storage.set(storage.engine, engine_ast);
}
}
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
if (create.as_table_function)
{
if (getContext()->getSettingsRef().restore_replace_external_table_functions_to_null)
{
const auto & factory = TableFunctionFactory::instance();
auto properties = factory.tryGetProperties(create.as_table_function->as<ASTFunction>()->name);
if (properties && properties->allow_readonly)
return;
if (!create.storage)
{
auto storage_ast = std::make_shared<ASTStorage>();
create.set(create.storage, storage_ast);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Storage should not be created yet, it's a bug.");
create.as_table_function = nullptr;
setNullTableEngine(*create.storage);
}
return;
}
if (create.is_dictionary || create.is_ordinary_view || create.is_live_view || create.is_window_view)
return;
@ -1043,13 +1015,6 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
/// Some part of storage definition (such as PARTITION BY) is specified, but ENGINE is not: just set default one.
setDefaultTableEngine(*create.storage, getContext()->getSettingsRef().default_table_engine.value);
}
/// For external tables with restore_replace_external_engine_to_null setting we replace external engines to
/// Null table engine.
else if (getContext()->getSettingsRef().restore_replace_external_engines_to_null)
{
if (StorageFactory::instance().getStorageFeatures(create.storage->engine->name).source_access_type != AccessType::NONE)
setNullTableEngine(*create.storage);
}
return;
}

View File

@ -464,7 +464,7 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery &
* to avoid unnecessary squashing.
*/
Settings new_settings = select_context->getSettings();
Settings new_settings = select_context->getSettingsCopy();
new_settings.max_threads = std::max<UInt64>(1, settings.max_insert_threads);

View File

@ -249,7 +249,7 @@ namespace
ContextPtr getSubqueryContext(const ContextPtr & context)
{
auto subquery_context = Context::createCopy(context);
Settings subquery_settings = context->getSettings();
Settings subquery_settings = context->getSettingsCopy();
subquery_settings.max_result_rows = 0;
subquery_settings.max_result_bytes = 0;
/// The calculation of extremes does not make sense and is not necessary (if you do it, then the extremes of the subquery can be taken for whole query).

View File

@ -308,7 +308,7 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
if (tables_with_columns.size() < 2)
return {};
auto settings = context->getSettingsRef();
const auto & settings = context->getSettingsRef();
MultiEnum<JoinAlgorithm> join_algorithm = settings.join_algorithm;
bool try_use_direct_join = join_algorithm.isSet(JoinAlgorithm::DIRECT) || join_algorithm.isSet(JoinAlgorithm::DEFAULT);
auto table_join = std::make_shared<TableJoin>(settings, context->getGlobalTemporaryVolume(), context->getTempDataOnDisk());

View File

@ -657,7 +657,7 @@ QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_even
{
if (auto ctx = context.lock())
{
res.query_settings = std::make_shared<Settings>(ctx->getSettings());
res.query_settings = std::make_shared<Settings>(ctx->getSettingsRef());
res.current_database = ctx->getCurrentDatabase();
}
}

View File

@ -62,7 +62,7 @@ std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
* which are checked separately (in the Set, Join objects).
*/
auto subquery_context = Context::createCopy(context);
Settings subquery_settings = context->getSettings();
Settings subquery_settings = context->getSettingsCopy();
subquery_settings.max_result_rows = 0;
subquery_settings.max_result_bytes = 0;
/// The calculation of `extremes` does not make sense and is not necessary (if you do it, then the `extremes` of the subquery can be taken instead of the whole query).

View File

@ -900,11 +900,7 @@ bool NativeORCBlockInputFormat::prepareStripeReader()
orc::RowReaderOptions row_reader_options;
row_reader_options.includeTypes(include_indices);
if (format_settings.orc.read_use_writer_time_zone)
{
String writer_time_zone = current_stripe_info->getWriterTimezone();
row_reader_options.setTimezoneName(writer_time_zone);
}
row_reader_options.setTimezoneName(format_settings.orc.reader_time_zone_name);
row_reader_options.range(current_stripe_info->getOffset(), current_stripe_info->getLength());
if (format_settings.orc.filter_push_down && sarg)
{

View File

@ -406,7 +406,7 @@ bool ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx
{
const Block & header = getPort().getHeader();
const IDataType & type = *header.getByPosition(column_idx).type;
auto settings = context->getSettingsRef();
const auto & settings = context->getSettingsRef();
/// Advance the token iterator until the start of the column expression
readUntilTheEndOfRowAndReTokenize(column_idx);

View File

@ -134,7 +134,6 @@ AggregatingStep::AggregatingStep(
{
output_stream->sort_description = group_by_sort_description;
output_stream->sort_scope = DataStream::SortScope::Global;
output_stream->has_single_port = true;
}
}
@ -147,7 +146,6 @@ void AggregatingStep::applyOrder(SortDescription sort_description_for_merging_,
{
output_stream->sort_description = group_by_sort_description;
output_stream->sort_scope = DataStream::SortScope::Global;
output_stream->has_single_port = true;
}
explicit_sorting_required_for_aggregation_in_order = false;

View File

@ -10,6 +10,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static ITransformingStep::Traits getTraits(bool pre_distinct)
{
const bool preserves_number_of_streams = pre_distinct;
@ -90,7 +95,8 @@ void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const Buil
/// final distinct for sorted stream (sorting inside and among chunks)
if (input_stream.sort_scope == DataStream::SortScope::Global)
{
assert(input_stream.has_single_port);
if (pipeline.getNumStreams() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "DistinctStep with in-order expects single input");
if (distinct_sort_desc.size() < columns.size())
{

View File

@ -39,12 +39,13 @@ FillingStep::FillingStep(
, interpolate_description(interpolate_description_)
, use_with_fill_by_sorting_prefix(use_with_fill_by_sorting_prefix_)
{
if (!input_stream_.has_single_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FillingStep expects single input");
}
void FillingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
if (pipeline.getNumStreams() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FillingStep expects single input");
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipelineBuilder::StreamType::Totals)
@ -69,9 +70,6 @@ void FillingStep::describeActions(JSONBuilder::JSONMap & map) const
void FillingStep::updateOutputStream()
{
if (!input_streams.front().has_single_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FillingStep expects single input");
output_stream = createOutputStream(
input_streams.front(), FillingTransform::transformHeader(input_streams.front().header, sort_description), getDataStreamTraits());
}
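
With `DataStream::has_single_port` removed (see the `DataStream` and `ITransformingStep` hunks below), steps that require exactly one input stream, such as `DistinctStep`'s in-order path and `FillingStep`, now check the pipeline itself while it is being built instead of relying on a flag propagated through plan properties. A compact, self-contained sketch of that pattern with a mock pipeline type (not the real `QueryPipelineBuilder`):

#include <cstddef>
#include <iostream>
#include <stdexcept>
#include <string>

// Mock of QueryPipelineBuilder: the only property we need is the stream count.
struct MockPipeline
{
    size_t num_streams = 1;
    size_t getNumStreams() const { return num_streams; }
};

// Pattern now used in transformPipeline(): validate at pipeline-build time
// and throw a logical error instead of asserting on a has_single_port flag.
static void transformPipelineExpectingSingleInput(const MockPipeline & pipeline, const std::string & step_name)
{
    if (pipeline.getNumStreams() != 1)
        throw std::logic_error(step_name + " expects single input");
    // ... add the single-stream transform here ...
}

int main()
{
    try
    {
        transformPipelineExpectingSingleInput(MockPipeline{.num_streams = 2}, "FillingStep");
    }
    catch (const std::logic_error & e)
    {
        std::cout << e.what() << '\n';   // "FillingStep expects single input"
    }
}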

View File

@ -28,9 +28,6 @@ class DataStream
public:
Block header;
/// QueryPipeline has single port. Totals or extremes ports are not counted.
bool has_single_port = false;
/// Sorting scope. Please keep the mutual order (more strong mode should have greater value).
enum class SortScope : uint8_t
{
@ -51,8 +48,7 @@ public:
bool hasEqualPropertiesWith(const DataStream & other) const
{
return has_single_port == other.has_single_port
&& sort_description == other.sort_description
return sort_description == other.sort_description
&& (sort_description.empty() || sort_scope == other.sort_scope);
}

View File

@ -20,9 +20,6 @@ DataStream ITransformingStep::createOutputStream(
{
DataStream output_stream{.header = std::move(output_header)};
output_stream.has_single_port = stream_traits.returns_single_stream
|| (input_stream.has_single_port && stream_traits.preserves_number_of_streams);
if (stream_traits.preserves_sorting)
{
output_stream.sort_description = input_stream.sort_description;

View File

@ -1055,7 +1055,7 @@ size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node,
}
auto context = read_from_merge_tree->getContext();
const auto & settings = context->getSettings();
const auto & settings = context->getSettingsRef();
if (!settings.optimize_read_in_window_order || (settings.optimize_read_in_order && settings.query_plan_read_in_order) || context->getSettingsRef().allow_experimental_analyzer)
{
return 0;

View File

@ -6,7 +6,7 @@ namespace DB
{
ReadNothingStep::ReadNothingStep(Block output_header)
: ISourceStep(DataStream{.header = std::move(output_header), .has_single_port = true})
: ISourceStep(DataStream{.header = std::move(output_header)})
{
}

View File

@ -473,7 +473,7 @@ void MySQLHandler::comQuery(ReadBuffer & payload, bool binary_protocol)
query_context->setCurrentQueryId(fmt::format("mysql:{}:{}", connection_id, toString(UUIDHelpers::generateV4())));
/// --- Workaround for Bug 56173. Can be removed when the analyzer is on by default.
auto settings = query_context->getSettings();
auto settings = query_context->getSettingsCopy();
settings.prefer_column_name_to_alias = true;
query_context->setSettings(settings);

View File

@ -28,6 +28,7 @@ namespace ErrorCodes
extern const int TOO_MANY_PARTITIONS;
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int LOGICAL_ERROR;
}
/// Can the batch be split and send files from batch one-by-one instead?
@ -241,8 +242,12 @@ void DistributedAsyncInsertBatch::sendBatch(const SettingsChanges & settings_cha
insert_settings.applyChanges(settings_changes);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto result = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
connection = std::move(result.front().entry);
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
auto result = results.front();
if (parent.pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
connection = std::move(result.entry);
compression_expected = connection->getCompression() == Protocol::Compression::Enable;
LOG_DEBUG(parent.log, "Sending a batch of {} files to {} ({} rows, {} bytes).",
@ -299,8 +304,12 @@ void DistributedAsyncInsertBatch::sendSeparateFiles(const SettingsChanges & sett
parent.storage.getContext()->getOpenTelemetrySpanLog());
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto result = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
auto connection = std::move(result.front().entry);
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
auto result = results.front();
if (parent.pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
auto connection = std::move(result.entry);
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
RemoteInserter remote(*connection, timeouts,

View File

@ -283,7 +283,7 @@ ConnectionPoolWithFailoverPtr DistributedAsyncInsertDirectoryQueue::createPool(c
auto pools = createPoolsForAddresses(addresses, pool_factory, storage.log);
const auto settings = storage.getContext()->getSettings();
const auto & settings = storage.getContext()->getSettingsRef();
return std::make_shared<ConnectionPoolWithFailover>(std::move(pools),
settings.load_balancing,
settings.distributed_replica_error_half_life.totalSeconds(),
@ -412,8 +412,12 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path,
insert_settings.applyChanges(settings_changes);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto result = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
auto connection = std::move(result.front().entry);
auto results = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
auto result = results.front();
if (pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
auto connection = std::move(result.entry);
LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",
file_path,

View File

@ -377,7 +377,11 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
/// NOTE: INSERT will also take into account max_replica_delay_for_distributed_queries
/// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default)
auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
job.connection_entry = std::move(results.front().entry);
auto result = results.front();
if (shard_info.pool->isTryResultInvalid(result, settings.distributed_insert_skip_read_only_replicas))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
job.connection_entry = std::move(result.entry);
}
else
{

View File

@ -341,15 +341,19 @@ void MergeTreeDeduplicationLog::shutdown()
stopped = true;
if (current_writer)
{
/// If an error has occurred during finalize, we'd like to have the exception set before reset.
/// Otherwise, we'll be in a situation where finalization didn't happen, yet we didn't get
/// any error, causing a logical error (see ~MemoryBuffer()).
try
{
current_writer->finalize();
current_writer.reset();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
current_writer.reset();
}
current_writer.reset();
}
}

View File

@ -34,7 +34,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
, write_settings(write_settings_)
{
MergeTreeWriterSettings writer_settings(
data_part->storage.getContext()->getSettings(),
data_part->storage.getContext()->getSettingsRef(),
write_settings,
storage_settings,
data_part->index_granularity_info.mark_type.adaptive,

View File

@ -23,7 +23,7 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
const MergeTreeIndexGranularityInfo * index_granularity_info)
: IMergedBlockOutputStream(data_part->storage.getSettings(), data_part->getDataPartStoragePtr(), metadata_snapshot_, columns_list_, /*reset_columns=*/ true)
{
const auto & global_settings = data_part->storage.getContext()->getSettings();
const auto & global_settings = data_part->storage.getContext()->getSettingsRef();
MergeTreeWriterSettings writer_settings(
global_settings,

View File

@ -464,7 +464,7 @@ std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> StorageURLSource:
setCredentials(credentials, request_uri);
const auto settings = context_->getSettings();
const auto & settings = context_->getSettingsRef();
auto proxy_config = getProxyConfiguration(request_uri.getScheme());
@ -1328,7 +1328,7 @@ std::optional<time_t> IStorageURLBase::tryGetLastModificationTime(
const Poco::Net::HTTPBasicCredentials & credentials,
const ContextPtr & context)
{
auto settings = context->getSettingsRef();
const auto & settings = context->getSettingsRef();
auto uri = Poco::URI(url);

View File

@ -97,7 +97,7 @@ bool hasJoin(const ASTSelectWithUnionQuery & ast)
ContextPtr getViewContext(ContextPtr context, const StorageSnapshotPtr & storage_snapshot)
{
auto view_context = storage_snapshot->metadata->getSQLSecurityOverriddenContext(context);
Settings view_settings = view_context->getSettings();
Settings view_settings = view_context->getSettingsCopy();
view_settings.max_result_rows = 0;
view_settings.max_result_bytes = 0;
view_settings.extremes = false;

View File

@ -65,7 +65,7 @@ ColumnsDescription getStructureOfRemoteTableInShard(
/// Ignore limit for result number of rows (that could be set during handling CSE/CTE),
/// since this is a service query and should not lead to query failure.
{
Settings new_settings = new_context->getSettings();
Settings new_settings = new_context->getSettingsCopy();
new_settings.max_result_rows = 0;
new_settings.max_result_bytes = 0;
new_context->setSettings(new_settings);

View File

@ -93,7 +93,7 @@ StoragePtr TableFunctionHive::executeImpl(
ColumnsDescription /*cached_columns_*/,
bool /*is_insert_query*/) const
{
const Settings & settings = context_->getSettings();
const Settings & settings = context_->getSettingsRef();
ParserExpression partition_by_parser;
ASTPtr partition_by_ast = parseQuery(
partition_by_parser,

View File

@ -1,14 +0,0 @@
<clickhouse>
<storage_configuration>
<disks>
<backups>
<type>local</type>
<path>/backups/</path>
</backups>
</disks>
</storage_configuration>
<backups>
<allowed_disk>backups</allowed_disk>
<allowed_path>/backups/</allowed_path>
</backups>
</clickhouse>

View File

@ -1,21 +0,0 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>replica1</host>
<port>9000</port>
</replica>
<replica>
<host>replica2</host>
<port>9000</port>
</replica>
<replica>
<host>replica3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</clickhouse>

View File

@ -1,218 +0,0 @@
import pytest
import pymysql.cursors
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
configs = ["configs/remote_servers.xml", "configs/backups_disk.xml"]
node1 = cluster.add_instance(
"replica1",
with_zookeeper=True,
with_mysql8=True,
main_configs=configs,
external_dirs=["/backups/"],
)
node2 = cluster.add_instance(
"replica2",
with_zookeeper=True,
with_mysql8=True,
main_configs=configs,
external_dirs=["/backups/"],
)
node3 = cluster.add_instance(
"replica3",
with_zookeeper=True,
with_mysql8=True,
main_configs=configs,
external_dirs=["/backups/"],
)
nodes = [node1, node2, node3]
backup_id_counter = 0
def new_backup_name():
global backup_id_counter
backup_id_counter += 1
return f"Disk('backups', '{backup_id_counter}/')"
def cleanup_nodes(nodes, dbname):
for node in nodes:
node.query(f"DROP DATABASE IF EXISTS {dbname} SYNC")
def fill_nodes(nodes, dbname):
cleanup_nodes(nodes, dbname)
for node in nodes:
node.query(
f"CREATE DATABASE {dbname} ENGINE = Replicated('/clickhouse/databases/{dbname}', 'default', '{node.name}')"
)
def drop_mysql_table(conn, tableName):
with conn.cursor() as cursor:
cursor.execute(f"DROP TABLE IF EXISTS `clickhouse`.`{tableName}`")
def get_mysql_conn(cluster):
conn = pymysql.connect(
user="root",
password="clickhouse",
host=cluster.mysql8_ip,
port=cluster.mysql8_port,
)
return conn
def fill_tables(cluster, dbname):
fill_nodes(nodes, dbname)
conn = get_mysql_conn(cluster)
with conn.cursor() as cursor:
cursor.execute("DROP DATABASE IF EXISTS clickhouse")
cursor.execute("CREATE DATABASE clickhouse")
cursor.execute("DROP TABLE IF EXISTS clickhouse.inference_table")
cursor.execute(
"CREATE TABLE clickhouse.inference_table (id INT PRIMARY KEY, data BINARY(16) NOT NULL)"
)
cursor.execute(
"INSERT INTO clickhouse.inference_table VALUES (100, X'9fad5e9eefdfb449')"
)
conn.commit()
parameters = "'mysql80:3306', 'clickhouse', 'inference_table', 'root', 'clickhouse'"
node1.query(
f"CREATE TABLE {dbname}.mysql_schema_inference_engine ENGINE=MySQL({parameters})"
)
node1.query(
f"CREATE TABLE {dbname}.mysql_schema_inference_function AS mysql({parameters})"
)
node1.query(f"CREATE TABLE {dbname}.merge_tree (id UInt64, b String) ORDER BY id")
node1.query(f"INSERT INTO {dbname}.merge_tree VALUES (100, 'abc')")
expected = "id\tInt32\t\t\t\t\t\ndata\tFixedString(16)\t\t\t\t\t\n"
assert (
node1.query(f"DESCRIBE TABLE {dbname}.mysql_schema_inference_engine")
== expected
)
assert (
node1.query(f"DESCRIBE TABLE {dbname}.mysql_schema_inference_function")
== expected
)
assert node1.query(f"SELECT id FROM mysql({parameters})") == "100\n"
assert (
node1.query(f"SELECT id FROM {dbname}.mysql_schema_inference_engine") == "100\n"
)
assert (
node1.query(f"SELECT id FROM {dbname}.mysql_schema_inference_function")
== "100\n"
)
assert node1.query(f"SELECT id FROM {dbname}.merge_tree") == "100\n"
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
except Exception as ex:
print(ex)
finally:
cluster.shutdown()
def test_restore_table(start_cluster):
fill_tables(cluster, "replicated")
backup_name = new_backup_name()
node2.query(f"SYSTEM SYNC DATABASE REPLICA replicated")
node2.query(f"BACKUP DATABASE replicated TO {backup_name}")
node2.query("DROP TABLE replicated.mysql_schema_inference_engine")
node2.query("DROP TABLE replicated.mysql_schema_inference_function")
node3.query(f"SYSTEM SYNC DATABASE REPLICA replicated")
assert node3.query("EXISTS replicated.mysql_schema_inference_engine") == "0\n"
assert node3.query("EXISTS replicated.mysql_schema_inference_function") == "0\n"
node3.query(
f"RESTORE DATABASE replicated FROM {backup_name} SETTINGS allow_different_database_def=true"
)
node1.query(f"SYSTEM SYNC DATABASE REPLICA replicated")
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated.mysql_schema_inference_engine"
)
== "1\t100\n"
)
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated.mysql_schema_inference_function"
)
== "1\t100\n"
)
assert (
node1.query("SELECT count(), sum(id) FROM replicated.merge_tree") == "1\t100\n"
)
cleanup_nodes(nodes, "replicated")
def test_restore_table_null(start_cluster):
fill_tables(cluster, "replicated2")
backup_name = new_backup_name()
node2.query(f"SYSTEM SYNC DATABASE REPLICA replicated2")
node2.query(f"BACKUP DATABASE replicated2 TO {backup_name}")
node2.query("DROP TABLE replicated2.mysql_schema_inference_engine")
node2.query("DROP TABLE replicated2.mysql_schema_inference_function")
node3.query(f"SYSTEM SYNC DATABASE REPLICA replicated2")
assert node3.query("EXISTS replicated2.mysql_schema_inference_engine") == "0\n"
assert node3.query("EXISTS replicated2.mysql_schema_inference_function") == "0\n"
node3.query(
f"RESTORE DATABASE replicated2 FROM {backup_name} SETTINGS allow_different_database_def=1, allow_different_table_def=1 SETTINGS restore_replace_external_engines_to_null=1, restore_replace_external_table_functions_to_null=1"
)
node1.query(f"SYSTEM SYNC DATABASE REPLICA replicated2")
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated2.mysql_schema_inference_engine"
)
== "0\t0\n"
)
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated2.mysql_schema_inference_function"
)
== "0\t0\n"
)
assert (
node1.query("SELECT count(), sum(id) FROM replicated2.merge_tree") == "1\t100\n"
)
assert (
node1.query(
"SELECT engine FROM system.tables where database = 'replicated2' and name like '%mysql%'"
)
== "Null\nNull\n"
)
assert (
node1.query(
"SELECT engine FROM system.tables where database = 'replicated2' and name like '%merge_tree%'"
)
== "MergeTree\n"
)
cleanup_nodes(nodes, "replicated2")

View File

@ -16,18 +16,39 @@ ${CLICKHOUSE_CLIENT} --query="CREATE TABLE buffer_00763_1 (s String) ENGINE = Bu
${CLICKHOUSE_CLIENT} --query="CREATE TABLE mt_00763_1 (x UInt32, s String) ENGINE = MergeTree ORDER BY x"
${CLICKHOUSE_CLIENT} --query="INSERT INTO mt_00763_1 VALUES (1, '1'), (2, '2'), (3, '3')"
function thread1()
function thread_alter()
{
seq 1 300 | sed -r -e 's/.+/ALTER TABLE mt_00763_1 MODIFY column s UInt32; ALTER TABLE mt_00763_1 MODIFY column s String;/' | ${CLICKHOUSE_CLIENT} --ignore-error ||:
local TIMELIMIT=$((SECONDS+$1))
local it=0
while [ $SECONDS -lt "$TIMELIMIT" ] && [ $it -lt 300 ];
do
it=$((it+1))
$CLICKHOUSE_CLIENT --ignore-error -q "
ALTER TABLE mt_00763_1 MODIFY column s UInt32;
ALTER TABLE mt_00763_1 MODIFY column s String;
" ||:
done
}
function thread2()
function thread_query()
{
seq 1 2000 | sed -r -e 's/.+/SELECT sum(length(s)) FROM buffer_00763_1;/' | ${CLICKHOUSE_CLIENT} --ignore-error 2>&1 | grep -vP '(^3$|^Received exception from server|^Code: 473)'
local TIMELIMIT=$((SECONDS+$1))
local it=0
while [ $SECONDS -lt "$TIMELIMIT" ] && [ $it -lt 2000 ];
do
it=$((it+1))
$CLICKHOUSE_CLIENT --ignore-error -q "
SELECT sum(length(s)) FROM buffer_00763_1;
" 2>&1 | grep -vP '(^3$|^Received exception from server|^Code: 473)'
done
}
thread1 &
thread2 &
export -f thread_alter
export -f thread_query
TIMEOUT=30
thread_alter $TIMEOUT &
thread_query $TIMEOUT &
wait

View File

@ -193,7 +193,7 @@ def main():
url = os.environ["CLICKHOUSE_URL"] + "&max_threads=1"
default_index_granularity = 10
total_rows = 8 * default_index_granularity
total_rows = 7 * default_index_granularity
step = default_index_granularity
session = requests.Session()
for index_granularity in [

View File

@ -30,14 +30,14 @@ QUERY id: 0
FUNCTION id: 15, function_name: toTypeName, function_type: ordinary, result_type: String
ARGUMENTS
LIST id: 16, nodes: 1
FUNCTION id: 2, function_name: anyIf, function_type: aggregate, result_type: Nullable(Int128)
FUNCTION id: 17, function_name: anyIf, function_type: aggregate, result_type: Nullable(Int128)
ARGUMENTS
LIST id: 3, nodes: 2
FUNCTION id: 4, function_name: _CAST, function_type: ordinary, result_type: Nullable(Int128)
LIST id: 18, nodes: 2
FUNCTION id: 19, function_name: _CAST, function_type: ordinary, result_type: Nullable(Int128)
ARGUMENTS
LIST id: 5, nodes: 2
LIST id: 20, nodes: 2
COLUMN id: 6, column_name: number, result_type: UInt64, source_id: 7
CONSTANT id: 8, constant_value: \'Nullable(Int128)\', constant_value_type: String
CONSTANT id: 21, constant_value: \'Nullable(Int128)\', constant_value_type: String
FUNCTION id: 9, function_name: equals, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 10, nodes: 2
@ -50,8 +50,8 @@ QUERY id: 0
JOIN TREE
TABLE_FUNCTION id: 7, alias: __table1, table_function_name: numbers
ARGUMENTS
LIST id: 17, nodes: 1
CONSTANT id: 18, constant_value: UInt64_100, constant_value_type: UInt8
LIST id: 22, nodes: 1
CONSTANT id: 23, constant_value: UInt64_100, constant_value_type: UInt8
SELECT any(if((number % 10) = 5, CAST(NULL, 'Nullable(Int128)'), number)) AS a, toTypeName(a) FROM numbers(100) AS a;
0 Nullable(Int128)
EXPLAIN QUERY TREE SELECT any(if((number % 10) = 5, CAST(NULL, 'Nullable(Int128)'), number)) AS a, toTypeName(a) FROM numbers(100);
@ -84,17 +84,17 @@ QUERY id: 0
FUNCTION id: 17, function_name: toTypeName, function_type: ordinary, result_type: String
ARGUMENTS
LIST id: 18, nodes: 1
FUNCTION id: 2, function_name: anyIf, function_type: aggregate, result_type: Nullable(Int128)
FUNCTION id: 19, function_name: anyIf, function_type: aggregate, result_type: Nullable(Int128)
ARGUMENTS
LIST id: 3, nodes: 2
FUNCTION id: 4, function_name: _CAST, function_type: ordinary, result_type: Nullable(Int128)
LIST id: 20, nodes: 2
FUNCTION id: 21, function_name: _CAST, function_type: ordinary, result_type: Nullable(Int128)
ARGUMENTS
LIST id: 5, nodes: 2
LIST id: 22, nodes: 2
COLUMN id: 6, column_name: number, result_type: UInt64, source_id: 7
CONSTANT id: 8, constant_value: \'Nullable(Int128)\', constant_value_type: String
FUNCTION id: 9, function_name: not, function_type: ordinary, result_type: UInt8
CONSTANT id: 23, constant_value: \'Nullable(Int128)\', constant_value_type: String
FUNCTION id: 24, function_name: not, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 10, nodes: 1
LIST id: 25, nodes: 1
FUNCTION id: 11, function_name: equals, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 12, nodes: 2
@ -107,5 +107,5 @@ QUERY id: 0
JOIN TREE
TABLE_FUNCTION id: 7, alias: __table1, table_function_name: numbers
ARGUMENTS
LIST id: 19, nodes: 1
CONSTANT id: 20, constant_value: UInt64_100, constant_value_type: UInt8
LIST id: 26, nodes: 1
CONSTANT id: 27, constant_value: UInt64_100, constant_value_type: UInt8

View File

@ -5,8 +5,8 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "drop table if exists test"
$CLICKHOUSE_CLIENT -q "create table test(id UInt64, t DateTime64) Engine=MergeTree order by id"
$CLICKHOUSE_CLIENT -q "insert into test from infile '$CURDIR/data_orc/test_reader_time_zone.snappy.orc' SETTINGS input_format_orc_read_use_writer_time_zone=true FORMAT ORC"
$CLICKHOUSE_CLIENT -q "select * from test SETTINGS session_timezone='Asia/Shanghai'"
$CLICKHOUSE_CLIENT -q "drop table test"
$CLICKHOUSE_CLIENT -q "drop table if exists test_orc_read_timezone"
$CLICKHOUSE_CLIENT -q "create table test_orc_read_timezone(id UInt64, t DateTime64) Engine=MergeTree order by id"
$CLICKHOUSE_CLIENT -q "insert into test_orc_read_timezone from infile '$CURDIR/data_orc/test_reader_time_zone.snappy.orc' SETTINGS input_format_orc_reader_time_zone_name='Asia/Shanghai' FORMAT ORC"
$CLICKHOUSE_CLIENT -q "select * from test_orc_read_timezone SETTINGS session_timezone='Asia/Shanghai'"
$CLICKHOUSE_CLIENT -q "drop table test_orc_read_timezone"

View File

@ -0,0 +1,9 @@
03
AggregateFunction(count, Nullable(UInt64))
function_name: _CAST, function_type: ordinary, result_type: AggregateFunction(count, Nullable(UInt64))
function_name: countStateIf, function_type: aggregate, result_type: AggregateFunction(count, UInt64)
constant_value: \'AggregateFunction(count, Nullable(UInt64))\', constant_value_type: String
AggregateFunction(uniq, Nullable(UInt64))
010003000000007518F0A8E7830665
function_name: uniqState, function_type: aggregate, result_type: AggregateFunction(uniq, Nullable(UInt64))
----

View File

@ -0,0 +1,39 @@
SET allow_experimental_analyzer = 1;
-- For the count function, rewriting countState to countStateIf changes the type from AggregateFunction(count, Nullable(UInt64)) to AggregateFunction(count, UInt64).
-- We can cast AggregateFunction(count, UInt64) back to AggregateFunction(count, Nullable(UInt64)) with an additional _CAST.
select hex(countState(if(toNullable(number % 2 = 0), number, null))) from numbers(5) settings optimize_rewrite_aggregate_function_with_if=1;
select toTypeName(countState(if(toNullable(number % 2 = 0), number, null))) from numbers(5) settings optimize_rewrite_aggregate_function_with_if=1;
select arrayStringConcat(arraySlice(splitByString(', ', trimLeft(explain)), 2), ', ') from (explain query tree select hex(countState(if(toNullable(number % 2 = 0), number, null))) from numbers(5) settings optimize_rewrite_aggregate_function_with_if=1) where explain like '%AggregateFunction%';
-- For the uniq function, rewriting uniqState to uniqStateIf would change the type from AggregateFunction(uniq, Nullable(UInt64)) to AggregateFunction(uniq, UInt64).
-- We can't cast AggregateFunction(uniq, UInt64) back to AggregateFunction(uniq, Nullable(UInt64)), so the rewrite does not happen.
select toTypeName(uniqState(if(toNullable(number % 2 = 0), number, null))) from numbers(5) settings optimize_rewrite_aggregate_function_with_if=1;
select hex(uniqState(if(toNullable(number % 2 = 0), number, null))) from numbers(5) settings optimize_rewrite_aggregate_function_with_if=1;
select arrayStringConcat(arraySlice(splitByString(', ', trimLeft(explain)), 2), ', ') from (explain query tree select hex(uniqState(if(toNullable(number % 2 = 0), number, null))) from numbers(5) settings optimize_rewrite_aggregate_function_with_if=1) where explain like '%AggregateFunction%';
select '----';
CREATE TABLE a
(
`a_id` String
)
ENGINE = MergeTree
PARTITION BY tuple()
ORDER BY tuple();
CREATE TABLE b
(
`b_id` AggregateFunction(uniq, Nullable(String))
)
ENGINE = AggregatingMergeTree
PARTITION BY tuple()
ORDER BY tuple();
CREATE MATERIALIZED VIEW mv TO b
(
`b_id` AggregateFunction(uniq, Nullable(String))
)
AS SELECT uniqState(if(a_id != '', a_id, NULL)) AS b_id
FROM a;