Merge pull request #17657 from MyroTk/master

RBAC testflows tests for SHOW, TRUNCATE, KILL, and OPTIMIZE.
This commit is contained in:
alexey-milovidov 2020-12-01 20:57:31 +03:00 committed by GitHub
commit 1cdf5012c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 3419 additions and 1492 deletions

View File

@ -20,6 +20,10 @@ issue_14810 = "https://github.com/ClickHouse/ClickHouse/issues/14810"
issue_15165 = "https://github.com/ClickHouse/ClickHouse/issues/15165"
issue_15980 = "https://github.com/ClickHouse/ClickHouse/issues/15980"
issue_16403 = "https://github.com/ClickHouse/ClickHouse/issues/16403"
issue_17146 = "https://github.com/ClickHouse/ClickHouse/issues/17146"
issue_17147 = "https://github.com/ClickHouse/ClickHouse/issues/17147"
issue_17653 = "https://github.com/ClickHouse/ClickHouse/issues/17653"
issue_17655 = "https://github.com/ClickHouse/ClickHouse/issues/17655"
xfails = {
"syntax/show create quota/I show create quota current":
@ -97,11 +101,17 @@ xfails = {
"privileges/alter move/:/:/:/:/user with revoked ALTER MOVE PARTITION privilege/":
[(Fail, issue_16403)],
"privileges/create table/create with join query privilege granted directly or via role/:":
[(Fail, issue_14149)],
[(Fail, issue_17653)],
"privileges/create table/create with join union subquery privilege granted directly or via role/:":
[(Fail, issue_14149)],
[(Fail, issue_17653)],
"privileges/create table/create with nested tables privilege granted directly or via role/:":
[(Fail, issue_14149)],
[(Fail, issue_17653)],
"privileges/kill mutation/no privilege/kill mutation on cluster":
[(Fail, issue_17146)],
"privileges/kill query/privilege granted directly or via role/:/":
[(Fail, issue_17147)],
"privileges/show dictionaries/:/check privilege/:/exists/EXISTS with privilege":
[(Fail, issue_17655)],
}
xflags = {

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -10,9 +10,6 @@ import rbac.helper.errors as errors
aliases = {"ALTER DELETE", "DELETE"}
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_AlterDelete_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, table_type, privilege, node=None):
"""Check that user is only able to execute ALTER DELETE when they have required privilege, either directly or via role.
"""

View File

@ -10,9 +10,6 @@ import rbac.helper.errors as errors
aliases = {"ALTER FETCH PARTITION", "FETCH PARTITION"}
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_AlterFetch_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, table_type, privilege, node=None):
"""Check that user is only able to execute ALTER FETCH PARTITION when they have required privilege, either directly or via role.
"""

View File

@ -10,9 +10,6 @@ import rbac.helper.errors as errors
aliases = {"ALTER FREEZE PARTITION", "FREEZE PARTITION"}
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_AlterFreeze_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, table_type, privilege, node=None):
"""Check that user is only able to execute ALTER FREEZE PARTITION when they have required privilege, either directly or via role.
"""

View File

@ -10,9 +10,6 @@ import rbac.helper.errors as errors
aliases = {"ALTER MOVE PARTITION", "ALTER MOVE PART", "MOVE PARTITION", "MOVE PART"}
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_AlterMove_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, table_type, privilege, node=None):
"""Check that user is only able to execute ALTER MOVE PARTITION when they have required privilege, either directly or via role.
"""

View File

@ -10,9 +10,6 @@ import rbac.helper.errors as errors
aliases = {"ALTER UPDATE", "UPDATE"}
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_AlterUpdate_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, table_type, privilege, node=None):
"""Check that user is only able to execute ALTER UPDATE when they have required privilege, either directly or via role.
"""

View File

@ -3,9 +3,6 @@ from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_CreateDatabase_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, node=None):
"""Check that user is only able to execute ATTACH DATABASE when they have required privilege, either directly or via role.
"""

View File

@ -3,9 +3,6 @@ from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_CreateDictionary_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, node=None):
"""Check that user is only able to execute ATTACH DICTIONARY when they have required privilege, either directly or via role.
"""

View File

@ -3,9 +3,6 @@ from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_CreateTable_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, node=None):
"""Check that user is only able to execute ATTACH TABLE when they have required privilege, either directly or via role.
"""

View File

@ -3,9 +3,6 @@ from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_CreateTemporaryTable_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, node=None):
"""Check that user is only able to execute ATTACH TEMPORARY TABLE when they have required privilege, either directly or via role.
"""

View File

@ -3,9 +3,6 @@ from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_CreateDatabase_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, node=None):
"""Check that user is only able to execute CREATE DATABASE when they have required privilege, either directly or via role.
"""

View File

@ -3,9 +3,6 @@ from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_CreateDictionary_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, node=None):
"""Check that user is only able to execute CREATE DICTIONARY when they have required privilege, either directly or via role.
"""

View File

