Merge branch 'master' into add-compatibility-for-merge-tree-settings

This commit is contained in:
Nikolai Kochetov 2024-10-04 10:44:30 +00:00
commit 28bf90d6c1
98 changed files with 1116 additions and 2050 deletions

View File

@ -1 +1,4 @@
# See contrib/usearch-cmake/CMakeLists.txt
set (FP16_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/FP16/")
add_library(_fp16 INTERFACE)
target_include_directories(_fp16 SYSTEM INTERFACE ${FP16_PROJECT_DIR}/include)

2
contrib/SimSIMD vendored

@ -1 +1 @@
Subproject commit 91a76d1ac519b3b9dc8957734a3dabd985f00c26
Subproject commit ff51434d90c66f916e94ff05b24530b127aa4cff

View File

@ -1 +1,4 @@
# See contrib/usearch-cmake/CMakeLists.txt
set(SIMSIMD_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/SimSIMD")
add_library(_simsimd INTERFACE)
target_include_directories(_simsimd SYSTEM INTERFACE "${SIMSIMD_PROJECT_DIR}/include")

2
contrib/usearch vendored

@ -1 +1 @@
Subproject commit 7a8967cb442b08ca20c3dd781414378e65957d37
Subproject commit d1d33eac94acd3b628e0b446c927ec3295ef63c7

View File

@ -1,14 +1,9 @@
set(FP16_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/FP16")
set(SIMSIMD_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/SimSIMD")
set(USEARCH_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/usearch")
add_library(_usearch INTERFACE)
target_include_directories(_usearch SYSTEM INTERFACE ${USEARCH_PROJECT_DIR}/include)
target_include_directories(_usearch SYSTEM INTERFACE
${FP16_PROJECT_DIR}/include
${SIMSIMD_PROJECT_DIR}/include
${USEARCH_PROJECT_DIR}/include)
target_link_libraries(_usearch INTERFACE _fp16)
target_compile_definitions(_usearch INTERFACE USEARCH_USE_FP16LIB)
# target_compile_definitions(_usearch INTERFACE USEARCH_USE_SIMSIMD)

View File

@ -34,7 +34,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="24.9.1.3278"
ARG VERSION="24.9.2.42"
ARG PACKAGES="clickhouse-keeper"
ARG DIRECT_DOWNLOAD_URLS=""

View File

@ -35,7 +35,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="24.9.1.3278"
ARG VERSION="24.9.2.42"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
ARG DIRECT_DOWNLOAD_URLS=""

View File

@ -28,7 +28,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="24.9.1.3278"
ARG VERSION="24.9.2.42"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
#docker-official-library:off

View File

@ -0,0 +1 @@
[rabbitmq_consistent_hash_exchange].

View File

@ -13,3 +13,5 @@ ssl_options.fail_if_no_peer_cert = false
ssl_options.cacertfile = /etc/rabbitmq/ca-cert.pem
ssl_options.certfile = /etc/rabbitmq/server-cert.pem
ssl_options.keyfile = /etc/rabbitmq/server-key.pem
vm_memory_high_watermark.absolute = 2GB

View File

@ -0,0 +1,33 @@
---
sidebar_position: 1
sidebar_label: 2024
---
# 2024 Changelog
### ClickHouse release v24.9.2.42-stable (de7c791a2ea) FIXME as compared to v24.9.1.3278-stable (6d058d82a8e)
#### Improvement
* Backported in [#70091](https://github.com/ClickHouse/ClickHouse/issues/70091): Add `show_create_query_identifier_quoting_rule` to define identifier quoting behavior in the result of `SHOW CREATE` queries. Possible values: - `user_display`: When the identifier is a keyword. - `when_necessary`: When the identifier is one of `{"distinct", "all", "table"}`, or it can cause ambiguity: column names, dictionary attribute names. - `always`: Always quote identifiers. [#69448](https://github.com/ClickHouse/ClickHouse/pull/69448) ([tuanpach](https://github.com/tuanpach)). A usage sketch follows this list.
* Backported in [#70100](https://github.com/ClickHouse/ClickHouse/issues/70100): Follow-up to https://github.com/ClickHouse/ClickHouse/pull/69346. Point 4 described there now works as well. [#69563](https://github.com/ClickHouse/ClickHouse/pull/69563) ([Vitaly Baranov](https://github.com/vitlibar)).
* Backported in [#70048](https://github.com/ClickHouse/ClickHouse/issues/70048): Add a new column `readonly_duration` to the `system.replicas` table, needed to distinguish actual read-only replicas from sentinel ones in alerts. [#69871](https://github.com/ClickHouse/ClickHouse/pull/69871) ([Miсhael Stetsyuk](https://github.com/mstetsyuk)).
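A minimal usage sketch for the quoting-rule setting in the first entry above; the table and the `table` column name are made up for illustration, and the expected quoting is an assumption based on the description:
```sql
-- Hypothetical table whose column name collides with a keyword.
CREATE TABLE t (`table` String, id UInt64) ENGINE = MergeTree ORDER BY id;

-- 'always' is expected to quote every identifier in the output.
SET show_create_query_identifier_quoting_rule = 'always';
SHOW CREATE TABLE t;

-- 'when_necessary' is expected to quote only identifiers such as `table`
-- that are keywords or could otherwise be ambiguous.
SET show_create_query_identifier_quoting_rule = 'when_necessary';
SHOW CREATE TABLE t;
```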
#### Bug Fix (user-visible misbehavior in an official stable release)
* Backported in [#70193](https://github.com/ClickHouse/ClickHouse/issues/70193): Fix crash when executing `create view t as (with recursive 42 as ttt select ttt);`. [#69676](https://github.com/ClickHouse/ClickHouse/pull/69676) ([Han Fei](https://github.com/hanfei1991)).
* Backported in [#70083](https://github.com/ClickHouse/ClickHouse/issues/70083): Closes [#69752](https://github.com/ClickHouse/ClickHouse/issues/69752). [#69985](https://github.com/ClickHouse/ClickHouse/pull/69985) ([pufit](https://github.com/pufit)).
* Backported in [#70070](https://github.com/ClickHouse/ClickHouse/issues/70070): Fixes `Block structure mismatch` for queries with nested views and `WHERE` condition. Fixes [#66209](https://github.com/ClickHouse/ClickHouse/issues/66209). [#70054](https://github.com/ClickHouse/ClickHouse/pull/70054) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Backported in [#70168](https://github.com/ClickHouse/ClickHouse/issues/70168): Fix wrong LOGICAL_ERROR when replacing literals in ranges. [#70122](https://github.com/ClickHouse/ClickHouse/pull/70122) ([Pablo Marcos](https://github.com/pamarcos)).
* Backported in [#70238](https://github.com/ClickHouse/ClickHouse/issues/70238): Check for Nullable(Nothing) type during ALTER TABLE MODIFY COLUMN/QUERY to prevent tables with such data type. [#70123](https://github.com/ClickHouse/ClickHouse/pull/70123) ([Pavel Kruglov](https://github.com/Avogar)).
* Backported in [#70205](https://github.com/ClickHouse/ClickHouse/issues/70205): Fix wrong result with skipping index. [#70127](https://github.com/ClickHouse/ClickHouse/pull/70127) ([Raúl Marín](https://github.com/Algunenano)).
* Backported in [#70185](https://github.com/ClickHouse/ClickHouse/issues/70185): Fix data race in ColumnObject/ColumnTuple decompress method that could lead to heap use after free. [#70137](https://github.com/ClickHouse/ClickHouse/pull/70137) ([Pavel Kruglov](https://github.com/Avogar)).
* Backported in [#70253](https://github.com/ClickHouse/ClickHouse/issues/70253): Fix possible hang in ALTER COLUMN with Dynamic type. [#70144](https://github.com/ClickHouse/ClickHouse/pull/70144) ([Pavel Kruglov](https://github.com/Avogar)).
* Backported in [#70230](https://github.com/ClickHouse/ClickHouse/issues/70230): Use correct `max_types` parameter during Dynamic type creation for JSON subcolumn. [#70147](https://github.com/ClickHouse/ClickHouse/pull/70147) ([Pavel Kruglov](https://github.com/Avogar)).
* Backported in [#70217](https://github.com/ClickHouse/ClickHouse/issues/70217): Fix the password being displayed in `system.query_log` for users with bcrypt password authentication method. [#70148](https://github.com/ClickHouse/ClickHouse/pull/70148) ([Nikolay Degterinsky](https://github.com/evillique)).
* Backported in [#70267](https://github.com/ClickHouse/ClickHouse/issues/70267): Respect setting allow_simdjson in JSON type parser. [#70218](https://github.com/ClickHouse/ClickHouse/pull/70218) ([Pavel Kruglov](https://github.com/Avogar)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Backported in [#70052](https://github.com/ClickHouse/ClickHouse/issues/70052): Improve stateless test runner. [#69864](https://github.com/ClickHouse/ClickHouse/pull/69864) ([Alexey Katsman](https://github.com/alexkats)).
* Backported in [#70284](https://github.com/ClickHouse/ClickHouse/issues/70284): Improve pipdeptree generator for docker images. - Update requirements.txt for the integration tests runner container - Remove some small dependencies, improve `helpers/retry_decorator.py` - Upgrade docker-compose from EOL version 1 to version 2. [#70146](https://github.com/ClickHouse/ClickHouse/pull/70146) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Backported in [#70261](https://github.com/ClickHouse/ClickHouse/issues/70261): Update test_storage_s3_queue/test.py. [#70159](https://github.com/ClickHouse/ClickHouse/pull/70159) ([Kseniia Sumarokova](https://github.com/kssenii)).

View File

@ -1057,12 +1057,12 @@ Default value: throw
## deduplicate_merge_projection_mode
Whether to allow create projection for the table with non-classic MergeTree, that is not (Replicated, Shared) MergeTree. If allowed, what is the action when merge projections, either drop or rebuild. So classic MergeTree would ignore this setting.
Whether to allow creating a projection for a table with a non-classic MergeTree, that is, not a (Replicated, Shared) MergeTree. The `ignore` option exists purely for compatibility and may produce incorrect results. Otherwise, if allowed, it defines the action taken when merging projections: either drop or rebuild. Classic MergeTree ignores this setting.
It also controls `OPTIMIZE DEDUPLICATE`, and it affects all MergeTree family members. Similar to the option `lightweight_mutation_projection_mode`, it is applied at the part level.
Possible values:
- throw, drop, rebuild
- ignore, throw, drop, rebuild
Default value: throw
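A hedged sketch of how this setting might be applied at table creation; the table and projection names are illustrative only:
```sql
-- Non-classic MergeTree engine (ReplacingMergeTree) with a projection.
-- 'ignore' keeps the pre-existing behavior for compatibility, at the risk
-- of incorrect projection results after merges, as described above.
CREATE TABLE events
(
    id UInt64,
    value String,
    PROJECTION by_value (SELECT value, count() GROUP BY value)
)
ENGINE = ReplacingMergeTree
ORDER BY id
SETTINGS deduplicate_merge_projection_mode = 'ignore';
```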

View File

@ -316,6 +316,38 @@ Result:
Same as `toIPv4`, but if the IPv4 address has an invalid format, it returns null.
**Syntax**
```sql
toIPv4OrNull(value)
```
**Arguments**
- `value` — A string containing an IPv4 address.
**Returned value**
- `value` converted to an IPv4 address, or `NULL` if the input is not a valid IPv4 address. [String](../data-types/string.md).
**Example**
Query:
```sql
SELECT
toIPv4OrNull('192.168.0.1') AS s1,
toIPv4OrNull('192.168.0') AS s2
```
Result:
```response
┌─s1──────────┬─s2───┐
│ 192.168.0.1 │ ᴺᵁᴸᴸ │
└─────────────┴──────┘
```
## toIPv6OrDefault(string)
Same as `toIPv6`, but if the IPv6 address has an invalid format, it returns `::` (0 IPv6).
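No example is shown here for `toIPv6OrDefault`, so the following is a hedged sketch modeled on the `toIPv4OrNull` example above; the exact result formatting may differ:
```sql
SELECT
    toIPv6OrDefault('2001:db8::1') AS s1,
    toIPv6OrDefault('not an ip') AS s2
```
The invalid input in `s2` is expected to return `::`, the zero IPv6 address.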

View File

@ -207,7 +207,31 @@ If `NULL` is passed, then the function returns type `Nullable(Nothing)`, which c
**Syntax**
```sql
toTypeName(x)
toTypeName(value)
```
**Arguments**
- `value` — A value of arbitrary type.
**Returned value**
- The name of the data type of `value`. [String](../data-types/string.md).
**Example**
Query:
```sql
SELECT toTypeName(123);
```
Result:
```response
┌─toTypeName(123)─┐
│ UInt8 │
└─────────────────┘
```
## blockSize {#blockSize}

View File

@ -272,8 +272,7 @@ ALTER TABLE table_name MODIFY COLUMN column_name RESET SETTING max_compress_bloc
## MATERIALIZE COLUMN
Materializes a column with a `DEFAULT` or `MATERIALIZED` value expression.
This statement can be used to rewrite existing column data after a `DEFAULT` or `MATERIALIZED` expression has been added or updated (which only updates the metadata but does not change existing data).
Materializes a column with a `DEFAULT` or `MATERIALIZED` value expression. When adding a materialized column using `ALTER TABLE table_name ADD COLUMN column_name MATERIALIZED`, existing rows without materialized values are not automatically filled. The `MATERIALIZE COLUMN` statement can be used to rewrite existing column data after a `DEFAULT` or `MATERIALIZED` expression has been added or updated (which only updates the metadata but does not change existing data).
Implemented as a [mutation](/docs/en/sql-reference/statements/alter/index.md#mutations).
For columns with a new or updated `MATERIALIZED` value expression, all existing rows are rewritten.
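A short, hedged illustration of the workflow described above; the table, column, and expression are made up:
```sql
-- Adding the column only updates metadata; existing rows stay unfilled.
ALTER TABLE hits ADD COLUMN domain String MATERIALIZED domainWithoutWWW(URL);

-- Rewrite existing rows so the new column is physically materialized.
ALTER TABLE hits MATERIALIZE COLUMN domain;
```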

View File

@ -135,15 +135,15 @@ To change SQL security for an existing view, use
ALTER TABLE MODIFY SQL SECURITY { DEFINER | INVOKER | NONE } [DEFINER = { user | CURRENT_USER }]
```
### Examples sql security
### Examples
```sql
CREATE test_view
CREATE VIEW test_view
DEFINER = alice SQL SECURITY DEFINER
AS SELECT ...
```
```sql
CREATE test_view
CREATE VIEW test_view
SQL SECURITY INVOKER
AS SELECT ...
```

View File

@ -628,7 +628,9 @@ void loadStartupScripts(const Poco::Util::AbstractConfiguration & config, Contex
auto condition_write_buffer = WriteBufferFromOwnString();
LOG_DEBUG(log, "Checking startup query condition `{}`", condition);
executeQuery(condition_read_buffer, condition_write_buffer, true, context, callback, QueryFlags{ .internal = true }, std::nullopt, {});
auto startup_context = Context::createCopy(context);
startup_context->makeQueryContext();
executeQuery(condition_read_buffer, condition_write_buffer, true, startup_context, callback, QueryFlags{ .internal = true }, std::nullopt, {});
auto result = condition_write_buffer.str();
@ -648,7 +650,9 @@ void loadStartupScripts(const Poco::Util::AbstractConfiguration & config, Contex
auto write_buffer = WriteBufferFromOwnString();
LOG_DEBUG(log, "Executing query `{}`", query);
executeQuery(read_buffer, write_buffer, true, context, callback, QueryFlags{ .internal = true }, std::nullopt, {});
auto startup_context = Context::createCopy(context);
startup_context->makeQueryContext();
executeQuery(read_buffer, write_buffer, true, startup_context, callback, QueryFlags{ .internal = true }, std::nullopt, {});
}
}
catch (...)

View File

@ -13,19 +13,19 @@ max-statements=200
[tool.pylint.'MESSAGES CONTROL']
# pytest.mark.parametrize is not callable (not-callable)
disable = '''
missing-docstring,
too-few-public-methods,
invalid-name,
too-many-arguments,
too-many-locals,
too-many-instance-attributes,
bare-except,
broad-except,
cell-var-from-loop,
fixme,
invalid-name,
missing-docstring,
redefined-outer-name,
too-few-public-methods,
too-many-arguments,
too-many-instance-attributes,
too-many-locals,
too-many-public-methods,
wildcard-import,
redefined-outer-name,
broad-except,
bare-except,
'''
[tool.isort]

View File

@ -623,7 +623,7 @@ AuthResult AccessControl::authenticate(const Credentials & credentials, const Po
/// We use the same message for all authentication failures because we don't want to give away any unnecessary information for security reasons,
/// only the log will show the exact reason.
throw Exception(PreformattedMessage{message.str(),
"{}: Authentication failed: password is incorrect, or there is no user with such name.{}",
"{}: Authentication failed: password is incorrect, or there is no user with such name",
std::vector<std::string>{credentials.getUserName()}},
ErrorCodes::AUTHENTICATION_FAILED);
}

View File

@ -54,6 +54,7 @@ struct Settings;
M(UInt64, log_file_overallocate_size, 50 * 1024 * 1024, "If max_log_file_size is not set to 0, this value will be added to it for preallocating bytes on disk. If a log record is larger than this value, it could lead to uncaught out-of-space issues so a larger value is preferred", 0) \
M(UInt64, min_request_size_for_cache, 50 * 1024, "Minimal size of the request to cache the deserialization result. Caching can have negative effect on latency for smaller requests, set to 0 to disable", 0) \
M(UInt64, raft_limits_reconnect_limit, 50, "If connection to a peer is silent longer than this limit * (multiplied by heartbeat interval), we re-establish the connection.", 0) \
M(UInt64, raft_limits_response_limit, 20, "Total wait time for a response is calculated by multiplying response_limit by heart_beat_interval_ms", 0) \
M(Bool, async_replication, false, "Enable async replication. All write and read guarantees are preserved while better performance is achieved. Settings is disabled by default to not break backwards compatibility.", 0) \
M(Bool, experimental_use_rocksdb, false, "Use rocksdb as backend storage", 0) \
M(UInt64, latest_logs_cache_size_threshold, 1 * 1024 * 1024 * 1024, "Maximum total size of in-memory cache of latest log entries.", 0) \

View File

@ -411,7 +411,9 @@ KeeperContext::Storage KeeperContext::getLogsPathFromConfig(const Poco::Util::Ab
if (!fs::exists(path))
fs::create_directories(path);
return std::make_shared<DiskLocal>("LocalLogDisk", path);
auto disk = std::make_shared<DiskLocal>("LocalLogDisk", path);
disk->startup(Context::getGlobalContextInstance(), false);
return disk;
};
/// the most specialized path
@ -437,7 +439,9 @@ KeeperContext::Storage KeeperContext::getSnapshotsPathFromConfig(const Poco::Uti
if (!fs::exists(path))
fs::create_directories(path);
return std::make_shared<DiskLocal>("LocalSnapshotDisk", path);
auto disk = std::make_shared<DiskLocal>("LocalSnapshotDisk", path);
disk->startup(Context::getGlobalContextInstance(), false);
return disk;
};
/// the most specialized path
@ -463,7 +467,9 @@ KeeperContext::Storage KeeperContext::getStatePathFromConfig(const Poco::Util::A
if (!fs::exists(path))
fs::create_directories(path);
return std::make_shared<DiskLocal>("LocalStateFileDisk", path);
auto disk = std::make_shared<DiskLocal>("LocalStateFileDisk", path);
disk->startup(Context::getGlobalContextInstance(), false);
return disk;
};
if (config.has("keeper_server.state_storage_disk"))

View File

@ -506,6 +506,7 @@ void KeeperServer::launchRaftServer(const Poco::Util::AbstractConfiguration & co
nuraft::raft_server::limits raft_limits;
raft_limits.reconnect_limit_ = getValueOrMaxInt32AndLogWarning(coordination_settings->raft_limits_reconnect_limit, "raft_limits_reconnect_limit", log);
raft_limits.response_limit_ = getValueOrMaxInt32AndLogWarning(coordination_settings->raft_limits_response_limit, "response_limit", log);
raft_instance->set_raft_limits(raft_limits);
raft_instance->start_server(init_options.skip_initial_election_timeout_);
@ -1079,7 +1080,7 @@ ClusterUpdateActions KeeperServer::getRaftConfigurationDiff(const Poco::Util::Ab
void KeeperServer::applyConfigUpdateWithReconfigDisabled(const ClusterUpdateAction& action)
{
std::lock_guard _{server_write_mutex};
std::unique_lock server_write_lock{server_write_mutex};
if (is_recovering) return;
constexpr auto sleep_time = 500ms;
@ -1090,7 +1091,9 @@ void KeeperServer::applyConfigUpdateWithReconfigDisabled(const ClusterUpdateActi
auto backoff_on_refusal = [&](size_t i)
{
LOG_INFO(log, "Update was not accepted (try {}), backing off for {}", i + 1, sleep_time * (i + 1));
server_write_lock.unlock();
std::this_thread::sleep_for(sleep_time * (i + 1));
server_write_lock.lock();
};
const auto & coordination_settings = keeper_context->getCoordinationSettings();

View File

@ -178,7 +178,8 @@ IMPLEMENT_SETTING_ENUM(LightweightMutationProjectionMode, ErrorCodes::BAD_ARGUME
{"rebuild", LightweightMutationProjectionMode::REBUILD}})
IMPLEMENT_SETTING_ENUM(DeduplicateMergeProjectionMode, ErrorCodes::BAD_ARGUMENTS,
{{"throw", DeduplicateMergeProjectionMode::THROW},
{{"ignore", DeduplicateMergeProjectionMode::IGNORE},
{"throw", DeduplicateMergeProjectionMode::THROW},
{"drop", DeduplicateMergeProjectionMode::DROP},
{"rebuild", DeduplicateMergeProjectionMode::REBUILD}})

View File

@ -314,6 +314,7 @@ DECLARE_SETTING_ENUM(LightweightMutationProjectionMode)
enum class DeduplicateMergeProjectionMode : uint8_t
{
IGNORE,
THROW,
DROP,
REBUILD,

View File

@ -11,20 +11,6 @@
using namespace DB;
namespace
{
bool withFileCache(const ReadSettings & settings)
{
return settings.remote_fs_cache && settings.enable_filesystem_cache;
}
bool withPageCache(const ReadSettings & settings, bool with_file_cache)
{
return settings.page_cache && !with_file_cache && settings.use_page_cache_for_disks_without_file_cache;
}
}
namespace DB
{
namespace ErrorCodes
@ -35,7 +21,7 @@ namespace ErrorCodes
size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_t file_size)
{
/// Only when cache is used we could download bigger portions of FileSegments than what we actually gonna read within particular task.
if (!withFileCache(settings))
if (!settings.enable_filesystem_cache)
return settings.remote_fs_buffer_size;
/// Buffers used for prefetch and pre-download better to have enough size, but not bigger than the whole file.
@ -45,7 +31,6 @@ size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const std::string & cache_path_prefix_,
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
bool use_external_buffer_)
@ -54,12 +39,10 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
, settings(settings_)
, blobs_to_read(blobs_to_read_)
, read_buffer_creator(std::move(read_buffer_creator_))
, cache_path_prefix(cache_path_prefix_)
, cache_log(settings.enable_filesystem_cache_log ? cache_log_ : nullptr)
, query_id(CurrentThread::getQueryId())
, use_external_buffer(use_external_buffer_)
, with_file_cache(withFileCache(settings))
, with_page_cache(withPageCache(settings, with_file_cache))
, with_file_cache(settings.enable_filesystem_cache)
, log(getLogger("ReadBufferFromRemoteFSGather"))
{
if (!blobs_to_read.empty())
@ -74,47 +57,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
}
current_object = object;
const auto & object_path = object.remote_path;
std::unique_ptr<ReadBufferFromFileBase> buf;
if (with_file_cache)
{
if (settings.remote_fs_cache->isInitialized())
{
auto cache_key = settings.remote_fs_cache->createKeyForPath(object_path);
buf = std::make_unique<CachedOnDiskReadBufferFromFile>(
object_path,
cache_key,
settings.remote_fs_cache,
FileCache::getCommonUser(),
[=, this]() { return read_buffer_creator(/* restricted_seek */true, object); },
settings,
query_id,
object.bytes_size,
/* allow_seeks */false,
/* use_external_buffer */true,
/* read_until_position */std::nullopt,
cache_log);
}
else
{
settings.remote_fs_cache->throwInitExceptionIfNeeded();
}
}
/// Can't wrap CachedOnDiskReadBufferFromFile in CachedInMemoryReadBufferFromFile because the
/// former doesn't support seeks.
if (with_page_cache && !buf)
{
auto inner = read_buffer_creator(/* restricted_seek */false, object);
auto cache_key = FileChunkAddress { .path = cache_path_prefix + object_path };
buf = std::make_unique<CachedInMemoryReadBufferFromFile>(
cache_key, settings.page_cache, std::move(inner), settings);
}
if (!buf)
buf = read_buffer_creator(/* restricted_seek */true, object);
auto buf = read_buffer_creator(/* restricted_seek */true, object);
if (read_until_position > start_offset && read_until_position < start_offset + object.bytes_size)
buf->setReadUntilPosition(read_until_position - start_offset);

View File

@ -26,7 +26,6 @@ public:
ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const std::string & cache_path_prefix_,
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
bool use_external_buffer_);
@ -71,12 +70,10 @@ private:
const ReadSettings settings;
const StoredObjects blobs_to_read;
const ReadBufferCreator read_buffer_creator;
const std::string cache_path_prefix;
const std::shared_ptr<FilesystemCacheLog> cache_log;
const String query_id;
const bool use_external_buffer;
const bool with_file_cache;
const bool with_page_cache;
size_t read_until_position = 0;
size_t file_offset_of_buffer_end = 0;

View File

@ -210,63 +210,14 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObject( /// NOLI
auto settings_ptr = settings.get();
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client.get(), object.remote_path, patchSettings(read_settings), settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries);
}
std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
ReadSettings disk_read_settings = patchSettings(read_settings);
auto settings_ptr = settings.get();
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
(bool restricted_seek, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client.get(),
object_.remote_path,
disk_read_settings,
settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries,
/* use_external_buffer */true,
restricted_seek);
};
switch (read_settings.remote_fs_method)
{
case RemoteFSReadMethod::read:
{
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"azure:",
disk_read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */false);
}
case RemoteFSReadMethod::threadpool:
{
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"azure:",
disk_read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */true);
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, disk_read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
}
client.get(),
object.remote_path,
patchSettings(read_settings),
settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries,
read_settings.remote_read_buffer_use_external_buffer,
read_settings.remote_read_buffer_restrict_seek,
/* read_until_position */0);
}
/// Open the file for write and return WriteBufferFromFileBase object.

View File

@ -51,12 +51,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,

View File

@ -48,9 +48,7 @@ CachedObjectStorage::generateObjectKeyPrefixForDirectoryPath(const std::string &
ReadSettings CachedObjectStorage::patchSettings(const ReadSettings & read_settings) const
{
ReadSettings modified_settings{read_settings};
modified_settings.remote_fs_cache = cache;
return object_storage->patchSettings(modified_settings);
return object_storage->patchSettings(read_settings);
}
void CachedObjectStorage::startup()
@ -63,21 +61,45 @@ bool CachedObjectStorage::exists(const StoredObject & object) const
return object_storage->exists(object);
}
std::unique_ptr<ReadBufferFromFileBase> CachedObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
return object_storage->readObjects(objects, patchSettings(read_settings), read_hint, file_size);
}
std::unique_ptr<ReadBufferFromFileBase> CachedObjectStorage::readObject( /// NOLINT
const StoredObject & object,
const ReadSettings & read_settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
if (read_settings.enable_filesystem_cache)
{
if (cache->isInitialized())
{
auto cache_key = cache->createKeyForPath(object.remote_path);
auto global_context = Context::getGlobalContextInstance();
auto modified_read_settings = read_settings.withNestedBuffer();
auto read_buffer_creator = [=, this]()
{
return object_storage->readObject(object, patchSettings(read_settings), read_hint, file_size);
};
return std::make_unique<CachedOnDiskReadBufferFromFile>(
object.remote_path,
cache_key,
cache,
FileCache::getCommonUser(),
read_buffer_creator,
modified_read_settings,
std::string(CurrentThread::getQueryId()),
object.bytes_size,
/* allow_seeks */!read_settings.remote_read_buffer_restrict_seek,
/* use_external_buffer */read_settings.remote_read_buffer_use_external_buffer,
/* read_until_position */std::nullopt,
global_context->getFilesystemCacheLog());
}
else
{
cache->throwInitExceptionIfNeeded();
}
}
return object_storage->readObject(object, patchSettings(read_settings), read_hint, file_size);
}

View File

@ -37,12 +37,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,

View File

@ -11,6 +11,9 @@
#include <Common/CurrentMetrics.h>
#include <Common/Scheduler/IResourceManager.h>
#include <Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h>
#include <IO/CachedInMemoryReadBufferFromFile.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/ObjectStorages/DiskObjectStorageTransaction.h>
#include <Disks/FakeDiskTransaction.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -496,16 +499,60 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
std::optional<size_t> file_size) const
{
const auto storage_objects = metadata_storage->getStorageObjects(path);
auto global_context = Context::getGlobalContextInstance();
const bool file_can_be_empty = !file_size.has_value() || *file_size == 0;
if (storage_objects.empty() && file_can_be_empty)
return std::make_unique<ReadBufferFromEmptyFile>();
return object_storage->readObjects(
auto read_settings = updateIOSchedulingSettings(settings, getReadResourceName(), getWriteResourceName());
/// We wrap read buffer from object storage (read_buf = object_storage->readObject())
/// inside ReadBufferFromRemoteFSGather, so add nested buffer setting.
read_settings = read_settings.withNestedBuffer();
auto read_buffer_creator =
[this, read_settings, read_hint, file_size]
(bool restricted_seek, const StoredObject & object_) mutable -> std::unique_ptr<ReadBufferFromFileBase>
{
read_settings.remote_read_buffer_restrict_seek = restricted_seek;
auto impl = object_storage->readObject(object_, read_settings, read_hint, file_size);
if ((!object_storage->supportsCache() || !read_settings.enable_filesystem_cache)
&& read_settings.page_cache && read_settings.use_page_cache_for_disks_without_file_cache)
{
/// Can't wrap CachedOnDiskReadBufferFromFile in CachedInMemoryReadBufferFromFile because the
/// former doesn't support seeks.
auto cache_path_prefix = fmt::format("{}:", magic_enum::enum_name(object_storage->getType()));
const auto object_namespace = object_storage->getObjectsNamespace();
if (!object_namespace.empty())
cache_path_prefix += object_namespace + "/";
const auto cache_key = FileChunkAddress { .path = cache_path_prefix + object_.remote_path };
impl = std::make_unique<CachedInMemoryReadBufferFromFile>(
cache_key, read_settings.page_cache, std::move(impl), read_settings);
}
return impl;
};
const bool use_async_buffer = read_settings.remote_fs_method == RemoteFSReadMethod::threadpool;
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
storage_objects,
updateIOSchedulingSettings(settings, getReadResourceName(), getWriteResourceName()),
read_hint,
file_size);
read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */use_async_buffer);
if (use_async_buffer)
{
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
return impl;
}
std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(

View File

@ -82,28 +82,12 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObject( /// NOLIN
initializeHDFSFS();
auto path = extractObjectKeyFromURL(object);
return std::make_unique<ReadBufferFromHDFS>(
fs::path(url_without_path) / "", fs::path(data_directory) / path, config, patchSettings(read_settings));
}
std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
initializeHDFSFS();
auto disk_read_settings = patchSettings(read_settings);
auto read_buffer_creator =
[this, disk_read_settings]
(bool /* restricted_seek */, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
auto path = extractObjectKeyFromURL(object_);
return std::make_unique<ReadBufferFromHDFS>(
fs::path(url_without_path) / "", fs::path(data_directory) / path, config, disk_read_settings, /* read_until_position */0, /* use_external_buffer */true);
};
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), objects, "hdfs:", disk_read_settings, nullptr, /* use_external_buffer */false);
fs::path(url_without_path) / "",
fs::path(data_directory) / path,
config,
patchSettings(read_settings),
/* read_until_position */0,
read_settings.remote_read_buffer_use_external_buffer);
}
std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOLINT

View File

@ -69,12 +69,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,

View File

@ -150,13 +150,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const = 0;
/// Read multiple objects with common prefix
virtual std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const = 0;
/// Open the file for write and return WriteBufferFromFileBase object.
virtual std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,

View File

@ -40,47 +40,12 @@ bool LocalObjectStorage::exists(const StoredObject & object) const
return fs::exists(object.remote_path);
}
std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
auto modified_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator = [=](bool /* restricted_seek */, const StoredObject & object) -> std::unique_ptr<ReadBufferFromFileBase>
{ return std::make_unique<ReadBufferFromFile>(object.remote_path); };
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"file:",
modified_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */ false);
}
ReadSettings LocalObjectStorage::patchSettings(const ReadSettings & read_settings) const
{
if (!read_settings.enable_filesystem_cache)
return IObjectStorage::patchSettings(read_settings);
auto modified_settings{read_settings};
/// For now we cannot allow asynchronous reader from local filesystem when CachedObjectStorage is used.
switch (modified_settings.local_fs_method)
{
case LocalFSReadMethod::pread_threadpool:
case LocalFSReadMethod::pread_fake_async:
{
modified_settings.local_fs_method = LocalFSReadMethod::pread;
LOG_INFO(log, "Changing local filesystem read method to `pread`");
break;
}
default:
{
break;
}
}
/// Other options might break assertions in AsynchronousBoundedReadBuffer.
modified_settings.local_fs_method = LocalFSReadMethod::pread;
modified_settings.direct_io_threshold = 0; /// Disable.
return IObjectStorage::patchSettings(modified_settings);
}

View File

@ -34,12 +34,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,

View File

@ -176,65 +176,6 @@ bool S3ObjectStorage::exists(const StoredObject & object) const
return S3::objectExists(*client.get(), uri.bucket, object.remote_path, {});
}
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
ReadSettings disk_read_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto settings_ptr = s3_settings.get();
auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
(bool restricted_seek, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromS3>(
client.get(),
uri.bucket,
object_.remote_path,
uri.version_id,
settings_ptr->request_settings,
disk_read_settings,
/* use_external_buffer */true,
/* offset */0,
/* read_until_position */0,
restricted_seek);
};
switch (read_settings.remote_fs_method)
{
case RemoteFSReadMethod::read:
{
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"s3:" + uri.bucket + "/",
disk_read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */false);
}
case RemoteFSReadMethod::threadpool:
{
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"s3:" + uri.bucket + "/",
disk_read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */true);
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, disk_read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
}
}
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
const StoredObject & object,
const ReadSettings & read_settings,
@ -248,7 +189,12 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
object.remote_path,
uri.version_id,
settings_ptr->request_settings,
patchSettings(read_settings));
patchSettings(read_settings),
read_settings.remote_read_buffer_use_external_buffer,
/* offset */0,
/* read_until_position */0,
read_settings.remote_read_buffer_restrict_seek,
object.bytes_size ? std::optional<size_t>(object.bytes_size) : std::nullopt);
}
std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLINT

View File

@ -89,12 +89,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,

View File

@ -233,69 +233,18 @@ WebObjectStorage::FileDataPtr WebObjectStorage::tryGetFileInfo(const String & pa
}
}
std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
if (objects.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "WebObjectStorage support read only from single object");
return readObject(objects[0], read_settings, read_hint, file_size);
}
std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
const StoredObject & object,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
size_t object_size = object.bytes_size;
auto read_buffer_creator =
[this, read_settings, object_size]
(bool /* restricted_seek */, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromWebServer>(
fs::path(url) / object_.remote_path,
getContext(),
object_size,
read_settings,
/* use_external_buffer */true);
};
auto global_context = Context::getGlobalContextInstance();
switch (read_settings.remote_fs_method)
{
case RemoteFSReadMethod::read:
{
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{object},
"url:" + url + "/",
read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */false);
}
case RemoteFSReadMethod::threadpool:
{
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{object},
"url:" + url + "/",
read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */true);
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
}
return std::make_unique<ReadBufferFromWebServer>(
fs::path(url) / object.remote_path,
getContext(),
object.bytes_size,
read_settings,
read_settings.remote_read_buffer_use_external_buffer);
}
void WebObjectStorage::throwNotAllowed()

View File

@ -39,12 +39,6 @@ public:
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const StoredObject & object,

View File

@ -104,7 +104,7 @@ struct ArrayAggregateImpl
static DataTypePtr getReturnType(const DataTypePtr & expression_return, const DataTypePtr & /*array_element*/)
{
if (aggregate_operation == AggregateOperation::max || aggregate_operation == AggregateOperation::min)
if constexpr (aggregate_operation == AggregateOperation::max || aggregate_operation == AggregateOperation::min)
{
return expression_return;
}
@ -152,9 +152,62 @@ struct ArrayAggregateImpl
return result;
}
template <AggregateOperation op = aggregate_operation>
requires(op == AggregateOperation::min || op == AggregateOperation::max)
static void executeMinOrMax(const ColumnPtr & mapped, const ColumnArray::Offsets & offsets, ColumnPtr & res_ptr)
{
const ColumnConst * const_column = checkAndGetColumn<ColumnConst>(&*mapped);
if (const_column)
{
MutableColumnPtr res_column = const_column->getDataColumn().cloneEmpty();
res_column->insertMany(const_column->getField(), offsets.size());
res_ptr = std::move(res_column);
return;
}
MutableColumnPtr res_column = mapped->cloneEmpty();
static constexpr int nan_null_direction_hint = aggregate_operation == AggregateOperation::min ? 1 : -1;
/// TODO: Introduce row_begin and row_end to getPermutation or an equivalent function to use that instead
/// (same use case as SingleValueDataBase::getSmallestIndex)
UInt64 start_of_array = 0;
for (auto end_of_array : offsets)
{
/// Array is empty
if (start_of_array == end_of_array)
{
res_column->insertDefault();
continue;
}
UInt64 index = start_of_array;
for (UInt64 i = index + 1; i < end_of_array; i++)
{
if constexpr (aggregate_operation == AggregateOperation::min)
{
if ((mapped->compareAt(i, index, *mapped, nan_null_direction_hint) < 0))
index = i;
}
else
{
if ((mapped->compareAt(i, index, *mapped, nan_null_direction_hint) > 0))
index = i;
}
}
res_column->insertFrom(*mapped, index);
start_of_array = end_of_array;
}
chassert(res_column->size() == offsets.size());
res_ptr = std::move(res_column);
}
template <typename Element>
static NO_SANITIZE_UNDEFINED bool executeType(const ColumnPtr & mapped, const ColumnArray::Offsets & offsets, ColumnPtr & res_ptr)
{
/// Min and Max are implemented in a different function
static_assert(aggregate_operation != AggregateOperation::min && aggregate_operation != AggregateOperation::max);
using ResultType = ArrayAggregateResult<Element, aggregate_operation>;
using ColVecType = ColumnVectorOrDecimal<Element>;
using ColVecResultType = ColumnVectorOrDecimal<ResultType>;
@ -197,11 +250,6 @@ struct ArrayAggregateImpl
/// Just multiply the value by array size.
res[i] = x * static_cast<ResultType>(array_size);
}
else if constexpr (aggregate_operation == AggregateOperation::min ||
aggregate_operation == AggregateOperation::max)
{
res[i] = x;
}
else if constexpr (aggregate_operation == AggregateOperation::average)
{
if constexpr (is_decimal<Element>)
@ -292,20 +340,6 @@ struct ArrayAggregateImpl
{
aggregate_value += element;
}
else if constexpr (aggregate_operation == AggregateOperation::min)
{
if (element < aggregate_value)
{
aggregate_value = element;
}
}
else if constexpr (aggregate_operation == AggregateOperation::max)
{
if (element > aggregate_value)
{
aggregate_value = element;
}
}
else if constexpr (aggregate_operation == AggregateOperation::product)
{
if constexpr (is_decimal<Element>)
@ -360,74 +394,41 @@ struct ArrayAggregateImpl
static ColumnPtr execute(const ColumnArray & array, ColumnPtr mapped)
{
if constexpr (aggregate_operation == AggregateOperation::max || aggregate_operation == AggregateOperation::min)
{
MutableColumnPtr res;
const auto & column = array.getDataPtr();
const ColumnConst * const_column = checkAndGetColumn<ColumnConst>(&*column);
if (const_column)
{
res = const_column->getDataColumn().cloneEmpty();
}
else
{
res = column->cloneEmpty();
}
const IColumn::Offsets & offsets = array.getOffsets();
size_t pos = 0;
for (const auto & offset : offsets)
{
if (offset == pos)
{
res->insertDefault();
continue;
}
size_t current_max_or_min_index = pos;
++pos;
for (; pos < offset; ++pos)
{
int compare_result = column->compareAt(pos, current_max_or_min_index, *column, 1);
if (aggregate_operation == AggregateOperation::max && compare_result > 0)
{
current_max_or_min_index = pos;
}
else if (aggregate_operation == AggregateOperation::min && compare_result < 0)
{
current_max_or_min_index = pos;
}
}
res->insert((*column)[current_max_or_min_index]);
}
return res;
}
const IColumn::Offsets & offsets = array.getOffsets();
ColumnPtr res;
if (executeType<UInt8>(mapped, offsets, res) ||
executeType<UInt16>(mapped, offsets, res) ||
executeType<UInt32>(mapped, offsets, res) ||
executeType<UInt64>(mapped, offsets, res) ||
executeType<UInt128>(mapped, offsets, res) ||
executeType<UInt256>(mapped, offsets, res) ||
executeType<Int8>(mapped, offsets, res) ||
executeType<Int16>(mapped, offsets, res) ||
executeType<Int32>(mapped, offsets, res) ||
executeType<Int64>(mapped, offsets, res) ||
executeType<Int128>(mapped, offsets, res) ||
executeType<Int256>(mapped, offsets, res) ||
executeType<Float32>(mapped, offsets, res) ||
executeType<Float64>(mapped, offsets, res) ||
executeType<Decimal32>(mapped, offsets, res) ||
executeType<Decimal64>(mapped, offsets, res) ||
executeType<Decimal128>(mapped, offsets, res) ||
executeType<Decimal256>(mapped, offsets, res) ||
executeType<DateTime64>(mapped, offsets, res))
if constexpr (aggregate_operation == AggregateOperation::min || aggregate_operation == AggregateOperation::max)
{
executeMinOrMax(mapped, offsets, res);
return res;
}
else
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Unexpected column for arraySum: {}", mapped->getName());
{
if (executeType<UInt8>(mapped, offsets, res) ||
executeType<UInt16>(mapped, offsets, res) ||
executeType<UInt32>(mapped, offsets, res) ||
executeType<UInt64>(mapped, offsets, res) ||
executeType<UInt128>(mapped, offsets, res) ||
executeType<UInt256>(mapped, offsets, res) ||
executeType<Int8>(mapped, offsets, res) ||
executeType<Int16>(mapped, offsets, res) ||
executeType<Int32>(mapped, offsets, res) ||
executeType<Int64>(mapped, offsets, res) ||
executeType<Int128>(mapped, offsets, res) ||
executeType<Int256>(mapped, offsets, res) ||
executeType<Float32>(mapped, offsets, res) ||
executeType<Float64>(mapped, offsets, res) ||
executeType<Decimal32>(mapped, offsets, res) ||
executeType<Decimal64>(mapped, offsets, res) ||
executeType<Decimal128>(mapped, offsets, res) ||
executeType<Decimal256>(mapped, offsets, res) ||
executeType<DateTime64>(mapped, offsets, res))
{
return res;
}
}
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Unexpected column for arraySum: {}", mapped->getName());
}
};

View File

@ -116,7 +116,8 @@ struct ReadSettings
size_t remote_read_min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE;
FileCachePtr remote_fs_cache;
bool remote_read_buffer_restrict_seek = false;
bool remote_read_buffer_use_external_buffer = false;
/// Bandwidth throttler to use during reading
ThrottlerPtr remote_throttler;
@ -138,6 +139,14 @@ struct ReadSettings
res.prefetch_buffer_size = std::min(std::max(1ul, file_size), prefetch_buffer_size);
return res;
}
ReadSettings withNestedBuffer() const
{
ReadSettings res = *this;
res.remote_read_buffer_restrict_seek = true;
res.remote_read_buffer_use_external_buffer = true;
return res;
}
};
ReadSettings getReadSettings();

View File

@ -131,7 +131,12 @@ bool KeyMetadata::createBaseDirectory(bool throw_if_failed)
{
created_base_directory = false;
if (!throw_if_failed && e.code() == std::errc::no_space_on_device)
if (!throw_if_failed &&
(e.code() == std::errc::no_space_on_device
|| e.code() == std::errc::read_only_file_system
|| e.code() == std::errc::permission_denied
|| e.code() == std::errc::too_many_files_open
|| e.code() == std::errc::operation_not_permitted))
{
LOG_TRACE(cache_metadata->log, "Failed to create base directory for key {}, "
"because no space left on device", key);

View File

@ -16,6 +16,10 @@ bool less(const Field & lhs, const Field & rhs, int direction)
bool equals(const Field & lhs, const Field & rhs)
{
/// This will treat NaNs as equal
if (lhs.getType() == rhs.getType())
return lhs == rhs;
return applyVisitor(FieldVisitorAccurateEquals(), lhs, rhs);
}

View File

@ -869,6 +869,26 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
{
properties.indices = as_storage_metadata->getSecondaryIndices();
properties.projections = as_storage_metadata->getProjections().clone();
/// CREATE TABLE AS should copy PRIMARY KEY, ORDER BY, and similar clauses.
/// Note: this only applies when the source table engine uses the new (custom partitioning) syntax.
if (const auto * merge_tree_data = dynamic_cast<const MergeTreeData *>(as_storage.get()))
{
if (merge_tree_data->format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
if (!create.storage->primary_key && as_storage_metadata->isPrimaryKeyDefined() && as_storage_metadata->hasPrimaryKey())
create.storage->set(create.storage->primary_key, as_storage_metadata->getPrimaryKeyAST()->clone());
if (!create.storage->partition_by && as_storage_metadata->isPartitionKeyDefined() && as_storage_metadata->hasPartitionKey())
create.storage->set(create.storage->partition_by, as_storage_metadata->getPartitionKeyAST()->clone());
if (!create.storage->order_by && as_storage_metadata->isSortingKeyDefined() && as_storage_metadata->hasSortingKey())
create.storage->set(create.storage->order_by, as_storage_metadata->getSortingKeyAST()->clone());
if (!create.storage->sample_by && as_storage_metadata->isSamplingKeyDefined() && as_storage_metadata->hasSamplingKey())
create.storage->set(create.storage->sample_by, as_storage_metadata->getSamplingKeyAST()->clone());
}
}
}
else
{
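In SQL terms, the change above is intended to let `CREATE TABLE ... AS` inherit key clauses from the source table when an engine is given without them; a hedged sketch, with illustrative table names and behavior inferred from the checks in the hunk:
```sql
CREATE TABLE src
(
    d Date,
    id UInt64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(d)
ORDER BY id;

-- With the change above, PARTITION BY and ORDER BY are expected to be copied
-- from src, since the explicit ENGINE clause here does not define them.
CREATE TABLE dst AS src ENGINE = MergeTree;
```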

View File

@ -24,6 +24,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_FORMAT_VERSION;
extern const int NOT_IMPLEMENTED;
};
GinIndexPostingsBuilder::GinIndexPostingsBuilder(UInt64 limit)
@ -153,13 +154,18 @@ GinIndexStore::GinIndexStore(const String & name_, DataPartStoragePtr storage_)
: name(name_)
, storage(storage_)
{
if (storage->getType() != MergeTreeDataPartStorageType::Full)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "INDEX {} with 'full_text' type supports only full storage", name);
}
GinIndexStore::GinIndexStore(const String & name_, DataPartStoragePtr storage_, MutableDataPartStoragePtr data_part_storage_builder_, UInt64 max_digestion_size_)
: name(name_)
, storage(storage_)
, data_part_storage_builder(data_part_storage_builder_)
, max_digestion_size(max_digestion_size_)
{
if (storage->getType() != MergeTreeDataPartStorageType::Full)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "INDEX {} with 'full_text' type supports only full storage", name);
}
bool GinIndexStore::exists() const

View File

@ -648,7 +648,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRe
for (const auto & projection : projections)
{
if (merge_may_reduce_rows)
/// Checking IGNORE here is just for compatibility.
if (merge_may_reduce_rows && mode != DeduplicateMergeProjectionMode::IGNORE)
{
global_ctx->projections_to_rebuild.push_back(&projection);
continue;

View File

@ -2129,6 +2129,8 @@ try
runner([&, my_part = part]()
{
auto blocker_for_runner_thread = CannotAllocateThreadFaultInjector::blockFaultInjections();
auto res = loadDataPartWithRetries(
my_part->info, my_part->name, my_part->disk,
DataPartState::Outdated, data_parts_mutex, loading_parts_initial_backoff_ms,

View File

@ -221,7 +221,7 @@ struct Settings;
/** Projection settings. */ \
M(UInt64, max_projections, 25, "The maximum number of merge tree projections.", 0) \
M(LightweightMutationProjectionMode, lightweight_mutation_projection_mode, LightweightMutationProjectionMode::THROW, "When lightweight delete happens on a table with projection(s), the possible operations include throw the exception as projection exists, or drop projections of this table's relevant parts, or rebuild the projections.", 0) \
M(DeduplicateMergeProjectionMode, deduplicate_merge_projection_mode, DeduplicateMergeProjectionMode::THROW, "Whether to allow create projection for the table with non-classic MergeTree, if allowed, what is the action when merge, drop or rebuild.", 0) \
M(DeduplicateMergeProjectionMode, deduplicate_merge_projection_mode, DeduplicateMergeProjectionMode::THROW, "Whether to allow creating a projection for a table with a non-classic MergeTree. The ignore option exists purely for compatibility and may produce incorrect results. Otherwise, if allowed, the action to take when merging projections: drop or rebuild.", 0) \
#define MAKE_OBSOLETE_MERGE_TREE_SETTING(M, TYPE, NAME, DEFAULT) \
M(TYPE, NAME, DEFAULT, "Obsolete setting, does nothing.", BaseSettingsHelpers::Flags::OBSOLETE)

View File

@ -11,7 +11,11 @@ namespace DB
std::unique_ptr<ReadBuffer> PartMetadataManagerOrdinary::read(const String & file_name) const
{
size_t file_size = part->getDataPartStorage().getFileSize(file_name);
auto res = part->getDataPartStorage().readFile(file_name, getReadSettings().adjustBufferSize(file_size), file_size, std::nullopt);
auto read_settings = getReadSettings().adjustBufferSize(file_size);
/// Default read method is pread_threadpool, but there is not much point in it here.
read_settings.local_fs_method = LocalFSReadMethod::pread;
auto res = part->getDataPartStorage().readFile(file_name, read_settings, file_size, std::nullopt);
if (isCompressedFromFileName(file_name))
return std::make_unique<CompressedReadBufferFromFile>(std::move(res));

View File

@ -77,7 +77,14 @@ bool isRetryableException(std::exception_ptr exception_ptr)
#endif
catch (const ErrnoException & e)
{
return e.getErrno() == EMFILE;
return e.getErrno() == EMFILE
|| e.getErrno() == ENOMEM
|| isNotEnoughMemoryErrorCode(e.code())
|| e.code() == ErrorCodes::NETWORK_ERROR
|| e.code() == ErrorCodes::SOCKET_TIMEOUT
|| e.code() == ErrorCodes::CANNOT_SCHEDULE_TASK
|| e.code() == ErrorCodes::ABORTED;
}
catch (const Coordination::Exception & e)
{
@ -91,6 +98,22 @@ bool isRetryableException(std::exception_ptr exception_ptr)
|| e.code() == ErrorCodes::CANNOT_SCHEDULE_TASK
|| e.code() == ErrorCodes::ABORTED;
}
catch (const std::filesystem::filesystem_error & e)
{
return e.code() == std::errc::no_space_on_device ||
e.code() == std::errc::read_only_file_system ||
e.code() == std::errc::too_many_files_open_in_system ||
e.code() == std::errc::operation_not_permitted ||
e.code() == std::errc::device_or_resource_busy ||
e.code() == std::errc::permission_denied ||
e.code() == std::errc::too_many_files_open ||
e.code() == std::errc::text_file_busy ||
e.code() == std::errc::timed_out ||
e.code() == std::errc::not_enough_memory ||
e.code() == std::errc::not_supported ||
e.code() == std::errc::too_many_links ||
e.code() == std::errc::too_many_symbolic_link_levels;
}
catch (const Poco::Net::NetException &)
{
return true;
@ -171,13 +194,9 @@ static IMergeTreeDataPart::Checksums checkDataPart(
SerializationInfo::Settings settings{ratio_of_defaults, false};
serialization_infos = SerializationInfoByName::readJSON(columns_txt, settings, *serialization_file);
}
catch (const Poco::Exception & ex)
{
throw Exception(ErrorCodes::CORRUPTED_DATA, "Failed to load {}, with error {}", IMergeTreeDataPart::SERIALIZATION_FILE_NAME, ex.message());
}
catch (...)
{
throw;
throw Exception(ErrorCodes::CORRUPTED_DATA, "Failed to load file {} of data part {}, with error {}", IMergeTreeDataPart::SERIALIZATION_FILE_NAME, data_part->name, getCurrentExceptionMessage(true));
}
}
@ -399,18 +418,45 @@ IMergeTreeDataPart::Checksums checkDataPart(
ReadSettings read_settings;
read_settings.enable_filesystem_cache = false;
read_settings.enable_filesystem_cache_log = false;
read_settings.enable_filesystem_read_prefetches_log = false;
read_settings.page_cache = nullptr;
read_settings.load_marks_asynchronously = false;
read_settings.remote_fs_prefetch = false;
read_settings.page_cache_inject_eviction = false;
read_settings.use_page_cache_for_disks_without_file_cache = false;
read_settings.local_fs_method = LocalFSReadMethod::pread;
try
{
return checkDataPart(
data_part,
data_part_storage,
data_part->getColumns(),
data_part->getType(),
data_part->getFileNamesWithoutChecksums(),
read_settings,
require_checksums,
is_cancelled,
is_broken_projection,
throw_on_broken_projection);
}
catch (...)
{
if (isRetryableException(std::current_exception()))
{
LOG_DEBUG(
getLogger("checkDataPart"),
"Got retriable error {} checking data part {}, will return empty", data_part->name, getCurrentExceptionMessage(false));
/// We were unable to check data part because of some temporary exception
/// like Memory limit exceeded. If part is actually broken we will retry check
/// with the next read attempt of this data part.
return IMergeTreeDataPart::Checksums{};
}
throw;
}
return checkDataPart(
data_part,
data_part_storage,
data_part->getColumns(),
data_part->getType(),
data_part->getFileNamesWithoutChecksums(),
read_settings,
require_checksums,
is_cancelled,
is_broken_projection,
throw_on_broken_projection);
};
try
@ -431,7 +477,16 @@ IMergeTreeDataPart::Checksums checkDataPart(
catch (...)
{
if (isRetryableException(std::current_exception()))
throw;
{
LOG_DEBUG(
getLogger("checkDataPart"),
"Got retriable error {} checking data part {}, will return empty", data_part->name, getCurrentExceptionMessage(false));
/// We were unable to check data part because of some temporary exception
/// like Memory limit exceeded. If part is actually broken we will retry check
/// with the next read attempt of this data part.
return {};
}
return drop_cache_and_check();
}
}
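The filesystem error codes added to isRetryableException above correspond to ordinary POSIX errno values. As a rough, illustrative cross-check only (not part of the ClickHouse sources; the function name, constant set, and Linux errno names below are assumptions for the sketch), the same classification can be expressed with Python's errno module:

import errno

# Transient filesystem conditions the part check treats as retryable,
# mapped from the std::errc names listed above to Linux errno constants.
RETRYABLE_ERRNOS = {
    errno.ENOSPC,     # no_space_on_device
    errno.EROFS,      # read_only_file_system
    errno.ENFILE,     # too_many_files_open_in_system
    errno.EPERM,      # operation_not_permitted
    errno.EBUSY,      # device_or_resource_busy
    errno.EACCES,     # permission_denied
    errno.EMFILE,     # too_many_files_open
    errno.ETXTBSY,    # text_file_busy
    errno.ETIMEDOUT,  # timed_out
    errno.ENOMEM,     # not_enough_memory
    errno.ENOTSUP,    # not_supported
    errno.EMLINK,     # too_many_links
    errno.ELOOP,      # too_many_symbolic_link_levels
}

def is_retryable_os_error(exc: OSError) -> bool:
    """Return True if the OSError looks like a transient filesystem problem."""
    return exc.errno in RETRYABLE_ERRNOS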

View File

@ -9,6 +9,7 @@
#include <IO/ReadBufferFromFileBase.h>
#include <IO/Archives/createArchiveReader.h>
#include <Formats/FormatFactory.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Formats/ReadSchemaUtils.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/Cache/SchemaCache.h>
@ -426,37 +427,39 @@ std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(
const auto & object_size = object_info.metadata->size_bytes;
auto read_settings = context_->getReadSettings().adjustBufferSize(object_size);
read_settings.enable_filesystem_cache = false;
/// FIXME: Changing this setting to default value breaks something around parquet reading
read_settings.remote_read_min_bytes_for_seek = read_settings.remote_fs_buffer_size;
/// User's object may change, don't cache it.
read_settings.enable_filesystem_cache = false;
read_settings.use_page_cache_for_disks_without_file_cache = false;
const bool object_too_small = object_size <= 2 * context_->getSettingsRef()[Setting::max_download_buffer_size];
const bool use_prefetch = object_too_small && read_settings.remote_fs_method == RemoteFSReadMethod::threadpool;
read_settings.remote_fs_method = use_prefetch ? RemoteFSReadMethod::threadpool : RemoteFSReadMethod::read;
/// User's object may change, don't cache it.
read_settings.use_page_cache_for_disks_without_file_cache = false;
const bool use_prefetch = object_too_small
&& read_settings.remote_fs_method == RemoteFSReadMethod::threadpool
&& read_settings.remote_fs_prefetch;
if (use_prefetch)
read_settings.remote_read_buffer_use_external_buffer = true;
auto impl = object_storage->readObject(StoredObject(object_info.getPath(), "", object_size), read_settings);
// Create a read buffer that will prefetch the first ~1 MB of the file.
// When reading lots of tiny files, this prefetching almost doubles the throughput.
// For bigger files, parallel reading is more useful.
if (use_prefetch)
{
LOG_TRACE(log, "Downloading object of size {} with initial prefetch", object_size);
if (!use_prefetch)
return impl;
auto async_reader = object_storage->readObjects(
StoredObjects{StoredObject{object_info.getPath(), /* local_path */ "", object_size}}, read_settings);
LOG_TRACE(log, "Downloading object of size {} with initial prefetch", object_size);
async_reader->setReadUntilEnd();
if (read_settings.remote_fs_prefetch)
async_reader->prefetch(DEFAULT_PREFETCH_PRIORITY);
auto & reader = context_->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
impl = std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, read_settings,
context_->getAsyncReadCounters(),
context_->getFilesystemReadPrefetchesLog());
return async_reader;
}
else
{
/// FIXME: this is inconsistent that readObject always reads synchronously ignoring read_method setting.
return object_storage->readObject(StoredObject(object_info.getPath(), "", object_size), read_settings);
}
impl->setReadUntilEnd();
impl->prefetch(DEFAULT_PREFETCH_PRIORITY);
return impl;
}
StorageObjectStorageSource::IIterator::IIterator(const std::string & logger_name_)

View File

@ -100,17 +100,20 @@ void checkAllowedQueries(const ASTSelectQuery & query)
/// check if only one single select query in SelectWithUnionQuery
static bool isSingleSelect(const ASTPtr & select, ASTPtr & res)
{
auto new_select = select->as<ASTSelectWithUnionQuery &>();
if (new_select.list_of_selects->children.size() != 1)
auto * new_select = select->as<ASTSelectWithUnionQuery>();
if (new_select == nullptr)
return false;
auto & new_inner_query = new_select.list_of_selects->children.at(0);
if (new_select->list_of_selects->children.size() != 1)
return false;
auto & new_inner_query = new_select->list_of_selects->children.at(0);
if (new_inner_query->as<ASTSelectQuery>())
{
res = new_inner_query;
return true;
}
else
return isSingleSelect(new_inner_query, res);
return isSingleSelect(new_inner_query, res);
}
SelectQueryDescription SelectQueryDescription::getSelectQueryFromASTForMatView(const ASTPtr & select, bool refreshable, ContextPtr context)

View File

@ -1588,8 +1588,9 @@ bool StorageMergeTree::optimize(
{
assertNotReadonly();
const auto mode = getSettings()->deduplicate_merge_projection_mode;
if (deduplicate && getInMemoryMetadataPtr()->hasProjections()
&& getSettings()->deduplicate_merge_projection_mode == DeduplicateMergeProjectionMode::THROW)
&& (mode == DeduplicateMergeProjectionMode::THROW || mode == DeduplicateMergeProjectionMode::IGNORE))
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"OPTIMIZE DEDUPLICATE query is not supported for table {} as it has projections. "
"User should drop all the projections manually before running the query, "

View File

@ -5833,8 +5833,9 @@ bool StorageReplicatedMergeTree::optimize(
if (!is_leader)
throw Exception(ErrorCodes::NOT_A_LEADER, "OPTIMIZE cannot be done on this replica because it is not a leader");
const auto mode = getSettings()->deduplicate_merge_projection_mode;
if (deduplicate && getInMemoryMetadataPtr()->hasProjections()
&& getSettings()->deduplicate_merge_projection_mode == DeduplicateMergeProjectionMode::THROW)
&& (mode == DeduplicateMergeProjectionMode::THROW || mode == DeduplicateMergeProjectionMode::IGNORE))
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"OPTIMIZE DEDUPLICATE query is not supported for table {} as it has projections. "
"User should drop all the projections manually before running the query, "

View File

@ -1133,12 +1133,15 @@ def main() -> int:
if IS_CI and not pr_info.is_merge_queue:
if pr_info.is_release and pr_info.is_push_event:
if pr_info.is_master and pr_info.is_push_event:
print("Release/master: CI Cache add pending records for all todo jobs")
ci_cache.push_pending_all(pr_info.is_release)
# wait for pending jobs to be finished, await_jobs is a long blocking call
ci_cache.await_pending_jobs(pr_info.is_release)
if pr_info.is_master or pr_info.is_pr:
# - wait for pending jobs to be finished, await_jobs is a long blocking call
# - don't wait for release CI because some jobs may not be present there
# and we may wait until timeout in vain
ci_cache.await_pending_jobs(pr_info.is_release)
# conclude results
result["git_ref"] = git_ref

View File

@ -14,10 +14,9 @@ import string
import subprocess
import sys
import time
import zlib # for crc32
from collections import defaultdict
from itertools import chain
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
from ci_utils import kill_ci_runner
from env_helper import IS_CI
@ -44,10 +43,6 @@ TASK_TIMEOUT = 8 * 60 * 60 # 8 hours
NO_CHANGES_MSG = "Nothing to run"
def stringhash(s):
return zlib.crc32(s.encode("utf-8"))
# Search for a test by the common prefix.
# This accepts tests w/o parameters in the skip list.
#
@ -255,7 +250,8 @@ def clear_ip_tables_and_restart_daemons():
class ClickhouseIntegrationTestsRunner:
def __init__(self, result_path, params):
def __init__(self, repo_path: str, result_path: str, params: dict):
self.repo_path = repo_path
self.result_path = result_path
self.params = params
@ -313,11 +309,11 @@ class ClickhouseIntegrationTestsRunner:
def shuffle_test_groups(self):
return self.shuffle_groups != 0
def _pre_pull_images(self, repo_path):
image_cmd = self._get_runner_image_cmd(repo_path)
def _pre_pull_images(self):
image_cmd = self._get_runner_image_cmd()
cmd = (
f"cd {repo_path}/tests/integration && "
f"cd {self.repo_path}/tests/integration && "
f"timeout --signal=KILL 1h ./runner {self._get_runner_opts()} {image_cmd} "
"--pre-pull --command ' echo Pre Pull finished ' "
)
@ -422,12 +418,12 @@ class ClickhouseIntegrationTestsRunner:
return " ".join(result)
def _get_all_tests(self, repo_path):
image_cmd = self._get_runner_image_cmd(repo_path)
def _get_all_tests(self) -> List[str]:
image_cmd = self._get_runner_image_cmd()
runner_opts = self._get_runner_opts()
out_file_full = os.path.join(self.result_path, "runner_get_all_tests.log")
cmd = (
f"cd {repo_path}/tests/integration && "
f"cd {self.repo_path}/tests/integration && "
f"timeout --signal=KILL 1h ./runner {runner_opts} {image_cmd} -- --setup-plan "
)
@ -508,10 +504,10 @@ class ClickhouseIntegrationTestsRunner:
for test in current_counters[state]:
main_counters[state].append(test)
def _get_runner_image_cmd(self, repo_path):
def _get_runner_image_cmd(self):
image_cmd = ""
if self._can_run_with(
os.path.join(repo_path, "tests/integration", "runner"),
os.path.join(self.repo_path, "tests/integration", "runner"),
"--docker-image-version",
):
for img in IMAGES:
@ -523,7 +519,7 @@ class ClickhouseIntegrationTestsRunner:
image_cmd += f" --docker-image-version={runner_version} "
else:
if self._can_run_with(
os.path.join(repo_path, "tests/integration", "runner"),
os.path.join(self.repo_path, "tests/integration", "runner"),
"--docker-compose-images-tags",
):
image_cmd += (
@ -564,7 +560,6 @@ class ClickhouseIntegrationTestsRunner:
def try_run_test_group(
self,
repo_path,
test_group,
tests_in_group,
num_tries,
@ -573,7 +568,6 @@ class ClickhouseIntegrationTestsRunner:
):
try:
return self.run_test_group(
repo_path,
test_group,
tests_in_group,
num_tries,
@ -596,7 +590,6 @@ class ClickhouseIntegrationTestsRunner:
def run_test_group(
self,
repo_path,
test_group,
tests_in_group,
num_tries,
@ -620,7 +613,7 @@ class ClickhouseIntegrationTestsRunner:
tests_times[test] = 0
return counters, tests_times, []
image_cmd = self._get_runner_image_cmd(repo_path)
image_cmd = self._get_runner_image_cmd()
test_group_str = test_group.replace("/", "_").replace(".", "_")
log_paths = []
@ -639,10 +632,10 @@ class ClickhouseIntegrationTestsRunner:
test_names.add(test_name)
if i == 0:
test_data_dirs = self._find_test_data_dirs(repo_path, test_names)
test_data_dirs = self._find_test_data_dirs(self.repo_path, test_names)
info_basename = test_group_str + "_" + str(i) + ".nfo"
info_path = os.path.join(repo_path, "tests/integration", info_basename)
info_path = os.path.join(self.repo_path, "tests/integration", info_basename)
test_cmd = " ".join([shlex.quote(test) for test in sorted(test_names)])
parallel_cmd = f" --parallel {num_workers} " if num_workers > 0 else ""
@ -653,7 +646,7 @@ class ClickhouseIntegrationTestsRunner:
# -p -- (p)assed
# -s -- (s)kipped
cmd = (
f"cd {repo_path}/tests/integration && "
f"cd {self.repo_path}/tests/integration && "
f"timeout --signal=KILL 1h ./runner {self._get_runner_opts()} "
f"{image_cmd} -t {test_cmd} {parallel_cmd} {repeat_cmd} -- -rfEps --run-id={i} "
f"--color=no --durations=0 {_get_deselect_option(self.should_skip_tests())} "
@ -661,7 +654,7 @@ class ClickhouseIntegrationTestsRunner:
)
log_basename = test_group_str + "_" + str(i) + ".log"
log_path = os.path.join(repo_path, "tests/integration", log_basename)
log_path = os.path.join(self.repo_path, "tests/integration", log_basename)
with open(log_path, "w", encoding="utf-8") as log:
logging.info("Executing cmd: %s", cmd)
# ignore retcode, since it is not meaningful due to the pipe to tee
@ -678,7 +671,7 @@ class ClickhouseIntegrationTestsRunner:
log_paths.append(log_result_path)
for pytest_log_path in glob.glob(
os.path.join(repo_path, "tests/integration/pytest*.log")
os.path.join(self.repo_path, "tests/integration/pytest*.log")
):
new_name = (
test_group_str
@ -689,11 +682,13 @@ class ClickhouseIntegrationTestsRunner:
)
os.rename(
pytest_log_path,
os.path.join(repo_path, "tests/integration", new_name),
os.path.join(self.repo_path, "tests/integration", new_name),
)
extra_logs_names.append(new_name)
dockerd_log_path = os.path.join(repo_path, "tests/integration/dockerd.log")
dockerd_log_path = os.path.join(
self.repo_path, "tests/integration/dockerd.log"
)
if os.path.exists(dockerd_log_path):
new_name = (
test_group_str
@ -704,7 +699,7 @@ class ClickhouseIntegrationTestsRunner:
)
os.rename(
dockerd_log_path,
os.path.join(repo_path, "tests/integration", new_name),
os.path.join(self.repo_path, "tests/integration", new_name),
)
extra_logs_names.append(new_name)
@ -721,7 +716,7 @@ class ClickhouseIntegrationTestsRunner:
for test_name, test_time in new_tests_times.items():
tests_times[test_name] = test_time
test_data_dirs_new = self._find_test_data_dirs(repo_path, test_names)
test_data_dirs_new = self._find_test_data_dirs(self.repo_path, test_names)
test_data_dirs_diff = self._get_test_data_dirs_difference(
test_data_dirs_new, test_data_dirs
)
@ -733,7 +728,7 @@ class ClickhouseIntegrationTestsRunner:
"integration_run_" + test_group_str + "_" + str(i) + ".tar.zst",
)
self._compress_logs(
os.path.join(repo_path, "tests/integration"),
os.path.join(self.repo_path, "tests/integration"),
extra_logs_names + list(test_data_dirs_diff),
extras_result_path,
)
@ -773,10 +768,10 @@ class ClickhouseIntegrationTestsRunner:
return counters, tests_times, log_paths
def run_flaky_check(self, repo_path, build_path, should_fail=False):
def run_flaky_check(self, build_path, should_fail=False):
pr_info = self.params["pr_info"]
tests_to_run = get_changed_tests_to_run(pr_info, repo_path)
tests_to_run = get_changed_tests_to_run(pr_info, self.repo_path)
if not tests_to_run:
logging.info("No integration tests to run found")
return "success", NO_CHANGES_MSG, [(NO_CHANGES_MSG, "OK")], ""
@ -807,7 +802,6 @@ class ClickhouseIntegrationTestsRunner:
final_retry += 1
logging.info("Running tests for the %s time", i)
group_counters, group_test_times, log_paths = self.try_run_test_group(
repo_path,
f"bugfix_{id_counter}" if should_fail else f"flaky{id_counter}",
[test_to_run],
1,
@ -873,17 +867,15 @@ class ClickhouseIntegrationTestsRunner:
return result_state, status_text, test_result, tests_log_paths
def run_impl(self, repo_path, build_path):
def run_impl(self, build_path):
stopwatch = Stopwatch()
if self.flaky_check or self.bugfix_validate_check:
result_state, status_text, test_result, tests_log_paths = (
self.run_flaky_check(
repo_path, build_path, should_fail=self.bugfix_validate_check
)
self.run_flaky_check(build_path, should_fail=self.bugfix_validate_check)
)
else:
result_state, status_text, test_result, tests_log_paths = (
self.run_normal_check(build_path, repo_path)
self.run_normal_check(build_path)
)
if self.soft_deadline_time < time.time():
@ -906,23 +898,35 @@ class ClickhouseIntegrationTestsRunner:
return result_state, status_text, test_result, tests_log_paths
def run_normal_check(self, build_path, repo_path):
def _get_tests_by_hash(self) -> List[str]:
"Tries it's best to group the tests equally between groups"
all_tests = self._get_all_tests()
if self.run_by_hash_total == 0:
return all_tests
grouped_tests = self.group_test_by_file(all_tests)
groups_by_hash = {
g: [] for g in range(self.run_by_hash_total)
} # type: Dict[int, List[str]]
for tests_in_group in grouped_tests.values():
# It should work deterministically, because it searches for the groups with the fewest tests
min_group = min(len(tests) for tests in groups_by_hash.values())
# and then takes the group with the minimum index
group_to_increase = min(
g for g, t in groups_by_hash.items() if len(t) == min_group
)
groups_by_hash[group_to_increase].extend(tests_in_group)
return groups_by_hash[self.run_by_hash_num]
def run_normal_check(self, build_path):
self._install_clickhouse(build_path)
logging.info("Pulling images")
self._pre_pull_images(repo_path)
self._pre_pull_images()
logging.info(
"Dump iptables before run %s",
subprocess.check_output("sudo iptables -nvL", shell=True),
)
all_tests = self._get_all_tests(repo_path)
if self.run_by_hash_total != 0:
grouped_tests = self.group_test_by_file(all_tests)
all_filtered_by_hash_tests = []
for group, tests_in_group in grouped_tests.items():
if stringhash(group) % self.run_by_hash_total == self.run_by_hash_num:
all_filtered_by_hash_tests += tests_in_group
all_tests = all_filtered_by_hash_tests
parallel_skip_tests = self._get_parallel_tests_skip_list(repo_path)
all_tests = self._get_tests_by_hash()
parallel_skip_tests = self._get_parallel_tests_skip_list(self.repo_path)
logging.info(
"Found %s tests first 3 %s", len(all_tests), " ".join(all_tests[:3])
)
@ -980,7 +984,7 @@ class ClickhouseIntegrationTestsRunner:
break
logging.info("Running test group %s containing %s tests", group, len(tests))
group_counters, group_test_times, log_paths = self.try_run_test_group(
repo_path, group, tests, MAX_RETRY, NUM_WORKERS, 0
group, tests, MAX_RETRY, NUM_WORKERS, 0
)
total_tests = 0
for counter, value in group_counters.items():
@ -1051,15 +1055,16 @@ def run():
signal.signal(signal.SIGTERM, handle_sigterm)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
repo_path = os.environ.get("CLICKHOUSE_TESTS_REPO_PATH")
build_path = os.environ.get("CLICKHOUSE_TESTS_BUILD_PATH")
result_path = os.environ.get("CLICKHOUSE_TESTS_RESULT_PATH")
params_path = os.environ.get("CLICKHOUSE_TESTS_JSON_PARAMS_PATH")
repo_path = os.environ.get("CLICKHOUSE_TESTS_REPO_PATH", "")
build_path = os.environ.get("CLICKHOUSE_TESTS_BUILD_PATH", "")
result_path = os.environ.get("CLICKHOUSE_TESTS_RESULT_PATH", "")
params_path = os.environ.get("CLICKHOUSE_TESTS_JSON_PARAMS_PATH", "")
assert all((repo_path, build_path, result_path, params_path))
assert params_path
with open(params_path, "r", encoding="utf-8") as jfd:
params = json.loads(jfd.read())
runner = ClickhouseIntegrationTestsRunner(result_path, params)
runner = ClickhouseIntegrationTestsRunner(repo_path, result_path, params)
logging.info("Running tests")
@ -1068,9 +1073,7 @@ def run():
logging.info("Clearing dmesg before run")
subprocess.check_call("sudo -E dmesg --clear", shell=True)
state, description, test_results, _test_log_paths = runner.run_impl(
repo_path, build_path
)
state, description, test_results, _test_log_paths = runner.run_impl(build_path)
logging.info("Tests finished")
if IS_CI:
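For reference, a minimal standalone sketch of the balancing rule used by _get_tests_by_hash above: each file's tests go to whichever batch currently holds the fewest tests, ties resolved by the lowest batch index, so the split is deterministic between runs. The split_groups helper and the sample file names are illustrative assumptions, not part of the runner:

from typing import Dict, List

def split_groups(grouped_tests: Dict[str, List[str]], total_batches: int) -> Dict[int, List[str]]:
    # Assign each file's tests to the currently smallest batch (lowest index on ties).
    batches: Dict[int, List[str]] = {b: [] for b in range(total_batches)}
    for tests in grouped_tests.values():
        smallest = min(len(t) for t in batches.values())
        target = min(b for b, t in batches.items() if len(t) == smallest)
        batches[target].extend(tests)
    return batches

if __name__ == "__main__":
    sample = {
        "test_a/test.py": ["test_a::one", "test_a::two", "test_a::three"],
        "test_b/test.py": ["test_b::one"],
        "test_c/test.py": ["test_c::one", "test_c::two"],
    }
    # test_a fills batch 0, test_b goes to the then-smaller batch 1, test_c follows it,
    # giving a 3/3 split: {0: [test_a tests], 1: [test_b::one, test_c::one, test_c::two]}
    print(split_groups(sample, 2))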

View File

@ -378,9 +378,9 @@ done
# collect minio audit and server logs
# wait for minio to flush its batch if it has any
sleep 1
clickhouse-client -q "SYSTEM FLUSH ASYNC INSERT QUEUE"
clickhouse-client ${logs_saver_client_options} -q "SELECT log FROM minio_audit_logs ORDER BY event_time INTO OUTFILE '/test_output/minio_audit_logs.jsonl.zst' FORMAT JSONEachRow"
clickhouse-client ${logs_saver_client_options} -q "SELECT log FROM minio_server_logs ORDER BY event_time INTO OUTFILE '/test_output/minio_server_logs.jsonl.zst' FORMAT JSONEachRow"
clickhouse-client -q "SYSTEM FLUSH ASYNC INSERT QUEUE" ||:
clickhouse-client ${logs_saver_client_options} -q "SELECT log FROM minio_audit_logs ORDER BY event_time INTO OUTFILE '/test_output/minio_audit_logs.jsonl.zst' FORMAT JSONEachRow" ||:
clickhouse-client ${logs_saver_client_options} -q "SELECT log FROM minio_server_logs ORDER BY event_time INTO OUTFILE '/test_output/minio_server_logs.jsonl.zst' FORMAT JSONEachRow" ||:
# Stop server so we can safely read data with clickhouse-local.
# Why do we read data with clickhouse-local?

View File

@ -1,7 +1,9 @@
services:
rabbitmq1:
image: rabbitmq:3.12.6-alpine
image: rabbitmq:4.0.2-alpine
hostname: rabbitmq1
environment:
RABBITMQ_FEATURE_FLAGS: feature_flags_v2,message_containers
expose:
- ${RABBITMQ_PORT:-5672}
- ${RABBITMQ_SECURE_PORT:-5671}
@ -14,3 +16,4 @@ services:
- /misc/rabbitmq/ca-cert.pem:/etc/rabbitmq/ca-cert.pem
- /misc/rabbitmq/server-cert.pem:/etc/rabbitmq/server-cert.pem
- /misc/rabbitmq/server-key.pem:/etc/rabbitmq/server-key.pem
- /misc/rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins

View File

@ -18,7 +18,7 @@ import traceback
import urllib.parse
from functools import cache
from pathlib import Path
from typing import List, Sequence, Tuple, Union
from typing import Any, List, Sequence, Tuple, Union
import requests
import urllib3
@ -228,7 +228,9 @@ def retry_exception(num, delay, func, exception=Exception, *args, **kwargs):
raise StopIteration("Function did not finished successfully")
def subprocess_check_call(args, detach=False, nothrow=False):
def subprocess_check_call(
args: Union[Sequence[str], str], detach: bool = False, nothrow: bool = False
) -> str:
# Uncomment for debugging
# logging.info('run:' + ' '.join(args))
return run_and_check(args, detach=detach, nothrow=nothrow)
@ -296,19 +298,32 @@ def check_postgresql_java_client_is_available(postgresql_java_client_id):
return p.returncode == 0
def check_rabbitmq_is_available(rabbitmq_id, cookie):
p = subprocess.Popen(
docker_exec(
"-e",
f"RABBITMQ_ERLANG_COOKIE={cookie}",
rabbitmq_id,
"rabbitmqctl",
"await_startup",
),
stdout=subprocess.PIPE,
)
p.wait(timeout=60)
return p.returncode == 0
def check_rabbitmq_is_available(rabbitmq_id, cookie, timeout=90):
try:
subprocess.check_output(
docker_exec(
"-e",
f"RABBITMQ_ERLANG_COOKIE={cookie}",
rabbitmq_id,
"rabbitmqctl",
"await_startup",
),
stderr=subprocess.STDOUT,
timeout=timeout,
)
return True
except subprocess.CalledProcessError as e:
# Raised if the command returns a non-zero exit code
error_message = (
f"RabbitMQ startup failed with return code {e.returncode}. "
f"Output: {e.output.decode(errors='replace')}"
)
raise RuntimeError(error_message)
except subprocess.TimeoutExpired as e:
# Raised if the command times out
raise RuntimeError(
f"RabbitMQ startup timed out. Output: {e.output.decode(errors='replace')}"
)
def rabbitmq_debuginfo(rabbitmq_id, cookie):
@ -372,22 +387,6 @@ async def nats_connect_ssl(nats_port, user, password, ssl_ctx=None):
return nc
def enable_consistent_hash_plugin(rabbitmq_id, cookie):
p = subprocess.Popen(
docker_exec(
"-e",
f"RABBITMQ_ERLANG_COOKIE={cookie}",
rabbitmq_id,
"rabbitmq-plugins",
"enable",
"rabbitmq_consistent_hash_exchange",
),
stdout=subprocess.PIPE,
)
p.communicate()
return p.returncode == 0
def get_instances_dir(name):
instances_dir_name = "_instances"
@ -2059,8 +2058,14 @@ class ClickHouseCluster:
return self.docker_client.api.logs(container_id).decode()
def exec_in_container(
self, container_id, cmd, detach=False, nothrow=False, use_cli=True, **kwargs
):
self,
container_id: str,
cmd: Sequence[str],
detach: bool = False,
nothrow: bool = False,
use_cli: bool = True,
**kwargs: Any,
) -> str:
if use_cli:
logging.debug(
f"run container_id:{container_id} detach:{detach} nothrow:{nothrow} cmd: {cmd}"
@ -2071,10 +2076,11 @@ class ClickHouseCluster:
if "privileged" in kwargs:
exec_cmd += ["--privileged"]
result = subprocess_check_call(
exec_cmd + [container_id] + cmd, detach=detach, nothrow=nothrow
exec_cmd + [container_id] + list(cmd), detach=detach, nothrow=nothrow
)
return result
else:
assert self.docker_client is not None
exec_id = self.docker_client.api.exec_create(container_id, cmd, **kwargs)
output = self.docker_client.api.exec_start(exec_id, detach=detach)
@ -2083,16 +2089,15 @@ class ClickHouseCluster:
container_info = self.docker_client.api.inspect_container(container_id)
image_id = container_info.get("Image")
image_info = self.docker_client.api.inspect_image(image_id)
logging.debug(("Command failed in container {}: ".format(container_id)))
logging.debug("Command failed in container %s: ", container_id)
pprint.pprint(container_info)
logging.debug("")
logging.debug(
("Container {} uses image {}: ".format(container_id, image_id))
)
logging.debug("Container %s uses image %s: ", container_id, image_id)
pprint.pprint(image_info)
logging.debug("")
message = 'Cmd "{}" failed in container {}. Return code {}. Output: {}'.format(
" ".join(cmd), container_id, exit_code, output
message = (
f'Cmd "{" ".join(cmd)}" failed in container {container_id}. '
f"Return code {exit_code}. Output: {output}"
)
if nothrow:
logging.debug(message)
@ -2347,22 +2352,14 @@ class ClickHouseCluster:
self.print_all_docker_pieces()
self.rabbitmq_ip = self.get_instance_ip(self.rabbitmq_host)
start = time.time()
while time.time() - start < timeout:
try:
if check_rabbitmq_is_available(
self.rabbitmq_docker_id, self.rabbitmq_cookie
):
logging.debug("RabbitMQ is available")
if enable_consistent_hash_plugin(
self.rabbitmq_docker_id, self.rabbitmq_cookie
):
logging.debug("RabbitMQ consistent hash plugin is available")
return True
time.sleep(0.5)
except Exception as ex:
logging.debug("Can't connect to RabbitMQ " + str(ex))
time.sleep(0.5)
try:
if check_rabbitmq_is_available(
self.rabbitmq_docker_id, self.rabbitmq_cookie, timeout
):
logging.debug("RabbitMQ is available")
return True
except Exception:
logging.debug("RabbitMQ await_startup failed", exc_info=True)
try:
with open(os.path.join(self.rabbitmq_dir, "docker.log"), "w+") as f:
@ -2390,39 +2387,35 @@ class ClickHouseCluster:
def wait_zookeeper_secure_to_start(self, timeout=20):
logging.debug("Wait ZooKeeper Secure to start")
start = time.time()
while time.time() - start < timeout:
try:
for instance in ["zoo1", "zoo2", "zoo3"]:
conn = self.get_kazoo_client(instance)
conn.get_children("/")
conn.stop()
logging.debug("All instances of ZooKeeper Secure started")
return
except Exception as ex:
logging.debug("Can't connect to ZooKeeper secure " + str(ex))
time.sleep(0.5)
nodes = ["zoo1", "zoo2", "zoo3"]
self.wait_zookeeper_nodes_to_start(nodes, timeout)
raise Exception("Cannot wait ZooKeeper secure container")
def wait_zookeeper_to_start(self, timeout=180):
def wait_zookeeper_to_start(self, timeout: float = 180) -> None:
logging.debug("Wait ZooKeeper to start")
nodes = ["zoo1", "zoo2", "zoo3"]
self.wait_zookeeper_nodes_to_start(nodes, timeout)
def wait_zookeeper_nodes_to_start(
self, nodes: List[str], timeout: float = 60
) -> None:
start = time.time()
err = Exception("")
while time.time() - start < timeout:
try:
for instance in ["zoo1", "zoo2", "zoo3"]:
conn = self.get_kazoo_client(instance)
for node in nodes:
conn = self.get_kazoo_client(node)
conn.get_children("/")
conn.stop()
logging.debug("All instances of ZooKeeper started")
logging.debug("All instances of ZooKeeper started: %s", nodes)
return
except Exception as ex:
logging.debug(f"Can't connect to ZooKeeper {instance}: {ex}")
logging.debug("Can't connect to ZooKeeper %s: %s", node, ex)
err = ex
time.sleep(0.5)
raise Exception(
"Cannot wait ZooKeeper container (probably it's a `iptables-nft` issue, you may try to `sudo iptables -P FORWARD ACCEPT`)"
)
) from err
def make_hdfs_api(self, timeout=180, kerberized=False):
if kerberized:
@ -3367,7 +3360,7 @@ class ClickHouseInstance:
self.name = name
self.base_cmd = cluster.base_cmd
self.docker_id = cluster.get_instance_docker_id(self.name)
self.cluster = cluster
self.cluster = cluster # type: ClickHouseCluster
self.hostname = hostname if hostname is not None else self.name
self.external_dirs = external_dirs
@ -3978,7 +3971,13 @@ class ClickHouseInstance:
self.stop_clickhouse(stop_start_wait_sec, kill)
self.start_clickhouse(stop_start_wait_sec)
def exec_in_container(self, cmd, detach=False, nothrow=False, **kwargs):
def exec_in_container(
self,
cmd: Sequence[str],
detach: bool = False,
nothrow: bool = False,
**kwargs: Any,
) -> str:
return self.cluster.exec_in_container(
self.docker_id, cmd, detach, nothrow, **kwargs
)

View File

@ -1,11 +1,13 @@
import contextlib
import io
import logging
import re
import select
import socket
import subprocess
import time
import typing as tp
from os import path as p
from typing import Iterable, List, Optional, Sequence, Union
from kazoo.client import KazooClient
@ -23,7 +25,7 @@ ss_established = [
]
def get_active_zk_connections(node: ClickHouseInstance) -> tp.List[str]:
def get_active_zk_connections(node: ClickHouseInstance) -> List[str]:
return (
str(node.exec_in_container(ss_established, privileged=True, user="root"))
.strip()
@ -41,6 +43,7 @@ def get_zookeeper_which_node_connected_to(node: ClickHouseInstance) -> str:
assert (
len(result) == 1
), "ClickHouse must be connected only to one Zookeeper at a time"
assert isinstance(result[0], str)
return result[0]
@ -118,8 +121,10 @@ class KeeperClient(object):
in str(e)
and retry_count < connection_tries
):
print(
f"Got exception while connecting to Keeper: {e}\nWill reconnect, reconnect count = {retry_count}"
logging.debug(
"Got exception while connecting to Keeper: %s\nWill reconnect, reconnect count = %s",
e,
retry_count,
)
time.sleep(1)
else:
@ -169,12 +174,12 @@ class KeeperClient(object):
def get(self, path: str, timeout: float = 60.0) -> str:
return self.execute_query(f"get '{path}'", timeout)
def set(self, path: str, value: str, version: tp.Optional[int] = None) -> None:
def set(self, path: str, value: str, version: Optional[int] = None) -> None:
self.execute_query(
f"set '{path}' '{value}' {version if version is not None else ''}"
)
def rm(self, path: str, version: tp.Optional[int] = None) -> None:
def rm(self, path: str, version: Optional[int] = None) -> None:
self.execute_query(f"rm '{path}' {version if version is not None else ''}")
def exists(self, path: str, timeout: float = 60.0) -> bool:
@ -208,9 +213,9 @@ class KeeperClient(object):
def reconfig(
self,
joining: tp.Optional[str],
leaving: tp.Optional[str],
new_members: tp.Optional[str],
joining: Optional[str],
leaving: Optional[str],
new_members: Optional[str],
timeout: float = 60.0,
) -> str:
if bool(joining) + bool(leaving) + bool(new_members) != 1:
@ -236,7 +241,7 @@ class KeeperClient(object):
@classmethod
@contextlib.contextmanager
def from_cluster(
cls, cluster: ClickHouseCluster, keeper_node: str, port: tp.Optional[int] = None
cls, cluster: ClickHouseCluster, keeper_node: str, port: Optional[int] = None
) -> "KeeperClient":
client = cls(
cluster.server_bin_path,
@ -353,3 +358,22 @@ def wait_configs_equal(left_config: str, right_zk: KeeperClient, timeout: float
f"timeout while checking nodes configs to get equal. "
f"Left: {left_config}, right: {right_config}"
)
def replace_zookeeper_config(
nodes: Union[Sequence[ClickHouseInstance], ClickHouseInstance], new_config: str
) -> None:
if not isinstance(nodes, Sequence):
nodes = (nodes,)
for node in nodes:
node.replace_config("/etc/clickhouse-server/conf.d/zookeeper.xml", new_config)
node.query("SYSTEM RELOAD CONFIG")
def reset_zookeeper_config(
nodes: Union[Sequence[ClickHouseInstance], ClickHouseInstance],
file_path: str = p.join(p.dirname(p.realpath(__file__)), "zookeeper_config.xml"),
) -> None:
"""Resets the keeper config to default or to a given path on the disk"""
with open(file_path, "r", encoding="utf-8") as cf:
replace_zookeeper_config(nodes, cf.read())
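A usage sketch for the two helpers above, assuming a started ClickHouseCluster whose instances node1 and node2 were created with the default three-node configs/zookeeper.xml used by these tests; the variable names and the single-node XML are illustrative assumptions:

from os import path as p

from helpers.keeper_utils import replace_zookeeper_config, reset_zookeeper_config

single_node_zk_config = """
<clickhouse>
    <zookeeper>
        <node index="1">
            <host>zoo1</host>
            <port>2181</port>
        </node>
        <session_timeout_ms>2000</session_timeout_ms>
    </zookeeper>
</clickhouse>
"""

# Point both instances at zoo1 only and reload their configs in one call.
replace_zookeeper_config((node1, node2), single_node_zk_config)

# ... run the assertions that need a reduced ZooKeeper ensemble ...

# Restore the default config from disk so later tests start from a clean state.
reset_zookeeper_config(
    (node1, node2),
    p.join(p.dirname(p.realpath(__file__)), "configs/zookeeper.xml"),
)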

View File

@ -344,7 +344,7 @@ def test_cmd_srvr(started_cluster):
assert result["Received"] == "10"
assert result["Sent"] == "10"
assert int(result["Connections"]) == 1
assert int(result["Zxid"], 16) > 10
assert int(result["Zxid"], 16) >= 10
assert result["Mode"] == "leader"
assert result["Node count"] == "14"

View File

@ -1,5 +1,6 @@
<clickhouse>
<keeper_server>
<use_cluster>0</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -9,6 +10,7 @@
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
<raft_limits_response_limit>5</raft_limits_response_limit>
</coordination_settings>
<raft_configuration>

View File

@ -1,5 +1,6 @@
<clickhouse>
<keeper_server>
<use_cluster>0</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -9,6 +10,7 @@
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
<raft_limits_response_limit>5</raft_limits_response_limit>
</coordination_settings>
<raft_configuration>

View File

@ -1,5 +1,6 @@
<clickhouse>
<keeper_server>
<use_cluster>0</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -9,6 +10,7 @@
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
<raft_limits_response_limit>5</raft_limits_response_limit>
</coordination_settings>
<raft_configuration>

View File

@ -1,5 +1,6 @@
<clickhouse>
<keeper_server>
<use_cluster>0</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -9,6 +10,7 @@
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
<raft_limits_response_limit>5</raft_limits_response_limit>
</coordination_settings>
<raft_configuration>

View File

@ -1,5 +1,6 @@
<clickhouse>
<keeper_server>
<use_cluster>0</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -9,6 +10,7 @@
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
<raft_limits_response_limit>5</raft_limits_response_limit>
</coordination_settings>
<raft_configuration>

View File

@ -1,5 +1,6 @@
<clickhouse>
<keeper_server>
<use_cluster>0</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>4</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -9,6 +10,7 @@
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
<raft_limits_response_limit>5</raft_limits_response_limit>
</coordination_settings>
<raft_configuration>

View File

@ -670,6 +670,7 @@ def test_python_client(started_cluster):
cursor.execute("INSERT INTO table1 VALUES (1), (4)")
cursor.execute("SELECT * FROM table1 ORDER BY a")
assert cursor.fetchall() == [{"a": 1}, {"a": 1}, {"a": 3}, {"a": 4}]
cursor.execute("DROP DATABASE x")
def test_golang_client(started_cluster, golang_container):

View File

@ -69,3 +69,5 @@ def test_restart_zookeeper(start_cluster):
retry_count=10,
sleep_time=1,
)
# restore the cluster state
cluster.start_zookeeper_nodes([node1_zk])

View File

@ -1,13 +1,18 @@
import os
import time
from os import path as p
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
from helpers.keeper_utils import get_active_zk_connections
from helpers.keeper_utils import (
get_active_zk_connections,
replace_zookeeper_config,
reset_zookeeper_config,
)
from helpers.test_tools import assert_eq_with_retry
from helpers.utility import random_string
default_zk_config = p.join(p.dirname(p.realpath(__file__)), "configs/zookeeper.xml")
cluster = ClickHouseCluster(__file__, zookeeper_config_path="configs/zookeeper.xml")
node = cluster.add_instance("node", with_zookeeper=True)
@ -16,14 +21,6 @@ node = cluster.add_instance("node", with_zookeeper=True)
def start_cluster():
try:
cluster.start()
node.query(
"""
CREATE TABLE test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/shard1/test/test_table', '1')
PARTITION BY toYYYYMM(date)
ORDER BY id
"""
)
yield cluster
finally:
@ -31,19 +28,15 @@ def start_cluster():
def test_reload_zookeeper(start_cluster):
def wait_zookeeper_node_to_start(zk_nodes, timeout=60):
start = time.time()
while time.time() - start < timeout:
try:
for instance in zk_nodes:
conn = start_cluster.get_kazoo_client(instance)
conn.get_children("/")
print("All instances of ZooKeeper started")
return
except Exception as ex:
print(("Can't connect to ZooKeeper " + str(ex)))
time.sleep(0.5)
# random is used for flaky tests, where ZK is not fast enough to clear the node
node.query(
f"""
CREATE TABLE test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/shard1/{random_string(7)}/test_table', '1')
PARTITION BY toYYYYMM(date)
ORDER BY id
"""
)
node.query(
"INSERT INTO test_table(date, id) select today(), number FROM numbers(1000)"
)
@ -60,8 +53,7 @@ def test_reload_zookeeper(start_cluster):
</zookeeper>
</clickhouse>
"""
node.replace_config("/etc/clickhouse-server/conf.d/zookeeper.xml", new_config)
node.query("SYSTEM RELOAD CONFIG")
replace_zookeeper_config(node, new_config)
## config reloads, but can still work
assert_eq_with_retry(
node, "SELECT COUNT() FROM test_table", "1000", retry_count=120, sleep_time=0.5
@ -78,7 +70,7 @@ def test_reload_zookeeper(start_cluster):
## start zoo2, zoo3, table will be readonly too, because it only connect to zoo1
cluster.start_zookeeper_nodes(["zoo2", "zoo3"])
wait_zookeeper_node_to_start(["zoo2", "zoo3"])
cluster.wait_zookeeper_nodes_to_start(["zoo2", "zoo3"])
node.query("SELECT COUNT() FROM test_table")
with pytest.raises(QueryRuntimeException):
node.query(
@ -98,8 +90,7 @@ def test_reload_zookeeper(start_cluster):
</zookeeper>
</clickhouse>
"""
node.replace_config("/etc/clickhouse-server/conf.d/zookeeper.xml", new_config)
node.query("SYSTEM RELOAD CONFIG")
replace_zookeeper_config(node, new_config)
active_zk_connections = get_active_zk_connections(node)
assert (
@ -114,3 +105,8 @@ def test_reload_zookeeper(start_cluster):
assert (
len(active_zk_connections) == 1
), "Total connections to ZooKeeper not equal to 1, {}".format(active_zk_connections)
# Reset cluster state
cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
cluster.wait_zookeeper_nodes_to_start(["zoo1", "zoo2", "zoo3"])
reset_zookeeper_config(node, default_zk_config)
node.query("DROP TABLE test_table")

View File

@ -70,20 +70,6 @@ def wait_part_is_stuck(node, table_moving_path, moving_part):
time.sleep(1)
def wait_zookeeper_node_to_start(zk_nodes, timeout=60):
start = time.time()
while time.time() - start < timeout:
try:
for instance in zk_nodes:
conn = cluster.get_kazoo_client(instance)
conn.get_children("/")
print("All instances of ZooKeeper started")
return
except Exception as ex:
print(("Can't connect to ZooKeeper " + str(ex)))
time.sleep(0.5)
def test_remove_stale_moving_parts_without_zookeeper(started_cluster):
ch1.query(f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME}")
@ -113,7 +99,7 @@ def test_remove_stale_moving_parts_without_zookeeper(started_cluster):
assert exec(ch1, "ls", table_moving_path).strip() == ""
cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
wait_zookeeper_node_to_start(["zoo1", "zoo2", "zoo3"])
cluster.wait_zookeeper_nodes_to_start(["zoo1", "zoo2", "zoo3"])
q(ch1, "SYSTEM START MOVES")
q(ch1, f"DROP TABLE test_remove")

View File

@ -962,8 +962,8 @@ def test_recover_staled_replica(started_cluster):
def test_recover_staled_replica_many_mvs(started_cluster):
main_node.query("DROP DATABASE IF EXISTS recover_mvs")
dummy_node.query("DROP DATABASE IF EXISTS recover_mvs")
main_node.query("DROP DATABASE IF EXISTS recover_mvs SYNC")
dummy_node.query("DROP DATABASE IF EXISTS recover_mvs SYNC")
main_node.query_with_retry(
"CREATE DATABASE IF NOT EXISTS recover_mvs ENGINE = Replicated('/clickhouse/databases/recover_mvs', 'shard1', 'replica1');"
@ -1104,8 +1104,8 @@ def test_recover_staled_replica_many_mvs(started_cluster):
query = "SELECT name FROM system.tables WHERE database='recover_mvs' ORDER BY name"
assert main_node.query(query) == dummy_node.query(query)
main_node.query("DROP DATABASE IF EXISTS recover_mvs")
dummy_node.query("DROP DATABASE IF EXISTS recover_mvs")
main_node.query("DROP DATABASE IF EXISTS recover_mvs SYNC")
dummy_node.query("DROP DATABASE IF EXISTS recover_mvs SYNC")
def test_startup_without_zk(started_cluster):
@ -1124,8 +1124,10 @@ def test_startup_without_zk(started_cluster):
main_node.query("INSERT INTO startup.rmt VALUES (42)")
with PartitionManager() as pm:
pm.drop_instance_zk_connections(main_node)
main_node.restart_clickhouse(stop_start_wait_sec=60)
pm.drop_instance_zk_connections(
main_node, action="REJECT --reject-with tcp-reset"
)
main_node.restart_clickhouse(stop_start_wait_sec=120)
assert main_node.query("SELECT (*,).1 FROM startup.rmt") == "42\n"
# we need to wait until the table is not readonly
@ -1220,6 +1222,11 @@ def test_sync_replica(started_cluster):
def test_force_synchronous_settings(started_cluster):
main_node.query("DROP DATABASE IF EXISTS test_force_synchronous_settings SYNC")
dummy_node.query("DROP DATABASE IF EXISTS test_force_synchronous_settings SYNC")
snapshotting_node.query(
"DROP DATABASE IF EXISTS test_force_synchronous_settings SYNC"
)
main_node.query(
"CREATE DATABASE test_force_synchronous_settings ENGINE = Replicated('/clickhouse/databases/test2', 'shard1', 'replica1');"
)
@ -1284,8 +1291,8 @@ def test_force_synchronous_settings(started_cluster):
def test_recover_digest_mismatch(started_cluster):
main_node.query("DROP DATABASE IF EXISTS recover_digest_mismatch")
dummy_node.query("DROP DATABASE IF EXISTS recover_digest_mismatch")
main_node.query("DROP DATABASE IF EXISTS recover_digest_mismatch SYNC")
dummy_node.query("DROP DATABASE IF EXISTS recover_digest_mismatch SYNC")
main_node.query(
"CREATE DATABASE recover_digest_mismatch ENGINE = Replicated('/clickhouse/databases/recover_digest_mismatch', 'shard1', 'replica1');"
@ -1330,15 +1337,16 @@ def test_recover_digest_mismatch(started_cluster):
dummy_node.start_clickhouse()
assert_eq_with_retry(dummy_node, query, expected)
main_node.query("DROP DATABASE IF EXISTS recover_digest_mismatch")
dummy_node.query("DROP DATABASE IF EXISTS recover_digest_mismatch")
main_node.query("DROP DATABASE IF EXISTS recover_digest_mismatch SYNC")
dummy_node.query("DROP DATABASE IF EXISTS recover_digest_mismatch SYNC")
print("Everything Okay")
def test_replicated_table_structure_alter(started_cluster):
main_node.query("DROP DATABASE IF EXISTS table_structure")
dummy_node.query("DROP DATABASE IF EXISTS table_structure")
main_node.query("DROP DATABASE IF EXISTS table_structure SYNC")
dummy_node.query("DROP DATABASE IF EXISTS table_structure SYNC")
competing_node.query("DROP DATABASE IF EXISTS table_structure SYNC")
main_node.query(
"CREATE DATABASE table_structure ENGINE = Replicated('/clickhouse/databases/table_structure', 'shard1', 'replica1');"
@ -1440,8 +1448,8 @@ def test_modify_comment(started_cluster):
def test_table_metadata_corruption(started_cluster):
main_node.query("DROP DATABASE IF EXISTS table_metadata_corruption")
dummy_node.query("DROP DATABASE IF EXISTS table_metadata_corruption")
main_node.query("DROP DATABASE IF EXISTS table_metadata_corruption SYNC")
dummy_node.query("DROP DATABASE IF EXISTS table_metadata_corruption SYNC")
main_node.query(
"CREATE DATABASE table_metadata_corruption ENGINE = Replicated('/clickhouse/databases/table_metadata_corruption', 'shard1', 'replica1');"
@ -1479,13 +1487,18 @@ def test_table_metadata_corruption(started_cluster):
dummy_node.start_clickhouse()
assert_eq_with_retry(dummy_node, query, expected)
main_node.query("DROP DATABASE IF EXISTS table_metadata_corruption")
dummy_node.query("DROP DATABASE IF EXISTS table_metadata_corruption")
main_node.query("DROP DATABASE IF EXISTS table_metadata_corruption SYNC")
dummy_node.query("DROP DATABASE IF EXISTS table_metadata_corruption SYNC")
def test_auto_recovery(started_cluster):
dummy_node.query("DROP DATABASE IF EXISTS auto_recovery")
bad_settings_node.query("DROP DATABASE IF EXISTS auto_recovery")
dummy_node.query("DROP DATABASE IF EXISTS auto_recovery SYNC")
bad_settings_node.query(
"DROP DATABASE IF EXISTS auto_recovery",
settings={
"implicit_transaction": 0,
},
)
dummy_node.query(
"CREATE DATABASE auto_recovery ENGINE = Replicated('/clickhouse/databases/auto_recovery', 'shard1', 'replica1');"
@ -1532,8 +1545,8 @@ def test_auto_recovery(started_cluster):
def test_all_groups_cluster(started_cluster):
dummy_node.query("DROP DATABASE IF EXISTS db_cluster")
bad_settings_node.query("DROP DATABASE IF EXISTS db_cluster")
dummy_node.query("DROP DATABASE IF EXISTS db_cluster SYNC")
bad_settings_node.query("DROP DATABASE IF EXISTS db_cluster SYNC")
dummy_node.query(
"CREATE DATABASE db_cluster ENGINE = Replicated('/clickhouse/databases/all_groups_cluster', 'shard1', 'replica1');"
)

View File

@ -1,17 +1,19 @@
import inspect
import os.path
import time
from contextlib import nullcontext as does_not_raise
from os import path as p
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
from helpers.keeper_utils import get_active_zk_connections
from helpers.keeper_utils import (
get_active_zk_connections,
replace_zookeeper_config,
reset_zookeeper_config,
)
from helpers.test_tools import TSV, assert_eq_with_retry
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
default_zk_config = p.join(p.dirname(p.realpath(__file__)), "configs/zookeeper.xml")
cluster = ClickHouseCluster(__file__, zookeeper_config_path="configs/zookeeper.xml")
node1 = cluster.add_instance(
@ -40,32 +42,6 @@ def started_cluster():
cluster.shutdown()
def wait_zookeeper_node_to_start(zk_nodes, timeout=60):
start = time.time()
while time.time() - start < timeout:
try:
for instance in zk_nodes:
conn = cluster.get_kazoo_client(instance)
conn.get_children("/")
print("All instances of ZooKeeper started")
return
except Exception as ex:
print(("Can't connect to ZooKeeper " + str(ex)))
time.sleep(0.5)
def replace_zookeeper_config(new_config):
node1.replace_config("/etc/clickhouse-server/conf.d/zookeeper.xml", new_config)
node2.replace_config("/etc/clickhouse-server/conf.d/zookeeper.xml", new_config)
node1.query("SYSTEM RELOAD CONFIG")
node2.query("SYSTEM RELOAD CONFIG")
def revert_zookeeper_config():
with open(os.path.join(SCRIPT_DIR, "configs/zookeeper.xml"), "r") as f:
replace_zookeeper_config(f.read())
def test_create_and_drop():
node1.query("CREATE FUNCTION f1 AS (x, y) -> x + y")
assert node1.query("SELECT f1(12, 3)") == "15\n"
@ -192,6 +168,7 @@ def test_reload_zookeeper():
# remove zoo2, zoo3 from configs
replace_zookeeper_config(
(node1, node2),
inspect.cleandoc(
"""
<clickhouse>
@ -204,7 +181,7 @@ def test_reload_zookeeper():
</zookeeper>
</clickhouse>
"""
)
),
)
# config reloads, but can still work
@ -228,7 +205,7 @@ def test_reload_zookeeper():
# start zoo2, zoo3, user-defined functions will be readonly too, because it only connect to zoo1
cluster.start_zookeeper_nodes(["zoo2", "zoo3"])
wait_zookeeper_node_to_start(["zoo2", "zoo3"])
cluster.wait_zookeeper_nodes_to_start(["zoo2", "zoo3"])
assert node2.query(
"SELECT name FROM system.functions WHERE name IN ['f1', 'f2', 'f3'] ORDER BY name"
) == TSV(["f1", "f2"])
@ -238,6 +215,7 @@ def test_reload_zookeeper():
# set config to zoo2, server will be normal
replace_zookeeper_config(
(node1, node2),
inspect.cleandoc(
"""
<clickhouse>
@ -250,7 +228,7 @@ def test_reload_zookeeper():
</zookeeper>
</clickhouse>
"""
)
),
)
active_zk_connections = get_active_zk_connections(node1)
@ -278,7 +256,7 @@ def test_reload_zookeeper():
# switch to the original version of zookeeper config
cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
revert_zookeeper_config()
reset_zookeeper_config((node1, node2), default_zk_config)
# Start without ZooKeeper must be possible, user-defined functions will be loaded after connecting to ZooKeeper.
@ -295,7 +273,7 @@ def test_start_without_zookeeper():
)
cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
wait_zookeeper_node_to_start(["zoo1", "zoo2", "zoo3"])
cluster.wait_zookeeper_nodes_to_start(["zoo1", "zoo2", "zoo3"])
assert_eq_with_retry(
node2,

View File

@ -1,13 +1,19 @@
import inspect
import time
from dataclasses import dataclass
from os import path as p
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.keeper_utils import get_active_zk_connections
from helpers.keeper_utils import (
get_active_zk_connections,
replace_zookeeper_config,
reset_zookeeper_config,
)
from helpers.test_tools import TSV, assert_eq_with_retry
default_zk_config = p.join(p.dirname(p.realpath(__file__)), "configs/zookeeper.xml")
cluster = ClickHouseCluster(__file__, zookeeper_config_path="configs/zookeeper.xml")
node1 = cluster.add_instance(
@ -171,25 +177,6 @@ def test_rename_replicated(started_cluster, entity):
# ReplicatedAccessStorage must be able to continue working after reloading ZooKeeper.
def test_reload_zookeeper(started_cluster):
def wait_zookeeper_node_to_start(zk_nodes, timeout=60):
start = time.time()
while time.time() - start < timeout:
try:
for instance in zk_nodes:
conn = cluster.get_kazoo_client(instance)
conn.get_children("/")
print("All instances of ZooKeeper started")
return
except Exception as ex:
print(("Can't connect to ZooKeeper " + str(ex)))
time.sleep(0.5)
def replace_zookeeper_config(new_config):
node1.replace_config("/etc/clickhouse-server/conf.d/zookeeper.xml", new_config)
node2.replace_config("/etc/clickhouse-server/conf.d/zookeeper.xml", new_config)
node1.query("SYSTEM RELOAD CONFIG")
node2.query("SYSTEM RELOAD CONFIG")
node1.query("CREATE USER u1")
assert_eq_with_retry(
node2, "SELECT name FROM system.users WHERE name ='u1'", "u1\n"
@ -197,6 +184,7 @@ def test_reload_zookeeper(started_cluster):
## remove zoo2, zoo3 from configs
replace_zookeeper_config(
(node1, node2),
"""
<clickhouse>
<zookeeper>
@ -207,7 +195,7 @@ def test_reload_zookeeper(started_cluster):
<session_timeout_ms>2000</session_timeout_ms>
</zookeeper>
</clickhouse>
"""
""",
)
## config reloads, but can still work
@ -227,7 +215,7 @@ def test_reload_zookeeper(started_cluster):
## start zoo2, zoo3, users will be readonly too, because it only connect to zoo1
cluster.start_zookeeper_nodes(["zoo2", "zoo3"])
wait_zookeeper_node_to_start(["zoo2", "zoo3"])
cluster.wait_zookeeper_nodes_to_start(["zoo2", "zoo3"])
assert node2.query(
"SELECT name FROM system.users WHERE name IN ['u1', 'u2'] ORDER BY name"
) == TSV(["u1", "u2"])
@ -235,6 +223,7 @@ def test_reload_zookeeper(started_cluster):
## set config to zoo2, server will be normal
replace_zookeeper_config(
(node1, node2),
"""
<clickhouse>
<zookeeper>
@ -245,7 +234,7 @@ def test_reload_zookeeper(started_cluster):
<session_timeout_ms>2000</session_timeout_ms>
</zookeeper>
</clickhouse>
"""
""",
)
active_zk_connections = get_active_zk_connections(node1)
@ -264,3 +253,9 @@ def test_reload_zookeeper(started_cluster):
assert (
len(active_zk_connections) == 1
), "Total connections to ZooKeeper not equal to 1, {}".format(active_zk_connections)
# Restore the test state
node1.query("DROP USER u1, u2, u3")
cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
cluster.wait_zookeeper_nodes_to_start(["zoo1", "zoo2", "zoo3"])
reset_zookeeper_config((node1, node2), default_zk_config)

View File

@ -13,5 +13,13 @@
<scripts>
<query>SELECT * FROM system.query_log LIMIT 1</query>
</scripts>
<scripts>
<query>SELECT 1 SETTINGS skip_unavailable_shards = 1</query>
<condition>SELECT 1;</condition>
</scripts>
<scripts>
<query>SELECT 1 SETTINGS skip_unavailable_shards = 1</query>
<condition>SELECT 1;</condition>
</scripts>
</startup_scripts>
</clickhouse>

View File

@ -16,6 +16,12 @@ def test_startup_scripts():
try:
cluster.start()
assert node.query("SHOW TABLES") == "TestTable\n"
assert (
node.query(
"SELECT value, changed FROM system.settings WHERE name = 'skip_unavailable_shards'"
)
== "0\t0\n"
)
finally:
cluster.shutdown()

View File

@ -1087,7 +1087,7 @@ def test_drop_table(started_cluster):
started_cluster, files_path, files_to_generate, start_ind=0, row_num=100000
)
create_mv(node, table_name, dst_table_name)
node.wait_for_log_line(f"Reading from file: test_drop_data")
node.wait_for_log_line(f"rows from file: test_drop_data")
node.query(f"DROP TABLE {table_name} SYNC")
assert node.contains_in_log(
f"StorageS3Queue (default.{table_name}): Table is being dropped"

File diff suppressed because it is too large

View File

@ -73,6 +73,7 @@ function configure
rm right/config/config.d/text_log.xml ||:
# backups disk uses absolute path, and this overlaps between servers, that could lead to errors
rm right/config/config.d/backups.xml ||:
rm left/config/config.d/backups.xml ||:
cp -rv right/config left ||:
# Start a temporary server to rename the tables

View File

@ -0,0 +1,110 @@
-------------- Test copy sorting clauses from source table --------------
CREATE TABLE default.x
(
`CounterID` UInt32,
`EventDate` Date,
`UserID` UInt64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)
SETTINGS index_granularity = 8192
-------------------------------------------------------------------------
CREATE TABLE default.x_as
(
`CounterID` UInt32,
`EventDate` Date,
`UserID` UInt64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)
SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1, index_granularity = 8192
-------------- Test copy sorting clauses from destination table (source table without the same type clauses) --------------
CREATE TABLE default.x
(
`CounterID` UInt32,
`EventDate` Date,
`UserID` UInt64
)
ENGINE = MergeTree
PRIMARY KEY (CounterID, EventDate, intHash32(UserID))
ORDER BY (CounterID, EventDate, intHash32(UserID))
SETTINGS index_granularity = 8192
-------------------------------------------------------------------------
CREATE TABLE default.x_as
(
`CounterID` UInt32,
`EventDate` Date,
`UserID` UInt64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(EventDate)
PRIMARY KEY (CounterID, EventDate, intHash32(UserID))
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)
SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1, index_granularity = 8192
-------------- Test copy sorting clauses from destination table (source table with the same type clauses) --------------
CREATE TABLE default.x
(
`CounterID` UInt32,
`EventDate` Date,
`UserID` UInt64
)
ENGINE = MergeTree
ORDER BY CounterID
SETTINGS index_granularity = 8192
-------------------------------------------------------------------------
CREATE TABLE default.x_as
(
`CounterID` UInt32,
`EventDate` Date,
`UserID` UInt64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)
SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1, index_granularity = 8192
-------------- Test compatibility with allow_deprecated_syntax_for_merge_tree (source table is old syntax) --------------
CREATE TABLE default.x
(
`CounterID` UInt32,
`EventDate` Date,
`UserID` UInt64
)
ENGINE = MergeTree(EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID)), 8192)
-------------------------------------------------------------------------
CREATE TABLE default.x_as
(
`CounterID` UInt32,
`EventDate` Date,
`UserID` UInt64
)
ENGINE = MergeTree(EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID)), 8192)
-------------- Test compatibility with allow_deprecated_syntax_for_merge_tree (source table is new syntax) --------------
CREATE TABLE default.x
(
`CounterID` UInt32,
`EventDate` Date,
`UserID` UInt64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)
SETTINGS index_granularity = 8192
-------------------------------------------------------------------------
CREATE TABLE default.x_as
(
`CounterID` UInt32,
`EventDate` Date,
`UserID` UInt64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)
SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1, index_granularity = 8192

View File

@ -0,0 +1,58 @@
DROP TABLE IF EXISTS x;
DROP TABLE IF EXISTS x_as;
SELECT '-------------- Test copy sorting clauses from source table --------------';
CREATE TABLE x (`CounterID` UInt32, `EventDate` Date, `UserID` UInt64) ENGINE = MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID);
CREATE TABLE x_as AS x ENGINE = MergeTree SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1;
SHOW CREATE TABLE x FORMAT TSVRaw;
SELECT '-------------------------------------------------------------------------';
SHOW CREATE TABLE x_as FORMAT TSVRaw;
DROP TABLE x;
DROP TABLE x_as;
SELECT '-------------- Test copy sorting clauses from destination table (source table without the same type clauses) --------------';
CREATE TABLE x (`CounterID` UInt32, `EventDate` Date, `UserID` UInt64) ENGINE = MergeTree PRIMARY KEY (CounterID, EventDate, intHash32(UserID));
CREATE TABLE x_as AS x ENGINE = MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1;
SHOW CREATE TABLE x FORMAT TSVRaw;
SELECT '-------------------------------------------------------------------------';
SHOW CREATE TABLE x_as FORMAT TSVRaw;
DROP TABLE x;
DROP TABLE x_as;
SELECT '-------------- Test copy sorting clauses from destination table (source table with the same type clauses) --------------';
CREATE TABLE x (`CounterID` UInt32, `EventDate` Date, `UserID` UInt64) ENGINE = MergeTree ORDER BY (CounterID);
CREATE TABLE x_as AS x ENGINE = MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1;
SHOW CREATE TABLE x FORMAT TSVRaw;
SELECT '-------------------------------------------------------------------------';
SHOW CREATE TABLE x_as FORMAT TSVRaw;
DROP TABLE x;
DROP TABLE x_as;
SELECT '-------------- Test compatibility with allow_deprecated_syntax_for_merge_tree (source table is old syntax) --------------';
set allow_deprecated_syntax_for_merge_tree=1;
CREATE TABLE x (`CounterID` UInt32, `EventDate` Date, `UserID` UInt64) ENGINE = MergeTree(EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID)), 8192);
CREATE TABLE x_as AS x;
SHOW CREATE TABLE x FORMAT TSVRaw;
SELECT '-------------------------------------------------------------------------';
SHOW CREATE TABLE x_as FORMAT TSVRaw;
DROP TABLE x;
DROP TABLE x_as;
SELECT '-------------- Test compatibility with allow_deprecated_syntax_for_merge_tree (source table is new syntax) --------------';
CREATE TABLE x (`CounterID` UInt32, `EventDate` Date, `UserID` UInt64) ENGINE = MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID);
CREATE TABLE x_as AS x ENGINE = MergeTree SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1;
SHOW CREATE TABLE x FORMAT TSVRaw;
SELECT '-------------------------------------------------------------------------';
SHOW CREATE TABLE x_as FORMAT TSVRaw;
DROP TABLE x;
DROP TABLE x_as;

View File

@ -2,4 +2,3 @@
String
1
99

View File

@ -1,9 +1,5 @@
-- Tags: no-fasttest
-- no-fasttest: upper/lowerUTF8 use ICU
SELECT randomStringUTF8('string'); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
SELECT lengthUTF8(randomStringUTF8(100));
SELECT toTypeName(randomStringUTF8(10));
SELECT isValidUTF8(randomStringUTF8(100000));
SELECT randomStringUTF8(0);
SELECT lengthUTF8(lowerUTF8(randomStringUTF8(99))); -- bug #49672: msan assert

View File

@ -103,4 +103,4 @@ SELECT
FROM system.projection_parts
WHERE (database = currentDatabase()) AND (`table` = 'tp') AND (active = 1);
DROP TABLE tp;
DROP TABLE tp;

View File

@ -0,0 +1,6 @@
2 0
2 1
2 2
3 0
3 1
3 2

View File

@ -0,0 +1,31 @@
DROP TABLE IF EXISTS tp;
CREATE TABLE tp (
type Int32,
eventcnt UInt64,
PROJECTION p (select sum(eventcnt), type group by type)
) engine = ReplacingMergeTree order by type
SETTINGS deduplicate_merge_projection_mode = 'ignore';
INSERT INTO tp SELECT number%3, 1 FROM numbers(3);
INSERT INTO tp SELECT number%3, 2 FROM numbers(3);
OPTIMIZE TABLE tp DEDUPLICATE; -- { serverError SUPPORT_IS_DISABLED }
OPTIMIZE TABLE tp FINAL;
SET optimize_use_projections = false, force_optimize_projection = false;
SELECT sum(eventcnt) eventcnt, type
FROM tp
GROUP BY type
ORDER BY eventcnt, type;
SET optimize_use_projections = true, force_optimize_projection = true;
SELECT sum(eventcnt) eventcnt, type
FROM tp
GROUP BY type
ORDER BY eventcnt, type;
DROP TABLE tp;

View File

@ -11,7 +11,9 @@ for i in {1..250}; do
table_structure+=", c$i String"
done
$CLICKHOUSE_CLIENT --query "
MY_CLICKHOUSE_CLIENT="$CLICKHOUSE_CLIENT --enable_parsing_to_custom_serialization 1"
$MY_CLICKHOUSE_CLIENT --query "
DROP TABLE IF EXISTS t_insert_mem;
DROP TABLE IF EXISTS t_reference;
@ -23,7 +25,7 @@ $CLICKHOUSE_CLIENT --query "
filename="test_data_sparse_$CLICKHOUSE_DATABASE.json"
$CLICKHOUSE_CLIENT --query "
$MY_CLICKHOUSE_CLIENT --query "
INSERT INTO FUNCTION file('$filename', LineAsString)
SELECT format('{{ \"id\": {}, \"c{}\": \"{}\" }}', number, number % 250, hex(number * 1000000)) FROM numbers(30000)
SETTINGS engine_file_truncate_on_insert = 1;
@ -34,15 +36,19 @@ $CLICKHOUSE_CLIENT --query "
"
for _ in {1..4}; do
$CLICKHOUSE_CLIENT --query "INSERT INTO t_reference SELECT * FROM file('$filename', JSONEachRow)"
$MY_CLICKHOUSE_CLIENT --query "INSERT INTO t_reference SELECT * FROM file('$filename', JSONEachRow)"
done;
$CLICKHOUSE_CLIENT --enable_parsing_to_custom_serialization 1 --query "INSERT INTO t_insert_mem SELECT * FROM file('$filename', JSONEachRow)"
$CLICKHOUSE_CLIENT --enable_parsing_to_custom_serialization 1 --query "INSERT INTO t_insert_mem SELECT * FROM file('$filename', JSONEachRow)"
$CLICKHOUSE_CLIENT --enable_parsing_to_custom_serialization 1 --query "INSERT INTO t_insert_mem SELECT * FROM s3(s3_conn, filename='$filename', format='JSONEachRow')"
$CLICKHOUSE_CLIENT --query "SELECT * FROM file('$filename', LineAsString) FORMAT LineAsString" | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=INSERT+INTO+t_insert_mem+FORMAT+JSONEachRow&enable_parsing_to_custom_serialization=1" --data-binary @-
$MY_CLICKHOUSE_CLIENT --query "INSERT INTO t_insert_mem SELECT * FROM file('$filename', JSONEachRow)"
$MY_CLICKHOUSE_CLIENT --query "INSERT INTO t_insert_mem SELECT * FROM file('$filename', JSONEachRow)"
$CLICKHOUSE_CLIENT --query "
$MY_CLICKHOUSE_CLIENT --query "DETACH TABLE t_insert_mem"
$MY_CLICKHOUSE_CLIENT --query "ATTACH TABLE t_insert_mem"
$MY_CLICKHOUSE_CLIENT --query "INSERT INTO t_insert_mem SELECT * FROM s3(s3_conn, filename='$filename', format='JSONEachRow')"
$MY_CLICKHOUSE_CLIENT --query "SELECT * FROM file('$filename', LineAsString) FORMAT LineAsString" | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=INSERT+INTO+t_insert_mem+FORMAT+JSONEachRow&enable_parsing_to_custom_serialization=1" --data-binary @-
$MY_CLICKHOUSE_CLIENT --query "
SELECT count() FROM t_insert_mem;
SELECT sum(sipHash64(*)) FROM t_insert_mem;
SELECT sum(sipHash64(*)) FROM t_reference;
@ -53,7 +59,7 @@ $CLICKHOUSE_CLIENT --query "
SYSTEM FLUSH LOGS;
SELECT written_bytes <= 3000000 FROM system.query_log
SELECT written_bytes <= 10000000 FROM system.query_log
WHERE query LIKE 'INSERT INTO t_insert_mem%' AND current_database = '$CLICKHOUSE_DATABASE' AND type = 'QueryFinish'
ORDER BY event_time_microseconds;

View File

@ -0,0 +1,3 @@
nan
nan
nan

View File

@ -0,0 +1,3 @@
SELECT nan ORDER BY 1 WITH FILL;
SELECT -nan ORDER BY 1 WITH FILL;
SELECT 0./0. ORDER BY 1 WITH FILL;

View File

@ -0,0 +1 @@
0

View File

@ -0,0 +1,4 @@
-- Tags: no-fasttest
-- no-fasttest: upper/lowerUTF8 use ICU
SELECT ignore(lengthUTF8(lowerUTF8(randomStringUTF8(99)))); -- bug #49672: msan assert

View File

@ -0,0 +1,37 @@
-- { echoOn }
-- https://github.com/ClickHouse/ClickHouse/issues/68895
SELECT arrayMax(x -> toFixedString('.', 1), []);
.
-- https://github.com/ClickHouse/ClickHouse/issues/69600
SELECT arrayMax(x -> (-x), [1, 2, 4]) AS res;
-1
SELECT arrayMax(x -> toUInt16(-x), [1, 2, 4]) AS res;
65535
-- https://github.com/ClickHouse/ClickHouse/pull/69640
SELECT arrayMin(x1 -> (x1 * toNullable(-1)), materialize([1, 2, 3]));
-3
SELECT arrayMin(x1 -> x1 * -1, [1,2,3]);
-3
DROP TABLE IF EXISTS test_aggregation_array;
CREATE TABLE test_aggregation_array (x Array(Int)) ENGINE=MergeTree() ORDER by tuple();
INSERT INTO test_aggregation_array VALUES ([1,2,3,4,5,6]), ([]), ([1,2,3]);
SELECT [arrayMin(x1 -> (x1 * materialize(-1)), [toNullable(toUInt256(0)), materialize(4)])], arrayMin([arrayMin([0])]) FROM test_aggregation_array GROUP BY arrayAvg([1]), [0, toUInt256(8)] WITH CUBE SETTINGS allow_experimental_analyzer = 1;
[-4] 0
[-4] 0
[-4] 0
[-4] 0
SELECT [arrayMin([3, arrayMin([toUInt128(8)]), 4, 5]), arrayMax([materialize(1)]), arrayMin([arrayMax([1]), 2]), 2], arrayMin([0, toLowCardinality(8)]), 2, arrayMax(x1 -> (x1 * -1), x) FROM test_aggregation_array;
[3,1,1,2] 0 2 -1
[3,1,1,2] 0 2 0
[3,1,1,2] 0 2 -1
select arrayMax(x -> x.1, [(1, 'a'), (0, 'b')]);
1
select arrayMin(x -> x.2, [(1, 'a'), (0, 'b')]);
a
-- Extra validation of generic arrayMin/arrayMax
WITH [(1,2),(1,3)] AS t SELECT arrayMin(t), arrayMax(t);
(1,2) (1,3)
WITH [map('a', 1, 'b', 2), map('a',1,'b',3)] AS t SELECT arrayMin(t), arrayMax(t);
{'a':1,'b':2} {'a':1,'b':3}
WITH [map('a', 1, 'b', 2, 'c', 10), map('a',1,'b',3, 'c', 0)] AS t SELECT arrayMin(x -> x['c'], t), arrayMax(x -> x['c'], t);
0 10

View File

@ -0,0 +1,26 @@
-- { echoOn }
-- https://github.com/ClickHouse/ClickHouse/issues/68895
SELECT arrayMax(x -> toFixedString('.', 1), []);
-- https://github.com/ClickHouse/ClickHouse/issues/69600
SELECT arrayMax(x -> (-x), [1, 2, 4]) AS res;
SELECT arrayMax(x -> toUInt16(-x), [1, 2, 4]) AS res;
-- https://github.com/ClickHouse/ClickHouse/pull/69640
SELECT arrayMin(x1 -> (x1 * toNullable(-1)), materialize([1, 2, 3]));
SELECT arrayMin(x1 -> x1 * -1, [1,2,3]);
DROP TABLE IF EXISTS test_aggregation_array;
CREATE TABLE test_aggregation_array (x Array(Int)) ENGINE=MergeTree() ORDER by tuple();
INSERT INTO test_aggregation_array VALUES ([1,2,3,4,5,6]), ([]), ([1,2,3]);
SELECT [arrayMin(x1 -> (x1 * materialize(-1)), [toNullable(toUInt256(0)), materialize(4)])], arrayMin([arrayMin([0])]) FROM test_aggregation_array GROUP BY arrayAvg([1]), [0, toUInt256(8)] WITH CUBE SETTINGS allow_experimental_analyzer = 1;
SELECT [arrayMin([3, arrayMin([toUInt128(8)]), 4, 5]), arrayMax([materialize(1)]), arrayMin([arrayMax([1]), 2]), 2], arrayMin([0, toLowCardinality(8)]), 2, arrayMax(x1 -> (x1 * -1), x) FROM test_aggregation_array;
select arrayMax(x -> x.1, [(1, 'a'), (0, 'b')]);
select arrayMin(x -> x.2, [(1, 'a'), (0, 'b')]);
-- Extra validation of generic arrayMin/arrayMax
WITH [(1,2),(1,3)] AS t SELECT arrayMin(t), arrayMax(t);
WITH [map('a', 1, 'b', 2), map('a',1,'b',3)] AS t SELECT arrayMin(t), arrayMax(t);
WITH [map('a', 1, 'b', 2, 'c', 10), map('a',1,'b',3, 'c', 0)] AS t SELECT arrayMin(x -> x['c'], t), arrayMax(x -> x['c'], t);

View File

@ -0,0 +1 @@
CREATE MATERIALIZED VIEW v0 AS (SELECT 1) INTERSECT (SELECT 1); --{serverError QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW}

View File

@ -1,12 +0,0 @@
Using queries from 'queries' directory
Connecting to ClickHouse server...... OK
Connected to server 24.7.1.1 @ 246f421f2402799fd11b22a608b4d0d497cb8438 chesema-processor-onCancel
Running 1 stateless tests (MainProcess).
00993_system_parts_race_condition_drop_zookeeper: [ OK ]
1 tests passed. 0 tests skipped. 124.59 s elapsed (MainProcess).
0 tests passed. 0 tests skipped. 0.00 s elapsed (MainProcess).
All tests have finished.

View File

@ -1,3 +1,4 @@
v24.9.2.42-stable 2024-10-03
v24.9.1.3278-stable 2024-09-26
v24.8.4.13-lts 2024-09-06
v24.8.3.59-lts 2024-09-03
