Merge pull request #67733 from Zawa-ll/66073-system-load-primary-key-rebase

Added statement `SYSTEM LOAD PRIMARY KEY`
This commit is contained in:
Pablo Marcos 2024-11-28 16:02:31 +00:00 committed by GitHub
commit d8a235b785
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 178 additions and 11 deletions

View File

@ -506,6 +506,18 @@ Will do sync syscall.
SYSTEM SYNC FILE CACHE [ON CLUSTER cluster_name]
```
### LOAD PRIMARY KEY
Load the primary keys for the given table or for all tables.
```sql
SYSTEM LOAD PRIMARY KEY [db.]name
```
```sql
SYSTEM LOAD PRIMARY KEY
```
### UNLOAD PRIMARY KEY
Unload the primary keys for the given table or for all tables.

View File

@ -219,6 +219,7 @@ enum class AccessType : uint8_t
M(SYSTEM_FAILPOINT, "SYSTEM ENABLE FAILPOINT, SYSTEM DISABLE FAILPOINT, SYSTEM WAIT FAILPOINT", GLOBAL, SYSTEM) \
M(SYSTEM_LISTEN, "SYSTEM START LISTEN, SYSTEM STOP LISTEN", GLOBAL, SYSTEM) \
M(SYSTEM_JEMALLOC, "SYSTEM JEMALLOC PURGE, SYSTEM JEMALLOC ENABLE PROFILE, SYSTEM JEMALLOC DISABLE PROFILE, SYSTEM JEMALLOC FLUSH PROFILE", GLOBAL, SYSTEM) \
M(SYSTEM_LOAD_PRIMARY_KEY, "SYSTEM LOAD PRIMARY KEY", TABLE, SYSTEM) \
M(SYSTEM_UNLOAD_PRIMARY_KEY, "SYSTEM UNLOAD PRIMARY KEY", TABLE, SYSTEM) \
M(SYSTEM, "", GROUP, ALL) /* allows to execute SYSTEM {SHUTDOWN|RELOAD CONFIG|...} */ \
\

View File

@ -288,7 +288,7 @@ TEST(AccessRights, Union)
"SYSTEM REPLICATION QUEUES, SYSTEM VIRTUAL PARTS UPDATE, SYSTEM REDUCE BLOCKING PARTS, "
"SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, SYSTEM RESTART REPLICA, "
"SYSTEM RESTORE REPLICA, SYSTEM WAIT LOADING PARTS, SYSTEM SYNC DATABASE REPLICA, SYSTEM FLUSH DISTRIBUTED, "
"SYSTEM UNLOAD PRIMARY KEY, dictGet ON db1.*, GRANT TABLE ENGINE ON db1, "
"SYSTEM LOAD PRIMARY KEY, SYSTEM UNLOAD PRIMARY KEY, dictGet ON db1.*, GRANT TABLE ENGINE ON db1, "
"GRANT SET DEFINER ON db1, GRANT NAMED COLLECTION ADMIN ON db1");
lhs = {};

View File

@ -818,6 +818,10 @@ BlockIO InterpreterSystemQuery::execute()
resetCoverage();
break;
}
case Type::LOAD_PRIMARY_KEY: {
loadPrimaryKeys();
break;
}
case Type::UNLOAD_PRIMARY_KEY:
{
unloadPrimaryKeys();
@ -1224,28 +1228,38 @@ void InterpreterSystemQuery::waitLoadingParts()
}
}
void InterpreterSystemQuery::loadPrimaryKeys()
{
loadOrUnloadPrimaryKeysImpl(true);
};
void InterpreterSystemQuery::unloadPrimaryKeys()
{
loadOrUnloadPrimaryKeysImpl(false);
}
void InterpreterSystemQuery::loadOrUnloadPrimaryKeysImpl(bool load)
{
if (!table_id.empty())
{
getContext()->checkAccess(AccessType::SYSTEM_UNLOAD_PRIMARY_KEY, table_id.database_name, table_id.table_name);
getContext()->checkAccess(load ? AccessType::SYSTEM_LOAD_PRIMARY_KEY : AccessType::SYSTEM_UNLOAD_PRIMARY_KEY, table_id.database_name, table_id.table_name);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (auto * merge_tree = dynamic_cast<MergeTreeData *>(table.get()))
{
LOG_TRACE(log, "Unloading primary keys for table {}", table_id.getFullTableName());
merge_tree->unloadPrimaryKeys();
LOG_TRACE(log, "{} primary keys for table {}", load ? "Loading" : "Unloading", table_id.getFullTableName());
load ? merge_tree->loadPrimaryKeys() : merge_tree->unloadPrimaryKeys();
}
else
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Command UNLOAD PRIMARY KEY is supported only for MergeTree table, but got: {}", table->getName());
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Command {} PRIMARY KEY is supported only for MergeTree tables, but got: {}", load ? "LOAD" : "UNLOAD", table->getName());
}
}
else
{
getContext()->checkAccess(AccessType::SYSTEM_UNLOAD_PRIMARY_KEY);
LOG_TRACE(log, "Unloading primary keys for all tables");
getContext()->checkAccess(load ? AccessType::SYSTEM_LOAD_PRIMARY_KEY : AccessType::SYSTEM_UNLOAD_PRIMARY_KEY);
LOG_TRACE(log, "{} primary keys for all tables", load ? "Loading" : "Unloading");
for (auto & database : DatabaseCatalog::instance().getDatabases())
{
@ -1253,7 +1267,7 @@ void InterpreterSystemQuery::unloadPrimaryKeys()
{
if (auto * merge_tree = dynamic_cast<MergeTreeData *>(it->table().get()))
{
merge_tree->unloadPrimaryKeys();
load ? merge_tree->loadPrimaryKeys() : merge_tree->unloadPrimaryKeys();
}
}
}
@ -1635,6 +1649,14 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_JEMALLOC);
break;
}
case Type::LOAD_PRIMARY_KEY:
{
if (!query.table)
required_access.emplace_back(AccessType::SYSTEM_LOAD_PRIMARY_KEY);
else
required_access.emplace_back(AccessType::SYSTEM_LOAD_PRIMARY_KEY, query.getDatabase(), query.getTable());
break;
}
case Type::UNLOAD_PRIMARY_KEY:
{
if (!query.table)

View File

@ -64,7 +64,10 @@ private:
void syncReplica(ASTSystemQuery & query);
void setReplicaReadiness(bool ready);
void waitLoadingParts();
void loadPrimaryKeys();
void unloadPrimaryKeys();
void loadOrUnloadPrimaryKeysImpl(bool load);
void syncReplicatedDatabase(ASTSystemQuery & query);

View File

@ -173,6 +173,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & s
case Type::START_PULLING_REPLICATION_LOG:
case Type::STOP_CLEANUP:
case Type::START_CLEANUP:
case Type::LOAD_PRIMARY_KEY:
case Type::UNLOAD_PRIMARY_KEY:
{
if (table)

View File

@ -105,6 +105,7 @@ public:
STOP_VIEWS,
CANCEL_VIEW,
TEST_VIEW,
LOAD_PRIMARY_KEY,
UNLOAD_PRIMARY_KEY,
END
};

View File

@ -325,6 +325,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
/// START/STOP DISTRIBUTED SENDS does not require table
case Type::STOP_DISTRIBUTED_SENDS:
case Type::START_DISTRIBUTED_SENDS:
case Type::LOAD_PRIMARY_KEY:
case Type::UNLOAD_PRIMARY_KEY:
{
if (!parseQueryWithOnClusterAndMaybeTable(res, pos, expected, /* require table = */ false, /* allow_string_literal = */ false))

View File

@ -9088,6 +9088,20 @@ bool MergeTreeData::initializeDiskOnConfigChange(const std::set<String> & new_ad
return true;
}
void MergeTreeData::loadPrimaryKeys()
{
static DataPartStates affordable_states = { MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated, MergeTreeDataPartState::Deleting };
for (const auto & data_part : getDataParts(affordable_states))
{
if (data_part->isProjectionPart())
continue;
if (!data_part->isIndexLoaded())
/// We call getIndex() because it calls loadIndex() after locking its mutex, but we don't need its value.
UNUSED(const_cast<IMergeTreeDataPart &>(*data_part).getIndex());
}
}
void MergeTreeData::unloadPrimaryKeys()
{
for (auto & part : getAllDataPartsVector())

View File

@ -1160,7 +1160,8 @@ public:
static VirtualColumnsDescription createVirtuals(const StorageInMemoryMetadata & metadata);
/// Unloads primary keys of all parts.
/// Load/unload primary keys of all data parts
void loadPrimaryKeys();
void unloadPrimaryKeys();
/// Unloads primary keys of outdated parts that are not used by any query.

View File

@ -192,7 +192,7 @@ def test_grant_all_on_table():
instance.query("GRANT ALL ON test.table TO B", user="A")
assert (
instance.query("SHOW GRANTS FOR B")
== "GRANT SHOW TABLES, SHOW COLUMNS, SHOW DICTIONARIES, SELECT, INSERT, ALTER TABLE, ALTER VIEW, CREATE TABLE, CREATE VIEW, CREATE DICTIONARY, DROP TABLE, DROP VIEW, DROP DICTIONARY, UNDROP TABLE, TRUNCATE, OPTIMIZE, BACKUP, CREATE ROW POLICY, ALTER ROW POLICY, DROP ROW POLICY, SHOW ROW POLICIES, SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, SYSTEM MOVES, SYSTEM PULLING REPLICATION LOG, SYSTEM CLEANUP, SYSTEM VIEWS, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, SYSTEM VIRTUAL PARTS UPDATE, SYSTEM REDUCE BLOCKING PARTS, SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, SYSTEM RESTART REPLICA, SYSTEM RESTORE REPLICA, SYSTEM WAIT LOADING PARTS, SYSTEM FLUSH DISTRIBUTED, SYSTEM UNLOAD PRIMARY KEY, dictGet ON test.`table` TO B\n"
== "GRANT SHOW TABLES, SHOW COLUMNS, SHOW DICTIONARIES, SELECT, INSERT, ALTER TABLE, ALTER VIEW, CREATE TABLE, CREATE VIEW, CREATE DICTIONARY, DROP TABLE, DROP VIEW, DROP DICTIONARY, UNDROP TABLE, TRUNCATE, OPTIMIZE, BACKUP, CREATE ROW POLICY, ALTER ROW POLICY, DROP ROW POLICY, SHOW ROW POLICIES, SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, SYSTEM MOVES, SYSTEM PULLING REPLICATION LOG, SYSTEM CLEANUP, SYSTEM VIEWS, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, SYSTEM VIRTUAL PARTS UPDATE, SYSTEM REDUCE BLOCKING PARTS, SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, SYSTEM RESTART REPLICA, SYSTEM RESTORE REPLICA, SYSTEM WAIT LOADING PARTS, SYSTEM FLUSH DISTRIBUTED, SYSTEM LOAD PRIMARY KEY, SYSTEM UNLOAD PRIMARY KEY, dictGet ON test.`table` TO B\n"
)
instance.query("REVOKE ALL ON test.table FROM B", user="A")
assert instance.query("SHOW GRANTS FOR B") == ""

View File

@ -168,6 +168,7 @@ SYSTEM UNFREEZE ['SYSTEM UNFREEZE'] GLOBAL SYSTEM
SYSTEM FAILPOINT ['SYSTEM ENABLE FAILPOINT','SYSTEM DISABLE FAILPOINT','SYSTEM WAIT FAILPOINT'] GLOBAL SYSTEM
SYSTEM LISTEN ['SYSTEM START LISTEN','SYSTEM STOP LISTEN'] GLOBAL SYSTEM
SYSTEM JEMALLOC ['SYSTEM JEMALLOC PURGE','SYSTEM JEMALLOC ENABLE PROFILE','SYSTEM JEMALLOC DISABLE PROFILE','SYSTEM JEMALLOC FLUSH PROFILE'] GLOBAL SYSTEM
SYSTEM LOAD PRIMARY KEY ['SYSTEM LOAD PRIMARY KEY'] TABLE SYSTEM
SYSTEM UNLOAD PRIMARY KEY ['SYSTEM UNLOAD PRIMARY KEY'] TABLE SYSTEM
SYSTEM [] \N ALL
dictGet ['dictHas','dictGetHierarchy','dictIsIn'] DICTIONARY ALL

View File

@ -0,0 +1,25 @@
-- Insert data into columns
100000 100000
-- Check primary key memory after inserting into both tables
test 100000000 100000000
test2 100000000 100000000
-- Unload primary keys for all tables in the database
OK
-- Check the primary key memory after unloading all tables
test 0 0
test2 0 0
-- Load primary key for all tables
OK
-- Check the primary key memory after loading all tables
test 100000000 100000000
test2 100000000 100000000
-- Unload primary keys for all tables in the database
OK
-- Check the primary key memory after unloading all tables
test 0 0
test2 0 0
-- Load primary key for only one table
OK
-- Check the primary key memory after loading only one table
test 100000000 100000000
test2 0 0

View File

@ -0,0 +1,85 @@
-- Tags: no-parallel
DROP TABLE IF EXISTS test;
DROP TABLE IF EXISTS test2;
CREATE TABLE test (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1;
CREATE TABLE test2 (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1;
SELECT '-- Insert data into columns';
INSERT INTO test SELECT randomString(1000) FROM numbers(100000);
INSERT INTO test2 SELECT randomString(1000) FROM numbers(100000);
SELECT (SELECT count() FROM test), (SELECT count() FROM test2);
SELECT '-- Check primary key memory after inserting into both tables';
SELECT
table,
round(primary_key_bytes_in_memory, -7),
round(primary_key_bytes_in_memory_allocated, -7)
FROM system.parts
WHERE
database = currentDatabase()
AND table IN ('test', 'test2')
ORDER BY table;
SELECT '-- Unload primary keys for all tables in the database';
SYSTEM UNLOAD PRIMARY KEY;
SELECT 'OK';
SELECT '-- Check the primary key memory after unloading all tables';
SELECT
table,
round(primary_key_bytes_in_memory, -7),
round(primary_key_bytes_in_memory_allocated, -7)
FROM system.parts
WHERE
database = currentDatabase()
AND table IN ('test', 'test2')
ORDER BY table;
SELECT '-- Load primary key for all tables';
SYSTEM LOAD PRIMARY KEY;
SELECT 'OK';
SELECT '-- Check the primary key memory after loading all tables';
SELECT
table,
round(primary_key_bytes_in_memory, -7),
round(primary_key_bytes_in_memory_allocated, -7)
FROM system.parts
WHERE
database = currentDatabase()
AND table IN ('test', 'test2')
ORDER BY table;
SELECT '-- Unload primary keys for all tables in the database';
SYSTEM UNLOAD PRIMARY KEY;
SELECT 'OK';
SELECT '-- Check the primary key memory after unloading all tables';
SELECT
table,
round(primary_key_bytes_in_memory, -7),
round(primary_key_bytes_in_memory_allocated, -7)
FROM system.parts
WHERE
database = currentDatabase()
AND table IN ('test', 'test2')
ORDER BY table;
SELECT '-- Load primary key for only one table';
SYSTEM LOAD PRIMARY KEY test;
SELECT 'OK';
SELECT '-- Check the primary key memory after loading only one table';
SELECT
table,
round(primary_key_bytes_in_memory, -7),
round(primary_key_bytes_in_memory_allocated, -7)
FROM system.parts
WHERE
database = currentDatabase()
AND table IN ('test', 'test2')
ORDER BY table;
DROP TABLE test;
DROP TABLE test2;