@ -17,13 +17,17 @@ def create_without_create_table_privilege(self, node=None):
node = self.context.node
with user(node, f"{user_name}"):
try:
with Given("I don't have a table"):
node.query(f"DROP TABLE IF EXISTS {table_name}")
with Given("I don't have a table"):
node.query(f"DROP TABLE IF EXISTS {table_name}")
with When("I try to create a table without CREATE TABLE privilege as the user"):
node.query(f"CREATE TABLE {table_name} (x Int8) ENGINE = Memory", settings = [("user", f"{user_name}")],
exitcode=exitcode, message=message)
with When("I try to create a table without CREATE TABLE privilege as the user"):
node.query(f"CREATE TABLE {table_name} (x Int8) ENGINE = Memory", settings = [("user", f"{user_name}")],
exitcode=exitcode, message=message)
finally:
with Finally("I drop the table"):
node.query(f"DROP TABLE IF EXISTS {table_name}")
@TestScenario
def create_with_create_table_privilege_granted_directly_or_via_role(self, node=None):
@ -103,18 +107,23 @@ def create_with_revoked_create_table_privilege(self, grant_target_name, user_nam
if node is None:
node = self.context.node
with Given("I don't have a table"):
node.query(f"DROP TABLE IF EXISTS {table_name}")
try:
with Given("I don't have a table"):
node.query(f"DROP TABLE IF EXISTS {table_name}")
with When("I grant CREATE TABLE privilege"):
node.query(f"GRANT CREATE TABLE ON {table_name} TO {grant_target_name}")
with When("I grant CREATE TABLE privilege"):
node.query(f"GRANT CREATE TABLE ON {table_name} TO {grant_target_name}")
with And("I revoke CREATE TABLE privilege"):
node.query(f"REVOKE CREATE TABLE ON {table_name} FROM {grant_target_name}")
with And("I revoke CREATE TABLE privilege"):
node.query(f"REVOKE CREATE TABLE ON {table_name} FROM {grant_target_name}")
with Then("I try to create a table on the table as the user"):
node.query(f"CREATE TABLE {table_name} (x Int8) ENGINE = Memory", settings = [("user", f"{user_name}")],
exitcode=exitcode, message=message)
with Then("I try to create a table on the table as the user"):
node.query(f"CREATE TABLE {table_name} (x Int8) ENGINE = Memory", settings = [("user", f"{user_name}")],
exitcode=exitcode, message=message)
finally:
with Finally("I drop the table"):
node.query(f"DROP TABLE IF EXISTS {table_name}")
@TestScenario
def create_without_source_table_privilege(self, node=None):
@ -131,16 +140,21 @@ def create_without_source_table_privilege(self, node=None):
with table(node, f"{source_table_name}"):
with user(node, f"{user_name}"):
try:
with When("I grant CREATE TABLE privilege to a user"):
node.query(f"GRANT CREATE TABLE ON {table_name} TO {user_name}")
with When("I grant CREATE TABLE privilege to a user"):
node.query(f"GRANT CREATE TABLE ON {table_name} TO {user_name}")
with And("I grant INSERT privilege"):
node.query(f"GRANT INSERT ON {table_name} TO {user_name}")
with And("I grant INSERT privilege"):
node.query(f"GRANT INSERT ON {table_name} TO {user_name}")
with Then("I try to create a table without select privilege on the table"):
node.query(f"CREATE TABLE {table_name} ENGINE = Memory AS SELECT * FROM {source_table_name}", settings = [("user", f"{user_name}")],
exitcode=exitcode, message=message)
with Then("I try to create a table without select privilege on the table"):
node.query(f"CREATE TABLE {table_name} ENGINE = Memory AS SELECT * FROM {source_table_name}", settings = [("user", f"{user_name}")],
exitcode=exitcode, message=message)
finally:
with Finally("I drop the table"):
node.query(f"DROP TABLE IF EXISTS {table_name}")
@TestScenario
def create_without_insert_privilege(self, node=None):
@ -158,15 +172,19 @@ def create_without_insert_privilege(self, node=None):
with table(node, f"{source_table_name}"):
with user(node, f"{user_name}"):
with When("I grant CREATE TABLE privilege to a user"):
node.query(f"GRANT CREATE TABLE ON {table_name} TO {user_name}")
try:
with When("I grant CREATE TABLE privilege to a user"):
node.query(f"GRANT CREATE TABLE ON {table_name} TO {user_name}")
with And("I grant SELECT privilege"):
node.query(f"GRANT SELECT ON {source_table_name} TO {user_name}")
with And("I grant SELECT privilege"):
node.query(f"GRANT SELECT ON {source_table_name} TO {user_name}")
with Then("I try to create a table without select privilege on the table"):
node.query(f"CREATE TABLE {table_name} ENGINE = Memory AS SELECT * FROM {source_table_name}", settings = [("user", f"{user_name}")],
exitcode=exitcode, message=message)
with Then("I try to create a table without select privilege on the table"):
node.query(f"CREATE TABLE {table_name} ENGINE = Memory AS SELECT * FROM {source_table_name}", settings = [("user", f"{user_name}")],
exitcode=exitcode, message=message)
finally:
with Finally("I drop the table"):
node.query(f"DROP TABLE IF EXISTS {table_name}")
@TestScenario
def create_with_source_table_privilege_granted_directly_or_via_role(self, node=None):
@ -281,20 +299,14 @@ def create_with_subquery(self, user_name, grant_target_name, node=None):
with When(f"permutation={permutation}, tables granted = {tables_granted}"):
with Given("I don't have a table"):
node.query(f"DROP TABLE IF EXISTS {table_name}")
with Then("I attempt to create a table as the user"):
with When("I attempt to create a table as the user"):
node.query(create_table_query.format(table_name=table_name, table0_name=table0_name, table1_name=table1_name, table2_name=table2_name), settings = [("user", f"{user_name}")],
exitcode=exitcode, message=message)
with When("I grant select on all tables"):
with grant_select_on_table(node, max(permutations(table_count=3))+1, grant_target_name, table0_name, table1_name, table2_name):
with Given("I don't have a table"):
node.query(f"DROP TABLE IF EXISTS {table_name}")
with Then("I attempt to create a table as the user"):
with When("I attempt to create a table as the user"):
node.query(create_table_query.format(table_name=table_name, table0_name=table0_name, table1_name=table1_name, table2_name=table2_name), settings = [("user", f"{user_name}")])
finally:
@ -358,20 +370,14 @@ def create_with_join_query(self, grant_target_name, user_name, node=None):
with When(f"permutation={permutation}, tables granted = {tables_granted}"):
with Given("I don't have a table"):
node.query(f"DROP TABLE IF EXISTS {table_name}")
with Then("I attempt to create a table as the user"):
with When("I attempt to create a table as the user"):
node.query(create_table_query.format(table_name=table_name, table0_name=table0_name, table1_name=table1_name), settings = [("user", f"{user_name}")],
exitcode=exitcode, message=message)
with When("I grant select on all tables"):
with grant_select_on_table(node, max(permutations(table_count=2))+1, grant_target_name, table0_name, table1_name):
with Given("I don't have a table"):
node.query(f"DROP TABLE IF EXISTS {table_name}")
with Then("I attempt to create a table as the user"):
with When("I attempt to create a table as the user"):
node.query(create_table_query.format(table_name=table_name, table0_name=table0_name, table1_name=table1_name), settings = [("user", f"{user_name}")])
finally:
@ -435,20 +441,14 @@ def create_with_union_query(self, grant_target_name, user_name, node=None):
with When(f"permutation={permutation}, tables granted = {tables_granted}"):
with Given("I don't have a table"):
node.query(f"DROP TABLE IF EXISTS {table_name}")
with Then("I attempt to create a table as the user"):
with When("I attempt to create a table as the user"):
node.query(create_table_query.format(table_name=table_name, table0_name=table0_name, table1_name=table1_name), settings = [("user", f"{user_name}")],
exitcode=exitcode, message=message)
with When("I grant select on all tables"):
with grant_select_on_table(node, max(permutations(table_count=2))+1, grant_target_name, table0_name, table1_name):
with Given("I don't have a table"):
node.query(f"DROP TABLE IF EXISTS {table_name}")
with Then("I attempt to create a table as the user"):
with When("I attempt to create a table as the user"):
node.query(create_table_query.format(table_name=table_name, table0_name=table0_name, table1_name=table1_name), settings = [("user", f"{user_name}")])
finally:
@ -622,9 +622,18 @@ def create_with_nested_tables(self, grant_target_name, user_name, node=None):
settings = [("user", f"{user_name}")])
finally:
with Finally("I drop the tables"):
with Finally(f"I drop {table_name}"):
node.query(f"DROP TABLE IF EXISTS {table_name}")
with And(f"I drop {table1_name}"):
node.query(f"DROP TABLE IF EXISTS {table1_name}")
with And(f"I drop {table3_name}"):
node.query(f"DROP TABLE IF EXISTS {table3_name}")
with And(f"I drop {table5_name}"):
node.query(f"DROP TABLE IF EXISTS {table5_name}")
@TestScenario
def create_as_another_table(self, node=None):
"""Check that user is able to create a table as another table with only CREATE TABLE privilege.
@ -707,7 +716,6 @@ def create_as_merge(self, node=None):
@TestFeature
@Requirements(
RQ_SRS_006_RBAC_Privileges_CreateTable("1.0"),
RQ_SRS_006_RBAC_Privileges_CreateTable_Access("1.0"),
)
@Name("create table")
def feature(self, stress=None, parallel=None, node="clickhouse1"):

View File

@ -3,9 +3,6 @@ from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_CreateTemporaryTable_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, node=None):
"""Check that user is only able to execute CREATE TEMPORARY TABLE when they have required privilege, either directly or via role.
"""

View File

@ -3,9 +3,6 @@ from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_DropDatabase_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, node=None):
"""Check that user is only able to execute DETACH DATABASE when they have required privilege, either directly or via role.
"""
@ -46,7 +43,9 @@ def privilege_check(grant_target_name, user_name, node=None):
node.query(f"DETACH DATABASE {db_name}", settings = [("user", user_name)],
exitcode=exitcode, message=message)
finally:
with Finally("I drop the database"):
with Finally("I reattach the database", flags=TE):
node.query(f"ATTACH DATABASE IF NOT EXISTS {db_name}")
with And("I drop the database", flags=TE):
node.query(f"DROP DATABASE IF EXISTS {db_name}")
with Scenario("user with privilege", setup=instrument_clickhouse_server_log):
@ -63,7 +62,9 @@ def privilege_check(grant_target_name, user_name, node=None):
node.query(f"DETACH DATABASE {db_name}", settings = [("user", user_name)])
finally:
with Finally("I drop the database"):
with Finally("I reattach the database", flags=TE):
node.query(f"ATTACH DATABASE IF NOT EXISTS {db_name}")
with And("I drop the database", flags=TE):
node.query(f"DROP DATABASE IF EXISTS {db_name}")
with Scenario("user with revoked privilege", setup=instrument_clickhouse_server_log):
@ -84,7 +85,9 @@ def privilege_check(grant_target_name, user_name, node=None):
exitcode=exitcode, message=message)
finally:
with Finally("I drop the database"):
with Finally("I reattach the database", flags=TE):
node.query(f"ATTACH DATABASE IF NOT EXISTS {db_name}")
with And("I drop the database", flags=TE):
node.query(f"DROP DATABASE IF EXISTS {db_name}")
@TestFeature

View File

@ -3,9 +3,6 @@ from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_DropDictionary_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, node=None):
"""Check that user is only able to execute DETACH DICTIONARY when they have required privilege, either directly or via role.
"""
@ -46,7 +43,9 @@ def privilege_check(grant_target_name, user_name, node=None):
node.query(f"DETACH DICTIONARY {dict_name}", settings = [("user", user_name)], exitcode=exitcode, message=message)
finally:
with Finally("I drop the dictionary"):
with Finally("I reattach the dictionary", flags=TE):
node.query(f"ATTACH DICTIONARY IF NOT EXISTS {dict_name}")
with And("I drop the dictionary", flags=TE):
node.query(f"DROP DICTIONARY IF EXISTS {dict_name}")
with Scenario("user with privilege", setup=instrument_clickhouse_server_log):
@ -63,7 +62,9 @@ def privilege_check(grant_target_name, user_name, node=None):
node.query(f"DETACH DICTIONARY {dict_name}", settings = [("user", user_name)])
finally:
with Finally("I drop the dictionary"):
with Finally("I reattach the dictionary", flags=TE):
node.query(f"ATTACH DICTIONARY IF NOT EXISTS {dict_name}")
with And("I drop the dictionary", flags=TE):
node.query(f"DROP DICTIONARY IF EXISTS {dict_name}")
with Scenario("user with revoked privilege", setup=instrument_clickhouse_server_log):
@ -83,7 +84,9 @@ def privilege_check(grant_target_name, user_name, node=None):
node.query(f"DETACH DICTIONARY {dict_name}", settings = [("user", user_name)], exitcode=exitcode, message=message)
finally:
with Finally("I drop the dictionary"):
with Finally("I reattach the dictionary", flags=TE):
node.query(f"ATTACH DICTIONARY IF NOT EXISTS {dict_name}")
with And("I drop the dictionary", flags=TE):
node.query(f"DROP DICTIONARY IF EXISTS {dict_name}")
@TestFeature

View File

@ -3,9 +3,6 @@ from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_DropTable_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, node=None):
"""Check that user is only able to execute DETACH TABLE when they have required privilege, either directly or via role.
"""
@ -47,7 +44,9 @@ def privilege_check(grant_target_name, user_name, node=None):
exitcode=exitcode, message=message)
finally:
with Finally("I drop the table"):
with Finally("I reattach the table", flags=TE):
node.query(f"ATTACH TABLE IF NOT EXISTS {table_name}")
with And("I drop the table", flags=TE):
node.query(f"DROP TABLE IF EXISTS {table_name}")
with Scenario("user with privilege", setup=instrument_clickhouse_server_log):
@ -64,7 +63,9 @@ def privilege_check(grant_target_name, user_name, node=None):
node.query(f"DETACH TABLE {table_name}", settings = [("user", user_name)])
finally:
with Finally("I drop the table"):
with Finally("I reattach the table", flags=TE):
node.query(f"ATTACH TABLE IF NOT EXISTS {table_name}")
with And("I drop the table", flags=TE):
node.query(f"DROP TABLE IF EXISTS {table_name}")
with Scenario("user with revoked privilege", setup=instrument_clickhouse_server_log):
@ -85,7 +86,9 @@ def privilege_check(grant_target_name, user_name, node=None):
exitcode=exitcode, message=message)
finally:
with Finally("I drop the table"):
with Finally("I reattach the table", flags=TE):
node.query(f"ATTACH TABLE IF NOT EXISTS {table_name}")
with And("I drop the table", flags=TE):
node.query(f"DROP TABLE IF EXISTS {table_name}")
@TestFeature

View File

@ -3,9 +3,6 @@ from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_DropView_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, node=None):
"""Check that user is only able to execute DETACH VIEW when they have required privilege, either directly or via role.
"""
@ -47,8 +44,10 @@ def privilege_check(grant_target_name, user_name, node=None):
exitcode=exitcode, message=message)
finally:
with Finally("I drop the view"):
node.query(f"DROP VIEW IF EXISTS {view_name}")
with Finally("I reattach the view as a table", flags=TE):
node.query(f"ATTACH TABLE IF NOT EXISTS {view_name}")
with And("I drop the view", flags=TE):
node.query(f"DROP TABLE IF EXISTS {view_name}")
with Scenario("user with privilege", setup=instrument_clickhouse_server_log):
view_name = f"view_{getuid()}"
@ -64,8 +63,10 @@ def privilege_check(grant_target_name, user_name, node=None):
node.query(f"DETACH VIEW {view_name}", settings = [("user", user_name)])
finally:
with Finally("I drop the view"):
node.query(f"DROP VIEW IF EXISTS {view_name}")
with Finally("I reattach the view as a table", flags=TE):
node.query(f"ATTACH TABLE IF NOT EXISTS {view_name}")
with And("I drop the table", flags=TE):
node.query(f"DROP TABLE IF EXISTS {view_name}")
with Scenario("user with revoked privilege", setup=instrument_clickhouse_server_log):
view_name = f"view_{getuid()}"
@ -85,8 +86,10 @@ def privilege_check(grant_target_name, user_name, node=None):
exitcode=exitcode, message=message)
finally:
with Finally("I drop the view"):
node.query(f"DROP VIEW IF EXISTS {view_name}")
with Finally("I reattach the view as a table", flags=TE):
node.query(f"ATTACH TABLE IF NOT EXISTS {view_name}")
with And("I drop the view", flags=TE):
node.query(f"DROP TABLE IF EXISTS {view_name}")
@TestFeature
@Requirements(

View File

@ -3,9 +3,6 @@ from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_DropDatabase_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, node=None):
"""Check that user is only able to execute DROP DATABASE when they have required privilege, either directly or via role.
"""

View File

@ -3,9 +3,6 @@ from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_DropDictionary_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, node=None):
"""Check that user is only able to execute DROP DICTIONARY when they have required privilege, either directly or via role.
"""

View File

@ -3,9 +3,6 @@ from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_DropTable_Access("1.0"),
)
def privilege_granted_directly_or_via_role(self, node=None):
"""Check that user is only able to execute DROP TABLE when they have required privilege, either directly or via role.
"""

View File

@ -13,10 +13,18 @@ def feature(self):
try:
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.insert", "feature"), flags=TE), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.select", "feature"), flags=TE), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.show_tables", "feature"), flags=TE), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.public_tables", "feature"), flags=TE), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.distributed_table", "feature"), flags=TE), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.grant_option", "feature"), flags=TE), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.truncate", "feature"), flags=TE), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.optimize", "feature"), flags=TE), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.kill_query", "feature"), flags=TE), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.kill_mutation", "feature"), flags=TE), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.show.show_tables", "feature"), flags=TE), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.show.show_dictionaries", "feature"), flags=TE), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.show.show_databases", "feature"), flags=TE), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.show.show_columns", "feature"), flags=TE), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.alter.alter_column", "feature"), flags=TE), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.alter.alter_index", "feature"), flags=TE), {})

View File

@ -21,53 +21,67 @@ def without_privilege(self, table_type, node=None):
"""
user_name = f"user_{getuid()}"
table_name = f"table_{getuid()}"
if node is None:
node = self.context.node
with table(node, table_name, table_type):
with user(node, user_name):
with When("I run INSERT without privilege"):
exitcode, message = errors.not_enough_privileges(name=user_name)
node.query(f"INSERT INTO {table_name} (d) VALUES ('2020-01-01')", settings = [("user", user_name)],
exitcode=exitcode, message=message)
@TestScenario
@Requirements(
RQ_SRS_006_RBAC_Privileges_Insert_Grant("1.0"),
RQ_SRS_006_RBAC_Grant_Privilege_Insert("1.0"),
)
def user_with_privilege(self, table_type, node=None):
"""Check that user can insert into a table on which they have insert privilege.
"""
user_name = f"user_{getuid()}"
table_name = f"table_{getuid()}"
if node is None:
node = self.context.node
with table(node, table_name, table_type):
with user(node, user_name):
with When("I grant insert privilege"):
node.query(f"GRANT INSERT ON {table_name} TO {user_name}")
with And("I use INSERT"):
node.query(f"INSERT INTO {table_name} (d) VALUES ('2020-01-01')", settings=[("user",user_name)])
with Then("I check the insert functioned"):
output = node.query(f"SELECT d FROM {table_name} FORMAT JSONEachRow").output
assert output == '{"d":"2020-01-01"}', error()
@TestScenario
@Requirements(
RQ_SRS_006_RBAC_Privileges_Insert_Revoke("1.0"),
RQ_SRS_006_RBAC_Revoke_Privilege_Insert("1.0"),
)
def user_with_revoked_privilege(self, table_type, node=None):
"""Check that user is unable to insert into a table after insert privilege on that table has been revoked from user.
"""
user_name = f"user_{getuid()}"
table_name = f"table_{getuid()}"
if node is None:
node = self.context.node
with table(node, table_name, table_type):
with user(node, user_name):
with When("I grant insert privilege"):
node.query(f"GRANT INSERT ON {table_name} TO {user_name}")
with And("I revoke insert privilege"):
node.query(f"REVOKE INSERT ON {table_name} FROM {user_name}")
with And("I use INSERT"):
exitcode, message = errors.not_enough_privileges(name=user_name)
node.query(f"INSERT INTO {table_name} (d) VALUES ('2020-01-01')",
@ -124,7 +138,7 @@ def user_column_privileges(self, grant_columns, insert_columns_pass, data_fail,
@TestScenario
@Requirements(
RQ_SRS_006_RBAC_Privileges_Insert_Grant("1.0"),
RQ_SRS_006_RBAC_Grant_Privilege_Insert("1.0"),
)
def role_with_privilege(self, table_type, node=None):
"""Check that user can insert into a table after being granted a role that
@ -149,7 +163,7 @@ def role_with_privilege(self, table_type, node=None):
@TestScenario
@Requirements(
RQ_SRS_006_RBAC_Privileges_Insert_Revoke("1.0"),
RQ_SRS_006_RBAC_Revoke_Privilege_Insert("1.0"),
)
def role_with_revoked_privilege(self, table_type, node=None):
"""Check that user with a role that has insert privilege on a table

View File

@ -0,0 +1,256 @@
from rbac.requirements import *
from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
def no_privilege(self, node=None):
"""Check that user doesn't need privileges to execute `KILL MUTATION` with no mutations.
"""
if node is None:
node = self.context.node
with Scenario("kill mutation on a table"):
user_name = f"user_{getuid()}"
table_name = f"merge_tree_{getuid()}"
with table(node, table_name):
with user(node, user_name):
with When("I attempt to kill mutation on table"):
node.query(f"KILL MUTATION WHERE database = 'default' AND table = '{table_name}'", settings = [("user", user_name)])
with Scenario("kill mutation on cluster"):
user_name = f"user_{getuid()}"
table_name = f"merge_tree_{getuid()}"
with table(node, table_name):
with user(node, user_name):
with When("I attempt to kill mutation on cluster"):
node.query(f"KILL MUTATION ON CLUSTER sharded_cluster WHERE database = 'default' AND table = '{table_name}'", settings = [("user", user_name)])
@TestSuite
def privileges_granted_directly(self, node=None):
"""Check that a user is able to execute `KILL MUTATION` on a table with a mutation
if and only if the user has privilege matching the source of the mutation on that table.
For example, to execute `KILL MUTATION` after `ALTER UPDATE`, the user needs `ALTER UPDATE` privilege.
"""
user_name = f"user_{getuid()}"
if node is None:
node = self.context.node
with user(node, f"{user_name}"):
Suite(test=update, setup=instrument_clickhouse_server_log)(user_name=user_name, grant_target_name=user_name)
Suite(test=delete, setup=instrument_clickhouse_server_log)(user_name=user_name, grant_target_name=user_name)
Suite(test=drop_column, setup=instrument_clickhouse_server_log)(user_name=user_name, grant_target_name=user_name)
@TestSuite
def privileges_granted_via_role(self, node=None):
"""Check that a user is able to execute `KILL MUTATION` on a table with a mutation
if and only if the user has privilege matching the source of the mutation on that table.
For example, to execute `KILL MUTATION` after `ALTER UPDATE`, the user needs `ALTER UPDATE` privilege.
"""
user_name = f"user_{getuid()}"
role_name = f"role_{getuid()}"
if node is None:
node = self.context.node
with user(node, f"{user_name}"), role(node, f"{role_name}"):
with When("I grant the role to the user"):
node.query(f"GRANT {role_name} TO {user_name}")
Suite(test=update, setup=instrument_clickhouse_server_log)(user_name=user_name, grant_target_name=role_name)
Suite(test=delete, setup=instrument_clickhouse_server_log)(user_name=user_name, grant_target_name=role_name)
Suite(test=drop_column, setup=instrument_clickhouse_server_log)(user_name=user_name, grant_target_name=role_name)
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_KillMutation_AlterUpdate("1.0")
)
def update(self, user_name, grant_target_name, node=None):
"""Check that the user is able to execute `KILL MUTATION` after `ALTER UPDATE`
if and only if the user has `ALTER UPDATE` privilege.
"""
exitcode, message = errors.not_enough_privileges(name=user_name)
if node is None:
node = self.context.node
with Scenario("KILL ALTER UPDATE without privilege"):
table_name = f"merge_tree_{getuid()}"
with table(node, table_name):
with Given("I have an ALTER UPDATE mutation"):
node.query(f"ALTER TABLE {table_name} UPDATE a = x WHERE 1")
with When("I try to KILL MUTATION"):
node.query(f"KILL MUTATION WHERE database = 'default' AND table = '{table_name}'", settings = [("user", user_name)],
exitcode=exitcode, message="Exception: Not allowed to kill mutation.")
with Scenario("KILL ALTER UPDATE with privilege"):
table_name = f"merge_tree_{getuid()}"
with table(node, table_name):
with Given("I have an ALTER UPDATE mutation"):
node.query(f"ALTER TABLE {table_name} UPDATE a = x WHERE 1")
with When("I grant the ALTER UPDATE privilege"):
node.query(f"GRANT ALTER UPDATE ON {table_name} TO {grant_target_name}")
with When("I try to KILL MUTATION"):
node.query(f"KILL MUTATION WHERE database = 'default' AND table = '{table_name}'", settings = [("user", user_name)])
with Scenario("KILL ALTER UPDATE with revoked privilege"):
table_name = f"merge_tree_{getuid()}"
with table(node, table_name):
with Given("I have an ALTER UPDATE mutation"):
node.query(f"ALTER TABLE {table_name} UPDATE a = x WHERE 1")
with When("I grant the ALTER UPDATE privilege"):
node.query(f"GRANT ALTER UPDATE ON {table_name} TO {grant_target_name}")
with And("I revoke the ALTER UPDATE privilege"):
node.query(f"REVOKE ALTER UPDATE ON {table_name} FROM {grant_target_name}")
with When("I try to KILL MUTATION"):
node.query(f"KILL MUTATION WHERE database = 'default' AND table = '{table_name}'", settings = [("user", user_name)],
exitcode=exitcode, message="Exception: Not allowed to kill mutation.")
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_KillMutation_AlterDelete("1.0")
)
def delete(self, user_name, grant_target_name, node=None):
"""Check that the user is able to execute `KILL MUTATION` after `ALTER DELETE`
if and only if the user has `ALTER DELETE` privilege.
"""
exitcode, message = errors.not_enough_privileges(name=user_name)
if node is None:
node = self.context.node
with Scenario("KILL ALTER DELETE without privilege"):
table_name = f"merge_tree_{getuid()}"
with table(node, table_name):
with Given("I have an ALTER DELETE mutation"):
node.query(f"ALTER TABLE {table_name} DELETE WHERE 1")
with When("I try to KILL MUTATION"):
node.query(f"KILL MUTATION WHERE database = 'default' AND table = '{table_name}'", settings = [("user", user_name)],
exitcode=exitcode, message="Exception: Not allowed to kill mutation.")
with Scenario("KILL ALTER DELETE with privilege"):
table_name = f"merge_tree_{getuid()}"
with table(node, table_name):
with Given("I have an ALTER DELETE mutation"):
node.query(f"ALTER TABLE {table_name} DELETE WHERE 1")
with When("I grant the ALTER DELETE privilege"):
node.query(f"GRANT ALTER DELETE ON {table_name} TO {grant_target_name}")
with When("I try to KILL MUTATION"):
node.query(f"KILL MUTATION WHERE database = 'default' AND table = '{table_name}'", settings = [("user", user_name)])
with Scenario("KILL ALTER DELETE with revoked privilege"):
table_name = f"merge_tree_{getuid()}"
with table(node, table_name):
with Given("I have an ALTER DELETE mutation"):
node.query(f"ALTER TABLE {table_name} DELETE WHERE 1")
with When("I grant the ALTER DELETE privilege"):
node.query(f"GRANT ALTER DELETE ON {table_name} TO {grant_target_name}")
with And("I revoke the ALTER DELETE privilege"):
node.query(f"REVOKE ALTER DELETE ON {table_name} FROM {grant_target_name}")
with When("I try to KILL MUTATION"):
node.query(f"KILL MUTATION WHERE database = 'default' AND table = '{table_name}'", settings = [("user", user_name)],
exitcode=exitcode, message="Exception: Not allowed to kill mutation.")
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_KillMutation_AlterDropColumn("1.0")
)
def drop_column(self, user_name, grant_target_name, node=None):
"""Check that the user is able to execute `KILL MUTATION` after `ALTER DROP COLUMN`
if and only if the user has `ALTER DROP COLUMN` privilege.
"""
exitcode, message = errors.not_enough_privileges(name=user_name)
if node is None:
node = self.context.node
with Scenario("KILL ALTER DROP COLUMN without privilege"):
table_name = f"merge_tree_{getuid()}"
with table(node, table_name):
with Given("I have an ALTER DROP COLUMN mutation"):
node.query(f"ALTER TABLE {table_name} DROP COLUMN x")
with When("I try to KILL MUTATION"):
node.query(f"KILL MUTATION WHERE database = 'default' AND table = '{table_name}'", settings = [("user", user_name)],
exitcode=exitcode, message="Exception: Not allowed to kill mutation.")
with Scenario("KILL ALTER DROP COLUMN with privilege"):
table_name = f"merge_tree_{getuid()}"
with table(node, table_name):
with Given("I have an ALTER DROP COLUMN mutation"):
node.query(f"ALTER TABLE {table_name} DROP COLUMN x")
with When("I grant the ALTER DROP COLUMN privilege"):
node.query(f"GRANT ALTER DROP COLUMN ON {table_name} TO {grant_target_name}")
with When("I try to KILL MUTATION"):
node.query(f"KILL MUTATION WHERE database = 'default' AND table = '{table_name}'", settings = [("user", user_name)])
with Scenario("KILL ALTER DROP COLUMN with revoked privilege"):
table_name = f"merge_tree_{getuid()}"
with table(node, table_name):
with Given("I have an ALTER DROP COLUMN mutation"):
node.query(f"ALTER TABLE {table_name} DROP COLUMN x")
with When("I grant the ALTER DROP COLUMN privilege"):
node.query(f"GRANT ALTER DROP COLUMN ON {table_name} TO {grant_target_name}")
with And("I revoke the ALTER DROP COLUMN privilege"):
node.query(f"REVOKE ALTER DROP COLUMN ON {table_name} FROM {grant_target_name}")
with When("I try to KILL MUTATION"):
node.query(f"KILL MUTATION WHERE database = 'default' AND table = '{table_name}'", settings = [("user", user_name)],
exitcode=exitcode, message="Exception: Not allowed to kill mutation.")
@TestFeature
@Requirements(
RQ_SRS_006_RBAC_Privileges_KillMutation("1.0"),
)
@Name("kill mutation")
def feature(self, node="clickhouse1", stress=None, parallel=None):
"""Check the RBAC functionality of KILL MUTATION.
"""
self.context.node = self.context.cluster.node(node)
if parallel is not None:
self.context.parallel = parallel
if stress is not None:
self.context.stress = stress
Suite(run=no_privilege, setup=instrument_clickhouse_server_log)
Suite(run=privileges_granted_directly, setup=instrument_clickhouse_server_log)
Suite(run=privileges_granted_via_role, setup=instrument_clickhouse_server_log)

