Merge branch 'master' of github.com:ClickHouse/ClickHouse into rip-copier

Nikita Mikhaylov 2024-03-11 19:12:49 +01:00
commit fcf8a3821d
26 changed files with 645 additions and 109 deletions

View File

@ -86,6 +86,7 @@
namespace ProfileEvents
{
extern const Event ScalarSubqueriesGlobalCacheHit;
extern const Event ScalarSubqueriesLocalCacheHit;
extern const Event ScalarSubqueriesCacheMiss;
}
@ -1444,7 +1445,8 @@ private:
std::unordered_map<QueryTreeNodePtr, size_t> node_to_tree_size;
/// Global scalar subquery to scalar value map
std::unordered_map<QueryTreeNodePtrWithHash, Block> scalar_subquery_to_scalar_value;
std::unordered_map<QueryTreeNodePtrWithHash, Block> scalar_subquery_to_scalar_value_local;
std::unordered_map<QueryTreeNodePtrWithHash, Block> scalar_subquery_to_scalar_value_global;
const bool only_analyze;
};
@ -1951,6 +1953,24 @@ QueryTreeNodePtr QueryAnalyzer::tryGetLambdaFromSQLUserDefinedFunctions(const st
return result_node;
}
bool subtreeHasViewSource(const IQueryTreeNode * node, const Context & context)
{
if (!node)
return false;
if (const auto * table_node = node->as<TableNode>())
{
if (table_node->getStorageID().getFullNameNotQuoted() == context.getViewSource()->getStorageID().getFullNameNotQuoted())
return true;
}
for (const auto & child : node->getChildren())
if (subtreeHasViewSource(child.get(), context))
return true;
return false;
}
/// Evaluate scalar subquery and perform constant folding if scalar subquery does not have constant value
void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, IdentifierResolveScope & scope)
{
@ -1970,12 +1990,26 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
node_without_alias->removeAlias();
QueryTreeNodePtrWithHash node_with_hash(node_without_alias);
auto scalar_value_it = scalar_subquery_to_scalar_value.find(node_with_hash);
auto str_hash = DB::toString(node_with_hash.hash);
if (scalar_value_it != scalar_subquery_to_scalar_value.end())
bool can_use_global_scalars = !only_analyze && !(context->getViewSource() && subtreeHasViewSource(node_without_alias.get(), *context));
auto & scalars_cache = can_use_global_scalars ? scalar_subquery_to_scalar_value_global : scalar_subquery_to_scalar_value_local;
if (scalars_cache.contains(node_with_hash))
{
if (can_use_global_scalars)
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit);
else
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesLocalCacheHit);
scalar_block = scalars_cache.at(node_with_hash);
}
else if (context->hasQueryContext() && can_use_global_scalars && context->getQueryContext()->hasScalar(str_hash))
{
scalar_block = context->getQueryContext()->getScalar(str_hash);
scalar_subquery_to_scalar_value_global.emplace(node_with_hash, scalar_block);
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit);
scalar_block = scalar_value_it->second;
}
else
{
@ -2087,7 +2121,9 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
}
}
scalar_subquery_to_scalar_value.emplace(node_with_hash, scalar_block);
scalars_cache.emplace(node_with_hash, scalar_block);
if (can_use_global_scalars && context->hasQueryContext())
context->getQueryContext()->addScalar(str_hash, scalar_block);
}
const auto & scalar_column_with_type = scalar_block.safeGetByPosition(0);
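The hunk above splits the analyzer's scalar-subquery cache into a local and a global map and adds a lookup through the query context. As a rough editor's sketch (not the actual ClickHouse code; Scalar, resolveScalar and the string hash are simplified stand-ins), the resulting lookup order is:

#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>

using Scalar = std::string;  // stands in for the materialized scalar Block
using ScalarMap = std::unordered_map<std::string, Scalar>;

// Lookup order: per-analyzer cache (local or global, depending on whether the
// result may be shared), then the query-context scalar map, then evaluation.
Scalar resolveScalar(
    const std::string & subquery_hash,
    bool can_use_global_scalars,          // false for analyze-only passes and view-source subtrees
    ScalarMap & local_cache,
    ScalarMap & global_cache,
    ScalarMap & query_context_scalars,    // shared across the whole query, e.g. an INSERT plus its materialized views
    const std::function<Scalar()> & evaluate_subquery)
{
    auto & cache = can_use_global_scalars ? global_cache : local_cache;

    if (auto it = cache.find(subquery_hash); it != cache.end())
        return it->second;                                    // analyzer-level cache hit (Global/Local ProfileEvent)

    if (can_use_global_scalars)
    {
        if (auto it = query_context_scalars.find(subquery_hash); it != query_context_scalars.end())
            return cache.emplace(subquery_hash, it->second).first->second;  // query-context hit
    }

    Scalar value = evaluate_subquery();                       // cache miss: actually run the subquery
    cache.emplace(subquery_hash, value);
    if (can_use_global_scalars)
        query_context_scalars.emplace(subquery_hash, value);
    return value;
}

int main()
{
    ScalarMap local, global, query_scalars;
    auto evaluate = [] { std::cout << "evaluating subquery\n"; return Scalar{"42"}; };

    resolveScalar("hash-1", /*can_use_global_scalars=*/true, local, global, query_scalars, evaluate);
    resolveScalar("hash-1", true, local, global, query_scalars, evaluate);  // second call: served from cache, no re-evaluation
}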

View File

@ -21,6 +21,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ALL_CONNECTION_TRIES_FAILED;
extern const int BAD_ARGUMENTS;
}
@ -191,11 +192,20 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
max_entries = nested_pools.size();
}
else if (pool_mode == PoolMode::GET_ONE)
{
max_entries = 1;
}
else if (pool_mode == PoolMode::GET_MANY)
{
if (settings.max_parallel_replicas == 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of the setting max_parallel_replicas must be greater than 0");
max_entries = settings.max_parallel_replicas;
}
else
{
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown pool allocation mode");
}
if (!priority_func)
priority_func = makeGetPriorityFunc(settings);

View File

@ -19,6 +19,7 @@ namespace ErrorCodes
extern const int ALL_CONNECTION_TRIES_FAILED;
extern const int ALL_REPLICAS_ARE_STALE;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
HedgedConnectionsFactory::HedgedConnectionsFactory(
@ -82,7 +83,10 @@ std::vector<Connection *> HedgedConnectionsFactory::getManyConnections(PoolMode
}
case PoolMode::GET_MANY:
{
max_entries = max_parallel_replicas;
if (max_parallel_replicas == 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of the setting max_parallel_replicas must be greater than 0");
max_entries = std::min(max_parallel_replicas, shuffled_pools.size());
break;
}
}
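Taken together, the two hunks above make PoolMode::GET_MANY reject max_parallel_replicas = 0 and, in the hedged-connections case, clamp the request to the number of available pools. A minimal editor's sketch of that mapping, using standard-library types rather than the real ClickHouse signatures:

#include <algorithm>
#include <cstddef>
#include <stdexcept>

enum class PoolMode { GET_ALL, GET_ONE, GET_MANY };

// How many connection entries to request for a given pool mode.
std::size_t maxEntriesFor(PoolMode mode, std::size_t max_parallel_replicas, std::size_t pools_count)
{
    switch (mode)
    {
        case PoolMode::GET_ALL:
            return pools_count;
        case PoolMode::GET_ONE:
            return 1;
        case PoolMode::GET_MANY:
            if (max_parallel_replicas == 0)
                throw std::invalid_argument("The value of the setting max_parallel_replicas must be greater than 0");
            // HedgedConnectionsFactory additionally never asks for more entries than it has shuffled pools.
            return std::min(max_parallel_replicas, pools_count);
    }
    throw std::logic_error("Unknown pool allocation mode");
}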

View File

@ -158,7 +158,7 @@ private:
/// checking the number of requested replicas that are still in process).
size_t requested_connections_count = 0;
const size_t max_parallel_replicas = 0;
const size_t max_parallel_replicas = 1;
const bool skip_unavailable_shards = 0;
};

View File

@ -112,7 +112,6 @@ namespace DB
M(UInt64, tables_loader_background_pool_size, 0, "The maximum number of threads that will be used for background async loading of tables. Zero means use all CPUs.", 0) \
M(Bool, async_load_databases, false, "Enable asynchronous loading of databases and tables to speedup server startup. Queries to not yet loaded entity will be blocked until load is finished.", 0) \
M(Bool, display_secrets_in_show_and_select, false, "Allow showing secrets in SHOW and SELECT queries via a format setting and a grant", 0) \
\
M(Seconds, keep_alive_timeout, DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT, "The number of seconds that ClickHouse waits for incoming requests before closing the connection.", 0) \
M(Seconds, replicated_fetches_http_connection_timeout, 0, "HTTP connection timeout for part fetch requests. Inherited from default profile `http_connection_timeout` if not set explicitly.", 0) \
M(Seconds, replicated_fetches_http_send_timeout, 0, "HTTP send timeout for part fetch requests. Inherited from default profile `http_send_timeout` if not set explicitly.", 0) \

View File

@ -17,16 +17,15 @@ using namespace DB;
namespace
{
bool withFileCache(const ReadSettings & settings)
{
return settings.remote_fs_cache && settings.enable_filesystem_cache
&& (!CurrentThread::getQueryId().empty() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache
|| !settings.avoid_readthrough_cache_outside_query_context);
}
bool withPageCache(const ReadSettings & settings, bool with_file_cache)
{
return settings.page_cache && !with_file_cache && settings.use_page_cache_for_disks_without_file_cache;
}
bool withFileCache(const ReadSettings & settings)
{
return settings.remote_fs_cache && settings.enable_filesystem_cache;
}
bool withPageCache(const ReadSettings & settings, bool with_file_cache)
{
return settings.page_cache && !with_file_cache && settings.use_page_cache_for_disks_without_file_cache;
}
}
namespace DB

View File

@ -43,10 +43,6 @@ ReadSettings CachedObjectStorage::patchSettings(const ReadSettings & read_settin
{
ReadSettings modified_settings{read_settings};
modified_settings.remote_fs_cache = cache;
if (!canUseReadThroughCache(read_settings))
modified_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true;
return object_storage->patchSettings(modified_settings);
}
@ -206,14 +202,4 @@ String CachedObjectStorage::getObjectsNamespace() const
return object_storage->getObjectsNamespace();
}
bool CachedObjectStorage::canUseReadThroughCache(const ReadSettings & settings)
{
if (!settings.avoid_readthrough_cache_outside_query_context)
return true;
return CurrentThread::isInitialized()
&& CurrentThread::get().getQueryContext()
&& !CurrentThread::getQueryId().empty();
}
}

View File

@ -119,8 +119,6 @@ public:
const FileCacheSettings & getCacheSettings() const { return cache_settings; }
static bool canUseReadThroughCache(const ReadSettings & settings);
#if USE_AZURE_BLOB_STORAGE
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> getAzureBlobStorageClient() override
{

View File

@ -99,8 +99,7 @@ struct ReadSettings
bool enable_filesystem_cache = true;
bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
bool enable_filesystem_cache_log = false;
/// Don't populate cache when the read is not part of query execution (e.g. background thread).
bool avoid_readthrough_cache_outside_query_context = true;
bool force_read_through_cache_merges = false;
size_t filesystem_cache_segments_batch_size = 20;
bool use_page_cache_for_disks_without_file_cache = false;

View File

@ -10,6 +10,7 @@
#include <Common/logger_useful.h>
#include <Common/scope_guard_safe.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Common/setThreadName.h>
#include <magic_enum.hpp>
@ -195,7 +196,7 @@ bool FileSegment::isDownloaded() const
String FileSegment::getCallerId()
{
if (!CurrentThread::isInitialized() || CurrentThread::getQueryId().empty())
return "None:" + toString(getThreadId());
return fmt::format("None:{}:{}", getThreadName(), toString(getThreadId()));
return std::string(CurrentThread::getQueryId()) + ":" + toString(getThreadId());
}

View File

@ -947,7 +947,7 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis()
if (number_of_replicas_to_use <= 1)
{
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("max_parallel_replicas", UInt64{0});
context->setSetting("max_parallel_replicas", UInt64{1});
LOG_DEBUG(log, "Disabling parallel replicas because there aren't enough rows to read");
return true;
}

View File

@ -295,7 +295,7 @@ bool applyTrivialCountIfPossible(
/// The query could use trivial count if it didn't use parallel replicas, so let's disable it
query_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
query_context->setSetting("max_parallel_replicas", UInt64{0});
query_context->setSetting("max_parallel_replicas", UInt64{1});
LOG_TRACE(getLogger("Planner"), "Disabling parallel replicas to be able to use a trivial count optimization");
}
@ -756,7 +756,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
{
planner_context->getMutableQueryContext()->setSetting(
"allow_experimental_parallel_reading_from_replicas", Field(0));
planner_context->getMutableQueryContext()->setSetting("max_parallel_replicas", UInt64{0});
planner_context->getMutableQueryContext()->setSetting("max_parallel_replicas", UInt64{1});
LOG_DEBUG(getLogger("Planner"), "Disabling parallel replicas because there aren't enough rows to read");
}
else if (number_of_replicas_to_use < settings.max_parallel_replicas)

View File

@ -149,7 +149,8 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
const auto & context = storage.getContext();
ReadSettings read_settings = context->getReadSettings();
read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true;
read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = !storage.getSettings()->force_read_through_cache_for_merges;
/// It does not make sense to use pthread_threadpool for background merges/mutations
/// And also to preserve backward compatibility
read_settings.local_fs_method = LocalFSReadMethod::pread;
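Combined with the new MergeTree setting further below, the effect of this hunk is that merges read through the filesystem cache only when force_read_through_cache_for_merges is enabled; otherwise they reuse already-cached segments and bypass the cache on a miss. A simplified editor's sketch of that decision (ReadSettings here is a stand-in struct, not the real one):

struct ReadSettings
{
    bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
};

// Read settings used by a background merge, derived from the table-level setting.
ReadSettings makeMergeReadSettings(bool force_read_through_cache_for_merges)
{
    ReadSettings settings;
    // Forced: merges populate the filesystem cache like normal reads.
    // Not forced: merges only consume data that is already cached and bypass the cache otherwise.
    settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = !force_read_through_cache_for_merges;
    return settings;
}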

View File

@ -192,6 +192,7 @@ struct Settings;
M(String, remote_fs_zero_copy_zookeeper_path, "/clickhouse/zero_copy", "ZooKeeper path for zero-copy table-independent info.", 0) \
M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
M(Bool, cache_populated_by_fetch, false, "Only available in ClickHouse Cloud", 0) \
M(Bool, force_read_through_cache_for_merges, false, "Force read-through filesystem cache for merges", 0) \
M(Bool, allow_experimental_block_number_column, false, "Enable persisting column _block_number for each row.", 0) \
M(Bool, allow_experimental_replacing_merge_with_cleanup, false, "Allow experimental CLEANUP merges for ReplacingMergeTree with is_deleted column.", 0) \
\

View File

@ -10,7 +10,6 @@
01761_cast_to_enum_nullable
01925_join_materialized_columns
01952_optimize_distributed_group_by_sharding_key
02174_cte_scalar_cache_mv
02354_annoy
# Check after constants refactoring
02901_parallel_replicas_rollup

View File

@ -140,7 +140,7 @@ class CiCache:
self.s3 = s3
self.job_digests = job_digests
self.cache_s3_paths = {
job_type: f"{self._S3_CACHE_PREFIX}/{job_type.value}-{self.job_digests[self._get_reference_job_name(job_type)]}/"
job_type: f"{self._S3_CACHE_PREFIX}/{job_type.value}-{self._get_digest_for_job_type(self.job_digests, job_type)}/"
for job_type in self.JobType
}
self.s3_record_prefixes = {
@ -155,14 +155,23 @@ class CiCache:
if not self._LOCAL_CACHE_PATH.exists():
self._LOCAL_CACHE_PATH.mkdir(parents=True, exist_ok=True)
def _get_reference_job_name(self, job_type: JobType) -> str:
res = Build.PACKAGE_RELEASE
def _get_digest_for_job_type(
self, job_digests: Dict[str, str], job_type: JobType
) -> str:
if job_type == self.JobType.DOCS:
res = JobNames.DOCS_CHECK
res = job_digests[JobNames.DOCS_CHECK]
elif job_type == self.JobType.SRCS:
res = Build.PACKAGE_RELEASE
# any build type job has the same digest - pick up Build.PACKAGE_RELEASE or Build.PACKAGE_ASAN as a failover
# Build.PACKAGE_RELEASE may not exist in the list if we have reduced CI pipeline
if Build.PACKAGE_RELEASE in job_digests:
res = job_digests[Build.PACKAGE_RELEASE]
elif Build.PACKAGE_ASAN in job_digests:
# failover, if failover does not work - fix it!
res = job_digests[Build.PACKAGE_ASAN]
else:
assert False, "BUG, no build job in digest' list"
else:
assert False
assert False, "BUG, New JobType? - please update func"
return res
def _get_record_file_name(
@ -1183,13 +1192,13 @@ def _configure_jobs(
if batches_to_do:
jobs_to_do.append(job)
jobs_params[job] = {
"batches": batches_to_do,
"num_batches": num_batches,
}
elif add_to_skip:
# treat job as being skipped only if it's controlled by digest
jobs_to_skip.append(job)
jobs_params[job] = {
"batches": batches_to_do,
"num_batches": num_batches,
}
if not pr_info.is_release_branch():
# randomization bucket filtering (pick one random job from each bucket, for jobs with configured random_bucket property)
@ -1268,6 +1277,33 @@ def _configure_jobs(
jobs_to_do = list(
set(job for job in jobs_to_do_requested if job not in jobs_to_skip)
)
# if requested job does not have params in jobs_params (it happens for "run_by_label" job)
# we need to add params - otherwise it won't run as "batches" list will be empty
for job in jobs_to_do:
if job not in jobs_params:
num_batches = CI_CONFIG.get_job_config(job).num_batches
jobs_params[job] = {
"batches": list(range(num_batches)),
"num_batches": num_batches,
}
requested_batches = set()
for token in commit_tokens:
if token.startswith("batch_"):
try:
batches = [
int(batch) for batch in token.removeprefix("batch_").split("_")
]
except Exception:
print(f"ERROR: failed to parse commit tag [{token}]")
requested_batches.update(batches)
if requested_batches:
print(
f"NOTE: Only specific job batches were requested [{list(requested_batches)}]"
)
for job, params in jobs_params.items():
if params["num_batches"] > 1:
params["batches"] = list(requested_batches)
return {
"digests": digests,
@ -1372,7 +1408,11 @@ def _update_gh_statuses_action(indata: Dict, s3: S3Helper) -> None:
def _fetch_commit_tokens(message: str) -> List[str]:
pattern = r"#[\w-]+"
matches = [match[1:] for match in re.findall(pattern, message)]
res = [match for match in matches if match in Labels or match.startswith("job_")]
res = [
match
for match in matches
if match in Labels or match.startswith("job_") or match.startswith("batch_")
]
return res
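Usage note for the hunks above: a commit message containing the tag #batch_0_2 passes the new startswith("batch_") filter, is parsed into the batch set {0, 2}, and restricts every job with more than one batch to exactly those batches; single-batch jobs are left untouched.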

View File

@ -44,11 +44,12 @@ RETRY_SLEEP = 0
class EventType:
UNKNOWN = 0
PUSH = 1
PULL_REQUEST = 2
SCHEDULE = 3
DISPATCH = 4
UNKNOWN = "unknown"
PUSH = "commits"
PULL_REQUEST = "pull_request"
SCHEDULE = "schedule"
DISPATCH = "dispatch"
MERGE_QUEUE = "merge_group"
def get_pr_for_commit(sha, ref):
@ -114,6 +115,12 @@ class PRInfo:
# release_pr and merged_pr are used for docker images additional cache
self.release_pr = 0
self.merged_pr = 0
self.labels = set()
repo_prefix = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}"
self.task_url = GITHUB_RUN_URL
self.repo_full_name = GITHUB_REPOSITORY
self.event_type = EventType.UNKNOWN
ref = github_event.get("ref", "refs/heads/master")
if ref and ref.startswith("refs/heads/"):
@ -154,10 +161,6 @@ class PRInfo:
else:
self.sha = github_event["pull_request"]["head"]["sha"]
repo_prefix = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}"
self.task_url = GITHUB_RUN_URL
self.repo_full_name = GITHUB_REPOSITORY
self.commit_html_url = f"{repo_prefix}/commits/{self.sha}"
self.pr_html_url = f"{repo_prefix}/pull/{self.number}"
@ -176,7 +179,7 @@ class PRInfo:
self.body = github_event["pull_request"]["body"]
self.labels = {
label["name"] for label in github_event["pull_request"]["labels"]
} # type: Set[str]
}
self.user_login = github_event["pull_request"]["user"]["login"] # type: str
self.user_orgs = set() # type: Set[str]
@ -191,6 +194,28 @@ class PRInfo:
self.diff_urls.append(self.compare_pr_url(github_event["pull_request"]))
elif (
EventType.MERGE_QUEUE in github_event
): # pull request and other similar events
self.event_type = EventType.MERGE_QUEUE
# FIXME: need pr? we can parse it from ["head_ref": "refs/heads/gh-readonly-queue/test-merge-queue/pr-6751-4690229995a155e771c52e95fbd446d219c069bf"]
self.number = 0
self.sha = github_event[EventType.MERGE_QUEUE]["head_sha"]
self.base_ref = github_event[EventType.MERGE_QUEUE]["base_ref"]
base_sha = github_event[EventType.MERGE_QUEUE]["base_sha"] # type: str
# ClickHouse/ClickHouse
self.base_name = github_event["repository"]["full_name"]
# any_branch-name - the name of working branch name
self.head_ref = github_event[EventType.MERGE_QUEUE]["head_ref"]
# UserName/ClickHouse or ClickHouse/ClickHouse
self.head_name = self.base_name
self.user_login = github_event["sender"]["login"]
self.diff_urls.append(
github_event["repository"]["compare_url"]
.replace("{base}", base_sha)
.replace("{head}", self.sha)
)
elif "commits" in github_event:
self.event_type = EventType.PUSH
# `head_commit` always comes with `commits`
@ -203,10 +228,8 @@ class PRInfo:
logging.error("Failed to convert %s to integer", merged_pr)
self.sha = github_event["after"]
pull_request = get_pr_for_commit(self.sha, github_event["ref"])
repo_prefix = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}"
self.task_url = GITHUB_RUN_URL
self.commit_html_url = f"{repo_prefix}/commits/{self.sha}"
self.repo_full_name = GITHUB_REPOSITORY
if pull_request is None or pull_request["state"] == "closed":
# it's merged PR to master
self.number = 0
@ -272,11 +295,7 @@ class PRInfo:
"GITHUB_SHA", "0000000000000000000000000000000000000000"
)
self.number = 0
self.labels = set()
repo_prefix = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}"
self.task_url = GITHUB_RUN_URL
self.commit_html_url = f"{repo_prefix}/commits/{self.sha}"
self.repo_full_name = GITHUB_REPOSITORY
self.pr_html_url = f"{repo_prefix}/commits/{ref}"
self.base_ref = ref
self.base_name = self.repo_full_name
@ -300,6 +319,9 @@ class PRInfo:
def is_scheduled(self):
return self.event_type == EventType.SCHEDULE
def is_merge_queue(self):
return self.event_type == EventType.MERGE_QUEUE
def is_dispatched(self):
return self.event_type == EventType.DISPATCH

View File

@ -0,0 +1,5 @@
<clickhouse>
<merge_tree>
<force_read_through_cache_for_merges>1</force_read_through_cache_for_merges>
</merge_tree>
</clickhouse>

View File

@ -19,6 +19,9 @@ def cluster():
main_configs=[
"config.d/storage_conf.xml",
],
user_configs=[
"users.d/cache_on_write_operations.xml",
],
stay_alive=True,
)
cluster.add_instance(
@ -35,6 +38,17 @@ def cluster():
],
stay_alive=True,
)
cluster.add_instance(
"node_force_read_through_cache_on_merge",
main_configs=[
"config.d/storage_conf.xml",
"config.d/force_read_through_cache_for_merges.xml",
],
user_configs=[
"users.d/cache_on_write_operations.xml",
],
stay_alive=True,
)
logging.info("Starting cluster...")
cluster.start()
@ -80,12 +94,21 @@ def test_parallel_cache_loading_on_startup(cluster, node_name):
cache_state = node.query(
"SELECT key, file_segment_range_begin, size FROM system.filesystem_cache WHERE size > 0 ORDER BY key, file_segment_range_begin, size"
)
keys = (
node.query(
"SELECT distinct(key) FROM system.filesystem_cache WHERE size > 0 ORDER BY key, file_segment_range_begin, size"
)
.strip()
.splitlines()
)
node.restart_clickhouse()
assert cache_count == int(node.query("SELECT count() FROM system.filesystem_cache"))
# < because of additional files loaded into cache on server startup.
assert cache_count <= int(node.query("SELECT count() FROM system.filesystem_cache"))
keys_set = ",".join(["'" + x + "'" for x in keys])
assert cache_state == node.query(
"SELECT key, file_segment_range_begin, size FROM system.filesystem_cache ORDER BY key, file_segment_range_begin, size"
f"SELECT key, file_segment_range_begin, size FROM system.filesystem_cache WHERE key in ({keys_set}) ORDER BY key, file_segment_range_begin, size"
)
assert node.contains_in_log("Loading filesystem cache with 30 threads")
@ -323,3 +346,83 @@ def test_custom_cached_disk(cluster):
"SELECT cache_path FROM system.disks WHERE name = 'custom_cached4'"
).strip()
)
def test_force_filesystem_cache_on_merges(cluster):
def test(node, forced_read_through_cache_on_merge):
def to_int(value):
if value == "":
return 0
else:
return int(value)
r_cache_count = to_int(
node.query(
"SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'"
)
)
w_cache_count = to_int(
node.query(
"SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes'"
)
)
node.query(
"""
DROP TABLE IF EXISTS test SYNC;
CREATE TABLE test (key UInt32, value String)
Engine=MergeTree()
ORDER BY value
SETTINGS disk = disk(
type = cache,
path = 'force_cache_on_merges',
disk = 'hdd_blob',
max_file_segment_size = '1Ki',
cache_on_write_operations = 1,
boundary_alignment = '1Ki',
max_size = '10Gi',
max_elements = 10000000,
load_metadata_threads = 30);
SYSTEM DROP FILESYSTEM CACHE;
INSERT INTO test SELECT * FROM generateRandom('a Int32, b String') LIMIT 1000000;
INSERT INTO test SELECT * FROM generateRandom('a Int32, b String') LIMIT 1000000;
"""
)
assert int(node.query("SELECT count() FROM system.filesystem_cache")) > 0
assert int(node.query("SELECT max(size) FROM system.filesystem_cache")) == 1024
w_cache_count_2 = int(
node.query(
"SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes'"
)
)
assert w_cache_count_2 > w_cache_count
r_cache_count_2 = to_int(
node.query(
"SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'"
)
)
assert r_cache_count_2 == r_cache_count
node.query("SYSTEM DROP FILESYSTEM CACHE")
node.query("OPTIMIZE TABLE test FINAL")
r_cache_count_3 = to_int(
node.query(
"SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'"
)
)
if forced_read_through_cache_on_merge:
assert r_cache_count_3 > r_cache_count
else:
assert r_cache_count_3 == r_cache_count
node = cluster.instances["node_force_read_through_cache_on_merge"]
test(node, True)
node = cluster.instances["node"]
test(node, False)

View File

@ -0,0 +1,7 @@
<clickhouse>
<profiles>
<default>
<enable_filesystem_cache_on_write_operations>1</enable_filesystem_cache_on_write_operations>
</default>
</profiles>
</clickhouse>

View File

@ -19,6 +19,48 @@
94 94 94 94 5
99 99 99 99 5
02177_MV 7 80 22
4 4 4 4 5
9 9 9 9 5
14 14 14 14 5
19 19 19 19 5
24 24 24 24 5
29 29 29 29 5
34 34 34 34 5
39 39 39 39 5
44 44 44 44 5
49 49 49 49 5
54 54 54 54 5
59 59 59 59 5
64 64 64 64 5
69 69 69 69 5
74 74 74 74 5
79 79 79 79 5
84 84 84 84 5
89 89 89 89 5
94 94 94 94 5
99 99 99 99 5
02177_MV 0 0 22
10
40
70
100
130
160
190
220
250
280
310
340
370
400
430
460
490
520
550
580
02177_MV_2 0 0 21
10
40
70
@ -61,3 +103,24 @@
188
198
02177_MV_3 20 0 1
8
18
28
38
48
58
68
78
88
98
108
118
128
138
148
158
168
178
188
198
02177_MV_3 19 0 2

View File

@ -14,6 +14,8 @@ CREATE MATERIALIZED VIEW mv1 TO t2 AS
FROM t1
LIMIT 5;
set allow_experimental_analyzer = 0;
-- FIRST INSERT
INSERT INTO t1
WITH
@ -58,8 +60,48 @@ WHERE
AND query LIKE '-- FIRST INSERT\nINSERT INTO t1\n%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
truncate table t2;
set allow_experimental_analyzer = 1;
-- FIRST INSERT ANALYZER
INSERT INTO t1
WITH
(SELECT max(i) FROM t1) AS t1
SELECT
number as i,
t1 + t1 + t1 AS j -- Using global cache
FROM system.numbers
LIMIT 100
SETTINGS
min_insert_block_size_rows=5,
max_insert_block_size=5,
min_insert_block_size_rows_for_materialized_views=5,
max_block_size=5,
max_threads=1;
SELECT k, l, m, n, count()
FROM t2
GROUP BY k, l, m, n
ORDER BY k, l, m, n;
SYSTEM FLUSH LOGS;
SELECT
'02177_MV',
ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
FROM system.query_log
WHERE
current_database = currentDatabase()
AND type = 'QueryFinish'
AND query LIKE '-- FIRST INSERT ANALYZER\nINSERT INTO t1\n%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
DROP TABLE mv1;
set allow_experimental_analyzer = 0;
CREATE TABLE t3 (z Int64) ENGINE = Memory;
CREATE MATERIALIZED VIEW mv2 TO t3 AS
SELECT
@ -91,8 +133,36 @@ WHERE
AND query LIKE '-- SECOND INSERT\nINSERT INTO t1%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
truncate table t3;
set allow_experimental_analyzer = 1;
-- SECOND INSERT ANALYZER
INSERT INTO t1
SELECT 0 as i, number as j from numbers(100)
SETTINGS
min_insert_block_size_rows=5,
max_insert_block_size=5,
min_insert_block_size_rows_for_materialized_views=5,
max_block_size=5,
max_threads=1;
SELECT * FROM t3 ORDER BY z ASC;
SYSTEM FLUSH LOGS;
SELECT
'02177_MV_2',
ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
FROM system.query_log
WHERE
current_database = currentDatabase()
AND type = 'QueryFinish'
AND query LIKE '-- SECOND INSERT ANALYZER\nINSERT INTO t1%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
DROP TABLE mv2;
set allow_experimental_analyzer = 0;
CREATE TABLE t4 (z Int64) ENGINE = Memory;
CREATE MATERIALIZED VIEW mv3 TO t4 AS
@ -126,6 +196,35 @@ WHERE
AND query LIKE '-- THIRD INSERT\nINSERT INTO t1%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
truncate table t4;
set allow_experimental_analyzer = 1;
-- THIRD INSERT ANALYZER
INSERT INTO t1
SELECT number as i, number as j from numbers(100)
SETTINGS
min_insert_block_size_rows=5,
max_insert_block_size=5,
min_insert_block_size_rows_for_materialized_views=5,
max_block_size=5,
max_threads=1;
SYSTEM FLUSH LOGS;
SELECT * FROM t4 ORDER BY z ASC;
SELECT
'02177_MV_3',
ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
FROM system.query_log
WHERE
current_database = currentDatabase()
AND type = 'QueryFinish'
AND query LIKE '-- THIRD INSERT ANALYZER\nINSERT INTO t1%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
DROP TABLE mv3;
DROP TABLE t1;
DROP TABLE t2;

View File

@ -1,62 +1,220 @@
Using storage policy: s3_cache
DROP TABLE IF EXISTS test_02241
CREATE TABLE test_02241 (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false, ratio_of_defaults_for_sparse_serialization = 1
SYSTEM STOP MERGES test_02241
SYSTEM DROP FILESYSTEM CACHE
SELECT file_segment_range_begin, file_segment_range_end, size, state
FROM
(
SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path
FROM
(
SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path
FROM system.remote_data_paths
) AS data_paths
INNER JOIN
system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
)
WHERE endsWith(local_path, 'data.bin')
FORMAT Vertical
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path
0
SELECT count(), sum(size) FROM system.filesystem_cache
0 0
INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)
SELECT file_segment_range_begin, file_segment_range_end, size, state
FROM
(
SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path
FROM
(
SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path
FROM system.remote_data_paths
) AS data_paths
INNER JOIN
system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
)
WHERE endsWith(local_path, 'data.bin')
FORMAT Vertical
Row 1:
──────
file_segment_range_begin: 0
file_segment_range_end: 745
size: 746
state: DOWNLOADED
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path
8
SELECT count(), sum(size) FROM system.filesystem_cache
8 1100
SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0
0
SELECT * FROM test_02241 FORMAT Null
SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0
2
SELECT * FROM test_02241 FORMAT Null
SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0
2
SELECT count(), sum(size) size FROM system.filesystem_cache
8 1100
SYSTEM DROP FILESYSTEM CACHE
INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100, 200)
SELECT file_segment_range_begin, file_segment_range_end, size, state
FROM
(
SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path
FROM
(
SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path
FROM system.remote_data_paths
) AS data_paths
INNER JOIN
system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
)
WHERE endsWith(local_path, 'data.bin')
FORMAT Vertical;
Row 1:
──────
file_segment_range_begin: 0
file_segment_range_end: 1659
size: 1660
state: DOWNLOADED
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path
8
SELECT count(), sum(size) FROM system.filesystem_cache
8 2014
SELECT count(), sum(size) FROM system.filesystem_cache
8 2014
INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100) SETTINGS enable_filesystem_cache_on_write_operations=0
SELECT count(), sum(size) FROM system.filesystem_cache
8 2014
INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)
INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(300, 10000)
SELECT count(), sum(size) FROM system.filesystem_cache
24 84045
SYSTEM START MERGES test_02241
OPTIMIZE TABLE test_02241 FINAL
SELECT count(), sum(size) FROM system.filesystem_cache
32 167243
ALTER TABLE test_02241 UPDATE value = 'kek' WHERE key = 100
SELECT count(), sum(size) FROM system.filesystem_cache
41 250541
INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(5000000)
SYSTEM FLUSH LOGS
INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(5000000) 0
SELECT count() FROM test_02241
5010500
SELECT count() FROM test_02241 WHERE value LIKE '%010%'
18816
Using storage policy: local_cache
DROP TABLE IF EXISTS test_02241
CREATE TABLE test_02241 (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='local_cache', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false, ratio_of_defaults_for_sparse_serialization = 1
SYSTEM STOP MERGES test_02241
SYSTEM DROP FILESYSTEM CACHE
SELECT file_segment_range_begin, file_segment_range_end, size, state
FROM
(
SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path
FROM
(
SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path
FROM system.remote_data_paths
) AS data_paths
INNER JOIN
system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
)
WHERE endsWith(local_path, 'data.bin')
FORMAT Vertical
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path
0
SELECT count(), sum(size) FROM system.filesystem_cache
0 0
INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)
SELECT file_segment_range_begin, file_segment_range_end, size, state
FROM
(
SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path
FROM
(
SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path
FROM system.remote_data_paths
) AS data_paths
INNER JOIN
system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
)
WHERE endsWith(local_path, 'data.bin')
FORMAT Vertical
Row 1:
──────
file_segment_range_begin: 0
file_segment_range_end: 745
size: 746
state: DOWNLOADED
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path
8
SELECT count(), sum(size) FROM system.filesystem_cache
8 1100
SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0
0
SELECT * FROM test_02241 FORMAT Null
SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0
2
SELECT * FROM test_02241 FORMAT Null
SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0
2
SELECT count(), sum(size) size FROM system.filesystem_cache
8 1100
SYSTEM DROP FILESYSTEM CACHE
INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100, 200)
SELECT file_segment_range_begin, file_segment_range_end, size, state
FROM
(
SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path
FROM
(
SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path
FROM system.remote_data_paths
) AS data_paths
INNER JOIN
system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
)
WHERE endsWith(local_path, 'data.bin')
FORMAT Vertical;
Row 1:
──────
file_segment_range_begin: 0
file_segment_range_end: 1659
size: 1660
state: DOWNLOADED
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path
8
SELECT count(), sum(size) FROM system.filesystem_cache
8 2014
SELECT count(), sum(size) FROM system.filesystem_cache
8 2014
INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100) SETTINGS enable_filesystem_cache_on_write_operations=0
SELECT count(), sum(size) FROM system.filesystem_cache
8 2014
INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)
INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(300, 10000)
SELECT count(), sum(size) FROM system.filesystem_cache
24 84045
SYSTEM START MERGES test_02241
OPTIMIZE TABLE test_02241 FINAL
SELECT count(), sum(size) FROM system.filesystem_cache
32 167243
ALTER TABLE test_02241 UPDATE value = 'kek' WHERE key = 100
SELECT count(), sum(size) FROM system.filesystem_cache
41 250541
INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(5000000)
SYSTEM FLUSH LOGS
INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(5000000) 0
SELECT count() FROM test_02241
5010500
SELECT count() FROM test_02241 WHERE value LIKE '%010%'
18816

View File

@ -10,13 +10,13 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
for STORAGE_POLICY in 's3_cache' 'local_cache'; do
echo "Using storage policy: $STORAGE_POLICY"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test_02241"
$CLICKHOUSE_CLIENT --query "CREATE TABLE test_02241 (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='$STORAGE_POLICY', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false, ratio_of_defaults_for_sparse_serialization = 1"
$CLICKHOUSE_CLIENT --query "SYSTEM STOP MERGES test_02241"
$CLICKHOUSE_CLIENT --echo --query "DROP TABLE IF EXISTS test_02241"
$CLICKHOUSE_CLIENT --echo --query "CREATE TABLE test_02241 (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='$STORAGE_POLICY', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false, ratio_of_defaults_for_sparse_serialization = 1"
$CLICKHOUSE_CLIENT --echo --query "SYSTEM STOP MERGES test_02241"
$CLICKHOUSE_CLIENT --query "SYSTEM DROP FILESYSTEM CACHE"
$CLICKHOUSE_CLIENT --echo --query "SYSTEM DROP FILESYSTEM CACHE"
$CLICKHOUSE_CLIENT -n --query "SELECT file_segment_range_begin, file_segment_range_end, size, state
$CLICKHOUSE_CLIENT --echo -n --query "SELECT file_segment_range_begin, file_segment_range_end, size, state
FROM
(
SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path
@ -32,12 +32,12 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do
WHERE endsWith(local_path, 'data.bin')
FORMAT Vertical"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --echo --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path"
$CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)"
$CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)"
$CLICKHOUSE_CLIENT -n --query "SELECT file_segment_range_begin, file_segment_range_end, size, state
$CLICKHOUSE_CLIENT --echo -n --query "SELECT file_segment_range_begin, file_segment_range_end, size, state
FROM
(
SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path
@ -53,24 +53,24 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do
WHERE endsWith(local_path, 'data.bin')
FORMAT Vertical"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --echo --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path"
$CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0"
$CLICKHOUSE_CLIENT --echo --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0"
$CLICKHOUSE_CLIENT --query "SELECT * FROM test_02241 FORMAT Null"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0"
$CLICKHOUSE_CLIENT --echo --query "SELECT * FROM test_02241 FORMAT Null"
$CLICKHOUSE_CLIENT --echo --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0"
$CLICKHOUSE_CLIENT --query "SELECT * FROM test_02241 FORMAT Null"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0"
$CLICKHOUSE_CLIENT --echo --query "SELECT * FROM test_02241 FORMAT Null"
$CLICKHOUSE_CLIENT --echo --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) size FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) size FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SYSTEM DROP FILESYSTEM CACHE"
$CLICKHOUSE_CLIENT --echo --query "SYSTEM DROP FILESYSTEM CACHE"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100, 200)"
$CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100, 200)"
$CLICKHOUSE_CLIENT -n --query "SELECT file_segment_range_begin, file_segment_range_end, size, state
$CLICKHOUSE_CLIENT --echo -n --query "SELECT file_segment_range_begin, file_segment_range_end, size, state
FROM
(
SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path
@ -86,27 +86,28 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do
WHERE endsWith(local_path, 'data.bin')
FORMAT Vertical;"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --echo --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path"
$CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100) SETTINGS enable_filesystem_cache_on_write_operations=0"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100) SETTINGS enable_filesystem_cache_on_write_operations=0"
$CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(300, 10000)"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)"
$CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(300, 10000)"
$CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SYSTEM START MERGES test_02241"
$CLICKHOUSE_CLIENT --echo --query "SYSTEM START MERGES test_02241"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "OPTIMIZE TABLE test_02241 FINAL"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --query "OPTIMIZE TABLE test_02241 FINAL"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --mutations_sync=2 --query "ALTER TABLE test_02241 UPDATE value = 'kek' WHERE key = 100"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(5000000)"
$CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SYSTEM FLUSH LOGS"
$CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --mutations_sync=2 --query "ALTER TABLE test_02241 UPDATE value = 'kek' WHERE key = 100"
$CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(5000000)"
$CLICKHOUSE_CLIENT --echo --query "SYSTEM FLUSH LOGS"
$CLICKHOUSE_CLIENT -n --query "SELECT
query, ProfileEvents['RemoteFSReadBytes'] > 0 as remote_fs_read
@ -121,6 +122,6 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do
DESC
LIMIT 1"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM test_02241"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM test_02241 WHERE value LIKE '%010%'"
$CLICKHOUSE_CLIENT --echo --query "SELECT count() FROM test_02241"
$CLICKHOUSE_CLIENT --echo --query "SELECT count() FROM test_02241 WHERE value LIKE '%010%'"
done

View File

@ -0,0 +1,5 @@
drop table if exists test_d;
create table test_d engine=Distributed(test_cluster_two_shard_three_replicas_localhost, system, numbers);
select * from test_d limit 10 settings max_parallel_replicas = 0, prefer_localhost_replica = 0; --{serverError BAD_ARGUMENTS}
drop table test_d;