row_policy_template - table * is respected

This commit is contained in:
Ilya Golshtein 2023-05-12 22:30:00 +00:00
parent 1027db6aca
commit 9ef610040f
12 changed files with 43 additions and 16 deletions

View File

@ -12,7 +12,7 @@ Columns:
- `database` ([String](../../sql-reference/data-types/string.md)) — Database name.
- `table` ([String](../../sql-reference/data-types/string.md)) — Table name.
- `table` ([String](../../sql-reference/data-types/string.md)) — Table name. Empty if policy for database.
- `id` ([UUID](../../sql-reference/data-types/uuid.md)) — Row policy ID.

View File

@ -22,7 +22,7 @@ String RowPolicyName::toString() const
name += backQuoteIfNeed(database);
name += '.';
}
name += backQuoteIfNeed(table_name);
name += (table_name == RowPolicyName::ANY_TABLE_MARK ? "*" : backQuoteIfNeed(table_name));
return name;
}

View File

@ -9,6 +9,8 @@ namespace DB
/// Represents the full name of a row policy, e.g. "myfilter ON mydb.mytable".
struct RowPolicyName
{
static constexpr char ANY_TABLE_MARK[] = "";
String short_name;
String database;
String table_name;

View File

@ -36,11 +36,19 @@ RowPolicyFilterPtr EnabledRowPolicies::getFilter(const String & database, const
auto it = loaded->find({database, table_name, filter_type});
if (it == loaded->end())
{ /// Look for a policy for database if a table policy not found
it = loaded->find({database, RowPolicy::ANY_TABLE_MARK, filter_type});
it = loaded->find({database, RowPolicyName::ANY_TABLE_MARK, filter_type});
if (it == loaded->end())
{
return {};
}
else
{
// deep copy found policy for database and change its table name to the actual one
auto policy_for_database = std::make_shared<RowPolicyFilter>(*it->second);
auto database_and_table_name = std::make_shared<std::pair<String, String>>(database, table_name);
policy_for_database->database_and_table_name = database_and_table_name;
return policy_for_database;
}
}
return it->second;

View File

@ -14,8 +14,6 @@ namespace DB
*/
struct RowPolicy : public IAccessEntity
{
static constexpr char ANY_TABLE_MARK[] = "*";
void setShortName(const String & short_name);
void setDatabase(const String & database);
void setTableName(const String & table_name);
@ -38,7 +36,7 @@ struct RowPolicy : public IAccessEntity
bool isPermissive() const { return !isRestrictive(); }
/// Applied for entire database
bool isForDatabase() const { return full_name.table_name == ANY_TABLE_MARK; }
bool isForDatabase() const { return full_name.table_name == RowPolicyName::ANY_TABLE_MARK; }
/// Sets that the policy is restrictive.
/// A row is only accessible if at least one of the permissive policies passes,

View File

@ -270,7 +270,7 @@ void RowPolicyCache::mixFiltersFor(EnabledRowPolicies & enabled)
if (table_it == table_mixers.end())
{ /// no exact match - create new mixer
MixedFiltersKey database_key = key;
database_key.table_name = RowPolicy::ANY_TABLE_MARK;
database_key.table_name = RowPolicyName::ANY_TABLE_MARK;
auto database_it = database_mixers.find(database_key);

View File

@ -30,6 +30,11 @@ void ASTRowPolicyName::replaceEmptyDatabase(const String & current_database)
full_name.database = current_database;
}
String ASTRowPolicyNames::tableOrAsterisk(const String & table_name) const
{
return table_name == RowPolicyName::ANY_TABLE_MARK ? "*" : backQuoteIfNeed(table_name);
}
void ASTRowPolicyNames::formatImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const
{
@ -73,7 +78,7 @@ void ASTRowPolicyNames::formatImpl(const FormatSettings & settings, FormatState
const String & table_name = full_name.table_name;
if (!database.empty())
settings.ostr << backQuoteIfNeed(database) + ".";
settings.ostr << backQuoteIfNeed(table_name);
settings.ostr << tableOrAsterisk(table_name);
}
}
else if (same_db_and_table_name)
@ -92,7 +97,7 @@ void ASTRowPolicyNames::formatImpl(const FormatSettings & settings, FormatState
settings.ostr << (settings.hilite ? hilite_keyword : "") << " ON " << (settings.hilite ? hilite_none : "");
if (!database.empty())
settings.ostr << backQuoteIfNeed(database) + ".";
settings.ostr << backQuoteIfNeed(table_name);
settings.ostr << tableOrAsterisk(table_name);
}
else
{
@ -108,7 +113,7 @@ void ASTRowPolicyNames::formatImpl(const FormatSettings & settings, FormatState
<< (settings.hilite ? hilite_none : "");
if (!database.empty())
settings.ostr << backQuoteIfNeed(database) + ".";
settings.ostr << backQuoteIfNeed(table_name);
settings.ostr << tableOrAsterisk(table_name);
}
}

View File

@ -45,5 +45,8 @@ public:
ASTPtr getRewrittenASTWithoutOnCluster(const WithoutOnClusterASTRewriteParams &) const override { return removeOnCluster<ASTRowPolicyNames>(clone()); }
void replaceEmptyDatabase(const String & current_database);
private:
String tableOrAsterisk(const String & table_name) const;
};
}

View File

@ -36,7 +36,7 @@ namespace
}
else if (is_any_table)
{
res_table_name = "*"; // RowPolicy::ANY_TABLE_MARK
res_table_name = RowPolicyName::ANY_TABLE_MARK;
}
/// If table is specified without DB it cannot be followed by "ON"