View File

@ -0,0 +1,84 @@
from rbac.requirements import *
from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
def privilege_granted_directly_or_via_role(self, node=None):
"""Check that user is only able to execute KILL QUERY when they have required privilege, either directly or via role.
"""
role_name = f"role_{getuid()}"
user_name = f"user_{getuid()}"
if node is None:
node = self.context.node
with Suite("user with direct privilege", setup=instrument_clickhouse_server_log):
with user(node, user_name):
with When(f"I run checks that {user_name} is only able to execute KILL QUERY with required privileges"):
privilege_check(grant_target_name=user_name, user_name=user_name, node=node)
with Suite("user with privilege via role", setup=instrument_clickhouse_server_log):
with user(node, user_name), role(node, role_name):
with When("I grant the role to the user"):
node.query(f"GRANT {role_name} TO {user_name}")
with And(f"I run checks that {user_name} with {role_name} is only able to execute KILL QUERY with required privileges"):
privilege_check(grant_target_name=role_name, user_name=user_name, node=node)
def privilege_check(grant_target_name, user_name, node=None):
"""Run scenarios to check the user's access with different privileges.
"""
exitcode, message = errors.not_enough_privileges(name=f"{user_name}")
with Scenario("user without privilege", setup=instrument_clickhouse_server_log):
with When("I attempt to kill a query without privilege"):
node.query(f"KILL QUERY WHERE user ='default'", settings = [("user", user_name)],
exitcode=exitcode, message=message)
with Scenario("user with privilege", setup=instrument_clickhouse_server_log):
with When("I grant kill query privilege"):
node.query(f"GRANT KILL QUERY TO {grant_target_name}")
with Then("I attempt to kill a query"):
node.query(f"KILL QUERY WHERE 1", settings = [("user", user_name)])
with Scenario("user with revoked privilege", setup=instrument_clickhouse_server_log):
with When("I grant the kill query privilege"):
node.query(f"GRANT KILL QUERY TO {grant_target_name}")
with And("I revoke the kill query privilege"):
node.query(f"REVOKE KILL QUERY TO {grant_target_name}")
with Then("I attempt to kill a query"):
node.query(f"KILL QUERY WHERE 1", settings = [("user", user_name)],
exitcode=exitcode, message=message)
with Scenario("execute on cluster", setup=instrument_clickhouse_server_log):
with When("I grant the truncate privilege"):
node.query(f"GRANT KILL QUERY TO {grant_target_name}")
with Then("I attempt to kill a query"):
node.query(f"KILL QUERY ON CLUSTER WHERE 1'", settings = [("user", user_name)])
@TestFeature
@Requirements(
RQ_SRS_006_RBAC_Privileges_KillQuery("1.0"),
)
@Name("kill query")
def feature(self, node="clickhouse1", stress=None, parallel=None):
"""Check the RBAC functionality of KILL QUERY.
"""
self.context.node = self.context.cluster.node(node)
if parallel is not None:
self.context.parallel = parallel
if stress is not None:
self.context.stress = stress
with Suite(test=privilege_granted_directly_or_via_role):
privilege_granted_directly_or_via_role()

