mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-14 19:45:11 +00:00
Allow to cache data for object storage table engines and data lakes using ETag and Path hash for cache key
This commit is contained in:
parent
1de0aeac33
commit
6b3ab311c3
@ -1486,6 +1486,8 @@ try
|
||||
|
||||
NamedCollectionFactory::instance().loadIfNot();
|
||||
|
||||
FileCacheFactory::instance().loadDefaultCaches(config());
|
||||
|
||||
/// Initialize main config reloader.
|
||||
std::string include_from_path = config().getString("include_from", "/etc/metrika.xml");
|
||||
|
||||
|
@ -807,6 +807,7 @@ namespace ErrorCodes
|
||||
M(Bool, enable_filesystem_cache, true, "Use cache for remote filesystem. This setting does not turn on/off cache for disks (must be done via disk config), but allows to bypass cache for some queries if intended", 0) \
|
||||
M(Bool, enable_filesystem_cache_on_write_operations, false, "Write into cache on write operations. To actually work this setting requires be added to disk config too", 0) \
|
||||
M(Bool, enable_filesystem_cache_log, false, "Allows to record the filesystem caching log for each query", 0) \
|
||||
M(String, filesystem_cache_name, "", "Filesystem cache name to use for stateless table engines or data lakes", 0) \
|
||||
M(Bool, read_from_filesystem_cache_if_exists_otherwise_bypass_cache, false, "Allow to use the filesystem cache in passive mode - benefit from the existing cache entries, but don't put more entries into the cache. If you set this setting for heavy ad-hoc queries and leave it disabled for short real-time queries, this will allows to avoid cache threshing by too heavy queries and to improve the overall system efficiency.", 0) \
|
||||
M(Bool, skip_download_if_exceeds_query_cache, true, "Skip download from remote filesystem if exceeds query cache size", 0) \
|
||||
M(UInt64, filesystem_cache_max_download_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be downloaded by a single query", 0) \
|
||||
|
@ -82,7 +82,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
|
||||
{
|
||||
if (settings.remote_fs_cache->isInitialized())
|
||||
{
|
||||
auto cache_key = settings.remote_fs_cache->createKeyForPath(object_path);
|
||||
auto cache_key = settings.filecache_key.has_value() ? FileCacheKey::fromPath(object_path) : *settings.filecache_key;
|
||||
buf = std::make_unique<CachedOnDiskReadBufferFromFile>(
|
||||
object_path,
|
||||
cache_key,
|
||||
|
@ -31,7 +31,7 @@ CachedObjectStorage::CachedObjectStorage(
|
||||
|
||||
FileCache::Key CachedObjectStorage::getCacheKey(const std::string & path) const
|
||||
{
|
||||
return cache->createKeyForPath(path);
|
||||
return FileCacheKey::fromPath(path);
|
||||
}
|
||||
|
||||
ObjectStorageKey
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <string>
|
||||
#include <Core/Defines.h>
|
||||
#include <Interpreters/Cache/FileCache_fwd.h>
|
||||
#include <Interpreters/Cache/FileCacheKey.h>
|
||||
#include <Common/Throttler_fwd.h>
|
||||
#include <Common/Priority.h>
|
||||
#include <Common/Scheduler/ResourceLink.h>
|
||||
@ -101,6 +102,7 @@ struct ReadSettings
|
||||
bool enable_filesystem_cache_log = false;
|
||||
size_t filesystem_cache_segments_batch_size = 20;
|
||||
size_t filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = 1000;
|
||||
std::string filesystem_cache_name;
|
||||
|
||||
bool use_page_cache_for_disks_without_file_cache = false;
|
||||
bool read_from_page_cache_if_exists_otherwise_bypass_cache = false;
|
||||
@ -113,6 +115,7 @@ struct ReadSettings
|
||||
size_t remote_read_min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE;
|
||||
|
||||
FileCachePtr remote_fs_cache;
|
||||
std::optional<FileCacheKey> filecache_key;
|
||||
|
||||
/// Bandwidth throttler to use during reading
|
||||
ThrottlerPtr remote_throttler;
|
||||
|
@ -122,11 +122,6 @@ FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & s
|
||||
query_limit = std::make_unique<FileCacheQueryLimit>();
|
||||
}
|
||||
|
||||
FileCache::Key FileCache::createKeyForPath(const String & path)
|
||||
{
|
||||
return Key(path);
|
||||
}
|
||||
|
||||
const FileCache::UserInfo & FileCache::getCommonUser()
|
||||
{
|
||||
static UserInfo user(getCommonUserID(), 0);
|
||||
@ -1168,7 +1163,7 @@ void FileCache::removeFileSegment(const Key & key, size_t offset, const UserID &
|
||||
|
||||
void FileCache::removePathIfExists(const String & path, const UserID & user_id)
|
||||
{
|
||||
removeKeyIfExists(createKeyForPath(path), user_id);
|
||||
removeKeyIfExists(Key::fromPath(path), user_id);
|
||||
}
|
||||
|
||||
void FileCache::removeAllReleasable(const UserID & user_id)
|
||||
|
@ -88,8 +88,6 @@ public:
|
||||
|
||||
const String & getBasePath() const;
|
||||
|
||||
static Key createKeyForPath(const String & path);
|
||||
|
||||
static const UserInfo & getCommonUser();
|
||||
|
||||
static const UserInfo & getInternalUser();
|
||||
|
@ -1,5 +1,6 @@
|
||||
#include "FileCacheFactory.h"
|
||||
#include "FileCache.h"
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -43,6 +44,16 @@ FileCacheFactory::CacheByName FileCacheFactory::getAll()
|
||||
return caches_by_name;
|
||||
}
|
||||
|
||||
FileCachePtr FileCacheFactory::get(const std::string & cache_name)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
auto it = caches_by_name.find(cache_name);
|
||||
if (it == caches_by_name.end())
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no cache by name `{}`", cache_name);
|
||||
return it->second->cache;
|
||||
}
|
||||
|
||||
FileCachePtr FileCacheFactory::getOrCreate(
|
||||
const std::string & cache_name,
|
||||
const FileCacheSettings & file_cache_settings,
|
||||
@ -202,4 +213,20 @@ void FileCacheFactory::clear()
|
||||
caches_by_name.clear();
|
||||
}
|
||||
|
||||
void FileCacheFactory::loadDefaultCaches(const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys cache_names;
|
||||
config.keys(FILECACHE_DEFAULT_CONFIG_PATH, cache_names);
|
||||
auto * log = &Poco::Logger::get("FileCacheFactory");
|
||||
LOG_DEBUG(log, "Will load {} caches from default cache config", cache_names.size());
|
||||
for (const auto & name : cache_names)
|
||||
{
|
||||
FileCacheSettings settings;
|
||||
const auto & config_path = fmt::format("{}.{}", FILECACHE_DEFAULT_CONFIG_PATH, name);
|
||||
settings.loadFromConfig(config, config_path);
|
||||
auto cache = getOrCreate(name, settings, config_path);
|
||||
cache->initialize();
|
||||
LOG_DEBUG(log, "Loaded cache `{}` from default cache config", name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -44,6 +44,8 @@ public:
|
||||
const FileCacheSettings & file_cache_settings,
|
||||
const std::string & config_path);
|
||||
|
||||
FileCachePtr get(const std::string & cache_name);
|
||||
|
||||
FileCachePtr create(
|
||||
const std::string & cache_name,
|
||||
const FileCacheSettings & file_cache_settings,
|
||||
@ -53,8 +55,12 @@ public:
|
||||
|
||||
FileCacheDataPtr getByName(const std::string & cache_name);
|
||||
|
||||
void loadDefaultCaches(const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
void updateSettingsFromConfig(const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
void remove(FileCachePtr cache);
|
||||
|
||||
void clear();
|
||||
|
||||
private:
|
||||
|
@ -12,11 +12,6 @@ namespace ErrorCodes
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
FileCacheKey::FileCacheKey(const std::string & path)
|
||||
: key(sipHash128(path.data(), path.size()))
|
||||
{
|
||||
}
|
||||
|
||||
FileCacheKey::FileCacheKey(const UInt128 & key_)
|
||||
: key(key_)
|
||||
{
|
||||
@ -32,6 +27,16 @@ FileCacheKey FileCacheKey::random()
|
||||
return FileCacheKey(UUIDHelpers::generateV4().toUnderType());
|
||||
}
|
||||
|
||||
FileCacheKey FileCacheKey::fromPath(const std::string & path)
|
||||
{
|
||||
return FileCacheKey(sipHash128(path.data(), path.size()));
|
||||
}
|
||||
|
||||
FileCacheKey FileCacheKey::fromKey(const UInt128 & key)
|
||||
{
|
||||
return FileCacheKey(key);
|
||||
}
|
||||
|
||||
FileCacheKey FileCacheKey::fromKeyString(const std::string & key_str)
|
||||
{
|
||||
if (key_str.size() != 32)
|
||||
|
@ -14,16 +14,16 @@ struct FileCacheKey
|
||||
|
||||
FileCacheKey() = default;
|
||||
|
||||
explicit FileCacheKey(const std::string & path);
|
||||
|
||||
explicit FileCacheKey(const UInt128 & key_);
|
||||
|
||||
static FileCacheKey random();
|
||||
static FileCacheKey fromPath(const std::string & path);
|
||||
static FileCacheKey fromKey(const UInt128 & key);
|
||||
static FileCacheKey fromKeyString(const std::string & key_str);
|
||||
|
||||
bool operator==(const FileCacheKey & other) const { return key == other.key; }
|
||||
bool operator<(const FileCacheKey & other) const { return key < other.key; }
|
||||
|
||||
static FileCacheKey fromKeyString(const std::string & key_str);
|
||||
private:
|
||||
explicit FileCacheKey(const UInt128 & key_);
|
||||
};
|
||||
|
||||
using FileCacheKeyAndOffset = std::pair<FileCacheKey, size_t>;
|
||||
|
@ -15,10 +15,12 @@ static constexpr size_t FILECACHE_BYPASS_THRESHOLD = 256 * 1024 * 1024;
|
||||
static constexpr double FILECACHE_DEFAULT_FREE_SPACE_SIZE_RATIO = 0; /// Disabled.
|
||||
static constexpr double FILECACHE_DEFAULT_FREE_SPACE_ELEMENTS_RATIO = 0; /// Disabled.
|
||||
static constexpr int FILECACHE_DEFAULT_FREE_SPACE_REMOVE_BATCH = 10;
|
||||
static constexpr auto FILECACHE_DEFAULT_CONFIG_PATH = "filesystem_caches";
|
||||
|
||||
class FileCache;
|
||||
using FileCachePtr = std::shared_ptr<FileCache>;
|
||||
|
||||
struct FileCacheSettings;
|
||||
struct FileCacheKey;
|
||||
|
||||
}
|
||||
|
@ -186,6 +186,7 @@ namespace Setting
|
||||
extern const SettingsUInt64 backup_threads;
|
||||
extern const SettingsString cluster_for_parallel_replicas;
|
||||
extern const SettingsBool enable_filesystem_cache;
|
||||
extern const SettingsString filesystem_cache_name;
|
||||
extern const SettingsBool enable_filesystem_cache_log;
|
||||
extern const SettingsBool enable_filesystem_cache_on_write_operations;
|
||||
extern const SettingsBool enable_filesystem_read_prefetches_log;
|
||||
@ -5638,6 +5639,7 @@ ReadSettings Context::getReadSettings() const
|
||||
res.filesystem_cache_segments_batch_size = settings_ref[Setting::filesystem_cache_segments_batch_size];
|
||||
res.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds
|
||||
= settings_ref[Setting::filesystem_cache_reserve_space_wait_lock_timeout_milliseconds];
|
||||
res.filesystem_cache_name = settings_ref[Setting::filesystem_cache_name];
|
||||
|
||||
res.filesystem_cache_max_download_size = settings_ref[Setting::filesystem_cache_max_download_size];
|
||||
res.skip_download_if_exceeds_query_cache = settings_ref[Setting::skip_download_if_exceeds_query_cache];
|
||||
|
@ -372,7 +372,7 @@ TEST_F(FileCacheTest, LRUPolicy)
|
||||
std::cerr << "Step 1\n";
|
||||
auto cache = DB::FileCache("1", settings);
|
||||
cache.initialize();
|
||||
auto key = DB::FileCache::createKeyForPath("key1");
|
||||
auto key = DB::FileCacheKey::fromPath("key1");
|
||||
|
||||
auto get_or_set = [&](size_t offset, size_t size)
|
||||
{
|
||||
@ -736,7 +736,7 @@ TEST_F(FileCacheTest, LRUPolicy)
|
||||
|
||||
auto cache2 = DB::FileCache("2", settings);
|
||||
cache2.initialize();
|
||||
auto key = DB::FileCache::createKeyForPath("key1");
|
||||
auto key = DB::FileCacheKey::fromPath("key1");
|
||||
|
||||
/// Get [2, 29]
|
||||
assertEqual(
|
||||
@ -755,7 +755,7 @@ TEST_F(FileCacheTest, LRUPolicy)
|
||||
fs::create_directories(settings2.base_path);
|
||||
auto cache2 = DB::FileCache("3", settings2);
|
||||
cache2.initialize();
|
||||
auto key = DB::FileCache::createKeyForPath("key1");
|
||||
auto key = DB::FileCacheKey::fromPath("key1");
|
||||
|
||||
/// Get [0, 24]
|
||||
assertEqual(
|
||||
@ -770,7 +770,7 @@ TEST_F(FileCacheTest, LRUPolicy)
|
||||
|
||||
auto cache = FileCache("4", settings);
|
||||
cache.initialize();
|
||||
const auto key = FileCache::createKeyForPath("key10");
|
||||
const auto key = FileCacheKey::fromPath("key10");
|
||||
const auto key_path = cache.getKeyPath(key, user);
|
||||
|
||||
cache.removeAllReleasable(user.user_id);
|
||||
@ -794,7 +794,7 @@ TEST_F(FileCacheTest, LRUPolicy)
|
||||
|
||||
auto cache = DB::FileCache("5", settings);
|
||||
cache.initialize();
|
||||
const auto key = FileCache::createKeyForPath("key10");
|
||||
const auto key = FileCacheKey::fromPath("key10");
|
||||
const auto key_path = cache.getKeyPath(key, user);
|
||||
|
||||
cache.removeAllReleasable(user.user_id);
|
||||
@ -833,7 +833,7 @@ TEST_F(FileCacheTest, writeBuffer)
|
||||
segment_settings.kind = FileSegmentKind::Ephemeral;
|
||||
segment_settings.unbounded = true;
|
||||
|
||||
auto cache_key = FileCache::createKeyForPath(key);
|
||||
auto cache_key = FileCacheKey::fromPath(key);
|
||||
auto holder = cache.set(cache_key, 0, 3, segment_settings, user);
|
||||
/// The same is done in TemporaryDataOnDisk::createStreamToCacheFile.
|
||||
std::filesystem::create_directories(cache.getKeyPath(cache_key, user));
|
||||
@ -961,7 +961,7 @@ TEST_F(FileCacheTest, temporaryData)
|
||||
const auto user = FileCache::getCommonUser();
|
||||
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(nullptr, &file_cache, TemporaryDataOnDiskSettings{});
|
||||
|
||||
auto some_data_holder = file_cache.getOrSet(FileCache::createKeyForPath("some_data"), 0, 5_KiB, 5_KiB, CreateFileSegmentSettings{}, 0, user);
|
||||
auto some_data_holder = file_cache.getOrSet(FileCacheKey::fromPath("some_data"), 0, 5_KiB, 5_KiB, CreateFileSegmentSettings{}, 0, user);
|
||||
|
||||
{
|
||||
ASSERT_EQ(some_data_holder->size(), 5);
|
||||
@ -1103,7 +1103,7 @@ TEST_F(FileCacheTest, CachedReadBuffer)
|
||||
auto cache = std::make_shared<DB::FileCache>("8", settings);
|
||||
cache->initialize();
|
||||
|
||||
auto key = cache->createKeyForPath(file_path);
|
||||
auto key = DB::FileCacheKey::fromPath(file_path);
|
||||
const auto user = FileCache::getCommonUser();
|
||||
|
||||
{
|
||||
@ -1219,7 +1219,7 @@ TEST_F(FileCacheTest, SLRUPolicy)
|
||||
{
|
||||
auto cache = DB::FileCache(std::to_string(++file_cache_name), settings);
|
||||
cache.initialize();
|
||||
auto key = FileCache::createKeyForPath("key1");
|
||||
auto key = FileCacheKey::fromPath("key1");
|
||||
|
||||
auto add_range = [&](size_t offset, size_t size)
|
||||
{
|
||||
@ -1342,7 +1342,7 @@ TEST_F(FileCacheTest, SLRUPolicy)
|
||||
|
||||
std::string data1(15, '*');
|
||||
auto file1 = write_file("test1", data1);
|
||||
auto key1 = cache->createKeyForPath(file1);
|
||||
auto key1 = DB::FileCacheKey::fromPath(file1);
|
||||
|
||||
read_and_check(file1, key1, data1);
|
||||
|
||||
@ -1358,7 +1358,7 @@ TEST_F(FileCacheTest, SLRUPolicy)
|
||||
|
||||
std::string data2(10, '*');
|
||||
auto file2 = write_file("test2", data2);
|
||||
auto key2 = cache->createKeyForPath(file2);
|
||||
auto key2 = DB::FileCacheKey::fromPath(file2);
|
||||
|
||||
read_and_check(file2, key2, data2);
|
||||
|
||||
|
@ -7,6 +7,9 @@
|
||||
#include <Processors/Executors/PullingPipelineExecutor.h>
|
||||
#include <Processors/Transforms/ExtractColumnsTransform.h>
|
||||
#include <IO/ReadBufferFromFileBase.h>
|
||||
#include <Interpreters/Cache/FileCacheFactory.h>
|
||||
#include <Interpreters/Cache/FileCache.h>
|
||||
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
|
||||
#include <IO/Archives/createArchiveReader.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Formats/ReadSchemaUtils.h>
|
||||
@ -418,13 +421,38 @@ std::future<StorageObjectStorageSource::ReaderHolder> StorageObjectStorageSource
|
||||
return create_reader_scheduler([=, this] { return createReader(); }, Priority{});
|
||||
}
|
||||
|
||||
static void patchReadSettings(
|
||||
ReadSettings & read_settings,
|
||||
const StorageObjectStorage::ObjectInfo & object_info,
|
||||
const LoggerPtr & log)
|
||||
{
|
||||
if (read_settings.enable_filesystem_cache && !read_settings.filesystem_cache_name.empty())
|
||||
{
|
||||
if (object_info.metadata->etag.empty())
|
||||
{
|
||||
LOG_WARNING(log, "Cannot use filesystem cache, no etag specified");
|
||||
}
|
||||
else
|
||||
{
|
||||
SipHash hash;
|
||||
hash.update(object_info.getPath());
|
||||
hash.update(object_info.metadata->etag);
|
||||
read_settings.filecache_key = FileCacheKey::fromKey(hash.get128());
|
||||
read_settings.remote_fs_cache = FileCacheFactory::instance().get(read_settings.filesystem_cache_name);
|
||||
|
||||
LOG_TEST(log, "Using filesystem cache `{}` (path: {}, etag: {}, hash: {})",
|
||||
read_settings.filesystem_cache_name, object_info.getPath(),
|
||||
object_info.metadata->etag, toString(hash.get128()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(
|
||||
const ObjectInfo & object_info, const ObjectStoragePtr & object_storage, const ContextPtr & context_, const LoggerPtr & log)
|
||||
{
|
||||
const auto & object_size = object_info.metadata->size_bytes;
|
||||
|
||||
auto read_settings = context_->getReadSettings().adjustBufferSize(object_size);
|
||||
read_settings.enable_filesystem_cache = false;
|
||||
/// FIXME: Changing this setting to default value breaks something around parquet reading
|
||||
read_settings.remote_read_min_bytes_for_seek = read_settings.remote_fs_buffer_size;
|
||||
|
||||
@ -434,6 +462,8 @@ std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(
|
||||
/// User's object may change, don't cache it.
|
||||
read_settings.use_page_cache_for_disks_without_file_cache = false;
|
||||
|
||||
patchReadSettings(read_settings, object_info, log);
|
||||
|
||||
// Create a read buffer that will prefetch the first ~1 MB of the file.
|
||||
// When reading lots of tiny files, this prefetching almost doubles the throughput.
|
||||
// For bigger files, parallel reading is more useful.
|
||||
@ -452,6 +482,29 @@ std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!read_settings.filesystem_cache_name.empty())
|
||||
{
|
||||
/// TODO: fix inconsistency with readObject and readObjects...
|
||||
|
||||
auto read_buffer_creator = [&]()
|
||||
{
|
||||
return object_storage->readObject(StoredObject(object_info.getPath(), "", object_size), read_settings);
|
||||
};
|
||||
return std::make_unique<CachedOnDiskReadBufferFromFile>(
|
||||
object_info.getPath(),
|
||||
read_settings.filecache_key.value(),
|
||||
read_settings.remote_fs_cache,
|
||||
FileCache::getCommonUser(),
|
||||
read_buffer_creator,
|
||||
read_settings,
|
||||
std::string(CurrentThread::getQueryId()),
|
||||
object_info.metadata->size_bytes,
|
||||
/* allow_seeks */false,
|
||||
/* use_external_buffer */true,
|
||||
/* read_until_position */std::nullopt,
|
||||
nullptr);
|
||||
}
|
||||
|
||||
/// FIXME: this is inconsistent that readObject always reads synchronously ignoring read_method setting.
|
||||
return object_storage->readObject(StoredObject(object_info.getPath(), "", object_size), read_settings);
|
||||
}
|
||||
|
@ -401,7 +401,7 @@ Chunk SystemRemoteDataPathsSource::generate()
|
||||
|
||||
if (cache)
|
||||
{
|
||||
auto cache_paths = cache->tryGetCachePaths(cache->createKeyForPath(object.remote_path));
|
||||
auto cache_paths = cache->tryGetCachePaths(FileCacheKey::fromPath(object.remote_path));
|
||||
col_cache_paths->insert(Array(cache_paths.begin(), cache_paths.end()));
|
||||
}
|
||||
else
|
||||
|
@ -0,0 +1,8 @@
|
||||
<clickhouse>
|
||||
<filesystem_caches>
|
||||
<cache1>
|
||||
<max_size>1Gi</max_size>
|
||||
<path>cache1</path>
|
||||
</cache1>
|
||||
</filesystem_caches>
|
||||
</clickhouse>
|
@ -55,6 +55,7 @@ def started_cluster():
|
||||
"configs/named_collections.xml",
|
||||
"configs/schema_cache.xml",
|
||||
"configs/blob_log.xml",
|
||||
"configs/filesystem_caches.xml",
|
||||
],
|
||||
user_configs=[
|
||||
"configs/access.xml",
|
||||
@ -2393,3 +2394,42 @@ def test_respect_object_existence_on_partitioned_write(started_cluster):
|
||||
)
|
||||
|
||||
assert int(result) == 44
|
||||
|
||||
|
||||
def test_filesystem_cache(started_cluster):
|
||||
id = uuid.uuid4()
|
||||
bucket = started_cluster.minio_bucket
|
||||
instance = started_cluster.instances["dummy"]
|
||||
table_name = "test_filesystem_cache"
|
||||
|
||||
instance.query(
|
||||
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{table_name}.tsv', auto, 'x UInt64') select number from numbers(100) SETTINGS s3_truncate_on_insert=1"
|
||||
)
|
||||
|
||||
query_id = f"{table_name}-{uuid.uuid4()}"
|
||||
instance.query(
|
||||
f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{table_name}.tsv') SETTINGS filesystem_cache_name = 'cache1', enable_filesystem_cache=1",
|
||||
query_id=query_id,
|
||||
)
|
||||
|
||||
instance.query("SYSTEM FLUSH LOGS")
|
||||
|
||||
count = int(
|
||||
instance.query(
|
||||
f"SELECT ProfileEvents['CachedReadBufferCacheWriteBytes'] FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'"
|
||||
)
|
||||
)
|
||||
|
||||
assert count == 290
|
||||
|
||||
query_id = f"{table_name}-{uuid.uuid4()}"
|
||||
instance.query(
|
||||
f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{table_name}.tsv') SETTINGS filesystem_cache_name = 'cache1', enable_filesystem_cache=1",
|
||||
query_id=query_id,
|
||||
)
|
||||
|
||||
instance.query("SYSTEM FLUSH LOGS")
|
||||
|
||||
assert count == int(instance.query(
|
||||
f"SELECT ProfileEvents['CachedReadBufferReadFromCacheBytes'] FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'"
|
||||
))
|
||||
|
Loading…
Reference in New Issue
Block a user