diff --git a/base/poco/Net/src/TCPServerDispatcher.cpp b/base/poco/Net/src/TCPServerDispatcher.cpp index 20a1ffe1b4f..7f9f9a20ee7 100644 --- a/base/poco/Net/src/TCPServerDispatcher.cpp +++ b/base/poco/Net/src/TCPServerDispatcher.cpp @@ -93,7 +93,7 @@ void TCPServerDispatcher::release() void TCPServerDispatcher::run() { - AutoPtr guard(this, true); // ensure object stays alive + AutoPtr guard(this); // ensure object stays alive int idleTime = (int) _pParams->getThreadIdleTime().totalMilliseconds(); @@ -149,11 +149,13 @@ void TCPServerDispatcher::enqueue(const StreamSocket& socket) { try { + this->duplicate(); _threadPool.startWithPriority(_pParams->getThreadPriority(), *this, threadName); ++_currentThreads; } catch (Poco::Exception& exc) { + this->release(); ++_refusedConnections; std::cerr << "Got exception while starting thread for connection. Error code: " << exc.code() << ", message: '" << exc.displayText() << "'" << std::endl; diff --git a/docs/en/getting-started/example-datasets/laion.md b/docs/en/getting-started/example-datasets/laion.md index 0dbaceffc13..327c1796d11 100644 --- a/docs/en/getting-started/example-datasets/laion.md +++ b/docs/en/getting-started/example-datasets/laion.md @@ -10,10 +10,14 @@ The embeddings and the metadata are stored in separate files in the raw data. A converts them to CSV and imports them into ClickHouse. You can use the following `download.sh` script for that: ```bash -wget --tries=100 https://deploy.laion.ai/8f83b608504d46bb81708ec86e912220/embeddings/img_emb/img_emb_${1}.npy # download image embedding -wget --tries=100 https://deploy.laion.ai/8f83b608504d46bb81708ec86e912220/embeddings/text_emb/text_emb_${1}.npy # download text embedding -wget --tries=100 https://deploy.laion.ai/8f83b608504d46bb81708ec86e912220/embeddings/metadata/metadata_${1}.parquet # download metadata -python3 process.py ${1} # merge files and convert to CSV +number=${1} +if [[ $number == '' ]]; then + number=1 +fi; +wget --tries=100 https://deploy.laion.ai/8f83b608504d46bb81708ec86e912220/embeddings/img_emb/img_emb_${number}.npy # download image embedding +wget --tries=100 https://deploy.laion.ai/8f83b608504d46bb81708ec86e912220/embeddings/text_emb/text_emb_${number}.npy # download text embedding +wget --tries=100 https://deploy.laion.ai/8f83b608504d46bb81708ec86e912220/embeddings/metadata/metadata_${number}.parquet # download metadata +python3 process.py $number # merge files and convert to CSV ``` Script `process.py` is defined as follows: diff --git a/docs/en/sql-reference/aggregate-functions/reference/varpop.md b/docs/en/sql-reference/aggregate-functions/reference/varpop.md index 751688b0830..76472f62789 100644 --- a/docs/en/sql-reference/aggregate-functions/reference/varpop.md +++ b/docs/en/sql-reference/aggregate-functions/reference/varpop.md @@ -1,16 +1,99 @@ --- -slug: /en/sql-reference/aggregate-functions/reference/varpop +title: "varPop" +slug: "/en/sql-reference/aggregate-functions/reference/varpop" sidebar_position: 32 --- -# varPop(x) +This page covers the `varPop` and `varPopStable` functions available in ClickHouse. -Calculates the amount `Σ((x - x̅)^2) / n`, where `n` is the sample size and `x̅`is the average value of `x`. +## varPop -In other words, dispersion for a set of values. Returns `Float64`. +Calculates the population covariance between two data columns. The population covariance measures the degree to which two variables vary together. Calculates the amount `Σ((x - x̅)^2) / n`, where `n` is the sample size and `x̅`is the average value of `x`. -Alias: `VAR_POP`. +**Syntax** -:::note -This function uses a numerically unstable algorithm. If you need [numerical stability](https://en.wikipedia.org/wiki/Numerical_stability) in calculations, use the `varPopStable` function. It works slower but provides a lower computational error. -::: \ No newline at end of file +```sql +covarPop(x, y) +``` + +**Parameters** + +- `x`: The first data column. [Numeric](../../../native-protocol/columns.md) +- `y`: The second data column. [Numeric](../../../native-protocol/columns.md) + +**Returned value** + +Returns an integer of type `Float64`. + +**Implementation details** + +This function uses a numerically unstable algorithm. If you need numerical stability in calculations, use the slower but more stable [`varPopStable` function](#varPopStable). + +**Example** + +```sql +DROP TABLE IF EXISTS test_data; +CREATE TABLE test_data +( + x Int32, + y Int32 +) +ENGINE = Memory; + +INSERT INTO test_data VALUES (1, 2), (2, 3), (3, 5), (4, 6), (5, 8); + +SELECT + covarPop(x, y) AS covar_pop +FROM test_data; +``` + +```response +3 +``` + +## varPopStable + +Calculates population covariance between two data columns using a stable, numerically accurate method to calculate the variance. This function is designed to provide reliable results even with large datasets or values that might cause numerical instability in other implementations. + +**Syntax** + +```sql +covarPopStable(x, y) +``` + +**Parameters** + +- `x`: The first data column. [String literal](../syntax#syntax-string-literal) +- `y`: The second data column. [Expression](../syntax#syntax-expressions) + +**Returned value** + +Returns an integer of type `Float64`. + +**Implementation details** + +Unlike [`varPop()`](#varPop), this function uses a stable, numerically accurate algorithm to calculate the population variance to avoid issues like catastrophic cancellation or loss of precision. This function also handles `NaN` and `Inf` values correctly, excluding them from calculations. + +**Example** + +Query: + +```sql +DROP TABLE IF EXISTS test_data; +CREATE TABLE test_data +( + x Int32, + y Int32 +) +ENGINE = Memory; + +INSERT INTO test_data VALUES (1, 2), (2, 9), (9, 5), (4, 6), (5, 8); + +SELECT + covarPopStable(x, y) AS covar_pop_stable +FROM test_data; +``` + +```response +0.5999999999999999 +``` diff --git a/docs/en/sql-reference/aggregate-functions/reference/varsamp.md b/docs/en/sql-reference/aggregate-functions/reference/varsamp.md index 9b2b94936ec..e75cb075ff8 100644 --- a/docs/en/sql-reference/aggregate-functions/reference/varsamp.md +++ b/docs/en/sql-reference/aggregate-functions/reference/varsamp.md @@ -1,18 +1,128 @@ --- +title: "varSamp" slug: /en/sql-reference/aggregate-functions/reference/varsamp sidebar_position: 33 --- -# varSamp +This page contains information on the `varSamp` and `varSampStable` ClickHouse functions. -Calculates the amount `Σ((x - x̅)^2) / (n - 1)`, where `n` is the sample size and `x̅`is the average value of `x`. +## varSamp -It represents an unbiased estimate of the variance of a random variable if passed values from its sample. +Calculate the sample variance of a data set. -Returns `Float64`. When `n <= 1`, returns `+∞`. +**Syntax** -Alias: `VAR_SAMP`. +```sql +varSamp(expr) +``` -:::note -This function uses a numerically unstable algorithm. If you need [numerical stability](https://en.wikipedia.org/wiki/Numerical_stability) in calculations, use the `varSampStable` function. It works slower but provides a lower computational error. -::: +**Parameters** + +- `expr`: An expression representing the data set for which you want to calculate the sample variance. [Expression](../syntax#syntax-expressions) + +**Returned value** + +Returns a Float64 value representing the sample variance of the input data set. + +**Implementation details** + +The `varSamp()` function calculates the sample variance using the following formula: + +```plaintext +∑(x - mean(x))^2 / (n - 1) +``` + +Where: + +- `x` is each individual data point in the data set. +- `mean(x)` is the arithmetic mean of the data set. +- `n` is the number of data points in the data set. + +The function assumes that the input data set represents a sample from a larger population. If you want to calculate the variance of the entire population (when you have the complete data set), you should use the [`varPop()` function](./varpop#varpop) instead. + +This function uses a numerically unstable algorithm. If you need numerical stability in calculations, use the slower but more stable [`varSampStable` function](#varSampStable). + +**Example** + +Query: + +```sql +CREATE TABLE example_table +( + id UInt64, + value Float64 +) +ENGINE = MergeTree +ORDER BY id; + +INSERT INTO example_table VALUES (1, 10.5), (2, 12.3), (3, 9.8), (4, 11.2), (5, 10.7); + +SELECT varSamp(value) FROM example_table; +``` + +Response: + +```response +0.8650000000000091 +``` + +## varSampStable + +Calculate the sample variance of a data set using a numerically stable algorithm. + +**Syntax** + +```sql +varSampStable(expr) +``` + +**Parameters** + +- `expr`: An expression representing the data set for which you want to calculate the sample variance. [Expression](../syntax#syntax-expressions) + +**Returned value** + +The `varSampStable()` function returns a Float64 value representing the sample variance of the input data set. + +**Implementation details** + +The `varSampStable()` function calculates the sample variance using the same formula as the [`varSamp()`](#varSamp function): + +```plaintext +∑(x - mean(x))^2 / (n - 1) +``` + +Where: +- `x` is each individual data point in the data set. +- `mean(x)` is the arithmetic mean of the data set. +- `n` is the number of data points in the data set. + +The difference between `varSampStable()` and `varSamp()` is that `varSampStable()` is designed to provide a more deterministic and stable result when dealing with floating-point arithmetic. It uses an algorithm that minimizes the accumulation of rounding errors, which can be particularly important when dealing with large data sets or data with a wide range of values. + +Like `varSamp()`, the `varSampStable()` function assumes that the input data set represents a sample from a larger population. If you want to calculate the variance of the entire population (when you have the complete data set), you should use the [`varPopStable()` function](./varpop#varpopstable) instead. + +**Example** + +Query: + +```sql +CREATE TABLE example_table +( + id UInt64, + value Float64 +) +ENGINE = MergeTree +ORDER BY id; + +INSERT INTO example_table VALUES (1, 10.5), (2, 12.3), (3, 9.8), (4, 11.2), (5, 10.7); + +SELECT varSampStable(value) FROM example_table; +``` + +Response: + +```response +0.865 +``` + +This query calculates the sample variance of the `value` column in the `example_table` using the `varSampStable()` function. The result shows that the sample variance of the values `[10.5, 12.3, 9.8, 11.2, 10.7]` is approximately 0.865, which may differ slightly from the result of `varSamp()` due to the more precise handling of floating-point arithmetic. diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index c1ac3d08245..ab1a16a3edf 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -476,6 +476,7 @@ The server successfully detected this situation and will download merged part fr M(FileSegmentRemoveMicroseconds, "File segment remove() time") \ M(FileSegmentHolderCompleteMicroseconds, "File segments holder complete() time") \ M(FileSegmentFailToIncreasePriority, "Number of times the priority was not increased due to a high contention on the cache lock") \ + M(FilesystemCacheFailToReserveSpaceBecauseOfLockContention, "Number of times space reservation was skipped due to a high contention on the cache lock") \ M(FilesystemCacheHoldFileSegments, "Filesystem cache file segments count, which were hold") \ M(FilesystemCacheUnusedHoldFileSegments, "Filesystem cache file segments count, which were hold, but not used (because of seek or LIMIT n, etc)") \ \ diff --git a/src/Common/tests/gtest_connection_pool.cpp b/src/Common/tests/gtest_connection_pool.cpp index c271cc0e2ec..dcc3c11fd52 100644 --- a/src/Common/tests/gtest_connection_pool.cpp +++ b/src/Common/tests/gtest_connection_pool.cpp @@ -123,17 +123,15 @@ protected: std::string getServerUrl() const { - return "http://" + server_data.socket->address().toString(); + return "http://" + server_data.server->socket().address().toString(); } void startServer() { server_data.reset(); - server_data.params = new Poco::Net::HTTPServerParams(); - server_data.socket = std::make_unique(server_data.port); server_data.handler_factory = new HTTPRequestHandlerFactory(slowdown_receive); server_data.server = std::make_unique( - server_data.handler_factory, *server_data.socket, server_data.params); + server_data.handler_factory, server_data.port); server_data.server->start(); } @@ -155,8 +153,7 @@ protected: { // just some port to avoid collisions with others tests UInt16 port = 9871; - Poco::Net::HTTPServerParams::Ptr params; - std::unique_ptr socket; + HTTPRequestHandlerFactory::Ptr handler_factory; std::unique_ptr server; @@ -171,8 +168,6 @@ protected: server = nullptr; handler_factory = nullptr; - socket = nullptr; - params = nullptr; } ~ServerData() { diff --git a/src/Coordination/KeeperSnapshotManagerS3.cpp b/src/Coordination/KeeperSnapshotManagerS3.cpp index 796506a07db..80345db2524 100644 --- a/src/Coordination/KeeperSnapshotManagerS3.cpp +++ b/src/Coordination/KeeperSnapshotManagerS3.cpp @@ -121,8 +121,7 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo auth_settings.use_insecure_imds_request.value_or(false), auth_settings.expiration_window_seconds.value_or(S3::DEFAULT_EXPIRATION_WINDOW_SECONDS), auth_settings.no_sign_request.value_or(false), - }, - credentials.GetSessionToken()); + }); auto new_client = std::make_shared(std::move(new_uri), std::move(auth_settings), std::move(client)); diff --git a/src/Core/Settings.h b/src/Core/Settings.h index d70a6cf51c5..7ba335099e6 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -778,6 +778,7 @@ class IColumn; M(UInt64, filesystem_cache_max_download_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be downloaded by a single query", 0) \ M(Bool, throw_on_error_from_cache_on_write_operations, false, "Ignore error from cache when caching on write operations (INSERT, merges)", 0) \ M(UInt64, filesystem_cache_segments_batch_size, 20, "Limit on size of a single batch of file segments that a read buffer can request from cache. Too low value will lead to excessive requests to cache, too large may slow down eviction from cache", 0) \ + M(UInt64, filesystem_cache_reserve_space_wait_lock_timeout_milliseconds, 1000, "Wait time to lock cache for sapce reservation in filesystem cache", 0) \ \ M(Bool, use_page_cache_for_disks_without_file_cache, false, "Use userspace page cache for remote disks that don't have filesystem cache enabled.", 0) \ M(Bool, read_from_page_cache_if_exists_otherwise_bypass_cache, false, "Use userspace page cache in passive mode, similar to read_from_filesystem_cache_if_exists_otherwise_bypass_cache.", 0) \ diff --git a/src/Core/SettingsChangesHistory.h b/src/Core/SettingsChangesHistory.h index e680c02671a..d7b0669f64f 100644 --- a/src/Core/SettingsChangesHistory.h +++ b/src/Core/SettingsChangesHistory.h @@ -93,6 +93,7 @@ static std::map sett {"input_format_json_use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects", false, false, "Allow to use String type for ambiguous paths during named tuple inference from JSON objects"}, {"throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert", false, true, "Deduplication is dependent materialized view cannot work together with async inserts."}, {"parallel_replicas_allow_in_with_subquery", false, true, "If true, subquery for IN will be executed on every follower replica"}, + {"filesystem_cache_reserve_space_wait_lock_timeout_milliseconds", 1000, 1000, "Wait time to lock cache for sapce reservation in filesystem cache"}, }}, {"24.2", {{"allow_suspicious_variant_types", true, false, "Don't allow creating Variant type with suspicious variants by default"}, {"validate_experimental_and_suspicious_types_inside_nested_types", false, true, "Validate usage of experimental and suspicious types inside nested types"}, diff --git a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp index 47ee5858562..1e108b481ee 100644 --- a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp +++ b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp @@ -637,7 +637,8 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment) ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceBytes, current_impl_buffer_size); - bool continue_predownload = file_segment.reserve(current_predownload_size); + bool continue_predownload = file_segment.reserve( + current_predownload_size, settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds); if (continue_predownload) { LOG_TEST(log, "Left to predownload: {}, buffer size: {}", bytes_to_predownload, current_impl_buffer_size); @@ -992,7 +993,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep() { chassert(file_offset_of_buffer_end + size - 1 <= file_segment.range().right); - bool success = file_segment.reserve(size); + bool success = file_segment.reserve(size, settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds); if (success) { chassert(file_segment.getCurrentWriteOffset() == static_cast(implementation_buffer->getPosition())); diff --git a/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp b/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp index faed55de713..f4e309f461e 100644 --- a/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp +++ b/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp @@ -26,16 +26,18 @@ FileSegmentRangeWriter::FileSegmentRangeWriter( FileCache * cache_, const FileSegment::Key & key_, const FileCacheUserInfo & user_, + size_t reserve_space_lock_wait_timeout_milliseconds_, std::shared_ptr cache_log_, const String & query_id_, const String & source_path_) : cache(cache_) , key(key_) + , user(user_) + , reserve_space_lock_wait_timeout_milliseconds(reserve_space_lock_wait_timeout_milliseconds_) , log(getLogger("FileSegmentRangeWriter")) , cache_log(cache_log_) , query_id(query_id_) , source_path(source_path_) - , user(user_) { } @@ -89,7 +91,7 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset size_t size_to_write = std::min(available_size, size); - bool reserved = file_segment->reserve(size_to_write); + bool reserved = file_segment->reserve(size_to_write, reserve_space_lock_wait_timeout_milliseconds); if (!reserved) { appendFilesystemCacheLog(*file_segment); @@ -211,6 +213,7 @@ CachedOnDiskWriteBufferFromFile::CachedOnDiskWriteBufferFromFile( , key(key_) , query_id(query_id_) , user(user_) + , reserve_space_lock_wait_timeout_milliseconds(settings_.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds) , throw_on_error_from_cache(settings_.throw_on_error_from_cache) , cache_log(!query_id_.empty() && settings_.enable_filesystem_cache_log ? cache_log_ : nullptr) { @@ -251,7 +254,8 @@ void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size, bool t if (!cache_writer) { - cache_writer = std::make_unique(cache.get(), key, user, cache_log, query_id, source_path); + cache_writer = std::make_unique( + cache.get(), key, user, reserve_space_lock_wait_timeout_milliseconds, cache_log, query_id, source_path); } Stopwatch watch(CLOCK_MONOTONIC); diff --git a/src/Disks/IO/CachedOnDiskWriteBufferFromFile.h b/src/Disks/IO/CachedOnDiskWriteBufferFromFile.h index 59e0c76ca3d..ad4f6b5916d 100644 --- a/src/Disks/IO/CachedOnDiskWriteBufferFromFile.h +++ b/src/Disks/IO/CachedOnDiskWriteBufferFromFile.h @@ -30,6 +30,7 @@ public: FileCache * cache_, const FileSegment::Key & key_, const FileCacheUserInfo & user_, + size_t reserve_space_lock_wait_timeout_milliseconds_, std::shared_ptr cache_log_, const String & query_id_, const String & source_path_); @@ -52,13 +53,14 @@ private: void completeFileSegment(); FileCache * cache; - FileSegment::Key key; + const FileSegment::Key key; + const FileCacheUserInfo user; + const size_t reserve_space_lock_wait_timeout_milliseconds; LoggerPtr log; std::shared_ptr cache_log; - String query_id; - String source_path; - FileCacheUserInfo user; + const String query_id; + const String source_path; FileSegmentsHolderPtr file_segments; @@ -99,11 +101,12 @@ private: String source_path; FileCacheKey key; - size_t current_download_offset = 0; const String query_id; const FileCacheUserInfo user; + const size_t reserve_space_lock_wait_timeout_milliseconds; + const bool throw_on_error_from_cache; - bool throw_on_error_from_cache; + size_t current_download_offset = 0; bool cache_in_error_state_or_disabled = false; std::unique_ptr cache_writer; diff --git a/src/IO/ReadSettings.h b/src/IO/ReadSettings.h index c0a63bf51b1..6a0cac35878 100644 --- a/src/IO/ReadSettings.h +++ b/src/IO/ReadSettings.h @@ -100,6 +100,7 @@ struct ReadSettings bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false; bool enable_filesystem_cache_log = false; size_t filesystem_cache_segments_batch_size = 20; + size_t filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = 1000; bool use_page_cache_for_disks_without_file_cache = false; bool read_from_page_cache_if_exists_otherwise_bypass_cache = false; diff --git a/src/IO/WriteSettings.h b/src/IO/WriteSettings.h index fcadf34f021..7d36677b468 100644 --- a/src/IO/WriteSettings.h +++ b/src/IO/WriteSettings.h @@ -20,6 +20,7 @@ struct WriteSettings bool enable_filesystem_cache_on_write_operations = false; bool enable_filesystem_cache_log = false; bool throw_on_error_from_cache = false; + size_t filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = 1000; bool s3_allow_parallel_part_upload = true; diff --git a/src/Interpreters/Cache/FileCache.cpp b/src/Interpreters/Cache/FileCache.cpp index 9c705ddc27c..ea40ffcfa3c 100644 --- a/src/Interpreters/Cache/FileCache.cpp +++ b/src/Interpreters/Cache/FileCache.cpp @@ -27,6 +27,7 @@ namespace ProfileEvents extern const Event FilesystemCacheReserveMicroseconds; extern const Event FilesystemCacheGetOrSetMicroseconds; extern const Event FilesystemCacheGetMicroseconds; + extern const Event FilesystemCacheFailToReserveSpaceBecauseOfLockContention; } namespace DB @@ -188,9 +189,9 @@ CacheGuard::Lock FileCache::lockCache() const return cache_guard.lock(); } -CacheGuard::Lock FileCache::tryLockCache() const +CacheGuard::Lock FileCache::tryLockCache(std::optional acquire_timeout) const { - return cache_guard.tryLock(); + return acquire_timeout.has_value() ? cache_guard.tryLockFor(acquire_timeout.value()) : cache_guard.tryLock(); } FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment::Range & range, size_t file_segments_limit) const @@ -776,12 +777,18 @@ bool FileCache::tryReserve( FileSegment & file_segment, const size_t size, FileCacheReserveStat & reserve_stat, - const UserInfo & user) + const UserInfo & user, + size_t lock_wait_timeout_milliseconds) { ProfileEventTimeIncrement watch(ProfileEvents::FilesystemCacheReserveMicroseconds); assertInitialized(); - auto cache_lock = lockCache(); + auto cache_lock = tryLockCache(std::chrono::milliseconds(lock_wait_timeout_milliseconds)); + if (!cache_lock) + { + ProfileEvents::increment(ProfileEvents::FilesystemCacheFailToReserveSpaceBecauseOfLockContention); + return false; + } LOG_TEST( log, "Trying to reserve space ({} bytes) for {}:{}, current usage {}/{}", diff --git a/src/Interpreters/Cache/FileCache.h b/src/Interpreters/Cache/FileCache.h index 5b665ad0271..007c4fd9483 100644 --- a/src/Interpreters/Cache/FileCache.h +++ b/src/Interpreters/Cache/FileCache.h @@ -161,7 +161,8 @@ public: FileSegment & file_segment, size_t size, FileCacheReserveStat & stat, - const UserInfo & user); + const UserInfo & user, + size_t lock_wait_timeout_milliseconds); std::vector getFileSegmentInfos(const UserID & user_id); @@ -173,7 +174,7 @@ public: void deactivateBackgroundOperations(); CacheGuard::Lock lockCache() const; - CacheGuard::Lock tryLockCache() const; + CacheGuard::Lock tryLockCache(std::optional acquire_timeout = std::nullopt) const; std::vector sync(); diff --git a/src/Interpreters/Cache/FileSegment.cpp b/src/Interpreters/Cache/FileSegment.cpp index 6b2d4a4bec8..9ec2b090dc7 100644 --- a/src/Interpreters/Cache/FileSegment.cpp +++ b/src/Interpreters/Cache/FileSegment.cpp @@ -497,7 +497,7 @@ LockedKeyPtr FileSegment::lockKeyMetadata(bool assert_exists) const return metadata->tryLock(); } -bool FileSegment::reserve(size_t size_to_reserve, FileCacheReserveStat * reserve_stat) +bool FileSegment::reserve(size_t size_to_reserve, size_t lock_wait_timeout_milliseconds, FileCacheReserveStat * reserve_stat) { if (!size_to_reserve) throw Exception(ErrorCodes::LOGICAL_ERROR, "Zero space reservation is not allowed"); @@ -549,7 +549,7 @@ bool FileSegment::reserve(size_t size_to_reserve, FileCacheReserveStat * reserve if (!reserve_stat) reserve_stat = &dummy_stat; - bool reserved = cache->tryReserve(*this, size_to_reserve, *reserve_stat, getKeyMetadata()->user); + bool reserved = cache->tryReserve(*this, size_to_reserve, *reserve_stat, getKeyMetadata()->user, lock_wait_timeout_milliseconds); if (!reserved) setDownloadFailedUnlocked(lockFileSegment()); diff --git a/src/Interpreters/Cache/FileSegment.h b/src/Interpreters/Cache/FileSegment.h index ea97a6b0157..c34ee064345 100644 --- a/src/Interpreters/Cache/FileSegment.h +++ b/src/Interpreters/Cache/FileSegment.h @@ -199,7 +199,7 @@ public: /// Try to reserve exactly `size` bytes (in addition to the getDownloadedSize() bytes already downloaded). /// Returns true if reservation was successful, false otherwise. - bool reserve(size_t size_to_reserve, FileCacheReserveStat * reserve_stat = nullptr); + bool reserve(size_t size_to_reserve, size_t lock_wait_timeout_milliseconds, FileCacheReserveStat * reserve_stat = nullptr); /// Write data into reserved space. void write(const char * from, size_t size, size_t offset); diff --git a/src/Interpreters/Cache/Guards.h b/src/Interpreters/Cache/Guards.h index 5729620d82f..0ac7cb80483 100644 --- a/src/Interpreters/Cache/Guards.h +++ b/src/Interpreters/Cache/Guards.h @@ -61,17 +61,26 @@ namespace DB */ struct CacheGuard : private boost::noncopyable { + using Mutex = std::timed_mutex; /// struct is used (not keyword `using`) to make CacheGuard::Lock non-interchangable with other guards locks /// so, we wouldn't be able to pass CacheGuard::Lock to a function which accepts KeyGuard::Lock, for example - struct Lock : public std::unique_lock + struct Lock : public std::unique_lock { - using Base = std::unique_lock; + using Base = std::unique_lock; using Base::Base; }; Lock lock() { return Lock(mutex); } + Lock tryLock() { return Lock(mutex, std::try_to_lock); } - std::mutex mutex; + + Lock tryLockFor(const std::chrono::milliseconds & acquire_timeout) + { + return Lock(mutex, std::chrono::duration(acquire_timeout)); + } + +private: + Mutex mutex; }; /** diff --git a/src/Interpreters/Cache/Metadata.cpp b/src/Interpreters/Cache/Metadata.cpp index 727f2762cca..b79605622b6 100644 --- a/src/Interpreters/Cache/Metadata.cpp +++ b/src/Interpreters/Cache/Metadata.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -693,6 +694,9 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optionalset(memory->data(), memory->size()); } + const auto reserve_space_lock_wait_timeout_milliseconds = + Context::getGlobalContextInstance()->getReadSettings().filesystem_cache_reserve_space_wait_lock_timeout_milliseconds; + size_t offset = file_segment.getCurrentWriteOffset(); if (offset != static_cast(reader->getPosition())) reader->seek(offset, SEEK_SET); @@ -701,7 +705,7 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optionalavailable(); - if (!file_segment.reserve(size)) + if (!file_segment.reserve(size, reserve_space_lock_wait_timeout_milliseconds)) { LOG_TEST( log, "Failed to reserve space during background download " diff --git a/src/Interpreters/Cache/WriteBufferToFileSegment.cpp b/src/Interpreters/Cache/WriteBufferToFileSegment.cpp index 7cd4e2d6e8d..51914c0a14e 100644 --- a/src/Interpreters/Cache/WriteBufferToFileSegment.cpp +++ b/src/Interpreters/Cache/WriteBufferToFileSegment.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -18,9 +19,22 @@ namespace ErrorCodes extern const int NOT_ENOUGH_SPACE; } +namespace +{ + size_t getCacheLockWaitTimeout() + { + auto query_context = CurrentThread::getQueryContext(); + if (query_context) + return query_context->getReadSettings().filesystem_cache_reserve_space_wait_lock_timeout_milliseconds; + else + return Context::getGlobalContextInstance()->getReadSettings().filesystem_cache_reserve_space_wait_lock_timeout_milliseconds; + } +} + WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegment * file_segment_) : WriteBufferFromFileDecorator(std::make_unique(file_segment_->getPath())) , file_segment(file_segment_) + , reserve_space_lock_wait_timeout_milliseconds(getCacheLockWaitTimeout()) { } @@ -31,6 +45,7 @@ WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegmentsHolderPtr segment : throw Exception(ErrorCodes::LOGICAL_ERROR, "WriteBufferToFileSegment can be created only from single segment")) , file_segment(&segment_holder_->front()) , segment_holder(std::move(segment_holder_)) + , reserve_space_lock_wait_timeout_milliseconds(getCacheLockWaitTimeout()) { } @@ -49,7 +64,7 @@ void WriteBufferToFileSegment::nextImpl() FileCacheReserveStat reserve_stat; /// In case of an error, we don't need to finalize the file segment /// because it will be deleted soon and completed in the holder's destructor. - bool ok = file_segment->reserve(bytes_to_write, &reserve_stat); + bool ok = file_segment->reserve(bytes_to_write, reserve_space_lock_wait_timeout_milliseconds, &reserve_stat); if (!ok) { diff --git a/src/Interpreters/Cache/WriteBufferToFileSegment.h b/src/Interpreters/Cache/WriteBufferToFileSegment.h index feb33472513..822488ceb48 100644 --- a/src/Interpreters/Cache/WriteBufferToFileSegment.h +++ b/src/Interpreters/Cache/WriteBufferToFileSegment.h @@ -28,6 +28,8 @@ private: /// Empty if file_segment is not owned by this WriteBufferToFileSegment FileSegmentsHolderPtr segment_holder; + + const size_t reserve_space_lock_wait_timeout_milliseconds; }; diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index d658fbe9920..6a0657a842c 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -5166,6 +5166,7 @@ ReadSettings Context::getReadSettings() const res.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache; res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log; res.filesystem_cache_segments_batch_size = settings.filesystem_cache_segments_batch_size; + res.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds; res.filesystem_cache_max_download_size = settings.filesystem_cache_max_download_size; res.skip_download_if_exceeds_query_cache = settings.skip_download_if_exceeds_query_cache; @@ -5214,6 +5215,7 @@ WriteSettings Context::getWriteSettings() const res.enable_filesystem_cache_on_write_operations = settings.enable_filesystem_cache_on_write_operations; res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log; res.throw_on_error_from_cache = settings.throw_on_error_from_cache_on_write_operations; + res.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds; res.s3_allow_parallel_part_upload = settings.s3_allow_parallel_part_upload; diff --git a/src/Interpreters/tests/gtest_filecache.cpp b/src/Interpreters/tests/gtest_filecache.cpp index b596ccb0285..2679d1b8d18 100644 --- a/src/Interpreters/tests/gtest_filecache.cpp +++ b/src/Interpreters/tests/gtest_filecache.cpp @@ -245,7 +245,7 @@ void download(FileSegment & file_segment) ASSERT_EQ(file_segment.state(), State::DOWNLOADING); ASSERT_EQ(file_segment.getDownloadedSize(), 0); - ASSERT_TRUE(file_segment.reserve(file_segment.range().size())); + ASSERT_TRUE(file_segment.reserve(file_segment.range().size(), 1000)); download(cache_base_path, file_segment); ASSERT_EQ(file_segment.state(), State::DOWNLOADING); @@ -257,7 +257,7 @@ void assertDownloadFails(FileSegment & file_segment) { ASSERT_EQ(file_segment.getOrSetDownloader(), FileSegment::getCallerId()); ASSERT_EQ(file_segment.getDownloadedSize(), 0); - ASSERT_FALSE(file_segment.reserve(file_segment.range().size())); + ASSERT_FALSE(file_segment.reserve(file_segment.range().size(), 1000)); file_segment.complete(); } @@ -956,7 +956,7 @@ TEST_F(FileCacheTest, temporaryData) for (auto & segment : *some_data_holder) { ASSERT_TRUE(segment->getOrSetDownloader() == DB::FileSegment::getCallerId()); - ASSERT_TRUE(segment->reserve(segment->range().size())); + ASSERT_TRUE(segment->reserve(segment->range().size(), 1000)); download(*segment); segment->complete(); } diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index ff055508aa6..11da394feec 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -1451,8 +1451,7 @@ void StorageS3::Configuration::connect(const ContextPtr & context) auth_settings.expiration_window_seconds.value_or( context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)), auth_settings.no_sign_request.value_or(context->getConfigRef().getBool("s3.no_sign_request", false)), - }, - credentials.GetSessionToken()); + }); } void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection) diff --git a/tests/integration/test_distributed_config/configs/clusters.xml b/tests/integration/test_distributed_config/configs/clusters.xml new file mode 100644 index 00000000000..754d765f23f --- /dev/null +++ b/tests/integration/test_distributed_config/configs/clusters.xml @@ -0,0 +1,12 @@ + + + + + + localhost + 9000 + + + + + diff --git a/tests/integration/test_distributed_config/test.py b/tests/integration/test_distributed_config/test.py index 500e9ecdeed..bf4bb5a4335 100644 --- a/tests/integration/test_distributed_config/test.py +++ b/tests/integration/test_distributed_config/test.py @@ -3,7 +3,9 @@ from helpers.cluster import ClickHouseCluster import logging cluster = ClickHouseCluster(__file__) -node = cluster.add_instance("node", main_configs=["configs/overrides.xml"]) +node = cluster.add_instance( + "node", main_configs=["configs/overrides.xml", "configs/clusters.xml"] +) @pytest.fixture(scope="module") @@ -23,7 +25,7 @@ def test_distibuted_settings(start_cluster): node.query( """ CREATE TABLE data_1 (key Int) ENGINE Memory(); - CREATE TABLE dist_1 as data_1 ENGINE Distributed(default, default, data_1) SETTINGS flush_on_detach = true; + CREATE TABLE dist_1 as data_1 ENGINE Distributed(localhost_cluster, default, data_1) SETTINGS flush_on_detach = true; SYSTEM STOP DISTRIBUTED SENDS dist_1; INSERT INTO dist_1 SETTINGS prefer_localhost_replica=0 VALUES (1); DETACH TABLE dist_1; @@ -36,7 +38,7 @@ def test_distibuted_settings(start_cluster): node.query( """ CREATE TABLE data_2 (key Int) ENGINE Memory(); - CREATE TABLE dist_2 as data_2 ENGINE Distributed(default, default, data_2); + CREATE TABLE dist_2 as data_2 ENGINE Distributed(localhost_cluster, default, data_2); SYSTEM STOP DISTRIBUTED SENDS dist_2; INSERT INTO dist_2 SETTINGS prefer_localhost_replica=0 VALUES (2); DETACH TABLE dist_2; diff --git a/tests/integration/test_filesystem_cache/config.d/storage_conf.xml b/tests/integration/test_filesystem_cache/config.d/storage_conf.xml index b614815b34f..a8e4f9f8a99 100644 --- a/tests/integration/test_filesystem_cache/config.d/storage_conf.xml +++ b/tests/integration/test_filesystem_cache/config.d/storage_conf.xml @@ -7,4 +7,8 @@ + + system + filesystem_cache_log
+
diff --git a/tests/integration/test_filesystem_cache/test.py b/tests/integration/test_filesystem_cache/test.py index c44d817c57c..dfab462732a 100644 --- a/tests/integration/test_filesystem_cache/test.py +++ b/tests/integration/test_filesystem_cache/test.py @@ -426,3 +426,78 @@ def test_force_filesystem_cache_on_merges(cluster): test(node, True) node = cluster.instances["node"] test(node, False) + + +def test_system_sync_filesystem_cache(cluster): + node = cluster.instances["node"] + node.query( + """ +DROP TABLE IF EXISTS test; + +CREATE TABLE test (a Int32, b String) +ENGINE = MergeTree() ORDER BY tuple() +SETTINGS disk = disk(type = cache, + max_size = '100Ki', + path = "test_system_sync_filesystem_cache", + delayed_cleanup_interval_ms = 10000000, disk = hdd_blob), + min_bytes_for_wide_part = 10485760; + +INSERT INTO test SELECT 1, 'test'; + """ + ) + + query_id = "system_sync_filesystem_cache_1" + node.query( + "SELECT * FROM test FORMAT Null SETTINGS enable_filesystem_cache_log = 1", + query_id=query_id, + ) + + key, offset = ( + node.query( + f""" + SYSTEM FLUSH LOGS; + SELECT key, offset FROM system.filesystem_cache_log WHERE query_id = '{query_id}' ORDER BY size DESC LIMIT 1; + """ + ) + .strip() + .split("\t") + ) + + cache_path = node.query( + f"SELECT cache_path FROM system.filesystem_cache WHERE key = '{key}' and file_segment_range_begin = {offset}" + ) + + node.exec_in_container(["bash", "-c", f"rm {cache_path}"]) + + assert key in node.query("SYSTEM SYNC FILESYSTEM CACHE") + + node.query("SELECT * FROM test FORMAT Null") + assert key not in node.query("SYSTEM SYNC FILESYSTEM CACHE") + + query_id = "system_sync_filesystem_cache_2" + node.query( + "SELECT * FROM test FORMAT Null SETTINGS enable_filesystem_cache_log = 1", + query_id=query_id, + ) + + key, offset = ( + node.query( + f""" + SYSTEM FLUSH LOGS; + SELECT key, offset FROM system.filesystem_cache_log WHERE query_id = '{query_id}' ORDER BY size DESC LIMIT 1; + """ + ) + .strip() + .split("\t") + ) + cache_path = node.query( + f"SELECT cache_path FROM system.filesystem_cache WHERE key = '{key}' and file_segment_range_begin = {offset}" + ) + + node.exec_in_container(["bash", "-c", f"echo -n 'fff' > {cache_path}"]) + + assert key in node.query("SYSTEM SYNC FILESYSTEM CACHE") + + node.query("SELECT * FROM test FORMAT Null") + + assert key not in node.query("SYSTEM SYNC FILESYSTEM CACHE") diff --git a/tests/integration/test_input_format_parallel_parsing_memory_tracking/test.py b/tests/integration/test_input_format_parallel_parsing_memory_tracking/test.py index c95bbfda708..a89cb619350 100644 --- a/tests/integration/test_input_format_parallel_parsing_memory_tracking/test.py +++ b/tests/integration/test_input_format_parallel_parsing_memory_tracking/test.py @@ -41,7 +41,7 @@ def test_memory_tracking_total(): [ "bash", "-c", - "clickhouse local -q \"SELECT arrayStringConcat(arrayMap(x->toString(cityHash64(x)), range(1000)), ' ') from numbers(10000)\" > data.json", + "clickhouse local -q \"SELECT arrayStringConcat(arrayMap(x->toString(cityHash64(x)), range(1000)), ' ') from numbers(10000)\" > data.jsonl", ] ) @@ -56,7 +56,7 @@ def test_memory_tracking_total(): "--show-error", "--data-binary", "@data.json", - "http://127.1:8123/?query=INSERT%20INTO%20null%20FORMAT%20TSV", + "http://127.1:8123/?query=INSERT%20INTO%20null%20FORMAT%20JSONEachRow", ] ) == "" diff --git a/tests/queries/0_stateless/02810_system_sync_filesystem_cache.reference b/tests/queries/0_stateless/02810_system_sync_filesystem_cache.reference deleted file mode 100644 index 7614df8ec46..00000000000 --- a/tests/queries/0_stateless/02810_system_sync_filesystem_cache.reference +++ /dev/null @@ -1,3 +0,0 @@ -ok -ok -ok diff --git a/tests/queries/0_stateless/02810_system_sync_filesystem_cache.sh b/tests/queries/0_stateless/02810_system_sync_filesystem_cache.sh deleted file mode 100755 index c88ba4d5a74..00000000000 --- a/tests/queries/0_stateless/02810_system_sync_filesystem_cache.sh +++ /dev/null @@ -1,69 +0,0 @@ -#!/usr/bin/env bash -# Tags: no-fasttest, no-parallel, no-s3-storage, no-random-settings - -# set -x - -CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -# shellcheck source=../shell_config.sh -. "$CUR_DIR"/../shell_config.sh - - -$CLICKHOUSE_CLIENT -nm --query """ -DROP TABLE IF EXISTS test; - -CREATE TABLE test (a Int32, b String) -ENGINE = MergeTree() ORDER BY tuple() -SETTINGS disk = disk(type = cache, max_size = '100Ki', path = ${CLICKHOUSE_TEST_UNIQUE_NAME}, delayed_cleanup_interval_ms = 10000000, disk = s3_disk), min_bytes_for_wide_part = 10485760; - -INSERT INTO test SELECT 1, 'test'; -""" - -query_id=$RANDOM - -$CLICKHOUSE_CLIENT --query_id "$query_id" --query "SELECT * FROM test FORMAT Null SETTINGS enable_filesystem_cache_log = 1" - -${CLICKHOUSE_CLIENT} -q "system flush logs" - -key=$($CLICKHOUSE_CLIENT -nm --query """ -SELECT key FROM system.filesystem_cache_log WHERE query_id = '$query_id' ORDER BY size DESC LIMIT 1; -""") - -offset=$($CLICKHOUSE_CLIENT -nm --query """ -SELECT offset FROM system.filesystem_cache_log WHERE query_id = '$query_id' ORDER BY size DESC LIMIT 1; -""") - -path=$($CLICKHOUSE_CLIENT -nm --query """ -SELECT cache_path FROM system.filesystem_cache WHERE key = '$key' AND file_segment_range_begin = $offset; -""") - -rm $path - -$CLICKHOUSE_CLIENT --query "SELECT * FROM test FORMAT Null SETTINGS enable_filesystem_cache_log = 1" 2>&1 | grep -F -e "No such file or directory" > /dev/null && echo "ok" || echo "fail" - -CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=fatal/g') - -$CLICKHOUSE_CLIENT --query "SYSTEM SYNC FILESYSTEM CACHE" 2>&1 | grep -q "$key" && echo 'ok' || echo 'fail' - -$CLICKHOUSE_CLIENT --query "SELECT * FROM test FORMAT Null" - -key=$($CLICKHOUSE_CLIENT -nm --query """ -SELECT key FROM system.filesystem_cache_log WHERE query_id = '$query_id' ORDER BY size DESC LIMIT 1; -""") - -offset=$($CLICKHOUSE_CLIENT -nm --query """ -SELECT offset FROM system.filesystem_cache_log WHERE query_id = '$query_id' ORDER BY size DESC LIMIT 1; -""") - -path=$($CLICKHOUSE_CLIENT -nm --query """ -SELECT cache_path FROM system.filesystem_cache WHERE key = '$key' AND file_segment_range_begin = $offset; -""") - -echo -n 'fff' > $path - -#cat $path - -$CLICKHOUSE_CLIENT --query "SYSTEM SYNC FILESYSTEM CACHE" 2>&1 | grep -q "$key" && echo 'ok' || echo 'fail' - -$CLICKHOUSE_CLIENT --query "SELECT * FROM test FORMAT Null" - -$CLICKHOUSE_CLIENT --query "SYSTEM SYNC FILESYSTEM CACHE" diff --git a/utils/check-style/aspell-ignore/en/aspell-dict.txt b/utils/check-style/aspell-ignore/en/aspell-dict.txt index 7391411e1a5..57a8e0d5840 100644 --- a/utils/check-style/aspell-ignore/en/aspell-dict.txt +++ b/utils/check-style/aspell-ignore/en/aspell-dict.txt @@ -2692,7 +2692,9 @@ userver utils uuid varPop +varPopStable varSamp +varSampStable variadic variantElement variantType