Merge branch 'master' into fix-preadv2-with-nowait

Antonio Andelic 2024-10-04 11:06:38 +02:00
commit 3f2fd5f374
48 changed files with 523 additions and 516 deletions

View File

@ -1 +1,4 @@
# See contrib/usearch-cmake/CMakeLists.txt
set (FP16_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/FP16/")
add_library(_fp16 INTERFACE)
target_include_directories(_fp16 SYSTEM INTERFACE ${FP16_PROJECT_DIR}/include)

contrib/SimSIMD vendored (2 changes)

@ -1 +1 @@
Subproject commit 91a76d1ac519b3b9dc8957734a3dabd985f00c26
Subproject commit ff51434d90c66f916e94ff05b24530b127aa4cff

View File

@ -1 +1,4 @@
# See contrib/usearch-cmake/CMakeLists.txt
set(SIMSIMD_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/SimSIMD")
add_library(_simsimd INTERFACE)
target_include_directories(_simsimd SYSTEM INTERFACE "${SIMSIMD_PROJECT_DIR}/include")

contrib/usearch vendored (2 changes)

@ -1 +1 @@
Subproject commit 7a8967cb442b08ca20c3dd781414378e65957d37
Subproject commit d1d33eac94acd3b628e0b446c927ec3295ef63c7

View File

@ -1,14 +1,9 @@
set(FP16_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/FP16")
set(SIMSIMD_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/SimSIMD")
set(USEARCH_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/usearch")
add_library(_usearch INTERFACE)
target_include_directories(_usearch SYSTEM INTERFACE ${USEARCH_PROJECT_DIR}/include)
target_include_directories(_usearch SYSTEM INTERFACE
${FP16_PROJECT_DIR}/include
${SIMSIMD_PROJECT_DIR}/include
${USEARCH_PROJECT_DIR}/include)
target_link_libraries(_usearch INTERFACE _fp16)
target_compile_definitions(_usearch INTERFACE USEARCH_USE_FP16LIB)
# target_compile_definitions(_usearch INTERFACE USEARCH_USE_SIMSIMD)

View File

@ -34,7 +34,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="24.9.1.3278"
ARG VERSION="24.9.2.42"
ARG PACKAGES="clickhouse-keeper"
ARG DIRECT_DOWNLOAD_URLS=""

View File

@ -35,7 +35,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="24.9.1.3278"
ARG VERSION="24.9.2.42"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
ARG DIRECT_DOWNLOAD_URLS=""

View File

@ -28,7 +28,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="24.9.1.3278"
ARG VERSION="24.9.2.42"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
#docker-official-library:off

View File

@ -0,0 +1,33 @@
---
sidebar_position: 1
sidebar_label: 2024
---
# 2024 Changelog
### ClickHouse release v24.9.2.42-stable (de7c791a2ea) FIXME as compared to v24.9.1.3278-stable (6d058d82a8e)
#### Improvement
* Backported in [#70091](https://github.com/ClickHouse/ClickHouse/issues/70091): Add `show_create_query_identifier_quoting_rule` to define the identifier quoting behavior of the SHOW CREATE query result. Possible values: - `user_display`: When the identifier is a keyword. - `when_necessary`: When the identifier is one of `{"distinct", "all", "table"}`, or when it can cause ambiguity: column names, dictionary attribute names. - `always`: Always quote identifiers. See the sketch after this list. [#69448](https://github.com/ClickHouse/ClickHouse/pull/69448) ([tuanpach](https://github.com/tuanpach)).
* Backported in [#70100](https://github.com/ClickHouse/ClickHouse/issues/70100): Follow-up to https://github.com/ClickHouse/ClickHouse/pull/69346: point 4 described there now works as well. [#69563](https://github.com/ClickHouse/ClickHouse/pull/69563) ([Vitaly Baranov](https://github.com/vitlibar)).
* Backported in [#70048](https://github.com/ClickHouse/ClickHouse/issues/70048): Add a new column `readonly_duration` to the `system.replicas` table, needed to distinguish actual read-only replicas from sentinel ones in alerts. [#69871](https://github.com/ClickHouse/ClickHouse/pull/69871) ([Miсhael Stetsyuk](https://github.com/mstetsyuk)).
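A minimal sketch of the quoting-rule setting described in the first entry above (hypothetical table and column names, not part of this commit):
```sql
CREATE TABLE t (`all` UInt32, c1 String) ENGINE = MergeTree ORDER BY c1;

SET show_create_query_identifier_quoting_rule = 'when_necessary';
SHOW CREATE TABLE t;  -- quotes only `all`, since it can cause ambiguity

SET show_create_query_identifier_quoting_rule = 'always';
SHOW CREATE TABLE t;  -- quotes every identifier
```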
#### Bug Fix (user-visible misbehavior in an official stable release)
* Backported in [#70193](https://github.com/ClickHouse/ClickHouse/issues/70193): Fix crash when executing `create view t as (with recursive 42 as ttt select ttt);`. [#69676](https://github.com/ClickHouse/ClickHouse/pull/69676) ([Han Fei](https://github.com/hanfei1991)).
* Backported in [#70083](https://github.com/ClickHouse/ClickHouse/issues/70083): Closes [#69752](https://github.com/ClickHouse/ClickHouse/issues/69752). [#69985](https://github.com/ClickHouse/ClickHouse/pull/69985) ([pufit](https://github.com/pufit)).
* Backported in [#70070](https://github.com/ClickHouse/ClickHouse/issues/70070): Fixes `Block structure mismatch` for queries with nested views and `WHERE` condition. Fixes [#66209](https://github.com/ClickHouse/ClickHouse/issues/66209). [#70054](https://github.com/ClickHouse/ClickHouse/pull/70054) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Backported in [#70168](https://github.com/ClickHouse/ClickHouse/issues/70168): Fix wrong LOGICAL_ERROR when replacing literals in ranges. [#70122](https://github.com/ClickHouse/ClickHouse/pull/70122) ([Pablo Marcos](https://github.com/pamarcos)).
* Backported in [#70238](https://github.com/ClickHouse/ClickHouse/issues/70238): Check for the Nullable(Nothing) type during ALTER TABLE MODIFY COLUMN/QUERY to prevent creating tables with such a data type; see the sketch after this list. [#70123](https://github.com/ClickHouse/ClickHouse/pull/70123) ([Pavel Kruglov](https://github.com/Avogar)).
* Backported in [#70205](https://github.com/ClickHouse/ClickHouse/issues/70205): Fix wrong result with skipping index. [#70127](https://github.com/ClickHouse/ClickHouse/pull/70127) ([Raúl Marín](https://github.com/Algunenano)).
* Backported in [#70185](https://github.com/ClickHouse/ClickHouse/issues/70185): Fix data race in ColumnObject/ColumnTuple decompress method that could lead to heap use after free. [#70137](https://github.com/ClickHouse/ClickHouse/pull/70137) ([Pavel Kruglov](https://github.com/Avogar)).
* Backported in [#70253](https://github.com/ClickHouse/ClickHouse/issues/70253): Fix possible hung in ALTER COLUMN with Dynamic type. [#70144](https://github.com/ClickHouse/ClickHouse/pull/70144) ([Pavel Kruglov](https://github.com/Avogar)).
* Backported in [#70230](https://github.com/ClickHouse/ClickHouse/issues/70230): Use correct `max_types` parameter during Dynamic type creation for JSON subcolumn. [#70147](https://github.com/ClickHouse/ClickHouse/pull/70147) ([Pavel Kruglov](https://github.com/Avogar)).
* Backported in [#70217](https://github.com/ClickHouse/ClickHouse/issues/70217): Fix the password being displayed in `system.query_log` for users with bcrypt password authentication method. [#70148](https://github.com/ClickHouse/ClickHouse/pull/70148) ([Nikolay Degterinsky](https://github.com/evillique)).
* Backported in [#70267](https://github.com/ClickHouse/ClickHouse/issues/70267): Respect setting allow_simdjson in JSON type parser. [#70218](https://github.com/ClickHouse/ClickHouse/pull/70218) ([Pavel Kruglov](https://github.com/Avogar)).
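A minimal sketch of the `Nullable(Nothing)` check mentioned above (hypothetical table name, not part of this commit):
```sql
CREATE TABLE t0 (c0 Int32) ENGINE = MergeTree ORDER BY c0;
-- After the fix this is rejected with an exception instead of leaving
-- the table with an unusable Nullable(Nothing) column:
ALTER TABLE t0 MODIFY COLUMN c0 Nullable(Nothing);
```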
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Backported in [#70052](https://github.com/ClickHouse/ClickHouse/issues/70052): Improve stateless test runner. [#69864](https://github.com/ClickHouse/ClickHouse/pull/69864) ([Alexey Katsman](https://github.com/alexkats)).
* Backported in [#70284](https://github.com/ClickHouse/ClickHouse/issues/70284): Improve pipdeptree generator for docker images. - Update requirements.txt for the integration tests runner container - Remove some small dependencies, improve `helpers/retry_decorator.py` - Upgrade docker-compose from EOL version 1 to version 2. [#70146](https://github.com/ClickHouse/ClickHouse/pull/70146) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Backported in [#70261](https://github.com/ClickHouse/ClickHouse/issues/70261): Update test_storage_s3_queue/test.py. [#70159](https://github.com/ClickHouse/ClickHouse/pull/70159) ([Kseniia Sumarokova](https://github.com/kssenii)).

View File

@ -316,6 +316,38 @@ Result:
Same as `toIPv4`, but if the IPv4 address has an invalid format, it returns null.
**Syntax**
```sql
toIPv4OrNull(value)
```
**Arguments**
- `value` — The value containing an IPv4 address.
**Returned value**
- `value` converted to an IPv4 address, or `NULL` if `value` is invalid. [String](../data-types/string.md).
**Example**
Query:
```sql
SELECT
toIPv4OrNull('192.168.0.1') AS s1,
toIPv4OrNull('192.168.0') AS s2
```
Result:
```response
┌─s1──────────┬─s2───┐
│ 192.168.0.1 │ ᴺᵁᴸᴸ │
└─────────────┴──────┘
```
## toIPv6OrDefault(string)
Same as `toIPv6`, but if the IPv6 address has an invalid format, it returns `::` (the all-zero IPv6 address).
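A short illustration of the fallback behavior described above (example values, not from this diff):
```sql
SELECT
    toIPv6OrDefault('2001:db8::1') AS valid,
    toIPv6OrDefault('hello') AS invalid;
-- valid:   2001:db8::1
-- invalid: ::
```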

View File

@ -135,15 +135,15 @@ To change SQL security for an existing view, use
ALTER TABLE MODIFY SQL SECURITY { DEFINER | INVOKER | NONE } [DEFINER = { user | CURRENT_USER }]
```
### Examples sql security
### Examples
```sql
CREATE test_view
CREATE VIEW test_view
DEFINER = alice SQL SECURITY DEFINER
AS SELECT ...
```
```sql
CREATE test_view
CREATE VIEW test_view
SQL SECURITY INVOKER
AS SELECT ...
```
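Changing the SQL security of an existing view then uses the ALTER syntax shown above, for example (a sketch reusing the example view name, not part of this diff):
```sql
ALTER TABLE test_view MODIFY SQL SECURITY INVOKER;
ALTER TABLE test_view MODIFY SQL SECURITY DEFINER DEFINER = alice;
```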

View File

@ -628,7 +628,9 @@ void loadStartupScripts(const Poco::Util::AbstractConfiguration & config, Contex
auto condition_write_buffer = WriteBufferFromOwnString();
LOG_DEBUG(log, "Checking startup query condition `{}`", condition);
executeQuery(condition_read_buffer, condition_write_buffer, true, context, callback, QueryFlags{ .internal = true }, std::nullopt, {});
auto startup_context = Context::createCopy(context);
startup_context->makeQueryContext();
executeQuery(condition_read_buffer, condition_write_buffer, true, startup_context, callback, QueryFlags{ .internal = true }, std::nullopt, {});
auto result = condition_write_buffer.str();
@ -648,7 +650,9 @@ void loadStartupScripts(const Poco::Util::AbstractConfiguration & config, Contex
auto write_buffer = WriteBufferFromOwnString();
LOG_DEBUG(log, "Executing query `{}`", query);
executeQuery(read_buffer, write_buffer, true, context, callback, QueryFlags{ .internal = true }, std::nullopt, {});
auto startup_context = Context::createCopy(context);
startup_context->makeQueryContext();
executeQuery(read_buffer, write_buffer, true, startup_context, callback, QueryFlags{ .internal = true }, std::nullopt, {});
}
}
catch (...)

View File

@ -11,20 +11,6 @@
using namespace DB;
namespace
{
bool withFileCache(const ReadSettings & settings)
{
return settings.remote_fs_cache && settings.enable_filesystem_cache;
}
bool withPageCache(const ReadSettings & settings, bool with_file_cache)
{
return settings.page_cache && !with_file_cache && settings.use_page_cache_for_disks_without_file_cache;
}
}
namespace DB
{
namespace ErrorCodes
@ -35,7 +21,7 @@ namespace ErrorCodes
size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_t file_size)
{
/// Only when the cache is used can we download bigger portions of FileSegments than we are actually going to read within a particular task.
if (!withFileCache(settings))
if (!settings.enable_filesystem_cache)
return settings.remote_fs_buffer_size;
/// Buffers used for prefetch and pre-download should be large enough, but not bigger than the whole file.
@ -45,7 +31,6 @@ size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const std::string & cache_path_prefix_,
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
bool use_external_buffer_)
@ -54,12 +39,10 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
, settings(settings_)
, blobs_to_read(blobs_to_read_)
, read_buffer_creator(std::move(read_buffer_creator_))
, cache_path_prefix(cache_path_prefix_)
, cache_log(settings.enable_filesystem_cache_log ? cache_log_ : nullptr)
, query_id(CurrentThread::getQueryId())
, use_external_buffer(use_external_buffer_)
, with_file_cache(withFileCache(settings))
, with_page_cache(withPageCache(settings, with_file_cache))
, with_file_cache(settings.enable_filesystem_cache)
, log(getLogger("ReadBufferFromRemoteFSGather"))
{
if (!blobs_to_read.empty())
@ -74,47 +57,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
}
current_object = object;
const auto & object_path = object.remote_path;
std::unique_ptr<ReadBufferFromFileBase> buf;
if (with_file_cache)
{
if (settings.remote_fs_cache->isInitialized())
{
auto cache_key = settings.remote_fs_cache->createKeyForPath(object_path);
buf = std::make_unique<CachedOnDiskReadBufferFromFile>(
object_path,
cache_key,
settings.remote_fs_cache,
FileCache::getCommonUser(),
[=, this]() { return read_buffer_creator(/* restricted_seek */true, object); },
settings,
query_id,
object.bytes_size,
/* allow_seeks */false,
/* use_external_buffer */true,
/* read_until_position */std::nullopt,
cache_log);
}
else
{
settings.remote_fs_cache->throwInitExceptionIfNeeded();
}
}
/// Can't wrap CachedOnDiskReadBufferFromFile in CachedInMemoryReadBufferFromFile because the
/// former doesn't support seeks.
if (with_page_cache && !buf)
{
auto inner = read_buffer_creator(/* restricted_seek */false, object);
auto cache_key = FileChunkAddress { .path = cache_path_prefix + object_path };
buf = std::make_unique<CachedInMemoryReadBufferFromFile>(
cache_key, settings.page_cache, std::move(inner), settings);
}
if (!buf)
buf = read_buffer_creator(/* restricted_seek */true, object);
auto buf = read_buffer_creator(/* restricted_seek */true, object);
if (read_until_position > start_offset && read_until_position < start_offset + object.bytes_size)
buf->setReadUntilPosition(read_until_position - start_offset);

View File

@ -26,7 +26,6 @@ public:
ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const std::string & cache_path_prefix_,
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
bool use_external_buffer_);
@ -71,12 +70,10 @@ private:
const ReadSettings settings;
const StoredObjects blobs_to_read;
const ReadBufferCreator read_buffer_creator;
const std::string cache_path_prefix;
const std::shared_ptr<FilesystemCacheLog> cache_log;
const String query_id;
const bool use_external_buffer;
const bool with_file_cache;
const bool with_page_cache;
size_t read_until_position = 0;
size_t file_offset_of_buffer_end = 0;

View File

@ -210,63 +210,14 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObject( /// NOLI
auto settings_ptr = settings.get();
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client.get(), object.remote_path, patchSettings(read_settings), settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries);
}
std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
ReadSettings disk_read_settings = patchSettings(read_settings);
auto settings_ptr = settings.get();
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
(bool restricted_seek, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client.get(),
object_.remote_path,
disk_read_settings,
settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries,
/* use_external_buffer */true,
restricted_seek);
};
switch (read_settings.remote_fs_method)
{
case RemoteFSReadMethod::read:
{
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"azure:",
disk_read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */false);
}
case RemoteFSReadMethod::threadpool:
{
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"azure:",
disk_read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */true);
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, disk_read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
}
client.get(),
object.remote_path,
patchSettings(read_settings),
settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries,
read_settings.remote_read_buffer_use_external_buffer,
read_settings.remote_read_buffer_restrict_seek,
/* read_until_position */0);
}
/// Open the file for write and return WriteBufferFromFileBase object.

View File

@ -51,12 +51,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,

View File

@ -48,9 +48,7 @@ CachedObjectStorage::generateObjectKeyPrefixForDirectoryPath(const std::string &
ReadSettings CachedObjectStorage::patchSettings(const ReadSettings & read_settings) const
{
ReadSettings modified_settings{read_settings};
modified_settings.remote_fs_cache = cache;
return object_storage->patchSettings(modified_settings);
return object_storage->patchSettings(read_settings);
}
void CachedObjectStorage::startup()
@ -63,21 +61,45 @@ bool CachedObjectStorage::exists(const StoredObject & object) const
return object_storage->exists(object);
}
std::unique_ptr<ReadBufferFromFileBase> CachedObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
return object_storage->readObjects(objects, patchSettings(read_settings), read_hint, file_size);
}
std::unique_ptr<ReadBufferFromFileBase> CachedObjectStorage::readObject( /// NOLINT
const StoredObject & object,
const ReadSettings & read_settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
if (read_settings.enable_filesystem_cache)
{
if (cache->isInitialized())
{
auto cache_key = cache->createKeyForPath(object.remote_path);
auto global_context = Context::getGlobalContextInstance();
auto modified_read_settings = read_settings.withNestedBuffer();
auto read_buffer_creator = [=, this]()
{
return object_storage->readObject(object, patchSettings(read_settings), read_hint, file_size);
};
return std::make_unique<CachedOnDiskReadBufferFromFile>(
object.remote_path,
cache_key,
cache,
FileCache::getCommonUser(),
read_buffer_creator,
modified_read_settings,
std::string(CurrentThread::getQueryId()),
object.bytes_size,
/* allow_seeks */!read_settings.remote_read_buffer_restrict_seek,
/* use_external_buffer */read_settings.remote_read_buffer_use_external_buffer,
/* read_until_position */std::nullopt,
global_context->getFilesystemCacheLog());
}
else
{
cache->throwInitExceptionIfNeeded();
}
}
return object_storage->readObject(object, patchSettings(read_settings), read_hint, file_size);
}

View File

@ -37,12 +37,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,

View File

@ -11,6 +11,9 @@
#include <Common/CurrentMetrics.h>
#include <Common/Scheduler/IResourceManager.h>
#include <Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h>
#include <IO/CachedInMemoryReadBufferFromFile.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/ObjectStorages/DiskObjectStorageTransaction.h>
#include <Disks/FakeDiskTransaction.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -496,16 +499,60 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
std::optional<size_t> file_size) const
{
const auto storage_objects = metadata_storage->getStorageObjects(path);
auto global_context = Context::getGlobalContextInstance();
const bool file_can_be_empty = !file_size.has_value() || *file_size == 0;
if (storage_objects.empty() && file_can_be_empty)
return std::make_unique<ReadBufferFromEmptyFile>();
return object_storage->readObjects(
auto read_settings = updateIOSchedulingSettings(settings, getReadResourceName(), getWriteResourceName());
/// We wrap read buffer from object storage (read_buf = object_storage->readObject())
/// inside ReadBufferFromRemoteFSGather, so add nested buffer setting.
read_settings = read_settings.withNestedBuffer();
auto read_buffer_creator =
[this, read_settings, read_hint, file_size]
(bool restricted_seek, const StoredObject & object_) mutable -> std::unique_ptr<ReadBufferFromFileBase>
{
read_settings.remote_read_buffer_restrict_seek = restricted_seek;
auto impl = object_storage->readObject(object_, read_settings, read_hint, file_size);
if ((!object_storage->supportsCache() || !read_settings.enable_filesystem_cache)
&& read_settings.page_cache && read_settings.use_page_cache_for_disks_without_file_cache)
{
/// Can't wrap CachedOnDiskReadBufferFromFile in CachedInMemoryReadBufferFromFile because the
/// former doesn't support seeks.
auto cache_path_prefix = fmt::format("{}:", magic_enum::enum_name(object_storage->getType()));
const auto object_namespace = object_storage->getObjectsNamespace();
if (!object_namespace.empty())
cache_path_prefix += object_namespace + "/";
const auto cache_key = FileChunkAddress { .path = cache_path_prefix + object_.remote_path };
impl = std::make_unique<CachedInMemoryReadBufferFromFile>(
cache_key, read_settings.page_cache, std::move(impl), read_settings);
}
return impl;
};
const bool use_async_buffer = read_settings.remote_fs_method == RemoteFSReadMethod::threadpool;
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
storage_objects,
updateIOSchedulingSettings(settings, getReadResourceName(), getWriteResourceName()),
read_hint,
file_size);
read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */use_async_buffer);
if (use_async_buffer)
{
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
return impl;
}
std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(

View File

@ -82,28 +82,12 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObject( /// NOLIN
initializeHDFSFS();
auto path = extractObjectKeyFromURL(object);
return std::make_unique<ReadBufferFromHDFS>(
fs::path(url_without_path) / "", fs::path(data_directory) / path, config, patchSettings(read_settings));
}
std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
initializeHDFSFS();
auto disk_read_settings = patchSettings(read_settings);
auto read_buffer_creator =
[this, disk_read_settings]
(bool /* restricted_seek */, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
auto path = extractObjectKeyFromURL(object_);
return std::make_unique<ReadBufferFromHDFS>(
fs::path(url_without_path) / "", fs::path(data_directory) / path, config, disk_read_settings, /* read_until_position */0, /* use_external_buffer */true);
};
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), objects, "hdfs:", disk_read_settings, nullptr, /* use_external_buffer */false);
fs::path(url_without_path) / "",
fs::path(data_directory) / path,
config,
patchSettings(read_settings),
/* read_until_position */0,
read_settings.remote_read_buffer_use_external_buffer);
}
std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOLINT

View File

@ -69,12 +69,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,

View File

@ -150,13 +150,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const = 0;
/// Read multiple objects with common prefix
virtual std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const = 0;
/// Open the file for write and return WriteBufferFromFileBase object.
virtual std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,

View File

@ -40,47 +40,12 @@ bool LocalObjectStorage::exists(const StoredObject & object) const
return fs::exists(object.remote_path);
}
std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
auto modified_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator = [=](bool /* restricted_seek */, const StoredObject & object) -> std::unique_ptr<ReadBufferFromFileBase>
{ return std::make_unique<ReadBufferFromFile>(object.remote_path); };
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"file:",
modified_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */ false);
}
ReadSettings LocalObjectStorage::patchSettings(const ReadSettings & read_settings) const
{
if (!read_settings.enable_filesystem_cache)
return IObjectStorage::patchSettings(read_settings);
auto modified_settings{read_settings};
/// For now we cannot allow asynchronous reader from local filesystem when CachedObjectStorage is used.
switch (modified_settings.local_fs_method)
{
case LocalFSReadMethod::pread_threadpool:
case LocalFSReadMethod::pread_fake_async:
{
modified_settings.local_fs_method = LocalFSReadMethod::pread;
LOG_INFO(log, "Changing local filesystem read method to `pread`");
break;
}
default:
{
break;
}
}
/// Other options might break assertions in AsynchronousBoundedReadBuffer.
modified_settings.local_fs_method = LocalFSReadMethod::pread;
modified_settings.direct_io_threshold = 0; /// Disable.
return IObjectStorage::patchSettings(modified_settings);
}

View File

@ -34,12 +34,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,

View File

@ -176,65 +176,6 @@ bool S3ObjectStorage::exists(const StoredObject & object) const
return S3::objectExists(*client.get(), uri.bucket, object.remote_path, {});
}
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
ReadSettings disk_read_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto settings_ptr = s3_settings.get();
auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
(bool restricted_seek, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromS3>(
client.get(),
uri.bucket,
object_.remote_path,
uri.version_id,
settings_ptr->request_settings,
disk_read_settings,
/* use_external_buffer */true,
/* offset */0,
/* read_until_position */0,
restricted_seek);
};
switch (read_settings.remote_fs_method)
{
case RemoteFSReadMethod::read:
{
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"s3:" + uri.bucket + "/",
disk_read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */false);
}
case RemoteFSReadMethod::threadpool:
{
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"s3:" + uri.bucket + "/",
disk_read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */true);
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, disk_read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
}
}
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
const StoredObject & object,
const ReadSettings & read_settings,
@ -248,7 +189,12 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
object.remote_path,
uri.version_id,
settings_ptr->request_settings,
patchSettings(read_settings));
patchSettings(read_settings),
read_settings.remote_read_buffer_use_external_buffer,
/* offset */0,
/* read_until_position */0,
read_settings.remote_read_buffer_restrict_seek,
object.bytes_size ? std::optional<size_t>(object.bytes_size) : std::nullopt);
}
std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLINT

View File

@ -89,12 +89,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,

View File

@ -233,69 +233,18 @@ WebObjectStorage::FileDataPtr WebObjectStorage::tryGetFileInfo(const String & pa
}
}
std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
if (objects.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "WebObjectStorage support read only from single object");
return readObject(objects[0], read_settings, read_hint, file_size);
}
std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
const StoredObject & object,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
size_t object_size = object.bytes_size;
auto read_buffer_creator =
[this, read_settings, object_size]
(bool /* restricted_seek */, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromWebServer>(
fs::path(url) / object_.remote_path,
getContext(),
object_size,
read_settings,
/* use_external_buffer */true);
};
auto global_context = Context::getGlobalContextInstance();
switch (read_settings.remote_fs_method)
{
case RemoteFSReadMethod::read:
{
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{object},
"url:" + url + "/",
read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */false);
}
case RemoteFSReadMethod::threadpool:
{
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{object},
"url:" + url + "/",
read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */true);
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
}
return std::make_unique<ReadBufferFromWebServer>(
fs::path(url) / object.remote_path,
getContext(),
object.bytes_size,
read_settings,
read_settings.remote_read_buffer_use_external_buffer);
}
void WebObjectStorage::throwNotAllowed()

View File

@ -39,12 +39,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,

View File

@ -104,7 +104,7 @@ struct ArrayAggregateImpl
static DataTypePtr getReturnType(const DataTypePtr & expression_return, const DataTypePtr & /*array_element*/)
{
if (aggregate_operation == AggregateOperation::max || aggregate_operation == AggregateOperation::min)
if constexpr (aggregate_operation == AggregateOperation::max || aggregate_operation == AggregateOperation::min)
{
return expression_return;
}
@ -152,9 +152,62 @@ struct ArrayAggregateImpl
return result;
}
template <AggregateOperation op = aggregate_operation>
requires(op == AggregateOperation::min || op == AggregateOperation::max)
static void executeMinOrMax(const ColumnPtr & mapped, const ColumnArray::Offsets & offsets, ColumnPtr & res_ptr)
{
const ColumnConst * const_column = checkAndGetColumn<ColumnConst>(&*mapped);
if (const_column)
{
MutableColumnPtr res_column = const_column->getDataColumn().cloneEmpty();
res_column->insertMany(const_column->getField(), offsets.size());
res_ptr = std::move(res_column);
return;
}
MutableColumnPtr res_column = mapped->cloneEmpty();
static constexpr int nan_null_direction_hint = aggregate_operation == AggregateOperation::min ? 1 : -1;
/// TODO: Introduce row_begin and row_end to getPermutation or an equivalent function to use that instead
/// (same use case as SingleValueDataBase::getSmallestIndex)
UInt64 start_of_array = 0;
for (auto end_of_array : offsets)
{
/// Array is empty
if (start_of_array == end_of_array)
{
res_column->insertDefault();
continue;
}
UInt64 index = start_of_array;
for (UInt64 i = index + 1; i < end_of_array; i++)
{
if constexpr (aggregate_operation == AggregateOperation::min)
{
if ((mapped->compareAt(i, index, *mapped, nan_null_direction_hint) < 0))
index = i;
}
else
{
if ((mapped->compareAt(i, index, *mapped, nan_null_direction_hint) > 0))
index = i;
}
}
res_column->insertFrom(*mapped, index);
start_of_array = end_of_array;
}
chassert(res_column->size() == offsets.size());
res_ptr = std::move(res_column);
}
template <typename Element>
static NO_SANITIZE_UNDEFINED bool executeType(const ColumnPtr & mapped, const ColumnArray::Offsets & offsets, ColumnPtr & res_ptr)
{
/// Min and Max are implemented in a different function
static_assert(aggregate_operation != AggregateOperation::min && aggregate_operation != AggregateOperation::max);
using ResultType = ArrayAggregateResult<Element, aggregate_operation>;
using ColVecType = ColumnVectorOrDecimal<Element>;
using ColVecResultType = ColumnVectorOrDecimal<ResultType>;
@ -197,11 +250,6 @@ struct ArrayAggregateImpl
/// Just multiply the value by array size.
res[i] = x * static_cast<ResultType>(array_size);
}
else if constexpr (aggregate_operation == AggregateOperation::min ||
aggregate_operation == AggregateOperation::max)
{
res[i] = x;
}
else if constexpr (aggregate_operation == AggregateOperation::average)
{
if constexpr (is_decimal<Element>)
@ -292,20 +340,6 @@ struct ArrayAggregateImpl
{
aggregate_value += element;
}
else if constexpr (aggregate_operation == AggregateOperation::min)
{
if (element < aggregate_value)
{
aggregate_value = element;
}
}
else if constexpr (aggregate_operation == AggregateOperation::max)
{
if (element > aggregate_value)
{
aggregate_value = element;
}
}
else if constexpr (aggregate_operation == AggregateOperation::product)
{
if constexpr (is_decimal<Element>)
@ -360,74 +394,41 @@ struct ArrayAggregateImpl
static ColumnPtr execute(const ColumnArray & array, ColumnPtr mapped)
{
if constexpr (aggregate_operation == AggregateOperation::max || aggregate_operation == AggregateOperation::min)
{
MutableColumnPtr res;
const auto & column = array.getDataPtr();
const ColumnConst * const_column = checkAndGetColumn<ColumnConst>(&*column);
if (const_column)
{
res = const_column->getDataColumn().cloneEmpty();
}
else
{
res = column->cloneEmpty();
}
const IColumn::Offsets & offsets = array.getOffsets();
size_t pos = 0;
for (const auto & offset : offsets)
{
if (offset == pos)
{
res->insertDefault();
continue;
}
size_t current_max_or_min_index = pos;
++pos;
for (; pos < offset; ++pos)
{
int compare_result = column->compareAt(pos, current_max_or_min_index, *column, 1);
if (aggregate_operation == AggregateOperation::max && compare_result > 0)
{
current_max_or_min_index = pos;
}
else if (aggregate_operation == AggregateOperation::min && compare_result < 0)
{
current_max_or_min_index = pos;
}
}
res->insert((*column)[current_max_or_min_index]);
}
return res;
}
const IColumn::Offsets & offsets = array.getOffsets();
ColumnPtr res;
if (executeType<UInt8>(mapped, offsets, res) ||
executeType<UInt16>(mapped, offsets, res) ||
executeType<UInt32>(mapped, offsets, res) ||
executeType<UInt64>(mapped, offsets, res) ||
executeType<UInt128>(mapped, offsets, res) ||
executeType<UInt256>(mapped, offsets, res) ||
executeType<Int8>(mapped, offsets, res) ||
executeType<Int16>(mapped, offsets, res) ||
executeType<Int32>(mapped, offsets, res) ||
executeType<Int64>(mapped, offsets, res) ||
executeType<Int128>(mapped, offsets, res) ||
executeType<Int256>(mapped, offsets, res) ||
executeType<Float32>(mapped, offsets, res) ||
executeType<Float64>(mapped, offsets, res) ||
executeType<Decimal32>(mapped, offsets, res) ||
executeType<Decimal64>(mapped, offsets, res) ||
executeType<Decimal128>(mapped, offsets, res) ||
executeType<Decimal256>(mapped, offsets, res) ||
executeType<DateTime64>(mapped, offsets, res))
if constexpr (aggregate_operation == AggregateOperation::min || aggregate_operation == AggregateOperation::max)
{
executeMinOrMax(mapped, offsets, res);
return res;
}
else
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Unexpected column for arraySum: {}", mapped->getName());
{
if (executeType<UInt8>(mapped, offsets, res) ||
executeType<UInt16>(mapped, offsets, res) ||
executeType<UInt32>(mapped, offsets, res) ||
executeType<UInt64>(mapped, offsets, res) ||
executeType<UInt128>(mapped, offsets, res) ||
executeType<UInt256>(mapped, offsets, res) ||
executeType<Int8>(mapped, offsets, res) ||
executeType<Int16>(mapped, offsets, res) ||
executeType<Int32>(mapped, offsets, res) ||
executeType<Int64>(mapped, offsets, res) ||
executeType<Int128>(mapped, offsets, res) ||
executeType<Int256>(mapped, offsets, res) ||
executeType<Float32>(mapped, offsets, res) ||
executeType<Float64>(mapped, offsets, res) ||
executeType<Decimal32>(mapped, offsets, res) ||
executeType<Decimal64>(mapped, offsets, res) ||
executeType<Decimal128>(mapped, offsets, res) ||
executeType<Decimal256>(mapped, offsets, res) ||
executeType<DateTime64>(mapped, offsets, res))
{
return res;
}
}
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Unexpected column for arraySum: {}", mapped->getName());
}
};

View File

@ -116,7 +116,8 @@ struct ReadSettings
size_t remote_read_min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE;
FileCachePtr remote_fs_cache;
bool remote_read_buffer_restrict_seek = false;
bool remote_read_buffer_use_external_buffer = false;
/// Bandwidth throttler to use during reading
ThrottlerPtr remote_throttler;
@ -138,6 +139,14 @@ struct ReadSettings
res.prefetch_buffer_size = std::min(std::max(1ul, file_size), prefetch_buffer_size);
return res;
}
ReadSettings withNestedBuffer() const
{
ReadSettings res = *this;
res.remote_read_buffer_restrict_seek = true;
res.remote_read_buffer_use_external_buffer = true;
return res;
}
};
ReadSettings getReadSettings();

View File

@ -131,7 +131,12 @@ bool KeyMetadata::createBaseDirectory(bool throw_if_failed)
{
created_base_directory = false;
if (!throw_if_failed && e.code() == std::errc::no_space_on_device)
if (!throw_if_failed &&
(e.code() == std::errc::no_space_on_device
|| e.code() == std::errc::read_only_file_system
|| e.code() == std::errc::permission_denied
|| e.code() == std::errc::too_many_files_open
|| e.code() == std::errc::operation_not_permitted))
{
LOG_TRACE(cache_metadata->log, "Failed to create base directory for key {}, "
"because of a transient filesystem error: {}", key, e.what());

View File

@ -24,6 +24,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_FORMAT_VERSION;
extern const int NOT_IMPLEMENTED;
};
GinIndexPostingsBuilder::GinIndexPostingsBuilder(UInt64 limit)
@ -153,13 +154,18 @@ GinIndexStore::GinIndexStore(const String & name_, DataPartStoragePtr storage_)
: name(name_)
, storage(storage_)
{
if (storage->getType() != MergeTreeDataPartStorageType::Full)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "INDEX {} with 'full_text' type supports only full storage", name);
}
GinIndexStore::GinIndexStore(const String & name_, DataPartStoragePtr storage_, MutableDataPartStoragePtr data_part_storage_builder_, UInt64 max_digestion_size_)
: name(name_)
, storage(storage_)
, data_part_storage_builder(data_part_storage_builder_)
, max_digestion_size(max_digestion_size_)
{
if (storage->getType() != MergeTreeDataPartStorageType::Full)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "INDEX {} with 'full_text' type supports only full storage", name);
}
bool GinIndexStore::exists() const

View File

@ -2129,6 +2129,8 @@ try
runner([&, my_part = part]()
{
auto blocker_for_runner_thread = CannotAllocateThreadFaultInjector::blockFaultInjections();
auto res = loadDataPartWithRetries(
my_part->info, my_part->name, my_part->disk,
DataPartState::Outdated, data_parts_mutex, loading_parts_initial_backoff_ms,

View File

@ -11,7 +11,11 @@ namespace DB
std::unique_ptr<ReadBuffer> PartMetadataManagerOrdinary::read(const String & file_name) const
{
size_t file_size = part->getDataPartStorage().getFileSize(file_name);
auto res = part->getDataPartStorage().readFile(file_name, getReadSettings().adjustBufferSize(file_size), file_size, std::nullopt);
auto read_settings = getReadSettings().adjustBufferSize(file_size);
/// Default read method is pread_threadpool, but there is not much point in it here.
read_settings.local_fs_method = LocalFSReadMethod::pread;
auto res = part->getDataPartStorage().readFile(file_name, read_settings, file_size, std::nullopt);
if (isCompressedFromFileName(file_name))
return std::make_unique<CompressedReadBufferFromFile>(std::move(res));

View File

@ -77,7 +77,14 @@ bool isRetryableException(std::exception_ptr exception_ptr)
#endif
catch (const ErrnoException & e)
{
return e.getErrno() == EMFILE;
return e.getErrno() == EMFILE
|| e.getErrno() == ENOMEM
|| isNotEnoughMemoryErrorCode(e.code())
|| e.code() == ErrorCodes::NETWORK_ERROR
|| e.code() == ErrorCodes::SOCKET_TIMEOUT
|| e.code() == ErrorCodes::CANNOT_SCHEDULE_TASK
|| e.code() == ErrorCodes::ABORTED;
}
catch (const Coordination::Exception & e)
{
@ -91,6 +98,22 @@ bool isRetryableException(std::exception_ptr exception_ptr)
|| e.code() == ErrorCodes::CANNOT_SCHEDULE_TASK
|| e.code() == ErrorCodes::ABORTED;
}
catch (const std::filesystem::filesystem_error & e)
{
return e.code() == std::errc::no_space_on_device ||
e.code() == std::errc::read_only_file_system ||
e.code() == std::errc::too_many_files_open_in_system ||
e.code() == std::errc::operation_not_permitted ||
e.code() == std::errc::device_or_resource_busy ||
e.code() == std::errc::permission_denied ||
e.code() == std::errc::too_many_files_open ||
e.code() == std::errc::text_file_busy ||
e.code() == std::errc::timed_out ||
e.code() == std::errc::not_enough_memory ||
e.code() == std::errc::not_supported ||
e.code() == std::errc::too_many_links ||
e.code() == std::errc::too_many_symbolic_link_levels;
}
catch (const Poco::Net::NetException &)
{
return true;
@ -171,13 +194,9 @@ static IMergeTreeDataPart::Checksums checkDataPart(
SerializationInfo::Settings settings{ratio_of_defaults, false};
serialization_infos = SerializationInfoByName::readJSON(columns_txt, settings, *serialization_file);
}
catch (const Poco::Exception & ex)
{
throw Exception(ErrorCodes::CORRUPTED_DATA, "Failed to load {}, with error {}", IMergeTreeDataPart::SERIALIZATION_FILE_NAME, ex.message());
}
catch (...)
{
throw;
throw Exception(ErrorCodes::CORRUPTED_DATA, "Failed to load file {} of data part {}, with error {}", IMergeTreeDataPart::SERIALIZATION_FILE_NAME, data_part->name, getCurrentExceptionMessage(true));
}
}
@ -399,18 +418,45 @@ IMergeTreeDataPart::Checksums checkDataPart(
ReadSettings read_settings;
read_settings.enable_filesystem_cache = false;
read_settings.enable_filesystem_cache_log = false;
read_settings.enable_filesystem_read_prefetches_log = false;
read_settings.page_cache = nullptr;
read_settings.load_marks_asynchronously = false;
read_settings.remote_fs_prefetch = false;
read_settings.page_cache_inject_eviction = false;
read_settings.use_page_cache_for_disks_without_file_cache = false;
read_settings.local_fs_method = LocalFSReadMethod::pread;
try
{
return checkDataPart(
data_part,
data_part_storage,
data_part->getColumns(),
data_part->getType(),
data_part->getFileNamesWithoutChecksums(),
read_settings,
require_checksums,
is_cancelled,
is_broken_projection,
throw_on_broken_projection);
}
catch (...)
{
if (isRetryableException(std::current_exception()))
{
LOG_DEBUG(
getLogger("checkDataPart"),
"Got retriable error {} checking data part {}, will return empty", data_part->name, getCurrentExceptionMessage(false));
/// We were unable to check data part because of some temporary exception
/// like Memory limit exceeded. If part is actually broken we will retry check
/// with the next read attempt of this data part.
return IMergeTreeDataPart::Checksums{};
}
throw;
}
return checkDataPart(
data_part,
data_part_storage,
data_part->getColumns(),
data_part->getType(),
data_part->getFileNamesWithoutChecksums(),
read_settings,
require_checksums,
is_cancelled,
is_broken_projection,
throw_on_broken_projection);
};
try
@ -431,7 +477,16 @@ IMergeTreeDataPart::Checksums checkDataPart(
catch (...)
{
if (isRetryableException(std::current_exception()))
throw;
{
LOG_DEBUG(
getLogger("checkDataPart"),
"Got retriable error {} checking data part {}, will return empty", data_part->name, getCurrentExceptionMessage(false));
/// We were unable to check data part because of some temporary exception
/// like Memory limit exceeded. If part is actually broken we will retry check
/// with the next read attempt of this data part.
return {};
}
return drop_cache_and_check();
}
}

View File

@ -9,6 +9,7 @@
#include <IO/ReadBufferFromFileBase.h>
#include <IO/Archives/createArchiveReader.h>
#include <Formats/FormatFactory.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Formats/ReadSchemaUtils.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/Cache/SchemaCache.h>
@ -426,37 +427,39 @@ std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(
const auto & object_size = object_info.metadata->size_bytes;
auto read_settings = context_->getReadSettings().adjustBufferSize(object_size);
read_settings.enable_filesystem_cache = false;
/// FIXME: Changing this setting to default value breaks something around parquet reading
read_settings.remote_read_min_bytes_for_seek = read_settings.remote_fs_buffer_size;
/// User's object may change, don't cache it.
read_settings.enable_filesystem_cache = false;
read_settings.use_page_cache_for_disks_without_file_cache = false;
const bool object_too_small = object_size <= 2 * context_->getSettingsRef()[Setting::max_download_buffer_size];
const bool use_prefetch = object_too_small && read_settings.remote_fs_method == RemoteFSReadMethod::threadpool;
read_settings.remote_fs_method = use_prefetch ? RemoteFSReadMethod::threadpool : RemoteFSReadMethod::read;
/// User's object may change, don't cache it.
read_settings.use_page_cache_for_disks_without_file_cache = false;
const bool use_prefetch = object_too_small
&& read_settings.remote_fs_method == RemoteFSReadMethod::threadpool
&& read_settings.remote_fs_prefetch;
if (use_prefetch)
read_settings.remote_read_buffer_use_external_buffer = true;
auto impl = object_storage->readObject(StoredObject(object_info.getPath(), "", object_size), read_settings);
// Create a read buffer that will prefetch the first ~1 MB of the file.
// When reading lots of tiny files, this prefetching almost doubles the throughput.
// For bigger files, parallel reading is more useful.
if (use_prefetch)
{
LOG_TRACE(log, "Downloading object of size {} with initial prefetch", object_size);
if (!use_prefetch)
return impl;
auto async_reader = object_storage->readObjects(
StoredObjects{StoredObject{object_info.getPath(), /* local_path */ "", object_size}}, read_settings);
LOG_TRACE(log, "Downloading object of size {} with initial prefetch", object_size);
async_reader->setReadUntilEnd();
if (read_settings.remote_fs_prefetch)
async_reader->prefetch(DEFAULT_PREFETCH_PRIORITY);
auto & reader = context_->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
impl = std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, read_settings,
context_->getAsyncReadCounters(),
context_->getFilesystemReadPrefetchesLog());
return async_reader;
}
else
{
/// FIXME: this is inconsistent that readObject always reads synchronously ignoring read_method setting.
return object_storage->readObject(StoredObject(object_info.getPath(), "", object_size), read_settings);
}
impl->setReadUntilEnd();
impl->prefetch(DEFAULT_PREFETCH_PRIORITY);
return impl;
}
StorageObjectStorageSource::IIterator::IIterator(const std::string & logger_name_)

View File

@ -100,17 +100,20 @@ void checkAllowedQueries(const ASTSelectQuery & query)
/// check if only one single select query in SelectWithUnionQuery
static bool isSingleSelect(const ASTPtr & select, ASTPtr & res)
{
auto new_select = select->as<ASTSelectWithUnionQuery &>();
if (new_select.list_of_selects->children.size() != 1)
auto * new_select = select->as<ASTSelectWithUnionQuery>();
if (new_select == nullptr)
return false;
auto & new_inner_query = new_select.list_of_selects->children.at(0);
if (new_select->list_of_selects->children.size() != 1)
return false;
auto & new_inner_query = new_select->list_of_selects->children.at(0);
if (new_inner_query->as<ASTSelectQuery>())
{
res = new_inner_query;
return true;
}
else
return isSingleSelect(new_inner_query, res);
return isSingleSelect(new_inner_query, res);
}
SelectQueryDescription SelectQueryDescription::getSelectQueryFromASTForMatView(const ASTPtr & select, bool refreshable, ContextPtr context)

View File

@ -1133,12 +1133,15 @@ def main() -> int:
if IS_CI and not pr_info.is_merge_queue:
if pr_info.is_release and pr_info.is_push_event:
if pr_info.is_master and pr_info.is_push_event:
print("Release/master: CI Cache add pending records for all todo jobs")
ci_cache.push_pending_all(pr_info.is_release)
# wait for pending jobs to be finished, await_jobs is a long blocking call
ci_cache.await_pending_jobs(pr_info.is_release)
if pr_info.is_master or pr_info.is_pr:
# - wait for pending jobs to be finished, await_jobs is a long blocking call
# - don't wait for release CI because some jobs may not be present there
# and we may wait until timeout in vain
ci_cache.await_pending_jobs(pr_info.is_release)
# conclude results
result["git_ref"] = git_ref

View File

@ -344,7 +344,7 @@ def test_cmd_srvr(started_cluster):
assert result["Received"] == "10"
assert result["Sent"] == "10"
assert int(result["Connections"]) == 1
assert int(result["Zxid"], 16) > 10
assert int(result["Zxid"], 16) >= 10
assert result["Mode"] == "leader"
assert result["Node count"] == "14"

View File

@ -13,5 +13,13 @@
<scripts>
<query>SELECT * FROM system.query_log LIMIT 1</query>
</scripts>
<scripts>
<query>SELECT 1 SETTINGS skip_unavailable_shards = 1</query>
<condition>SELECT 1;</condition>
</scripts>
<scripts>
<query>SELECT 1 SETTINGS skip_unavailable_shards = 1</query>
<condition>SELECT 1;</condition>
</scripts>
</startup_scripts>
</clickhouse>

View File

@ -16,6 +16,12 @@ def test_startup_scripts():
try:
cluster.start()
assert node.query("SHOW TABLES") == "TestTable\n"
assert (
node.query(
"SELECT value, changed FROM system.settings WHERE name = 'skip_unavailable_shards'"
)
== "0\t0\n"
)
finally:
cluster.shutdown()

View File

@ -1087,7 +1087,7 @@ def test_drop_table(started_cluster):
started_cluster, files_path, files_to_generate, start_ind=0, row_num=100000
)
create_mv(node, table_name, dst_table_name)
node.wait_for_log_line(f"Reading from file: test_drop_data")
node.wait_for_log_line(f"rows from file: test_drop_data")
node.query(f"DROP TABLE {table_name} SYNC")
assert node.contains_in_log(
f"StorageS3Queue (default.{table_name}): Table is being dropped"

View File

@ -11,7 +11,9 @@ for i in {1..250}; do
table_structure+=", c$i String"
done
$CLICKHOUSE_CLIENT --query "
MY_CLICKHOUSE_CLIENT="$CLICKHOUSE_CLIENT --enable_parsing_to_custom_serialization 1"
$MY_CLICKHOUSE_CLIENT --query "
DROP TABLE IF EXISTS t_insert_mem;
DROP TABLE IF EXISTS t_reference;
@ -23,7 +25,7 @@ $CLICKHOUSE_CLIENT --query "
filename="test_data_sparse_$CLICKHOUSE_DATABASE.json"
$CLICKHOUSE_CLIENT --query "
$MY_CLICKHOUSE_CLIENT --query "
INSERT INTO FUNCTION file('$filename', LineAsString)
SELECT format('{{ \"id\": {}, \"c{}\": \"{}\" }}', number, number % 250, hex(number * 1000000)) FROM numbers(30000)
SETTINGS engine_file_truncate_on_insert = 1;
@ -34,15 +36,19 @@ $CLICKHOUSE_CLIENT --query "
"
for _ in {1..4}; do
$CLICKHOUSE_CLIENT --query "INSERT INTO t_reference SELECT * FROM file('$filename', JSONEachRow)"
$MY_CLICKHOUSE_CLIENT --query "INSERT INTO t_reference SELECT * FROM file('$filename', JSONEachRow)"
done;
$CLICKHOUSE_CLIENT --enable_parsing_to_custom_serialization 1 --query "INSERT INTO t_insert_mem SELECT * FROM file('$filename', JSONEachRow)"
$CLICKHOUSE_CLIENT --enable_parsing_to_custom_serialization 1 --query "INSERT INTO t_insert_mem SELECT * FROM file('$filename', JSONEachRow)"
$CLICKHOUSE_CLIENT --enable_parsing_to_custom_serialization 1 --query "INSERT INTO t_insert_mem SELECT * FROM s3(s3_conn, filename='$filename', format='JSONEachRow')"
$CLICKHOUSE_CLIENT --query "SELECT * FROM file('$filename', LineAsString) FORMAT LineAsString" | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=INSERT+INTO+t_insert_mem+FORMAT+JSONEachRow&enable_parsing_to_custom_serialization=1" --data-binary @-
$MY_CLICKHOUSE_CLIENT --query "INSERT INTO t_insert_mem SELECT * FROM file('$filename', JSONEachRow)"
$MY_CLICKHOUSE_CLIENT --query "INSERT INTO t_insert_mem SELECT * FROM file('$filename', JSONEachRow)"
$CLICKHOUSE_CLIENT --query "
$MY_CLICKHOUSE_CLIENT --query "DETACH TABLE t_insert_mem"
$MY_CLICKHOUSE_CLIENT --query "ATTACH TABLE t_insert_mem"
$MY_CLICKHOUSE_CLIENT --query "INSERT INTO t_insert_mem SELECT * FROM s3(s3_conn, filename='$filename', format='JSONEachRow')"
$MY_CLICKHOUSE_CLIENT --query "SELECT * FROM file('$filename', LineAsString) FORMAT LineAsString" | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=INSERT+INTO+t_insert_mem+FORMAT+JSONEachRow&enable_parsing_to_custom_serialization=1" --data-binary @-
$MY_CLICKHOUSE_CLIENT --query "
SELECT count() FROM t_insert_mem;
SELECT sum(sipHash64(*)) FROM t_insert_mem;
SELECT sum(sipHash64(*)) FROM t_reference;
@ -53,7 +59,7 @@ $CLICKHOUSE_CLIENT --query "
SYSTEM FLUSH LOGS;
SELECT written_bytes <= 3000000 FROM system.query_log
SELECT written_bytes <= 10000000 FROM system.query_log
WHERE query LIKE 'INSERT INTO t_insert_mem%' AND current_database = '$CLICKHOUSE_DATABASE' AND type = 'QueryFinish'
ORDER BY event_time_microseconds;

View File

@ -0,0 +1,37 @@
-- { echoOn }
-- https://github.com/ClickHouse/ClickHouse/issues/68895
SELECT arrayMax(x -> toFixedString('.', 1), []);
.
-- https://github.com/ClickHouse/ClickHouse/issues/69600
SELECT arrayMax(x -> (-x), [1, 2, 4]) AS res;
-1
SELECT arrayMax(x -> toUInt16(-x), [1, 2, 4]) AS res;
65535
-- https://github.com/ClickHouse/ClickHouse/pull/69640
SELECT arrayMin(x1 -> (x1 * toNullable(-1)), materialize([1, 2, 3]));
-3
SELECT arrayMin(x1 -> x1 * -1, [1,2,3]);
-3
DROP TABLE IF EXISTS test_aggregation_array;
CREATE TABLE test_aggregation_array (x Array(Int)) ENGINE=MergeTree() ORDER by tuple();
INSERT INTO test_aggregation_array VALUES ([1,2,3,4,5,6]), ([]), ([1,2,3]);
SELECT [arrayMin(x1 -> (x1 * materialize(-1)), [toNullable(toUInt256(0)), materialize(4)])], arrayMin([arrayMin([0])]) FROM test_aggregation_array GROUP BY arrayAvg([1]), [0, toUInt256(8)] WITH CUBE SETTINGS allow_experimental_analyzer = 1;
[-4] 0
[-4] 0
[-4] 0
[-4] 0
SELECT [arrayMin([3, arrayMin([toUInt128(8)]), 4, 5]), arrayMax([materialize(1)]), arrayMin([arrayMax([1]), 2]), 2], arrayMin([0, toLowCardinality(8)]), 2, arrayMax(x1 -> (x1 * -1), x) FROM test_aggregation_array;
[3,1,1,2] 0 2 -1
[3,1,1,2] 0 2 0
[3,1,1,2] 0 2 -1
select arrayMax(x -> x.1, [(1, 'a'), (0, 'b')]);
1
select arrayMin(x -> x.2, [(1, 'a'), (0, 'b')]);
a
-- Extra validation of generic arrayMin/arrayMax
WITH [(1,2),(1,3)] AS t SELECT arrayMin(t), arrayMax(t);
(1,2) (1,3)
WITH [map('a', 1, 'b', 2), map('a',1,'b',3)] AS t SELECT arrayMin(t), arrayMax(t);
{'a':1,'b':2} {'a':1,'b':3}
WITH [map('a', 1, 'b', 2, 'c', 10), map('a',1,'b',3, 'c', 0)] AS t SELECT arrayMin(x -> x['c'], t), arrayMax(x -> x['c'], t);
0 10

View File

@ -0,0 +1,26 @@
-- { echoOn }
-- https://github.com/ClickHouse/ClickHouse/issues/68895
SELECT arrayMax(x -> toFixedString('.', 1), []);
-- https://github.com/ClickHouse/ClickHouse/issues/69600
SELECT arrayMax(x -> (-x), [1, 2, 4]) AS res;
SELECT arrayMax(x -> toUInt16(-x), [1, 2, 4]) AS res;
-- https://github.com/ClickHouse/ClickHouse/pull/69640
SELECT arrayMin(x1 -> (x1 * toNullable(-1)), materialize([1, 2, 3]));
SELECT arrayMin(x1 -> x1 * -1, [1,2,3]);
DROP TABLE IF EXISTS test_aggregation_array;
CREATE TABLE test_aggregation_array (x Array(Int)) ENGINE=MergeTree() ORDER by tuple();
INSERT INTO test_aggregation_array VALUES ([1,2,3,4,5,6]), ([]), ([1,2,3]);
SELECT [arrayMin(x1 -> (x1 * materialize(-1)), [toNullable(toUInt256(0)), materialize(4)])], arrayMin([arrayMin([0])]) FROM test_aggregation_array GROUP BY arrayAvg([1]), [0, toUInt256(8)] WITH CUBE SETTINGS allow_experimental_analyzer = 1;
SELECT [arrayMin([3, arrayMin([toUInt128(8)]), 4, 5]), arrayMax([materialize(1)]), arrayMin([arrayMax([1]), 2]), 2], arrayMin([0, toLowCardinality(8)]), 2, arrayMax(x1 -> (x1 * -1), x) FROM test_aggregation_array;
select arrayMax(x -> x.1, [(1, 'a'), (0, 'b')]);
select arrayMin(x -> x.2, [(1, 'a'), (0, 'b')]);
-- Extra validation of generic arrayMin/arrayMax
WITH [(1,2),(1,3)] AS t SELECT arrayMin(t), arrayMax(t);
WITH [map('a', 1, 'b', 2), map('a',1,'b',3)] AS t SELECT arrayMin(t), arrayMax(t);
WITH [map('a', 1, 'b', 2, 'c', 10), map('a',1,'b',3, 'c', 0)] AS t SELECT arrayMin(x -> x['c'], t), arrayMax(x -> x['c'], t);

View File

@ -0,0 +1 @@
CREATE MATERIALIZED VIEW v0 AS (SELECT 1) INTERSECT (SELECT 1); --{serverError QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW}

View File

@ -1,3 +1,4 @@
v24.9.2.42-stable 2024-10-03
v24.9.1.3278-stable 2024-09-26
v24.8.4.13-lts 2024-09-06
v24.8.3.59-lts 2024-09-03