View File

@ -40,3 +40,5 @@ None
3 30
4 40
No problematic policy, select works
Policy for table `*` does not affect other tables in the database
other 100 20

View File

@ -86,3 +86,13 @@ DROP TABLE 02703_db.02703_rptable;
DROP TABLE 02703_db.02703_rptable_another;
DROP TABLE 02703_db.02703_unexpected_columns;
DROP DATABASE 02703_db;
SELECT 'Policy for table `*` does not affect other tables in the database';
CREATE DATABASE 02703_db_asterisk;
CREATE ROW POLICY 02703_asterisk ON 02703_db_asterisk.`*` USING x=1 AS permissive TO ALL;
CREATE TABLE 02703_db_asterisk.`*` (x UInt8, y UInt8) ENGINE = MergeTree ORDER BY x AS SELECT 100, 20;
CREATE TABLE 02703_db_asterisk.`other` (x UInt8, y UInt8) ENGINE = MergeTree ORDER BY x AS SELECT 100, 20;
SELECT 'star', * FROM 02703_db_asterisk.`*`;
SELECT 'other', * FROM 02703_db_asterisk.other;
DROP ROW POLICY 02703_asterisk ON 02703_db_asterisk.`*`;
DROP DATABASE 02703_db_asterisk;

View File

@ -1,11 +1,10 @@
-- row policies for database
-- SHOW CREATE POLICY db1_02703 ON db1_02703.*
CREATE ROW POLICY db1_02703 ON db1_02703.`*` FOR SELECT USING 1 TO ALL
CREATE ROW POLICY db1_02703 ON db1_02703.* FOR SELECT USING 1 TO ALL
-- SHOW CREATE POLICY ON db1_02703.*
CREATE ROW POLICY db1_02703 ON db1_02703.`*` FOR SELECT USING 1 TO ALL
CREATE ROW POLICY db1_02703 ON db1_02703.* FOR SELECT USING 1 TO ALL
CREATE ROW POLICY tbl1_02703 ON db1_02703.table FOR SELECT USING 1 TO ALL
-- SHOW CREATE POLICY ON db1_02703.`*`
CREATE ROW POLICY db1_02703 ON db1_02703.`*` FOR SELECT USING 1 TO ALL
R1, R2: (x == 1) OR (x == 2)
1
2
@ -15,7 +14,7 @@ SELECT \' -- SHOW CREATE POLICY db1_02703 ON db1_02703.*\'; []
SELECT \' -- SHOW CREATE POLICY ON db1_02703.*\'; []
SELECT \' -- SHOW CREATE POLICY ON db1_02703.`*`\'; []
SELECT \'R1, R2: (x == 1) OR (x == 2)\'; []
SELECT * FROM 02703_rqtable_default; ['`02703_filter_11_db` ON default.`*`','`02703_filter_11` ON default.`02703_rqtable_default`']
SELECT * FROM 02703_rqtable_default; ['`02703_filter_11_db` ON default.*','`02703_filter_11` ON default.`02703_rqtable_default`']
SELECT \'Check system.query_log\'; []
-- CREATE DATABASE-LEVEL POLICY IN CURRENT DATABASE
CREATE ROW POLICY db2_02703 ON db1_02703.`*` TO u1_02703
CREATE ROW POLICY db2_02703 ON db1_02703.* TO u1_02703