mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Changes related to an internal feature
This commit is contained in:
parent
b1947b2c93
commit
9f80900d6f
@ -1219,7 +1219,7 @@ off_t CachedOnDiskReadBufferFromFile::getPosition()
|
||||
|
||||
void CachedOnDiskReadBufferFromFile::assertCorrectness() const
|
||||
{
|
||||
if (!CachedObjectStorage::canUseReadThroughCache()
|
||||
if (!CachedObjectStorage::canUseReadThroughCache(settings)
|
||||
&& !settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache usage is not allowed (query_id: {})", query_id);
|
||||
}
|
||||
|
@ -36,7 +36,7 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
|
||||
|
||||
with_cache = settings.remote_fs_cache
|
||||
&& settings.enable_filesystem_cache
|
||||
&& (!query_id.empty() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
|
||||
&& (!query_id.empty() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache || !settings.avoid_readthrough_cache_outside_query_context);
|
||||
}
|
||||
|
||||
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
|
||||
|
@ -57,7 +57,7 @@ ReadSettings CachedObjectStorage::patchSettings(const ReadSettings & read_settin
|
||||
ReadSettings modified_settings{read_settings};
|
||||
modified_settings.remote_fs_cache = cache;
|
||||
|
||||
if (!canUseReadThroughCache())
|
||||
if (!canUseReadThroughCache(read_settings))
|
||||
modified_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true;
|
||||
|
||||
return object_storage->patchSettings(modified_settings);
|
||||
@ -227,8 +227,11 @@ String CachedObjectStorage::getObjectsNamespace() const
|
||||
return object_storage->getObjectsNamespace();
|
||||
}
|
||||
|
||||
bool CachedObjectStorage::canUseReadThroughCache()
|
||||
bool CachedObjectStorage::canUseReadThroughCache(const ReadSettings & settings)
|
||||
{
|
||||
if (!settings.avoid_readthrough_cache_outside_query_context)
|
||||
return true;
|
||||
|
||||
return CurrentThread::isInitialized()
|
||||
&& CurrentThread::get().getQueryContext()
|
||||
&& !CurrentThread::getQueryId().empty();
|
||||
|
@ -112,7 +112,9 @@ public:
|
||||
|
||||
WriteSettings getAdjustedSettingsFromMetadataFile(const WriteSettings & settings, const std::string & path) const override;
|
||||
|
||||
static bool canUseReadThroughCache();
|
||||
const FileCacheSettings & getCacheSettings() const { return cache_settings; }
|
||||
|
||||
static bool canUseReadThroughCache(const ReadSettings & settings);
|
||||
|
||||
private:
|
||||
FileCache::Key getCacheKey(const std::string & path) const;
|
||||
|
@ -99,6 +99,8 @@ struct ReadSettings
|
||||
bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
|
||||
bool enable_filesystem_cache_log = false;
|
||||
bool is_file_cache_persistent = false; /// Some files can be made non-evictable.
|
||||
/// Don't populate cache when the read is not part of query execution (e.g. background thread).
|
||||
bool avoid_readthrough_cache_outside_query_context = true;
|
||||
|
||||
size_t filesystem_cache_max_download_size = (128UL * 1024 * 1024 * 1024);
|
||||
bool skip_download_if_exceeds_query_cache = true;
|
||||
|
@ -85,7 +85,7 @@ public:
|
||||
EMPTY,
|
||||
/**
|
||||
* A newly created file segment never has DOWNLOADING state until call to getOrSetDownloader
|
||||
* because each cache user might acquire multiple file segments and reads them one by one,
|
||||
* because each cache user might acquire multiple file segments and read them one by one,
|
||||
* so only user which actually needs to read this segment earlier than others - becomes a downloader.
|
||||
*/
|
||||
DOWNLOADING,
|
||||
|
@ -85,6 +85,7 @@ public:
|
||||
|
||||
virtual void removeAll(const CacheGuard::Lock &) = 0;
|
||||
|
||||
/// From lowest to highest priority.
|
||||
virtual void iterate(IterateFunc && func, const CacheGuard::Lock &) = 0;
|
||||
|
||||
private:
|
||||
|
@ -202,6 +202,13 @@ bool DataPartStorageOnDiskBase::isStoredOnRemoteDisk() const
|
||||
return volume->getDisk()->isRemote();
|
||||
}
|
||||
|
||||
std::optional<String> DataPartStorageOnDiskBase::getCacheName() const
|
||||
{
|
||||
if (volume->getDisk()->supportsCache())
|
||||
return volume->getDisk()->getCacheName();
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
bool DataPartStorageOnDiskBase::supportZeroCopyReplication() const
|
||||
{
|
||||
return volume->getDisk()->supportZeroCopyReplication();
|
||||
|
@ -36,6 +36,7 @@ public:
|
||||
std::string getDiskName() const override;
|
||||
std::string getDiskType() const override;
|
||||
bool isStoredOnRemoteDisk() const override;
|
||||
std::optional<String> getCacheName() const override;
|
||||
bool supportZeroCopyReplication() const override;
|
||||
bool supportParallelWrite() const override;
|
||||
bool isBroken() const override;
|
||||
|
@ -149,6 +149,7 @@ public:
|
||||
virtual std::string getDiskName() const = 0;
|
||||
virtual std::string getDiskType() const = 0;
|
||||
virtual bool isStoredOnRemoteDisk() const { return false; }
|
||||
virtual std::optional<String> getCacheName() const { return std::nullopt; }
|
||||
virtual bool supportZeroCopyReplication() const { return false; }
|
||||
virtual bool supportParallelWrite() const = 0;
|
||||
virtual bool isBroken() const = 0;
|
||||
|
@ -12,6 +12,7 @@
|
||||
<disk>s3</disk>
|
||||
<max_size>100000000</max_size>
|
||||
<path>./cache_s3/</path>
|
||||
<cache_on_write_operations>1</cache_on_write_operations>
|
||||
</cache_s3>
|
||||
</disks>
|
||||
<policies>
|
||||
|
@ -0,0 +1,7 @@
|
||||
<clickhouse>
|
||||
<profiles>
|
||||
<default>
|
||||
<enable_filesystem_cache_on_write_operations>1</enable_filesystem_cache_on_write_operations>
|
||||
</default>
|
||||
</profiles>
|
||||
</clickhouse>
|
@ -19,6 +19,7 @@ def cluster():
|
||||
cluster.add_instance(
|
||||
"node1",
|
||||
main_configs=["configs/config.d/storage_conf.xml"],
|
||||
user_configs=["configs/config.d/users.xml"],
|
||||
macros={"replica": "1"},
|
||||
with_minio=True,
|
||||
with_zookeeper=True,
|
||||
@ -26,12 +27,14 @@ def cluster():
|
||||
cluster.add_instance(
|
||||
"node2",
|
||||
main_configs=["configs/config.d/storage_conf.xml"],
|
||||
user_configs=["configs/config.d/users.xml"],
|
||||
macros={"replica": "2"},
|
||||
with_zookeeper=True,
|
||||
)
|
||||
cluster.add_instance(
|
||||
"node3",
|
||||
main_configs=["configs/config.d/storage_conf.xml"],
|
||||
user_configs=["configs/config.d/users.xml"],
|
||||
macros={"replica": "3"},
|
||||
with_zookeeper=True,
|
||||
)
|
||||
@ -74,7 +77,7 @@ def generate_values(date_str, count, sign=1):
|
||||
|
||||
def create_table(cluster, additional_settings=None):
|
||||
create_table_statement = """
|
||||
CREATE TABLE s3_test ON CLUSTER cluster(
|
||||
CREATE TABLE s3_test ON CLUSTER cluster (
|
||||
dt Date,
|
||||
id Int64,
|
||||
data String,
|
||||
@ -95,7 +98,8 @@ def create_table(cluster, additional_settings=None):
|
||||
def drop_table(cluster):
|
||||
yield
|
||||
for node in list(cluster.instances.values()):
|
||||
node.query("DROP TABLE IF EXISTS s3_test")
|
||||
node.query("DROP TABLE IF EXISTS s3_test SYNC")
|
||||
node.query("DROP TABLE IF EXISTS test_drop_table SYNC")
|
||||
|
||||
minio = cluster.minio_client
|
||||
# Remove extra objects to prevent tests cascade failing
|
||||
|
Loading…
Reference in New Issue
Block a user