mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 00:52:02 +00:00
Merge remote-tracking branch 'rschu1ze/master' into move-isallascii
This commit is contained in:
commit
70637b2ecf
7
.github/actions/common_setup/action.yml
vendored
7
.github/actions/common_setup/action.yml
vendored
@ -28,3 +28,10 @@ runs:
|
||||
run: |
|
||||
# to remove every leftovers
|
||||
sudo rm -fr "$TEMP_PATH" && mkdir -p "$TEMP_PATH"
|
||||
- name: Tune vm.mmap_rnd_bits for sanitizers
|
||||
shell: bash
|
||||
run: |
|
||||
sudo sysctl vm.mmap_rnd_bits
|
||||
# https://github.com/google/sanitizers/issues/856
|
||||
echo "Tune vm.mmap_rnd_bits for sanitizers"
|
||||
sudo sysctl vm.mmap_rnd_bits=28
|
||||
|
@ -1,11 +1,23 @@
|
||||
set (DEFAULT_LIBS "-nodefaultlibs")
|
||||
|
||||
if (${CMAKE_SYSTEM_PROCESSOR} STREQUAL "amd64")
|
||||
execute_process (COMMAND ${CMAKE_CXX_COMPILER} --print-file-name=libclang_rt.builtins-x86_64.a OUTPUT_VARIABLE BUILTINS_LIBRARY OUTPUT_STRIP_TRAILING_WHITESPACE)
|
||||
set(system_processor "x86_64")
|
||||
else ()
|
||||
execute_process (COMMAND ${CMAKE_CXX_COMPILER} --print-file-name=libclang_rt.builtins-${CMAKE_SYSTEM_PROCESSOR}.a OUTPUT_VARIABLE BUILTINS_LIBRARY OUTPUT_STRIP_TRAILING_WHITESPACE)
|
||||
set(system_processor "${CMAKE_SYSTEM_PROCESSOR}")
|
||||
endif ()
|
||||
|
||||
file(GLOB bprefix "/usr/local/llvm${COMPILER_VERSION_MAJOR}/lib/clang/${COMPILER_VERSION_MAJOR}/lib/${system_processor}-portbld-freebsd*/")
|
||||
message(STATUS "-Bprefix: ${bprefix}")
|
||||
|
||||
execute_process(COMMAND ${CMAKE_CXX_COMPILER} -Bprefix=${bprefix} --print-file-name=libclang_rt.builtins-${system_processor}.a OUTPUT_VARIABLE BUILTINS_LIBRARY OUTPUT_STRIP_TRAILING_WHITESPACE)
|
||||
# --print-file-name simply prints what you passed in case of nothing was resolved, so let's try one other possible option
|
||||
if (BUILTINS_LIBRARY STREQUAL "libclang_rt.builtins-${system_processor}.a")
|
||||
execute_process(COMMAND ${CMAKE_CXX_COMPILER} -Bprefix=${bprefix} --print-file-name=libclang_rt.builtins.a OUTPUT_VARIABLE BUILTINS_LIBRARY OUTPUT_STRIP_TRAILING_WHITESPACE)
|
||||
endif()
|
||||
if (BUILTINS_LIBRARY STREQUAL "libclang_rt.builtins.a")
|
||||
message(FATAL_ERROR "libclang_rt.builtins had not been found")
|
||||
endif()
|
||||
|
||||
set (DEFAULT_LIBS "${DEFAULT_LIBS} ${BUILTINS_LIBRARY} ${COVERAGE_OPTION} -lc -lm -lrt -lpthread")
|
||||
|
||||
message(STATUS "Default libraries: ${DEFAULT_LIBS}")
|
||||
|
@ -7,7 +7,7 @@ endif()
|
||||
|
||||
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/libbcrypt")
|
||||
|
||||
set(SRCS
|
||||
set(SRCS
|
||||
"${LIBRARY_DIR}/bcrypt.c"
|
||||
"${LIBRARY_DIR}/crypt_blowfish/crypt_blowfish.c"
|
||||
"${LIBRARY_DIR}/crypt_blowfish/crypt_gensalt.c"
|
||||
@ -16,4 +16,13 @@ set(SRCS
|
||||
|
||||
add_library(_bcrypt ${SRCS})
|
||||
target_include_directories(_bcrypt SYSTEM PUBLIC "${LIBRARY_DIR}")
|
||||
# Avoid conflicts for crypt_r on FreeBSD [1]:
|
||||
#
|
||||
# - char *crypt_r(__const char *key, __const char *setting, void *data);
|
||||
# - char *crypt_r(const char *, const char *, struct crypt_data *);
|
||||
#
|
||||
# [1]: https://github.com/freebsd/freebsd-src/commit/5f521d7ba72145092ea23ff6081d8791ad6c1f9d
|
||||
#
|
||||
# NOTE: ow-crypt.h is unsed only internally, so PRIVATE is enough
|
||||
target_compile_definitions(_bcrypt PRIVATE -D__SKIP_GNU)
|
||||
add_library(ch_contrib::bcrypt ALIAS _bcrypt)
|
||||
|
@ -1,5 +1,4 @@
|
||||
# rebuild in #33610
|
||||
# docker build -t clickhouse/fasttest .
|
||||
# docker build -t clickhouse/fasttest .
|
||||
ARG FROM_TAG=latest
|
||||
FROM clickhouse/test-util:$FROM_TAG
|
||||
|
||||
|
@ -36,6 +36,11 @@
|
||||
<allow_experimental_object_type>
|
||||
<readonly/>
|
||||
</allow_experimental_object_type>
|
||||
|
||||
<!-- Prevent stack overflow -->
|
||||
<max_ast_depth>
|
||||
<readonly/>
|
||||
</max_ast_depth>
|
||||
</constraints>
|
||||
</default>
|
||||
</profiles>
|
||||
|
@ -154,6 +154,11 @@ EOL
|
||||
<allow_experimental_object_type>
|
||||
<readonly/>
|
||||
</allow_experimental_object_type>
|
||||
|
||||
<!-- Prevent stack overflow -->
|
||||
<max_ast_depth>
|
||||
<readonly/>
|
||||
</max_ast_depth>
|
||||
</constraints>
|
||||
</default>
|
||||
</profiles>
|
||||
|
@ -5,6 +5,14 @@ FROM ubuntu:22.04
|
||||
ARG apt_archive="http://archive.ubuntu.com"
|
||||
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
|
||||
|
||||
# FIXME: rebuild for clang 18.1.3, that contains a workaround [1] for
|
||||
# sanitizers issue [2]:
|
||||
#
|
||||
# $ git tag --contains c2a57034eff048cd36c563c8e0051db3a70991b3 | tail -1
|
||||
# llvmorg-18.1.3
|
||||
#
|
||||
# [1]: https://github.com/llvm/llvm-project/commit/c2a57034eff048cd36c563c8e0051db3a70991b3
|
||||
# [2]: https://github.com/ClickHouse/ClickHouse/issues/64086
|
||||
ENV DEBIAN_FRONTEND=noninteractive LLVM_VERSION=18
|
||||
|
||||
RUN apt-get update \
|
||||
|
@ -148,6 +148,7 @@ if (BUILD_STANDALONE_KEEPER)
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/createReadBufferFromFileBase.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/ReadBufferFromRemoteFSGather.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/IOUringReader.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/getIOUringReader.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/WriteBufferFromTemporaryFile.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/WriteBufferWithFinalizeCallback.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/AsynchronousBoundedReadBuffer.cpp
|
||||
|
@ -39,6 +39,7 @@ public:
|
||||
std::optional<UUID> backup_uuid;
|
||||
bool deduplicate_files = true;
|
||||
bool allow_s3_native_copy = true;
|
||||
bool allow_azure_native_copy = true;
|
||||
bool use_same_s3_credentials_for_base_backup = false;
|
||||
bool azure_attempt_to_create_container = true;
|
||||
ReadSettings read_settings;
|
||||
|
@ -31,22 +31,28 @@ namespace ErrorCodes
|
||||
|
||||
BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage(
|
||||
StorageAzureBlob::Configuration configuration_,
|
||||
bool allow_azure_native_copy,
|
||||
const ReadSettings & read_settings_,
|
||||
const WriteSettings & write_settings_,
|
||||
const ContextPtr & context_)
|
||||
: BackupReaderDefault(read_settings_, write_settings_, getLogger("BackupReaderAzureBlobStorage"))
|
||||
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.container, false, false}
|
||||
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.getConnectionURLWithContainer(), false, false}
|
||||
, configuration(configuration_)
|
||||
{
|
||||
auto client_ptr = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
|
||||
client_ptr->SetClickhouseOptions(Azure::Storage::Blobs::ClickhouseClientOptions{.IsClientForDisk=true});
|
||||
|
||||
object_storage = std::make_unique<AzureObjectStorage>("BackupReaderAzureBlobStorage",
|
||||
std::move(client_ptr),
|
||||
StorageAzureBlob::createSettings(context_),
|
||||
configuration_.container);
|
||||
object_storage = std::make_unique<AzureObjectStorage>(
|
||||
"BackupReaderAzureBlobStorage",
|
||||
std::move(client_ptr),
|
||||
StorageAzureBlob::createSettings(context_),
|
||||
configuration.container,
|
||||
configuration.getConnectionURLWithContainer());
|
||||
|
||||
client = object_storage->getAzureBlobStorageClient();
|
||||
settings = object_storage->getSettings();
|
||||
auto settings_copy = *object_storage->getSettings();
|
||||
settings_copy.use_native_copy = allow_azure_native_copy;
|
||||
settings = std::make_unique<const AzureObjectStorageSettings>(settings_copy);
|
||||
}
|
||||
|
||||
BackupReaderAzureBlobStorage::~BackupReaderAzureBlobStorage() = default;
|
||||
@ -76,9 +82,9 @@ void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup,
|
||||
DiskPtr destination_disk, const String & destination_path, WriteMode write_mode)
|
||||
{
|
||||
auto destination_data_source_description = destination_disk->getDataSourceDescription();
|
||||
if ((destination_data_source_description.type == DataSourceType::ObjectStorage)
|
||||
&& (destination_data_source_description.object_storage_type == ObjectStorageType::Azure)
|
||||
&& (destination_data_source_description.is_encrypted == encrypted_in_backup))
|
||||
LOG_TRACE(log, "Source description {}, desctionation description {}", data_source_description.description, destination_data_source_description.description);
|
||||
if (destination_data_source_description.sameKind(data_source_description)
|
||||
&& destination_data_source_description.is_encrypted == encrypted_in_backup)
|
||||
{
|
||||
LOG_TRACE(log, "Copying {} from AzureBlobStorage to disk {}", path_in_backup, destination_disk->getName());
|
||||
auto write_blob_function = [&](const Strings & blob_path, WriteMode mode, const std::optional<ObjectAttributes> &) -> size_t
|
||||
@ -116,12 +122,13 @@ void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup,
|
||||
|
||||
BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
|
||||
StorageAzureBlob::Configuration configuration_,
|
||||
bool allow_azure_native_copy,
|
||||
const ReadSettings & read_settings_,
|
||||
const WriteSettings & write_settings_,
|
||||
const ContextPtr & context_,
|
||||
bool attempt_to_create_container)
|
||||
: BackupWriterDefault(read_settings_, write_settings_, getLogger("BackupWriterAzureBlobStorage"))
|
||||
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.container, false, false}
|
||||
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.getConnectionURLWithContainer(), false, false}
|
||||
, configuration(configuration_)
|
||||
{
|
||||
auto client_ptr = StorageAzureBlob::createClient(configuration, /* is_read_only */ false, attempt_to_create_container);
|
||||
@ -130,9 +137,12 @@ BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
|
||||
object_storage = std::make_unique<AzureObjectStorage>("BackupWriterAzureBlobStorage",
|
||||
std::move(client_ptr),
|
||||
StorageAzureBlob::createSettings(context_),
|
||||
configuration_.container);
|
||||
configuration_.container,
|
||||
configuration.getConnectionURLWithContainer());
|
||||
client = object_storage->getAzureBlobStorageClient();
|
||||
settings = object_storage->getSettings();
|
||||
auto settings_copy = *object_storage->getSettings();
|
||||
settings_copy.use_native_copy = allow_azure_native_copy;
|
||||
settings = std::make_unique<const AzureObjectStorageSettings>(settings_copy);
|
||||
}
|
||||
|
||||
void BackupWriterAzureBlobStorage::copyFileFromDisk(const String & path_in_backup, DiskPtr src_disk, const String & src_path,
|
||||
@ -140,7 +150,9 @@ void BackupWriterAzureBlobStorage::copyFileFromDisk(const String & path_in_backu
|
||||
{
|
||||
/// Use the native copy as a more optimal way to copy a file from AzureBlobStorage to AzureBlobStorage if it's possible.
|
||||
auto source_data_source_description = src_disk->getDataSourceDescription();
|
||||
if (source_data_source_description.sameKind(data_source_description) && (source_data_source_description.is_encrypted == copy_encrypted))
|
||||
LOG_TRACE(log, "Source description {}, desctionation description {}", source_data_source_description.description, data_source_description.description);
|
||||
if (source_data_source_description.sameKind(data_source_description)
|
||||
&& source_data_source_description.is_encrypted == copy_encrypted)
|
||||
{
|
||||
/// getBlobPath() can return more than 3 elements if the file is stored as multiple objects in AzureBlobStorage container.
|
||||
/// In this case we can't use the native copy.
|
||||
|
@ -16,7 +16,12 @@ namespace DB
|
||||
class BackupReaderAzureBlobStorage : public BackupReaderDefault
|
||||
{
|
||||
public:
|
||||
BackupReaderAzureBlobStorage(StorageAzureBlob::Configuration configuration_, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
|
||||
BackupReaderAzureBlobStorage(
|
||||
StorageAzureBlob::Configuration configuration_,
|
||||
bool allow_azure_native_copy,
|
||||
const ReadSettings & read_settings_,
|
||||
const WriteSettings & write_settings_,
|
||||
const ContextPtr & context_);
|
||||
~BackupReaderAzureBlobStorage() override;
|
||||
|
||||
bool fileExists(const String & file_name) override;
|
||||
@ -37,7 +42,13 @@ private:
|
||||
class BackupWriterAzureBlobStorage : public BackupWriterDefault
|
||||
{
|
||||
public:
|
||||
BackupWriterAzureBlobStorage(StorageAzureBlob::Configuration configuration_, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_, bool attempt_to_create_container);
|
||||
BackupWriterAzureBlobStorage(
|
||||
StorageAzureBlob::Configuration configuration_,
|
||||
bool allow_azure_native_copy,
|
||||
const ReadSettings & read_settings_,
|
||||
const WriteSettings & write_settings_,
|
||||
const ContextPtr & context_,
|
||||
bool attempt_to_create_container);
|
||||
~BackupWriterAzureBlobStorage() override;
|
||||
|
||||
bool fileExists(const String & file_name) override;
|
||||
|
@ -27,6 +27,7 @@ namespace ErrorCodes
|
||||
M(Bool, decrypt_files_from_encrypted_disks) \
|
||||
M(Bool, deduplicate_files) \
|
||||
M(Bool, allow_s3_native_copy) \
|
||||
M(Bool, allow_azure_native_copy) \
|
||||
M(Bool, use_same_s3_credentials_for_base_backup) \
|
||||
M(Bool, azure_attempt_to_create_container) \
|
||||
M(Bool, read_from_filesystem_cache) \
|
||||
|
@ -44,6 +44,9 @@ struct BackupSettings
|
||||
/// Whether native copy is allowed (optimization for cloud storages, that sometimes could have bugs)
|
||||
bool allow_s3_native_copy = true;
|
||||
|
||||
/// Whether native copy is allowed (optimization for cloud storages, that sometimes could have bugs)
|
||||
bool allow_azure_native_copy = true;
|
||||
|
||||
/// Whether base backup to S3 should inherit credentials from the BACKUP query.
|
||||
bool use_same_s3_credentials_for_base_backup = false;
|
||||
|
||||
|
@ -598,6 +598,7 @@ void BackupsWorker::doBackup(
|
||||
backup_create_params.backup_uuid = backup_settings.backup_uuid;
|
||||
backup_create_params.deduplicate_files = backup_settings.deduplicate_files;
|
||||
backup_create_params.allow_s3_native_copy = backup_settings.allow_s3_native_copy;
|
||||
backup_create_params.allow_azure_native_copy = backup_settings.allow_azure_native_copy;
|
||||
backup_create_params.use_same_s3_credentials_for_base_backup = backup_settings.use_same_s3_credentials_for_base_backup;
|
||||
backup_create_params.azure_attempt_to_create_container = backup_settings.azure_attempt_to_create_container;
|
||||
backup_create_params.read_settings = getReadSettingsForBackup(context, backup_settings);
|
||||
|
@ -135,10 +135,12 @@ void registerBackupEngineAzureBlobStorage(BackupFactory & factory)
|
||||
|
||||
if (params.open_mode == IBackup::OpenMode::READ)
|
||||
{
|
||||
auto reader = std::make_shared<BackupReaderAzureBlobStorage>(configuration,
|
||||
params.read_settings,
|
||||
params.write_settings,
|
||||
params.context);
|
||||
auto reader = std::make_shared<BackupReaderAzureBlobStorage>(
|
||||
configuration,
|
||||
params.allow_azure_native_copy,
|
||||
params.read_settings,
|
||||
params.write_settings,
|
||||
params.context);
|
||||
|
||||
return std::make_unique<BackupImpl>(
|
||||
params.backup_info,
|
||||
@ -150,11 +152,13 @@ void registerBackupEngineAzureBlobStorage(BackupFactory & factory)
|
||||
}
|
||||
else
|
||||
{
|
||||
auto writer = std::make_shared<BackupWriterAzureBlobStorage>(configuration,
|
||||
params.read_settings,
|
||||
params.write_settings,
|
||||
params.context,
|
||||
params.azure_attempt_to_create_container);
|
||||
auto writer = std::make_shared<BackupWriterAzureBlobStorage>(
|
||||
configuration,
|
||||
params.allow_azure_native_copy,
|
||||
params.read_settings,
|
||||
params.write_settings,
|
||||
params.context,
|
||||
params.azure_attempt_to_create_container);
|
||||
|
||||
return std::make_unique<BackupImpl>(
|
||||
params.backup_info,
|
||||
|
@ -22,6 +22,7 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
|
||||
@ -116,6 +117,38 @@ void ColumnNullable::get(size_t n, Field & res) const
|
||||
getNestedColumn().get(n, res);
|
||||
}
|
||||
|
||||
Float64 ColumnNullable::getFloat64(size_t n) const
|
||||
{
|
||||
if (isNullAt(n))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of {} at {} is NULL while calling method getFloat64", getName(), n);
|
||||
else
|
||||
return getNestedColumn().getFloat64(n);
|
||||
}
|
||||
|
||||
Float32 ColumnNullable::getFloat32(size_t n) const
|
||||
{
|
||||
if (isNullAt(n))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of {} at {} is NULL while calling method getFloat32", getName(), n);
|
||||
else
|
||||
return getNestedColumn().getFloat32(n);
|
||||
}
|
||||
|
||||
UInt64 ColumnNullable::getUInt(size_t n) const
|
||||
{
|
||||
if (isNullAt(n))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of {} at {} is NULL while calling method getUInt", getName(), n);
|
||||
else
|
||||
return getNestedColumn().getUInt(n);
|
||||
}
|
||||
|
||||
Int64 ColumnNullable::getInt(size_t n) const
|
||||
{
|
||||
if (isNullAt(n))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of {} at {} is NULL while calling method getInt", getName(), n);
|
||||
else
|
||||
return getNestedColumn().getInt(n);
|
||||
}
|
||||
|
||||
void ColumnNullable::insertData(const char * pos, size_t length)
|
||||
{
|
||||
if (pos == nullptr)
|
||||
|
@ -57,6 +57,10 @@ public:
|
||||
void get(size_t n, Field & res) const override;
|
||||
bool getBool(size_t n) const override { return isNullAt(n) ? false : nested_column->getBool(n); }
|
||||
UInt64 get64(size_t n) const override { return nested_column->get64(n); }
|
||||
Float64 getFloat64(size_t n) const override;
|
||||
Float32 getFloat32(size_t n) const override;
|
||||
UInt64 getUInt(size_t n) const override;
|
||||
Int64 getInt(size_t n) const override;
|
||||
bool isDefaultAt(size_t n) const override { return isNullAt(n); }
|
||||
StringRef getDataAt(size_t) const override;
|
||||
/// Will insert null value if pos=nullptr
|
||||
|
39
src/Common/CopyableAtomic.h
Normal file
39
src/Common/CopyableAtomic.h
Normal file
@ -0,0 +1,39 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <utility>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
template <typename T>
|
||||
struct CopyableAtomic
|
||||
{
|
||||
CopyableAtomic(const CopyableAtomic & other)
|
||||
: value(other.value.load())
|
||||
{}
|
||||
|
||||
explicit CopyableAtomic(T && value_)
|
||||
: value(std::forward<T>(value_))
|
||||
{}
|
||||
|
||||
CopyableAtomic & operator=(const CopyableAtomic & other)
|
||||
{
|
||||
value = other.value.load();
|
||||
return *this;
|
||||
}
|
||||
|
||||
CopyableAtomic & operator=(bool value_)
|
||||
{
|
||||
value = value_;
|
||||
return *this;
|
||||
}
|
||||
|
||||
explicit operator T() const { return value; }
|
||||
|
||||
const T & getValue() const { return value; }
|
||||
|
||||
std::atomic<T> value;
|
||||
};
|
||||
|
||||
}
|
@ -498,8 +498,10 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
|
||||
|
||||
template class ThreadPoolImpl<std::thread>;
|
||||
template class ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>;
|
||||
template class ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, false>>;
|
||||
template class ThreadFromGlobalPoolImpl<true, true>;
|
||||
template class ThreadFromGlobalPoolImpl<true, false>;
|
||||
template class ThreadFromGlobalPoolImpl<false, false>;
|
||||
|
||||
std::unique_ptr<GlobalThreadPool> GlobalThreadPool::the_instance;
|
||||
|
||||
|
@ -242,6 +242,11 @@ public:
|
||||
if (unlikely(global_profiler_real_time_period != 0 || global_profiler_cpu_time_period != 0))
|
||||
thread_status.initGlobalProfiler(global_profiler_real_time_period, global_profiler_cpu_time_period);
|
||||
}
|
||||
else
|
||||
{
|
||||
UNUSED(global_profiler_real_time_period);
|
||||
UNUSED(global_profiler_cpu_time_period);
|
||||
}
|
||||
|
||||
std::apply(function, arguments);
|
||||
},
|
||||
|
@ -23,6 +23,9 @@ thread_local ThreadStatus constinit * current_thread = nullptr;
|
||||
namespace
|
||||
{
|
||||
|
||||
/// For aarch64 16K is not enough (likely due to tons of registers)
|
||||
constexpr size_t UNWIND_MINSIGSTKSZ = 32 << 10;
|
||||
|
||||
/// Alternative stack for signal handling.
|
||||
///
|
||||
/// This stack should not be located in the TLS (thread local storage), since:
|
||||
@ -50,7 +53,7 @@ struct ThreadStack
|
||||
free(data);
|
||||
}
|
||||
|
||||
static size_t getSize() { return std::max<size_t>(16 << 10, MINSIGSTKSZ); }
|
||||
static size_t getSize() { return std::max<size_t>(UNWIND_MINSIGSTKSZ, MINSIGSTKSZ); }
|
||||
void * getData() const { return data; }
|
||||
|
||||
private:
|
||||
@ -124,26 +127,6 @@ ThreadStatus::ThreadStatus(bool check_current_thread_on_destruction_)
|
||||
#endif
|
||||
}
|
||||
|
||||
void ThreadStatus::initGlobalProfiler([[maybe_unused]] UInt64 global_profiler_real_time_period, [[maybe_unused]] UInt64 global_profiler_cpu_time_period)
|
||||
{
|
||||
#if !defined(SANITIZER) && !defined(CLICKHOUSE_KEEPER_STANDALONE_BUILD) && !defined(__APPLE__)
|
||||
try
|
||||
{
|
||||
if (global_profiler_real_time_period > 0)
|
||||
query_profiler_real = std::make_unique<QueryProfilerReal>(thread_id,
|
||||
/* period= */ static_cast<UInt32>(global_profiler_real_time_period));
|
||||
|
||||
if (global_profiler_cpu_time_period > 0)
|
||||
query_profiler_cpu = std::make_unique<QueryProfilerCPU>(thread_id,
|
||||
/* period= */ static_cast<UInt32>(global_profiler_cpu_time_period));
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException("ThreadStatus", "Cannot initialize GlobalProfiler");
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
ThreadGroupPtr ThreadStatus::getThreadGroup() const
|
||||
{
|
||||
chassert(current_thread == this);
|
||||
|
@ -1,8 +1,15 @@
|
||||
clickhouse_add_executable(zkutil_test_commands zkutil_test_commands.cpp)
|
||||
target_link_libraries(zkutil_test_commands PRIVATE clickhouse_common_zookeeper_no_log)
|
||||
target_link_libraries(zkutil_test_commands PRIVATE
|
||||
clickhouse_common_zookeeper_no_log
|
||||
dbms)
|
||||
|
||||
clickhouse_add_executable(zkutil_test_commands_new_lib zkutil_test_commands_new_lib.cpp)
|
||||
target_link_libraries(zkutil_test_commands_new_lib PRIVATE clickhouse_common_zookeeper_no_log clickhouse_compression)
|
||||
target_link_libraries(zkutil_test_commands_new_lib PRIVATE
|
||||
clickhouse_common_zookeeper_no_log
|
||||
clickhouse_compression
|
||||
dbms)
|
||||
|
||||
clickhouse_add_executable(zkutil_test_async zkutil_test_async.cpp)
|
||||
target_link_libraries(zkutil_test_async PRIVATE clickhouse_common_zookeeper_no_log)
|
||||
target_link_libraries(zkutil_test_async PRIVATE
|
||||
clickhouse_common_zookeeper_no_log
|
||||
dbms)
|
||||
|
@ -20,6 +20,9 @@
|
||||
#include <Common/CurrentMetrics.h>
|
||||
|
||||
|
||||
using ThreadFromGlobalPoolSimple = ThreadFromGlobalPoolImpl</* propagate_opentelemetry_context= */ false, /* global_trace_collector_allowed= */ false>;
|
||||
using SimpleThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolSimple>;
|
||||
|
||||
using Key = UInt64;
|
||||
using Value = UInt64;
|
||||
|
||||
@ -255,7 +258,7 @@ int main(int argc, char ** argv)
|
||||
|
||||
std::cerr << std::fixed << std::setprecision(2);
|
||||
|
||||
ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, num_threads);
|
||||
SimpleThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, num_threads);
|
||||
|
||||
Source data(n);
|
||||
|
||||
|
@ -20,6 +20,9 @@
|
||||
#include <Common/CurrentMetrics.h>
|
||||
|
||||
|
||||
using ThreadFromGlobalPoolSimple = ThreadFromGlobalPoolImpl</* propagate_opentelemetry_context= */ false, /* global_trace_collector_allowed= */ false>;
|
||||
using SimpleThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolSimple>;
|
||||
|
||||
using Key = UInt64;
|
||||
using Value = UInt64;
|
||||
using Source = std::vector<Key>;
|
||||
@ -38,7 +41,7 @@ struct AggregateIndependent
|
||||
template <typename Creator, typename Updater>
|
||||
static void NO_INLINE execute(const Source & data, size_t num_threads, std::vector<std::unique_ptr<Map>> & results,
|
||||
Creator && creator, Updater && updater,
|
||||
ThreadPool & pool)
|
||||
SimpleThreadPool & pool)
|
||||
{
|
||||
results.reserve(num_threads);
|
||||
for (size_t i = 0; i < num_threads; ++i)
|
||||
@ -76,7 +79,7 @@ struct AggregateIndependentWithSequentialKeysOptimization
|
||||
template <typename Creator, typename Updater>
|
||||
static void NO_INLINE execute(const Source & data, size_t num_threads, std::vector<std::unique_ptr<Map>> & results,
|
||||
Creator && creator, Updater && updater,
|
||||
ThreadPool & pool)
|
||||
SimpleThreadPool & pool)
|
||||
{
|
||||
results.reserve(num_threads);
|
||||
for (size_t i = 0; i < num_threads; ++i)
|
||||
@ -124,7 +127,7 @@ struct MergeSequential
|
||||
template <typename Merger>
|
||||
static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map,
|
||||
Merger && merger,
|
||||
ThreadPool &)
|
||||
SimpleThreadPool &)
|
||||
{
|
||||
for (size_t i = 1; i < num_maps; ++i)
|
||||
{
|
||||
@ -144,7 +147,7 @@ struct MergeSequentialTransposed /// In practice not better than usual.
|
||||
template <typename Merger>
|
||||
static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map,
|
||||
Merger && merger,
|
||||
ThreadPool &)
|
||||
SimpleThreadPool &)
|
||||
{
|
||||
std::vector<typename Map::iterator> iterators(num_maps);
|
||||
for (size_t i = 1; i < num_maps; ++i)
|
||||
@ -177,7 +180,7 @@ struct MergeParallelForTwoLevelTable
|
||||
template <typename Merger>
|
||||
static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map,
|
||||
Merger && merger,
|
||||
ThreadPool & pool)
|
||||
SimpleThreadPool & pool)
|
||||
{
|
||||
for (size_t bucket = 0; bucket < Map::NUM_BUCKETS; ++bucket)
|
||||
pool.scheduleOrThrowOnError([&, bucket, num_maps]
|
||||
@ -202,7 +205,7 @@ struct Work
|
||||
template <typename Creator, typename Updater, typename Merger>
|
||||
static void NO_INLINE execute(const Source & data, size_t num_threads,
|
||||
Creator && creator, Updater && updater, Merger && merger,
|
||||
ThreadPool & pool)
|
||||
SimpleThreadPool & pool)
|
||||
{
|
||||
std::vector<std::unique_ptr<Map>> intermediate_results;
|
||||
|
||||
@ -282,7 +285,7 @@ int main(int argc, char ** argv)
|
||||
|
||||
std::cerr << std::fixed << std::setprecision(2);
|
||||
|
||||
ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, num_threads);
|
||||
SimpleThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, num_threads);
|
||||
|
||||
Source data(n);
|
||||
|
||||
|
@ -14,6 +14,8 @@ int value = 0;
|
||||
static void f() { ++value; }
|
||||
static void * g(void *) { f(); return {}; }
|
||||
|
||||
using ThreadFromGlobalPoolSimple = ThreadFromGlobalPoolImpl</* propagate_opentelemetry_context= */ false, /* global_trace_collector_allowed= */ false>;
|
||||
using SimpleThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolSimple>;
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
@ -72,7 +74,7 @@ int main(int argc, char ** argv)
|
||||
|
||||
test(n, "Create and destroy ThreadPool each iteration", []
|
||||
{
|
||||
ThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1);
|
||||
SimpleThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1);
|
||||
tp.scheduleOrThrowOnError(f);
|
||||
tp.wait();
|
||||
});
|
||||
@ -93,7 +95,7 @@ int main(int argc, char ** argv)
|
||||
});
|
||||
|
||||
{
|
||||
ThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1);
|
||||
SimpleThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1);
|
||||
|
||||
test(n, "Schedule job for Threadpool each iteration", [&tp]
|
||||
{
|
||||
@ -103,7 +105,7 @@ int main(int argc, char ** argv)
|
||||
}
|
||||
|
||||
{
|
||||
ThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 128);
|
||||
SimpleThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 128);
|
||||
|
||||
test(n, "Schedule job for Threadpool with 128 threads each iteration", [&tp]
|
||||
{
|
||||
|
@ -3,8 +3,8 @@
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/RWLock.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Core/Types.h>
|
||||
#include <base/types.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <base/phdr_cache.h>
|
||||
#include <random>
|
||||
#include <pcg_random.hpp>
|
||||
@ -541,7 +541,7 @@ TEST(Common, RWLockWriteLockTimeoutDuringWriteWithWaitingRead)
|
||||
events.add(wc ? "Locked wb" : "Failed to lock wb");
|
||||
EXPECT_EQ(wc, nullptr);
|
||||
});
|
||||
|
||||
|
||||
std::thread rc_thread([&] ()
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(200));
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/callOnce.h>
|
||||
#include <Disks/IO/IOUringReader.h>
|
||||
#include <Disks/IO/getIOUringReader.h>
|
||||
|
||||
#include <Core/ServerSettings.h>
|
||||
|
||||
@ -303,10 +304,10 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
|
||||
}
|
||||
|
||||
#if USE_LIBURING
|
||||
IOUringReader & Context::getIOURingReader() const
|
||||
IOUringReader & Context::getIOUringReader() const
|
||||
{
|
||||
callOnce(shared->io_uring_reader_initialized, [&] {
|
||||
shared->io_uring_reader = std::make_unique<IOUringReader>(512);
|
||||
shared->io_uring_reader = createIOUringReader();
|
||||
});
|
||||
|
||||
return *shared->io_uring_reader;
|
||||
@ -457,4 +458,9 @@ const ServerSettings & Context::getServerSettings() const
|
||||
return shared->server_settings;
|
||||
}
|
||||
|
||||
bool Context::hasTraceCollector() const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -137,7 +137,7 @@ public:
|
||||
|
||||
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) const;
|
||||
#if USE_LIBURING
|
||||
IOUringReader & getIOURingReader() const;
|
||||
IOUringReader & getIOUringReader() const;
|
||||
#endif
|
||||
std::shared_ptr<AsyncReadCounters> getAsyncReadCounters() const;
|
||||
ThreadPool & getThreadPoolWriter() const;
|
||||
@ -163,6 +163,8 @@ public:
|
||||
zkutil::ZooKeeperPtr getZooKeeper() const;
|
||||
|
||||
const ServerSettings & getServerSettings() const;
|
||||
|
||||
bool hasTraceCollector() const;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1,4 +1,5 @@
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/ThreadStatus.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -11,4 +12,8 @@ void CurrentThread::attachToGroup(const ThreadGroupPtr &)
|
||||
{
|
||||
}
|
||||
|
||||
void ThreadStatus::initGlobalProfiler(UInt64 /*global_profiler_real_time_period*/, UInt64 /*global_profiler_cpu_time_period*/)
|
||||
{
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_RESTORE_FROM_FIELD_DUMP;
|
||||
extern const int DECIMAL_OVERFLOW;
|
||||
extern const int INCORRECT_DATA;
|
||||
}
|
||||
|
||||
template <is_decimal T>
|
||||
@ -28,7 +29,7 @@ T DecimalField<T>::getScaleMultiplier() const
|
||||
return DecimalUtils::scaleMultiplier<T>(scale);
|
||||
}
|
||||
|
||||
inline Field getBinaryValue(UInt8 type, ReadBuffer & buf)
|
||||
Field getBinaryValue(UInt8 type, ReadBuffer & buf)
|
||||
{
|
||||
switch (static_cast<Field::Types::Which>(type))
|
||||
{
|
||||
@ -146,7 +147,7 @@ inline Field getBinaryValue(UInt8 type, ReadBuffer & buf)
|
||||
case Field::Types::CustomType:
|
||||
return Field();
|
||||
}
|
||||
UNREACHABLE();
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Unknown field type {}", std::to_string(type));
|
||||
}
|
||||
|
||||
void readBinary(Array & x, ReadBuffer & buf)
|
||||
@ -575,7 +576,7 @@ template bool decimalLessOrEqual<Decimal256>(Decimal256 x, Decimal256 y, UInt32
|
||||
template bool decimalLessOrEqual<DateTime64>(DateTime64 x, DateTime64 y, UInt32 x_scale, UInt32 y_scale);
|
||||
|
||||
|
||||
inline void writeText(const Null & x, WriteBuffer & buf)
|
||||
void writeText(const Null & x, WriteBuffer & buf)
|
||||
{
|
||||
if (x.isNegativeInfinity())
|
||||
writeText("-Inf", buf);
|
||||
|
@ -1,4 +1,3 @@
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Databases/MySQL/MySQLBinlog.h>
|
||||
#include <Databases/MySQL/MySQLBinlogEventsDispatcher.h>
|
||||
#include <Databases/MySQL/MySQLBinlogClient.h>
|
||||
|
@ -1,5 +1,4 @@
|
||||
#include "IOUringReader.h"
|
||||
#include <memory>
|
||||
|
||||
#if USE_LIBURING
|
||||
|
||||
@ -13,6 +12,7 @@
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <future>
|
||||
#include <memory>
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
|
@ -4,9 +4,9 @@
|
||||
#include <IO/MMapReadBufferFromFileWithCache.h>
|
||||
#include <IO/AsynchronousReadBufferFromFile.h>
|
||||
#include <Disks/IO/IOUringReader.h>
|
||||
#include <Disks/IO/getIOUringReader.h>
|
||||
#include <Disks/IO/ThreadPoolReader.h>
|
||||
#include <Disks/IO/getThreadPoolReader.h>
|
||||
#include <IO/SynchronousReader.h>
|
||||
#include <IO/AsynchronousReader.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include "config.h"
|
||||
@ -100,14 +100,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
|
||||
else if (settings.local_fs_method == LocalFSReadMethod::io_uring)
|
||||
{
|
||||
#if USE_LIBURING
|
||||
auto global_context = Context::getGlobalContextInstance();
|
||||
if (!global_context)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot obtain io_uring reader (global context not initialized)");
|
||||
|
||||
auto & reader = global_context->getIOURingReader();
|
||||
if (!reader.isSupported())
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "io_uring is not supported by this system");
|
||||
|
||||
auto & reader = getIOUringReaderOrThrow();
|
||||
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
|
||||
reader,
|
||||
settings.priority,
|
||||
|
41
src/Disks/IO/getIOUringReader.cpp
Normal file
41
src/Disks/IO/getIOUringReader.cpp
Normal file
@ -0,0 +1,41 @@
|
||||
#include <Disks/IO/getIOUringReader.h>
|
||||
|
||||
#if USE_LIBURING
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Common/ErrorCodes.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int UNSUPPORTED_METHOD;
|
||||
}
|
||||
|
||||
std::unique_ptr<IOUringReader> createIOUringReader()
|
||||
{
|
||||
return std::make_unique<IOUringReader>(512);
|
||||
}
|
||||
|
||||
IOUringReader & getIOUringReaderOrThrow(ContextPtr context)
|
||||
{
|
||||
auto & reader = context->getIOUringReader();
|
||||
if (!reader.isSupported())
|
||||
{
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "io_uring is not supported by this system");
|
||||
}
|
||||
return reader;
|
||||
}
|
||||
|
||||
IOUringReader & getIOUringReaderOrThrow()
|
||||
{
|
||||
auto context = Context::getGlobalContextInstance();
|
||||
if (!context)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
|
||||
return getIOUringReaderOrThrow(context);
|
||||
}
|
||||
|
||||
}
|
||||
#endif
|
21
src/Disks/IO/getIOUringReader.h
Normal file
21
src/Disks/IO/getIOUringReader.h
Normal file
@ -0,0 +1,21 @@
|
||||
#pragma once
|
||||
|
||||
#include "config.h"
|
||||
|
||||
#if USE_LIBURING
|
||||
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Disks/IO/IOUringReader.h>
|
||||
#include <memory>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
std::unique_ptr<IOUringReader> createIOUringReader();
|
||||
|
||||
IOUringReader & getIOUringReaderOrThrow(ContextPtr);
|
||||
|
||||
IOUringReader & getIOUringReaderOrThrow();
|
||||
|
||||
}
|
||||
#endif
|
@ -107,11 +107,13 @@ AzureObjectStorage::AzureObjectStorage(
|
||||
const String & name_,
|
||||
AzureClientPtr && client_,
|
||||
SettingsPtr && settings_,
|
||||
const String & object_namespace_)
|
||||
const String & object_namespace_,
|
||||
const String & description_)
|
||||
: name(name_)
|
||||
, client(std::move(client_))
|
||||
, settings(std::move(settings_))
|
||||
, object_namespace(object_namespace_)
|
||||
, description(description_)
|
||||
, log(getLogger("AzureObjectStorage"))
|
||||
{
|
||||
}
|
||||
@ -409,7 +411,8 @@ std::unique_ptr<IObjectStorage> AzureObjectStorage::cloneObjectStorage(const std
|
||||
name,
|
||||
getAzureBlobContainerClient(config, config_prefix),
|
||||
getAzureBlobStorageSettings(config, config_prefix, context),
|
||||
object_namespace
|
||||
object_namespace,
|
||||
description
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -81,7 +81,8 @@ public:
|
||||
const String & name_,
|
||||
AzureClientPtr && client_,
|
||||
SettingsPtr && settings_,
|
||||
const String & object_namespace_);
|
||||
const String & object_namespace_,
|
||||
const String & description_);
|
||||
|
||||
void listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const override;
|
||||
|
||||
@ -93,7 +94,7 @@ public:
|
||||
|
||||
std::string getCommonKeyPrefix() const override { return ""; }
|
||||
|
||||
std::string getDescription() const override { return client.get()->GetUrl(); }
|
||||
std::string getDescription() const override { return description; }
|
||||
|
||||
bool exists(const StoredObject & object) const override;
|
||||
|
||||
@ -172,6 +173,7 @@ private:
|
||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> client;
|
||||
MultiVersion<AzureObjectStorageSettings> settings;
|
||||
const String object_namespace; /// container + prefix
|
||||
const String description; /// url + container
|
||||
|
||||
LoggerPtr log;
|
||||
};
|
||||
|
@ -306,11 +306,14 @@ void registerAzureObjectStorage(ObjectStorageFactory & factory)
|
||||
bool /* skip_access_check */) -> ObjectStoragePtr
|
||||
{
|
||||
AzureBlobStorageEndpoint endpoint = processAzureBlobStorageEndpoint(config, config_prefix);
|
||||
std::string endpoint_string = endpoint.getEndpoint();
|
||||
|
||||
return createObjectStorage<AzureObjectStorage>(
|
||||
ObjectStorageType::Azure, config, config_prefix, name,
|
||||
getAzureBlobContainerClient(config, config_prefix),
|
||||
getAzureBlobStorageSettings(config, config_prefix, context),
|
||||
endpoint.prefix.empty() ? endpoint.container_name : endpoint.container_name + "/" + endpoint.prefix);
|
||||
endpoint.prefix.empty() ? endpoint.container_name : endpoint.container_name + "/" + endpoint.prefix,
|
||||
endpoint.prefix.empty() ? endpoint_string : endpoint_string.substr(0, endpoint_string.length() - endpoint.prefix.length()));
|
||||
};
|
||||
factory.registerObjectStorageType("azure_blob_storage", creator);
|
||||
factory.registerObjectStorageType("azure", creator);
|
||||
|
@ -3,7 +3,7 @@ add_subdirectory(divide)
|
||||
include("${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake")
|
||||
add_headers_and_sources(clickhouse_functions .)
|
||||
|
||||
extract_into_parent_list(clickhouse_functions_sources dbms_sources
|
||||
set(DBMS_FUNCTIONS
|
||||
IFunction.cpp
|
||||
FunctionFactory.cpp
|
||||
FunctionHelpers.cpp
|
||||
@ -15,7 +15,10 @@ extract_into_parent_list(clickhouse_functions_sources dbms_sources
|
||||
checkHyperscanRegexp.cpp
|
||||
array/has.cpp
|
||||
CastOverloadResolver.cpp
|
||||
# Provides dependency for cast - createFunctionBaseCast()
|
||||
FunctionsConversion.cpp
|
||||
)
|
||||
extract_into_parent_list(clickhouse_functions_sources dbms_sources ${DBMS_FUNCTIONS})
|
||||
extract_into_parent_list(clickhouse_functions_headers dbms_headers
|
||||
IFunction.h
|
||||
FunctionFactory.h
|
||||
@ -26,6 +29,10 @@ extract_into_parent_list(clickhouse_functions_headers dbms_headers
|
||||
)
|
||||
|
||||
add_library(clickhouse_functions_obj OBJECT ${clickhouse_functions_headers} ${clickhouse_functions_sources})
|
||||
if (OMIT_HEAVY_DEBUG_SYMBOLS)
|
||||
target_compile_options(clickhouse_functions_obj PRIVATE "-g0")
|
||||
set_source_files_properties(${DBMS_FUNCTIONS} PROPERTIES COMPILE_FLAGS "-g0")
|
||||
endif()
|
||||
|
||||
list (APPEND OBJECT_LIBS $<TARGET_OBJECTS:clickhouse_functions_obj>)
|
||||
|
||||
@ -62,10 +69,6 @@ if (TARGET OpenSSL::Crypto)
|
||||
list (APPEND PUBLIC_LIBS OpenSSL::Crypto)
|
||||
endif()
|
||||
|
||||
if (OMIT_HEAVY_DEBUG_SYMBOLS)
|
||||
target_compile_options(clickhouse_functions_obj PRIVATE "-g0")
|
||||
endif()
|
||||
|
||||
if (TARGET ch_contrib::icu)
|
||||
list (APPEND PRIVATE_LIBS ch_contrib::icu)
|
||||
endif ()
|
||||
|
@ -49,7 +49,7 @@ public:
|
||||
throw Exception(
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
|
||||
"Illegal type {} of argument {} of function {}. Must be Float64",
|
||||
arg->getName(), i, getName());
|
||||
arg->getName(), i + 1, getName());
|
||||
}
|
||||
return std::make_shared<DataTypeFloat64>();
|
||||
}
|
||||
|
@ -289,6 +289,7 @@ void copyAzureBlobStorageFile(
|
||||
|
||||
if (settings->use_native_copy)
|
||||
{
|
||||
LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copying Blob: {} from Container: {} using native copy", src_container_for_logging, src_blob);
|
||||
ProfileEvents::increment(ProfileEvents::AzureCopyObject);
|
||||
if (dest_client->GetClickhouseOptions().IsClientForDisk)
|
||||
ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);
|
||||
|
@ -846,7 +846,7 @@ LockedKey::~LockedKey()
|
||||
/// See comment near cleanupThreadFunc() for more details.
|
||||
|
||||
key_metadata->key_state = KeyMetadata::KeyState::REMOVING;
|
||||
LOG_DEBUG(key_metadata->logger(), "Submitting key {} for removal", getKey());
|
||||
LOG_TRACE(key_metadata->logger(), "Submitting key {} for removal", getKey());
|
||||
key_metadata->addToCleanupQueue();
|
||||
}
|
||||
|
||||
|
@ -37,6 +37,7 @@
|
||||
#include <Disks/ObjectStorages/IObjectStorage.h>
|
||||
#include <Disks/StoragePolicy.h>
|
||||
#include <Disks/IO/IOUringReader.h>
|
||||
#include <Disks/IO/getIOUringReader.h>
|
||||
#include <IO/SynchronousReader.h>
|
||||
#include <TableFunctions/TableFunctionFactory.h>
|
||||
#include <Interpreters/ActionLocksManager.h>
|
||||
@ -5188,10 +5189,10 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
|
||||
}
|
||||
|
||||
#if USE_LIBURING
|
||||
IOUringReader & Context::getIOURingReader() const
|
||||
IOUringReader & Context::getIOUringReader() const
|
||||
{
|
||||
callOnce(shared->io_uring_reader_initialized, [&] {
|
||||
shared->io_uring_reader = std::make_unique<IOUringReader>(512);
|
||||
shared->io_uring_reader = createIOUringReader();
|
||||
});
|
||||
|
||||
return *shared->io_uring_reader;
|
||||
|
@ -1246,7 +1246,7 @@ public:
|
||||
|
||||
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) const;
|
||||
#if USE_LIBURING
|
||||
IOUringReader & getIOURingReader() const;
|
||||
IOUringReader & getIOUringReader() const;
|
||||
#endif
|
||||
|
||||
std::shared_ptr<AsyncReadCounters> getAsyncReadCounters() const;
|
||||
|
@ -458,6 +458,31 @@ void ThreadStatus::resetPerformanceCountersLastUsage()
|
||||
taskstats->reset();
|
||||
}
|
||||
|
||||
void ThreadStatus::initGlobalProfiler([[maybe_unused]] UInt64 global_profiler_real_time_period, [[maybe_unused]] UInt64 global_profiler_cpu_time_period)
|
||||
{
|
||||
#if !defined(SANITIZER) && !defined(__APPLE__)
|
||||
/// profilers are useless without trace collector
|
||||
auto global_context_ptr = global_context.lock();
|
||||
if (!global_context_ptr || !global_context_ptr->hasTraceCollector())
|
||||
return;
|
||||
|
||||
try
|
||||
{
|
||||
if (global_profiler_real_time_period > 0)
|
||||
query_profiler_real = std::make_unique<QueryProfilerReal>(thread_id,
|
||||
/* period= */ static_cast<UInt32>(global_profiler_real_time_period));
|
||||
|
||||
if (global_profiler_cpu_time_period > 0)
|
||||
query_profiler_cpu = std::make_unique<QueryProfilerCPU>(thread_id,
|
||||
/* period= */ static_cast<UInt32>(global_profiler_cpu_time_period));
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException("ThreadStatus", "Cannot initialize GlobalProfiler");
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void ThreadStatus::initQueryProfiler()
|
||||
{
|
||||
if (internal_thread)
|
||||
|
@ -7,6 +7,8 @@
|
||||
#include <Common/UTF8Helpers.h>
|
||||
#include <Common/PODArray.h>
|
||||
#include <Common/formatReadable.h>
|
||||
#include <DataTypes/DataTypeLowCardinality.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -16,7 +18,14 @@ PrettyBlockOutputFormat::PrettyBlockOutputFormat(
|
||||
WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_, bool mono_block_, bool color_)
|
||||
: IOutputFormat(header_, out_), format_settings(format_settings_), serializations(header_.getSerializations()), color(color_), mono_block(mono_block_)
|
||||
{
|
||||
readable_number_tip = header_.getColumns().size() == 1 && WhichDataType(header_.getDataTypes()[0]->getTypeId()).isNumber();
|
||||
/// Decide whether we should print a tip near the single number value in the result.
|
||||
if (header_.getColumns().size() == 1)
|
||||
{
|
||||
/// Check if it is a numeric type, possible wrapped by Nullable or LowCardinality.
|
||||
DataTypePtr type = removeNullable(recursiveRemoveLowCardinality(header_.getDataTypes().at(0)));
|
||||
if (isNumber(type))
|
||||
readable_number_tip = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -497,6 +506,9 @@ void PrettyBlockOutputFormat::writeReadableNumberTip(const Chunk & chunk)
|
||||
if (!is_single_number)
|
||||
return;
|
||||
|
||||
if (columns[0]->isNullAt(0))
|
||||
return;
|
||||
|
||||
auto value = columns[0]->getFloat64(0);
|
||||
auto threshold = format_settings.pretty.output_format_pretty_single_large_number_tip_threshold;
|
||||
|
||||
|
@ -1296,8 +1296,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
|
||||
size_t last_index_mark = 0;
|
||||
|
||||
PostingsCacheForStore cache_in_store;
|
||||
|
||||
if (dynamic_cast<const MergeTreeIndexFullText *>(&*index_helper) != nullptr)
|
||||
if (dynamic_cast<const MergeTreeIndexFullText *>(index_helper.get()))
|
||||
cache_in_store.store = GinIndexStoreFactory::instance().get(index_helper->getFileName(), part->getDataPartStoragePtr());
|
||||
|
||||
for (size_t i = 0; i < ranges.size(); ++i)
|
||||
@ -1315,12 +1314,12 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
|
||||
auto ann_condition = std::dynamic_pointer_cast<IMergeTreeIndexConditionApproximateNearestNeighbor>(condition);
|
||||
if (ann_condition != nullptr)
|
||||
{
|
||||
// vector of indexes of useful ranges
|
||||
/// An array of indices of useful ranges.
|
||||
auto result = ann_condition->getUsefulRanges(granule);
|
||||
|
||||
for (auto range : result)
|
||||
{
|
||||
// range for corresponding index
|
||||
/// The range for the corresponding index.
|
||||
MarkRange data_range(
|
||||
std::max(ranges[i].begin, index_mark * index_granularity + range),
|
||||
std::min(ranges[i].end, index_mark * index_granularity + range + 1));
|
||||
@ -1344,8 +1343,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
|
||||
continue;
|
||||
|
||||
MarkRange data_range(
|
||||
std::max(ranges[i].begin, index_mark * index_granularity),
|
||||
std::min(ranges[i].end, (index_mark + 1) * index_granularity));
|
||||
std::max(ranges[i].begin, index_mark * index_granularity),
|
||||
std::min(ranges[i].end, (index_mark + 1) * index_granularity));
|
||||
|
||||
if (res.empty() || data_range.begin - res.back().end > min_marks_for_seek)
|
||||
res.push_back(data_range);
|
||||
|
@ -35,8 +35,7 @@ MergeTreeIndexGranuleSet::MergeTreeIndexGranuleSet(
|
||||
size_t max_rows_)
|
||||
: index_name(index_name_)
|
||||
, max_rows(max_rows_)
|
||||
, index_sample_block(index_sample_block_)
|
||||
, block(index_sample_block)
|
||||
, block(index_sample_block_.cloneEmpty())
|
||||
{
|
||||
}
|
||||
|
||||
@ -47,8 +46,7 @@ MergeTreeIndexGranuleSet::MergeTreeIndexGranuleSet(
|
||||
MutableColumns && mutable_columns_)
|
||||
: index_name(index_name_)
|
||||
, max_rows(max_rows_)
|
||||
, index_sample_block(index_sample_block_)
|
||||
, block(index_sample_block.cloneWithColumns(std::move(mutable_columns_)))
|
||||
, block(index_sample_block_.cloneWithColumns(std::move(mutable_columns_)))
|
||||
{
|
||||
}
|
||||
|
||||
@ -67,10 +65,11 @@ void MergeTreeIndexGranuleSet::serializeBinary(WriteBuffer & ostr) const
|
||||
}
|
||||
|
||||
size_serialization->serializeBinary(size(), ostr, {});
|
||||
size_t num_columns = block.columns();
|
||||
|
||||
for (size_t i = 0; i < index_sample_block.columns(); ++i)
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
{
|
||||
const auto & type = index_sample_block.getByPosition(i).type;
|
||||
const auto & type = block.getByPosition(i).type;
|
||||
|
||||
ISerialization::SerializeBinaryBulkSettings settings;
|
||||
settings.getter = [&ostr](ISerialization::SubstreamPath) -> WriteBuffer * { return &ostr; };
|
||||
@ -92,8 +91,6 @@ void MergeTreeIndexGranuleSet::deserializeBinary(ReadBuffer & istr, MergeTreeInd
|
||||
if (version != 1)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown index version {}.", version);
|
||||
|
||||
block.clear();
|
||||
|
||||
Field field_rows;
|
||||
const auto & size_type = DataTypePtr(std::make_shared<DataTypeUInt64>());
|
||||
size_type->getDefaultSerialization()->deserializeBinary(field_rows, istr, {});
|
||||
@ -102,24 +99,22 @@ void MergeTreeIndexGranuleSet::deserializeBinary(ReadBuffer & istr, MergeTreeInd
|
||||
if (rows_to_read == 0)
|
||||
return;
|
||||
|
||||
for (size_t i = 0; i < index_sample_block.columns(); ++i)
|
||||
size_t num_columns = block.columns();
|
||||
|
||||
ISerialization::DeserializeBinaryBulkSettings settings;
|
||||
settings.getter = [&](ISerialization::SubstreamPath) -> ReadBuffer * { return &istr; };
|
||||
settings.position_independent_encoding = false;
|
||||
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
{
|
||||
const auto & column = index_sample_block.getByPosition(i);
|
||||
const auto & type = column.type;
|
||||
ColumnPtr new_column = type->createColumn();
|
||||
|
||||
|
||||
ISerialization::DeserializeBinaryBulkSettings settings;
|
||||
settings.getter = [&](ISerialization::SubstreamPath) -> ReadBuffer * { return &istr; };
|
||||
settings.position_independent_encoding = false;
|
||||
auto & elem = block.getByPosition(i);
|
||||
elem.column = elem.column->cloneEmpty();
|
||||
|
||||
ISerialization::DeserializeBinaryBulkStatePtr state;
|
||||
auto serialization = type->getDefaultSerialization();
|
||||
auto serialization = elem.type->getDefaultSerialization();
|
||||
|
||||
serialization->deserializeBinaryBulkStatePrefix(settings, state);
|
||||
serialization->deserializeBinaryBulkWithMultipleStreams(new_column, rows_to_read, settings, state, nullptr);
|
||||
|
||||
block.insert(ColumnWithTypeAndName(new_column, type, column.name));
|
||||
serialization->deserializeBinaryBulkWithMultipleStreams(elem.column, rows_to_read, settings, state, nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
@ -272,6 +267,8 @@ MergeTreeIndexConditionSet::MergeTreeIndexConditionSet(
|
||||
|
||||
filter_actions_dag->removeUnusedActions();
|
||||
actions = std::make_shared<ExpressionActions>(filter_actions_dag);
|
||||
|
||||
actions_output_column_name = filter_actions_dag->getOutputs().at(0)->result_name;
|
||||
}
|
||||
|
||||
bool MergeTreeIndexConditionSet::alwaysUnknownOrTrue() const
|
||||
@ -284,42 +281,19 @@ bool MergeTreeIndexConditionSet::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx
|
||||
if (isUseless())
|
||||
return true;
|
||||
|
||||
auto granule = std::dynamic_pointer_cast<MergeTreeIndexGranuleSet>(idx_granule);
|
||||
if (!granule)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Set index condition got a granule with the wrong type");
|
||||
const MergeTreeIndexGranuleSet & granule = assert_cast<const MergeTreeIndexGranuleSet &>(*idx_granule);
|
||||
|
||||
if (isUseless() || granule->empty() || (max_rows != 0 && granule->size() > max_rows))
|
||||
size_t size = granule.size();
|
||||
if (size == 0 || (max_rows != 0 && size > max_rows))
|
||||
return true;
|
||||
|
||||
Block result = granule->block;
|
||||
Block result = granule.block;
|
||||
actions->execute(result);
|
||||
|
||||
const auto & filter_node_name = actions->getActionsDAG().getOutputs().at(0)->result_name;
|
||||
auto column = result.getByName(filter_node_name).column->convertToFullColumnIfConst()->convertToFullColumnIfLowCardinality();
|
||||
const auto & column = result.getByName(actions_output_column_name).column;
|
||||
|
||||
if (column->onlyNull())
|
||||
return false;
|
||||
|
||||
const auto * col_uint8 = typeid_cast<const ColumnUInt8 *>(column.get());
|
||||
|
||||
const NullMap * null_map = nullptr;
|
||||
|
||||
if (const auto * col_nullable = checkAndGetColumn<ColumnNullable>(&*column))
|
||||
{
|
||||
col_uint8 = typeid_cast<const ColumnUInt8 *>(&col_nullable->getNestedColumn());
|
||||
null_map = &col_nullable->getNullMapData();
|
||||
}
|
||||
|
||||
if (!col_uint8)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"ColumnUInt8 expected as Set index condition result");
|
||||
|
||||
const auto & condition = col_uint8->getData();
|
||||
size_t column_size = column->size();
|
||||
|
||||
for (size_t i = 0; i < column_size; ++i)
|
||||
if ((!null_map || (*null_map)[i] == 0) && condition[i] & 1)
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
if (!column->isNullAt(i) && (column->get64(i) & 1))
|
||||
return true;
|
||||
|
||||
return false;
|
||||
|
@ -34,7 +34,6 @@ struct MergeTreeIndexGranuleSet final : public IMergeTreeIndexGranule
|
||||
|
||||
const String index_name;
|
||||
const size_t max_rows;
|
||||
const Block index_sample_block;
|
||||
|
||||
Block block;
|
||||
};
|
||||
@ -127,6 +126,7 @@ private:
|
||||
|
||||
std::unordered_set<String> key_columns;
|
||||
ExpressionActionsPtr actions;
|
||||
String actions_output_column_name;
|
||||
};
|
||||
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/CopyableAtomic.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/ZooKeeper/Types.h>
|
||||
#include <base/types.h>
|
||||
@ -9,7 +10,6 @@
|
||||
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
|
||||
#include <Disks/IDisk.h>
|
||||
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
|
||||
|
||||
@ -174,7 +174,7 @@ struct ReplicatedMergeTreeLogEntryData
|
||||
size_t quorum = 0;
|
||||
|
||||
/// Used only in tests for permanent fault injection for particular queue entry.
|
||||
bool fault_injected = false;
|
||||
CopyableAtomic<bool> fault_injected{false};
|
||||
|
||||
/// If this MUTATE_PART entry caused by alter(modify/drop) query.
|
||||
bool isAlterMutation() const
|
||||
|
@ -302,8 +302,8 @@ void registerStorageAzureBlob(StorageFactory & factory)
|
||||
auto settings = StorageAzureBlob::createSettings(args.getContext());
|
||||
|
||||
return std::make_shared<StorageAzureBlob>(
|
||||
std::move(configuration),
|
||||
std::make_unique<AzureObjectStorage>("AzureBlobStorage", std::move(client), std::move(settings),configuration.container),
|
||||
configuration,
|
||||
std::make_unique<AzureObjectStorage>("AzureBlobStorage", std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer()),
|
||||
args.getContext(),
|
||||
args.table_id,
|
||||
args.columns,
|
||||
@ -491,6 +491,12 @@ Poco::URI StorageAzureBlob::Configuration::getConnectionURL() const
|
||||
return Poco::URI(parsed_connection_string.BlobServiceUrl.GetAbsoluteUrl());
|
||||
}
|
||||
|
||||
std::string StorageAzureBlob::Configuration::getConnectionURLWithContainer() const
|
||||
{
|
||||
auto url = getConnectionURL();
|
||||
return fs::path(url.toString()) / container;
|
||||
}
|
||||
|
||||
bool StorageAzureBlob::Configuration::withGlobsIgnorePartitionWildcard() const
|
||||
{
|
||||
if (!withPartitionWildcard())
|
||||
|
@ -45,6 +45,8 @@ public:
|
||||
|
||||
Poco::URI getConnectionURL() const;
|
||||
|
||||
std::string getConnectionURLWithContainer() const;
|
||||
|
||||
std::string connection_url;
|
||||
bool is_connection_string;
|
||||
|
||||
|
@ -28,6 +28,7 @@
|
||||
#include <IO/PeekableReadBuffer.h>
|
||||
#include <IO/AsynchronousReadBufferFromFile.h>
|
||||
#include <Disks/IO/IOUringReader.h>
|
||||
#include <Disks/IO/getIOUringReader.h>
|
||||
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Formats/ReadSchemaUtils.h>
|
||||
@ -282,10 +283,7 @@ std::unique_ptr<ReadBuffer> selectReadBuffer(
|
||||
else if (read_method == LocalFSReadMethod::io_uring && !use_table_fd)
|
||||
{
|
||||
#if USE_LIBURING
|
||||
auto & reader = context->getIOURingReader();
|
||||
if (!reader.isSupported())
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "io_uring is not supported by this system");
|
||||
|
||||
auto & reader = getIOUringReaderOrThrow(context);
|
||||
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
|
||||
reader,
|
||||
Priority{},
|
||||
|
@ -193,6 +193,7 @@ StorageMaterializedView::StorageMaterializedView(
|
||||
|
||||
if (query.refresh_strategy)
|
||||
{
|
||||
fixed_uuid = false;
|
||||
refresher = RefreshTask::create(
|
||||
*this,
|
||||
getContext(),
|
||||
@ -687,10 +688,14 @@ void StorageMaterializedView::onActionLockRemove(StorageActionBlockType action_t
|
||||
refresher->start();
|
||||
}
|
||||
|
||||
DB::StorageID StorageMaterializedView::getTargetTableId() const
|
||||
StorageID StorageMaterializedView::getTargetTableId() const
|
||||
{
|
||||
std::lock_guard guard(target_table_id_mutex);
|
||||
return target_table_id;
|
||||
auto id = target_table_id;
|
||||
/// TODO: Avoid putting uuid into target_table_id in the first place, instead of clearing it here.
|
||||
if (!fixed_uuid)
|
||||
id.uuid = UUIDHelpers::Nil;
|
||||
return id;
|
||||
}
|
||||
|
||||
void StorageMaterializedView::setTargetTableId(DB::StorageID id)
|
||||
|
@ -110,6 +110,10 @@ private:
|
||||
|
||||
bool has_inner_table = false;
|
||||
|
||||
/// If false, inner table is replaced on each refresh. In that case, target_table_id doesn't
|
||||
/// have UUID, and we do inner table lookup by name instead.
|
||||
bool fixed_uuid = true;
|
||||
|
||||
friend class RefreshTask;
|
||||
|
||||
void checkStatementCanBeForwarded() const;
|
||||
|
@ -333,7 +333,7 @@ ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(Contex
|
||||
auto client = StorageAzureBlob::createClient(configuration, !is_insert_query);
|
||||
auto settings = StorageAzureBlob::createSettings(context);
|
||||
|
||||
auto object_storage = std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings), configuration.container);
|
||||
auto object_storage = std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer());
|
||||
if (configuration.format == "auto")
|
||||
return StorageAzureBlob::getTableStructureAndFormatFromData(object_storage.get(), configuration, std::nullopt, context).first;
|
||||
return StorageAzureBlob::getTableStructureFromData(object_storage.get(), configuration, std::nullopt, context);
|
||||
@ -365,7 +365,7 @@ StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_funct
|
||||
|
||||
StoragePtr storage = std::make_shared<StorageAzureBlob>(
|
||||
configuration,
|
||||
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container),
|
||||
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer()),
|
||||
context,
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
columns,
|
||||
|
@ -39,7 +39,7 @@ StoragePtr TableFunctionAzureBlobStorageCluster::executeImpl(
|
||||
/// On worker node this filename won't contains globs
|
||||
storage = std::make_shared<StorageAzureBlob>(
|
||||
configuration,
|
||||
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container),
|
||||
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer()),
|
||||
context,
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
columns,
|
||||
@ -54,7 +54,7 @@ StoragePtr TableFunctionAzureBlobStorageCluster::executeImpl(
|
||||
storage = std::make_shared<StorageAzureBlobCluster>(
|
||||
cluster_name,
|
||||
configuration,
|
||||
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container),
|
||||
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer()),
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
columns,
|
||||
ConstraintsDescription{},
|
||||
|
@ -91,6 +91,8 @@ apt-get install --yes --no-install-recommends azure-cli
|
||||
|
||||
# Increase the limit on number of virtual memory mappings to aviod 'Cannot mmap' error
|
||||
echo "vm.max_map_count = 2097152" > /etc/sysctl.d/01-increase-map-counts.conf
|
||||
# Workarond for sanitizers uncompatibility with some kernels, see https://github.com/google/sanitizers/issues/856
|
||||
echo "vm.mmap_rnd_bits=28" > /etc/sysctl.d/02-vm-mmap_rnd_bits.conf
|
||||
|
||||
systemctl restart docker
|
||||
|
||||
|
@ -0,0 +1 @@
|
||||
#!/usr/bin/env python3
|
215
tests/integration/test_azure_blob_storage_native_copy/test.py
Normal file
215
tests/integration/test_azure_blob_storage_native_copy/test.py
Normal file
@ -0,0 +1,215 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import gzip
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import io
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
|
||||
from azure.storage.blob import BlobServiceClient
|
||||
import helpers.client
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
|
||||
from helpers.network import PartitionManager
|
||||
from helpers.mock_servers import start_mock_servers
|
||||
from helpers.test_tools import exec_query_with_retry
|
||||
|
||||
|
||||
def generate_config(port):
|
||||
path = os.path.join(
|
||||
os.path.dirname(os.path.realpath(__file__)),
|
||||
"./_gen/storage_conf.xml",
|
||||
)
|
||||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||
with open(path, "w") as f:
|
||||
TEMPLATE = """
|
||||
<clickhouse>
|
||||
<storage_configuration>
|
||||
<disks>
|
||||
<disk_azure>
|
||||
<metadata_type>local</metadata_type>
|
||||
<type>object_storage</type>
|
||||
<object_storage_type>azure_blob_storage</object_storage_type>
|
||||
<storage_account_url>http://azurite1:{port}/devstoreaccount1</storage_account_url>
|
||||
<container_name>cont</container_name>
|
||||
<skip_access_check>false</skip_access_check>
|
||||
<account_name>devstoreaccount1</account_name>
|
||||
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
|
||||
<use_native_copy>true</use_native_copy>
|
||||
</disk_azure>
|
||||
<disk_azure_other_bucket>
|
||||
<metadata_type>local</metadata_type>
|
||||
<type>object_storage</type>
|
||||
<object_storage_type>azure_blob_storage</object_storage_type>
|
||||
<use_native_copy>true</use_native_copy>
|
||||
<storage_account_url>http://azurite1:{port}/devstoreaccount1</storage_account_url>
|
||||
<container_name>othercontainer</container_name>
|
||||
<skip_access_check>false</skip_access_check>
|
||||
<account_name>devstoreaccount1</account_name>
|
||||
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
|
||||
</disk_azure_other_bucket>
|
||||
<disk_azure_cache>
|
||||
<type>cache</type>
|
||||
<disk>disk_azure</disk>
|
||||
<path>/tmp/azure_cache/</path>
|
||||
<max_size>1000000000</max_size>
|
||||
<cache_on_write_operations>1</cache_on_write_operations>
|
||||
</disk_azure_cache>
|
||||
</disks>
|
||||
<policies>
|
||||
<policy_azure>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>disk_azure</disk>
|
||||
</main>
|
||||
</volumes>
|
||||
</policy_azure>
|
||||
<policy_azure_other_bucket>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>disk_azure_other_bucket</disk>
|
||||
</main>
|
||||
</volumes>
|
||||
</policy_azure_other_bucket>
|
||||
<policy_azure_cache>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>disk_azure_cache</disk>
|
||||
</main>
|
||||
</volumes>
|
||||
</policy_azure_cache>
|
||||
</policies>
|
||||
</storage_configuration>
|
||||
<backups>
|
||||
<allowed_disk>disk_azure</allowed_disk>
|
||||
<allowed_disk>disk_azure_cache</allowed_disk>
|
||||
<allowed_disk>disk_azure_other_bucket</allowed_disk>
|
||||
</backups>
|
||||
</clickhouse>
|
||||
"""
|
||||
f.write(TEMPLATE.format(port=port))
|
||||
return path
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def cluster():
|
||||
try:
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
port = cluster.azurite_port
|
||||
path = generate_config(port)
|
||||
cluster.add_instance(
|
||||
"node1",
|
||||
main_configs=[path],
|
||||
with_azurite=True,
|
||||
)
|
||||
cluster.add_instance(
|
||||
"node2",
|
||||
main_configs=[path],
|
||||
with_azurite=True,
|
||||
)
|
||||
cluster.start()
|
||||
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def azure_query(
|
||||
node, query, expect_error=False, try_num=10, settings={}, query_on_retry=None
|
||||
):
|
||||
for i in range(try_num):
|
||||
try:
|
||||
if expect_error:
|
||||
return node.query_and_get_error(query, settings=settings)
|
||||
else:
|
||||
return node.query(query, settings=settings)
|
||||
except Exception as ex:
|
||||
retriable_errors = [
|
||||
"DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
|
||||
"DB::Exception: Azure::Core::Http::TransportException: Connection closed before getting full response or response is less than expected",
|
||||
"DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
|
||||
"DB::Exception: Azure::Core::Http::TransportException: Error while polling for socket ready read",
|
||||
"Azure::Core::Http::TransportException, e.what() = Connection was closed by the server while trying to read a response",
|
||||
"Azure::Core::Http::TransportException, e.what() = Connection closed before getting full response or response is less than expected",
|
||||
"Azure::Core::Http::TransportException, e.what() = Connection was closed by the server while trying to read a response",
|
||||
"Azure::Core::Http::TransportException, e.what() = Error while polling for socket ready read",
|
||||
]
|
||||
retry = False
|
||||
for error in retriable_errors:
|
||||
if error in str(ex):
|
||||
retry = True
|
||||
print(f"Try num: {i}. Having retriable error: {ex}")
|
||||
time.sleep(i)
|
||||
break
|
||||
if not retry or i == try_num - 1:
|
||||
raise Exception(ex)
|
||||
if query_on_retry is not None:
|
||||
node.query(query_on_retry)
|
||||
continue
|
||||
|
||||
|
||||
def test_backup_restore_on_merge_tree_same_container(cluster):
|
||||
node1 = cluster.instances["node1"]
|
||||
azure_query(
|
||||
node1,
|
||||
f"CREATE TABLE test_simple_merge_tree(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='policy_azure_cache'",
|
||||
)
|
||||
azure_query(node1, f"INSERT INTO test_simple_merge_tree VALUES (1, 'a')")
|
||||
|
||||
backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_merge_tree_backup')"
|
||||
print("BACKUP DEST", backup_destination)
|
||||
azure_query(
|
||||
node1,
|
||||
f"BACKUP TABLE test_simple_merge_tree TO {backup_destination}",
|
||||
)
|
||||
|
||||
assert node1.contains_in_log("using native copy")
|
||||
|
||||
azure_query(
|
||||
node1,
|
||||
f"RESTORE TABLE test_simple_merge_tree AS test_simple_merge_tree_restored FROM {backup_destination};",
|
||||
)
|
||||
assert (
|
||||
azure_query(node1, f"SELECT * from test_simple_merge_tree_restored") == "1\ta\n"
|
||||
)
|
||||
azure_query(node1, f"DROP TABLE test_simple_merge_tree")
|
||||
azure_query(node1, f"DROP TABLE test_simple_merge_tree_restored")
|
||||
|
||||
|
||||
def test_backup_restore_on_merge_tree_different_container(cluster):
|
||||
node2 = cluster.instances["node2"]
|
||||
azure_query(
|
||||
node2,
|
||||
f"CREATE TABLE test_simple_merge_tree_different_bucket(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='policy_azure_other_bucket'",
|
||||
)
|
||||
azure_query(
|
||||
node2, f"INSERT INTO test_simple_merge_tree_different_bucket VALUES (1, 'a')"
|
||||
)
|
||||
|
||||
backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_merge_tree_different_bucket_backup_different_bucket')"
|
||||
print("BACKUP DEST", backup_destination)
|
||||
azure_query(
|
||||
node2,
|
||||
f"BACKUP TABLE test_simple_merge_tree_different_bucket TO {backup_destination}",
|
||||
)
|
||||
|
||||
assert not node2.contains_in_log("using native copy")
|
||||
|
||||
azure_query(
|
||||
node2,
|
||||
f"RESTORE TABLE test_simple_merge_tree_different_bucket AS test_simple_merge_tree_different_bucket_restored FROM {backup_destination};",
|
||||
)
|
||||
assert (
|
||||
azure_query(
|
||||
node2, f"SELECT * from test_simple_merge_tree_different_bucket_restored"
|
||||
)
|
||||
== "1\ta\n"
|
||||
)
|
||||
|
||||
assert not node2.contains_in_log("using native copy")
|
||||
|
||||
azure_query(node2, f"DROP TABLE test_simple_merge_tree_different_bucket")
|
||||
azure_query(node2, f"DROP TABLE test_simple_merge_tree_different_bucket_restored")
|
@ -39,10 +39,6 @@ def wait_for_clickhouse_stop(started_node):
|
||||
assert result == "OK", "ClickHouse process is still running"
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
helpers.cluster.is_arm(),
|
||||
reason="Fails on ARM, issue https://github.com/ClickHouse/ClickHouse/issues/63855",
|
||||
)
|
||||
def test_pkill(started_node):
|
||||
if (
|
||||
started_node.is_built_with_thread_sanitizer()
|
||||
@ -63,10 +59,6 @@ def test_pkill(started_node):
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
helpers.cluster.is_arm(),
|
||||
reason="Fails on ARM, issue https://github.com/ClickHouse/ClickHouse/issues/63855",
|
||||
)
|
||||
def test_pkill_query_log(started_node):
|
||||
for signal in ["SEGV", "4"]:
|
||||
# force create query_log if it was not created
|
||||
|
@ -35,10 +35,6 @@ def started_node():
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
helpers.cluster.is_arm(),
|
||||
reason="Fails on ARM, issue https://github.com/ClickHouse/ClickHouse/issues/63855",
|
||||
)
|
||||
def test_send_segfault(started_node):
|
||||
# NOTE: another option is to increase waiting time.
|
||||
if (
|
||||
|
14
tests/performance/set_index_analysis.xml
Normal file
14
tests/performance/set_index_analysis.xml
Normal file
@ -0,0 +1,14 @@
|
||||
<test>
|
||||
<create_query>
|
||||
CREATE TABLE test_set (k UInt32, x UInt32, INDEX idx (x) TYPE set(10) GRANULARITY 1) ENGINE = MergeTree ORDER BY k SETTINGS index_granularity = 111;
|
||||
</create_query>
|
||||
<fill_query>SYSTEM STOP MERGES</fill_query>
|
||||
<fill_query>INSERT INTO test_set SELECT number, number DIV 100 + rand() % 7 FROM numbers(3000000) SETTINGS max_insert_threads = 4;</fill_query>
|
||||
|
||||
<query>
|
||||
SELECT count() FROM test_set WHERE x = 1234 SETTINGS max_threads = 8;
|
||||
</query>
|
||||
|
||||
<drop_query>SYSTEM START MERGES</drop_query>
|
||||
<drop_query>DROP TABLE IF EXISTS test_set</drop_query>
|
||||
</test>
|
@ -18,5 +18,5 @@ CREATE table table_tar2star Engine S3(s3_conn, filename='03036_archive2.tar :: e
|
||||
SELECT id, data, _file, _path FROM table_tar2star ORDER BY (id, _file, _path);
|
||||
CREATE table table_tarstarglobs Engine S3(s3_conn, filename='03036_archive*.tar* :: example{2..3}.csv');
|
||||
SELECT id, data, _file, _path FROM table_tarstarglobs ORDER BY (id, _file, _path);
|
||||
CREATE table table_noexist Engine s3(s3_conn, filename='03036_archive2.zip :: nonexistent.csv'); -- { serverError INCORRECT_QUERY }
|
||||
SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_compressed_file_archive.zip :: example7.csv', format='CSV', structure='auto', compression_method='gz') ORDER BY (id, _file, _path)
|
||||
CREATE table table_noexist Engine s3(s3_conn, filename='03036_archive2.zip :: nonexistent.csv'); -- { serverError UNKNOWN_STORAGE }
|
||||
SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_compressed_file_archive.zip :: example7.csv', format='CSV', structure='auto', compression_method='gz') ORDER BY (id, _file, _path)
|
||||
|
@ -0,0 +1,43 @@
|
||||
┌─────────x─┐
|
||||
1. │ 123456789 │ -- 123.46 million
|
||||
└───────────┘
|
||||
┌─────────x─┐
|
||||
1. │ 123456789 │ -- 123.46 million
|
||||
└───────────┘
|
||||
┌─────────x─┐
|
||||
1. │ 123456789 │ -- 123.46 million
|
||||
└───────────┘
|
||||
┌─────────x─┐
|
||||
1. │ 123456789 │ -- 123.46 million
|
||||
└───────────┘
|
||||
┌─────────x─┐
|
||||
1. │ 123456789 │ -- 123.46 million
|
||||
└───────────┘
|
||||
Nullable(UInt64), Nullable(size = 10, UInt64(size = 10), UInt8(size = 10))
|
||||
┏━━━━━━━━━━━━┓
|
||||
┃ x ┃
|
||||
┡━━━━━━━━━━━━┩
|
||||
1. │ 1111111101 │ -- 1.11 billion
|
||||
└────────────┘
|
||||
┏━━━━━━━━━━━┓
|
||||
┃ x ┃
|
||||
┡━━━━━━━━━━━┩
|
||||
1. │ 123456789 │ -- 123.46 million
|
||||
└───────────┘
|
||||
x
|
||||
|
||||
1. ᴺᵁᴸᴸ
|
||||
UInt64, Sparse(size = 10, UInt64(size = 6), UInt64(size = 5))
|
||||
┏━━━━━━━━━━━━┓
|
||||
┃ x ┃
|
||||
┡━━━━━━━━━━━━┩
|
||||
1. │ 1111111101 │ -- 1.11 billion
|
||||
└────────────┘
|
||||
┏━━━┓
|
||||
┃ x ┃
|
||||
┡━━━┩
|
||||
1. │ 0 │
|
||||
└───┘
|
||||
x
|
||||
|
||||
1. 0
|
24
tests/queries/0_stateless/03156_nullable_number_tips.sql
Normal file
24
tests/queries/0_stateless/03156_nullable_number_tips.sql
Normal file
@ -0,0 +1,24 @@
|
||||
SELECT 123456789 AS x FORMAT PrettyCompact;
|
||||
SELECT toNullable(123456789) AS x FORMAT PrettyCompact;
|
||||
SELECT toLowCardinality(toNullable(123456789)) AS x FORMAT PrettyCompact;
|
||||
SELECT toNullable(toLowCardinality(123456789)) AS x FORMAT PrettyCompact;
|
||||
SELECT toLowCardinality(123456789) AS x FORMAT PrettyCompact;
|
||||
|
||||
CREATE TEMPORARY TABLE test (x Nullable(UInt64), PRIMARY KEY ()) ENGINE = MergeTree SETTINGS ratio_of_defaults_for_sparse_serialization = 0;
|
||||
INSERT INTO test SELECT number % 2 ? number * 123456789 : NULL FROM numbers(10);
|
||||
|
||||
SELECT DISTINCT dumpColumnStructure(*) FROM test;
|
||||
|
||||
SELECT * FROM test ORDER BY ALL DESC NULLS LAST LIMIT 1 FORMAT PRETTY;
|
||||
SELECT * FROM test ORDER BY ALL ASC NULLS LAST LIMIT 1 FORMAT PRETTY;
|
||||
SELECT * FROM test ORDER BY ALL ASC NULLS FIRST LIMIT 1 FORMAT PrettySpace;
|
||||
|
||||
DROP TEMPORARY TABLE test;
|
||||
CREATE TEMPORARY TABLE test (x UInt64, PRIMARY KEY ()) ENGINE = MergeTree SETTINGS ratio_of_defaults_for_sparse_serialization = 0;
|
||||
INSERT INTO test SELECT number % 2 ? number * 123456789 : NULL FROM numbers(10);
|
||||
|
||||
SELECT DISTINCT dumpColumnStructure(*) FROM test;
|
||||
|
||||
SELECT * FROM test ORDER BY ALL DESC NULLS LAST LIMIT 1 FORMAT PRETTY;
|
||||
SELECT * FROM test ORDER BY ALL ASC NULLS LAST LIMIT 1 FORMAT PRETTY;
|
||||
SELECT * FROM test ORDER BY ALL ASC NULLS FIRST LIMIT 1 FORMAT PrettySpace;
|
@ -7,6 +7,8 @@ export LC_ALL=C # The "total" should be printed without localization
|
||||
TU_EXCLUDES=(
|
||||
AggregateFunctionUniq
|
||||
Aggregator
|
||||
# FIXME: Exclude for now
|
||||
FunctionsConversion
|
||||
)
|
||||
|
||||
if find $1 -name '*.o' | xargs wc -c | grep --regexp='\.o$' | sort -rn | awk '{ if ($1 > 50000000) print }' \
|
||||
|
@ -1,4 +1,6 @@
|
||||
clickhouse_add_executable(clickhouse-zookeeper-cli
|
||||
zookeeper-cli.cpp
|
||||
${ClickHouse_SOURCE_DIR}/src/Client/LineReader.cpp)
|
||||
target_link_libraries(clickhouse-zookeeper-cli PRIVATE clickhouse_common_zookeeper_no_log)
|
||||
target_link_libraries(clickhouse-zookeeper-cli PRIVATE
|
||||
clickhouse_common_zookeeper_no_log
|
||||
dbms)
|
||||
|
@ -1,2 +1,6 @@
|
||||
clickhouse_add_executable (zookeeper-dump-tree main.cpp ${SRCS})
|
||||
target_link_libraries(zookeeper-dump-tree PRIVATE clickhouse_common_zookeeper_no_log clickhouse_common_io boost::program_options)
|
||||
target_link_libraries(zookeeper-dump-tree PRIVATE
|
||||
clickhouse_common_zookeeper_no_log
|
||||
clickhouse_common_io
|
||||
dbms
|
||||
boost::program_options)
|
||||
|
@ -1,2 +1,5 @@
|
||||
clickhouse_add_executable (zookeeper-remove-by-list main.cpp ${SRCS})
|
||||
target_link_libraries(zookeeper-remove-by-list PRIVATE clickhouse_common_zookeeper_no_log boost::program_options)
|
||||
target_link_libraries(zookeeper-remove-by-list PRIVATE
|
||||
clickhouse_common_zookeeper_no_log
|
||||
dbms
|
||||
boost::program_options)
|
||||
|
Loading…
Reference in New Issue
Block a user