View File

@ -0,0 +1,113 @@
from rbac.requirements import *
from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
def privilege_granted_directly_or_via_role(self, table_type, node=None):
"""Check that user is only able to execute OPTIMIZE when they have required privilege, either directly or via role.
"""
role_name = f"role_{getuid()}"
user_name = f"user_{getuid()}"
if node is None:
node = self.context.node
with Suite("user with direct privilege", setup=instrument_clickhouse_server_log):
with user(node, user_name):
with When(f"I run checks that {user_name} is only able to execute OPTIMIZE with required privileges"):
privilege_check(grant_target_name=user_name, user_name=user_name, table_type=table_type, node=node)
with Suite("user with privilege via role", setup=instrument_clickhouse_server_log):
with user(node, user_name), role(node, role_name):
with When("I grant the role to the user"):
node.query(f"GRANT {role_name} TO {user_name}")
with And(f"I run checks that {user_name} with {role_name} is only able to execute OPTIMIZE with required privileges"):
privilege_check(grant_target_name=role_name, user_name=user_name, table_type=table_type, node=node)
def privilege_check(grant_target_name, user_name, table_type, node=None):
"""Run scenarios to check the user's access with different privileges.
"""
exitcode, message = errors.not_enough_privileges(name=f"{user_name}")
with Scenario("user without privilege", setup=instrument_clickhouse_server_log):
table_name = f"merge_tree_{getuid()}"
with table(node, table_name, table_type):
with When("I attempt to optimize a table without privilege"):
node.query(f"OPTIMIZE TABLE {table_name} FINAL", settings = [("user", user_name)],
exitcode=exitcode, message=message)
with Scenario("user with privilege", setup=instrument_clickhouse_server_log):
table_name = f"merge_tree_{getuid()}"
with table(node, table_name, table_type):
with When("I grant the optimize privilege"):
node.query(f"GRANT OPTIMIZE ON {table_name} TO {grant_target_name}")
with Then("I attempt to optimize a table"):
node.query(f"OPTIMIZE TABLE {table_name}", settings = [("user", user_name)])
with Scenario("user with revoked privilege", setup=instrument_clickhouse_server_log):
table_name = f"merge_tree_{getuid()}"
with table(node, table_name, table_type):
with When("I grant the optimize privilege"):
node.query(f"GRANT OPTIMIZE ON {table_name} TO {grant_target_name}")
with And("I revoke the optimize privilege"):
node.query(f"REVOKE OPTIMIZE ON {table_name} FROM {grant_target_name}")
with Then("I attempt to optimize a table"):
node.query(f"OPTIMIZE TABLE {table_name}", settings = [("user", user_name)],
exitcode=exitcode, message=message)
with Scenario("execute on cluster", setup=instrument_clickhouse_server_log):
table_name = f"merge_tree_{getuid()}"
try:
with Given("I have a table on a cluster"):
node.query(f"CREATE TABLE {table_name} ON CLUSTER sharded_cluster (d DATE, a String, b UInt8, x String, y Int8) ENGINE = MergeTree() PARTITION BY y ORDER BY d")
with When("I grant the optimize privilege"):
node.query(f"GRANT OPTIMIZE ON {table_name} TO {grant_target_name}")
with Then("I attempt to optimize a table"):
node.query(f"OPTIMIZE TABLE {table_name} ON CLUSTER sharded_cluster", settings = [("user", user_name)])
finally:
with Finally("I drop the table from the cluster"):
node.query(f"DROP TABLE IF EXISTS {table_name} ON CLUSTER sharded_cluster")
@TestFeature
@Requirements(
RQ_SRS_006_RBAC_Privileges_Optimize("1.0"),
)
@Examples("table_type", [
(key,) for key in table_types.keys()
])
@Name("optimize")
def feature(self, node="clickhouse1", stress=None, parallel=None):
"""Check the RBAC functionality of OPTIMIZE.
"""
self.context.node = self.context.cluster.node(node)
if parallel is not None:
self.context.parallel = parallel
if stress is not None:
self.context.stress = stress
for example in self.examples:
table_type, = example
if table_type != "MergeTree" and not self.context.stress:
continue
with Example(str(example)):
with Suite(test=privilege_granted_directly_or_via_role):
privilege_granted_directly_or_via_role(table_type=table_type)

