Merge remote-tracking branch 'origin/master' into split_aggregator

This commit is contained in:
lgbo-ustc 2024-03-14 20:06:17 +08:00
commit 263b4de06a
33 changed files with 406 additions and 138 deletions

View File

@ -93,7 +93,7 @@ void TCPServerDispatcher::release()
void TCPServerDispatcher::run()
{
AutoPtr<TCPServerDispatcher> guard(this, true); // ensure object stays alive
AutoPtr<TCPServerDispatcher> guard(this); // ensure object stays alive
int idleTime = (int) _pParams->getThreadIdleTime().totalMilliseconds();
@ -149,11 +149,13 @@ void TCPServerDispatcher::enqueue(const StreamSocket& socket)
{
try
{
this->duplicate();
_threadPool.startWithPriority(_pParams->getThreadPriority(), *this, threadName);
++_currentThreads;
}
catch (Poco::Exception& exc)
{
this->release();
++_refusedConnections;
std::cerr << "Got exception while starting thread for connection. Error code: "
<< exc.code() << ", message: '" << exc.displayText() << "'" << std::endl;

View File

@ -10,10 +10,14 @@ The embeddings and the metadata are stored in separate files in the raw data. A
converts them to CSV and imports them into ClickHouse. You can use the following `download.sh` script for that:
```bash
wget --tries=100 https://deploy.laion.ai/8f83b608504d46bb81708ec86e912220/embeddings/img_emb/img_emb_${1}.npy # download image embedding
wget --tries=100 https://deploy.laion.ai/8f83b608504d46bb81708ec86e912220/embeddings/text_emb/text_emb_${1}.npy # download text embedding
wget --tries=100 https://deploy.laion.ai/8f83b608504d46bb81708ec86e912220/embeddings/metadata/metadata_${1}.parquet # download metadata
python3 process.py ${1} # merge files and convert to CSV
number=${1}
if [[ $number == '' ]]; then
number=1
fi;
wget --tries=100 https://deploy.laion.ai/8f83b608504d46bb81708ec86e912220/embeddings/img_emb/img_emb_${number}.npy # download image embedding
wget --tries=100 https://deploy.laion.ai/8f83b608504d46bb81708ec86e912220/embeddings/text_emb/text_emb_${number}.npy # download text embedding
wget --tries=100 https://deploy.laion.ai/8f83b608504d46bb81708ec86e912220/embeddings/metadata/metadata_${number}.parquet # download metadata
python3 process.py $number # merge files and convert to CSV
```
Script `process.py` is defined as follows:

View File

@ -1,16 +1,99 @@
---
slug: /en/sql-reference/aggregate-functions/reference/varpop
title: "varPop"
slug: "/en/sql-reference/aggregate-functions/reference/varpop"
sidebar_position: 32
---
# varPop(x)
This page covers the `varPop` and `varPopStable` functions available in ClickHouse.
Calculates the amount `Σ((x - x̅)^2) / n`, where `n` is the sample size and `x̅`is the average value of `x`.
## varPop
In other words, dispersion for a set of values. Returns `Float64`.
Calculates the population covariance between two data columns. The population covariance measures the degree to which two variables vary together. Calculates the amount `Σ((x - x̅)^2) / n`, where `n` is the sample size and `x̅`is the average value of `x`.
Alias: `VAR_POP`.
**Syntax**
:::note
This function uses a numerically unstable algorithm. If you need [numerical stability](https://en.wikipedia.org/wiki/Numerical_stability) in calculations, use the `varPopStable` function. It works slower but provides a lower computational error.
:::
```sql
covarPop(x, y)
```
**Parameters**
- `x`: The first data column. [Numeric](../../../native-protocol/columns.md)
- `y`: The second data column. [Numeric](../../../native-protocol/columns.md)
**Returned value**
Returns an integer of type `Float64`.
**Implementation details**
This function uses a numerically unstable algorithm. If you need numerical stability in calculations, use the slower but more stable [`varPopStable` function](#varPopStable).
**Example**
```sql
DROP TABLE IF EXISTS test_data;
CREATE TABLE test_data
(
x Int32,
y Int32
)
ENGINE = Memory;
INSERT INTO test_data VALUES (1, 2), (2, 3), (3, 5), (4, 6), (5, 8);
SELECT
covarPop(x, y) AS covar_pop
FROM test_data;
```
```response
3
```
## varPopStable
Calculates population covariance between two data columns using a stable, numerically accurate method to calculate the variance. This function is designed to provide reliable results even with large datasets or values that might cause numerical instability in other implementations.
**Syntax**
```sql
covarPopStable(x, y)
```
**Parameters**
- `x`: The first data column. [String literal](../syntax#syntax-string-literal)
- `y`: The second data column. [Expression](../syntax#syntax-expressions)
**Returned value**
Returns an integer of type `Float64`.
**Implementation details**
Unlike [`varPop()`](#varPop), this function uses a stable, numerically accurate algorithm to calculate the population variance to avoid issues like catastrophic cancellation or loss of precision. This function also handles `NaN` and `Inf` values correctly, excluding them from calculations.
**Example**
Query:
```sql
DROP TABLE IF EXISTS test_data;
CREATE TABLE test_data
(
x Int32,
y Int32
)
ENGINE = Memory;
INSERT INTO test_data VALUES (1, 2), (2, 9), (9, 5), (4, 6), (5, 8);
SELECT
covarPopStable(x, y) AS covar_pop_stable
FROM test_data;
```
```response
0.5999999999999999
```

View File

@ -1,18 +1,128 @@
---
title: "varSamp"
slug: /en/sql-reference/aggregate-functions/reference/varsamp
sidebar_position: 33
---
# varSamp
This page contains information on the `varSamp` and `varSampStable` ClickHouse functions.
Calculates the amount `Σ((x - x̅)^2) / (n - 1)`, where `n` is the sample size and `x̅`is the average value of `x`.
## varSamp
It represents an unbiased estimate of the variance of a random variable if passed values from its sample.
Calculate the sample variance of a data set.
Returns `Float64`. When `n <= 1`, returns `+∞`.
**Syntax**
Alias: `VAR_SAMP`.
```sql
varSamp(expr)
```
:::note
This function uses a numerically unstable algorithm. If you need [numerical stability](https://en.wikipedia.org/wiki/Numerical_stability) in calculations, use the `varSampStable` function. It works slower but provides a lower computational error.
:::
**Parameters**
- `expr`: An expression representing the data set for which you want to calculate the sample variance. [Expression](../syntax#syntax-expressions)
**Returned value**
Returns a Float64 value representing the sample variance of the input data set.
**Implementation details**
The `varSamp()` function calculates the sample variance using the following formula:
```plaintext
∑(x - mean(x))^2 / (n - 1)
```
Where:
- `x` is each individual data point in the data set.
- `mean(x)` is the arithmetic mean of the data set.
- `n` is the number of data points in the data set.
The function assumes that the input data set represents a sample from a larger population. If you want to calculate the variance of the entire population (when you have the complete data set), you should use the [`varPop()` function](./varpop#varpop) instead.
This function uses a numerically unstable algorithm. If you need numerical stability in calculations, use the slower but more stable [`varSampStable` function](#varSampStable).
**Example**
Query:
```sql
CREATE TABLE example_table
(
id UInt64,
value Float64
)
ENGINE = MergeTree
ORDER BY id;
INSERT INTO example_table VALUES (1, 10.5), (2, 12.3), (3, 9.8), (4, 11.2), (5, 10.7);
SELECT varSamp(value) FROM example_table;
```
Response:
```response
0.8650000000000091
```
## varSampStable
Calculate the sample variance of a data set using a numerically stable algorithm.
**Syntax**
```sql
varSampStable(expr)
```
**Parameters**
- `expr`: An expression representing the data set for which you want to calculate the sample variance. [Expression](../syntax#syntax-expressions)
**Returned value**
The `varSampStable()` function returns a Float64 value representing the sample variance of the input data set.
**Implementation details**
The `varSampStable()` function calculates the sample variance using the same formula as the [`varSamp()`](#varSamp function):
```plaintext
∑(x - mean(x))^2 / (n - 1)
```
Where:
- `x` is each individual data point in the data set.
- `mean(x)` is the arithmetic mean of the data set.
- `n` is the number of data points in the data set.
The difference between `varSampStable()` and `varSamp()` is that `varSampStable()` is designed to provide a more deterministic and stable result when dealing with floating-point arithmetic. It uses an algorithm that minimizes the accumulation of rounding errors, which can be particularly important when dealing with large data sets or data with a wide range of values.
Like `varSamp()`, the `varSampStable()` function assumes that the input data set represents a sample from a larger population. If you want to calculate the variance of the entire population (when you have the complete data set), you should use the [`varPopStable()` function](./varpop#varpopstable) instead.
**Example**
Query:
```sql
CREATE TABLE example_table
(
id UInt64,
value Float64
)
ENGINE = MergeTree
ORDER BY id;
INSERT INTO example_table VALUES (1, 10.5), (2, 12.3), (3, 9.8), (4, 11.2), (5, 10.7);
SELECT varSampStable(value) FROM example_table;
```
Response:
```response
0.865
```
This query calculates the sample variance of the `value` column in the `example_table` using the `varSampStable()` function. The result shows that the sample variance of the values `[10.5, 12.3, 9.8, 11.2, 10.7]` is approximately 0.865, which may differ slightly from the result of `varSamp()` due to the more precise handling of floating-point arithmetic.

View File

@ -476,6 +476,7 @@ The server successfully detected this situation and will download merged part fr
M(FileSegmentRemoveMicroseconds, "File segment remove() time") \
M(FileSegmentHolderCompleteMicroseconds, "File segments holder complete() time") \
M(FileSegmentFailToIncreasePriority, "Number of times the priority was not increased due to a high contention on the cache lock") \
M(FilesystemCacheFailToReserveSpaceBecauseOfLockContention, "Number of times space reservation was skipped due to a high contention on the cache lock") \
M(FilesystemCacheHoldFileSegments, "Filesystem cache file segments count, which were hold") \
M(FilesystemCacheUnusedHoldFileSegments, "Filesystem cache file segments count, which were hold, but not used (because of seek or LIMIT n, etc)") \
\

View File

@ -123,17 +123,15 @@ protected:
std::string getServerUrl() const
{
return "http://" + server_data.socket->address().toString();
return "http://" + server_data.server->socket().address().toString();
}
void startServer()
{
server_data.reset();
server_data.params = new Poco::Net::HTTPServerParams();
server_data.socket = std::make_unique<Poco::Net::ServerSocket>(server_data.port);
server_data.handler_factory = new HTTPRequestHandlerFactory(slowdown_receive);
server_data.server = std::make_unique<Poco::Net::HTTPServer>(
server_data.handler_factory, *server_data.socket, server_data.params);
server_data.handler_factory, server_data.port);
server_data.server->start();
}
@ -155,8 +153,7 @@ protected:
{
// just some port to avoid collisions with others tests
UInt16 port = 9871;
Poco::Net::HTTPServerParams::Ptr params;
std::unique_ptr<Poco::Net::ServerSocket> socket;
HTTPRequestHandlerFactory::Ptr handler_factory;
std::unique_ptr<Poco::Net::HTTPServer> server;
@ -171,8 +168,6 @@ protected:
server = nullptr;
handler_factory = nullptr;
socket = nullptr;
params = nullptr;
}
~ServerData() {

View File

@ -121,8 +121,7 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
auth_settings.use_insecure_imds_request.value_or(false),
auth_settings.expiration_window_seconds.value_or(S3::DEFAULT_EXPIRATION_WINDOW_SECONDS),
auth_settings.no_sign_request.value_or(false),
},
credentials.GetSessionToken());
});
auto new_client = std::make_shared<KeeperSnapshotManagerS3::S3Configuration>(std::move(new_uri), std::move(auth_settings), std::move(client));

View File

@ -778,6 +778,7 @@ class IColumn;
M(UInt64, filesystem_cache_max_download_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be downloaded by a single query", 0) \
M(Bool, throw_on_error_from_cache_on_write_operations, false, "Ignore error from cache when caching on write operations (INSERT, merges)", 0) \
M(UInt64, filesystem_cache_segments_batch_size, 20, "Limit on size of a single batch of file segments that a read buffer can request from cache. Too low value will lead to excessive requests to cache, too large may slow down eviction from cache", 0) \
M(UInt64, filesystem_cache_reserve_space_wait_lock_timeout_milliseconds, 1000, "Wait time to lock cache for sapce reservation in filesystem cache", 0) \
\
M(Bool, use_page_cache_for_disks_without_file_cache, false, "Use userspace page cache for remote disks that don't have filesystem cache enabled.", 0) \
M(Bool, read_from_page_cache_if_exists_otherwise_bypass_cache, false, "Use userspace page cache in passive mode, similar to read_from_filesystem_cache_if_exists_otherwise_bypass_cache.", 0) \

View File

@ -93,6 +93,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"input_format_json_use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects", false, false, "Allow to use String type for ambiguous paths during named tuple inference from JSON objects"},
{"throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert", false, true, "Deduplication is dependent materialized view cannot work together with async inserts."},
{"parallel_replicas_allow_in_with_subquery", false, true, "If true, subquery for IN will be executed on every follower replica"},
{"filesystem_cache_reserve_space_wait_lock_timeout_milliseconds", 1000, 1000, "Wait time to lock cache for sapce reservation in filesystem cache"},
}},
{"24.2", {{"allow_suspicious_variant_types", true, false, "Don't allow creating Variant type with suspicious variants by default"},
{"validate_experimental_and_suspicious_types_inside_nested_types", false, true, "Validate usage of experimental and suspicious types inside nested types"},

View File

@ -637,7 +637,8 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment)
ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceBytes, current_impl_buffer_size);
bool continue_predownload = file_segment.reserve(current_predownload_size);
bool continue_predownload = file_segment.reserve(
current_predownload_size, settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds);
if (continue_predownload)
{
LOG_TEST(log, "Left to predownload: {}, buffer size: {}", bytes_to_predownload, current_impl_buffer_size);
@ -992,7 +993,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
{
chassert(file_offset_of_buffer_end + size - 1 <= file_segment.range().right);
bool success = file_segment.reserve(size);
bool success = file_segment.reserve(size, settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds);
if (success)
{
chassert(file_segment.getCurrentWriteOffset() == static_cast<size_t>(implementation_buffer->getPosition()));

View File

@ -26,16 +26,18 @@ FileSegmentRangeWriter::FileSegmentRangeWriter(
FileCache * cache_,
const FileSegment::Key & key_,
const FileCacheUserInfo & user_,
size_t reserve_space_lock_wait_timeout_milliseconds_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
const String & query_id_,
const String & source_path_)
: cache(cache_)
, key(key_)
, user(user_)
, reserve_space_lock_wait_timeout_milliseconds(reserve_space_lock_wait_timeout_milliseconds_)
, log(getLogger("FileSegmentRangeWriter"))
, cache_log(cache_log_)
, query_id(query_id_)
, source_path(source_path_)
, user(user_)
{
}
@ -89,7 +91,7 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset
size_t size_to_write = std::min(available_size, size);
bool reserved = file_segment->reserve(size_to_write);
bool reserved = file_segment->reserve(size_to_write, reserve_space_lock_wait_timeout_milliseconds);
if (!reserved)
{
appendFilesystemCacheLog(*file_segment);
@ -211,6 +213,7 @@ CachedOnDiskWriteBufferFromFile::CachedOnDiskWriteBufferFromFile(
, key(key_)
, query_id(query_id_)
, user(user_)
, reserve_space_lock_wait_timeout_milliseconds(settings_.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds)
, throw_on_error_from_cache(settings_.throw_on_error_from_cache)
, cache_log(!query_id_.empty() && settings_.enable_filesystem_cache_log ? cache_log_ : nullptr)
{
@ -251,7 +254,8 @@ void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size, bool t
if (!cache_writer)
{
cache_writer = std::make_unique<FileSegmentRangeWriter>(cache.get(), key, user, cache_log, query_id, source_path);
cache_writer = std::make_unique<FileSegmentRangeWriter>(
cache.get(), key, user, reserve_space_lock_wait_timeout_milliseconds, cache_log, query_id, source_path);
}
Stopwatch watch(CLOCK_MONOTONIC);

View File

@ -30,6 +30,7 @@ public:
FileCache * cache_,
const FileSegment::Key & key_,
const FileCacheUserInfo & user_,
size_t reserve_space_lock_wait_timeout_milliseconds_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
const String & query_id_,
const String & source_path_);
@ -52,13 +53,14 @@ private:
void completeFileSegment();
FileCache * cache;
FileSegment::Key key;
const FileSegment::Key key;
const FileCacheUserInfo user;
const size_t reserve_space_lock_wait_timeout_milliseconds;
LoggerPtr log;
std::shared_ptr<FilesystemCacheLog> cache_log;
String query_id;
String source_path;
FileCacheUserInfo user;
const String query_id;
const String source_path;
FileSegmentsHolderPtr file_segments;
@ -99,11 +101,12 @@ private:
String source_path;
FileCacheKey key;
size_t current_download_offset = 0;
const String query_id;
const FileCacheUserInfo user;
const size_t reserve_space_lock_wait_timeout_milliseconds;
const bool throw_on_error_from_cache;
bool throw_on_error_from_cache;
size_t current_download_offset = 0;
bool cache_in_error_state_or_disabled = false;
std::unique_ptr<FileSegmentRangeWriter> cache_writer;

View File

@ -100,6 +100,7 @@ struct ReadSettings
bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
bool enable_filesystem_cache_log = false;
size_t filesystem_cache_segments_batch_size = 20;
size_t filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = 1000;
bool use_page_cache_for_disks_without_file_cache = false;
bool read_from_page_cache_if_exists_otherwise_bypass_cache = false;

View File

@ -20,6 +20,7 @@ struct WriteSettings
bool enable_filesystem_cache_on_write_operations = false;
bool enable_filesystem_cache_log = false;
bool throw_on_error_from_cache = false;
size_t filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = 1000;
bool s3_allow_parallel_part_upload = true;

View File

@ -27,6 +27,7 @@ namespace ProfileEvents
extern const Event FilesystemCacheReserveMicroseconds;
extern const Event FilesystemCacheGetOrSetMicroseconds;
extern const Event FilesystemCacheGetMicroseconds;
extern const Event FilesystemCacheFailToReserveSpaceBecauseOfLockContention;
}
namespace DB
@ -188,9 +189,9 @@ CacheGuard::Lock FileCache::lockCache() const
return cache_guard.lock();
}
CacheGuard::Lock FileCache::tryLockCache() const
CacheGuard::Lock FileCache::tryLockCache(std::optional<std::chrono::milliseconds> acquire_timeout) const
{
return cache_guard.tryLock();
return acquire_timeout.has_value() ? cache_guard.tryLockFor(acquire_timeout.value()) : cache_guard.tryLock();
}
FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment::Range & range, size_t file_segments_limit) const
@ -776,12 +777,18 @@ bool FileCache::tryReserve(
FileSegment & file_segment,
const size_t size,
FileCacheReserveStat & reserve_stat,
const UserInfo & user)
const UserInfo & user,
size_t lock_wait_timeout_milliseconds)
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheReserveMicroseconds);
assertInitialized();
auto cache_lock = lockCache();
auto cache_lock = tryLockCache(std::chrono::milliseconds(lock_wait_timeout_milliseconds));
if (!cache_lock)
{
ProfileEvents::increment(ProfileEvents::FilesystemCacheFailToReserveSpaceBecauseOfLockContention);
return false;
}
LOG_TEST(
log, "Trying to reserve space ({} bytes) for {}:{}, current usage {}/{}",

View File

@ -161,7 +161,8 @@ public:
FileSegment & file_segment,
size_t size,
FileCacheReserveStat & stat,
const UserInfo & user);
const UserInfo & user,
size_t lock_wait_timeout_milliseconds);
std::vector<FileSegment::Info> getFileSegmentInfos(const UserID & user_id);
@ -173,7 +174,7 @@ public:
void deactivateBackgroundOperations();
CacheGuard::Lock lockCache() const;
CacheGuard::Lock tryLockCache() const;
CacheGuard::Lock tryLockCache(std::optional<std::chrono::milliseconds> acquire_timeout = std::nullopt) const;
std::vector<FileSegment::Info> sync();

View File

@ -497,7 +497,7 @@ LockedKeyPtr FileSegment::lockKeyMetadata(bool assert_exists) const
return metadata->tryLock();
}
bool FileSegment::reserve(size_t size_to_reserve, FileCacheReserveStat * reserve_stat)
bool FileSegment::reserve(size_t size_to_reserve, size_t lock_wait_timeout_milliseconds, FileCacheReserveStat * reserve_stat)
{
if (!size_to_reserve)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Zero space reservation is not allowed");
@ -549,7 +549,7 @@ bool FileSegment::reserve(size_t size_to_reserve, FileCacheReserveStat * reserve
if (!reserve_stat)
reserve_stat = &dummy_stat;
bool reserved = cache->tryReserve(*this, size_to_reserve, *reserve_stat, getKeyMetadata()->user);
bool reserved = cache->tryReserve(*this, size_to_reserve, *reserve_stat, getKeyMetadata()->user, lock_wait_timeout_milliseconds);
if (!reserved)
setDownloadFailedUnlocked(lockFileSegment());

View File

@ -199,7 +199,7 @@ public:
/// Try to reserve exactly `size` bytes (in addition to the getDownloadedSize() bytes already downloaded).
/// Returns true if reservation was successful, false otherwise.
bool reserve(size_t size_to_reserve, FileCacheReserveStat * reserve_stat = nullptr);
bool reserve(size_t size_to_reserve, size_t lock_wait_timeout_milliseconds, FileCacheReserveStat * reserve_stat = nullptr);
/// Write data into reserved space.
void write(const char * from, size_t size, size_t offset);

View File

@ -61,17 +61,26 @@ namespace DB
*/
struct CacheGuard : private boost::noncopyable
{
using Mutex = std::timed_mutex;
/// struct is used (not keyword `using`) to make CacheGuard::Lock non-interchangable with other guards locks
/// so, we wouldn't be able to pass CacheGuard::Lock to a function which accepts KeyGuard::Lock, for example
struct Lock : public std::unique_lock<std::mutex>
struct Lock : public std::unique_lock<Mutex>
{
using Base = std::unique_lock<std::mutex>;
using Base = std::unique_lock<Mutex>;
using Base::Base;
};
Lock lock() { return Lock(mutex); }
Lock tryLock() { return Lock(mutex, std::try_to_lock); }
std::mutex mutex;
Lock tryLockFor(const std::chrono::milliseconds & acquire_timeout)
{
return Lock(mutex, std::chrono::duration<double, std::milli>(acquire_timeout));
}
private:
Mutex mutex;
};
/**

View File

@ -1,6 +1,7 @@
#include <Interpreters/Cache/Metadata.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileSegment.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <filesystem>
@ -693,6 +694,9 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
reader->set(memory->data(), memory->size());
}
const auto reserve_space_lock_wait_timeout_milliseconds =
Context::getGlobalContextInstance()->getReadSettings().filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;
size_t offset = file_segment.getCurrentWriteOffset();
if (offset != static_cast<size_t>(reader->getPosition()))
reader->seek(offset, SEEK_SET);
@ -701,7 +705,7 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
{
auto size = reader->available();
if (!file_segment.reserve(size))
if (!file_segment.reserve(size, reserve_space_lock_wait_timeout_milliseconds))
{
LOG_TEST(
log, "Failed to reserve space during background download "

View File

@ -1,6 +1,7 @@
#include <Interpreters/Cache/WriteBufferToFileSegment.h>
#include <Interpreters/Cache/FileSegment.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Context.h>
#include <IO/SwapHelper.h>
#include <IO/ReadBufferFromFile.h>
@ -18,9 +19,22 @@ namespace ErrorCodes
extern const int NOT_ENOUGH_SPACE;
}
namespace
{
size_t getCacheLockWaitTimeout()
{
auto query_context = CurrentThread::getQueryContext();
if (query_context)
return query_context->getReadSettings().filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;
else
return Context::getGlobalContextInstance()->getReadSettings().filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;
}
}
WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegment * file_segment_)
: WriteBufferFromFileDecorator(std::make_unique<WriteBufferFromFile>(file_segment_->getPath()))
, file_segment(file_segment_)
, reserve_space_lock_wait_timeout_milliseconds(getCacheLockWaitTimeout())
{
}
@ -31,6 +45,7 @@ WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegmentsHolderPtr segment
: throw Exception(ErrorCodes::LOGICAL_ERROR, "WriteBufferToFileSegment can be created only from single segment"))
, file_segment(&segment_holder_->front())
, segment_holder(std::move(segment_holder_))
, reserve_space_lock_wait_timeout_milliseconds(getCacheLockWaitTimeout())
{
}
@ -49,7 +64,7 @@ void WriteBufferToFileSegment::nextImpl()
FileCacheReserveStat reserve_stat;
/// In case of an error, we don't need to finalize the file segment
/// because it will be deleted soon and completed in the holder's destructor.
bool ok = file_segment->reserve(bytes_to_write, &reserve_stat);
bool ok = file_segment->reserve(bytes_to_write, reserve_space_lock_wait_timeout_milliseconds, &reserve_stat);
if (!ok)
{

View File

@ -28,6 +28,8 @@ private:
/// Empty if file_segment is not owned by this WriteBufferToFileSegment
FileSegmentsHolderPtr segment_holder;
const size_t reserve_space_lock_wait_timeout_milliseconds;
};

View File

@ -5166,6 +5166,7 @@ ReadSettings Context::getReadSettings() const
res.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache;
res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log;
res.filesystem_cache_segments_batch_size = settings.filesystem_cache_segments_batch_size;
res.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;
res.filesystem_cache_max_download_size = settings.filesystem_cache_max_download_size;
res.skip_download_if_exceeds_query_cache = settings.skip_download_if_exceeds_query_cache;
@ -5214,6 +5215,7 @@ WriteSettings Context::getWriteSettings() const
res.enable_filesystem_cache_on_write_operations = settings.enable_filesystem_cache_on_write_operations;
res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log;
res.throw_on_error_from_cache = settings.throw_on_error_from_cache_on_write_operations;
res.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;
res.s3_allow_parallel_part_upload = settings.s3_allow_parallel_part_upload;

View File

@ -245,7 +245,7 @@ void download(FileSegment & file_segment)
ASSERT_EQ(file_segment.state(), State::DOWNLOADING);
ASSERT_EQ(file_segment.getDownloadedSize(), 0);
ASSERT_TRUE(file_segment.reserve(file_segment.range().size()));
ASSERT_TRUE(file_segment.reserve(file_segment.range().size(), 1000));
download(cache_base_path, file_segment);
ASSERT_EQ(file_segment.state(), State::DOWNLOADING);
@ -257,7 +257,7 @@ void assertDownloadFails(FileSegment & file_segment)
{
ASSERT_EQ(file_segment.getOrSetDownloader(), FileSegment::getCallerId());
ASSERT_EQ(file_segment.getDownloadedSize(), 0);
ASSERT_FALSE(file_segment.reserve(file_segment.range().size()));
ASSERT_FALSE(file_segment.reserve(file_segment.range().size(), 1000));
file_segment.complete();
}
@ -956,7 +956,7 @@ TEST_F(FileCacheTest, temporaryData)
for (auto & segment : *some_data_holder)
{
ASSERT_TRUE(segment->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(segment->reserve(segment->range().size()));
ASSERT_TRUE(segment->reserve(segment->range().size(), 1000));
download(*segment);
segment->complete();
}

View File

@ -1451,8 +1451,7 @@ void StorageS3::Configuration::connect(const ContextPtr & context)
auth_settings.expiration_window_seconds.value_or(
context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
auth_settings.no_sign_request.value_or(context->getConfigRef().getBool("s3.no_sign_request", false)),
},
credentials.GetSessionToken());
});
}
void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection)

View File

@ -0,0 +1,12 @@
<clickhouse>
<remote_servers>
<localhost_cluster>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
</localhost_cluster>
</remote_servers>
</clickhouse>

View File

@ -3,7 +3,9 @@ from helpers.cluster import ClickHouseCluster
import logging
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", main_configs=["configs/overrides.xml"])
node = cluster.add_instance(
"node", main_configs=["configs/overrides.xml", "configs/clusters.xml"]
)
@pytest.fixture(scope="module")
@ -23,7 +25,7 @@ def test_distibuted_settings(start_cluster):
node.query(
"""
CREATE TABLE data_1 (key Int) ENGINE Memory();
CREATE TABLE dist_1 as data_1 ENGINE Distributed(default, default, data_1) SETTINGS flush_on_detach = true;
CREATE TABLE dist_1 as data_1 ENGINE Distributed(localhost_cluster, default, data_1) SETTINGS flush_on_detach = true;
SYSTEM STOP DISTRIBUTED SENDS dist_1;
INSERT INTO dist_1 SETTINGS prefer_localhost_replica=0 VALUES (1);
DETACH TABLE dist_1;
@ -36,7 +38,7 @@ def test_distibuted_settings(start_cluster):
node.query(
"""
CREATE TABLE data_2 (key Int) ENGINE Memory();
CREATE TABLE dist_2 as data_2 ENGINE Distributed(default, default, data_2);
CREATE TABLE dist_2 as data_2 ENGINE Distributed(localhost_cluster, default, data_2);
SYSTEM STOP DISTRIBUTED SENDS dist_2;
INSERT INTO dist_2 SETTINGS prefer_localhost_replica=0 VALUES (2);
DETACH TABLE dist_2;

View File

@ -7,4 +7,8 @@
</hdd_blob>
</disks>
</storage_configuration>
<filesystem_cache_log>
<database>system</database>
<table>filesystem_cache_log</table>
</filesystem_cache_log>
</clickhouse>

View File

@ -426,3 +426,78 @@ def test_force_filesystem_cache_on_merges(cluster):
test(node, True)
node = cluster.instances["node"]
test(node, False)
def test_system_sync_filesystem_cache(cluster):
node = cluster.instances["node"]
node.query(
"""
DROP TABLE IF EXISTS test;
CREATE TABLE test (a Int32, b String)
ENGINE = MergeTree() ORDER BY tuple()
SETTINGS disk = disk(type = cache,
max_size = '100Ki',
path = "test_system_sync_filesystem_cache",
delayed_cleanup_interval_ms = 10000000, disk = hdd_blob),
min_bytes_for_wide_part = 10485760;
INSERT INTO test SELECT 1, 'test';
"""
)
query_id = "system_sync_filesystem_cache_1"
node.query(
"SELECT * FROM test FORMAT Null SETTINGS enable_filesystem_cache_log = 1",
query_id=query_id,
)
key, offset = (
node.query(
f"""
SYSTEM FLUSH LOGS;
SELECT key, offset FROM system.filesystem_cache_log WHERE query_id = '{query_id}' ORDER BY size DESC LIMIT 1;
"""
)
.strip()
.split("\t")
)
cache_path = node.query(
f"SELECT cache_path FROM system.filesystem_cache WHERE key = '{key}' and file_segment_range_begin = {offset}"
)
node.exec_in_container(["bash", "-c", f"rm {cache_path}"])
assert key in node.query("SYSTEM SYNC FILESYSTEM CACHE")
node.query("SELECT * FROM test FORMAT Null")
assert key not in node.query("SYSTEM SYNC FILESYSTEM CACHE")
query_id = "system_sync_filesystem_cache_2"
node.query(
"SELECT * FROM test FORMAT Null SETTINGS enable_filesystem_cache_log = 1",
query_id=query_id,
)
key, offset = (
node.query(
f"""
SYSTEM FLUSH LOGS;
SELECT key, offset FROM system.filesystem_cache_log WHERE query_id = '{query_id}' ORDER BY size DESC LIMIT 1;
"""
)
.strip()
.split("\t")
)
cache_path = node.query(
f"SELECT cache_path FROM system.filesystem_cache WHERE key = '{key}' and file_segment_range_begin = {offset}"
)
node.exec_in_container(["bash", "-c", f"echo -n 'fff' > {cache_path}"])
assert key in node.query("SYSTEM SYNC FILESYSTEM CACHE")
node.query("SELECT * FROM test FORMAT Null")
assert key not in node.query("SYSTEM SYNC FILESYSTEM CACHE")

View File

@ -41,7 +41,7 @@ def test_memory_tracking_total():
[
"bash",
"-c",
"clickhouse local -q \"SELECT arrayStringConcat(arrayMap(x->toString(cityHash64(x)), range(1000)), ' ') from numbers(10000)\" > data.json",
"clickhouse local -q \"SELECT arrayStringConcat(arrayMap(x->toString(cityHash64(x)), range(1000)), ' ') from numbers(10000)\" > data.jsonl",
]
)
@ -56,7 +56,7 @@ def test_memory_tracking_total():
"--show-error",
"--data-binary",
"@data.json",
"http://127.1:8123/?query=INSERT%20INTO%20null%20FORMAT%20TSV",
"http://127.1:8123/?query=INSERT%20INTO%20null%20FORMAT%20JSONEachRow",
]
)
== ""

View File

@ -1,69 +0,0 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-parallel, no-s3-storage, no-random-settings
# set -x
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nm --query """
DROP TABLE IF EXISTS test;
CREATE TABLE test (a Int32, b String)
ENGINE = MergeTree() ORDER BY tuple()
SETTINGS disk = disk(type = cache, max_size = '100Ki', path = ${CLICKHOUSE_TEST_UNIQUE_NAME}, delayed_cleanup_interval_ms = 10000000, disk = s3_disk), min_bytes_for_wide_part = 10485760;
INSERT INTO test SELECT 1, 'test';
"""
query_id=$RANDOM
$CLICKHOUSE_CLIENT --query_id "$query_id" --query "SELECT * FROM test FORMAT Null SETTINGS enable_filesystem_cache_log = 1"
${CLICKHOUSE_CLIENT} -q "system flush logs"
key=$($CLICKHOUSE_CLIENT -nm --query """
SELECT key FROM system.filesystem_cache_log WHERE query_id = '$query_id' ORDER BY size DESC LIMIT 1;
""")
offset=$($CLICKHOUSE_CLIENT -nm --query """
SELECT offset FROM system.filesystem_cache_log WHERE query_id = '$query_id' ORDER BY size DESC LIMIT 1;
""")
path=$($CLICKHOUSE_CLIENT -nm --query """
SELECT cache_path FROM system.filesystem_cache WHERE key = '$key' AND file_segment_range_begin = $offset;
""")
rm $path
$CLICKHOUSE_CLIENT --query "SELECT * FROM test FORMAT Null SETTINGS enable_filesystem_cache_log = 1" 2>&1 | grep -F -e "No such file or directory" > /dev/null && echo "ok" || echo "fail"
CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=fatal/g')
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC FILESYSTEM CACHE" 2>&1 | grep -q "$key" && echo 'ok' || echo 'fail'
$CLICKHOUSE_CLIENT --query "SELECT * FROM test FORMAT Null"
key=$($CLICKHOUSE_CLIENT -nm --query """
SELECT key FROM system.filesystem_cache_log WHERE query_id = '$query_id' ORDER BY size DESC LIMIT 1;
""")
offset=$($CLICKHOUSE_CLIENT -nm --query """
SELECT offset FROM system.filesystem_cache_log WHERE query_id = '$query_id' ORDER BY size DESC LIMIT 1;
""")
path=$($CLICKHOUSE_CLIENT -nm --query """
SELECT cache_path FROM system.filesystem_cache WHERE key = '$key' AND file_segment_range_begin = $offset;
""")
echo -n 'fff' > $path
#cat $path
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC FILESYSTEM CACHE" 2>&1 | grep -q "$key" && echo 'ok' || echo 'fail'
$CLICKHOUSE_CLIENT --query "SELECT * FROM test FORMAT Null"
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC FILESYSTEM CACHE"

View File

@ -2692,7 +2692,9 @@ userver
utils
uuid
varPop
varPopStable
varSamp
varSampStable
variadic
variantElement
variantType