Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-25 00:52:02 +00:00)

commit 29f7a766a5
Merge branch 'master' into libunwind-fix-crash
@@ -7,7 +7,7 @@ endif()
 
 set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/libbcrypt")
 
 set(SRCS
     "${LIBRARY_DIR}/bcrypt.c"
     "${LIBRARY_DIR}/crypt_blowfish/crypt_blowfish.c"
     "${LIBRARY_DIR}/crypt_blowfish/crypt_gensalt.c"
@@ -16,4 +16,13 @@ set(SRCS
 
 add_library(_bcrypt ${SRCS})
 target_include_directories(_bcrypt SYSTEM PUBLIC "${LIBRARY_DIR}")
+# Avoid conflicts for crypt_r on FreeBSD [1]:
+#
+#   - char *crypt_r(__const char *key, __const char *setting, void *data);
+#   - char *crypt_r(const char *, const char *, struct crypt_data *);
+#
+# [1]: https://github.com/freebsd/freebsd-src/commit/5f521d7ba72145092ea23ff6081d8791ad6c1f9d
+#
+# NOTE: ow-crypt.h is used only internally, so PRIVATE is enough
+target_compile_definitions(_bcrypt PRIVATE -D__SKIP_GNU)
 add_library(ch_contrib::bcrypt ALIAS _bcrypt)
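The conflict the new comment documents is a plain C redeclaration clash. A hedged illustration of the clash, simplified and not the actual ow-crypt.h or FreeBSD headers:

    // Illustration of the conflict described in the comment above. With both
    // declarations visible in one translation unit, the compiler rejects
    // crypt_r as redeclared with a different type:

    struct crypt_data;

    char * crypt_r(const char * key, const char * setting, void * data);            // GNU-style (libbcrypt)
    // char * crypt_r(const char * key, const char * setting, struct crypt_data *); // FreeBSD libc

    // -D__SKIP_GNU suppresses the first declaration inside libbcrypt, leaving
    // only the system prototype, which is why the define is PRIVATE to _bcrypt.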
docker/test/fasttest/run.sh
@@ -160,10 +160,17 @@ function clone_submodules
 
 git submodule sync
 git submodule init
-# --jobs does not work as fast as real parallel running
-printf '%s\0' "${SUBMODULES_TO_UPDATE[@]}" | \
-    xargs --max-procs=100 --null --no-run-if-empty --max-args=1 \
-        git submodule update --depth 1 --single-branch
+# Network is unreliable
+for _ in {1..10}
+do
+    # --jobs does not work as fast as real parallel running
+    printf '%s\0' "${SUBMODULES_TO_UPDATE[@]}" | \
+        xargs --max-procs=100 --null --no-run-if-empty --max-args=1 \
+            git submodule update --depth 1 --single-branch && break
+    sleep 1
+done
 
 git submodule foreach git reset --hard
 git submodule foreach git checkout @ -f
 git submodule foreach git clean -xfd
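The new loop bounds the retries: ten attempts, break on first success, one second of sleep between failures. The same shape as a C++ sketch (illustrative, not project code):

    #include <chrono>
    #include <functional>
    #include <thread>

    // Bounded retry: run `action` up to `attempts` times, stopping at the first
    // success and pausing between failures -- the same shape as the bash loop above.
    bool retryWithSleep(const std::function<bool()> & action, int attempts = 10)
    {
        for (int i = 0; i < attempts; ++i)
        {
            if (action())
                return true;                                       // `&& break` in the script
            std::this_thread::sleep_for(std::chrono::seconds(1));  // `sleep 1`
        }
        return false;
    }

    int main()
    {
        int calls = 0;
        // Succeeds on the third attempt; a real action would be the network call.
        bool ok = retryWithSleep([&] { return ++calls == 3; });
        return ok ? 0 : 1;
    }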
docker/test/upgrade/run.sh
@@ -58,8 +58,14 @@ echo "ATTACH DATABASE system ENGINE=Ordinary" > /var/lib/clickhouse/metadata/sys
 # Install previous release packages
 install_packages previous_release_package_folder
 
-# Save old settings from system table for settings changes check
-clickhouse-local -q "select * from system.settings format Native" > old_settings.native
+# NOTE: we need to run clickhouse-local under script to get settings without the adjustments clickhouse-local applies when stdout is not a tty
+function save_settings_clean()
+{
+    local out=$1 && shift
+    script -q -c "clickhouse-local -q \"select * from system.settings into outfile '$out'\"" --log-out /dev/null
+}
+
+save_settings_clean 'old_settings.native'
 
 # Initial run without S3 to create system.*_log on local file system to make it
 # available for dump via clickhouse-local
@@ -183,7 +189,7 @@ configure
 IS_SANITIZED=$(clickhouse-local --query "SELECT value LIKE '%-fsanitize=%' FROM system.build_options WHERE name = 'CXX_FLAGS'")
 if [ "${IS_SANITIZED}" -eq "0" ]
 then
-    clickhouse-local -q "select * from system.settings format Native" > new_settings.native
+    save_settings_clean 'new_settings.native'
     clickhouse-local -nmq "
         CREATE TABLE old_settings AS file('old_settings.native');
         CREATE TABLE new_settings AS file('new_settings.native');
docs/en/operations/system-tables/query_log.md
@@ -108,7 +108,7 @@ Columns:
 - `used_aggregate_function_combinators` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `aggregate functions combinators`, which were used during query execution.
 - `used_database_engines` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `database engines`, which were used during query execution.
 - `used_data_type_families` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `data type families`, which were used during query execution.
-- `used_dictionaries` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `dictionaries`, which were used during query execution.
+- `used_dictionaries` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `dictionaries`, which were used during query execution. For dictionaries configured using an XML file this is the name of the dictionary, and for dictionaries created by an SQL statement, the canonical name is the fully qualified object name.
 - `used_formats` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `formats`, which were used during query execution.
 - `used_functions` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `functions`, which were used during query execution.
 - `used_storages` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `storages`, which were used during query execution.
programs/keeper/CMakeLists.txt
@@ -148,6 +148,7 @@ if (BUILD_STANDALONE_KEEPER)
     ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/createReadBufferFromFileBase.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/ReadBufferFromRemoteFSGather.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/IOUringReader.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/getIOUringReader.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/WriteBufferFromTemporaryFile.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/WriteBufferWithFinalizeCallback.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/AsynchronousBoundedReadBuffer.cpp
src/Backups/BackupFactory.h
@@ -39,6 +39,7 @@ public:
         std::optional<UUID> backup_uuid;
         bool deduplicate_files = true;
         bool allow_s3_native_copy = true;
+        bool allow_azure_native_copy = true;
         bool use_same_s3_credentials_for_base_backup = false;
         bool azure_attempt_to_create_container = true;
         ReadSettings read_settings;
src/Backups/BackupIO_AzureBlobStorage.cpp
@@ -31,22 +31,28 @@ namespace ErrorCodes
 
 BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage(
     StorageAzureBlob::Configuration configuration_,
+    bool allow_azure_native_copy,
     const ReadSettings & read_settings_,
     const WriteSettings & write_settings_,
     const ContextPtr & context_)
     : BackupReaderDefault(read_settings_, write_settings_, getLogger("BackupReaderAzureBlobStorage"))
-    , data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.container, false, false}
+    , data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.getConnectionURLWithContainer(), false, false}
    , configuration(configuration_)
 {
     auto client_ptr = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
     client_ptr->SetClickhouseOptions(Azure::Storage::Blobs::ClickhouseClientOptions{.IsClientForDisk=true});
 
-    object_storage = std::make_unique<AzureObjectStorage>("BackupReaderAzureBlobStorage",
-        std::move(client_ptr),
-        StorageAzureBlob::createSettings(context_),
-        configuration_.container);
+    object_storage = std::make_unique<AzureObjectStorage>(
+        "BackupReaderAzureBlobStorage",
+        std::move(client_ptr),
+        StorageAzureBlob::createSettings(context_),
+        configuration.container,
+        configuration.getConnectionURLWithContainer());
 
     client = object_storage->getAzureBlobStorageClient();
-    settings = object_storage->getSettings();
+    auto settings_copy = *object_storage->getSettings();
+    settings_copy.use_native_copy = allow_azure_native_copy;
+    settings = std::make_unique<const AzureObjectStorageSettings>(settings_copy);
 }
 
 BackupReaderAzureBlobStorage::~BackupReaderAzureBlobStorage() = default;
@@ -76,9 +82,9 @@ void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup,
     DiskPtr destination_disk, const String & destination_path, WriteMode write_mode)
 {
     auto destination_data_source_description = destination_disk->getDataSourceDescription();
-    if ((destination_data_source_description.type == DataSourceType::ObjectStorage)
-        && (destination_data_source_description.object_storage_type == ObjectStorageType::Azure)
-        && (destination_data_source_description.is_encrypted == encrypted_in_backup))
+    LOG_TRACE(log, "Source description {}, destination description {}", data_source_description.description, destination_data_source_description.description);
+    if (destination_data_source_description.sameKind(data_source_description)
+        && destination_data_source_description.is_encrypted == encrypted_in_backup)
     {
         LOG_TRACE(log, "Copying {} from AzureBlobStorage to disk {}", path_in_backup, destination_disk->getName());
         auto write_blob_function = [&](const Strings & blob_path, WriteMode mode, const std::optional<ObjectAttributes> &) -> size_t
@@ -116,12 +122,13 @@ void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup,
 
 BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
     StorageAzureBlob::Configuration configuration_,
+    bool allow_azure_native_copy,
     const ReadSettings & read_settings_,
     const WriteSettings & write_settings_,
     const ContextPtr & context_,
     bool attempt_to_create_container)
     : BackupWriterDefault(read_settings_, write_settings_, getLogger("BackupWriterAzureBlobStorage"))
-    , data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.container, false, false}
+    , data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.getConnectionURLWithContainer(), false, false}
     , configuration(configuration_)
 {
     auto client_ptr = StorageAzureBlob::createClient(configuration, /* is_read_only */ false, attempt_to_create_container);
@@ -130,9 +137,12 @@ BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
     object_storage = std::make_unique<AzureObjectStorage>("BackupWriterAzureBlobStorage",
         std::move(client_ptr),
         StorageAzureBlob::createSettings(context_),
-        configuration_.container);
+        configuration_.container,
+        configuration.getConnectionURLWithContainer());
     client = object_storage->getAzureBlobStorageClient();
-    settings = object_storage->getSettings();
+    auto settings_copy = *object_storage->getSettings();
+    settings_copy.use_native_copy = allow_azure_native_copy;
+    settings = std::make_unique<const AzureObjectStorageSettings>(settings_copy);
 }
 
 void BackupWriterAzureBlobStorage::copyFileFromDisk(const String & path_in_backup, DiskPtr src_disk, const String & src_path,
@@ -140,7 +150,9 @@ void BackupWriterAzureBlobStorage::copyFileFromDisk(const String & path_in_backu
 {
     /// Use the native copy as a more optimal way to copy a file from AzureBlobStorage to AzureBlobStorage if it's possible.
     auto source_data_source_description = src_disk->getDataSourceDescription();
-    if (source_data_source_description.sameKind(data_source_description) && (source_data_source_description.is_encrypted == copy_encrypted))
+    LOG_TRACE(log, "Source description {}, destination description {}", source_data_source_description.description, data_source_description.description);
+    if (source_data_source_description.sameKind(data_source_description)
+        && source_data_source_description.is_encrypted == copy_encrypted)
     {
         /// getBlobPath() can return more than 3 elements if the file is stored as multiple objects in AzureBlobStorage container.
         /// In this case we can't use the native copy.
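Both constructors now copy the shared object storage settings, override `use_native_copy` from the backup setting, and republish the result as an immutable snapshot. A minimal sketch of that copy-then-freeze pattern, with an illustrative `Settings` struct standing in for `AzureObjectStorageSettings`:

    #include <memory>

    // Illustrative stand-ins for AzureObjectStorageSettings and the owning class.
    struct Settings
    {
        bool use_native_copy = false;
    };

    class Writer
    {
    public:
        Writer(std::shared_ptr<const Settings> current, bool allow_native_copy)
        {
            // Copy the shared, read-only settings...
            Settings copy = *current;
            // ...override the single knob this writer controls...
            copy.use_native_copy = allow_native_copy;
            // ...and freeze the result again, so later readers cannot mutate it.
            settings = std::make_unique<const Settings>(copy);
        }

    private:
        std::unique_ptr<const Settings> settings;
    };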
src/Backups/BackupIO_AzureBlobStorage.h
@@ -16,7 +16,12 @@ namespace DB
 class BackupReaderAzureBlobStorage : public BackupReaderDefault
 {
 public:
-    BackupReaderAzureBlobStorage(StorageAzureBlob::Configuration configuration_, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
+    BackupReaderAzureBlobStorage(
+        StorageAzureBlob::Configuration configuration_,
+        bool allow_azure_native_copy,
+        const ReadSettings & read_settings_,
+        const WriteSettings & write_settings_,
+        const ContextPtr & context_);
     ~BackupReaderAzureBlobStorage() override;
 
     bool fileExists(const String & file_name) override;
@@ -37,7 +42,13 @@ private:
 class BackupWriterAzureBlobStorage : public BackupWriterDefault
 {
 public:
-    BackupWriterAzureBlobStorage(StorageAzureBlob::Configuration configuration_, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_, bool attempt_to_create_container);
+    BackupWriterAzureBlobStorage(
+        StorageAzureBlob::Configuration configuration_,
+        bool allow_azure_native_copy,
+        const ReadSettings & read_settings_,
+        const WriteSettings & write_settings_,
+        const ContextPtr & context_,
+        bool attempt_to_create_container);
     ~BackupWriterAzureBlobStorage() override;
 
     bool fileExists(const String & file_name) override;
src/Backups/BackupSettings.cpp
@@ -27,6 +27,7 @@ namespace ErrorCodes
     M(Bool, decrypt_files_from_encrypted_disks) \
     M(Bool, deduplicate_files) \
     M(Bool, allow_s3_native_copy) \
+    M(Bool, allow_azure_native_copy) \
     M(Bool, use_same_s3_credentials_for_base_backup) \
     M(Bool, azure_attempt_to_create_container) \
     M(Bool, read_from_filesystem_cache) \
src/Backups/BackupSettings.h
@@ -44,6 +44,9 @@ struct BackupSettings
     /// Whether native copy is allowed (optimization for cloud storages, that sometimes could have bugs)
     bool allow_s3_native_copy = true;
 
+    /// Whether native copy is allowed (optimization for cloud storages, that sometimes could have bugs)
+    bool allow_azure_native_copy = true;
+
     /// Whether base backup to S3 should inherit credentials from the BACKUP query.
     bool use_same_s3_credentials_for_base_backup = false;
 
src/Backups/BackupsWorker.cpp
@@ -598,6 +598,7 @@ void BackupsWorker::doBackup(
     backup_create_params.backup_uuid = backup_settings.backup_uuid;
     backup_create_params.deduplicate_files = backup_settings.deduplicate_files;
     backup_create_params.allow_s3_native_copy = backup_settings.allow_s3_native_copy;
+    backup_create_params.allow_azure_native_copy = backup_settings.allow_azure_native_copy;
     backup_create_params.use_same_s3_credentials_for_base_backup = backup_settings.use_same_s3_credentials_for_base_backup;
     backup_create_params.azure_attempt_to_create_container = backup_settings.azure_attempt_to_create_container;
     backup_create_params.read_settings = getReadSettingsForBackup(context, backup_settings);
src/Backups/registerBackupEngineAzureBlobStorage.cpp
@@ -135,10 +135,12 @@ void registerBackupEngineAzureBlobStorage(BackupFactory & factory)
 
     if (params.open_mode == IBackup::OpenMode::READ)
     {
-        auto reader = std::make_shared<BackupReaderAzureBlobStorage>(configuration,
-            params.read_settings,
-            params.write_settings,
-            params.context);
+        auto reader = std::make_shared<BackupReaderAzureBlobStorage>(
+            configuration,
+            params.allow_azure_native_copy,
+            params.read_settings,
+            params.write_settings,
+            params.context);
 
         return std::make_unique<BackupImpl>(
             params.backup_info,
@@ -150,11 +152,13 @@ void registerBackupEngineAzureBlobStorage(BackupFactory & factory)
     }
     else
    {
-        auto writer = std::make_shared<BackupWriterAzureBlobStorage>(configuration,
-            params.read_settings,
-            params.write_settings,
-            params.context,
-            params.azure_attempt_to_create_container);
+        auto writer = std::make_shared<BackupWriterAzureBlobStorage>(
+            configuration,
+            params.allow_azure_native_copy,
+            params.read_settings,
+            params.write_settings,
+            params.context,
+            params.azure_attempt_to_create_container);
 
         return std::make_unique<BackupImpl>(
             params.backup_info,
src/Client/ClientBase.cpp
@@ -710,8 +710,8 @@ void ClientBase::adjustSettings()
         settings.input_format_values_allow_data_after_semicolon.changed = false;
     }
 
-    /// Do not limit pretty format output in case of --pager specified.
-    if (!pager.empty())
+    /// Do not limit pretty format output when --pager is specified or stdout is not a tty.
+    if (!pager.empty() || !stdout_is_a_tty)
     {
         if (!global_context->getSettingsRef().output_format_pretty_max_rows.changed)
         {
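`stdout_is_a_tty` is the usual POSIX tty probe; a minimal sketch of how such a flag is derived and used (the variable name mirrors the diff, the rest is illustrative):

    #include <unistd.h>   // isatty, STDOUT_FILENO
    #include <cstdio>

    int main()
    {
        // True when stdout is an interactive terminal; false when piped or redirected.
        const bool stdout_is_a_tty = isatty(STDOUT_FILENO);

        // Interactive sessions can afford row limits and pretty output;
        // piped output should stay complete and machine-friendly.
        std::printf(stdout_is_a_tty ? "tty: limit pretty output\n"
                                    : "pipe: print everything\n");
        return 0;
    }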
src/Columns/ColumnNullable.cpp
@@ -22,6 +22,7 @@ namespace ErrorCodes
     extern const int LOGICAL_ERROR;
     extern const int ILLEGAL_COLUMN;
     extern const int NOT_IMPLEMENTED;
+    extern const int BAD_ARGUMENTS;
 }
 
 
@@ -116,6 +117,38 @@ void ColumnNullable::get(size_t n, Field & res) const
     getNestedColumn().get(n, res);
 }
 
+Float64 ColumnNullable::getFloat64(size_t n) const
+{
+    if (isNullAt(n))
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of {} at {} is NULL while calling method getFloat64", getName(), n);
+    else
+        return getNestedColumn().getFloat64(n);
+}
+
+Float32 ColumnNullable::getFloat32(size_t n) const
+{
+    if (isNullAt(n))
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of {} at {} is NULL while calling method getFloat32", getName(), n);
+    else
+        return getNestedColumn().getFloat32(n);
+}
+
+UInt64 ColumnNullable::getUInt(size_t n) const
+{
+    if (isNullAt(n))
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of {} at {} is NULL while calling method getUInt", getName(), n);
+    else
+        return getNestedColumn().getUInt(n);
+}
+
+Int64 ColumnNullable::getInt(size_t n) const
+{
+    if (isNullAt(n))
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of {} at {} is NULL while calling method getInt", getName(), n);
+    else
+        return getNestedColumn().getInt(n);
+}
+
 void ColumnNullable::insertData(const char * pos, size_t length)
 {
     if (pos == nullptr)
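All four getters share one contract: fail loudly on NULL rather than invent a default value. A stripped-down sketch of the same pattern outside ClickHouse (the miniature column type is illustrative):

    #include <optional>
    #include <stdexcept>
    #include <string>
    #include <vector>

    // Illustrative miniature of a nullable column: values plus a null mask.
    class NullableDoubles
    {
    public:
        void push(std::optional<double> v)
        {
            null_mask.push_back(!v.has_value());
            values.push_back(v.value_or(0.0));
        }

        double getFloat64(size_t n) const
        {
            // Same contract as the diff: a typed getter must not silently
            // coerce NULL to 0 -- the caller has to check or catch.
            if (null_mask.at(n))
                throw std::invalid_argument("value at " + std::to_string(n) + " is NULL");
            return values[n];
        }

    private:
        std::vector<double> values;
        std::vector<bool> null_mask;  // true means NULL
    };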
src/Columns/ColumnNullable.h
@@ -57,6 +57,10 @@ public:
     void get(size_t n, Field & res) const override;
     bool getBool(size_t n) const override { return isNullAt(n) ? false : nested_column->getBool(n); }
     UInt64 get64(size_t n) const override { return nested_column->get64(n); }
+    Float64 getFloat64(size_t n) const override;
+    Float32 getFloat32(size_t n) const override;
+    UInt64 getUInt(size_t n) const override;
+    Int64 getInt(size_t n) const override;
     bool isDefaultAt(size_t n) const override { return isNullAt(n); }
     StringRef getDataAt(size_t) const override;
     /// Will insert null value if pos=nullptr
src/Common/CopyableAtomic.h (new file)
@@ -0,0 +1,39 @@
+#pragma once
+
+#include <atomic>
+#include <utility>
+
+namespace DB
+{
+
+template <typename T>
+struct CopyableAtomic
+{
+    CopyableAtomic(const CopyableAtomic & other)
+        : value(other.value.load())
+    {}
+
+    explicit CopyableAtomic(T && value_)
+        : value(std::forward<T>(value_))
+    {}
+
+    CopyableAtomic & operator=(const CopyableAtomic & other)
+    {
+        value = other.value.load();
+        return *this;
+    }
+
+    CopyableAtomic & operator=(bool value_)
+    {
+        value = value_;
+        return *this;
+    }
+
+    explicit operator T() const { return value; }
+
+    const T & getValue() const { return value; }
+
+    std::atomic<T> value;
+};
+
+}
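`std::atomic` is neither copyable nor movable, so any struct holding one silently loses its copy operations; the wrapper restores them by copying through `load()`. A hedged usage sketch (the `JobState` struct is illustrative):

    #include <atomic>
    #include <cassert>

    // Minimal re-statement of the wrapper above, for a self-contained example.
    template <typename T>
    struct CopyableAtomic
    {
        CopyableAtomic(const CopyableAtomic & other) : value(other.value.load()) {}
        explicit CopyableAtomic(T && v) : value(v) {}
        std::atomic<T> value;
    };

    // A settings-like struct stays copyable even though one field is atomic.
    struct JobState
    {
        int priority = 0;
        CopyableAtomic<bool> cancelled{false};
    };

    int main()
    {
        JobState a;
        a.cancelled.value = true;   // concurrent writers may flip the flag
        JobState b = a;             // plain std::atomic would make this line ill-formed
        assert(b.cancelled.value.load());
    }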
src/Common/CurrentMetrics.cpp
@@ -288,8 +288,10 @@
     M(HTTPConnectionsTotal, "Total count of all sessions: stored in the pool and actively used right now for http hosts") \
     \
     M(AddressesActive, "Total count of addresses which are used for creation connections with connection pools") \
     M(AddressesBanned, "Total count of addresses which are banned as faulty for creation connections with connection pools") \
+    \
+    M(FilteringMarksWithPrimaryKey, "Number of threads currently doing filtering of mark ranges by the primary key") \
+    M(FilteringMarksWithSecondaryKeys, "Number of threads currently doing filtering of mark ranges by secondary keys") \
 
 #ifdef APPLY_FOR_EXTERNAL_METRICS
 #define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)
src/Common/ThreadPool.cpp
@@ -498,8 +498,10 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
 
 template class ThreadPoolImpl<std::thread>;
 template class ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>;
+template class ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, false>>;
 template class ThreadFromGlobalPoolImpl<true, true>;
 template class ThreadFromGlobalPoolImpl<true, false>;
+template class ThreadFromGlobalPoolImpl<false, false>;
 
 std::unique_ptr<GlobalThreadPool> GlobalThreadPool::the_instance;
 
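The added `template class` lines are explicit instantiations: the member definitions stay in the .cpp, and every template-argument combination that other translation units link against must be emitted here. A toy single-file sketch of the mechanism (names are illustrative):

    #include <thread>

    // Declaration-only view a header would expose.
    template <typename Thread>
    class PoolImpl
    {
    public:
        void schedule();
    };

    // Out-of-line member definition, normally hidden in a .cpp file.
    template <typename Thread>
    void PoolImpl<Thread>::schedule() {}

    // Explicit instantiation: forces the compiler to emit PoolImpl<std::thread>'s
    // members into this translation unit, so other files can use the class through
    // the header alone. The diff adds the <false, false> combination because the
    // examples below start linking against it.
    template class PoolImpl<std::thread>;

    int main()
    {
        PoolImpl<std::thread> pool;
        pool.schedule();
    }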
src/Common/ThreadPool.h
@@ -242,6 +242,11 @@ public:
                 if (unlikely(global_profiler_real_time_period != 0 || global_profiler_cpu_time_period != 0))
                     thread_status.initGlobalProfiler(global_profiler_real_time_period, global_profiler_cpu_time_period);
             }
+            else
+            {
+                UNUSED(global_profiler_real_time_period);
+                UNUSED(global_profiler_cpu_time_period);
+            }
 
             std::apply(function, arguments);
         },
src/Common/ThreadStatus.cpp
@@ -23,12 +23,8 @@ thread_local ThreadStatus constinit * current_thread = nullptr;
 namespace
 {
 
-#if defined(__aarch64__)
 /// For aarch64 16K is not enough (likely due to tons of registers)
-static constexpr size_t UNWIND_MINSIGSTKSZ = 32 << 10;
-#else
-static constexpr size_t UNWIND_MINSIGSTKSZ = 16 << 10;
-#endif
+constexpr size_t UNWIND_MINSIGSTKSZ = 32 << 10;
 
 /// Alternative stack for signal handling.
 ///
@@ -131,26 +127,6 @@ ThreadStatus::ThreadStatus(bool check_current_thread_on_destruction_)
 #endif
 }
 
-void ThreadStatus::initGlobalProfiler([[maybe_unused]] UInt64 global_profiler_real_time_period, [[maybe_unused]] UInt64 global_profiler_cpu_time_period)
-{
-#if !defined(SANITIZER) && !defined(CLICKHOUSE_KEEPER_STANDALONE_BUILD) && !defined(__APPLE__)
-    try
-    {
-        if (global_profiler_real_time_period > 0)
-            query_profiler_real = std::make_unique<QueryProfilerReal>(thread_id,
-                /* period= */ static_cast<UInt32>(global_profiler_real_time_period));
-
-        if (global_profiler_cpu_time_period > 0)
-            query_profiler_cpu = std::make_unique<QueryProfilerCPU>(thread_id,
-                /* period= */ static_cast<UInt32>(global_profiler_cpu_time_period));
-    }
-    catch (...)
-    {
-        tryLogCurrentException("ThreadStatus", "Cannot initialize GlobalProfiler");
-    }
-#endif
-}
-
 ThreadGroupPtr ThreadStatus::getThreadGroup() const
 {
     chassert(current_thread == this);
src/Common/ZooKeeper/examples/CMakeLists.txt
@@ -1,8 +1,16 @@
 clickhouse_add_executable(zkutil_test_commands zkutil_test_commands.cpp)
-target_link_libraries(zkutil_test_commands PRIVATE clickhouse_common_zookeeper_no_log)
+target_link_libraries(zkutil_test_commands PRIVATE
+    clickhouse_common_zookeeper_no_log
+    dbms)
 
 clickhouse_add_executable(zkutil_test_commands_new_lib zkutil_test_commands_new_lib.cpp)
-target_link_libraries(zkutil_test_commands_new_lib PRIVATE clickhouse_common_zookeeper_no_log clickhouse_compression string_utils)
+target_link_libraries(zkutil_test_commands_new_lib PRIVATE
+    clickhouse_common_zookeeper_no_log
+    clickhouse_compression
+    string_utils
+    dbms)
 
 clickhouse_add_executable(zkutil_test_async zkutil_test_async.cpp)
-target_link_libraries(zkutil_test_async PRIVATE clickhouse_common_zookeeper_no_log)
+target_link_libraries(zkutil_test_async PRIVATE
+    clickhouse_common_zookeeper_no_log
+    dbms)
src/Common/examples/parallel_aggregation.cpp
@@ -20,6 +20,9 @@
 #include <Common/CurrentMetrics.h>
 
 
+using ThreadFromGlobalPoolSimple = ThreadFromGlobalPoolImpl</* propagate_opentelemetry_context= */ false, /* global_trace_collector_allowed= */ false>;
+using SimpleThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolSimple>;
+
 using Key = UInt64;
 using Value = UInt64;
 
@@ -255,7 +258,7 @@ int main(int argc, char ** argv)
 
     std::cerr << std::fixed << std::setprecision(2);
 
-    ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, num_threads);
+    SimpleThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, num_threads);
 
     Source data(n);
 
src/Common/examples/parallel_aggregation2.cpp
@@ -20,6 +20,9 @@
 #include <Common/CurrentMetrics.h>
 
 
+using ThreadFromGlobalPoolSimple = ThreadFromGlobalPoolImpl</* propagate_opentelemetry_context= */ false, /* global_trace_collector_allowed= */ false>;
+using SimpleThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolSimple>;
+
 using Key = UInt64;
 using Value = UInt64;
 using Source = std::vector<Key>;
@@ -38,7 +41,7 @@ struct AggregateIndependent
     template <typename Creator, typename Updater>
     static void NO_INLINE execute(const Source & data, size_t num_threads, std::vector<std::unique_ptr<Map>> & results,
         Creator && creator, Updater && updater,
-        ThreadPool & pool)
+        SimpleThreadPool & pool)
     {
         results.reserve(num_threads);
         for (size_t i = 0; i < num_threads; ++i)
@@ -76,7 +79,7 @@ struct AggregateIndependentWithSequentialKeysOptimization
     template <typename Creator, typename Updater>
     static void NO_INLINE execute(const Source & data, size_t num_threads, std::vector<std::unique_ptr<Map>> & results,
         Creator && creator, Updater && updater,
-        ThreadPool & pool)
+        SimpleThreadPool & pool)
     {
         results.reserve(num_threads);
         for (size_t i = 0; i < num_threads; ++i)
@@ -124,7 +127,7 @@ struct MergeSequential
     template <typename Merger>
     static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map,
         Merger && merger,
-        ThreadPool &)
+        SimpleThreadPool &)
     {
         for (size_t i = 1; i < num_maps; ++i)
         {
@@ -144,7 +147,7 @@ struct MergeSequentialTransposed /// In practice not better than usual.
     template <typename Merger>
     static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map,
         Merger && merger,
-        ThreadPool &)
+        SimpleThreadPool &)
     {
         std::vector<typename Map::iterator> iterators(num_maps);
         for (size_t i = 1; i < num_maps; ++i)
@@ -177,7 +180,7 @@ struct MergeParallelForTwoLevelTable
     template <typename Merger>
     static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map,
         Merger && merger,
-        ThreadPool & pool)
+        SimpleThreadPool & pool)
     {
         for (size_t bucket = 0; bucket < Map::NUM_BUCKETS; ++bucket)
             pool.scheduleOrThrowOnError([&, bucket, num_maps]
@@ -202,7 +205,7 @@ struct Work
     template <typename Creator, typename Updater, typename Merger>
     static void NO_INLINE execute(const Source & data, size_t num_threads,
         Creator && creator, Updater && updater, Merger && merger,
-        ThreadPool & pool)
+        SimpleThreadPool & pool)
     {
         std::vector<std::unique_ptr<Map>> intermediate_results;
 
@@ -282,7 +285,7 @@ int main(int argc, char ** argv)
 
     std::cerr << std::fixed << std::setprecision(2);
 
-    ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, num_threads);
+    SimpleThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, num_threads);
 
     Source data(n);
 
src/Common/examples/thread_creation_latency.cpp
@@ -14,6 +14,8 @@ int value = 0;
 static void f() { ++value; }
 static void * g(void *) { f(); return {}; }
 
+using ThreadFromGlobalPoolSimple = ThreadFromGlobalPoolImpl</* propagate_opentelemetry_context= */ false, /* global_trace_collector_allowed= */ false>;
+using SimpleThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolSimple>;
 
 namespace CurrentMetrics
 {
@@ -72,7 +74,7 @@ int main(int argc, char ** argv)
 
     test(n, "Create and destroy ThreadPool each iteration", []
     {
-        ThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1);
+        SimpleThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1);
         tp.scheduleOrThrowOnError(f);
         tp.wait();
     });
@@ -93,7 +95,7 @@ int main(int argc, char ** argv)
     });
 
     {
-        ThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1);
+        SimpleThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1);
 
         test(n, "Schedule job for Threadpool each iteration", [&tp]
         {
@@ -103,7 +105,7 @@ int main(int argc, char ** argv)
     }
 
     {
-        ThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 128);
+        SimpleThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 128);
 
         test(n, "Schedule job for Threadpool with 128 threads each iteration", [&tp]
         {
src/Common/tests/gtest_rw_lock.cpp
@@ -3,8 +3,8 @@
 #include <Common/Exception.h>
 #include <Common/RWLock.h>
 #include <Common/Stopwatch.h>
+#include <Core/Types.h>
 #include <base/types.h>
-#include <Common/ThreadPool.h>
 #include <base/phdr_cache.h>
 #include <random>
 #include <pcg_random.hpp>
@@ -541,7 +541,7 @@ TEST(Common, RWLockWriteLockTimeoutDuringWriteWithWaitingRead)
             events.add(wc ? "Locked wb" : "Failed to lock wb");
             EXPECT_EQ(wc, nullptr);
         });
 
         std::thread rc_thread([&] ()
         {
             std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(200));
src/Coordination/Standalone/Context.cpp
@@ -5,6 +5,7 @@
 #include <Common/ThreadPool.h>
 #include <Common/callOnce.h>
 #include <Disks/IO/IOUringReader.h>
+#include <Disks/IO/getIOUringReader.h>
 
 #include <Core/ServerSettings.h>
 
@@ -303,10 +304,10 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
 }
 
 #if USE_LIBURING
-IOUringReader & Context::getIOURingReader() const
+IOUringReader & Context::getIOUringReader() const
 {
     callOnce(shared->io_uring_reader_initialized, [&] {
-        shared->io_uring_reader = std::make_unique<IOUringReader>(512);
+        shared->io_uring_reader = createIOUringReader();
     });
 
     return *shared->io_uring_reader;
@@ -457,4 +458,9 @@ const ServerSettings & Context::getServerSettings() const
     return shared->server_settings;
 }
 
+bool Context::hasTraceCollector() const
+{
+    return false;
+}
+
 }
src/Coordination/Standalone/Context.h
@@ -137,7 +137,7 @@ public:
 
     IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) const;
 #if USE_LIBURING
-    IOUringReader & getIOURingReader() const;
+    IOUringReader & getIOUringReader() const;
 #endif
     std::shared_ptr<AsyncReadCounters> getAsyncReadCounters() const;
     ThreadPool & getThreadPoolWriter() const;
@@ -163,6 +163,8 @@ public:
     zkutil::ZooKeeperPtr getZooKeeper() const;
 
     const ServerSettings & getServerSettings() const;
+
+    bool hasTraceCollector() const;
 };
 
 }
src/Coordination/Standalone/ThreadStatusExt.cpp
@@ -1,4 +1,5 @@
 #include <Common/CurrentThread.h>
+#include <Common/ThreadStatus.h>
 
 namespace DB
 {
@@ -11,4 +12,8 @@ void CurrentThread::attachToGroup(const ThreadGroupPtr &)
 {
 }
 
+void ThreadStatus::initGlobalProfiler(UInt64 /*global_profiler_real_time_period*/, UInt64 /*global_profiler_cpu_time_period*/)
+{
+}
+
 }
src/Databases/DatabaseOnDisk.cpp
@@ -26,6 +26,8 @@
 #include <Common/escapeForFileName.h>
 #include <Common/filesystemHelpers.h>
 #include <Common/logger_useful.h>
+#include <Common/setThreadName.h>
+
 
 namespace fs = std::filesystem;
 
@@ -665,6 +667,7 @@ void DatabaseOnDisk::iterateMetadataFiles(ContextPtr local_context, const Iterat
         pool.scheduleOrThrowOnError(
             [batch, &process_metadata_file, &process_tmp_drop_metadata_file]() mutable
             {
+                setThreadName("DatabaseOnDisk");
                 for (const auto & file : batch)
                     if (file.second)
                         process_metadata_file(file.first);
src/Databases/DatabaseReplicated.cpp
@@ -944,6 +944,13 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
     query_context->setSetting("allow_hyperscan", 1);
     query_context->setSetting("allow_simdjson", 1);
     query_context->setSetting("allow_deprecated_syntax_for_merge_tree", 1);
+    query_context->setSetting("allow_suspicious_primary_key", 1);
+    query_context->setSetting("allow_suspicious_ttl_expressions", 1);
+    query_context->setSetting("allow_suspicious_variant_types", 1);
+    query_context->setSetting("enable_deflate_qpl_codec", 1);
+    query_context->setSetting("enable_zstd_qat_codec", 1);
+    query_context->setSetting("allow_create_index_without_type", 1);
+    query_context->setSetting("allow_experimental_s3queue", 1);
 
     auto txn = std::make_shared<ZooKeeperMetadataTransaction>(current_zookeeper, zookeeper_path, false, "");
     query_context->initZooKeeperMetadataTransaction(txn);
src/Databases/MySQL/tests/gtest_mysql_binlog.cpp
@@ -1,4 +1,3 @@
-#include <Common/ThreadPool.h>
 #include <Databases/MySQL/MySQLBinlog.h>
 #include <Databases/MySQL/MySQLBinlogEventsDispatcher.h>
 #include <Databases/MySQL/MySQLBinlogClient.h>
src/Dictionaries/IDictionary.h
@@ -69,6 +69,15 @@ public:
         return dictionary_id.getNameForLogs();
     }
 
+    /// Returns fully qualified unquoted dictionary name
+    std::string getQualifiedName() const
+    {
+        std::lock_guard lock{mutex};
+        if (dictionary_id.database_name.empty())
+            return dictionary_id.table_name;
+        return dictionary_id.database_name + "." + dictionary_id.table_name;
+    }
+
     StorageID getDictionaryID() const
     {
         std::lock_guard lock{mutex};
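`getQualifiedName()` composes `database.table` and falls back to the bare table name when the dictionary is not attached to a database, which matches the `used_dictionaries` documentation change above. The same rule as a standalone sketch:

    #include <cassert>
    #include <string>

    // Illustrative: compose a fully qualified, unquoted object name.
    // An empty database means the object is not attached to any database
    // (e.g. a dictionary defined in an XML config), so only the bare name is used.
    std::string qualifiedName(const std::string & database, const std::string & table)
    {
        if (database.empty())
            return table;
        return database + "." + table;
    }

    int main()
    {
        assert(qualifiedName("", "xml_dict") == "xml_dict");
        assert(qualifiedName("db", "sql_dict") == "db.sql_dict");
    }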
src/Disks/IO/IOUringReader.cpp
@@ -1,5 +1,4 @@
 #include "IOUringReader.h"
-#include <memory>
 
 #if USE_LIBURING
 
@@ -13,6 +12,7 @@
 #include <Common/ThreadPool.h>
 #include <Common/logger_useful.h>
 #include <future>
+#include <memory>
 
 namespace ProfileEvents
 {
src/Disks/IO/createReadBufferFromFileBase.cpp
@@ -4,9 +4,9 @@
 #include <IO/MMapReadBufferFromFileWithCache.h>
 #include <IO/AsynchronousReadBufferFromFile.h>
 #include <Disks/IO/IOUringReader.h>
+#include <Disks/IO/getIOUringReader.h>
 #include <Disks/IO/ThreadPoolReader.h>
 #include <Disks/IO/getThreadPoolReader.h>
-#include <IO/SynchronousReader.h>
 #include <IO/AsynchronousReader.h>
 #include <Common/ProfileEvents.h>
 #include "config.h"
@@ -100,14 +100,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
     else if (settings.local_fs_method == LocalFSReadMethod::io_uring)
     {
 #if USE_LIBURING
-        auto global_context = Context::getGlobalContextInstance();
-        if (!global_context)
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot obtain io_uring reader (global context not initialized)");
-
-        auto & reader = global_context->getIOURingReader();
-        if (!reader.isSupported())
-            throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "io_uring is not supported by this system");
-
+        auto & reader = getIOUringReaderOrThrow();
         res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
             reader,
             settings.priority,
src/Disks/IO/getIOUringReader.cpp (new file)
@@ -0,0 +1,41 @@
+#include <Disks/IO/getIOUringReader.h>
+
+#if USE_LIBURING
+
+#include <Interpreters/Context.h>
+#include <Common/ErrorCodes.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+    extern const int UNSUPPORTED_METHOD;
+}
+
+std::unique_ptr<IOUringReader> createIOUringReader()
+{
+    return std::make_unique<IOUringReader>(512);
+}
+
+IOUringReader & getIOUringReaderOrThrow(ContextPtr context)
+{
+    auto & reader = context->getIOUringReader();
+    if (!reader.isSupported())
+    {
+        throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "io_uring is not supported by this system");
+    }
+    return reader;
+}
+
+IOUringReader & getIOUringReaderOrThrow()
+{
+    auto context = Context::getGlobalContextInstance();
+    if (!context)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
+    return getIOUringReaderOrThrow(context);
+}
+
+}
+#endif
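The two overloads form a get-or-throw accessor: the zero-argument form resolves the global context and delegates to the checked form, so call sites no longer repeat the validation (compare the simplification in createReadBufferFromFileBase.cpp above). A minimal sketch with illustrative types:

    #include <memory>
    #include <stdexcept>

    // Illustrative stand-ins for Context and IOUringReader.
    struct Reader { bool isSupported() const { return true; } };
    struct Context { Reader reader; static std::shared_ptr<Context> global; };
    std::shared_ptr<Context> Context::global;

    // Checked accessor: validates the capability before handing out a reference.
    Reader & getReaderOrThrow(const std::shared_ptr<Context> & ctx)
    {
        if (!ctx->reader.isSupported())
            throw std::runtime_error("io_uring is not supported by this system");
        return ctx->reader;
    }

    // Convenience overload: resolve the global context, then delegate.
    Reader & getReaderOrThrow()
    {
        if (!Context::global)
            throw std::logic_error("global context not initialized");
        return getReaderOrThrow(Context::global);
    }

    int main()
    {
        Context::global = std::make_shared<Context>();
        Reader & r = getReaderOrThrow();   // call sites no longer repeat the checks
        (void)r;
    }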
src/Disks/IO/getIOUringReader.h (new file)
@@ -0,0 +1,21 @@
+#pragma once
+
+#include "config.h"
+
+#if USE_LIBURING
+
+#include <Interpreters/Context_fwd.h>
+#include <Disks/IO/IOUringReader.h>
+#include <memory>
+
+namespace DB
+{
+
+std::unique_ptr<IOUringReader> createIOUringReader();
+
+IOUringReader & getIOUringReaderOrThrow(ContextPtr);
+
+IOUringReader & getIOUringReaderOrThrow();
+
+}
+#endif
src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp
@@ -107,11 +107,13 @@ AzureObjectStorage::AzureObjectStorage(
     const String & name_,
     AzureClientPtr && client_,
     SettingsPtr && settings_,
-    const String & object_namespace_)
+    const String & object_namespace_,
+    const String & description_)
     : name(name_)
     , client(std::move(client_))
     , settings(std::move(settings_))
     , object_namespace(object_namespace_)
+    , description(description_)
     , log(getLogger("AzureObjectStorage"))
 {
 }
@@ -409,7 +411,8 @@ std::unique_ptr<IObjectStorage> AzureObjectStorage::cloneObjectStorage(const std
         name,
         getAzureBlobContainerClient(config, config_prefix),
         getAzureBlobStorageSettings(config, config_prefix, context),
-        object_namespace
+        object_namespace,
+        description
     );
 }
 
@ -81,7 +81,8 @@ public:
|
|||||||
const String & name_,
|
const String & name_,
|
||||||
AzureClientPtr && client_,
|
AzureClientPtr && client_,
|
||||||
SettingsPtr && settings_,
|
SettingsPtr && settings_,
|
||||||
const String & object_namespace_);
|
const String & object_namespace_,
|
||||||
|
const String & description_);
|
||||||
|
|
||||||
void listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const override;
|
void listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const override;
|
||||||
|
|
||||||
@ -93,7 +94,7 @@ public:
     std::string getCommonKeyPrefix() const override { return ""; }

-    std::string getDescription() const override { return client.get()->GetUrl(); }
+    std::string getDescription() const override { return description; }

     bool exists(const StoredObject & object) const override;
@ -172,6 +173,7 @@ private:
     MultiVersion<Azure::Storage::Blobs::BlobContainerClient> client;
     MultiVersion<AzureObjectStorageSettings> settings;
     const String object_namespace; /// container + prefix
+    const String description; /// url + container

     LoggerPtr log;
 };
@ -129,6 +129,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecu
     {
         pool.scheduleOrThrowOnError([this, path]
         {
+            setThreadName("BackupWorker");
             for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
                 migrateFileToRestorableSchema(it->path());
         });
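(Several hunks in this commit name their worker threads the same way. A standalone sketch of the idea; setThreadName is ClickHouse's wrapper, the analogue below uses the raw POSIX call. On Linux a thread name is limited to 15 characters plus the terminating NUL, which is why names like "BackupWorker" and "AsyncInsertQ" are kept short:)

    #include <pthread.h>
    #include <thread>

    int main()
    {
        std::thread worker([]
        {
            // The name shows up in /proc/<pid>/task/*/comm, gdb, perf and ps,
            // which makes stuck or hot threads much easier to attribute.
            pthread_setname_np(pthread_self(), "BackupWorker");
        });
        worker.join();
    }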
@ -306,11 +306,14 @@ void registerAzureObjectStorage(ObjectStorageFactory & factory)
         bool /* skip_access_check */) -> ObjectStoragePtr
     {
         AzureBlobStorageEndpoint endpoint = processAzureBlobStorageEndpoint(config, config_prefix);
+        std::string endpoint_string = endpoint.getEndpoint();

         return createObjectStorage<AzureObjectStorage>(
             ObjectStorageType::Azure, config, config_prefix, name,
             getAzureBlobContainerClient(config, config_prefix),
             getAzureBlobStorageSettings(config, config_prefix, context),
-            endpoint.prefix.empty() ? endpoint.container_name : endpoint.container_name + "/" + endpoint.prefix);
+            endpoint.prefix.empty() ? endpoint.container_name : endpoint.container_name + "/" + endpoint.prefix,
+            endpoint.prefix.empty() ? endpoint_string : endpoint_string.substr(0, endpoint_string.length() - endpoint.prefix.length()));
     };

     factory.registerObjectStorageType("azure_blob_storage", creator);
     factory.registerObjectStorageType("azure", creator);
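(A worked example of the description computation above. The exact string returned by endpoint.getEndpoint() is an assumption here; the point is only the suffix arithmetic:)

    #include <cassert>
    #include <string>

    int main()
    {
        // Hypothetical endpoint: url + container + prefix.
        std::string endpoint_string = "http://azurite1:10000/devstoreaccount1/cont/data";
        std::string prefix = "data";
        // Dropping prefix.length() characters from the end leaves url + container.
        std::string description = endpoint_string.substr(0, endpoint_string.length() - prefix.length());
        assert(description == "http://azurite1:10000/devstoreaccount1/cont/");
    }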
@ -3,7 +3,7 @@ add_subdirectory(divide)
 include("${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake")
 add_headers_and_sources(clickhouse_functions .)

-extract_into_parent_list(clickhouse_functions_sources dbms_sources
+set(DBMS_FUNCTIONS
     IFunction.cpp
     FunctionFactory.cpp
     FunctionHelpers.cpp
@ -15,7 +15,10 @@ extract_into_parent_list(clickhouse_functions_sources dbms_sources
     checkHyperscanRegexp.cpp
     array/has.cpp
     CastOverloadResolver.cpp
+    # Provides dependency for cast - createFunctionBaseCast()
+    FunctionsConversion.cpp
 )
+extract_into_parent_list(clickhouse_functions_sources dbms_sources ${DBMS_FUNCTIONS})
 extract_into_parent_list(clickhouse_functions_headers dbms_headers
     IFunction.h
     FunctionFactory.h
@ -26,6 +29,10 @@ extract_into_parent_list(clickhouse_functions_headers dbms_headers
 )

 add_library(clickhouse_functions_obj OBJECT ${clickhouse_functions_headers} ${clickhouse_functions_sources})
+if (OMIT_HEAVY_DEBUG_SYMBOLS)
+    target_compile_options(clickhouse_functions_obj PRIVATE "-g0")
+    set_source_files_properties(${DBMS_FUNCTIONS} PROPERTIES COMPILE_FLAGS "-g0")
+endif()

 list (APPEND OBJECT_LIBS $<TARGET_OBJECTS:clickhouse_functions_obj>)
@ -62,10 +69,6 @@ if (TARGET OpenSSL::Crypto)
     list (APPEND PUBLIC_LIBS OpenSSL::Crypto)
 endif()

-if (OMIT_HEAVY_DEBUG_SYMBOLS)
-    target_compile_options(clickhouse_functions_obj PRIVATE "-g0")
-endif()
-
 if (TARGET ch_contrib::icu)
     list (APPEND PRIVATE_LIBS ch_contrib::icu)
 endif ()
@ -289,6 +289,7 @@ void copyAzureBlobStorageFile(

     if (settings->use_native_copy)
     {
+        LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copying Blob: {} from Container: {} using native copy", src_container_for_logging, src_blob);
         ProfileEvents::increment(ProfileEvents::AzureCopyObject);
         if (dest_client->GetClickhouseOptions().IsClientForDisk)
             ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);
@ -695,7 +695,6 @@ String serializeQuery(const IAST & query, size_t max_length)
 }

-// static
 void AsynchronousInsertQueue::processData(
     InsertQuery key, InsertDataPtr data, ContextPtr global_context, QueueShardFlushTimeHistory & queue_shard_flush_time_history)
 try
@ -705,6 +704,8 @@ try
     SCOPE_EXIT(CurrentMetrics::sub(CurrentMetrics::PendingAsyncInsert, data->entries.size()));

+    setThreadName("AsyncInsertQ");
+
     const auto log = getLogger("AsynchronousInsertQueue");
     const auto & insert_query = assert_cast<const ASTInsertQuery &>(*key.query);
@ -37,6 +37,7 @@
 #include <Disks/ObjectStorages/IObjectStorage.h>
 #include <Disks/StoragePolicy.h>
 #include <Disks/IO/IOUringReader.h>
+#include <Disks/IO/getIOUringReader.h>
 #include <IO/SynchronousReader.h>
 #include <TableFunctions/TableFunctionFactory.h>
 #include <Interpreters/ActionLocksManager.h>
@ -5188,10 +5189,10 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
 }

 #if USE_LIBURING
-IOUringReader & Context::getIOURingReader() const
+IOUringReader & Context::getIOUringReader() const
 {
     callOnce(shared->io_uring_reader_initialized, [&] {
-        shared->io_uring_reader = std::make_unique<IOUringReader>(512);
+        shared->io_uring_reader = createIOUringReader();
     });

     return *shared->io_uring_reader;
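(The callOnce pattern above builds the reader lazily on first use. A standalone analogue with the standard library, sketched under the assumption that ClickHouse's callOnce behaves like std::call_once:)

    #include <memory>
    #include <mutex>

    struct Reader { /* ... */ };

    std::once_flag reader_initialized;
    std::unique_ptr<Reader> reader;

    Reader & getReader()
    {
        // The factory runs exactly once even under concurrent callers;
        // later calls just return the already-constructed instance.
        std::call_once(reader_initialized, [] { reader = std::make_unique<Reader>(); });
        return *reader;
    }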
@ -1246,7 +1246,7 @@ public:

     IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) const;
 #if USE_LIBURING
-    IOUringReader & getIOURingReader() const;
+    IOUringReader & getIOUringReader() const;
 #endif

     std::shared_ptr<AsyncReadCounters> getAsyncReadCounters() const;
@ -77,21 +77,23 @@ void ExternalDictionariesLoader::updateObjectFromConfigWithoutReloading(IExterna
 ExternalDictionariesLoader::DictPtr ExternalDictionariesLoader::getDictionary(const std::string & dictionary_name, ContextPtr local_context) const
 {
     std::string resolved_dictionary_name = resolveDictionaryName(dictionary_name, local_context->getCurrentDatabase());
+    auto dictionary = std::static_pointer_cast<const IDictionary>(load(resolved_dictionary_name));

     if (local_context->hasQueryContext() && local_context->getSettingsRef().log_queries)
-        local_context->addQueryFactoriesInfo(Context::QueryLogFactories::Dictionary, resolved_dictionary_name);
+        local_context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Dictionary, dictionary->getQualifiedName());

-    return std::static_pointer_cast<const IDictionary>(load(resolved_dictionary_name));
+    return dictionary;
 }

 ExternalDictionariesLoader::DictPtr ExternalDictionariesLoader::tryGetDictionary(const std::string & dictionary_name, ContextPtr local_context) const
 {
     std::string resolved_dictionary_name = resolveDictionaryName(dictionary_name, local_context->getCurrentDatabase());
+    auto dictionary = std::static_pointer_cast<const IDictionary>(tryLoad(resolved_dictionary_name));

-    if (local_context->hasQueryContext() && local_context->getSettingsRef().log_queries)
-        local_context->addQueryFactoriesInfo(Context::QueryLogFactories::Dictionary, resolved_dictionary_name);
+    if (local_context->hasQueryContext() && local_context->getSettingsRef().log_queries && dictionary)
+        local_context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Dictionary, dictionary->getQualifiedName());

-    return std::static_pointer_cast<const IDictionary>(tryLoad(resolved_dictionary_name));
+    return dictionary;
 }

@ -458,6 +458,31 @@ void ThreadStatus::resetPerformanceCountersLastUsage()
         taskstats->reset();
 }

+void ThreadStatus::initGlobalProfiler([[maybe_unused]] UInt64 global_profiler_real_time_period, [[maybe_unused]] UInt64 global_profiler_cpu_time_period)
+{
+#if !defined(SANITIZER) && !defined(__APPLE__)
+    /// profilers are useless without trace collector
+    auto global_context_ptr = global_context.lock();
+    if (!global_context_ptr || !global_context_ptr->hasTraceCollector())
+        return;
+
+    try
+    {
+        if (global_profiler_real_time_period > 0)
+            query_profiler_real = std::make_unique<QueryProfilerReal>(thread_id,
+                /* period= */ static_cast<UInt32>(global_profiler_real_time_period));
+
+        if (global_profiler_cpu_time_period > 0)
+            query_profiler_cpu = std::make_unique<QueryProfilerCPU>(thread_id,
+                /* period= */ static_cast<UInt32>(global_profiler_cpu_time_period));
+    }
+    catch (...)
+    {
+        tryLogCurrentException("ThreadStatus", "Cannot initialize GlobalProfiler");
+    }
+#endif
+}
+
 void ThreadStatus::initQueryProfiler()
 {
     if (internal_thread)
@ -44,7 +44,7 @@ bool replaceForPositionalArguments(ASTPtr & argument, const ASTSelectQuery * sel
         pos = value;
     else
     {
-        if (static_cast<size_t>(std::abs(value)) > columns.size())
+        if (value < -static_cast<Int64>(columns.size()))
             throw Exception(
                 ErrorCodes::BAD_ARGUMENTS,
                 "Negative positional argument number {} is out of bounds. Expected in range [-{}, -1]",
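(Why the bounds check changed: for value = -9223372036854775808, i.e. INT64_MIN, std::abs(value) is undefined behaviour because +2^63 is not representable in Int64. The rewritten comparison never negates the argument, so the extreme value simply fails the range check; the stateless GROUP BY test later in this diff exercises exactly this literal. A minimal sketch:)

    #include <cstdint>
    #include <iostream>
    #include <limits>

    int main()
    {
        int64_t value = std::numeric_limits<int64_t>::min();
        size_t columns_size = 3;
        // std::abs(value) would be UB here; comparing without negation is safe.
        if (value < -static_cast<int64_t>(columns_size))
            std::cout << "out of bounds, report BAD_ARGUMENTS\n";
    }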
@ -7,6 +7,8 @@
 #include <Common/UTF8Helpers.h>
 #include <Common/PODArray.h>
 #include <Common/formatReadable.h>
+#include <DataTypes/DataTypeLowCardinality.h>
+#include <DataTypes/DataTypeNullable.h>

 namespace DB
@ -16,7 +18,14 @@ PrettyBlockOutputFormat::PrettyBlockOutputFormat(
     WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_, bool mono_block_, bool color_)
     : IOutputFormat(header_, out_), format_settings(format_settings_), serializations(header_.getSerializations()), color(color_), mono_block(mono_block_)
 {
-    readable_number_tip = header_.getColumns().size() == 1 && WhichDataType(header_.getDataTypes()[0]->getTypeId()).isNumber();
+    /// Decide whether we should print a tip near the single number value in the result.
+    if (header_.getColumns().size() == 1)
+    {
+        /// Check if it is a numeric type, possible wrapped by Nullable or LowCardinality.
+        DataTypePtr type = removeNullable(recursiveRemoveLowCardinality(header_.getDataTypes().at(0)));
+        if (isNumber(type))
+            readable_number_tip = true;
+    }
 }
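(The unwrapping order matters: recursiveRemoveLowCardinality strips the dictionary wrapper first, then removeNullable strips nullability, so a single column typed e.g. LowCardinality(Nullable(UInt64)) still counts as numeric and gets the readable-number tip. A fragment illustrating the effect with the same DataTypes helpers the hunk introduces; "header" is a hypothetical single-column Block:)

    // LowCardinality(Nullable(UInt64)) -> Nullable(UInt64) -> UInt64
    DataTypePtr type = removeNullable(recursiveRemoveLowCardinality(header.getDataTypes().at(0)));
    bool tip = isNumber(type); // true for any numeric type under the wrappers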
@ -497,6 +506,9 @@ void PrettyBlockOutputFormat::writeReadableNumberTip(const Chunk & chunk)
     if (!is_single_number)
         return;

+    if (columns[0]->isNullAt(0))
+        return;
+
     auto value = columns[0]->getFloat64(0);
     auto threshold = format_settings.pretty.output_format_pretty_single_large_number_tip_threshold;
@ -53,6 +53,8 @@ namespace CurrentMetrics
     extern const Metric MergeTreeDataSelectExecutorThreads;
     extern const Metric MergeTreeDataSelectExecutorThreadsActive;
     extern const Metric MergeTreeDataSelectExecutorThreadsScheduled;
+    extern const Metric FilteringMarksWithPrimaryKey;
+    extern const Metric FilteringMarksWithSecondaryKeys;
 }

 namespace DB
@ -664,15 +666,22 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
         size_t total_marks_count = part->index_granularity.getMarksCountWithoutFinal();

         if (metadata_snapshot->hasPrimaryKey() || part_offset_condition)
+        {
+            CurrentMetrics::Increment metric(CurrentMetrics::FilteringMarksWithPrimaryKey);
             ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, part_offset_condition, settings, log);
+        }
         else if (total_marks_count)
+        {
             ranges.ranges = MarkRanges{{MarkRange{0, total_marks_count}}};
+        }

         sum_marks_pk.fetch_add(ranges.getMarksCount(), std::memory_order_relaxed);

         if (!ranges.ranges.empty())
             sum_parts_pk.fetch_add(1, std::memory_order_relaxed);

+        CurrentMetrics::Increment metric(CurrentMetrics::FilteringMarksWithSecondaryKeys);
+
         for (size_t idx = 0; idx < skip_indexes.useful_indices.size(); ++idx)
         {
             if (ranges.ranges.empty())
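(CurrentMetrics::Increment is RAII: the braces added above exist so the FilteringMarksWithPrimaryKey gauge covers only the PK filtering itself, while FilteringMarksWithSecondaryKeys stays raised for the rest of the loop body. A standalone analogue of the pattern:)

    #include <atomic>

    std::atomic<int> active_workers{0};

    struct Increment
    {
        std::atomic<int> & metric;
        explicit Increment(std::atomic<int> & m) : metric(m) { ++metric; }
        ~Increment() { --metric; }
    };

    void do_work()
    {
        Increment inc(active_workers); // gauge goes up here...
        // ... work ...
    }                                  // ...and back down when the scope ends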
@ -733,6 +742,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
         num_threads = std::min<size_t>(num_streams, settings.max_threads_for_indexes);
     }

+    LOG_TRACE(log, "Filtering marks by primary and secondary keys");
+
     if (num_threads <= 1)
     {
         for (size_t part_index = 0; part_index < parts.size(); ++part_index)
@ -740,7 +751,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
     }
     else
     {
-        /// Parallel loading of data parts.
+        /// Parallel loading and filtering of data parts.
         ThreadPool pool(
             CurrentMetrics::MergeTreeDataSelectExecutorThreads,
             CurrentMetrics::MergeTreeDataSelectExecutorThreadsActive,
@ -748,8 +759,11 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
             num_threads);

         for (size_t part_index = 0; part_index < parts.size(); ++part_index)
+        {
             pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()]
             {
+                setThreadName("MergeTreeIndex");
+
                 SCOPE_EXIT_SAFE(
                     if (thread_group)
                         CurrentThread::detachFromGroupIfNotDetached();
@ -759,6 +773,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd

                 process_part(part_index);
             });
+        }

         pool.wait();
     }
@ -1296,8 +1311,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
     size_t last_index_mark = 0;

     PostingsCacheForStore cache_in_store;
-    if (dynamic_cast<const MergeTreeIndexFullText *>(&*index_helper) != nullptr)
+    if (dynamic_cast<const MergeTreeIndexFullText *>(index_helper.get()))
         cache_in_store.store = GinIndexStoreFactory::instance().get(index_helper->getFileName(), part->getDataPartStoragePtr());

     for (size_t i = 0; i < ranges.size(); ++i)
@ -1315,12 +1329,12 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
     auto ann_condition = std::dynamic_pointer_cast<IMergeTreeIndexConditionApproximateNearestNeighbor>(condition);
     if (ann_condition != nullptr)
     {
-        // vector of indexes of useful ranges
+        /// An array of indices of useful ranges.
         auto result = ann_condition->getUsefulRanges(granule);

         for (auto range : result)
         {
-            // range for corresponding index
+            /// The range for the corresponding index.
             MarkRange data_range(
                 std::max(ranges[i].begin, index_mark * index_granularity + range),
                 std::min(ranges[i].end, index_mark * index_granularity + range + 1));
@ -1344,8 +1358,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
             continue;

         MarkRange data_range(
             std::max(ranges[i].begin, index_mark * index_granularity),
             std::min(ranges[i].end, (index_mark + 1) * index_granularity));

         if (res.empty() || data_range.begin - res.back().end > min_marks_for_seek)
             res.push_back(data_range);
@ -35,8 +35,7 @@ MergeTreeIndexGranuleSet::MergeTreeIndexGranuleSet(
     size_t max_rows_)
     : index_name(index_name_)
     , max_rows(max_rows_)
-    , index_sample_block(index_sample_block_)
-    , block(index_sample_block)
+    , block(index_sample_block_.cloneEmpty())
 {
 }
@ -47,8 +46,7 @@ MergeTreeIndexGranuleSet::MergeTreeIndexGranuleSet(
     MutableColumns && mutable_columns_)
     : index_name(index_name_)
     , max_rows(max_rows_)
-    , index_sample_block(index_sample_block_)
-    , block(index_sample_block.cloneWithColumns(std::move(mutable_columns_)))
+    , block(index_sample_block_.cloneWithColumns(std::move(mutable_columns_)))
 {
 }
@ -67,10 +65,11 @@ void MergeTreeIndexGranuleSet::serializeBinary(WriteBuffer & ostr) const
     }

     size_serialization->serializeBinary(size(), ostr, {});
+    size_t num_columns = block.columns();

-    for (size_t i = 0; i < index_sample_block.columns(); ++i)
+    for (size_t i = 0; i < num_columns; ++i)
     {
-        const auto & type = index_sample_block.getByPosition(i).type;
+        const auto & type = block.getByPosition(i).type;

         ISerialization::SerializeBinaryBulkSettings settings;
         settings.getter = [&ostr](ISerialization::SubstreamPath) -> WriteBuffer * { return &ostr; };
@ -92,8 +91,6 @@ void MergeTreeIndexGranuleSet::deserializeBinary(ReadBuffer & istr, MergeTreeInd
     if (version != 1)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown index version {}.", version);

-    block.clear();
-
     Field field_rows;
     const auto & size_type = DataTypePtr(std::make_shared<DataTypeUInt64>());
     size_type->getDefaultSerialization()->deserializeBinary(field_rows, istr, {});
@ -102,24 +99,22 @@ void MergeTreeIndexGranuleSet::deserializeBinary(ReadBuffer & istr, MergeTreeInd
     if (rows_to_read == 0)
         return;

-    for (size_t i = 0; i < index_sample_block.columns(); ++i)
+    size_t num_columns = block.columns();
+
+    ISerialization::DeserializeBinaryBulkSettings settings;
+    settings.getter = [&](ISerialization::SubstreamPath) -> ReadBuffer * { return &istr; };
+    settings.position_independent_encoding = false;
+
+    for (size_t i = 0; i < num_columns; ++i)
     {
-        const auto & column = index_sample_block.getByPosition(i);
-        const auto & type = column.type;
-        ColumnPtr new_column = type->createColumn();
-
-        ISerialization::DeserializeBinaryBulkSettings settings;
-        settings.getter = [&](ISerialization::SubstreamPath) -> ReadBuffer * { return &istr; };
-        settings.position_independent_encoding = false;
+        auto & elem = block.getByPosition(i);
+        elem.column = elem.column->cloneEmpty();

         ISerialization::DeserializeBinaryBulkStatePtr state;
-        auto serialization = type->getDefaultSerialization();
+        auto serialization = elem.type->getDefaultSerialization();

         serialization->deserializeBinaryBulkStatePrefix(settings, state);
-        serialization->deserializeBinaryBulkWithMultipleStreams(new_column, rows_to_read, settings, state, nullptr);
-
-        block.insert(ColumnWithTypeAndName(new_column, type, column.name));
+        serialization->deserializeBinaryBulkWithMultipleStreams(elem.column, rows_to_read, settings, state, nullptr);
     }
 }
@ -272,6 +267,8 @@ MergeTreeIndexConditionSet::MergeTreeIndexConditionSet(

     filter_actions_dag->removeUnusedActions();
     actions = std::make_shared<ExpressionActions>(filter_actions_dag);
+
+    actions_output_column_name = filter_actions_dag->getOutputs().at(0)->result_name;
 }

 bool MergeTreeIndexConditionSet::alwaysUnknownOrTrue() const
@ -284,42 +281,19 @@ bool MergeTreeIndexConditionSet::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx
     if (isUseless())
         return true;

-    auto granule = std::dynamic_pointer_cast<MergeTreeIndexGranuleSet>(idx_granule);
-    if (!granule)
-        throw Exception(ErrorCodes::LOGICAL_ERROR,
-            "Set index condition got a granule with the wrong type");
-
-    if (isUseless() || granule->empty() || (max_rows != 0 && granule->size() > max_rows))
+    const MergeTreeIndexGranuleSet & granule = assert_cast<const MergeTreeIndexGranuleSet &>(*idx_granule);
+
+    size_t size = granule.size();
+    if (size == 0 || (max_rows != 0 && size > max_rows))
         return true;

-    Block result = granule->block;
+    Block result = granule.block;
     actions->execute(result);

-    const auto & filter_node_name = actions->getActionsDAG().getOutputs().at(0)->result_name;
-    auto column = result.getByName(filter_node_name).column->convertToFullColumnIfConst()->convertToFullColumnIfLowCardinality();
-
-    if (column->onlyNull())
-        return false;
-
-    const auto * col_uint8 = typeid_cast<const ColumnUInt8 *>(column.get());
-
-    const NullMap * null_map = nullptr;
-
-    if (const auto * col_nullable = checkAndGetColumn<ColumnNullable>(&*column))
-    {
-        col_uint8 = typeid_cast<const ColumnUInt8 *>(&col_nullable->getNestedColumn());
-        null_map = &col_nullable->getNullMapData();
-    }
-
-    if (!col_uint8)
-        throw Exception(ErrorCodes::LOGICAL_ERROR,
-            "ColumnUInt8 expected as Set index condition result");
-
-    const auto & condition = col_uint8->getData();
-    size_t column_size = column->size();
-
-    for (size_t i = 0; i < column_size; ++i)
-        if ((!null_map || (*null_map)[i] == 0) && condition[i] & 1)
+    const auto & column = result.getByName(actions_output_column_name).column;
+
+    for (size_t i = 0; i < size; ++i)
+        if (!column->isNullAt(i) && (column->get64(i) & 1))
             return true;

     return false;
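(The rewritten loop leans on two IColumn virtuals instead of unpacking ColumnUInt8/ColumnNullable by hand: isNullAt() covers the nullable case, and get64() & 1 reads the low bit, so any odd value counts as "the filter may pass". A standalone analogue of the scan:)

    #include <cstdint>
    #include <optional>
    #include <vector>

    // Stand-in for the filter column: null or a 64-bit value per granule row.
    bool may_be_true(const std::vector<std::optional<uint64_t>> & column)
    {
        for (const auto & v : column)
            if (v.has_value() && (*v & 1))   // non-null with the low bit set
                return true;                 // at least one row may match
        return false;                        // safe to skip the whole granule
    }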
@ -34,7 +34,6 @@ struct MergeTreeIndexGranuleSet final : public IMergeTreeIndexGranule

     const String index_name;
     const size_t max_rows;
-    const Block index_sample_block;

     Block block;
 };
@ -127,6 +126,7 @@ private:

     std::unordered_set<String> key_columns;
     ExpressionActionsPtr actions;
+    String actions_output_column_name;
 };

@ -1,5 +1,6 @@
 #pragma once

+#include <Common/CopyableAtomic.h>
 #include <Common/Exception.h>
 #include <Common/ZooKeeper/Types.h>
 #include <base/types.h>
@ -9,7 +10,6 @@
 #include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
 #include <Disks/IDisk.h>

-#include <mutex>
 #include <condition_variable>

@ -174,7 +174,7 @@ struct ReplicatedMergeTreeLogEntryData
     size_t quorum = 0;

     /// Used only in tests for permanent fault injection for particular queue entry.
-    bool fault_injected = false;
+    CopyableAtomic<bool> fault_injected{false};

     /// If this MUTATE_PART entry caused by alter(modify/drop) query.
     bool isAlterMutation() const
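(std::atomic is neither copyable nor movable, which breaks structs that get copied around, like log entries. The CopyableAtomic header itself is not shown in this diff; the sketch below is an assumption about its likely shape, not the actual implementation:)

    #include <atomic>

    template <typename T>
    struct CopyableAtomic
    {
        std::atomic<T> value;

        CopyableAtomic(T v) : value(v) {}
        // Copying loads the source atomically and seeds a fresh atomic,
        // so the containing struct stays copyable.
        CopyableAtomic(const CopyableAtomic & other) : value(other.value.load()) {}
        CopyableAtomic & operator=(const CopyableAtomic & other)
        {
            value.store(other.value.load());
            return *this;
        }
    };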
@ -302,8 +302,8 @@ void registerStorageAzureBlob(StorageFactory & factory)
         auto settings = StorageAzureBlob::createSettings(args.getContext());

         return std::make_shared<StorageAzureBlob>(
-            std::move(configuration),
-            std::make_unique<AzureObjectStorage>("AzureBlobStorage", std::move(client), std::move(settings),configuration.container),
+            configuration,
+            std::make_unique<AzureObjectStorage>("AzureBlobStorage", std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer()),
             args.getContext(),
             args.table_id,
             args.columns,
@ -491,6 +491,12 @@ Poco::URI StorageAzureBlob::Configuration::getConnectionURL() const
     return Poco::URI(parsed_connection_string.BlobServiceUrl.GetAbsoluteUrl());
 }

+std::string StorageAzureBlob::Configuration::getConnectionURLWithContainer() const
+{
+    auto url = getConnectionURL();
+    return fs::path(url.toString()) / container;
+}
+
 bool StorageAzureBlob::Configuration::withGlobsIgnorePartitionWildcard() const
 {
     if (!withPartitionWildcard())
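(fs::path's operator/ inserts exactly one separator between the two pieces, which keeps the URL-plus-container join free of missing or doubled slashes. A runnable sketch of the same join; the sample URL is illustrative:)

    #include <cassert>
    #include <filesystem>
    #include <string>

    int main()
    {
        namespace fs = std::filesystem;
        std::string url = "http://azurite1:10000/devstoreaccount1";
        std::string container = "cont";
        // operator/ adds the single separator between url and container.
        std::string with_container = fs::path(url) / container;
        assert(with_container == "http://azurite1:10000/devstoreaccount1/cont");
    }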
@ -45,6 +45,8 @@ public:

     Poco::URI getConnectionURL() const;

+    std::string getConnectionURLWithContainer() const;
+
     std::string connection_url;
     bool is_connection_string;

@ -28,6 +28,7 @@
 #include <IO/PeekableReadBuffer.h>
 #include <IO/AsynchronousReadBufferFromFile.h>
 #include <Disks/IO/IOUringReader.h>
+#include <Disks/IO/getIOUringReader.h>

 #include <Formats/FormatFactory.h>
 #include <Formats/ReadSchemaUtils.h>
@ -282,10 +283,7 @@ std::unique_ptr<ReadBuffer> selectReadBuffer(
     else if (read_method == LocalFSReadMethod::io_uring && !use_table_fd)
     {
 #if USE_LIBURING
-        auto & reader = context->getIOURingReader();
-        if (!reader.isSupported())
-            throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "io_uring is not supported by this system");
-
+        auto & reader = getIOUringReaderOrThrow(context);
         res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
             reader,
             Priority{},
@ -193,6 +193,7 @@ StorageMaterializedView::StorageMaterializedView(

     if (query.refresh_strategy)
     {
+        fixed_uuid = false;
         refresher = RefreshTask::create(
             *this,
             getContext(),
@ -687,10 +688,14 @@ void StorageMaterializedView::onActionLockRemove(StorageActionBlockType action_t
         refresher->start();
 }

-DB::StorageID StorageMaterializedView::getTargetTableId() const
+StorageID StorageMaterializedView::getTargetTableId() const
 {
     std::lock_guard guard(target_table_id_mutex);
-    return target_table_id;
+    auto id = target_table_id;
+    /// TODO: Avoid putting uuid into target_table_id in the first place, instead of clearing it here.
+    if (!fixed_uuid)
+        id.uuid = UUIDHelpers::Nil;
+    return id;
 }

 void StorageMaterializedView::setTargetTableId(DB::StorageID id)
@ -110,6 +110,10 @@ private:

     bool has_inner_table = false;

+    /// If false, inner table is replaced on each refresh. In that case, target_table_id doesn't
+    /// have UUID, and we do inner table lookup by name instead.
+    bool fixed_uuid = true;
+
     friend class RefreshTask;

     void checkStatementCanBeForwarded() const;
@ -141,6 +141,8 @@ public:
         if (thread_group)
             CurrentThread::attachToGroupIfDetached(thread_group);

+        setThreadName("SystemReplicas");
+
         try
         {
             ReplicatedTableStatus status;
@ -333,7 +333,7 @@ ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(Contex
     auto client = StorageAzureBlob::createClient(configuration, !is_insert_query);
     auto settings = StorageAzureBlob::createSettings(context);

-    auto object_storage = std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings), configuration.container);
+    auto object_storage = std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer());
     if (configuration.format == "auto")
         return StorageAzureBlob::getTableStructureAndFormatFromData(object_storage.get(), configuration, std::nullopt, context).first;
     return StorageAzureBlob::getTableStructureFromData(object_storage.get(), configuration, std::nullopt, context);
@ -365,7 +365,7 @@ StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_funct

     StoragePtr storage = std::make_shared<StorageAzureBlob>(
         configuration,
-        std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container),
+        std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer()),
         context,
         StorageID(getDatabaseName(), table_name),
         columns,
@ -39,7 +39,7 @@ StoragePtr TableFunctionAzureBlobStorageCluster::executeImpl(
         /// On worker node this filename won't contains globs
         storage = std::make_shared<StorageAzureBlob>(
             configuration,
-            std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container),
+            std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer()),
             context,
             StorageID(getDatabaseName(), table_name),
             columns,
@ -54,7 +54,7 @@ StoragePtr TableFunctionAzureBlobStorageCluster::executeImpl(
         storage = std::make_shared<StorageAzureBlobCluster>(
             cluster_name,
             configuration,
-            std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container),
+            std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer()),
             StorageID(getDatabaseName(), table_name),
             columns,
             ConstraintsDescription{},
@ -0,0 +1 @@
#!/usr/bin/env python3
215 tests/integration/test_azure_blob_storage_native_copy/test.py Normal file
@ -0,0 +1,215 @@
#!/usr/bin/env python3

import gzip
import json
import logging
import os
import io
import random
import threading
import time

from azure.storage.blob import BlobServiceClient
import helpers.client
import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.network import PartitionManager
from helpers.mock_servers import start_mock_servers
from helpers.test_tools import exec_query_with_retry


def generate_config(port):
    path = os.path.join(
        os.path.dirname(os.path.realpath(__file__)),
        "./_gen/storage_conf.xml",
    )
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w") as f:
        TEMPLATE = """
    <clickhouse>
        <storage_configuration>
            <disks>
                <disk_azure>
                    <metadata_type>local</metadata_type>
                    <type>object_storage</type>
                    <object_storage_type>azure_blob_storage</object_storage_type>
                    <storage_account_url>http://azurite1:{port}/devstoreaccount1</storage_account_url>
                    <container_name>cont</container_name>
                    <skip_access_check>false</skip_access_check>
                    <account_name>devstoreaccount1</account_name>
                    <account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
                    <use_native_copy>true</use_native_copy>
                </disk_azure>
                <disk_azure_other_bucket>
                    <metadata_type>local</metadata_type>
                    <type>object_storage</type>
                    <object_storage_type>azure_blob_storage</object_storage_type>
                    <use_native_copy>true</use_native_copy>
                    <storage_account_url>http://azurite1:{port}/devstoreaccount1</storage_account_url>
                    <container_name>othercontainer</container_name>
                    <skip_access_check>false</skip_access_check>
                    <account_name>devstoreaccount1</account_name>
                    <account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
                </disk_azure_other_bucket>
                <disk_azure_cache>
                    <type>cache</type>
                    <disk>disk_azure</disk>
                    <path>/tmp/azure_cache/</path>
                    <max_size>1000000000</max_size>
                    <cache_on_write_operations>1</cache_on_write_operations>
                </disk_azure_cache>
            </disks>
            <policies>
                <policy_azure>
                    <volumes>
                        <main>
                            <disk>disk_azure</disk>
                        </main>
                    </volumes>
                </policy_azure>
                <policy_azure_other_bucket>
                    <volumes>
                        <main>
                            <disk>disk_azure_other_bucket</disk>
                        </main>
                    </volumes>
                </policy_azure_other_bucket>
                <policy_azure_cache>
                    <volumes>
                        <main>
                            <disk>disk_azure_cache</disk>
                        </main>
                    </volumes>
                </policy_azure_cache>
            </policies>
        </storage_configuration>
        <backups>
            <allowed_disk>disk_azure</allowed_disk>
            <allowed_disk>disk_azure_cache</allowed_disk>
            <allowed_disk>disk_azure_other_bucket</allowed_disk>
        </backups>
    </clickhouse>
    """
        f.write(TEMPLATE.format(port=port))
    return path


@pytest.fixture(scope="module")
def cluster():
    try:
        cluster = ClickHouseCluster(__file__)
        port = cluster.azurite_port
        path = generate_config(port)
        cluster.add_instance(
            "node1",
            main_configs=[path],
            with_azurite=True,
        )
        cluster.add_instance(
            "node2",
            main_configs=[path],
            with_azurite=True,
        )
        cluster.start()

        yield cluster
    finally:
        cluster.shutdown()


def azure_query(
    node, query, expect_error=False, try_num=10, settings={}, query_on_retry=None
):
    for i in range(try_num):
        try:
            if expect_error:
                return node.query_and_get_error(query, settings=settings)
            else:
                return node.query(query, settings=settings)
        except Exception as ex:
            retriable_errors = [
                "DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
                "DB::Exception: Azure::Core::Http::TransportException: Connection closed before getting full response or response is less than expected",
                "DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
                "DB::Exception: Azure::Core::Http::TransportException: Error while polling for socket ready read",
                "Azure::Core::Http::TransportException, e.what() = Connection was closed by the server while trying to read a response",
                "Azure::Core::Http::TransportException, e.what() = Connection closed before getting full response or response is less than expected",
                "Azure::Core::Http::TransportException, e.what() = Connection was closed by the server while trying to read a response",
                "Azure::Core::Http::TransportException, e.what() = Error while polling for socket ready read",
            ]
            retry = False
            for error in retriable_errors:
                if error in str(ex):
                    retry = True
                    print(f"Try num: {i}. Having retriable error: {ex}")
                    time.sleep(i)
                    break
            if not retry or i == try_num - 1:
                raise Exception(ex)
            if query_on_retry is not None:
                node.query(query_on_retry)
            continue


def test_backup_restore_on_merge_tree_same_container(cluster):
    node1 = cluster.instances["node1"]
    azure_query(
        node1,
        f"CREATE TABLE test_simple_merge_tree(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='policy_azure_cache'",
    )
    azure_query(node1, f"INSERT INTO test_simple_merge_tree VALUES (1, 'a')")

    backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_merge_tree_backup')"
    print("BACKUP DEST", backup_destination)
    azure_query(
        node1,
        f"BACKUP TABLE test_simple_merge_tree TO {backup_destination}",
    )

    assert node1.contains_in_log("using native copy")

    azure_query(
        node1,
        f"RESTORE TABLE test_simple_merge_tree AS test_simple_merge_tree_restored FROM {backup_destination};",
    )
    assert (
        azure_query(node1, f"SELECT * from test_simple_merge_tree_restored") == "1\ta\n"
    )
    azure_query(node1, f"DROP TABLE test_simple_merge_tree")
    azure_query(node1, f"DROP TABLE test_simple_merge_tree_restored")


def test_backup_restore_on_merge_tree_different_container(cluster):
    node2 = cluster.instances["node2"]
    azure_query(
        node2,
        f"CREATE TABLE test_simple_merge_tree_different_bucket(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='policy_azure_other_bucket'",
    )
    azure_query(
        node2, f"INSERT INTO test_simple_merge_tree_different_bucket VALUES (1, 'a')"
    )

    backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_merge_tree_different_bucket_backup_different_bucket')"
    print("BACKUP DEST", backup_destination)
    azure_query(
        node2,
        f"BACKUP TABLE test_simple_merge_tree_different_bucket TO {backup_destination}",
    )

    assert not node2.contains_in_log("using native copy")

    azure_query(
        node2,
        f"RESTORE TABLE test_simple_merge_tree_different_bucket AS test_simple_merge_tree_different_bucket_restored FROM {backup_destination};",
    )
    assert (
        azure_query(
            node2, f"SELECT * from test_simple_merge_tree_different_bucket_restored"
        )
        == "1\ta\n"
    )

    assert not node2.contains_in_log("using native copy")

    azure_query(node2, f"DROP TABLE test_simple_merge_tree_different_bucket")
    azure_query(node2, f"DROP TABLE test_simple_merge_tree_different_bucket_restored")
14 tests/performance/set_index_analysis.xml Normal file
@ -0,0 +1,14 @@
<test>
    <create_query>
        CREATE TABLE test_set (k UInt32, x UInt32, INDEX idx (x) TYPE set(10) GRANULARITY 1) ENGINE = MergeTree ORDER BY k SETTINGS index_granularity = 111;
    </create_query>
    <fill_query>SYSTEM STOP MERGES</fill_query>
    <fill_query>INSERT INTO test_set SELECT number, number DIV 100 + rand() % 7 FROM numbers(3000000) SETTINGS max_insert_threads = 4;</fill_query>

    <query>
        SELECT count() FROM test_set WHERE x = 1234 SETTINGS max_threads = 8;
    </query>

    <drop_query>SYSTEM START MERGES</drop_query>
    <drop_query>DROP TABLE IF EXISTS test_set</drop_query>
</test>
@ -0,0 +1,4 @@
simple_with_analyzer ['default.03148_dictionary']
nested_with_analyzer ['default.03148_dictionary']
simple_without_analyzer ['default.03148_dictionary']
nested_without_analyzer ['default.03148_dictionary']
@ -0,0 +1,84 @@
DROP DICTIONARY IF EXISTS 03148_dictionary;

CREATE DICTIONARY 03148_dictionary (
    id UInt64,
    name String
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(
    QUERY 'select 0 as id, ''name0'' as name'
))
LIFETIME(MIN 1 MAX 10)
LAYOUT(HASHED);

SELECT
    dictGet('03148_dictionary', 'name', number) as dict_value
FROM numbers(1)
SETTINGS
    allow_experimental_analyzer = 1,
    log_comment = 'simple_with_analyzer'
FORMAT Null;

SYSTEM FLUSH LOGS;

SELECT log_comment, used_dictionaries
FROM system.query_log
WHERE current_database = currentDatabase()
    AND type = 'QueryFinish'
    AND log_comment = 'simple_with_analyzer';

SELECT *
FROM (
    SELECT
        dictGet('03148_dictionary', 'name', number) as dict_value
    FROM numbers(1)
) t
SETTINGS
    allow_experimental_analyzer = 1,
    log_comment = 'nested_with_analyzer'
FORMAT Null;

SYSTEM FLUSH LOGS;

SELECT log_comment, used_dictionaries
FROM system.query_log
WHERE current_database = currentDatabase()
    AND type = 'QueryFinish'
    AND log_comment = 'nested_with_analyzer';

SELECT
    dictGet('03148_dictionary', 'name', number) as dict_value
FROM numbers(1)
SETTINGS
    allow_experimental_analyzer = 0,
    log_comment = 'simple_without_analyzer'
FORMAT Null;

SYSTEM FLUSH LOGS;

SELECT log_comment, used_dictionaries
FROM system.query_log
WHERE current_database = currentDatabase()
    AND type = 'QueryFinish'
    AND log_comment = 'simple_without_analyzer';

SELECT *
FROM (
    SELECT
        dictGet('03148_dictionary', 'name', number) as dict_value
    FROM numbers(1)
) t
SETTINGS
    allow_experimental_analyzer = 0,
    log_comment = 'nested_without_analyzer'
FORMAT Null;

SYSTEM FLUSH LOGS;

SELECT log_comment, used_dictionaries
FROM system.query_log
WHERE current_database = currentDatabase()
    AND type = 'QueryFinish'
    AND log_comment = 'nested_without_analyzer';

DROP DICTIONARY IF EXISTS 03148_dictionary;
@ -0,0 +1,43 @@
   ┌─────────x─┐
1. │ 123456789 │ -- 123.46 million
   └───────────┘
   ┌─────────x─┐
1. │ 123456789 │ -- 123.46 million
   └───────────┘
   ┌─────────x─┐
1. │ 123456789 │ -- 123.46 million
   └───────────┘
   ┌─────────x─┐
1. │ 123456789 │ -- 123.46 million
   └───────────┘
   ┌─────────x─┐
1. │ 123456789 │ -- 123.46 million
   └───────────┘
Nullable(UInt64), Nullable(size = 10, UInt64(size = 10), UInt8(size = 10))
    ┏━━━━━━━━━━━━┓
    ┃          x ┃
    ┡━━━━━━━━━━━━┩
 1. │ 1111111101 │ -- 1.11 billion
    └────────────┘
    ┏━━━━━━━━━━━┓
    ┃         x ┃
    ┡━━━━━━━━━━━┩
 1. │ 123456789 │ -- 123.46 million
    └───────────┘
       x

 1. ᴺᵁᴸᴸ
UInt64, Sparse(size = 10, UInt64(size = 6), UInt64(size = 5))
    ┏━━━━━━━━━━━━┓
    ┃          x ┃
    ┡━━━━━━━━━━━━┩
 1. │ 1111111101 │ -- 1.11 billion
    └────────────┘
    ┏━━━┓
    ┃ x ┃
    ┡━━━┩
 1. │ 0 │
    └───┘
   x

1. 0
24 tests/queries/0_stateless/03156_nullable_number_tips.sql Normal file
@ -0,0 +1,24 @@
SELECT 123456789 AS x FORMAT PrettyCompact;
SELECT toNullable(123456789) AS x FORMAT PrettyCompact;
SELECT toLowCardinality(toNullable(123456789)) AS x FORMAT PrettyCompact;
SELECT toNullable(toLowCardinality(123456789)) AS x FORMAT PrettyCompact;
SELECT toLowCardinality(123456789) AS x FORMAT PrettyCompact;

CREATE TEMPORARY TABLE test (x Nullable(UInt64), PRIMARY KEY ()) ENGINE = MergeTree SETTINGS ratio_of_defaults_for_sparse_serialization = 0;
INSERT INTO test SELECT number % 2 ? number * 123456789 : NULL FROM numbers(10);

SELECT DISTINCT dumpColumnStructure(*) FROM test;

SELECT * FROM test ORDER BY ALL DESC NULLS LAST LIMIT 1 FORMAT PRETTY;
SELECT * FROM test ORDER BY ALL ASC NULLS LAST LIMIT 1 FORMAT PRETTY;
SELECT * FROM test ORDER BY ALL ASC NULLS FIRST LIMIT 1 FORMAT PrettySpace;

DROP TEMPORARY TABLE test;
CREATE TEMPORARY TABLE test (x UInt64, PRIMARY KEY ()) ENGINE = MergeTree SETTINGS ratio_of_defaults_for_sparse_serialization = 0;
INSERT INTO test SELECT number % 2 ? number * 123456789 : NULL FROM numbers(10);

SELECT DISTINCT dumpColumnStructure(*) FROM test;

SELECT * FROM test ORDER BY ALL DESC NULLS LAST LIMIT 1 FORMAT PRETTY;
SELECT * FROM test ORDER BY ALL ASC NULLS LAST LIMIT 1 FORMAT PRETTY;
SELECT * FROM test ORDER BY ALL ASC NULLS FIRST LIMIT 1 FORMAT PrettySpace;
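(Setting ratio_of_defaults_for_sparse_serialization = 0 in this test forces every column into the Sparse serialization, which is why dumpColumnStructure reports Sparse(...) in the reference above. A standalone sketch, with an illustrative table name t, not part of the test:)

-- Standalone sketch: force sparse serialization and inspect the column structure.
CREATE TABLE t (x UInt64) ENGINE = MergeTree ORDER BY tuple()
SETTINGS ratio_of_defaults_for_sparse_serialization = 0;
INSERT INTO t SELECT number % 2 ? number : 0 FROM numbers(10);
SELECT DISTINCT dumpColumnStructure(x) FROM t;  -- expected to report Sparse(...)
DROP TABLE t;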
@ -0,0 +1 @@
SELECT 1 GROUP BY -9223372036854775808; -- { serverError BAD_ARGUMENTS }
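(The literal here is the minimum Int64 value; the test pins down that such a constant in GROUP BY is rejected with BAD_ARGUMENTS, presumably via the positional-arguments code path, rather than misbehaving. For contrast, an in-range positional GROUP BY is legal; an illustrative sketch:)

-- Positional GROUP BY with a valid 1-based column index, for contrast.
SELECT number % 2 AS parity, count() FROM numbers(10) GROUP BY 1;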
@ -0,0 +1 @@
100004
8
tests/queries/0_stateless/03160_pretty_format_tty.sh
Executable file
@ -0,0 +1,8 @@
#!/usr/bin/env bash

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

# default output_format_pretty_max_rows is 10K
$CLICKHOUSE_LOCAL -q "select * from numbers(100e3) format PrettySpace settings max_threads=1" | wc -l
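(The reference value 100004 corresponds to all 100,000 rows plus a few header/footer lines, so the 10K output_format_pretty_max_rows default is presumably not applied when stdout is a pipe rather than a TTY. When the limit does apply, Pretty formats truncate the output, which can be reproduced directly; an illustrative sketch:)

-- Sketch: with an explicit limit, Pretty output truncates to 5 rows.
SELECT * FROM numbers(20)
FORMAT PrettyCompact
SETTINGS output_format_pretty_max_rows = 5;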
@ -7,6 +7,8 @@ export LC_ALL=C # The "total" should be printed without localization
 TU_EXCLUDES=(
     AggregateFunctionUniq
     Aggregator
+    # FIXME: Exclude for now
+    FunctionsConversion
 )

 if find $1 -name '*.o' | xargs wc -c | grep --regexp='\.o$' | sort -rn | awk '{ if ($1 > 50000000) print }' \
@ -1,4 +1,6 @@
 clickhouse_add_executable(clickhouse-zookeeper-cli
     zookeeper-cli.cpp
     ${ClickHouse_SOURCE_DIR}/src/Client/LineReader.cpp)
-target_link_libraries(clickhouse-zookeeper-cli PRIVATE clickhouse_common_zookeeper_no_log)
+target_link_libraries(clickhouse-zookeeper-cli PRIVATE
+    clickhouse_common_zookeeper_no_log
+    dbms)
@ -1,2 +1,6 @@
 clickhouse_add_executable (zookeeper-dump-tree main.cpp ${SRCS})
-target_link_libraries(zookeeper-dump-tree PRIVATE clickhouse_common_zookeeper_no_log clickhouse_common_io boost::program_options)
+target_link_libraries(zookeeper-dump-tree PRIVATE
+    clickhouse_common_zookeeper_no_log
+    clickhouse_common_io
+    dbms
+    boost::program_options)
@ -1,2 +1,5 @@
 clickhouse_add_executable (zookeeper-remove-by-list main.cpp ${SRCS})
-target_link_libraries(zookeeper-remove-by-list PRIVATE clickhouse_common_zookeeper_no_log boost::program_options)
+target_link_libraries(zookeeper-remove-by-list PRIVATE
+    clickhouse_common_zookeeper_no_log
+    dbms
+    boost::program_options)