View File

@ -25,7 +25,7 @@ def without_privilege(self, table_type, node=None):
@TestScenario
@Requirements(
RQ_SRS_006_RBAC_Privileges_Select_Grant("1.0"),
RQ_SRS_006_RBAC_Grant_Privilege_Select("1.0"),
)
def user_with_privilege(self, table_type, node=None):
"""Check that user can select from a table on which they have select privilege.
@ -47,7 +47,7 @@ def user_with_privilege(self, table_type, node=None):
@TestScenario
@Requirements(
RQ_SRS_006_RBAC_Privileges_Select_Revoke("1.0"),
RQ_SRS_006_RBAC_Revoke_Privilege_Select("1.0"),
)
def user_with_revoked_privilege(self, table_type, node=None):
"""Check that user is unable to select from a table after select privilege
@ -115,7 +115,7 @@ def user_column_privileges(self, grant_columns, select_columns_pass, data_pass,
@TestScenario
@Requirements(
RQ_SRS_006_RBAC_Privileges_Select_Grant("1.0"),
RQ_SRS_006_RBAC_Grant_Privilege_Select("1.0"),
)
def role_with_privilege(self, table_type, node=None):
"""Check that user can select from a table after it is granted a role that
@ -142,7 +142,7 @@ def role_with_privilege(self, table_type, node=None):
@TestScenario
@Requirements(
RQ_SRS_006_RBAC_Privileges_Select_Revoke("1.0"),
RQ_SRS_006_RBAC_Revoke_Privilege_Select("1.0"),
)
def role_with_revoked_privilege(self, table_type, node=None):
"""Check that user with a role that has select privilege on a table is unable

View File

@ -0,0 +1,163 @@
from testflows.core import *
from testflows.asserts import error
from rbac.requirements import *
from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
def describe_with_privilege_granted_directly(self, node=None):
"""Check that user is able to execute DESCRIBE on a table if and only if
they have SHOW COLUMNS privilege for that table granted directly.
"""
user_name = f"user_{getuid()}"
if node is None:
node = self.context.node
with user(node, f"{user_name}"):
table_name = f"table_name_{getuid()}"
Suite(test=describe, setup=instrument_clickhouse_server_log)(grant_target_name=user_name, user_name=user_name, table_name=table_name)
@TestSuite
def describe_with_privilege_granted_via_role(self, node=None):
"""Check that user is able to execute DESCRIBE on a table if and only if
they have SHOW COLUMNS privilege for that table granted through a role.
"""
user_name = f"user_{getuid()}"
role_name = f"role_{getuid()}"
if node is None:
node = self.context.node
with user(node, f"{user_name}"), role(node, f"{role_name}"):
table_name = f"table_name_{getuid()}"
with When("I grant the role to the user"):
node.query(f"GRANT {role_name} TO {user_name}")
Suite(test=describe, setup=instrument_clickhouse_server_log)(grant_target_name=role_name, user_name=user_name, table_name=table_name)
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_DescribeTable("1.0"),
)
def describe(self, grant_target_name, user_name, table_name, node=None):
"""Check that user is able to execute DESCRIBE only when they have SHOW COLUMNS privilege.
"""
exitcode, message = errors.not_enough_privileges(name=user_name)
if node is None:
node = self.context.node
with table(node, table_name):
with Scenario("DESCRIBE table without privilege"):
with When(f"I attempt to DESCRIBE {table_name}"):
node.query(f"DESCRIBE {table_name}", settings=[("user",user_name)],
exitcode=exitcode, message=message)
with Scenario("DESCRIBE with privilege"):
with When(f"I grant SHOW COLUMNS on the table"):
node.query(f"GRANT SHOW COLUMNS ON {table_name} TO {grant_target_name}")
with Then(f"I attempt to DESCRIBE {table_name}"):
node.query(f"DESCRIBE TABLE {table_name}", settings=[("user",user_name)])
with Scenario("DESCRIBE with revoked privilege"):
with When(f"I grant SHOW COLUMNS on the table"):
node.query(f"GRANT SHOW COLUMNS ON {table_name} TO {grant_target_name}")
with And(f"I revoke SHOW COLUMNS on the table"):
node.query(f"REVOKE SHOW COLUMNS ON {table_name} FROM {grant_target_name}")
with Then(f"I attempt to DESCRIBE {table_name}"):
node.query(f"DESCRIBE {table_name}", settings=[("user",user_name)],
exitcode=exitcode, message=message)
@TestSuite
def show_create_with_privilege_granted_directly(self, node=None):
"""Check that user is able to execute SHOW CREATE on a table if and only if
they have SHOW COLUMNS privilege for that table granted directly.
"""
user_name = f"user_{getuid()}"
if node is None:
node = self.context.node
with user(node, f"{user_name}"):
table_name = f"table_name_{getuid()}"
Suite(test=show_create, setup=instrument_clickhouse_server_log)(grant_target_name=user_name, user_name=user_name, table_name=table_name)
@TestSuite
def show_create_with_privilege_granted_via_role(self, node=None):
"""Check that user is able to execute SHOW CREATE on a table if and only if
they have SHOW COLUMNS privilege for that table granted directly.
"""
user_name = f"user_{getuid()}"
role_name = f"role_{getuid()}"
if node is None:
node = self.context.node
with user(node, f"{user_name}"), role(node, f"{role_name}"):
table_name = f"table_name_{getuid()}"
with When("I grant the role to the user"):
node.query(f"GRANT {role_name} TO {user_name}")
Suite(test=show_create, setup=instrument_clickhouse_server_log)(grant_target_name=role_name, user_name=user_name, table_name=table_name)
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_ShowCreateTable("1.0"),
)
def show_create(self, grant_target_name, user_name, table_name, node=None):
"""Check that user is able to execute SHOW CREATE on a table only when they have SHOW COLUMNS privilege.
"""
exitcode, message = errors.not_enough_privileges(name=user_name)
if node is None:
node = self.context.node
with table(node, table_name):
with Scenario("SHOW CREATE without privilege"):
with When(f"I attempt to SHOW CREATE {table_name}"):
node.query(f"SHOW CREATE TABLE {table_name}", settings=[("user",user_name)],
exitcode=exitcode, message=message)
with Scenario("SHOW CREATE with privilege"):
with When(f"I grant SHOW COLUMNS on the table"):
node.query(f"GRANT SHOW COLUMNS ON {table_name} TO {grant_target_name}")
with Then(f"I attempt to SHOW CREATE {table_name}"):
node.query(f"SHOW CREATE TABLE {table_name}", settings=[("user",user_name)])
with Scenario("SHOW CREATE with revoked privilege"):
with When(f"I grant SHOW COLUMNS on the table"):
node.query(f"GRANT SHOW COLUMNS ON {table_name} TO {grant_target_name}")
with And(f"I revoke SHOW COLUMNS on the table"):
node.query(f"REVOKE SHOW COLUMNS ON {table_name} FROM {grant_target_name}")
with Then(f"I attempt to SHOW CREATE {table_name}"):
node.query(f"SHOW CREATE TABLE {table_name}", settings=[("user",user_name)],
exitcode=exitcode, message=message)
@TestFeature
@Name("show columns")
@Requirements(
RQ_SRS_006_RBAC_Privileges_ShowColumns("1.0")
)
def feature(self, node="clickhouse1"):
"""Check the RBAC functionality of SHOW COLUMNS.
"""
self.context.node = self.context.cluster.node(node)
Suite(run=describe_with_privilege_granted_directly, setup=instrument_clickhouse_server_log)
Suite(run=describe_with_privilege_granted_via_role, setup=instrument_clickhouse_server_log)
Suite(run=show_create_with_privilege_granted_directly, setup=instrument_clickhouse_server_log)
Suite(run=show_create_with_privilege_granted_via_role, setup=instrument_clickhouse_server_log)

