mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Load filesystem cache metadata asynchronously
This commit is contained in:
parent
7551c2602b
commit
f17655f13f
@ -51,7 +51,7 @@ StatusFile::StatusFile(std::string path_, FillFunction fill_)
|
||||
std::string contents;
|
||||
{
|
||||
ReadBufferFromFile in(path, 1024);
|
||||
LimitReadBuffer limit_in(in, 1024, /* trow_exception */ false, /* exact_limit */ {});
|
||||
LimitReadBuffer limit_in(in, 1024, /* throw_exception */ false, /* exact_limit */ {});
|
||||
readStringUntilEOF(contents, limit_in);
|
||||
}
|
||||
|
||||
|
@ -80,20 +80,27 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
|
||||
|
||||
if (with_file_cache)
|
||||
{
|
||||
auto cache_key = settings.remote_fs_cache->createKeyForPath(object_path);
|
||||
buf = std::make_unique<CachedOnDiskReadBufferFromFile>(
|
||||
object_path,
|
||||
cache_key,
|
||||
settings.remote_fs_cache,
|
||||
FileCache::getCommonUser(),
|
||||
[=, this]() { return read_buffer_creator(/* restricted_seek */true, object); },
|
||||
settings,
|
||||
query_id,
|
||||
object.bytes_size,
|
||||
/* allow_seeks */false,
|
||||
/* use_external_buffer */true,
|
||||
/* read_until_position */std::nullopt,
|
||||
cache_log);
|
||||
if (settings.remote_fs_cache->isInitialized())
|
||||
{
|
||||
auto cache_key = settings.remote_fs_cache->createKeyForPath(object_path);
|
||||
buf = std::make_unique<CachedOnDiskReadBufferFromFile>(
|
||||
object_path,
|
||||
cache_key,
|
||||
settings.remote_fs_cache,
|
||||
FileCache::getCommonUser(),
|
||||
[=, this]() { return read_buffer_creator(/* restricted_seek */true, object); },
|
||||
settings,
|
||||
query_id,
|
||||
object.bytes_size,
|
||||
/* allow_seeks */false,
|
||||
/* use_external_buffer */true,
|
||||
/* read_until_position */std::nullopt,
|
||||
cache_log);
|
||||
}
|
||||
else
|
||||
{
|
||||
settings.remote_fs_cache->throwInitExceptionIfNeeded();
|
||||
}
|
||||
}
|
||||
|
||||
/// Can't wrap CachedOnDiskReadBufferFromFile in CachedInMemoryReadBufferFromFile because the
|
||||
|
@ -99,7 +99,7 @@ std::unique_ptr<WriteBufferFromFileBase> CachedObjectStorage::writeObject( /// N
|
||||
/// Need to remove even if cache_on_write == false.
|
||||
removeCacheIfExists(object.remote_path);
|
||||
|
||||
if (cache_on_write)
|
||||
if (cache_on_write && cache->isInitialized())
|
||||
{
|
||||
auto key = getCacheKey(object.remote_path);
|
||||
return std::make_unique<CachedOnDiskWriteBufferFromFile>(
|
||||
@ -122,7 +122,8 @@ void CachedObjectStorage::removeCacheIfExists(const std::string & path_key_for_c
|
||||
return;
|
||||
|
||||
/// Add try catch?
|
||||
cache->removeKeyIfExists(getCacheKey(path_key_for_cache), FileCache::getCommonUser().user_id);
|
||||
if (cache->isInitialized())
|
||||
cache->removeKeyIfExists(getCacheKey(path_key_for_cache), FileCache::getCommonUser().user_id);
|
||||
}
|
||||
|
||||
void CachedObjectStorage::removeObject(const StoredObject & object)
|
||||
|
@ -11,11 +11,15 @@
|
||||
#include <Interpreters/Cache/EvictionCandidates.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <base/hex.h>
|
||||
#include <Common/callOnce.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/ElapsedTimeProfileEventIncrement.h>
|
||||
#include <Core/ServerUUID.h>
|
||||
|
||||
#include <exception>
|
||||
#include <filesystem>
|
||||
#include <mutex>
|
||||
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
@ -88,6 +92,7 @@ FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & s
|
||||
, bypass_cache_threshold(settings.enable_bypass_cache_with_threshold ? settings.bypass_cache_threshold : 0)
|
||||
, boundary_alignment(settings.boundary_alignment)
|
||||
, load_metadata_threads(settings.load_metadata_threads)
|
||||
, load_metadata_asynchronously(settings.load_metadata_asynchronously)
|
||||
, write_cache_per_user_directory(settings.write_cache_per_user_id_directory)
|
||||
, keep_current_size_to_max_ratio(1 - settings.keep_free_space_size_ratio)
|
||||
, keep_current_elements_to_max_ratio(1 - settings.keep_free_space_elements_ratio)
|
||||
@ -136,7 +141,17 @@ const FileCache::UserInfo & FileCache::getInternalUser()
|
||||
|
||||
bool FileCache::isInitialized() const
|
||||
{
|
||||
return is_initialized.load(std::memory_order_seq_cst);
|
||||
return is_initialized;
|
||||
}
|
||||
|
||||
void FileCache::throwInitExceptionIfNeeded()
|
||||
{
|
||||
if (load_metadata_asynchronously)
|
||||
return;
|
||||
|
||||
std::lock_guard lock(init_mutex);
|
||||
if (init_exception)
|
||||
std::rethrow_exception(init_exception);
|
||||
}
|
||||
|
||||
const String & FileCache::getBasePath() const
|
||||
@ -170,6 +185,35 @@ void FileCache::assertInitialized() const
|
||||
}
|
||||
|
||||
void FileCache::initialize()
|
||||
{
|
||||
// Prevent initialize() from running twice. This may be caused by two cache disks being created with the same path (see integration/test_filesystem_cache).
|
||||
callOnce(initialize_called, [&] {
|
||||
bool need_to_load_metadata = fs::exists(getBasePath());
|
||||
try
|
||||
{
|
||||
if (!need_to_load_metadata)
|
||||
fs::create_directories(getBasePath());
|
||||
status_file = make_unique<StatusFile>(fs::path(getBasePath()) / "status", StatusFile::write_full_info);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
init_exception = std::current_exception();
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
throw;
|
||||
}
|
||||
|
||||
if (load_metadata_asynchronously)
|
||||
{
|
||||
load_metadata_main_thread = ThreadFromGlobalPool([this, need_to_load_metadata] { initializeImpl(need_to_load_metadata); });
|
||||
}
|
||||
else
|
||||
{
|
||||
initializeImpl(need_to_load_metadata);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void FileCache::initializeImpl(bool load_metadata)
|
||||
{
|
||||
std::lock_guard lock(init_mutex);
|
||||
|
||||
@ -178,16 +222,10 @@ void FileCache::initialize()
|
||||
|
||||
try
|
||||
{
|
||||
if (fs::exists(getBasePath()))
|
||||
{
|
||||
if (load_metadata)
|
||||
loadMetadata();
|
||||
}
|
||||
else
|
||||
{
|
||||
fs::create_directories(getBasePath());
|
||||
}
|
||||
|
||||
status_file = make_unique<StatusFile>(fs::path(getBasePath()) / "status", StatusFile::write_full_info);
|
||||
metadata.startup();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -196,8 +234,6 @@ void FileCache::initialize()
|
||||
throw;
|
||||
}
|
||||
|
||||
metadata.startup();
|
||||
|
||||
if (keep_current_size_to_max_ratio != 1 || keep_current_elements_to_max_ratio != 1)
|
||||
{
|
||||
keep_up_free_space_ratio_task = Context::getGlobalContextInstance()->getSchedulePool().createTask(log->name(), [this] { freeSpaceRatioKeepingThreadFunc(); });
|
||||
@ -205,6 +241,7 @@ void FileCache::initialize()
|
||||
}
|
||||
|
||||
is_initialized = true;
|
||||
LOG_TEST(log, "Initialized cache from {}", metadata.getBaseDirectory());
|
||||
}
|
||||
|
||||
CachePriorityGuard::Lock FileCache::lockCache() const
|
||||
@ -1185,7 +1222,6 @@ void FileCache::loadMetadataImpl()
|
||||
std::vector<ThreadFromGlobalPool> loading_threads;
|
||||
std::exception_ptr first_exception;
|
||||
std::mutex set_exception_mutex;
|
||||
std::atomic<bool> stop_loading = false;
|
||||
|
||||
LOG_INFO(log, "Loading filesystem cache with {} threads from {}", load_metadata_threads, metadata.getBaseDirectory());
|
||||
|
||||
@ -1195,7 +1231,7 @@ void FileCache::loadMetadataImpl()
|
||||
{
|
||||
loading_threads.emplace_back([&]
|
||||
{
|
||||
while (!stop_loading)
|
||||
while (!stop_loading_metadata)
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -1212,7 +1248,7 @@ void FileCache::loadMetadataImpl()
|
||||
if (!first_exception)
|
||||
first_exception = std::current_exception();
|
||||
}
|
||||
stop_loading = true;
|
||||
stop_loading_metadata = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -1225,7 +1261,7 @@ void FileCache::loadMetadataImpl()
|
||||
if (!first_exception)
|
||||
first_exception = std::current_exception();
|
||||
}
|
||||
stop_loading = true;
|
||||
stop_loading_metadata = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -1412,6 +1448,11 @@ FileCache::~FileCache()
|
||||
void FileCache::deactivateBackgroundOperations()
|
||||
{
|
||||
shutdown.store(true);
|
||||
|
||||
stop_loading_metadata = true;
|
||||
if (load_metadata_main_thread.joinable())
|
||||
load_metadata_main_thread.join();
|
||||
|
||||
metadata.shutdown();
|
||||
if (keep_up_free_space_ratio_task)
|
||||
keep_up_free_space_ratio_task->deactivate();
|
||||
|
@ -8,6 +8,7 @@
|
||||
|
||||
#include <IO/ReadSettings.h>
|
||||
|
||||
#include <Common/callOnce.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/StatusFile.h>
|
||||
#include <Interpreters/Cache/LRUFileCachePriority.h>
|
||||
@ -82,6 +83,9 @@ public:
|
||||
|
||||
bool isInitialized() const;
|
||||
|
||||
/// Throws if `!load_metadata_asynchronously` and there is an exception in `init_exception`
|
||||
void throwInitExceptionIfNeeded();
|
||||
|
||||
const String & getBasePath() const;
|
||||
|
||||
static Key createKeyForPath(const String & path);
|
||||
@ -198,6 +202,9 @@ private:
|
||||
const size_t bypass_cache_threshold;
|
||||
const size_t boundary_alignment;
|
||||
size_t load_metadata_threads;
|
||||
const bool load_metadata_asynchronously;
|
||||
std::atomic<bool> stop_loading_metadata = false;
|
||||
ThreadFromGlobalPool load_metadata_main_thread;
|
||||
const bool write_cache_per_user_directory;
|
||||
|
||||
BackgroundSchedulePool::TaskHolder keep_up_free_space_ratio_task;
|
||||
@ -209,6 +216,7 @@ private:
|
||||
|
||||
std::exception_ptr init_exception;
|
||||
std::atomic<bool> is_initialized = false;
|
||||
OnceFlag initialize_called;
|
||||
mutable std::mutex init_mutex;
|
||||
std::unique_ptr<StatusFile> status_file;
|
||||
std::atomic<bool> shutdown = false;
|
||||
@ -246,6 +254,8 @@ private:
|
||||
*/
|
||||
FileCacheQueryLimitPtr query_limit;
|
||||
|
||||
void initializeImpl(bool load_metadata);
|
||||
|
||||
void assertInitialized() const;
|
||||
void assertCacheCorrectness();
|
||||
|
||||
|
@ -65,6 +65,9 @@ void FileCacheSettings::loadImpl(FuncHas has, FuncGetUInt get_uint, FuncGetStrin
|
||||
if (has("load_metadata_threads"))
|
||||
load_metadata_threads = get_uint("load_metadata_threads");
|
||||
|
||||
if (has("load_metadata_asynchronously"))
|
||||
load_metadata_asynchronously = get_uint("load_metadata_asynchronously");
|
||||
|
||||
if (boundary_alignment > max_file_segment_size)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `boundary_alignment` cannot exceed `max_file_segment_size`");
|
||||
|
||||
|
@ -32,6 +32,7 @@ struct FileCacheSettings
|
||||
size_t background_download_queue_size_limit = FILECACHE_DEFAULT_BACKGROUND_DOWNLOAD_QUEUE_SIZE_LIMIT;
|
||||
|
||||
size_t load_metadata_threads = FILECACHE_DEFAULT_LOAD_METADATA_THREADS;
|
||||
bool load_metadata_asynchronously = false;
|
||||
|
||||
bool write_cache_per_user_id_directory = false;
|
||||
|
||||
|
@ -20,6 +20,7 @@ static Block getSampleBlock()
|
||||
ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "max_size"},
|
||||
ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "max_elements"},
|
||||
ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "max_file_segment_size"},
|
||||
ColumnWithTypeAndName{std::make_shared<DataTypeUInt8>(), "is_initialized"},
|
||||
ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "boundary_alignment"},
|
||||
ColumnWithTypeAndName{std::make_shared<DataTypeNumber<UInt8>>(), "cache_on_write_operations"},
|
||||
ColumnWithTypeAndName{std::make_shared<DataTypeNumber<UInt8>>(), "cache_hits_threshold"},
|
||||
@ -50,6 +51,7 @@ BlockIO InterpreterDescribeCacheQuery::execute()
|
||||
res_columns[i++]->insert(settings.max_size);
|
||||
res_columns[i++]->insert(settings.max_elements);
|
||||
res_columns[i++]->insert(settings.max_file_segment_size);
|
||||
res_columns[i++]->insert(cache->isInitialized());
|
||||
res_columns[i++]->insert(settings.boundary_alignment);
|
||||
res_columns[i++]->insert(settings.cache_on_write_operations);
|
||||
res_columns[i++]->insert(settings.cache_hits_threshold);
|
||||
|
@ -65,7 +65,7 @@ TemporaryDataOnDisk::TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, Cu
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> TemporaryDataOnDisk::createRawStream(size_t max_file_size)
|
||||
{
|
||||
if (file_cache)
|
||||
if (file_cache && file_cache->isInitialized())
|
||||
{
|
||||
auto holder = createCacheFile(max_file_size);
|
||||
return std::make_unique<WriteBufferToFileSegment>(std::move(holder));
|
||||
@ -81,7 +81,7 @@ std::unique_ptr<WriteBufferFromFileBase> TemporaryDataOnDisk::createRawStream(si
|
||||
|
||||
TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, size_t max_file_size)
|
||||
{
|
||||
if (file_cache)
|
||||
if (file_cache && file_cache->isInitialized())
|
||||
{
|
||||
auto holder = createCacheFile(max_file_size);
|
||||
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <algorithm>
|
||||
#include <numeric>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
|
||||
#include <Core/ServerUUID.h>
|
||||
#include <Common/iota.h>
|
||||
@ -42,6 +43,7 @@
|
||||
#include <Interpreters/DatabaseCatalog.h>
|
||||
#include <base/scope_guard.h>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
namespace fs = std::filesystem;
|
||||
using namespace DB;
|
||||
|
||||
@ -358,9 +360,11 @@ TEST_F(FileCacheTest, LRUPolicy)
|
||||
settings.max_size = 30;
|
||||
settings.max_elements = 5;
|
||||
settings.boundary_alignment = 1;
|
||||
settings.load_metadata_asynchronously = false;
|
||||
|
||||
const size_t file_size = INT_MAX; // the value doesn't really matter because boundary_alignment == 1.
|
||||
|
||||
|
||||
const auto user = FileCache::getCommonUser();
|
||||
{
|
||||
std::cerr << "Step 1\n";
|
||||
@ -815,6 +819,7 @@ TEST_F(FileCacheTest, writeBuffer)
|
||||
settings.max_elements = 5;
|
||||
settings.max_file_segment_size = 5;
|
||||
settings.base_path = cache_base_path;
|
||||
settings.load_metadata_asynchronously = false;
|
||||
|
||||
FileCache cache("6", settings);
|
||||
cache.initialize();
|
||||
@ -946,6 +951,7 @@ TEST_F(FileCacheTest, temporaryData)
|
||||
settings.max_size = 10_KiB;
|
||||
settings.max_file_segment_size = 1_KiB;
|
||||
settings.base_path = cache_base_path;
|
||||
settings.load_metadata_asynchronously = false;
|
||||
|
||||
DB::FileCache file_cache("7", settings);
|
||||
file_cache.initialize();
|
||||
@ -1073,6 +1079,7 @@ TEST_F(FileCacheTest, CachedReadBuffer)
|
||||
settings.max_size = 30;
|
||||
settings.max_elements = 10;
|
||||
settings.boundary_alignment = 1;
|
||||
settings.load_metadata_asynchronously = false;
|
||||
|
||||
ReadSettings read_settings;
|
||||
read_settings.enable_filesystem_cache = true;
|
||||
@ -1092,6 +1099,7 @@ TEST_F(FileCacheTest, CachedReadBuffer)
|
||||
|
||||
auto cache = std::make_shared<DB::FileCache>("8", settings);
|
||||
cache->initialize();
|
||||
|
||||
auto key = cache->createKeyForPath(file_path);
|
||||
const auto user = FileCache::getCommonUser();
|
||||
|
||||
@ -1132,6 +1140,7 @@ TEST_F(FileCacheTest, TemporaryDataReadBufferSize)
|
||||
settings.max_size = 10_KiB;
|
||||
settings.max_file_segment_size = 1_KiB;
|
||||
settings.base_path = cache_base_path;
|
||||
settings.load_metadata_asynchronously = false;
|
||||
|
||||
DB::FileCache file_cache("cache", settings);
|
||||
file_cache.initialize();
|
||||
@ -1195,6 +1204,7 @@ TEST_F(FileCacheTest, SLRUPolicy)
|
||||
settings.max_size = 40;
|
||||
settings.max_elements = 6;
|
||||
settings.boundary_alignment = 1;
|
||||
settings.load_metadata_asynchronously = false;
|
||||
|
||||
settings.cache_policy = "SLRU";
|
||||
settings.slru_size_ratio = 0.5;
|
||||
@ -1307,6 +1317,7 @@ TEST_F(FileCacheTest, SLRUPolicy)
|
||||
settings2.boundary_alignment = 1;
|
||||
settings2.cache_policy = "SLRU";
|
||||
settings2.slru_size_ratio = 0.5;
|
||||
settings.load_metadata_asynchronously = false;
|
||||
|
||||
auto cache = std::make_shared<DB::FileCache>("slru_2", settings2);
|
||||
cache->initialize();
|
||||
|
@ -47,6 +47,9 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex
|
||||
for (const auto & [cache_name, cache_data] : caches)
|
||||
{
|
||||
const auto & cache = cache_data->cache;
|
||||
if (!cache->isInitialized())
|
||||
continue;
|
||||
|
||||
cache->iterate([&](const FileSegment::Info & file_segment)
|
||||
{
|
||||
size_t i = 0;
|
||||
|
@ -21,6 +21,7 @@ ColumnsDescription StorageSystemFilesystemCacheSettings::getColumnsDescription()
|
||||
{"path", std::make_shared<DataTypeString>(), "Cache directory"},
|
||||
{"max_size", std::make_shared<DataTypeUInt64>(), "Cache size limit by the number of bytes"},
|
||||
{"max_elements", std::make_shared<DataTypeUInt64>(), "Cache size limit by the number of elements"},
|
||||
{"is_initialized", std::make_shared<DataTypeUInt8>(), "Whether the cache is initialized and ready to be used"},
|
||||
{"current_size", std::make_shared<DataTypeUInt64>(), "Current cache size by the number of bytes"},
|
||||
{"current_elements", std::make_shared<DataTypeUInt64>(), "Current cache size by the number of elements"},
|
||||
{"max_file_segment_size", std::make_shared<DataTypeUInt64>(), "Maximum allowed file segment size"},
|
||||
@ -56,6 +57,7 @@ void StorageSystemFilesystemCacheSettings::fillData(
|
||||
res_columns[i++]->insert(settings.base_path);
|
||||
res_columns[i++]->insert(settings.max_size);
|
||||
res_columns[i++]->insert(settings.max_elements);
|
||||
res_columns[i++]->insert(cache->isInitialized());
|
||||
res_columns[i++]->insert(cache->getUsedCacheSize());
|
||||
res_columns[i++]->insert(cache->getFileSegmentsNum());
|
||||
res_columns[i++]->insert(settings.max_file_segment_size);
|
||||
|
@ -27,6 +27,7 @@
|
||||
<slru_size_ratio>0.3</slru_size_ratio>
|
||||
<keep_free_space_size_ratio>0.15</keep_free_space_size_ratio>
|
||||
<keep_free_space_elements_ratio>0.15</keep_free_space_elements_ratio>
|
||||
<load_metadata_asynchronously>0</load_metadata_asynchronously>
|
||||
</s3_cache>
|
||||
<s3_cache_02933>
|
||||
<type>cache</type>
|
||||
@ -37,6 +38,7 @@
|
||||
<delayed_cleanup_interval_ms>100</delayed_cleanup_interval_ms>
|
||||
<background_download_threads>0</background_download_threads>
|
||||
<background_download_queue_size_limit>0</background_download_queue_size_limit>
|
||||
<load_metadata_asynchronously>0</load_metadata_asynchronously>
|
||||
</s3_cache_02933>
|
||||
<!-- local disks -->
|
||||
<local_disk>
|
||||
|
@ -19,6 +19,7 @@
|
||||
<boundary_alignment>10</boundary_alignment>
|
||||
<delayed_cleanup_interval_ms>100</delayed_cleanup_interval_ms>
|
||||
<cache_on_write_operations>0</cache_on_write_operations>
|
||||
<load_metadata_asynchronously>0</load_metadata_asynchronously>
|
||||
</s3_cache_02944>
|
||||
</disks>
|
||||
</storage_configuration>
|
||||
|
@ -1,6 +1,7 @@
|
||||
import logging
|
||||
import time
|
||||
import os
|
||||
import random
|
||||
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
@ -30,14 +31,6 @@ def cluster():
|
||||
"config.d/storage_conf_2.xml",
|
||||
],
|
||||
)
|
||||
cluster.add_instance(
|
||||
"node_no_filesystem_caches_path",
|
||||
main_configs=[
|
||||
"config.d/storage_conf.xml",
|
||||
"config.d/remove_filesystem_caches_path.xml",
|
||||
],
|
||||
stay_alive=True,
|
||||
)
|
||||
cluster.add_instance(
|
||||
"node_force_read_through_cache_on_merge",
|
||||
main_configs=[
|
||||
@ -59,6 +52,51 @@ def cluster():
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def non_shared_cluster():
    """
    For tests that cannot run in parallel against the same node/cluster (see test_custom_cached_disk, which relies on
    changing server settings at runtime)
    """
    try:
        # Randomize the cluster name
        name_suffix = random.randint(0, 10**7)
        cluster = ClickHouseCluster(f"{__file__}_non_shared_{name_suffix}")

        instance_configs = [
            "config.d/storage_conf.xml",
            "config.d/remove_filesystem_caches_path.xml",
        ]
        cluster.add_instance(
            "node_no_filesystem_caches_path",
            main_configs=instance_configs,
            stay_alive=True,
        )

        logging.info("Starting test-exclusive cluster...")
        cluster.start()
        logging.info("Cluster started")

        yield cluster
    finally:
        # Shut the cluster down once the test that requested the fixture is done.
        cluster.shutdown()
|
||||
|
||||
|
||||
def wait_for_cache_initialized(node, cache_path, max_attempts=50):
    """
    Poll `system.filesystem_cache_settings` on `node` until `cache_path` appears
    in the list of paths of initialized caches.

    The query is repeated every 0.1 seconds, at most `max_attempts` times.

    Raises:
        RuntimeError: if `cache_path` is still not reported as initialized
            after `max_attempts` attempts.
    """
    attempts = 0
    while True:
        query_result = node.query(
            "SELECT path FROM system.filesystem_cache_settings WHERE is_initialized"
        )
        # Substring match against the raw query output.
        if cache_path in query_result:
            return

        time.sleep(0.1)
        attempts += 1
        if attempts >= max_attempts:
            # Raising a plain string is a TypeError in Python 3 and would hide
            # this message, so raise a real exception instead.
            raise RuntimeError("Stopped waiting for cache to be initialized")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("node_name", ["node"])
|
||||
def test_parallel_cache_loading_on_startup(cluster, node_name):
|
||||
node = cluster.instances[node_name]
|
||||
@ -71,14 +109,21 @@ def test_parallel_cache_loading_on_startup(cluster, node_name):
|
||||
ORDER BY value
|
||||
SETTINGS disk = disk(
|
||||
type = cache,
|
||||
path = 'paralel_loading_test',
|
||||
name = 'parallel_loading_test',
|
||||
path = 'parallel_loading_test',
|
||||
disk = 'hdd_blob',
|
||||
max_file_segment_size = '1Ki',
|
||||
boundary_alignment = '1Ki',
|
||||
max_size = '1Gi',
|
||||
max_elements = 10000000,
|
||||
load_metadata_threads = 30);
|
||||
"""
|
||||
)
|
||||
|
||||
wait_for_cache_initialized(node, "parallel_loading_test")
|
||||
|
||||
node.query(
|
||||
"""
|
||||
SYSTEM DROP FILESYSTEM CACHE;
|
||||
INSERT INTO test SELECT * FROM generateRandom('a Int32, b String') LIMIT 1000000;
|
||||
SELECT * FROM test FORMAT Null;
|
||||
@ -103,6 +148,7 @@ def test_parallel_cache_loading_on_startup(cluster, node_name):
|
||||
)
|
||||
|
||||
node.restart_clickhouse()
|
||||
wait_for_cache_initialized(node, "parallel_loading_test")
|
||||
|
||||
# < because of additional files loaded into cache on server startup.
|
||||
assert cache_count <= int(node.query("SELECT count() FROM system.filesystem_cache"))
|
||||
@ -131,7 +177,7 @@ def test_caches_with_the_same_configuration(cluster, node_name):
|
||||
node = cluster.instances[node_name]
|
||||
cache_path = "cache1"
|
||||
|
||||
node.query(f"SYSTEM DROP FILESYSTEM CACHE;")
|
||||
node.query("SYSTEM DROP FILESYSTEM CACHE;")
|
||||
for table in ["test", "test2"]:
|
||||
node.query(
|
||||
f"""
|
||||
@ -142,14 +188,20 @@ def test_caches_with_the_same_configuration(cluster, node_name):
|
||||
ORDER BY value
|
||||
SETTINGS disk = disk(
|
||||
type = cache,
|
||||
name = {table},
|
||||
name = '{table}',
|
||||
path = '{cache_path}',
|
||||
disk = 'hdd_blob',
|
||||
max_file_segment_size = '1Ki',
|
||||
boundary_alignment = '1Ki',
|
||||
cache_on_write_operations=1,
|
||||
max_size = '1Mi');
|
||||
"""
|
||||
)
|
||||
|
||||
wait_for_cache_initialized(node, cache_path)
|
||||
|
||||
node.query(
|
||||
f"""
|
||||
SET enable_filesystem_cache_on_write_operations=1;
|
||||
INSERT INTO {table} SELECT * FROM generateRandom('a Int32, b String')
|
||||
LIMIT 1000;
|
||||
@ -195,9 +247,8 @@ def test_caches_with_the_same_configuration(cluster, node_name):
|
||||
@pytest.mark.parametrize("node_name", ["node_caches_with_same_path"])
|
||||
def test_caches_with_the_same_configuration_2(cluster, node_name):
|
||||
node = cluster.instances[node_name]
|
||||
cache_path = "cache1"
|
||||
|
||||
node.query(f"SYSTEM DROP FILESYSTEM CACHE;")
|
||||
node.query("SYSTEM DROP FILESYSTEM CACHE;")
|
||||
for table in ["cache1", "cache2"]:
|
||||
node.query(
|
||||
f"""
|
||||
@ -207,7 +258,13 @@ def test_caches_with_the_same_configuration_2(cluster, node_name):
|
||||
Engine=MergeTree()
|
||||
ORDER BY value
|
||||
SETTINGS disk = '{table}';
|
||||
"""
|
||||
)
|
||||
|
||||
wait_for_cache_initialized(node, "cache1")
|
||||
|
||||
node.query(
|
||||
f"""
|
||||
SET enable_filesystem_cache_on_write_operations=1;
|
||||
INSERT INTO {table} SELECT * FROM generateRandom('a Int32, b String')
|
||||
LIMIT 1000;
|
||||
@ -227,8 +284,8 @@ def test_caches_with_the_same_configuration_2(cluster, node_name):
|
||||
)
|
||||
|
||||
|
||||
def test_custom_cached_disk(cluster):
|
||||
node = cluster.instances["node_no_filesystem_caches_path"]
|
||||
def test_custom_cached_disk(non_shared_cluster):
|
||||
node = non_shared_cluster.instances["node_no_filesystem_caches_path"]
|
||||
|
||||
assert "Cannot create cached custom disk without" in node.query_and_get_error(
|
||||
f"""
|
||||
@ -377,6 +434,7 @@ def test_force_filesystem_cache_on_merges(cluster):
|
||||
ORDER BY value
|
||||
SETTINGS disk = disk(
|
||||
type = cache,
|
||||
name = 'force_cache_on_merges',
|
||||
path = 'force_cache_on_merges',
|
||||
disk = 'hdd_blob',
|
||||
max_file_segment_size = '1Ki',
|
||||
@ -385,7 +443,13 @@ def test_force_filesystem_cache_on_merges(cluster):
|
||||
max_size = '10Gi',
|
||||
max_elements = 10000000,
|
||||
load_metadata_threads = 30);
|
||||
"""
|
||||
)
|
||||
|
||||
wait_for_cache_initialized(node, "force_cache_on_merges")
|
||||
|
||||
node.query(
|
||||
"""
|
||||
SYSTEM DROP FILESYSTEM CACHE;
|
||||
INSERT INTO test SELECT * FROM generateRandom('a Int32, b String') LIMIT 1000000;
|
||||
INSERT INTO test SELECT * FROM generateRandom('a Int32, b String') LIMIT 1000000;
|
||||
@ -441,7 +505,13 @@ SETTINGS disk = disk(type = cache,
|
||||
path = "test_system_sync_filesystem_cache",
|
||||
delayed_cleanup_interval_ms = 10000000, disk = hdd_blob),
|
||||
min_bytes_for_wide_part = 10485760;
|
||||
"""
|
||||
)
|
||||
|
||||
wait_for_cache_initialized(node, "test_system_sync_filesystem_cache")
|
||||
|
||||
node.query(
|
||||
"""
|
||||
INSERT INTO test SELECT 1, 'test';
|
||||
"""
|
||||
)
|
||||
@ -525,7 +595,13 @@ SETTINGS disk = disk(type = cache,
|
||||
keep_free_space_elements_ratio = {elements_ratio},
|
||||
disk = hdd_blob),
|
||||
min_bytes_for_wide_part = 10485760;
|
||||
"""
|
||||
)
|
||||
|
||||
wait_for_cache_initialized(node, "test_keep_up_size_ratio")
|
||||
|
||||
node.query(
|
||||
"""
|
||||
INSERT INTO test SELECT randomString(200);
|
||||
"""
|
||||
)
|
||||
|
@ -1,2 +1,2 @@
|
||||
1
|
||||
102400 10000000 33554432 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/02344_describe_cache_test 0 5000 0 16
|
||||
102400 10000000 33554432 1 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/02344_describe_cache_test 0 5000 0 16
|
||||
|
@ -11,7 +11,7 @@ $CLICKHOUSE_CLIENT -nm --query """
|
||||
DROP TABLE IF EXISTS test;
|
||||
CREATE TABLE test (a Int32, b String)
|
||||
ENGINE = MergeTree() ORDER BY tuple()
|
||||
SETTINGS disk = disk(name = '$disk_name', type = cache, max_size = '100Ki', path = '$disk_name', disk = 's3_disk');
|
||||
SETTINGS disk = disk(name = '$disk_name', type = cache, max_size = '100Ki', path = '$disk_name', disk = 's3_disk', load_metadata_asynchronously = 0);
|
||||
"""
|
||||
|
||||
$CLICKHOUSE_CLIENT -nm --query """
|
||||
|
@ -1,2 +1,2 @@
|
||||
1048576 10000000 33554432 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/collection_sql 0 5000 0 16
|
||||
1048576 10000000 33554432 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/collection 0 5000 0 16
|
||||
1048576 10000000 33554432 1 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/collection_sql 0 5000 0 16
|
||||
1048576 10000000 33554432 1 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/collection 0 5000 0 16
|
||||
|
@ -3,8 +3,8 @@
|
||||
CREATE NAMED COLLECTION IF NOT EXISTS cache_collection_sql AS path = 'collection_sql', max_size = '1Mi';
|
||||
DROP TABLE IF EXISTS test;
|
||||
CREATE TABLE test (a Int32, b String)
|
||||
ENGINE = MergeTree() ORDER BY a SETTINGS disk = disk(type = cache, disk = 'local_disk', name = '$CLICHOUSE_TEST_UNIQUE_NAME', cache_name='cache_collection_sql');
|
||||
ENGINE = MergeTree() ORDER BY a SETTINGS disk = disk(type = cache, disk = 'local_disk', name = '$CLICHOUSE_TEST_UNIQUE_NAME', cache_name='cache_collection_sql', load_metadata_asynchronously = 0);
|
||||
DESCRIBE FILESYSTEM CACHE '$CLICHOUSE_TEST_UNIQUE_NAME';
|
||||
CREATE TABLE test2 (a Int32, b String)
|
||||
ENGINE = MergeTree() ORDER BY a SETTINGS disk = disk(type = cache, disk = 'local_disk', name = '$CLICHOUSE_TEST_UNIQUE_NAME_2', cache_name='cache_collection');
|
||||
ENGINE = MergeTree() ORDER BY a SETTINGS disk = disk(type = cache, disk = 'local_disk', name = '$CLICHOUSE_TEST_UNIQUE_NAME_2', cache_name='cache_collection', load_metadata_asynchronously = 0);
|
||||
DESCRIBE FILESYSTEM CACHE '$CLICHOUSE_TEST_UNIQUE_NAME_2';
|
||||
|
@ -1,7 +1,7 @@
|
||||
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 0 0 0 16
|
||||
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 10 1000 0 16
|
||||
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 5 1000 0 16
|
||||
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 15 1000 0 16
|
||||
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 2 1000 0 16
|
||||
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 0 1000 0 16
|
||||
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 0 0 0 16
|
||||
134217728 10000000 33554432 1 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 0 0 0 16
|
||||
134217728 10000000 33554432 1 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 10 1000 0 16
|
||||
134217728 10000000 33554432 1 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 5 1000 0 16
|
||||
134217728 10000000 33554432 1 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 15 1000 0 16
|
||||
134217728 10000000 33554432 1 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 2 1000 0 16
|
||||
134217728 10000000 33554432 1 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 0 1000 0 16
|
||||
134217728 10000000 33554432 1 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 0 0 0 16
|
||||
|
@ -1,20 +1,20 @@
|
||||
100 10 10 10 0 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
|
||||
100 10 10 1 10 0 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
|
||||
0
|
||||
10
|
||||
98
|
||||
set max_size from 100 to 10
|
||||
10 10 10 10 0 0 8 1 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
|
||||
10 10 10 1 10 0 0 8 1 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
|
||||
1
|
||||
8
|
||||
set max_size from 10 to 100
|
||||
100 10 10 10 0 0 8 1 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
|
||||
100 10 10 1 10 0 0 8 1 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
|
||||
10
|
||||
98
|
||||
set max_elements from 10 to 2
|
||||
100 2 10 10 0 0 18 2 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
|
||||
100 2 10 1 10 0 0 18 2 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
|
||||
2
|
||||
18
|
||||
set max_elements from 2 to 10
|
||||
100 10 10 10 0 0 18 2 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
|
||||
100 10 10 1 10 0 0 18 2 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
|
||||
10
|
||||
98
|
||||
|
Loading…
Reference in New Issue
Block a user