diff --git a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp
index 7c497baa450..877d8ff9bb7 100644
--- a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp
+++ b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp
@@ -1219,7 +1219,7 @@ off_t CachedOnDiskReadBufferFromFile::getPosition()
 
 void CachedOnDiskReadBufferFromFile::assertCorrectness() const
 {
-    if (!CachedObjectStorage::canUseReadThroughCache()
+    if (!CachedObjectStorage::canUseReadThroughCache(settings)
         && !settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache usage is not allowed (query_id: {})", query_id);
 }
diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp
index 12fbbbcf747..04030fe5f8f 100644
--- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp
+++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp
@@ -36,7 +36,7 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
 
     with_cache = settings.remote_fs_cache
         && settings.enable_filesystem_cache
-        && (!query_id.empty() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
+        && (!query_id.empty() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache || !settings.avoid_readthrough_cache_outside_query_context);
 }
 
 SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
diff --git a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp
index 1d24d9d5411..3e73e45638b 100644
--- a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp
+++ b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp
@@ -57,7 +57,7 @@ ReadSettings CachedObjectStorage::patchSettings(const ReadSettings & read_settin
     ReadSettings modified_settings{read_settings};
     modified_settings.remote_fs_cache = cache;
 
-    if (!canUseReadThroughCache())
+    if (!canUseReadThroughCache(read_settings))
         modified_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true;
 
     return object_storage->patchSettings(modified_settings);
@@ -227,8 +227,11 @@ String CachedObjectStorage::getObjectsNamespace() const
     return object_storage->getObjectsNamespace();
 }
 
-bool CachedObjectStorage::canUseReadThroughCache()
+bool CachedObjectStorage::canUseReadThroughCache(const ReadSettings & settings)
 {
+    if (!settings.avoid_readthrough_cache_outside_query_context)
+        return true;
+
     return CurrentThread::isInitialized()
         && CurrentThread::get().getQueryContext()
         && !CurrentThread::getQueryId().empty();
diff --git a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h
index b5186d39c32..ba9fbd02d94 100644
--- a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h
+++ b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h
@@ -112,7 +112,9 @@ public:
 
     WriteSettings getAdjustedSettingsFromMetadataFile(const WriteSettings & settings, const std::string & path) const override;
 
-    static bool canUseReadThroughCache();
+    const FileCacheSettings & getCacheSettings() const { return cache_settings; }
+
+    static bool canUseReadThroughCache(const ReadSettings & settings);
 
 private:
     FileCache::Key getCacheKey(const std::string & path) const;
diff --git a/src/IO/ReadSettings.h b/src/IO/ReadSettings.h
index e43ecd7f275..dae4261e92c 100644
--- a/src/IO/ReadSettings.h
+++ b/src/IO/ReadSettings.h
@@ -99,6 +99,8 @@ struct ReadSettings
     bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
     bool enable_filesystem_cache_log = false;
    bool is_file_cache_persistent = false; /// Some files can be made non-evictable.
+    /// Don't populate cache when the read is not part of query execution (e.g. background thread).
+    bool avoid_readthrough_cache_outside_query_context = true;
     size_t filesystem_cache_max_download_size = (128UL * 1024 * 1024 * 1024);
     bool skip_download_if_exceeds_query_cache = true;
 
diff --git a/src/Interpreters/Cache/FileSegment.h b/src/Interpreters/Cache/FileSegment.h
index 163a15fcfda..75395a671f4 100644
--- a/src/Interpreters/Cache/FileSegment.h
+++ b/src/Interpreters/Cache/FileSegment.h
@@ -85,7 +85,7 @@ public:
         EMPTY,
         /**
          * A newly created file segment never has DOWNLOADING state until call to getOrSetDownloader
-         * because each cache user might acquire multiple file segments and reads them one by one,
+         * because each cache user might acquire multiple file segments and read them one by one,
          * so only user which actually needs to read this segment earlier than others - becomes a downloader.
          */
        DOWNLOADING,
diff --git a/src/Interpreters/Cache/IFileCachePriority.h b/src/Interpreters/Cache/IFileCachePriority.h
index ad63dcc7ea5..93343398783 100644
--- a/src/Interpreters/Cache/IFileCachePriority.h
+++ b/src/Interpreters/Cache/IFileCachePriority.h
@@ -85,6 +85,7 @@ public:
 
     virtual void removeAll(const CacheGuard::Lock &) = 0;
 
+    /// From lowest to highest priority.
     virtual void iterate(IterateFunc && func, const CacheGuard::Lock &) = 0;
 
 private:
diff --git a/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp b/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp
index cfc3ff58f81..30776a8bc50 100644
--- a/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp
+++ b/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp
@@ -202,6 +202,13 @@ bool DataPartStorageOnDiskBase::isStoredOnRemoteDisk() const
     return volume->getDisk()->isRemote();
 }
 
+std::optional<String> DataPartStorageOnDiskBase::getCacheName() const
+{
+    if (volume->getDisk()->supportsCache())
+        return volume->getDisk()->getCacheName();
+    return std::nullopt;
+}
+
 bool DataPartStorageOnDiskBase::supportZeroCopyReplication() const
 {
     return volume->getDisk()->supportZeroCopyReplication();
diff --git a/src/Storages/MergeTree/DataPartStorageOnDiskBase.h b/src/Storages/MergeTree/DataPartStorageOnDiskBase.h
index 6b27b7296fc..043953eb20c 100644
--- a/src/Storages/MergeTree/DataPartStorageOnDiskBase.h
+++ b/src/Storages/MergeTree/DataPartStorageOnDiskBase.h
@@ -36,6 +36,7 @@ public:
     std::string getDiskName() const override;
     std::string getDiskType() const override;
     bool isStoredOnRemoteDisk() const override;
+    std::optional<String> getCacheName() const override;
     bool supportZeroCopyReplication() const override;
     bool supportParallelWrite() const override;
     bool isBroken() const override;
diff --git a/src/Storages/MergeTree/IDataPartStorage.h b/src/Storages/MergeTree/IDataPartStorage.h
index f160254350d..933c9bd9958 100644
--- a/src/Storages/MergeTree/IDataPartStorage.h
+++ b/src/Storages/MergeTree/IDataPartStorage.h
@@ -149,6 +149,7 @@ public:
     virtual std::string getDiskName() const = 0;
     virtual std::string getDiskType() const = 0;
     virtual bool isStoredOnRemoteDisk() const { return false; }
+    virtual std::optional<String> getCacheName() const { return std::nullopt; }
     virtual bool supportZeroCopyReplication() const { return false; }
     virtual bool supportParallelWrite() const = 0;
     virtual bool isBroken() const = 0;
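The core behavioural change above, in one place: read-through caching is now gated on having a query context unless the new setting opts out. Below is a minimal standalone sketch of that gate (FakeReadSettings and the has_query_context flag are simplified stand-ins for ReadSettings and the CurrentThread checks, not the real ClickHouse types):

#include <iostream>

/// Simplified stand-in for ReadSettings, reduced to the two flags the gate looks at.
struct FakeReadSettings
{
    bool avoid_readthrough_cache_outside_query_context = true;   /// new flag added by this patch
    bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
};

/// Mirrors the intent of CachedObjectStorage::canUseReadThroughCache(settings):
/// read-through is allowed inside a query context, or unconditionally when the new flag is disabled.
bool canUseReadThroughCache(const FakeReadSettings & settings, bool has_query_context)
{
    if (!settings.avoid_readthrough_cache_outside_query_context)
        return true;
    return has_query_context;
}

int main()
{
    FakeReadSettings settings;

    /// Background thread without a query context: no read-through cache by default.
    std::cout << canUseReadThroughCache(settings, /*has_query_context=*/false) << '\n';  /// 0

    /// Read issued as part of query execution: read-through cache stays enabled.
    std::cout << canUseReadThroughCache(settings, /*has_query_context=*/true) << '\n';   /// 1

    /// Opting out of the new behaviour restores read-through everywhere.
    settings.avoid_readthrough_cache_outside_query_context = false;
    std::cout << canUseReadThroughCache(settings, /*has_query_context=*/false) << '\n';  /// 1
}

When the gate fails, patchSettings() above falls back to read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true, i.e. the cache is still consulted for data that is already cached but is not populated.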
diff --git a/tests/integration/test_replicated_merge_tree_s3_zero_copy/configs/config.d/storage_conf.xml b/tests/integration/test_replicated_merge_tree_s3_zero_copy/configs/config.d/storage_conf.xml
index 15239041478..96d59d5633e 100644
--- a/tests/integration/test_replicated_merge_tree_s3_zero_copy/configs/config.d/storage_conf.xml
+++ b/tests/integration/test_replicated_merge_tree_s3_zero_copy/configs/config.d/storage_conf.xml
@@ -12,6 +12,7 @@
                 s3
                 100000000
                 ./cache_s3/
+                1
diff --git a/tests/integration/test_replicated_merge_tree_s3_zero_copy/configs/config.d/users.xml b/tests/integration/test_replicated_merge_tree_s3_zero_copy/configs/config.d/users.xml
new file mode 100644
index 00000000000..5de169edc1e
--- /dev/null
+++ b/tests/integration/test_replicated_merge_tree_s3_zero_copy/configs/config.d/users.xml
@@ -0,0 +1,7 @@
+
+
+
+            1
+
+
+
diff --git a/tests/integration/test_replicated_merge_tree_s3_zero_copy/test.py b/tests/integration/test_replicated_merge_tree_s3_zero_copy/test.py
index eca18820016..72a01d278d8 100644
--- a/tests/integration/test_replicated_merge_tree_s3_zero_copy/test.py
+++ b/tests/integration/test_replicated_merge_tree_s3_zero_copy/test.py
@@ -19,6 +19,7 @@ def cluster():
         cluster.add_instance(
             "node1",
             main_configs=["configs/config.d/storage_conf.xml"],
+            user_configs=["configs/config.d/users.xml"],
             macros={"replica": "1"},
             with_minio=True,
             with_zookeeper=True,
@@ -26,12 +27,14 @@
         )
         cluster.add_instance(
             "node2",
             main_configs=["configs/config.d/storage_conf.xml"],
+            user_configs=["configs/config.d/users.xml"],
             macros={"replica": "2"},
             with_zookeeper=True,
         )
         cluster.add_instance(
             "node3",
             main_configs=["configs/config.d/storage_conf.xml"],
+            user_configs=["configs/config.d/users.xml"],
             macros={"replica": "3"},
             with_zookeeper=True,
         )
@@ -74,7 +77,7 @@ def generate_values(date_str, count, sign=1):
 
 def create_table(cluster, additional_settings=None):
     create_table_statement = """
-        CREATE TABLE s3_test ON CLUSTER cluster(
+        CREATE TABLE s3_test ON CLUSTER cluster (
             dt Date,
             id Int64,
             data String,
@@ -95,7 +98,8 @@ def create_table(cluster, additional_settings=None):
 def drop_table(cluster):
     yield
     for node in list(cluster.instances.values()):
-        node.query("DROP TABLE IF EXISTS s3_test")
+        node.query("DROP TABLE IF EXISTS s3_test SYNC")
+        node.query("DROP TABLE IF EXISTS test_drop_table SYNC")
 
     minio = cluster.minio_client
     # Remove extra objects to prevent tests cascade failing
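For completeness, the getCacheName() accessor added to IDataPartStorage / DataPartStorageOnDiskBase is only plumbing in this patch (no caller is included here). A standalone sketch of the pattern it follows, using simplified hypothetical class names rather than the real ClickHouse interfaces: the base interface reports "no cache" by default, and the disk-backed implementation forwards the disk's cache name only when the disk actually has one.

#include <iostream>
#include <optional>
#include <string>
#include <utility>

/// Simplified stand-in for IDisk: a disk may or may not be backed by a file cache.
struct FakeDisk
{
    std::optional<std::string> cache_name;

    bool supportsCache() const { return cache_name.has_value(); }
    std::string getCacheName() const { return *cache_name; }
};

/// Mirrors IDataPartStorage::getCacheName(): "no cache" unless a subclass overrides it.
struct FakePartStorage
{
    virtual ~FakePartStorage() = default;
    virtual std::optional<std::string> getCacheName() const { return std::nullopt; }
};

/// Mirrors DataPartStorageOnDiskBase::getCacheName(): delegate to the underlying disk.
struct FakePartStorageOnDisk : FakePartStorage
{
    FakeDisk disk;

    explicit FakePartStorageOnDisk(FakeDisk disk_) : disk(std::move(disk_)) {}

    std::optional<std::string> getCacheName() const override
    {
        if (disk.supportsCache())
            return disk.getCacheName();
        return std::nullopt;
    }
};

int main()
{
    FakeDisk cached_disk;
    cached_disk.cache_name = "some_cache";  /// illustrative name only

    FakePartStorageOnDisk cached_part(cached_disk);
    FakePartStorageOnDisk plain_part(FakeDisk{});

    std::cout << cached_part.getCacheName().value_or("<none>") << '\n';  /// some_cache
    std::cout << plain_part.getCacheName().value_or("<none>") << '\n';   /// <none>
}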