View File

@ -0,0 +1,214 @@
from testflows.core import *
from testflows.asserts import error
from rbac.requirements import *
from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
def dict_privileges_granted_directly(self, node=None):
"""Check that a user is able to execute `USE` and `SHOW CREATE`
commands on a database and see the database when they execute `SHOW DATABASES` command
if and only if they have any privilege on that database granted directly.
"""
user_name = f"user_{getuid()}"
if node is None:
node = self.context.node
with user(node, f"{user_name}"):
db_name = f"db_name_{getuid()}"
Suite(run=check_privilege, flags=TE,
examples=Examples("privilege on grant_target_name user_name db_name", [
tuple(list(row)+[user_name,user_name,db_name]) for row in check_privilege.examples
], args=Args(name="check privilege={privilege}", format_name=True)))
@TestSuite
def dict_privileges_granted_via_role(self, node=None):
"""Check that a user is able to execute `USE` and `SHOW CREATE`
commands on a database and see the database when they execute `SHOW DATABASES` command
if and only if they have any privilege on that database granted via role.
"""
user_name = f"user_{getuid()}"
role_name = f"role_{getuid()}"
if node is None:
node = self.context.node
with user(node, f"{user_name}"), role(node, f"{role_name}"):
db_name = f"db_name_{getuid()}"
with When("I grant the role to the user"):
node.query(f"GRANT {role_name} TO {user_name}")
Suite(run=check_privilege, flags=TE,
examples=Examples("privilege on grant_target_name user_name db_name", [
tuple(list(row)+[role_name,user_name,db_name]) for row in check_privilege.examples
], args=Args(name="check privilege={privilege}", format_name=True)))
@TestOutline(Suite)
@Examples("privilege on",[
("SHOW","*.*"),
("SHOW DATABASES","db"),
("CREATE DATABASE","db"),
("DROP DATABASE","db"),
])
def check_privilege(self, privilege, on, grant_target_name, user_name, db_name, node=None):
"""Run checks for commands that require SHOW DATABASE privilege.
"""
if node is None:
node = self.context.node
on = on.replace("db", f"{db_name}")
Suite(test=show_db, setup=instrument_clickhouse_server_log)(privilege=privilege, on=on, grant_target_name=grant_target_name, user_name=user_name, db_name=db_name)
Suite(test=use, setup=instrument_clickhouse_server_log)(privilege=privilege, on=on, grant_target_name=grant_target_name, user_name=user_name, db_name=db_name)
Suite(test=show_create, setup=instrument_clickhouse_server_log)(privilege=privilege, on=on, grant_target_name=grant_target_name, user_name=user_name, db_name=db_name)
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_ShowDatabases_Query("1.0"),
)
def show_db(self, privilege, on, grant_target_name, user_name, db_name, node=None):
"""Check that user is only able to see a database in SHOW DATABASES when they have a privilege on that database.
"""
exitcode, message = errors.not_enough_privileges(name=user_name)
if node is None:
node = self.context.node
try:
with Given("I have a database"):
node.query(f"CREATE DATABASE {db_name}")
with Scenario("SHOW DATABASES without privilege"):
with When("I check the user doesn't see the database"):
output = node.query("SHOW DATABASES", settings = [("user", f"{user_name}")]).output
assert output == '', error()
with Scenario("SHOW DATABASES with privilege"):
with When(f"I grant {privilege} on the database"):
node.query(f"GRANT {privilege} ON {db_name}.* TO {grant_target_name}")
with Then("I check the user does see a database"):
output = node.query("SHOW DATABASES", settings = [("user", f"{user_name}")], message = f'{db_name}')
with Scenario("SHOW DATABASES with revoked privilege"):
with When(f"I grant {privilege} on the database"):
node.query(f"GRANT {privilege} ON {db_name}.* TO {grant_target_name}")
with And(f"I revoke {privilege} on the database"):
node.query(f"REVOKE {privilege} ON {db_name}.* FROM {grant_target_name}")
with Then("I check the user does not see a database"):
output = node.query("SHOW DATABASES", settings = [("user", f"{user_name}")]).output
assert output == f'', error()
finally:
with Finally("I drop the database"):
node.query(f"DROP DATABASE IF EXISTS {db_name}")
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_UseDatabase("1.0"),
)
def use(self, privilege, on, grant_target_name, user_name, db_name, node=None):
"""Check that user is able to execute EXISTS on a database if and only if the user has SHOW DATABASE privilege
on that database.
"""
exitcode, message = errors.not_enough_privileges(name=user_name)
if node is None:
node = self.context.node
try:
with Given("I have a database"):
node.query(f"CREATE DATABASE {db_name}")
with Scenario("USE without privilege"):
with When(f"I attempt to USE {db_name}"):
node.query(f"USE {db_name}", settings=[("user",user_name)],
exitcode=exitcode, message=message)
with Scenario("USE with privilege"):
with When(f"I grant {privilege} on the database"):
node.query(f"GRANT {privilege} ON {db_name}.* TO {grant_target_name}")
with Then(f"I attempt to USE {db_name}"):
node.query(f"USE {db_name}", settings=[("user",user_name)])
with Scenario("USE with revoked privilege"):
with When(f"I grant {privilege} on the database"):
node.query(f"GRANT {privilege} ON {db_name}.* TO {grant_target_name}")
with And(f"I revoke {privilege} on the database"):
node.query(f"REVOKE {privilege} ON {db_name}.* FROM {grant_target_name}")
with Then(f"I attempt to USE {db_name}"):
node.query(f"USE {db_name}", settings=[("user",user_name)],
exitcode=exitcode, message=message)
finally:
with Finally("I drop the database"):
node.query(f"DROP DATABASE IF EXISTS {db_name}")
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_ShowCreateDatabase("1.0"),
)
def show_create(self, privilege, on, grant_target_name, user_name, db_name, node=None):
"""Check that user is able to execute EXISTS on a database if and only if the user has SHOW DATABASE privilege
on that database.
"""
exitcode, message = errors.not_enough_privileges(name=user_name)
if node is None:
node = self.context.node
try:
with Given("I have a database"):
node.query(f"CREATE DATABASE {db_name}")
with Scenario("SHOW CREATE without privilege"):
with When(f"I attempt to SHOW CREATE {db_name}"):
node.query(f"SHOW CREATE DATABASE {db_name}", settings=[("user",user_name)],
exitcode=exitcode, message=message)
with Scenario("SHOW CREATE with privilege"):
with When(f"I grant {privilege} on the database"):
node.query(f"GRANT {privilege} ON {db_name}.* TO {grant_target_name}")
with Then(f"I attempt to SHOW CREATE {db_name}"):
node.query(f"SHOW CREATE DATABASE {db_name}", settings=[("user",user_name)])
with Scenario("SHOW CREATE with revoked privilege"):
with When(f"I grant {privilege} on the database"):
node.query(f"GRANT {privilege} ON {db_name}.* TO {grant_target_name}")
with And(f"I revoke {privilege} on the database"):
node.query(f"REVOKE {privilege} ON {db_name}.* FROM {grant_target_name}")
with Then(f"I attempt to SHOW CREATE {db_name}"):
node.query(f"SHOW CREATE DATABASE {db_name}", settings=[("user",user_name)],
exitcode=exitcode, message=message)
finally:
with Finally("I drop the database"):
node.query(f"DROP DATABASE IF EXISTS {db_name}")
@TestFeature
@Name("show databases")
@Requirements(
RQ_SRS_006_RBAC_Privileges_ShowDatabases("1.0")
)
def feature(self, node="clickhouse1"):
"""Check the RBAC functionality of SHOW DATABASES.
"""
self.context.node = self.context.cluster.node(node)
Suite(run=dict_privileges_granted_directly, setup=instrument_clickhouse_server_log)
Suite(run=dict_privileges_granted_via_role, setup=instrument_clickhouse_server_log)

View File

