Compare commits

...

35 Commits

Author | SHA1 | Message | Date
Anton Popov | 60199b8248 | Merge 99916f85fc into 7eee149487 | 2024-11-21 14:29:34 +00:00
Anton Popov | 99916f85fc | fix setting of priamry index | 2024-11-21 14:29:11 +00:00
Vladimir Cherkasov | 7eee149487 | Merge pull request #72145 from ClickHouse/vdimir/fix02374_analyzer_join_using (FIx 02374_analyzer_join_using) | 2024-11-21 10:35:46 +00:00
Antonio Andelic | cc3c7e74ae | Merge pull request #70523 from ClickHouse/randomize-keeper-feature-flasgs-keeper (Randomize Keeper feature flags in integration tests) | 2024-11-21 08:20:07 +00:00
Yakov Olkhovskiy | e0f8b8d351 | Merge pull request #70458 from ClickHouse/fix-ephemeral-comment (Fix ephemeral column comment) | 2024-11-21 05:10:11 +00:00
Alexey Milovidov | da2176d696 | Merge pull request #72081 from ClickHouse/add-dashboard-selector (Add advanced dashboard selector) | 2024-11-21 05:06:51 +00:00
Alexey Milovidov | 53e0036593 | Merge pull request #72176 from ClickHouse/change-ldf-major-versions (Get rid of `major` tags in official docker images) | 2024-11-21 05:05:41 +00:00
Alexey Milovidov | 25bd73ea5e | Merge pull request #72023 from ClickHouse/fix-bind (Fix comments) | 2024-11-21 05:03:24 +00:00
Yakov Olkhovskiy | 72d5af29e0 | Merge branch 'master' into fix-ephemeral-comment | 2024-11-20 22:01:54 +00:00
Anton Popov | 7881ae2286 | better primary index cache | 2024-11-20 20:24:06 +00:00
Mikhail f. Shiryaev | 9a2a664b04 | Get rid of major tags in official docker images | 2024-11-20 16:36:50 +01:00
vdimir | f45bd58849 | FIx 02374_analyzer_join_using | 2024-11-20 11:51:42 +00:00
Anton Popov | 2c963fe710 | Merge remote-tracking branch 'upstream/master' into HEAD | 2024-11-19 16:37:26 +00:00
Anton Popov | a74d615aaf | added primary index cache | 2024-11-19 16:22:36 +00:00
serxa | ad67608956 | Add advanced dashboard selector | 2024-11-19 13:18:21 +00:00
Antonio Andelic | 57db5cf24c | Randomize correctly | 2024-11-19 13:39:54 +01:00
Antonio Andelic | 459fa898ed | Merge branch 'master' into randomize-keeper-feature-flasgs-keeper | 2024-11-19 10:00:41 +01:00
Alexey Milovidov | 49589da56e | Fix comments | 2024-11-18 07:18:46 +01:00
Antonio Andelic | 0d875ecf5c | Always randomize in private | 2024-11-04 12:56:14 +01:00
Antonio Andelic | 6698212b5a | Fix test | 2024-10-31 13:39:41 +01:00
Antonio Andelic | c787838cb2 | Merge branch 'master' into randomize-keeper-feature-flasgs-keeper | 2024-10-31 12:01:31 +01:00
Antonio Andelic | eb020f1c4b | Fix RemoveRecursive | 2024-10-29 09:05:31 +01:00
Antonio Andelic | 1a40df4d0c | Merge branch 'master' into randomize-keeper-feature-flasgs-keeper | 2024-10-28 12:07:38 +01:00
Antonio Andelic | 4380c6035d | Merge branch 'master' into randomize-keeper-feature-flasgs-keeper | 2024-10-15 16:51:36 +02:00
Antonio Andelic | 5145281088 | Correct randomization | 2024-10-15 16:51:32 +02:00
Antonio Andelic | 35fa4c43e4 | More fixes | 2024-10-10 19:39:28 +02:00
robot-clickhouse | 293e076493 | Automatic style fix | 2024-10-10 14:03:18 +00:00
Antonio Andelic | 8b92603c6d | Fix old version | 2024-10-10 15:52:56 +02:00
Antonio Andelic | fb14f6e029 | Fix MultiRead | 2024-10-10 15:52:37 +02:00
robot-clickhouse | e1f37ec2bb | Automatic style fix | 2024-10-10 07:54:28 +00:00
Antonio Andelic | cc0ef6104f | Fix MultiRead | 2024-10-10 09:45:42 +02:00
robot-clickhouse | 46ce65e66e | Automatic style fix | 2024-10-09 16:21:09 +00:00
Antonio Andelic | e048893b85 | Randomize feature flags in integration test | 2024-10-09 18:11:50 +02:00
Yakov Olkhovskiy | 3827d90bb0 | add test | 2024-10-08 02:37:41 +00:00
Yakov Olkhovskiy | bf3a3ad607 | fix ephemeral comment | 2024-10-08 02:27:36 +00:00
77 changed files with 1021 additions and 280 deletions

View File

@ -16,16 +16,18 @@ ClickHouse works 100-1000x faster than traditional database management systems,
For more information and documentation see https://clickhouse.com/.
<!-- This is not related to the docker official library, remove it before commit to https://github.com/docker-library/docs -->
## Versions
- The `latest` tag points to the latest release of the latest stable branch.
- Branch tags like `22.2` point to the latest release of the corresponding branch.
- Full version tags like `22.2.3.5` point to the corresponding release.
- Full version tags like `22.2.3` and `22.2.3.5` point to the corresponding release.
<!-- docker-official-library:off -->
<!-- This is not related to the docker official library, remove it before commit to https://github.com/docker-library/docs -->
- The tag `head` is built from the latest commit to the default branch.
- Each tag has optional `-alpine` suffix to reflect that it's built on top of `alpine`.
<!-- REMOVE UNTIL HERE -->
<!-- docker-official-library:on -->
### Compatibility
- The amd64 image requires support for [SSE3 instructions](https://en.wikipedia.org/wiki/SSE3). Virtually all x86 CPUs after 2005 support SSE3.

View File

@ -10,16 +10,18 @@ ClickHouse works 100-1000x faster than traditional database management systems,
For more information and documentation see https://clickhouse.com/.
<!-- This is not related to the docker official library, remove it before commit to https://github.com/docker-library/docs -->
## Versions
- The `latest` tag points to the latest release of the latest stable branch.
- Branch tags like `22.2` point to the latest release of the corresponding branch.
- Full version tags like `22.2.3.5` point to the corresponding release.
- Full version tags like `22.2.3` and `22.2.3.5` point to the corresponding release.
<!-- docker-official-library:off -->
<!-- This is not related to the docker official library, remove it before commit to https://github.com/docker-library/docs -->
- The tag `head` is built from the latest commit to the default branch.
- Each tag has optional `-alpine` suffix to reflect that it's built on top of `alpine`.
<!-- REMOVE UNTIL HERE -->
<!-- docker-official-library:on -->
### Compatibility
- The amd64 image requires support for [SSE3 instructions](https://en.wikipedia.org/wiki/SSE3). Virtually all x86 CPUs after 2005 support SSE3.

View File

@ -522,4 +522,3 @@ sidebar_label: 2024
* Backported in [#68518](https://github.com/ClickHouse/ClickHouse/issues/68518): Minor update in Dynamic/JSON serializations. [#68459](https://github.com/ClickHouse/ClickHouse/pull/68459) ([Kruglov Pavel](https://github.com/Avogar)).
* Backported in [#68558](https://github.com/ClickHouse/ClickHouse/issues/68558): CI: Minor release workflow fix. [#68536](https://github.com/ClickHouse/ClickHouse/pull/68536) ([Max K.](https://github.com/maxknv)).
* Backported in [#68576](https://github.com/ClickHouse/ClickHouse/issues/68576): CI: Tidy build timeout from 2h to 3h. [#68567](https://github.com/ClickHouse/ClickHouse/pull/68567) ([Max K.](https://github.com/maxknv)).

View File

@ -497,4 +497,3 @@ sidebar_label: 2024
* Backported in [#69899](https://github.com/ClickHouse/ClickHouse/issues/69899): Revert "Merge pull request [#69032](https://github.com/ClickHouse/ClickHouse/issues/69032) from alexon1234/include_real_time_execution_in_http_header". [#69885](https://github.com/ClickHouse/ClickHouse/pull/69885) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Backported in [#69931](https://github.com/ClickHouse/ClickHouse/issues/69931): RIPE is an acronym and thus should be capital. RIPE stands for **R**ACE **I**ntegrity **P**rimitives **E**valuation and RACE stands for **R**esearch and Development in **A**dvanced **C**ommunications **T**echnologies in **E**urope. [#69901](https://github.com/ClickHouse/ClickHouse/pull/69901) ([Nikita Mikhaylov](https://github.com/nikitamikhaylov)).
* Backported in [#70034](https://github.com/ClickHouse/ClickHouse/issues/70034): Revert "Add RIPEMD160 function". [#70005](https://github.com/ClickHouse/ClickHouse/pull/70005) ([Robert Schulze](https://github.com/rschu1ze)).

View File

@ -936,4 +936,4 @@ SELECT mapPartialReverseSort((k, v) -> v, 2, map('k1', 3, 'k2', 1, 'k3', 2));
┌─mapPartialReverseSort(lambda(tuple(k, v), v), 2, map('k1', 3, 'k2', 1, 'k3', 2))─┐
│ {'k1':3,'k3':2,'k2':1} │
└──────────────────────────────────────────────────────────────────────────────────┘
```
```

View File

@ -111,6 +111,9 @@ namespace ServerSetting
extern const ServerSettingsString uncompressed_cache_policy;
extern const ServerSettingsUInt64 uncompressed_cache_size;
extern const ServerSettingsDouble uncompressed_cache_size_ratio;
extern const ServerSettingsString primary_index_cache_policy;
extern const ServerSettingsUInt64 primary_index_cache_size;
extern const ServerSettingsDouble primary_index_cache_size_ratio;
extern const ServerSettingsBool use_legacy_mongodb_integration;
}
@ -779,6 +782,16 @@ void LocalServer::processConfig()
}
global_context->setIndexMarkCache(index_mark_cache_policy, index_mark_cache_size, index_mark_cache_size_ratio);
String primary_index_cache_policy = server_settings[ServerSetting::primary_index_cache_policy];
size_t primary_index_cache_size = server_settings[ServerSetting::primary_index_cache_size];
double primary_index_cache_size_ratio = server_settings[ServerSetting::primary_index_cache_size_ratio];
if (primary_index_cache_size > max_cache_size)
{
primary_index_cache_size = max_cache_size;
LOG_INFO(log, "Lowered primary index cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(primary_index_cache_size));
}
global_context->setPrimaryIndexCache(primary_index_cache_policy, primary_index_cache_size, primary_index_cache_size_ratio);
size_t mmap_cache_size = server_settings[ServerSetting::mmap_cache_size];
if (mmap_cache_size > max_cache_size)
{

View File

@ -280,6 +280,9 @@ namespace ServerSetting
extern const ServerSettingsString uncompressed_cache_policy;
extern const ServerSettingsUInt64 uncompressed_cache_size;
extern const ServerSettingsDouble uncompressed_cache_size_ratio;
extern const ServerSettingsString primary_index_cache_policy;
extern const ServerSettingsUInt64 primary_index_cache_size;
extern const ServerSettingsDouble primary_index_cache_size_ratio;
extern const ServerSettingsBool use_legacy_mongodb_integration;
}
@ -1563,6 +1566,16 @@ try
}
global_context->setMarkCache(mark_cache_policy, mark_cache_size, mark_cache_size_ratio);
String primary_index_cache_policy = server_settings[ServerSetting::primary_index_cache_policy];
size_t primary_index_cache_size = server_settings[ServerSetting::primary_index_cache_size];
double primary_index_cache_size_ratio = server_settings[ServerSetting::primary_index_cache_size_ratio];
if (primary_index_cache_size > max_cache_size)
{
primary_index_cache_size = max_cache_size;
LOG_INFO(log, "Lowered primary index cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(primary_index_cache_size));
}
global_context->setPrimaryIndexCache(primary_index_cache_policy, primary_index_cache_size, primary_index_cache_size_ratio);
size_t page_cache_size = server_settings[ServerSetting::page_cache_size];
if (page_cache_size != 0)
global_context->setPageCache(
@ -1897,6 +1910,7 @@ try
global_context->updateUncompressedCacheConfiguration(*config);
global_context->updateMarkCacheConfiguration(*config);
global_context->updatePrimaryIndexCacheConfiguration(*config);
global_context->updateIndexUncompressedCacheConfiguration(*config);
global_context->updateIndexMarkCacheConfiguration(*config);
global_context->updateMMappedFileCacheConfiguration(*config);

View File

@ -476,7 +476,7 @@
<input id="edit" type="button" value="✎" style="display: none;">
<input id="add" type="button" value="Add chart" style="display: none;">
<input id="reload" type="button" value="Reload">
<span id="search-span" class="nowrap" style="display: none;"><input id="search" type="button" value="🔎" title="Run query to obtain list of charts from ClickHouse"><input id="search-query" name="search" type="text" spellcheck="false"></span>
<span id="search-span" class="nowrap" style="display: none;"><input id="search" type="button" value="🔎" title="Run query to obtain list of charts from ClickHouse. Either select dashboard name or write your own query"><input id="search-query" name="search" list="search-options" type="text" spellcheck="false"><datalist id="search-options"></datalist></span>
<div id="chart-params"></div>
</div>
</form>
@ -532,9 +532,15 @@ const errorMessages = [
}
]
/// Dashboard selector
const dashboardSearchQuery = (dashboard_name) => `SELECT title, query FROM system.dashboards WHERE dashboard = '${dashboard_name}'`;
let dashboard_queries = {
"Overview": dashboardSearchQuery("Overview"),
};
const default_dashboard = 'Overview';
/// Query to fill `queries` list for the dashboard
let search_query = `SELECT title, query FROM system.dashboards WHERE dashboard = 'Overview'`;
let search_query = dashboardSearchQuery(default_dashboard);
let customized = false;
let queries = [];
@ -1439,7 +1445,7 @@ async function reloadAll(do_search) {
try {
updateParams();
if (do_search) {
search_query = document.getElementById('search-query').value;
search_query = toSearchQuery(document.getElementById('search-query').value);
queries = [];
refreshCustomized(false);
}
@ -1504,7 +1510,7 @@ function updateFromState() {
document.getElementById('url').value = host;
document.getElementById('user').value = user;
document.getElementById('password').value = password;
document.getElementById('search-query').value = search_query;
document.getElementById('search-query').value = fromSearchQuery(search_query);
refreshCustomized();
}
@ -1543,6 +1549,44 @@ if (window.location.hash) {
} catch {}
}
function fromSearchQuery(query) {
for (const dashboard_name in dashboard_queries) {
if (query == dashboard_queries[dashboard_name])
return dashboard_name;
}
return query;
}
function toSearchQuery(value) {
if (value in dashboard_queries)
return dashboard_queries[value];
else
return value;
}
async function populateSearchOptions() {
let {reply, error} = await doFetch("SELECT dashboard FROM system.dashboards GROUP BY dashboard ORDER BY ALL");
if (error) {
throw new Error(error);
}
let data = reply.data;
if (data.dashboard.length == 0) {
console.log("Unable to fetch dashboards list");
return;
}
dashboard_queries = {};
for (let i = 0; i < data.dashboard.length; i++) {
const dashboard = data.dashboard[i];
dashboard_queries[dashboard] = dashboardSearchQuery(dashboard);
}
const searchOptions = document.getElementById('search-options');
for (const dashboard in dashboard_queries) {
const opt = document.createElement('option');
opt.value = dashboard;
searchOptions.appendChild(opt);
}
}
async function start() {
try {
updateFromState();
@ -1558,6 +1602,7 @@ async function start() {
} else {
drawAll();
}
await populateSearchOptions();
} catch (e) {
showError(e.message);
}
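
The selector is driven by the `system.dashboards` table. For reference, the two queries the new `populateSearchOptions()` and `dashboardSearchQuery()` helpers issue are the ones embedded in the strings above; a sketch of running them directly:

```sql
-- What populateSearchOptions() runs to fill the dashboard-name datalist.
SELECT dashboard
FROM system.dashboards
GROUP BY dashboard
ORDER BY ALL;

-- What dashboardSearchQuery('Overview') expands to for the default dashboard.
SELECT title, query
FROM system.dashboards
WHERE dashboard = 'Overview';
```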

View File

@ -165,6 +165,8 @@ enum class AccessType : uint8_t
M(SYSTEM_DROP_CONNECTIONS_CACHE, "SYSTEM DROP CONNECTIONS CACHE, DROP CONNECTIONS CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_PREWARM_MARK_CACHE, "SYSTEM PREWARM MARK, PREWARM MARK CACHE, PREWARM MARKS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_MARK_CACHE, "SYSTEM DROP MARK, DROP MARK CACHE, DROP MARKS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_PREWARM_PRIMARY_INDEX_CACHE, "SYSTEM PREWARM PRIMARY INDEX, PREWARM PRIMARY INDEX CACHE, PREWARM PRIMARY INDEX", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_PRIMARY_INDEX_CACHE, "SYSTEM DROP PRIMARY INDEX, DROP PRIMARY INDEX CACHE, DROP PRIMARY INDEX", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_UNCOMPRESSED_CACHE, "SYSTEM DROP UNCOMPRESSED, DROP UNCOMPRESSED CACHE, DROP UNCOMPRESSED", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_MMAP_CACHE, "SYSTEM DROP MMAP, DROP MMAP CACHE, DROP MMAP", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_QUERY_CACHE, "SYSTEM DROP QUERY, DROP QUERY CACHE, DROP QUERY", GLOBAL, SYSTEM_DROP_CACHE) \
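
The two new access types are grantable like the other cache privileges. A minimal sketch of granting them, assuming the canonical privilege spellings follow the enum names above; the `cache_admin` role is illustrative and not part of this diff:

```sql
-- Hypothetical role; grant the new prewarm/drop privileges for the primary index cache.
CREATE ROLE IF NOT EXISTS cache_admin;
GRANT SYSTEM PREWARM PRIMARY INDEX CACHE, SYSTEM DROP PRIMARY INDEX CACHE ON *.* TO cache_admin;
```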

View File

@ -528,7 +528,7 @@ QueryTreeNodePtr IdentifierResolver::tryResolveIdentifierFromCompoundExpression(
*
* Resolve strategy:
* 1. Try to bind identifier to scope argument name to node map.
* 2. If identifier is binded but expression context and node type are incompatible return nullptr.
* 2. If identifier is bound but expression context and node type are incompatible return nullptr.
*
* It is important to support edge cases, where we lookup for table or function node, but argument has same name.
* Example: WITH (x -> x + 1) AS func, (func -> func(1) + func) AS lambda SELECT lambda(1);

View File

@ -362,7 +362,7 @@ ReplxxLineReader::ReplxxLineReader(
if (highlighter)
rx.set_highlighter_callback(highlighter);
/// By default C-p/C-n binded to COMPLETE_NEXT/COMPLETE_PREV,
/// By default C-p/C-n bound to COMPLETE_NEXT/COMPLETE_PREV,
/// bind C-p/C-n to history-previous/history-next like readline.
rx.bind_key(Replxx::KEY::control('N'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::HISTORY_NEXT, code); });
rx.bind_key(Replxx::KEY::control('P'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::HISTORY_PREVIOUS, code); });
@ -384,9 +384,9 @@ ReplxxLineReader::ReplxxLineReader(
rx.bind_key(Replxx::KEY::control('J'), commit_action);
rx.bind_key(Replxx::KEY::ENTER, commit_action);
/// By default COMPLETE_NEXT/COMPLETE_PREV was binded to C-p/C-n, re-bind
/// By default COMPLETE_NEXT/COMPLETE_PREV was bound to C-p/C-n, re-bind
/// to M-P/M-N (that was used for HISTORY_COMMON_PREFIX_SEARCH before, but
/// it also binded to M-p/M-n).
/// it also bound to M-p/M-n).
rx.bind_key(Replxx::KEY::meta('N'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::COMPLETE_NEXT, code); });
rx.bind_key(Replxx::KEY::meta('P'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::COMPLETE_PREVIOUS, code); });
/// By default M-BACKSPACE is KILL_TO_WHITESPACE_ON_LEFT, while in readline it is backward-kill-word

View File

@ -65,6 +65,8 @@
M(DefaultImplementationForNullsRowsWithNulls, "Number of rows which contain null values processed by default implementation for nulls in function execution", ValueType::Number) \
M(MarkCacheHits, "Number of times an entry has been found in the mark cache, so we didn't have to load a mark file.", ValueType::Number) \
M(MarkCacheMisses, "Number of times an entry has not been found in the mark cache, so we had to load a mark file in memory, which is a costly operation, adding to query latency.", ValueType::Number) \
M(PrimaryIndexCacheHits, "Number of times an entry has been found in the primary index cache, so we didn't have to load an index file.", ValueType::Number) \
M(PrimaryIndexCacheMisses, "Number of times an entry has not been found in the primary index cache, so we had to load an index file in memory, which is a costly operation, adding to query latency.", ValueType::Number) \
M(QueryCacheHits, "Number of times a query result has been found in the query cache (and query computation was avoided). Only updated for SELECT queries with SETTING use_query_cache = 1.", ValueType::Number) \
M(QueryCacheMisses, "Number of times a query result has not been found in the query cache (and required query computation). Only updated for SELECT queries with SETTING use_query_cache = 1.", ValueType::Number) \
/* Each page cache chunk access increments exactly one of the following 5 PageCacheChunk* counters. */ \
@ -231,6 +233,9 @@
M(BackgroundLoadingMarksTasks, "Number of background tasks for loading marks", ValueType::Number) \
M(LoadedMarksCount, "Number of marks loaded (total across columns).", ValueType::Number) \
M(LoadedMarksMemoryBytes, "Size of in-memory representations of loaded marks.", ValueType::Bytes) \
M(LoadedPrimaryIndexFiles, "Number of primary index files loaded.", ValueType::Number) \
M(LoadedPrimaryIndexRows, "Number of rows of primary key loaded.", ValueType::Number) \
M(LoadedPrimaryIndexBytes, "Size in bytes of primary key loaded.", ValueType::Bytes) \
\
M(Merge, "Number of launched background merges.", ValueType::Number) \
M(MergeSourceParts, "Number of source parts scheduled for merges.", ValueType::Number) \
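
For reference, the new counters surface through `system.events` like the existing mark-cache ones; a sketch of inspecting them:

```sql
SELECT event, value, description
FROM system.events
WHERE event LIKE 'PrimaryIndexCache%' OR event LIKE 'LoadedPrimaryIndex%';
```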

View File

@ -341,7 +341,10 @@ Coordination::Error ZooKeeper::tryGetChildren(
const EventPtr & watch,
Coordination::ListRequestType list_request_type)
{
return tryGetChildrenWatch(path, res, stat,
return tryGetChildrenWatch(
path,
res,
stat,
watch ? std::make_shared<Coordination::WatchCallback>(callbackForEvent(watch)) : Coordination::WatchCallbackPtr{},
list_request_type);
}
@ -975,11 +978,14 @@ void ZooKeeper::removeRecursive(const std::string & path, uint32_t remove_nodes_
Coordination::Error ZooKeeper::tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit)
{
if (!isFeatureEnabled(DB::KeeperFeatureFlag::REMOVE_RECURSIVE))
const auto fallback_method = [&]
{
tryRemoveChildrenRecursive(path);
return tryRemove(path);
}
};
if (!isFeatureEnabled(DB::KeeperFeatureFlag::REMOVE_RECURSIVE))
return fallback_method();
auto promise = std::make_shared<std::promise<Coordination::RemoveRecursiveResponse>>();
auto future = promise->get_future();
@ -998,6 +1004,10 @@ Coordination::Error ZooKeeper::tryRemoveRecursive(const std::string & path, uint
}
auto response = future.get();
if (response.error == Coordination::Error::ZNOTEMPTY) /// limit was too low, try without RemoveRecursive request
return fallback_method();
return response.error;
}

View File

@ -486,13 +486,13 @@ public:
/// Remove the node with the subtree.
/// If Keeper supports RemoveRecursive operation then it will be performed atomically.
/// Otherwise if someone concurrently adds or removes a node in the subtree, the result is undefined.
void removeRecursive(const std::string & path, uint32_t remove_nodes_limit = 100);
void removeRecursive(const std::string & path, uint32_t remove_nodes_limit = 1000);
/// Same as removeRecursive but in case if Keeper does not supports RemoveRecursive and
/// if someone concurrently removes a node in the subtree, this will not cause errors.
/// For instance, you can call this method twice concurrently for the same node and the end
/// result would be the same as for the single call.
Coordination::Error tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit = 100);
Coordination::Error tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit = 1000);
/// Similar to removeRecursive(...) and tryRemoveRecursive(...), but does not remove path itself.
/// Node defined as RemoveException will not be deleted.

View File

@ -767,6 +767,11 @@ size_t ZooKeeperMultiRequest::sizeImpl() const
}
void ZooKeeperMultiRequest::readImpl(ReadBuffer & in)
{
readImpl(in, /*request_validator=*/{});
}
void ZooKeeperMultiRequest::readImpl(ReadBuffer & in, RequestValidator request_validator)
{
while (true)
{
@ -788,6 +793,8 @@ void ZooKeeperMultiRequest::readImpl(ReadBuffer & in)
ZooKeeperRequestPtr request = ZooKeeperRequestFactory::instance().get(op_num);
request->readImpl(in);
if (request_validator)
request_validator(*request);
requests.push_back(request);
if (in.eof())

View File

@ -570,6 +570,9 @@ struct ZooKeeperMultiRequest final : MultiRequest<ZooKeeperRequestPtr>, ZooKeepe
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
using RequestValidator = std::function<void(const ZooKeeperRequest &)>;
void readImpl(ReadBuffer & in, RequestValidator request_validator);
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;

View File

@ -514,7 +514,13 @@ void KeeperContext::initializeFeatureFlags(const Poco::Util::AbstractConfigurati
feature_flags.disableFeatureFlag(feature_flag.value());
}
if (feature_flags.isEnabled(KeeperFeatureFlag::MULTI_READ))
feature_flags.enableFeatureFlag(KeeperFeatureFlag::FILTERED_LIST);
else
system_nodes_with_data[keeper_api_version_path] = toString(static_cast<uint8_t>(KeeperApiVersion::ZOOKEEPER_COMPATIBLE));
system_nodes_with_data[keeper_api_feature_flags_path] = feature_flags.getFeatureFlags();
}
feature_flags.logFlags(getLogger("KeeperContext"));
@ -569,6 +575,25 @@ const CoordinationSettings & KeeperContext::getCoordinationSettings() const
return *coordination_settings;
}
bool KeeperContext::isOperationSupported(Coordination::OpNum operation) const
{
switch (operation)
{
case Coordination::OpNum::FilteredList:
return feature_flags.isEnabled(KeeperFeatureFlag::FILTERED_LIST);
case Coordination::OpNum::MultiRead:
return feature_flags.isEnabled(KeeperFeatureFlag::MULTI_READ);
case Coordination::OpNum::CreateIfNotExists:
return feature_flags.isEnabled(KeeperFeatureFlag::CREATE_IF_NOT_EXISTS);
case Coordination::OpNum::CheckNotExists:
return feature_flags.isEnabled(KeeperFeatureFlag::CHECK_NOT_EXISTS);
case Coordination::OpNum::RemoveRecursive:
return feature_flags.isEnabled(KeeperFeatureFlag::REMOVE_RECURSIVE);
default:
return true;
}
}
uint64_t KeeperContext::lastCommittedIndex() const
{
return last_committed_log_idx.load(std::memory_order_relaxed);

View File

@ -1,6 +1,7 @@
#pragma once
#include <Coordination/KeeperFeatureFlags.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <atomic>
#include <condition_variable>
#include <cstdint>
@ -103,6 +104,7 @@ public:
return precommit_sleep_probability_for_testing;
}
bool isOperationSupported(Coordination::OpNum operation) const;
private:
/// local disk defined using path or disk name
using Storage = std::variant<DiskPtr, std::string>;

View File

@ -104,6 +104,10 @@ namespace DB
DECLARE(UInt64, mark_cache_size, DEFAULT_MARK_CACHE_MAX_SIZE, "Size of cache for marks (index of MergeTree family of tables).", 0) \
DECLARE(Double, mark_cache_size_ratio, DEFAULT_MARK_CACHE_SIZE_RATIO, "The size of the protected queue in the mark cache relative to the cache's total size.", 0) \
DECLARE(Double, mark_cache_prewarm_ratio, 0.95, "The ratio of total size of mark cache to fill during prewarm.", 0) \
DECLARE(String, primary_index_cache_policy, DEFAULT_MARK_CACHE_POLICY, "Primary index cache policy name.", 0) \
DECLARE(UInt64, primary_index_cache_size, DEFAULT_MARK_CACHE_MAX_SIZE, "Size of cache for primary index (index of MergeTree family of tables).", 0) \
DECLARE(Double, primary_index_cache_size_ratio, DEFAULT_MARK_CACHE_SIZE_RATIO, "The size of the protected queue in the primary index cache relative to the cache's total size.", 0) \
DECLARE(Double, primary_index_cache_prewarm_ratio, 0.95, "The ratio of total size of primary index cache to fill during prewarm.", 0) \
DECLARE(String, index_uncompressed_cache_policy, DEFAULT_INDEX_UNCOMPRESSED_CACHE_POLICY, "Secondary index uncompressed cache policy name.", 0) \
DECLARE(UInt64, index_uncompressed_cache_size, DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE, "Size of cache for uncompressed blocks of secondary indices. Zero means disabled.", 0) \
DECLARE(Double, index_uncompressed_cache_size_ratio, DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO, "The size of the protected queue in the secondary index uncompressed cache relative to the cache's total size.", 0) \
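
Once declared, the new server-level settings can be checked at runtime; a sketch, assuming the usual `system.server_settings` columns:

```sql
SELECT name, value, changed
FROM system.server_settings
WHERE name LIKE 'primary_index_cache%';
```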

View File

@ -34,6 +34,7 @@
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/PrimaryIndexCache.h>
#include <Storages/Distributed/DistributedSettings.h>
#include <Storages/CompressionCodecSelector.h>
#include <IO/S3Settings.h>
@ -408,6 +409,7 @@ struct ContextSharedPart : boost::noncopyable
mutable ResourceManagerPtr resource_manager;
mutable UncompressedCachePtr uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files.
mutable PrimaryIndexCachePtr primary_index_cache TSA_GUARDED_BY(mutex);
mutable OnceFlag load_marks_threadpool_initialized;
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
mutable OnceFlag prefetch_threadpool_initialized;
@ -3254,6 +3256,41 @@ ThreadPool & Context::getLoadMarksThreadpool() const
return *shared->load_marks_threadpool;
}
void Context::setPrimaryIndexCache(const String & cache_policy, size_t max_cache_size_in_bytes, double size_ratio)
{
std::lock_guard lock(shared->mutex);
if (shared->primary_index_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Primary index cache has been already created.");
shared->primary_index_cache = std::make_shared<PrimaryIndexCache>(cache_policy, max_cache_size_in_bytes, size_ratio);
}
void Context::updatePrimaryIndexCacheConfiguration(const Poco::Util::AbstractConfiguration & config)
{
std::lock_guard lock(shared->mutex);
if (!shared->primary_index_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Primary index cache was not created yet.");
size_t max_size_in_bytes = config.getUInt64("primary_index_cache_size", DEFAULT_MARK_CACHE_MAX_SIZE);
shared->primary_index_cache->setMaxSizeInBytes(max_size_in_bytes);
}
PrimaryIndexCachePtr Context::getPrimaryIndexCache() const
{
SharedLockGuard lock(shared->mutex);
return shared->primary_index_cache;
}
void Context::clearPrimaryIndexCache() const
{
std::lock_guard lock(shared->mutex);
if (shared->primary_index_cache)
shared->primary_index_cache->clear();
}
void Context::setIndexUncompressedCache(const String & cache_policy, size_t max_size_in_bytes, double size_ratio)
{
std::lock_guard lock(shared->mutex);
@ -3409,6 +3446,10 @@ void Context::clearCaches() const
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mark cache was not created yet.");
shared->mark_cache->clear();
if (!shared->primary_index_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Primary index cache was not created yet.");
shared->primary_index_cache->clear();
if (!shared->index_uncompressed_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Index uncompressed cache was not created yet.");
shared->index_uncompressed_cache->clear();

View File

@ -22,6 +22,7 @@
#include <Server/HTTP/HTTPContext.h>
#include <Storages/IStorage_fwd.h>
#include "Storages/MergeTree/PrimaryIndexCache.h"
#include "config.h"
#include <functional>
@ -1076,6 +1077,11 @@ public:
void clearMarkCache() const;
ThreadPool & getLoadMarksThreadpool() const;
void setPrimaryIndexCache(const String & cache_policy, size_t max_cache_size_in_bytes, double size_ratio);
void updatePrimaryIndexCacheConfiguration(const Poco::Util::AbstractConfiguration & config);
std::shared_ptr<PrimaryIndexCache> getPrimaryIndexCache() const;
void clearPrimaryIndexCache() const;
void setIndexUncompressedCache(const String & cache_policy, size_t max_size_in_bytes, double size_ratio);
void updateIndexUncompressedCacheConfiguration(const Poco::Util::AbstractConfiguration & config);
std::shared_ptr<UncompressedCache> getIndexUncompressedCache() const;

View File

@ -371,10 +371,19 @@ BlockIO InterpreterSystemQuery::execute()
prewarmMarkCache();
break;
}
case Type::PREWARM_PRIMARY_INDEX_CACHE:
{
prewarmPrimaryIndexCache();
break;
}
case Type::DROP_MARK_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MARK_CACHE);
system_context->clearMarkCache();
break;
case Type::DROP_PRIMARY_INDEX_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_PRIMARY_INDEX_CACHE);
system_context->clearPrimaryIndexCache();
break;
case Type::DROP_UNCOMPRESSED_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE);
system_context->clearUncompressedCache();
@ -1319,17 +1328,45 @@ void InterpreterSystemQuery::prewarmMarkCache()
auto table_ptr = DatabaseCatalog::instance().getTable(table_id, getContext());
auto * merge_tree = dynamic_cast<MergeTreeData *>(table_ptr.get());
if (!merge_tree)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Command PREWARM MARK CACHE is supported only for MergeTree table, but got: {}", table_ptr->getName());
auto mark_cache = getContext()->getMarkCache();
if (!mark_cache)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Mark cache is not configured");
ThreadPool pool(
CurrentMetrics::MergeTreePartsLoaderThreads,
CurrentMetrics::MergeTreePartsLoaderThreadsActive,
CurrentMetrics::MergeTreePartsLoaderThreadsScheduled,
getContext()->getSettingsRef()[Setting::max_threads]);
merge_tree->prewarmMarkCache(pool);
merge_tree->prewarmCaches(pool, std::move(mark_cache), nullptr);
}
void InterpreterSystemQuery::prewarmPrimaryIndexCache()
{
if (table_id.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table is not specified for PREWARM PRIMARY INDEX CACHE command");
getContext()->checkAccess(AccessType::SYSTEM_PREWARM_PRIMARY_INDEX_CACHE, table_id);
auto table_ptr = DatabaseCatalog::instance().getTable(table_id, getContext());
auto * merge_tree = dynamic_cast<MergeTreeData *>(table_ptr.get());
if (!merge_tree)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Command PREWARM PRIMARY INDEX CACHE is supported only for MergeTree table, but got: {}", table_ptr->getName());
auto index_cache = merge_tree->getPrimaryIndexCache();
if (!index_cache)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Primary index cache is not configured or is not enabled for table {}", table_id.getFullTableName());
ThreadPool pool(
CurrentMetrics::MergeTreePartsLoaderThreads,
CurrentMetrics::MergeTreePartsLoaderThreadsActive,
CurrentMetrics::MergeTreePartsLoaderThreadsScheduled,
getContext()->getSettingsRef()[Setting::max_threads]);
merge_tree->prewarmCaches(pool, nullptr, std::move(index_cache));
}
@ -1351,6 +1388,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::DROP_DNS_CACHE:
case Type::DROP_CONNECTIONS_CACHE:
case Type::DROP_MARK_CACHE:
case Type::DROP_PRIMARY_INDEX_CACHE:
case Type::DROP_MMAP_CACHE:
case Type::DROP_QUERY_CACHE:
case Type::DROP_COMPILED_EXPRESSION_CACHE:
@ -1538,6 +1576,11 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_PREWARM_MARK_CACHE, query.getDatabase(), query.getTable());
break;
}
case Type::PREWARM_PRIMARY_INDEX_CACHE:
{
required_access.emplace_back(AccessType::SYSTEM_PREWARM_PRIMARY_INDEX_CACHE, query.getDatabase(), query.getTable());
break;
}
case Type::SYNC_DATABASE_REPLICA:
{
required_access.emplace_back(AccessType::SYSTEM_SYNC_DATABASE_REPLICA, query.getDatabase());
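
Putting the interpreter changes together, the statements they back look like this (a sketch; `db.tbl` is a placeholder table name):

```sql
-- Warm the primary index cache for one MergeTree table, then clear the whole cache.
SYSTEM PREWARM PRIMARY INDEX CACHE db.tbl;
SYSTEM DROP PRIMARY INDEX CACHE;
```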

View File

@ -6,6 +6,8 @@
#include <Storages/IStorage_fwd.h>
#include <Interpreters/StorageID.h>
#include <Common/ActionLock.h>
#include "Storages/MarkCache.h"
#include "Storages/MergeTree/PrimaryIndexCache.h"
#include <Disks/IVolume.h>
@ -82,7 +84,9 @@ private:
AccessRightsElements getRequiredAccessForDDLOnCluster() const;
void startStopAction(StorageActionBlockType action_type, bool start);
void prewarmMarkCache();
void prewarmPrimaryIndexCache();
void stopReplicatedDDLQueries();
void startReplicatedDDLQueries();

View File

@ -83,6 +83,12 @@ void ServerAsynchronousMetrics::updateImpl(TimePoint update_time, TimePoint curr
new_values["MarkCacheFiles"] = { mark_cache->count(), "Total number of mark files cached in the mark cache" };
}
if (auto primary_index_cache = getContext()->getPrimaryIndexCache())
{
new_values["PrimaryIndexCacheBytes"] = { primary_index_cache->sizeInBytes(), "Total size of primary index cache in bytes" };
new_values["PrimaryIndexCacheFiles"] = { primary_index_cache->count(), "Total number of index files cached in the primary index cache" };
}
if (auto page_cache = getContext()->getPageCache())
{
auto rss = page_cache->getResidentSetSize();
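
The two new gauges are exposed through `system.asynchronous_metrics`; a sketch of reading them back:

```sql
SELECT metric, value
FROM system.asynchronous_metrics
WHERE metric IN ('PrimaryIndexCacheBytes', 'PrimaryIndexCacheFiles');
```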

View File

@ -192,6 +192,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & s
case Type::WAIT_LOADING_PARTS:
case Type::FLUSH_DISTRIBUTED:
case Type::PREWARM_MARK_CACHE:
case Type::PREWARM_PRIMARY_INDEX_CACHE:
{
if (table)
{
@ -408,6 +409,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & s
case Type::DROP_MMAP_CACHE:
case Type::DROP_QUERY_CACHE:
case Type::DROP_MARK_CACHE:
case Type::DROP_PRIMARY_INDEX_CACHE:
case Type::DROP_INDEX_MARK_CACHE:
case Type::DROP_UNCOMPRESSED_CACHE:
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:

View File

@ -24,7 +24,9 @@ public:
DROP_DNS_CACHE,
DROP_CONNECTIONS_CACHE,
PREWARM_MARK_CACHE,
PREWARM_PRIMARY_INDEX_CACHE,
DROP_MARK_CACHE,
DROP_PRIMARY_INDEX_CACHE,
DROP_UNCOMPRESSED_CACHE,
DROP_INDEX_MARK_CACHE,
DROP_INDEX_UNCOMPRESSED_CACHE,

View File

@ -237,6 +237,7 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
null_modifier.emplace(true);
}
bool is_comment = false;
/// Collate is also allowed after NULL/NOT NULL
if (!collation_expression && s_collate.ignore(pos, expected)
&& !collation_parser.parse(pos, collation_expression, expected))
@ -254,7 +255,9 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
else if (s_ephemeral.ignore(pos, expected))
{
default_specifier = s_ephemeral.getName();
if (!expr_parser.parse(pos, default_expression, expected) && type)
if (s_comment.ignore(pos, expected))
is_comment = true;
if ((is_comment || !expr_parser.parse(pos, default_expression, expected)) && type)
{
ephemeral_default = true;
@ -289,19 +292,22 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
if (require_type && !type && !default_expression)
return false; /// reject column name without type
if ((type || default_expression) && allow_null_modifiers && !null_modifier.has_value())
if (!is_comment)
{
if (s_not.ignore(pos, expected))
if ((type || default_expression) && allow_null_modifiers && !null_modifier.has_value())
{
if (!s_null.ignore(pos, expected))
return false;
null_modifier.emplace(false);
if (s_not.ignore(pos, expected))
{
if (!s_null.ignore(pos, expected))
return false;
null_modifier.emplace(false);
}
else if (s_null.ignore(pos, expected))
null_modifier.emplace(true);
}
else if (s_null.ignore(pos, expected))
null_modifier.emplace(true);
}
if (s_comment.ignore(pos, expected))
if (is_comment || s_comment.ignore(pos, expected))
{
/// should be followed by a string literal
if (!string_literal_parser.parse(pos, comment_expression, expected))
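
The parser change makes a COMMENT that directly follows EPHEMERAL (with no default expression) parse correctly; a sketch of the statement shape this enables, with illustrative table and column names:

```sql
CREATE TABLE t
(
    key UInt64,
    raw String EPHEMERAL COMMENT 'staging value, not stored',
    len UInt64 DEFAULT length(raw)
)
ENGINE = MergeTree
ORDER BY key;
```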

View File

@ -277,6 +277,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::SYNC_REPLICA:
case Type::WAIT_LOADING_PARTS:
case Type::PREWARM_MARK_CACHE:
case Type::PREWARM_PRIMARY_INDEX_CACHE:
{
if (!parseQueryWithOnCluster(res, pos, expected))
return false;

View File

@ -205,7 +205,7 @@ public:
}
private:
const RangesInDataParts & parts;
std::vector<IMergeTreeDataPart::Index> indices;
std::vector<IMergeTreeDataPart::IndexPtr> indices;
size_t loaded_columns = std::numeric_limits<size_t>::max();
};

View File

@ -1,5 +1,4 @@
#include <Server/KeeperTCPHandler.h>
#include "Common/ZooKeeper/ZooKeeperConstants.h"
#if USE_NURAFT
@ -19,6 +18,8 @@
# include <Common/NetException.h>
# include <Common/PipeFDs.h>
# include <Common/Stopwatch.h>
# include <Common/ZooKeeper/ZooKeeperCommon.h>
# include <Common/ZooKeeper/ZooKeeperConstants.h>
# include <Common/ZooKeeper/ZooKeeperIO.h>
# include <Common/logger_useful.h>
# include <Common/setThreadName.h>
@ -63,6 +64,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int UNEXPECTED_PACKET_FROM_CLIENT;
extern const int TIMEOUT_EXCEEDED;
extern const int BAD_ARGUMENTS;
}
struct PollResult
@ -637,7 +639,23 @@ std::pair<Coordination::OpNum, Coordination::XID> KeeperTCPHandler::receiveReque
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request->xid = xid;
request->readImpl(read_buffer);
auto request_validator = [&](const Coordination::ZooKeeperRequest & current_request)
{
if (!keeper_dispatcher->getKeeperContext()->isOperationSupported(current_request.getOpNum()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported operation: {}", current_request.getOpNum());
};
if (auto * multi_request = dynamic_cast<Coordination::ZooKeeperMultiRequest *>(request.get()))
{
multi_request->readImpl(read_buffer, request_validator);
}
else
{
request->readImpl(read_buffer);
request_validator(*request);
}
if (!keeper_dispatcher->putRequest(request, session_id, use_xid_64))
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Session {} already disconnected", session_id);

View File

@ -3,6 +3,7 @@
#include <base/types.h>
#include <exception>
#include <memory>
#include <optional>
#include <string_view>
#include <Compression/CompressedReadBuffer.h>
@ -58,6 +59,13 @@ namespace CurrentMetrics
extern const Metric PartsCompact;
}
namespace ProfileEvents
{
extern const Event LoadedPrimaryIndexFiles;
extern const Event LoadedPrimaryIndexRows;
extern const Event LoadedPrimaryIndexBytes;
}
namespace DB
{
@ -352,7 +360,6 @@ IMergeTreeDataPart::IMergeTreeDataPart(
incrementStateMetric(state);
incrementTypeMetric(part_type);
index = std::make_shared<Columns>();
minmax_idx = std::make_shared<MinMaxIndex>();
initializeIndexGranularityInfo();
@ -365,46 +372,61 @@ IMergeTreeDataPart::~IMergeTreeDataPart()
decrementTypeMetric(part_type);
}
IMergeTreeDataPart::Index IMergeTreeDataPart::getIndex() const
IMergeTreeDataPart::IndexPtr IMergeTreeDataPart::getIndex() const
{
std::scoped_lock lock(index_mutex);
if (!index_loaded)
loadIndex();
index_loaded = true;
if (index)
return index;
if (auto index_cache = storage.getPrimaryIndexCache())
return loadIndexToCache(*index_cache);
index = loadIndex();
return index;
}
void IMergeTreeDataPart::setIndex(const Columns & cols_)
IMergeTreeDataPart::IndexPtr IMergeTreeDataPart::loadIndexToCache(PrimaryIndexCache & index_cache) const
{
std::scoped_lock lock(index_mutex);
if (!index->empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "The index of data part can be set only once");
index = std::make_shared<const Columns>(cols_);
index_loaded = true;
auto key = PrimaryIndexCache::hash(getDataPartStorage().getFullPath());
auto callback = [this] { return loadIndex(); };
return index_cache.getOrSet(key, callback);
}
void IMergeTreeDataPart::setIndex(Columns && cols_)
void IMergeTreeDataPart::moveIndexToCache(PrimaryIndexCache & index_cache)
{
std::scoped_lock lock(index_mutex);
if (!index->empty())
if (!index)
return;
auto key = PrimaryIndexCache::hash(getDataPartStorage().getFullPath());
index_cache.set(key, std::const_pointer_cast<Index>(index));
index.reset();
for (const auto & [_, projection] : projection_parts)
projection->moveIndexToCache(index_cache);
}
void IMergeTreeDataPart::setIndex(Columns index_columns)
{
std::scoped_lock lock(index_mutex);
if (index)
throw Exception(ErrorCodes::LOGICAL_ERROR, "The index of data part can be set only once");
index = std::make_shared<const Columns>(std::move(cols_));
index_loaded = true;
optimizeIndexColumns(index_granularity->getMarksCount(), index_columns);
index = std::make_shared<Index>(std::move(index_columns));
}
void IMergeTreeDataPart::unloadIndex()
{
std::scoped_lock lock(index_mutex);
index = std::make_shared<Columns>();
index_loaded = false;
index.reset();
}
bool IMergeTreeDataPart::isIndexLoaded() const
{
std::scoped_lock lock(index_mutex);
return index_loaded;
return index != nullptr;
}
void IMergeTreeDataPart::setName(const String & new_name)
@ -615,8 +637,11 @@ void IMergeTreeDataPart::removeIfNeeded() noexcept
UInt64 IMergeTreeDataPart::getIndexSizeInBytes() const
{
std::scoped_lock lock(index_mutex);
if (!index)
return 0;
UInt64 res = 0;
for (const ColumnPtr & column : *index)
for (const auto & column : *index)
res += column->byteSize();
return res;
}
@ -624,8 +649,11 @@ UInt64 IMergeTreeDataPart::getIndexSizeInBytes() const
UInt64 IMergeTreeDataPart::getIndexSizeInAllocatedBytes() const
{
std::scoped_lock lock(index_mutex);
if (!index)
return 0;
UInt64 res = 0;
for (const ColumnPtr & column : *index)
for (const auto & column : *index)
res += column->allocatedBytes();
return res;
}
@ -755,7 +783,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
loadIndexGranularity();
if (!(*storage.getSettings())[MergeTreeSetting::primary_key_lazy_load])
getIndex();
index = loadIndex();
calculateColumnsAndSecondaryIndicesSizesOnDisk();
loadRowsCount(); /// Must be called after loadIndexGranularity() as it uses the value of `index_granularity`.
@ -929,7 +957,38 @@ void IMergeTreeDataPart::appendFilesOfIndexGranularity(Strings & /* files */) co
{
}
void IMergeTreeDataPart::loadIndex() const
template <typename Columns>
void IMergeTreeDataPart::optimizeIndexColumns(size_t marks_count, Columns & index_columns) const
{
size_t key_size = index_columns.size();
Float64 ratio_to_drop_suffix_columns = (*storage.getSettings())[MergeTreeSetting::primary_key_ratio_of_unique_prefix_values_to_skip_suffix_columns];
/// Cut useless suffix columns, if necessary.
if (key_size > 1 && ratio_to_drop_suffix_columns > 0 && ratio_to_drop_suffix_columns < 1)
{
chassert(marks_count > 0);
for (size_t j = 0; j < key_size - 1; ++j)
{
size_t num_changes = 0;
for (size_t i = 1; i < marks_count; ++i)
{
if (0 != index_columns[j]->compareAt(i, i - 1, *index_columns[j], 0))
++num_changes;
}
if (static_cast<Float64>(num_changes) / marks_count >= ratio_to_drop_suffix_columns)
{
key_size = j + 1;
index_columns.resize(key_size);
break;
}
}
LOG_TEST(storage.log, "Loaded primary key index for part {}, {} columns are kept in memory", name, key_size);
}
}
std::shared_ptr<IMergeTreeDataPart::Index> IMergeTreeDataPart::loadIndex() const
{
/// Memory for index must not be accounted as memory usage for query, because it belongs to a table.
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
@ -937,70 +996,59 @@ void IMergeTreeDataPart::loadIndex() const
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
if (parent_part)
metadata_snapshot = metadata_snapshot->projections.get(name).metadata;
const auto & primary_key = metadata_snapshot->getPrimaryKey();
size_t key_size = primary_key.column_names.size();
if (key_size)
if (!key_size)
return std::make_shared<Index>();
MutableColumns loaded_index;
loaded_index.resize(key_size);
for (size_t i = 0; i < key_size; ++i)
{
MutableColumns loaded_index;
loaded_index.resize(key_size);
for (size_t i = 0; i < key_size; ++i)
{
loaded_index[i] = primary_key.data_types[i]->createColumn();
loaded_index[i]->reserve(index_granularity->getMarksCount());
}
String index_name = "primary" + getIndexExtensionFromFilesystem(getDataPartStorage());
String index_path = fs::path(getDataPartStorage().getRelativePath()) / index_name;
auto index_file = metadata_manager->read(index_name);
size_t marks_count = index_granularity->getMarksCount();
Serializations key_serializations(key_size);
for (size_t j = 0; j < key_size; ++j)
key_serializations[j] = primary_key.data_types[j]->getDefaultSerialization();
for (size_t i = 0; i < marks_count; ++i)
for (size_t j = 0; j < key_size; ++j)
key_serializations[j]->deserializeBinary(*loaded_index[j], *index_file, {});
/// Cut useless suffix columns, if necessary.
Float64 ratio_to_drop_suffix_columns = (*storage.getSettings())[MergeTreeSetting::primary_key_ratio_of_unique_prefix_values_to_skip_suffix_columns];
if (key_size > 1 && ratio_to_drop_suffix_columns > 0 && ratio_to_drop_suffix_columns < 1)
{
chassert(marks_count > 0);
for (size_t j = 0; j < key_size - 1; ++j)
{
size_t num_changes = 0;
for (size_t i = 1; i < marks_count; ++i)
if (0 != loaded_index[j]->compareAt(i, i - 1, *loaded_index[j], 0))
++num_changes;
if (static_cast<Float64>(num_changes) / marks_count >= ratio_to_drop_suffix_columns)
{
key_size = j + 1;
loaded_index.resize(key_size);
break;
}
}
}
for (size_t i = 0; i < key_size; ++i)
{
loaded_index[i]->shrinkToFit();
loaded_index[i]->protect();
if (loaded_index[i]->size() != marks_count)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all data from index file {}(expected size: "
"{}, read: {})", index_path, marks_count, loaded_index[i]->size());
}
LOG_TEST(storage.log, "Loaded primary key index for part {}, {} columns are kept in memory", name, key_size);
if (!index_file->eof())
throw Exception(ErrorCodes::EXPECTED_END_OF_FILE, "Index file {} is unexpectedly long", index_path);
index = std::make_shared<Columns>(std::make_move_iterator(loaded_index.begin()), std::make_move_iterator(loaded_index.end()));
loaded_index[i] = primary_key.data_types[i]->createColumn();
loaded_index[i]->reserve(index_granularity->getMarksCount());
}
String index_name = "primary" + getIndexExtensionFromFilesystem(getDataPartStorage());
String index_path = fs::path(getDataPartStorage().getRelativePath()) / index_name;
auto index_file = metadata_manager->read(index_name);
size_t marks_count = index_granularity->getMarksCount();
Serializations key_serializations(key_size);
for (size_t j = 0; j < key_size; ++j)
key_serializations[j] = primary_key.data_types[j]->getDefaultSerialization();
for (size_t i = 0; i < marks_count; ++i)
{
for (size_t j = 0; j < key_size; ++j)
key_serializations[j]->deserializeBinary(*loaded_index[j], *index_file, {});
}
optimizeIndexColumns(marks_count, loaded_index);
size_t total_bytes = 0;
for (const auto & column : loaded_index)
{
column->shrinkToFit();
column->protect();
total_bytes += column->byteSize();
if (column->size() != marks_count)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all data from index file {}(expected size: "
"{}, read: {})", index_path, marks_count, column->size());
}
if (!index_file->eof())
throw Exception(ErrorCodes::EXPECTED_END_OF_FILE, "Index file {} is unexpectedly long", index_path);
ProfileEvents::increment(ProfileEvents::LoadedPrimaryIndexFiles);
ProfileEvents::increment(ProfileEvents::LoadedPrimaryIndexRows, marks_count);
ProfileEvents::increment(ProfileEvents::LoadedPrimaryIndexBytes, total_bytes);
return std::make_shared<Index>(std::make_move_iterator(loaded_index.begin()), std::make_move_iterator(loaded_index.end()));
}
void IMergeTreeDataPart::appendFilesOfIndex(Strings & files) const

View File

@ -25,6 +25,7 @@
#include <Interpreters/TransactionVersionMetadata.h>
#include <DataTypes/Serializations/SerializationInfo.h>
#include <Storages/MergeTree/IPartMetadataManager.h>
#include "Storages/MergeTree/PrimaryIndexCache.h"
namespace zkutil
@ -77,7 +78,8 @@ public:
using ColumnSizeByName = std::unordered_map<std::string, ColumnSize>;
using NameToNumber = std::unordered_map<std::string, size_t>;
using Index = std::shared_ptr<const Columns>;
using Index = Columns;
using IndexPtr = std::shared_ptr<const Index>;
using IndexSizeByName = std::unordered_map<std::string, ColumnSize>;
using Type = MergeTreeDataPartType;
@ -371,9 +373,11 @@ public:
/// Version of part metadata (columns, pk and so on). Managed properly only for replicated merge tree.
int32_t metadata_version;
Index getIndex() const;
void setIndex(const Columns & cols_);
void setIndex(Columns && cols_);
IndexPtr getIndex() const;
IndexPtr loadIndexToCache(PrimaryIndexCache & index_cache) const;
void moveIndexToCache(PrimaryIndexCache & index_cache);
void setIndex(Columns index_columns);
void unloadIndex();
bool isIndexLoaded() const;
@ -601,8 +605,7 @@ protected:
/// Lazily loaded in RAM. Contains each index_granularity-th value of primary key tuple.
/// Note that marks (also correspond to primary key) are not always in RAM, but cached. See MarkCache.h.
mutable std::mutex index_mutex;
mutable Index index TSA_GUARDED_BY(index_mutex);
mutable bool index_loaded TSA_GUARDED_BY(index_mutex) = false;
mutable IndexPtr index;
/// Total size of all columns, calculated once in calcuateColumnSizesOnDisk
ColumnSize total_columns_size;
@ -697,7 +700,11 @@ private:
virtual void appendFilesOfIndexGranularity(Strings & files) const;
/// Loads the index file.
void loadIndex() const TSA_REQUIRES(index_mutex);
std::shared_ptr<Index> loadIndex() const;
/// Optimize index. Drop useless columns from suffix of primary key.
template <typename Columns>
void optimizeIndexColumns(size_t marks_count, Columns & index_columns) const;
void appendFilesOfIndex(Strings & files) const;

View File

@ -1,3 +1,4 @@
#include <memory>
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
@ -72,8 +73,11 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
{
}
Columns IMergeTreeDataPartWriter::releaseIndexColumns()
std::optional<Columns> IMergeTreeDataPartWriter::releaseIndexColumns()
{
if (!settings.save_primary_index_in_memory)
return {};
/// The memory for index was allocated without thread memory tracker.
/// We need to deallocate it in shrinkToFit without memory tracker as well.
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;

View File

@ -49,7 +49,7 @@ public:
virtual size_t getNumberOfOpenStreams() const = 0;
Columns releaseIndexColumns();
std::optional<Columns> releaseIndexColumns();
PlainMarksByName releaseCachedMarks();

View File

@ -1,5 +1,6 @@
#pragma once
#include <memory>
#include <Storages/MergeTree/IDataPartStorage.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>

View File

@ -445,8 +445,13 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite
finish_callback = [storage_ptr = &storage]() { storage_ptr->merge_selecting_task->schedule(); };
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
if (auto * mark_cache = storage.getContext()->getMarkCache().get())
addMarksToCache(*part, cached_marks, mark_cache);
if (auto mark_cache = storage.getMarkCacheToPrewarm())
addMarksToCache(*part, cached_marks, mark_cache.get());
/// Move index to cache and reset it here because we need
/// a correct part name after rename for a key of cache entry.
if (auto index_cache = storage.getPrimaryIndexCacheToPrewarm())
part->moveIndexToCache(*index_cache);
write_part_log({});
StorageReplicatedMergeTree::incrementMergedPartsProfileEvent(part->getType());

View File

@ -152,10 +152,17 @@ void MergePlainMergeTreeTask::finish()
ThreadFuzzer::maybeInjectSleep();
ThreadFuzzer::maybeInjectMemoryLimitException();
if (auto * mark_cache = storage.getContext()->getMarkCache().get())
if (auto mark_cache = storage.getMarkCacheToPrewarm())
{
auto marks = merge_task->releaseCachedMarks();
addMarksToCache(*new_part, marks, mark_cache);
addMarksToCache(*new_part, marks, mark_cache.get());
}
if (auto index_cache = storage.getPrimaryIndexCacheToPrewarm())
{
/// Move index to cache and reset it here because we need
/// a correct part name after rename for a key of cache entry.
new_part->moveIndexToCache(*index_cache);
}
write_part_log({});

View File

@ -569,8 +569,6 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
global_ctx->new_data_part->index_granularity_info,
ctx->blocks_are_granules_size);
bool save_marks_in_cache = (*storage_settings)[MergeTreeSetting::prewarm_mark_cache] && global_ctx->context->getMarkCache();
global_ctx->to = std::make_shared<MergedBlockOutputStream>(
global_ctx->new_data_part,
global_ctx->metadata_snapshot,
@ -581,7 +579,6 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
std::move(index_granularity_ptr),
global_ctx->txn ? global_ctx->txn->tid : Tx::PrehistoricTID,
/*reset_columns=*/ true,
save_marks_in_cache,
ctx->blocks_are_granules_size,
global_ctx->context->getWriteSettings());
@ -1120,8 +1117,6 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
ctx->executor = std::make_unique<PullingPipelineExecutor>(ctx->column_parts_pipeline);
NamesAndTypesList columns_list = {*ctx->it_name_and_type};
bool save_marks_in_cache = (*global_ctx->data->getSettings())[MergeTreeSetting::prewarm_mark_cache] && global_ctx->context->getMarkCache();
ctx->column_to = std::make_unique<MergedColumnOnlyOutputStream>(
global_ctx->new_data_part,
global_ctx->metadata_snapshot,
@ -1130,8 +1125,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
getStatisticsForColumns(columns_list, global_ctx->metadata_snapshot),
ctx->compression_codec,
global_ctx->to->getIndexGranularity(),
&global_ctx->written_offset_columns,
save_marks_in_cache);
&global_ctx->written_offset_columns);
ctx->column_elems_written = 0;
}

View File

@ -6,6 +6,8 @@
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/filesystemHelpers.h>
#include "Storages/MergeTree/IMergeTreeDataPart.h"
#include "Storages/MergeTree/PrimaryIndexCache.h"
#include <Formats/MarkInCompressedFile.h>
#include <Compression/CompressedReadBuffer.h>
@ -221,7 +223,9 @@ private:
std::promise<MergeTreeData::MutableDataPartPtr> promise{};
IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns{};
PlainMarksByName cached_marks;
std::unique_ptr<Columns> cached_index;
MergeTreeTransactionPtr txn;
bool need_prefix;

View File

@ -233,12 +233,16 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsString storage_policy;
extern const MergeTreeSettingsFloat zero_copy_concurrent_part_removal_max_postpone_ratio;
extern const MergeTreeSettingsUInt64 zero_copy_concurrent_part_removal_max_split_times;
extern const MergeTreeSettingsBool use_primary_index_cache;
extern const MergeTreeSettingsBool prewarm_primary_index_cache;
extern const MergeTreeSettingsBool prewarm_mark_cache;
extern const MergeTreeSettingsBool primary_key_lazy_load;
}
namespace ServerSetting
{
extern const ServerSettingsDouble mark_cache_prewarm_ratio;
extern const ServerSettingsDouble primary_index_cache_prewarm_ratio;
}
namespace ErrorCodes
@ -2350,32 +2354,44 @@ void MergeTreeData::stopOutdatedAndUnexpectedDataPartsLoadingTask()
}
}
void MergeTreeData::prewarmMarkCacheIfNeeded(ThreadPool & pool)
PrimaryIndexCachePtr MergeTreeData::getPrimaryIndexCache() const
{
if (!(*getSettings())[MergeTreeSetting::prewarm_mark_cache])
return;
bool use_primary_index_cache = (*getSettings())[MergeTreeSetting::use_primary_index_cache];
bool primary_key_lazy_load = (*getSettings())[MergeTreeSetting::primary_key_lazy_load];
prewarmMarkCache(pool);
if (!use_primary_index_cache || !primary_key_lazy_load)
return nullptr;
return getContext()->getPrimaryIndexCache();
}
void MergeTreeData::prewarmMarkCache(ThreadPool & pool)
PrimaryIndexCachePtr MergeTreeData::getPrimaryIndexCacheToPrewarm() const
{
auto * mark_cache = getContext()->getMarkCache().get();
if (!mark_cache)
return;
if (!(*getSettings())[MergeTreeSetting::prewarm_primary_index_cache])
return nullptr;
auto metadata_snaphost = getInMemoryMetadataPtr();
auto column_names = getColumnsToPrewarmMarks(*getSettings(), metadata_snaphost->getColumns().getAllPhysical());
return getPrimaryIndexCache();
}
if (column_names.empty())
MarkCachePtr MergeTreeData::getMarkCacheToPrewarm() const
{
if (!(*getSettings())[MergeTreeSetting::prewarm_mark_cache])
return nullptr;
return getContext()->getMarkCache();
}
void MergeTreeData::prewarmCaches(ThreadPool & pool, MarkCachePtr mark_cache, PrimaryIndexCachePtr index_cache)
{
if (!mark_cache && !index_cache)
return;
Stopwatch watch;
LOG_TRACE(log, "Prewarming mark cache");
LOG_TRACE(log, "Prewarming mark and/or primary index caches");
auto data_parts = getDataPartsVectorForInternalUsage();
/// Prewarm mark cache firstly for the most fresh parts according
/// Prewarm caches first for the freshest parts according
/// to time columns in partition key (if exists) and by modification time.
auto to_tuple = [](const auto & part)
@ -2388,20 +2404,41 @@ void MergeTreeData::prewarmMarkCache(ThreadPool & pool)
return to_tuple(lhs) > to_tuple(rhs);
});
ThreadPoolCallbackRunnerLocal<void> runner(pool, "PrewarmMarks");
double ratio_to_prewarm = getContext()->getServerSettings()[ServerSetting::mark_cache_prewarm_ratio];
ThreadPoolCallbackRunnerLocal<void> runner(pool, "PrewarmCaches");
double marks_ratio_to_prewarm = getContext()->getServerSettings()[ServerSetting::mark_cache_prewarm_ratio];
double index_ratio_to_prewarm = getContext()->getServerSettings()[ServerSetting::primary_index_cache_prewarm_ratio];
Names columns_to_prewarm_marks;
if (mark_cache)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
columns_to_prewarm_marks = getColumnsToPrewarmMarks(*getSettings(), metadata_snapshot->getColumns().getAllPhysical());
}
for (const auto & part : data_parts)
{
if (mark_cache->sizeInBytes() >= mark_cache->maxSizeInBytes() * ratio_to_prewarm)
break;
bool added_task = false;
runner([&] { part->loadMarksToCache(column_names, mark_cache); });
if (index_cache && !part->isIndexLoaded() && index_cache->sizeInBytes() < index_cache->maxSizeInBytes() * index_ratio_to_prewarm)
{
added_task = true;
runner([&] { part->loadIndexToCache(*index_cache); });
}
if (mark_cache && mark_cache->sizeInBytes() < mark_cache->maxSizeInBytes() * marks_ratio_to_prewarm)
{
added_task = true;
runner([&] { part->loadMarksToCache(columns_to_prewarm_marks, mark_cache.get()); });
}
if (!added_task)
break;
}
runner.waitForAllToFinishAndRethrowFirstError();
watch.stop();
LOG_TRACE(log, "Prewarmed mark cache in {} seconds", watch.elapsedSeconds());
LOG_TRACE(log, "Prewarmed mark and/or primary index caches in {} seconds", watch.elapsedSeconds());
}
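A minimal standalone C++ sketch of the stopping rule above (illustrative only: the FakeCache type, cache sizes, and part names are invented here, and the real prewarming schedules asynchronous tasks instead of counting bytes inline). Parts are visited from freshest to oldest, a cache keeps accepting work only while it is below ratio * max_size, and the loop breaks once neither cache accepts more.
#include <cstddef>
#include <iostream>
#include <string>
#include <utility>
#include <vector>
// Invented stand-in for a size-limited cache; only the fields needed for the budget check.
struct FakeCache
{
    std::size_t size_in_bytes = 0;
    std::size_t max_size_in_bytes = 0;
    bool wantsMore(double ratio) const { return size_in_bytes < max_size_in_bytes * ratio; }
};
int main()
{
    // Parts ordered from freshest to oldest, with the bytes their index/marks would occupy.
    std::vector<std::pair<std::string, std::size_t>> parts = {{"all_3_3_0", 400}, {"all_2_2_0", 400}, {"all_1_1_0", 400}};
    FakeCache index_cache{0, 1000};
    FakeCache mark_cache{0, 600};
    const double ratio = 0.95;
    for (const auto & [name, bytes] : parts)
    {
        bool added_task = false;
        if (index_cache.wantsMore(ratio)) { index_cache.size_in_bytes += bytes; added_task = true; }
        if (mark_cache.wantsMore(ratio)) { mark_cache.size_in_bytes += bytes; added_task = true; }
        if (!added_task)
            break; // both caches are warm enough, older parts are skipped
        std::cout << "prewarmed " << name << '\n';
    }
}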
/// Is the part directory old.

View File

@ -31,6 +31,8 @@
#include <Storages/DataDestinationType.h>
#include <Storages/extractKeyExpressionList.h>
#include <Storages/PartitionCommands.h>
#include <Storages/MarkCache.h>
#include <Storages/MergeTree/PrimaryIndexCache.h>
#include <Interpreters/PartLog.h>
#include <Poco/Timestamp.h>
#include <Common/threadPoolCallbackRunner.h>
@ -506,9 +508,15 @@ public:
/// Load the set of data parts from disk. Call once - immediately after the object is created.
void loadDataParts(bool skip_sanity_checks, std::optional<std::unordered_set<std::string>> expected_parts);
/// Returns a pointer to primary index cache if it is enabled.
PrimaryIndexCachePtr getPrimaryIndexCache() const;
/// Returns a pointer to primary index cache if it is enabled and required to be prewarmed.
PrimaryIndexCachePtr getPrimaryIndexCacheToPrewarm() const;
/// Returns a pointer to primary mark cache if it is required to be prewarmed.
MarkCachePtr getMarkCacheToPrewarm() const;
/// Prewarm mark cache for the most recent data parts.
void prewarmMarkCache(ThreadPool & pool);
void prewarmMarkCacheIfNeeded(ThreadPool & pool);
void prewarmCaches(ThreadPool & pool, MarkCachePtr mark_cache, PrimaryIndexCachePtr index_cache);
String getLogName() const { return log.loadName(); }

View File

@ -317,9 +317,10 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndexRow(const B
for (size_t i = 0; i < index_block.columns(); ++i)
{
const auto & column = index_block.getByPosition(i).column;
index_columns[i]->insertFrom(*column, row);
index_serializations[i]->serializeBinary(*column, row, index_stream, {});
if (settings.save_primary_index_in_memory)
index_columns[i]->insertFrom(*column, row);
}
}
@ -337,8 +338,14 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Bloc
*/
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
if (index_columns.empty())
index_columns = primary_index_block.cloneEmptyColumns();
if (settings.save_primary_index_in_memory)
{
if (index_columns.empty())
index_columns = primary_index_block.cloneEmptyColumns();
for (const auto & column : index_columns)
column->reserve(column->size() + granules_to_write.size());
}
/// Write index. The index contains Primary Key value for each `index_granularity` row.
for (const auto & granule : granules_to_write)

View File

@ -1074,7 +1074,7 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
DataTypes key_types;
if (!key_indices.empty())
{
const auto & index = part->getIndex();
const auto index = part->getIndex();
for (size_t i : key_indices)
{

View File

@ -14,7 +14,7 @@
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/RowOrderOptimizer.h>
#include "Common/logger_useful.h"
#include <Storages/MergeTree/MergeTreeMarksLoader.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Common/Exception.h>
#include <Common/HashTable/HashMap.h>
@ -74,7 +74,6 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsFloat min_free_disk_ratio_to_perform_insert;
extern const MergeTreeSettingsBool optimize_row_order;
extern const MergeTreeSettingsFloat ratio_of_defaults_for_sparse_serialization;
extern const MergeTreeSettingsBool prewarm_mark_cache;
}
namespace ErrorCodes
@ -226,6 +225,27 @@ void MergeTreeDataWriter::TemporaryPart::finalize()
projection->getDataPartStorage().precommitTransaction();
}
void MergeTreeDataWriter::TemporaryPart::prewarmCaches()
{
/// This method must be called after the part has been renamed and committed,
/// because the correct part path is required for the cache keys.
if (auto mark_cache = part->storage.getMarkCacheToPrewarm())
{
for (const auto & stream : streams)
{
auto marks = stream.stream->releaseCachedMarks();
addMarksToCache(*part, marks, mark_cache.get());
}
}
if (auto index_cache = part->storage.getPrimaryIndexCacheToPrewarm())
{
/// Index was already set during writing. Now move it to cache.
part->moveIndexToCache(*index_cache);
}
}
std::vector<AsyncInsertInfoPtr> scatterAsyncInsertInfoBySelector(AsyncInsertInfoPtr async_insert_info, const IColumn::Selector & selector, size_t partition_num)
{
if (nullptr == async_insert_info)
@ -684,7 +704,6 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
/// This effectively chooses minimal compression method:
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0);
bool save_marks_in_cache = (*data_settings)[MergeTreeSetting::prewarm_mark_cache] && data.getContext()->getMarkCache();
auto index_granularity_ptr = createMergeTreeIndexGranularity(
block.rows(),
@ -703,7 +722,6 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
std::move(index_granularity_ptr),
context->getCurrentTransaction() ? context->getCurrentTransaction()->tid : Tx::PrehistoricTID,
/*reset_columns=*/ false,
save_marks_in_cache,
/*blocks_are_granules_size=*/ false,
context->getWriteSettings());
@ -839,7 +857,6 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
/// This effectively chooses minimal compression method:
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0);
bool save_marks_in_cache = (*data.getSettings())[MergeTreeSetting::prewarm_mark_cache] && data.getContext()->getMarkCache();
auto index_granularity_ptr = createMergeTreeIndexGranularity(
block.rows(),
@ -859,7 +876,6 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
std::move(index_granularity_ptr),
Tx::PrehistoricTID,
/*reset_columns=*/ false,
save_marks_in_cache,
/*blocks_are_granules_size=*/ false,
data.getContext()->getWriteSettings());

View File

@ -75,6 +75,7 @@ public:
void cancel();
void finalize();
void prewarmCaches();
};
/** All rows must correspond to same partition.

View File

@ -35,6 +35,7 @@ MergeTreeWriterSettings::MergeTreeWriterSettings(
bool can_use_adaptive_granularity_,
bool rewrite_primary_key_,
bool save_marks_in_cache_,
bool save_primary_index_in_memory_,
bool blocks_are_granules_size_)
: min_compress_block_size(
(*storage_settings)[MergeTreeSetting::min_compress_block_size] ? (*storage_settings)[MergeTreeSetting::min_compress_block_size] : global_settings[Setting::min_compress_block_size])
@ -48,6 +49,7 @@ MergeTreeWriterSettings::MergeTreeWriterSettings(
, can_use_adaptive_granularity(can_use_adaptive_granularity_)
, rewrite_primary_key(rewrite_primary_key_)
, save_marks_in_cache(save_marks_in_cache_)
, save_primary_index_in_memory(save_primary_index_in_memory_)
, blocks_are_granules_size(blocks_are_granules_size_)
, query_write_settings(query_write_settings_)
, low_cardinality_max_dictionary_size(global_settings[Setting::low_cardinality_max_dictionary_size])

View File

@ -64,6 +64,7 @@ struct MergeTreeWriterSettings
bool can_use_adaptive_granularity_,
bool rewrite_primary_key_,
bool save_marks_in_cache_,
bool save_primary_index_in_memory_,
bool blocks_are_granules_size_);
size_t min_compress_block_size;
@ -79,6 +80,7 @@ struct MergeTreeWriterSettings
bool can_use_adaptive_granularity;
bool rewrite_primary_key;
bool save_marks_in_cache;
bool save_primary_index_in_memory;
bool blocks_are_granules_size;
WriteSettings query_write_settings;

View File

@ -239,6 +239,8 @@ namespace ErrorCodes
DECLARE(UInt64, primary_key_compress_block_size, 65536, "Primary compress block size, the actual size of the block to compress.", 0) \
DECLARE(Bool, primary_key_lazy_load, true, "Load primary key in memory on first use instead of on table initialization. This can save memory in the presence of a large number of tables.", 0) \
DECLARE(Float, primary_key_ratio_of_unique_prefix_values_to_skip_suffix_columns, 0.9f, "If the value of a column of the primary key in data part changes at least in this ratio of times, skip loading next columns in memory. This allows to save memory usage by not loading useless columns of the primary key.", 0) \
DECLARE(Bool, use_primary_index_cache, false, "Use cache for primary index instead of saving all indexes in memory. Can be useful for very large tables", 0) \
DECLARE(Bool, prewarm_primary_index_cache, false, "If true, the primary index cache will be prewarmed by saving the primary index to the cache on inserts, merges, fetches and on startup of the server", 0) \
DECLARE(Bool, prewarm_mark_cache, false, "If true mark cache will be prewarmed by saving marks to mark cache on inserts, merges, fetches and on startup of server", 0) \
DECLARE(String, columns_to_prewarm_mark_cache, "", "List of columns to prewarm mark cache for (if enabled). Empty means all columns", 0) \
/** Projection settings. */ \

View File

@ -247,14 +247,7 @@ void MergeTreeSink::finishDelayedChunk()
/// Part can be deduplicated, so increment counters and add to part log only if it's really added
if (added)
{
if (auto * mark_cache = storage.getContext()->getMarkCache().get())
{
for (const auto & stream : partition.temp_part.streams)
{
auto marks = stream.stream->releaseCachedMarks();
addMarksToCache(*part, marks, mark_cache);
}
}
partition.temp_part.prewarmCaches();
auto counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(partition.part_counters.getPartiallyAtomicSnapshot());
PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, partition.elapsed_ns, counters_snapshot));

View File

@ -30,7 +30,6 @@ MergedBlockOutputStream::MergedBlockOutputStream(
MergeTreeIndexGranularityPtr index_granularity_ptr,
TransactionID tid,
bool reset_columns_,
bool save_marks_in_cache,
bool blocks_are_granules_size,
const WriteSettings & write_settings_)
: IMergedBlockOutputStream(data_part->storage.getSettings(), data_part->getDataPartStoragePtr(), metadata_snapshot_, columns_list_, reset_columns_)
@ -38,6 +37,11 @@ MergedBlockOutputStream::MergedBlockOutputStream(
, default_codec(default_codec_)
, write_settings(write_settings_)
{
/// Save marks in memory if mark cache prewarming is enabled, to avoid rereading the marks file.
bool save_marks_in_cache = data_part->storage.getMarkCacheToPrewarm() != nullptr;
/// Save the primary index in memory if the primary index cache is disabled, or if it is enabled with prewarming, to avoid rereading the index file.
bool save_primary_index_in_memory = !data_part->storage.getPrimaryIndexCache() || data_part->storage.getPrimaryIndexCacheToPrewarm();
MergeTreeWriterSettings writer_settings(
data_part->storage.getContext()->getSettingsRef(),
write_settings,
@ -45,6 +49,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
data_part->index_granularity_info.mark_type.adaptive,
/* rewrite_primary_key = */ true,
save_marks_in_cache,
save_primary_index_in_memory,
blocks_are_granules_size);
/// TODO: looks like isStoredOnDisk() is always true for MergeTreeDataPart
@ -243,7 +248,7 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
new_part->rows_count = rows_count;
new_part->modification_time = time(nullptr);
new_part->setIndex(writer->releaseIndexColumns());
new_part->checksums = checksums;
new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk());
new_part->setBytesUncompressedOnDisk(checksums.getTotalSizeUncompressedOnDisk());
@ -256,6 +261,9 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
new_part->index_granularity = std::move(new_index_granularity);
}
if (auto computed_index = writer->releaseIndexColumns())
new_part->setIndex(std::move(*computed_index));
/// In mutation, existing_rows_count is already calculated in PartMergerWriter
/// In merge situation, lightweight deleted rows was physically deleted, existing_rows_count equals rows_count
if (!new_part->existing_rows_count.has_value())

View File

@ -25,7 +25,6 @@ public:
MergeTreeIndexGranularityPtr index_granularity_ptr,
TransactionID tid,
bool reset_columns_ = false,
bool save_marks_in_cache = false,
bool blocks_are_granules_size = false,
const WriteSettings & write_settings = {});

View File

@ -1,11 +1,13 @@
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <IO/WriteSettings.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
@ -19,20 +21,23 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
const ColumnsStatistics & stats_to_recalc,
CompressionCodecPtr default_codec,
MergeTreeIndexGranularityPtr index_granularity_ptr,
WrittenOffsetColumns * offset_columns,
bool save_marks_in_cache)
WrittenOffsetColumns * offset_columns)
: IMergedBlockOutputStream(data_part->storage.getSettings(), data_part->getDataPartStoragePtr(), metadata_snapshot_, columns_list_, /*reset_columns=*/ true)
{
const auto & global_settings = data_part->storage.getContext()->getSettingsRef();
/// Save marks in memory if mark cache prewarming is enabled, to avoid rereading the marks file.
bool save_marks_in_cache = data_part->storage.getMarkCacheToPrewarm() != nullptr;
/// Save the primary index in memory if the primary index cache is disabled, or if it is enabled with prewarming, to avoid rereading the index file.
bool save_primary_index_in_memory = !data_part->storage.getPrimaryIndexCache() || data_part->storage.getPrimaryIndexCacheToPrewarm();
/// Granularity is never recomputed while writing only columns.
MergeTreeWriterSettings writer_settings(
global_settings,
data_part->storage.getContext()->getSettingsRef(),
data_part->storage.getContext()->getWriteSettings(),
storage_settings,
data_part->index_granularity_info.mark_type.adaptive,
/*rewrite_primary_key=*/ false,
save_marks_in_cache,
save_primary_index_in_memory,
/*blocks_are_granules_size=*/ false);
writer = createMergeTreeDataPartWriter(

View File

@ -22,8 +22,7 @@ public:
const ColumnsStatistics & stats_to_recalc,
CompressionCodecPtr default_codec,
MergeTreeIndexGranularityPtr index_granularity_ptr,
WrittenOffsetColumns * offset_columns = nullptr,
bool save_marks_in_cache = false);
WrittenOffsetColumns * offset_columns = nullptr);
void write(const Block & block) override;

View File

@ -985,7 +985,6 @@ void finalizeMutatedPart(
new_data_part->rows_count = source_part->rows_count;
new_data_part->index_granularity = source_part->index_granularity;
new_data_part->setIndex(*source_part->getIndex());
new_data_part->minmax_idx = source_part->minmax_idx;
new_data_part->modification_time = time(nullptr);
@ -995,6 +994,9 @@ void finalizeMutatedPart(
new_data_part->index_granularity = std::move(new_index_granularity);
}
if (!new_data_part->storage.getPrimaryIndexCache())
new_data_part->setIndex(*source_part->getIndex());
/// Load rest projections which are hardlinked
bool noop;
new_data_part->loadProjections(false, false, noop, true /* if_not_loaded */);
@ -1650,7 +1652,6 @@ private:
std::move(index_granularity_ptr),
ctx->txn ? ctx->txn->tid : Tx::PrehistoricTID,
/*reset_columns=*/ true,
/*save_marks_in_cache=*/ false,
/*blocks_are_granules_size=*/ false,
ctx->context->getWriteSettings());

View File

@ -0,0 +1,8 @@
#include <Storages/MergeTree/PrimaryIndexCache.h>
namespace DB
{
template class CacheBase<UInt128, PrimaryIndex, UInt128TrivialHash, PrimaryIndexWeightFunction>;
}

View File

@ -0,0 +1,73 @@
#pragma once
#include <Common/CacheBase.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
#include <Common/HashTable/Hash.h>
#include <Columns/IColumn.h>
namespace ProfileEvents
{
extern const Event PrimaryIndexCacheHits;
extern const Event PrimaryIndexCacheMisses;
}
namespace DB
{
using PrimaryIndex = std::vector<ColumnPtr>;
/// Estimate of the number of bytes in the cache for primary indexes.
struct PrimaryIndexWeightFunction
{
/// We spend additional bytes on the key in the hash map, linked lists, shared pointers, etc.
static constexpr size_t PRIMARY_INDEX_CACHE_OVERHEAD = 128;
size_t operator()(const PrimaryIndex & index) const
{
size_t res = 0;
for (const auto & column : index)
res += column->byteSize();
return res;
}
};
extern template class CacheBase<UInt128, PrimaryIndex, UInt128TrivialHash, PrimaryIndexWeightFunction>;
/** Cache of primary index for StorageMergeTree.
* PrimaryIndex is the in-memory primary key data of a part: one value per granule for each
* primary key column, used to select the ranges of granules to read.
*/
class PrimaryIndexCache : public CacheBase<UInt128, PrimaryIndex, UInt128TrivialHash, PrimaryIndexWeightFunction>
{
private:
using Base = CacheBase<UInt128, PrimaryIndex, UInt128TrivialHash, PrimaryIndexWeightFunction>;
public:
PrimaryIndexCache(const String & cache_policy, size_t max_size_in_bytes, double size_ratio)
: Base(cache_policy, max_size_in_bytes, 0, size_ratio)
{
}
/// Calculate the cache key from the path to the data part.
static UInt128 hash(const String & part_path)
{
SipHash hash;
hash.update(part_path.data(), part_path.size() + 1);
return hash.get128();
}
template <typename LoadFunc>
MappedPtr getOrSet(const Key & key, LoadFunc && load)
{
auto result = Base::getOrSet(key, load);
if (result.second)
ProfileEvents::increment(ProfileEvents::PrimaryIndexCacheMisses);
else
ProfileEvents::increment(ProfileEvents::PrimaryIndexCacheHits);
return result.first;
}
};
using PrimaryIndexCachePtr = std::shared_ptr<PrimaryIndexCache>;
}
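A self-contained C++ sketch of the getOrSet idiom this cache relies on (illustrative only: it keys a plain std::unordered_map by the part path instead of CacheBase keyed by a SipHash, and has no eviction, weight accounting, or profile events). The loader runs only on a miss, and later lookups share the already cached value.
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
// Invented stand-in for the cached value: one vector of values per primary key column.
using PrimaryIndex = std::vector<std::vector<std::uint64_t>>;
class TinyIndexCache
{
public:
    using MappedPtr = std::shared_ptr<PrimaryIndex>;
    MappedPtr getOrSet(const std::string & part_path, const std::function<MappedPtr()> & load)
    {
        auto it = cache.find(part_path);
        if (it != cache.end())
            return it->second;            // hit: share the cached index
        auto value = load();              // miss: load the index (e.g. from disk) exactly once
        cache.emplace(part_path, value);
        return value;
    }
private:
    std::unordered_map<std::string, MappedPtr> cache;
};
int main()
{
    TinyIndexCache cache;
    auto load = [] { return std::make_shared<PrimaryIndex>(1, std::vector<std::uint64_t>{0, 64, 128}); };
    auto first = cache.getOrSet("store/123/all_1_1_0/", load);   // runs the loader
    auto second = cache.getOrSet("store/123/all_1_1_0/", load);  // served from the cache
    std::cout << (first == second) << '\n';                      // prints 1: same shared value
}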

View File

@ -486,16 +486,9 @@ void ReplicatedMergeTreeSinkImpl<false>::finishDelayedChunk(const ZooKeeperWithF
/// Set a special error code if the block is duplicate
int error = (deduplicate && deduplicated) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
auto * mark_cache = storage.getContext()->getMarkCache().get();
if (!error && mark_cache)
{
for (const auto & stream : partition.temp_part.streams)
{
auto marks = stream.stream->releaseCachedMarks();
addMarksToCache(*part, marks, mark_cache);
}
}
if (!error)
partition.temp_part.prewarmCaches();
auto counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(partition.part_counters.getPartiallyAtomicSnapshot());
PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, partition.elapsed_ns, counters_snapshot), ExecutionStatus(error));
@ -540,14 +533,7 @@ void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFa
if (conflict_block_ids.empty())
{
if (auto * mark_cache = storage.getContext()->getMarkCache().get())
{
for (const auto & stream : partition.temp_part.streams)
{
auto marks = stream.stream->releaseCachedMarks();
addMarksToCache(*partition.temp_part.part, marks, mark_cache);
}
}
partition.temp_part.prewarmCaches();
auto counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(partition.part_counters.getPartiallyAtomicSnapshot());
PartLog::addNewPart(

View File

@ -5,6 +5,7 @@
#include <base/types.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Common/ZooKeeper/ZooKeeperWithFaultInjection.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/AsyncBlockIDsCache.h>
@ -129,6 +130,7 @@ private:
std::unique_ptr<DelayedChunk> delayed_chunk;
void finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper);
void prewarmCaches(const MergeTreeDataWriter::TemporaryPart & temp_part) const;
};
using ReplicatedMergeTreeSinkWithAsyncDeduplicate = ReplicatedMergeTreeSinkImpl<true>;

View File

View File

@ -155,7 +155,11 @@ StorageMergeTree::StorageMergeTree(
loadMutations();
loadDeduplicationLog();
prewarmMarkCacheIfNeeded(getActivePartsLoadingThreadPool().get());
prewarmCaches(
getActivePartsLoadingThreadPool().get(),
getMarkCacheToPrewarm(),
getPrimaryIndexCacheToPrewarm());
}

View File

@ -208,7 +208,6 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsBool use_minimalistic_checksums_in_zookeeper;
extern const MergeTreeSettingsBool use_minimalistic_part_header_in_zookeeper;
extern const MergeTreeSettingsMilliseconds wait_for_unique_parts_send_before_shutdown_ms;
extern const MergeTreeSettingsBool prewarm_mark_cache;
}
namespace FailPoints
@ -511,7 +510,11 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
}
loadDataParts(skip_sanity_checks, expected_parts_on_this_replica);
prewarmMarkCacheIfNeeded(getActivePartsLoadingThreadPool().get());
prewarmCaches(
getActivePartsLoadingThreadPool().get(),
getMarkCacheToPrewarm(),
getPrimaryIndexCacheToPrewarm());
if (LoadingStrictnessLevel::ATTACH <= mode)
{
@ -5084,10 +5087,15 @@ bool StorageReplicatedMergeTree::fetchPart(
ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
}
if ((*getSettings())[MergeTreeSetting::prewarm_mark_cache] && getContext()->getMarkCache())
if (auto mark_cache = getMarkCacheToPrewarm())
{
auto column_names = getColumnsToPrewarmMarks(*getSettings(), part->getColumns());
part->loadMarksToCache(column_names, getContext()->getMarkCache().get());
part->loadMarksToCache(column_names, mark_cache.get());
}
if (auto index_cache = getPrimaryIndexCacheToPrewarm())
{
part->loadIndexToCache(*index_cache);
}
write_part_log({});

View File

@ -299,8 +299,6 @@ class TagAttrs:
# Only one latest can exist
latest: ClickHouseVersion
# Only one can be a major one (the most fresh per a year)
majors: Dict[int, ClickHouseVersion]
# Only one lts version can exist
lts: Optional[ClickHouseVersion]
@ -345,14 +343,6 @@ def ldf_tags(version: ClickHouseVersion, distro: str, tag_attrs: TagAttrs) -> st
tags.append("lts")
tags.append(f"lts-{distro}")
# If the tag `22`, `23`, `24` etc. should be included in the tags
with_major = tag_attrs.majors.get(version.major) in (None, version)
if with_major:
tag_attrs.majors[version.major] = version
if without_distro:
tags.append(f"{version.major}")
tags.append(f"{version.major}-{distro}")
# Add all normal tags
for tag in (
f"{version.major}.{version.minor}",
@ -384,7 +374,7 @@ def generate_ldf(args: argparse.Namespace) -> None:
args.directory / git_runner(f"git -C {args.directory} rev-parse --show-cdup")
).absolute()
lines = ldf_header(git, directory)
tag_attrs = TagAttrs(versions[-1], {}, None)
tag_attrs = TagAttrs(versions[-1], None)
# We iterate from the most recent to the oldest version
for version in reversed(versions):

View File

@ -7,6 +7,7 @@ import os.path as p
import platform
import pprint
import pwd
import random
import re
import shlex
import shutil
@ -1650,6 +1651,8 @@ class ClickHouseCluster:
minio_certs_dir=None,
minio_data_dir=None,
use_keeper=True,
keeper_randomize_feature_flags=True,
keeper_required_feature_flags=[],
main_config_name="config.xml",
users_config_name="users.xml",
copy_common_configs=True,
@ -1682,6 +1685,8 @@ class ClickHouseCluster:
if not env_variables:
env_variables = {}
self.use_keeper = use_keeper
self.keeper_randomize_feature_flags = keeper_randomize_feature_flags
self.keeper_required_feature_flags = keeper_required_feature_flags
# Code coverage files will be placed in database directory
# (affect only WITH_COVERAGE=1 build)
@ -2828,15 +2833,51 @@ class ClickHouseCluster:
if self.use_keeper: # TODO: remove hardcoded paths from here
for i in range(1, 4):
current_keeper_config_dir = os.path.join(
f"{self.keeper_instance_dir_prefix}{i}", "config"
)
shutil.copy(
os.path.join(
self.keeper_config_dir, f"keeper_config{i}.xml"
),
os.path.join(
self.keeper_instance_dir_prefix + f"{i}", "config"
),
current_keeper_config_dir,
)
extra_configs_dir = os.path.join(
current_keeper_config_dir, f"keeper_config{i}.d"
)
os.mkdir(extra_configs_dir)
feature_flags_config = os.path.join(
extra_configs_dir, "feature_flags.yaml"
)
indentation = 4 * " "
def get_feature_flag_value(feature_flag):
if not self.keeper_randomize_feature_flags:
return 1
if feature_flag in self.keeper_required_feature_flags:
return 1
return random.randint(0, 1)
with open(feature_flags_config, "w") as ff_config:
ff_config.write("keeper_server:\n")
ff_config.write(f"{indentation}feature_flags:\n")
indentation *= 2
for feature_flag in [
"filtered_list",
"multi_read",
"check_not_exists",
"create_if_not_exists",
"remove_recursive",
]:
ff_config.write(
f"{indentation}{feature_flag}: {get_feature_flag_value(feature_flag)}\n"
)
run_and_check(self.base_zookeeper_cmd + common_opts, env=self.env)
self.up_called = True

View File

@ -13,6 +13,7 @@ node = cluster.add_instance(
main_configs=["configs/enable_keeper_map.xml"],
user_configs=["configs/keeper_retries.xml"],
with_zookeeper=True,
keeper_required_feature_flags=["multi_read"],
stay_alive=True,
)

View File

@ -20,6 +20,7 @@ node1 = cluster.add_instance(
main_configs=["configs/config.xml"],
user_configs=["configs/users.xml"],
with_zookeeper=True,
keeper_required_feature_flags=["multi_read", "create_if_not_exists"],
macros={"shard": "shard1", "replica": "1"},
stay_alive=True,
)
@ -28,6 +29,7 @@ node2 = cluster.add_instance(
main_configs=["configs/config.xml"],
user_configs=["configs/users.xml"],
with_zookeeper=True,
keeper_required_feature_flags=["multi_read", "create_if_not_exists"],
macros={"shard": "shard1", "replica": "2"},
)
nodes = [node1, node2]

View File

@ -378,7 +378,7 @@ def test_reload_via_client(cluster, zk):
configure_from_zk(zk)
break
except QueryRuntimeException:
logging.exception("The new socket is not binded yet")
logging.exception("The new socket is not bound yet")
time.sleep(0.1)
if exception:

View File

@ -59,6 +59,9 @@ instance = cluster.add_instance(
user_configs=["configs/users.xml"],
with_kafka=True,
with_zookeeper=True, # For Replicated Table
keeper_required_feature_flags=[
"create_if_not_exists"
], # new Kafka doesn't work without this feature
macros={
"kafka_broker": "kafka1",
"kafka_topic_old": KAFKA_TOPIC_OLD,

View File

@ -99,6 +99,7 @@ def started_cluster():
with_minio=True,
with_azurite=True,
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
main_configs=[
"configs/zookeeper.xml",
"configs/s3queue_log.xml",
@ -110,6 +111,7 @@ def started_cluster():
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
main_configs=[
"configs/s3queue_log.xml",
],
@ -118,6 +120,7 @@ def started_cluster():
cluster.add_instance(
"old_instance",
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
image="clickhouse/clickhouse-server",
tag="23.12",
stay_alive=True,
@ -127,6 +130,7 @@ def started_cluster():
cluster.add_instance(
"node1",
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
stay_alive=True,
main_configs=[
"configs/zookeeper.xml",
@ -137,6 +141,7 @@ def started_cluster():
cluster.add_instance(
"node2",
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
stay_alive=True,
main_configs=[
"configs/zookeeper.xml",
@ -149,6 +154,7 @@ def started_cluster():
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
main_configs=[
"configs/s3queue_log.xml",
"configs/merge_tree.xml",
@ -158,6 +164,7 @@ def started_cluster():
cluster.add_instance(
"instance_24.5",
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
image="clickhouse/clickhouse-server",
tag="24.5",
stay_alive=True,
@ -170,6 +177,7 @@ def started_cluster():
cluster.add_instance(
"node_cloud_mode",
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
stay_alive=True,
main_configs=[
"configs/zookeeper.xml",

View File

@ -114,6 +114,8 @@ SYSTEM DROP DNS CACHE ['SYSTEM DROP DNS','DROP DNS CACHE','DROP DNS'] GLOBAL SYS
SYSTEM DROP CONNECTIONS CACHE ['SYSTEM DROP CONNECTIONS CACHE','DROP CONNECTIONS CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM PREWARM MARK CACHE ['SYSTEM PREWARM MARK','PREWARM MARK CACHE','PREWARM MARKS'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP MARK CACHE ['SYSTEM DROP MARK','DROP MARK CACHE','DROP MARKS'] GLOBAL SYSTEM DROP CACHE
SYSTEM PREWARM PRIMARY INDEX CACHE ['SYSTEM PREWARM PRIMARY INDEX','PREWARM PRIMARY INDEX CACHE','PREWARM PRIMARY INDEX'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP PRIMARY INDEX CACHE ['SYSTEM DROP PRIMARY INDEX','DROP PRIMARY INDEX CACHE','DROP PRIMARY INDEX'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP UNCOMPRESSED CACHE ['SYSTEM DROP UNCOMPRESSED','DROP UNCOMPRESSED CACHE','DROP UNCOMPRESSED'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP MMAP CACHE ['SYSTEM DROP MMAP','DROP MMAP CACHE','DROP MMAP'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP QUERY CACHE ['SYSTEM DROP QUERY','DROP QUERY CACHE','DROP QUERY'] GLOBAL SYSTEM DROP CACHE

View File

@ -96,13 +96,15 @@ SELECT 'First JOIN INNER second JOIN INNER';
First JOIN INNER second JOIN INNER
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
SELECT '--';
@ -115,13 +117,15 @@ SELECT 'First JOIN INNER second JOIN LEFT';
First JOIN INNER second JOIN LEFT
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
SELECT '--';
@ -134,17 +138,19 @@ SELECT 'First JOIN INNER second JOIN RIGHT';
First JOIN INNER second JOIN RIGHT
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
4 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String String Join_3_Value_4 String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
String String Join_3_Value_4 String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
@ -156,17 +162,19 @@ SELECT 'First JOIN INNER second JOIN FULL';
First JOIN INNER second JOIN FULL
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
0 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String String Join_3_Value_4 String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
String String Join_3_Value_4 String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
@ -178,13 +186,15 @@ SELECT 'First JOIN LEFT second JOIN INNER';
First JOIN LEFT second JOIN INNER
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
SELECT '--';
@ -197,14 +207,16 @@ SELECT 'First JOIN LEFT second JOIN LEFT';
First JOIN LEFT second JOIN LEFT
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
2 UInt64 2 UInt64 Join_1_Value_2 String 0 UInt64 String 0 UInt64 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
Join_1_Value_2 String String String
@ -219,17 +231,19 @@ SELECT 'First JOIN LEFT second JOIN RIGHT';
First JOIN LEFT second JOIN RIGHT
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
4 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String String Join_3_Value_4 String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
String String Join_3_Value_4 String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
@ -241,19 +255,21 @@ SELECT 'First JOIN LEFT second JOIN FULL';
First JOIN LEFT second JOIN FULL
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
2 UInt64 2 UInt64 Join_1_Value_2 String 0 UInt64 String 0 UInt64 String
0 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String String Join_3_Value_4 String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
Join_1_Value_2 String String String
String String Join_3_Value_4 String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
@ -266,13 +282,15 @@ SELECT 'First JOIN RIGHT second JOIN INNER';
First JOIN RIGHT second JOIN INNER
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
SELECT '--';
@ -285,17 +303,19 @@ SELECT 'First JOIN RIGHT second JOIN LEFT';
First JOIN RIGHT second JOIN LEFT
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
3 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String Join_2_Value_3 String String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
String Join_2_Value_3 String String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
@ -307,17 +327,19 @@ SELECT 'First JOIN RIGHT second JOIN RIGHT';
First JOIN RIGHT second JOIN RIGHT
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
4 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String String Join_3_Value_4 String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
String String Join_3_Value_4 String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
@ -329,19 +351,21 @@ SELECT 'First JOIN RIGHT second JOIN FULL';
First JOIN RIGHT second JOIN FULL
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
3 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 String
0 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String String Join_3_Value_4 String
String Join_2_Value_3 String String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
String Join_2_Value_3 String String
String String Join_3_Value_4 String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
@ -354,14 +378,16 @@ SELECT 'First JOIN FULL second JOIN INNER';
First JOIN FULL second JOIN INNER
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 Join_3_Value_0 String
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
0 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 Join_3_Value_0 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
SELECT '--';
@ -374,19 +400,21 @@ SELECT 'First JOIN FULL second JOIN LEFT';
First JOIN FULL second JOIN LEFT
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 Join_3_Value_0 String
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
2 UInt64 2 UInt64 Join_1_Value_2 String 0 UInt64 String 0 UInt64 String
0 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 Join_3_Value_0 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String Join_2_Value_3 String String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
Join_1_Value_2 String String String
String Join_2_Value_3 String String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
@ -399,18 +427,20 @@ SELECT 'First JOIN FULL second JOIN RIGHT';
First JOIN FULL second JOIN RIGHT
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 Join_3_Value_0 String
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
0 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 Join_3_Value_0 String
4 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String String Join_3_Value_4 String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
String String Join_3_Value_4 String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
@ -422,21 +452,23 @@ SELECT 'First JOIN FULL second JOIN FULL';
First JOIN FULL second JOIN FULL
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
0 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 Join_3_Value_0 String
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
2 UInt64 2 UInt64 Join_1_Value_2 String 0 UInt64 String 0 UInt64 String
0 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 Join_3_Value_0 String
0 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String String Join_3_Value_4 String
String Join_2_Value_3 String String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
Join_1_Value_2 String String String
String Join_2_Value_3 String String
String String Join_3_Value_4 String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
View File
@@ -64,12 +64,14 @@ SELECT 'First JOIN {{ first_join_type }} second JOIN {{ second_join_type }}';
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 {{ first_join_type }} JOIN test_table_join_2 AS t2 USING (id) {{ second_join_type }} JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 {{ first_join_type }} JOIN test_table_join_2 AS t2 USING (id) {{ second_join_type }} JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
SELECT '--';
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 {{ first_join_type }} JOIN test_table_join_2 AS t2 USING (id) {{ second_join_type }} JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 {{ first_join_type }} JOIN test_table_join_2 AS t2 USING (id) {{ second_join_type }} JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
SELECT '--';
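A minimal sketch of the ORDER BY ALL clause added above (not part of the template; the query is made up): it sorts by every column of the SELECT list, which is what makes the reference output deterministic.
SELECT number % 3 AS a, number % 2 AS b FROM numbers(6) ORDER BY ALL;
-- Rows come back ordered by (a, b) instead of generation order.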
View File
@@ -0,0 +1,11 @@
drop table if exists test;
CREATE TABLE test (
`start_s` UInt32 EPHEMERAL COMMENT 'start UNIX time',
`start_us` UInt16 EPHEMERAL COMMENT 'start microseconds',
`finish_s` UInt32 EPHEMERAL COMMENT 'finish UNIX time',
`finish_us` UInt16 EPHEMERAL COMMENT 'finish microseconds',
`captured` DateTime MATERIALIZED fromUnixTimestamp(start_s),
`duration` Decimal32(6) MATERIALIZED finish_s - start_s + (finish_us - start_us)/1000000
)
ENGINE Null;
drop table if exists test;
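A hedged sketch of how the fix can be checked by hand (not part of the test; the table below is an assumption modelled on the one above): the COMMENT of an EPHEMERAL column should now survive into the table metadata.
CREATE TABLE test (`start_s` UInt32 EPHEMERAL COMMENT 'start UNIX time', `captured` DateTime MATERIALIZED fromUnixTimestamp(start_s)) ENGINE = Null;
SELECT name, comment FROM system.columns WHERE database = currentDatabase() AND table = 'test';
DROP TABLE test;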
View File
@@ -0,0 +1,16 @@
0
PrimaryIndexCacheBytes 0
PrimaryIndexCacheFiles 0
99
0
PrimaryIndexCacheBytes 1280
PrimaryIndexCacheFiles 2
0
PrimaryIndexCacheBytes 0
PrimaryIndexCacheFiles 0
49
0
PrimaryIndexCacheBytes 640
PrimaryIndexCacheFiles 1
2 160 1280
1 80 640
View File
@@ -0,0 +1,45 @@
-- Tags: no-parallel
DROP TABLE IF EXISTS t_primary_index_cache;
CREATE TABLE t_primary_index_cache (a UInt64, b UInt64)
ENGINE = MergeTree ORDER BY a PARTITION BY a % 2
SETTINGS use_primary_index_cache = 1, prewarm_primary_index_cache = 0, index_granularity = 64, index_granularity_bytes = '10M', min_bytes_for_wide_part = 0;
SYSTEM DROP PRIMARY INDEX CACHE;
INSERT INTO t_primary_index_cache SELECT number, number FROM numbers(10000);
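-- Prewarming is disabled here, so the cache should still be empty after the insert: index loading is expected to happen lazily on first read.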
SYSTEM RELOAD ASYNCHRONOUS METRICS;
SELECT sum(primary_key_bytes_in_memory) FROM system.parts WHERE table = 't_primary_index_cache' AND active;
SELECT metric, value FROM system.asynchronous_metrics WHERE metric IN ('PrimaryIndexCacheFiles', 'PrimaryIndexCacheBytes') ORDER BY metric;
SELECT count() FROM t_primary_index_cache WHERE a > 100 AND a < 200;
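-- The range 100 < a < 200 spans both partitions (a % 2), so both index files should now sit in the cache.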
SYSTEM RELOAD ASYNCHRONOUS METRICS;
SELECT sum(primary_key_bytes_in_memory) FROM system.parts WHERE table = 't_primary_index_cache' AND active;
SELECT metric, value FROM system.asynchronous_metrics WHERE metric IN ('PrimaryIndexCacheFiles', 'PrimaryIndexCacheBytes') ORDER BY metric;
SYSTEM DROP PRIMARY INDEX CACHE;
SYSTEM RELOAD ASYNCHRONOUS METRICS;
SELECT sum(primary_key_bytes_in_memory) FROM system.parts WHERE table = 't_primary_index_cache' AND active;
SELECT metric, value FROM system.asynchronous_metrics WHERE metric IN ('PrimaryIndexCacheFiles', 'PrimaryIndexCacheBytes') ORDER BY metric;
SELECT count() FROM t_primary_index_cache WHERE a > 100 AND a < 200 AND a % 2 = 0;
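-- The extra a % 2 = 0 condition restricts the read to a single partition, so only one index file is loaded back after the cache was dropped.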
SYSTEM RELOAD ASYNCHRONOUS METRICS;
SELECT sum(primary_key_bytes_in_memory) FROM system.parts WHERE table = 't_primary_index_cache' AND active;
SELECT metric, value FROM system.asynchronous_metrics WHERE metric IN ('PrimaryIndexCacheFiles', 'PrimaryIndexCacheBytes') ORDER BY metric;
SYSTEM FLUSH LOGS;
SELECT
ProfileEvents['LoadedPrimaryIndexFiles'],
ProfileEvents['LoadedPrimaryIndexRows'],
ProfileEvents['LoadedPrimaryIndexBytes']
FROM system.query_log
WHERE query LIKE 'SELECT count() FROM t_primary_index_cache%' AND current_database = currentDatabase() AND type = 'QueryFinish'
ORDER BY event_time_microseconds;
DROP TABLE t_primary_index_cache;
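A hedged sketch for inspecting the settings exercised above (not part of the test; it assumes both settings show up in system.merge_tree_settings):
SELECT name, value, description FROM system.merge_tree_settings
WHERE name IN ('use_primary_index_cache', 'prewarm_primary_index_cache');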
View File
@@ -0,0 +1,22 @@
449
0
449
0
898
898
0
898
898
0
898
0
898
0
0
0
0
0
0
0
1
0
View File
@@ -0,0 +1,74 @@
-- Tags: no-parallel, no-shared-merge-tree
DROP TABLE IF EXISTS t_prewarm_cache_rmt_1;
DROP TABLE IF EXISTS t_prewarm_cache_rmt_2;
CREATE TABLE t_prewarm_cache_rmt_1 (a UInt64, b UInt64, c UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/03274_prewarm_mark_cache_smt/t_prewarm_cache', '1')
ORDER BY a PARTITION BY a % 2
SETTINGS prewarm_primary_index_cache = 1, use_primary_index_cache = 1;
CREATE TABLE t_prewarm_cache_rmt_2 (a UInt64, b UInt64, c UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/03274_prewarm_mark_cache_smt/t_prewarm_cache', '2')
ORDER BY a PARTITION BY a % 2
SETTINGS prewarm_primary_index_cache = 1, use_primary_index_cache = 1;
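-- Both replicas enable prewarming, so the inserts, fetches and merges below are expected to populate the primary index cache without running any query.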
SYSTEM DROP PRIMARY INDEX CACHE;
SYSTEM STOP FETCHES t_prewarm_cache_rmt_2;
-- Check that prewarm works on insert.
INSERT INTO t_prewarm_cache_rmt_1 SELECT number, rand(), rand() FROM numbers(20000);
SELECT count() FROM t_prewarm_cache_rmt_1 WHERE a % 2 = 0 AND a > 100 AND a < 1000;
SELECT sum(primary_key_bytes_in_memory) FROM system.parts WHERE database = currentDatabase() AND table IN ('t_prewarm_cache_rmt_1', 't_prewarm_cache_rmt_2');
-- Check that prewarm works on fetch.
SYSTEM DROP PRIMARY INDEX CACHE;
SYSTEM START FETCHES t_prewarm_cache_rmt_2;
SYSTEM SYNC REPLICA t_prewarm_cache_rmt_2;
SELECT count() FROM t_prewarm_cache_rmt_2 WHERE a % 2 = 0 AND a > 100 AND a < 1000;
SELECT sum(primary_key_bytes_in_memory) FROM system.parts WHERE database = currentDatabase() AND table IN ('t_prewarm_cache_rmt_1', 't_prewarm_cache_rmt_2');
-- Check that prewarm works on merge.
INSERT INTO t_prewarm_cache_rmt_1 SELECT number, rand(), rand() FROM numbers(20000);
OPTIMIZE TABLE t_prewarm_cache_rmt_1 FINAL;
SYSTEM SYNC REPLICA t_prewarm_cache_rmt_2;
SELECT count() FROM t_prewarm_cache_rmt_1 WHERE a % 2 = 0 AND a > 100 AND a < 1000;
SELECT count() FROM t_prewarm_cache_rmt_2 WHERE a % 2 = 0 AND a > 100 AND a < 1000;
SELECT sum(primary_key_bytes_in_memory) FROM system.parts WHERE database = currentDatabase() AND table IN ('t_prewarm_cache_rmt_1', 't_prewarm_cache_rmt_2');
-- Check that prewarm works on restart.
SYSTEM DROP PRIMARY INDEX CACHE;
DETACH TABLE t_prewarm_cache_rmt_1;
DETACH TABLE t_prewarm_cache_rmt_2;
ATTACH TABLE t_prewarm_cache_rmt_1;
ATTACH TABLE t_prewarm_cache_rmt_2;
SELECT count() FROM t_prewarm_cache_rmt_1 WHERE a % 2 = 0 AND a > 100 AND a < 1000;
SELECT count() FROM t_prewarm_cache_rmt_2 WHERE a % 2 = 0 AND a > 100 AND a < 1000;
SELECT sum(primary_key_bytes_in_memory) FROM system.parts WHERE database = currentDatabase() AND table IN ('t_prewarm_cache_rmt_1', 't_prewarm_cache_rmt_2');
SYSTEM DROP PRIMARY INDEX CACHE;
SELECT count() FROM t_prewarm_cache_rmt_1 WHERE a % 2 = 0 AND a > 100 AND a < 1000;
SELECT sum(primary_key_bytes_in_memory) FROM system.parts WHERE database = currentDatabase() AND table IN ('t_prewarm_cache_rmt_1', 't_prewarm_cache_rmt_2');
-- Check that the SYSTEM PREWARM PRIMARY INDEX CACHE query works.
SYSTEM PREWARM PRIMARY INDEX CACHE t_prewarm_cache_rmt_1;
SELECT count() FROM t_prewarm_cache_rmt_1 WHERE a % 2 = 0 AND a > 100 AND a < 1000;
SELECT sum(primary_key_bytes_in_memory) FROM system.parts WHERE database = currentDatabase() AND table IN ('t_prewarm_cache_rmt_1', 't_prewarm_cache_rmt_2');
SYSTEM FLUSH LOGS;
SELECT ProfileEvents['LoadedPrimaryIndexFiles'] FROM system.query_log
WHERE current_database = currentDatabase() AND type = 'QueryFinish' AND query LIKE 'SELECT count() FROM t_prewarm_cache%'
ORDER BY event_time_microseconds;
DROP TABLE IF EXISTS t_prewarm_cache_rmt_1;
DROP TABLE IF EXISTS t_prewarm_cache_rmt_2;
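A hedged sketch, not taken from the diff: on an existing MergeTree table the same behaviour can presumably be switched on afterwards, assuming both settings are modifiable via ALTER.
-- `t` is a placeholder for any existing MergeTree table.
ALTER TABLE t MODIFY SETTING use_primary_index_cache = 1, prewarm_primary_index_cache = 1;
SYSTEM PREWARM PRIMARY INDEX CACHE t;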