@ -0,0 +1,215 @@
from testflows.core import *
from testflows.asserts import error
from rbac.requirements import *
from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
def dict_privileges_granted_directly(self, node=None):
"""Check that a user is able to execute `SHOW CREATE` and `EXISTS`
commands on a dictionary and see the dictionary when they execute `SHOW DICTIONARIES` command
if and only if they have any privilege on that table granted directly.
"""
user_name = f"user_{getuid()}"
if node is None:
node = self.context.node
with user(node, f"{user_name}"):
dict_name = f"dict_name_{getuid()}"
Suite(run=check_privilege, flags=TE,
examples=Examples("privilege on grant_target_name user_name dict_name", [
tuple(list(row)+[user_name,user_name,dict_name]) for row in check_privilege.examples
], args=Args(name="check privilege={privilege}", format_name=True)))
@TestSuite
def dict_privileges_granted_via_role(self, node=None):
"""Check that a user is able to execute `SHOW CREATE` and `EXISTS`
commands on a dictionary and see the dictionary when they execute `SHOW DICTIONARIES` command
if and only if they have any privilege on that table granted via role.
"""
user_name = f"user_{getuid()}"
role_name = f"role_{getuid()}"
if node is None:
node = self.context.node
with user(node, f"{user_name}"), role(node, f"{role_name}"):
dict_name = f"dict_name_{getuid()}"
with When("I grant the role to the user"):
node.query(f"GRANT {role_name} TO {user_name}")
Suite(run=check_privilege, flags=TE,
examples=Examples("privilege on grant_target_name user_name dict_name", [
tuple(list(row)+[role_name,user_name,dict_name]) for row in check_privilege.examples
], args=Args(name="check privilege={privilege}", format_name=True)))
@TestOutline(Suite)
@Examples("privilege on",[
("SHOW","*.*"),
("SHOW DICTIONARIES","dict"),
("CREATE DICTIONARY","dict"),
("DROP DICTIONARY","dict"),
])
def check_privilege(self, privilege, on, grant_target_name, user_name, dict_name, node=None):
"""Run checks for commands that require SHOW DICTIONARY privilege.
"""
if node is None:
node = self.context.node
on = on.replace("dict", f"{dict_name}")
Suite(test=show_dict, setup=instrument_clickhouse_server_log)(privilege=privilege, on=on, grant_target_name=grant_target_name, user_name=user_name, dict_name=dict_name)
Suite(test=exists, setup=instrument_clickhouse_server_log)(privilege=privilege, on=on, grant_target_name=grant_target_name, user_name=user_name, dict_name=dict_name)
Suite(test=show_create, setup=instrument_clickhouse_server_log)(privilege=privilege, on=on, grant_target_name=grant_target_name, user_name=user_name, dict_name=dict_name)
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_ShowDictionaries_Query("1.0"),
)
def show_dict(self, privilege, on, grant_target_name, user_name, dict_name, node=None):
"""Check that user is only able to see a dictionary in SHOW DICTIONARIES
when they have a privilege on that dictionary.
"""
exitcode, message = errors.not_enough_privileges(name=user_name)
if node is None:
node = self.context.node
try:
with Given("I have a dictionary"):
node.query(f"CREATE DICTIONARY {dict_name}(x Int32, y Int32) PRIMARY KEY x LAYOUT(FLAT()) SOURCE(CLICKHOUSE()) LIFETIME(0)")
with Scenario("SHOW DICTIONARIES without privilege"):
with When("I check the user doesn't see the dictionary"):
output = node.query("SHOW DICTIONARIES", settings = [("user", f"{user_name}")]).output
assert output == '', error()
with Scenario("SHOW DICTIONARIES with privilege"):
with When(f"I grant {privilege} on the dictionary"):
node.query(f"GRANT {privilege} ON {on} TO {grant_target_name}")
with Then("I check the user does see a dictionary"):
node.query("SHOW DICTIONARIES", settings = [("user", f"{user_name}")], message=f"{dict_name}")
with Scenario("SHOW DICTIONARIES with revoked privilege"):
with When(f"I grant {privilege} on the dictionary"):
node.query(f"GRANT {privilege} ON {on} TO {grant_target_name}")
with And(f"I revoke {privilege} on the dictionary"):
node.query(f"REVOKE {privilege} ON {on} FROM {grant_target_name}")
with Then("I check the user does not see a dictionary"):
output = node.query("SHOW DICTIONARIES", settings = [("user", f"{user_name}")]).output
assert output == f'', error()
finally:
with Finally("I drop the dictionary"):
node.query(f"DROP DICTIONARY IF EXISTS {dict_name}")
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_ExistsDictionary("1.0"),
)
def exists(self, privilege, on, grant_target_name, user_name, dict_name, node=None):
"""Check that user is able to execute EXISTS on a dictionary if and only if the user has SHOW DICTIONARY privilege
on that dictionary.
"""
exitcode, message = errors.not_enough_privileges(name=user_name)
if node is None:
node = self.context.node
try:
with Given("I have a dictionary"):
node.query(f"CREATE DICTIONARY {dict_name}(x Int32, y Int32) PRIMARY KEY x LAYOUT(FLAT()) SOURCE(CLICKHOUSE()) LIFETIME(0)")
with Scenario("EXISTS without privilege"):
with When(f"I check if {dict_name} EXISTS"):
node.query(f"EXISTS {dict_name}", settings=[("user",user_name)],
exitcode=exitcode, message=message)
with Scenario("EXISTS with privilege"):
with When(f"I grant {privilege} on the dictionary"):
node.query(f"GRANT {privilege} ON {on} TO {grant_target_name}")
with Then(f"I check if {dict_name} EXISTS"):
node.query(f"EXISTS {dict_name}", settings=[("user",user_name)])
with Scenario("EXISTS with revoked privilege"):
with When(f"I grant {privilege} on the dictionary"):
node.query(f"GRANT {privilege} ON {on} TO {grant_target_name}")
with And(f"I revoke {privilege} on the dictionary"):
node.query(f"REVOKE {privilege} ON {on} FROM {grant_target_name}")
with Then(f"I check if {dict_name} EXISTS"):
node.query(f"EXISTS {dict_name}", settings=[("user",user_name)],
exitcode=exitcode, message=message)
finally:
with Finally("I drop the dictionary"):
node.query(f"DROP DICTIONARY IF EXISTS {dict_name}")
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_ShowCreateDictionary("1.0"),
)
def show_create(self, privilege, on, grant_target_name, user_name, dict_name, node=None):
"""Check that user is able to execute SHOW CREATE on a dictionary if and only if the user has SHOW DICTIONARY privilege
on that dictionary.
"""
exitcode, message = errors.not_enough_privileges(name=user_name)
if node is None:
node = self.context.node
try:
with Given("I have a dictionary"):
node.query(f"CREATE DICTIONARY {dict_name}(x Int32, y Int32) PRIMARY KEY x LAYOUT(FLAT()) SOURCE(CLICKHOUSE()) LIFETIME(0)")
with Scenario("SHOW CREATE without privilege"):
with When(f"I attempt to SHOW CREATE {dict_name}"):
node.query(f"SHOW CREATE DICTIONARY {dict_name}", settings=[("user",user_name)],
exitcode=exitcode, message=message)
with Scenario("SHOW CREATE with privilege"):
with When(f"I grant {privilege} on the dictionary"):
node.query(f"GRANT {privilege} ON {on} TO {grant_target_name}")
with Then(f"I attempt to SHOW CREATE {dict_name}"):
node.query(f"SHOW CREATE DICTIONARY {dict_name}", settings=[("user",user_name)])
with Scenario("SHOW CREATE with revoked privilege"):
with When(f"I grant {privilege} on the dictionary"):
node.query(f"GRANT {privilege} ON {on} TO {grant_target_name}")
with And(f"I revoke {privilege} on the dictionary"):
node.query(f"REVOKE {privilege} ON {on} FROM {grant_target_name}")
with Then(f"I attempt to SHOW CREATE {dict_name}"):
node.query(f"SHOW CREATE DICTIONARY {dict_name}", settings=[("user",user_name)],
exitcode=exitcode, message=message)
finally:
with Finally("I drop the dictionary"):
node.query(f"DROP DICTIONARY IF EXISTS {dict_name}")
@TestFeature
@Name("show dictionaries")
@Requirements(
RQ_SRS_006_RBAC_Privileges_ShowDictionaries("1.0"),
)
def feature(self, node="clickhouse1"):
"""Check the RBAC functionality of SHOW DICTIONARIES.
"""
self.context.node = self.context.cluster.node(node)
Suite(run=dict_privileges_granted_directly, setup=instrument_clickhouse_server_log)
Suite(run=dict_privileges_granted_via_role, setup=instrument_clickhouse_server_log)

View File

@ -0,0 +1,204 @@
from testflows.core import *
from testflows.asserts import error
from rbac.requirements import *
from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
def table_privileges_granted_directly(self, node=None):
"""Check that a user is able to execute `CHECK` and `EXISTS`
commands on a table and see the table when they execute `SHOW TABLE` command
if and only if they have any privilege on that table granted directly.
"""
user_name = f"user_{getuid()}"
if node is None:
node = self.context.node
with user(node, f"{user_name}"):
table_name = f"table_name_{getuid()}"
Suite(run=check_privilege, flags=TE,
examples=Examples("privilege on grant_target_name user_name table_name", [
tuple(list(row)+[user_name,user_name,table_name]) for row in check_privilege.examples
], args=Args(name="check privilege={privilege}", format_name=True)))
@TestSuite
def table_privileges_granted_via_role(self, node=None):
"""Check that a user is able to execute `CHECK` and `EXISTS`
commands on a table and see the table when they execute `SHOW TABLE` command
if and only if they have any privilege on that table granted via role.
"""
user_name = f"user_{getuid()}"
role_name = f"role_{getuid()}"
if node is None:
node = self.context.node
with user(node, f"{user_name}"), role(node, f"{role_name}"):
table_name = f"table_name_{getuid()}"
with When("I grant the role to the user"):
node.query(f"GRANT {role_name} TO {user_name}")
Suite(run=check_privilege, flags=TE,
examples=Examples("privilege on grant_target_name user_name table_name", [
tuple(list(row)+[role_name,user_name,table_name]) for row in check_privilege.examples
], args=Args(name="check privilege={privilege}", format_name=True)))
@TestOutline(Suite)
@Examples("privilege on",[
("SHOW", "*.*"),
("SHOW TABLES", "table"),
("SELECT", "table"),
("INSERT", "table"),
("ALTER", "table"),
("SELECT(a)", "table"),
("INSERT(a)", "table"),
("ALTER(a)", "table"),
])
def check_privilege(self, privilege, on, grant_target_name, user_name, table_name, node=None):
"""Run checks for commands that require SHOW TABLE privilege.
"""
if node is None:
node = self.context.node
Suite(test=show_tables, setup=instrument_clickhouse_server_log)(privilege=privilege, on=on, grant_target_name=grant_target_name, user_name=user_name, table_name=table_name)
Suite(test=exists, setup=instrument_clickhouse_server_log)(privilege=privilege, on=on, grant_target_name=grant_target_name, user_name=user_name, table_name=table_name)
Suite(test=check, setup=instrument_clickhouse_server_log)(privilege=privilege, on=on, grant_target_name=grant_target_name, user_name=user_name, table_name=table_name)
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_ShowTables_Query("1.0"),
)
def show_tables(self, privilege, on, grant_target_name, user_name, table_name, node=None):
"""Check that user is only able to see a table in SHOW TABLES when they have a privilege on that table.
"""
exitcode, message = errors.not_enough_privileges(name=user_name)
if node is None:
node = self.context.node
on = on.replace("table", f"{table_name}")
with table(node, table_name):
with Scenario("SHOW TABLES without privilege"):
with When("I check the user doesn't see the table"):
output = node.query("SHOW TABLES", settings = [("user", f"{user_name}")]).output
assert output == '', error()
with Scenario("SHOW TABLES with privilege"):
with When(f"I grant {privilege} on the table"):
node.query(f"GRANT {privilege} ON {on} TO {grant_target_name}")
with Then("I check the user does see a table"):
node.query("SHOW TABLES", settings = [("user", f"{user_name}")], message=f"{table_name}")
with Scenario("SHOW TABLES with revoked privilege"):
with When(f"I grant {privilege} on the table"):
node.query(f"GRANT {privilege} ON {on} TO {grant_target_name}")
with And(f"I revoke {privilege} on the table"):
node.query(f"REVOKE {privilege} ON {on} FROM {grant_target_name}")
with Then("I check the user does not see a table"):
output = node.query("SHOW TABLES", settings = [("user", f"{user_name}")]).output
assert output == '', error()
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_ExistsTable("1.0"),
)
def exists(self, privilege, on, grant_target_name, user_name, table_name, node=None):
"""Check that user is able to execute EXISTS on a table if and only if the user has SHOW TABLE privilege
on that table.
"""
exitcode, message = errors.not_enough_privileges(name=user_name)
if node is None:
node = self.context.node
if on == "table":
on = f"{table_name}"
with table(node, table_name):
with Scenario("EXISTS without privilege"):
with When(f"I check if {table_name} EXISTS"):
node.query(f"EXISTS {table_name}", settings=[("user",user_name)],
exitcode=exitcode, message=message)
with Scenario("EXISTS with privilege"):
with When(f"I grant {privilege} on the table"):
node.query(f"GRANT {privilege} ON {on} TO {grant_target_name}")
with Then(f"I check if {table_name} EXISTS"):
node.query(f"EXISTS {table_name}", settings=[("user",user_name)])
with Scenario("EXISTS with revoked privilege"):
with When(f"I grant {privilege} on the table"):
node.query(f"GRANT {privilege} ON {on} TO {grant_target_name}")
with And(f"I revoke {privilege} on the table"):
node.query(f"REVOKE {privilege} ON {on} FROM {grant_target_name}")
with Then(f"I check if {table_name} EXISTS"):
node.query(f"EXISTS {table_name}", settings=[("user",user_name)],
exitcode=exitcode, message=message)
@TestSuite
@Requirements(
RQ_SRS_006_RBAC_Privileges_CheckTable("1.0"),
)
def check(self, privilege, on, grant_target_name, user_name, table_name, node=None):
"""Check that user is able to execute CHECK on a table if and only if the user has SHOW TABLE privilege
on that table.
"""
exitcode, message = errors.not_enough_privileges(name=user_name)
if node is None:
node = self.context.node
if on == "table":
on = f"{table_name}"
with table(node, table_name):
with Scenario("CHECK without privilege"):
with When(f"I CHECK {table_name}"):
node.query(f"CHECK TABLE {table_name}", settings=[("user",user_name)],
exitcode=exitcode, message=message)
with Scenario("CHECK with privilege"):
with When(f"I grant {privilege} on the table"):
node.query(f"GRANT {privilege} ON {on} TO {grant_target_name}")
with Then(f"I CHECK {table_name}"):
node.query(f"CHECK TABLE {table_name}", settings=[("user",user_name)])
with Scenario("CHECK with revoked privilege"):
with When(f"I grant {privilege} on the table"):
node.query(f"GRANT {privilege} ON {on} TO {grant_target_name}")
with And(f"I revoke {privilege} on the table"):
node.query(f"REVOKE {privilege} ON {on} FROM {grant_target_name}")
with Then(f"I CHECK {table_name}"):
node.query(f"CHECK TABLE {table_name}", settings=[("user",user_name)],
exitcode=exitcode, message=message)
@TestFeature
@Name("show tables")
@Requirements(
RQ_SRS_006_RBAC_Privileges_ShowTables("1.0"),
)
def feature(self, node="clickhouse1"):
"""Check the RBAC functionality of SHOW TABLES.
"""
self.context.node = self.context.cluster.node(node)
Suite(run=table_privileges_granted_directly, setup=instrument_clickhouse_server_log)
Suite(run=table_privileges_granted_via_role, setup=instrument_clickhouse_server_log)

View File

@ -1,62 +0,0 @@
from contextlib import contextmanager
from testflows.core import *
from testflows.asserts import error
from rbac.requirements import *
from rbac.helper.common import *
import rbac.helper.errors as errors
@TestScenario
@Requirements(
RQ_SRS_006_RBAC_Table_ShowTables("1.0"),
)
def show_tables(self, node=None):
"""Check that a user is able to see a table in `SHOW TABLES` if and only if the user has privilege on that table,
either granted directly or through a role.
"""
user_name = f"user_{getuid()}"
role_name = f"role_{getuid()}"
if node is None:
node = self.context.node
with user(node, f"{user_name}"):
Scenario(test=show_tables_general, flags=TE,
name="create with create view and select privilege granted directly")(grant_target_name=user_name, user_name=user_name)
with user(node, f"{user_name}"), role(node, f"{role_name}"):
with When("I grant the role to the user"):
node.query(f"GRANT {role_name} TO {user_name}")
Scenario(test=show_tables_general, flags=TE,
name="create with create view and select privilege granted through a role")(grant_target_name=role_name, user_name=user_name)
@TestScenario
def show_tables_general(self, grant_target_name, user_name, node=None):
table0_name = f"table0_{getuid()}"
if node is None:
node = self.context.node
try:
with Given("I have a table"):
node.query(f"DROP TABLE IF EXISTS {table0_name}")
node.query(f"CREATE TABLE {table0_name} (a String, b Int8, d Date) Engine = Memory")
with Then("I check user does not see any tables"):
output = node.query("SHOW TABLES", settings = [("user", f"{user_name}")]).output
assert output == '', error()
with When("I grant select privilege on the table"):
node.query(f"GRANT SELECT(a) ON {table0_name} TO {grant_target_name}")
with Then("I check the user does see a table"):
output = node.query("SHOW TABLES", settings = [("user", f"{user_name}")]).output
assert output == f'{table0_name}', error()
finally:
with Finally("I drop the table"):
node.query(f"DROP TABLE IF EXISTS {table0_name}")
@TestFeature
@Name("show tables")
def feature(self, node="clickhouse1"):
self.context.node = self.context.cluster.node(node)
Scenario(run=show_tables, setup=instrument_clickhouse_server_log, flags=TE)

View File

@ -0,0 +1,107 @@
from rbac.requirements import *
from rbac.helper.common import *
import rbac.helper.errors as errors
@TestSuite
def privilege_granted_directly_or_via_role(self, table_type, node=None):
"""Check that user is only able to execute TRUNCATE when they have required privilege, either directly or via role.
"""
role_name = f"role_{getuid()}"
user_name = f"user_{getuid()}"
if node is None:
node = self.context.node
with Suite("user with direct privilege", setup=instrument_clickhouse_server_log):
with user(node, user_name):
with When(f"I run checks that {user_name} is only able to execute TRUNCATE with required privileges"):
privilege_check(grant_target_name=user_name, user_name=user_name, table_type=table_type, node=node)
with Suite("user with privilege via role", setup=instrument_clickhouse_server_log):
with user(node, user_name), role(node, role_name):
with When("I grant the role to the user"):
node.query(f"GRANT {role_name} TO {user_name}")
with And(f"I run checks that {user_name} with {role_name} is only able to execute TRUNCATE with required privileges"):
privilege_check(grant_target_name=role_name, user_name=user_name, table_type=table_type, node=node)
def privilege_check(grant_target_name, user_name, table_type, node=None):
"""Run scenarios to check the user's access with different privileges.
"""
exitcode, message = errors.not_enough_privileges(name=f"{user_name}")
with Scenario("user without privilege", setup=instrument_clickhouse_server_log):
table_name = f"merge_tree_{getuid()}"
with table(node, table_name, table_type):
with When("I attempt to truncate a table without privilege"):
node.query(f"TRUNCATE TABLE {table_name}", settings = [("user", user_name)],
exitcode=exitcode, message=message)
with Scenario("user with privilege", setup=instrument_clickhouse_server_log):
table_name = f"merge_tree_{getuid()}"
with table(node, table_name, table_type):
with When("I grant the truncate privilege"):
node.query(f"GRANT TRUNCATE ON {table_name} TO {grant_target_name}")
with Then("I attempt to truncate a table"):
node.query(f"TRUNCATE TABLE {table_name}", settings = [("user", user_name)])
with Scenario("user with revoked privilege", setup=instrument_clickhouse_server_log):
table_name = f"merge_tree_{getuid()}"
with table(node, table_name, table_type):
with When("I grant the truncate privilege"):
node.query(f"GRANT TRUNCATE ON {table_name} TO {grant_target_name}")
with And("I revoke the truncate privilege"):
node.query(f"REVOKE TRUNCATE ON {table_name} FROM {grant_target_name}")
with Then("I attempt to truncate a table"):
node.query(f"TRUNCATE TABLE {table_name}", settings = [("user", user_name)],
exitcode=exitcode, message=message)
with Scenario("execute on cluster", setup=instrument_clickhouse_server_log):
table_name = f"merge_tree_{getuid()}"
with table(node, table_name, table_type):
with When("I grant the truncate privilege"):
node.query(f"GRANT TRUNCATE ON {table_name} TO {grant_target_name}")
with Then("I attempt to truncate a table"):
node.query(f"TRUNCATE TABLE IF EXISTS {table_name} ON CLUSTER sharded_cluster", settings = [("user", user_name)])
@TestFeature
@Requirements(
RQ_SRS_006_RBAC_Privileges_Truncate("1.0"),
)
@Examples("table_type", [
(key,) for key in table_types.keys()
])
@Name("truncate")
def feature(self, node="clickhouse1", stress=None, parallel=None):
"""Check the RBAC functionality of TRUNCATE.
"""
self.context.node = self.context.cluster.node(node)
if parallel is not None:
self.context.parallel = parallel
if stress is not None:
self.context.stress = stress
for example in self.examples:
table_type, = example
if table_type != "MergeTree" and not self.context.stress:
continue
with Example(str(example)):
with Suite(test=privilege_granted_directly_or_via_role):
privilege_granted_directly_or_via_role(table_type=table_type)