disable insertion and mutation

This commit is contained in:
Xu Jia 2024-07-15 15:10:39 +08:00
parent 697f4eae97
commit 77272c925d
11 changed files with 150 additions and 0 deletions

View File

@ -5608,3 +5608,9 @@ Default value: `10000000`.
Minimal size of block to compress in CROSS JOIN. Zero value means - disable this threshold. This block is compressed when any of the two thresholds (by rows or by bytes) are reached.
Default value: `1GiB`.
## disable_insertion_and_mutation
Disable all insert and mutations (alter table update / alter table delete / alter table drop partition). Set to true, can make this node focus on reading queries.
Default value: `false`.

View File

@ -157,6 +157,7 @@ namespace DB
M(Bool, prepare_system_log_tables_on_startup, false, "If true, ClickHouse creates all configured `system.*_log` tables before the startup. It can be helpful if some startup scripts depend on these tables.", 0) \
M(Double, gwp_asan_force_sample_probability, 0.0003, "Probability that an allocation from specific places will be sampled by GWP Asan (i.e. PODArray allocations)", 0) \
M(UInt64, config_reload_interval_ms, 2000, "How often clickhouse will reload config and check for new changes", 0) \
M(Bool, disable_insertion_and_mutation, false, "Disable all insert/alter/delete queries. This setting will be enabled if someone needs read-only nodes to prevent insertion and mutation affect reading performance.", 0)
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in StorageSystemServerSettings.cpp

View File

@ -46,6 +46,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_TABLE;
extern const int UNKNOWN_DATABASE;
extern const int QUERY_IS_PROHIBITED;
}
@ -191,6 +192,12 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
"to execute ALTERs of different types (replicated and non replicated) in single query");
}
if (mutation_commands.hasNonEmptyMutationCommands() || !partition_commands.empty())
{
if (getContext()->getServerSettings().disable_insertion_and_mutation)
throw Exception(ErrorCodes::QUERY_IS_PROHIBITED, "Mutations are prohibited");
}
if (!alter_commands.empty())
{
auto alter_lock = table->lockForAlter(getContext()->getSettingsRef().lock_acquire_timeout);

View File

@ -26,6 +26,7 @@ namespace ErrorCodes
extern const int SUPPORT_IS_DISABLED;
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
extern const int QUERY_IS_PROHIBITED;
}
@ -50,6 +51,9 @@ BlockIO InterpreterDeleteQuery::execute()
if (table->isStaticStorage())
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");
if (getContext()->getGlobalContext()->getServerSettings().disable_insertion_and_mutation)
throw Exception(ErrorCodes::QUERY_IS_PROHIBITED, "Delete queries are prohibited");
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (database->shouldReplicateQuery(getContext(), query_ptr))
{

View File

@ -44,6 +44,7 @@ namespace ProfileEvents
{
extern const Event InsertQueriesWithSubqueries;
extern const Event QueriesWithSubqueries;
extern const int QUERY_IS_PROHIBITED;
}
namespace DB
@ -406,6 +407,10 @@ BlockIO InterpreterInsertQuery::execute()
StoragePtr table = getTable(query);
checkStorageSupportsTransactionsIfNeeded(table, getContext());
if (getContext()->getServerSettings().disable_insertion_and_mutation
&& query.table_id.database_name != DatabaseCatalog::SYSTEM_DATABASE)
throw Exception(ErrorCodes::QUERY_IS_PROHIBITED, "Insert queries are prohibited");
StoragePtr inner_table;
if (const auto * mv = dynamic_cast<const StorageMaterializedView *>(table.get()))
inner_table = mv->getTargetTable();

View File

@ -0,0 +1,16 @@
<clickhouse>
<remote_servers>
<default>
<shard>
<replica>
<host>writing_node</host>
<port>9000</port>
</replica>
<replica>
<host>reading_node</host>
<port>9000</port>
</replica>
</shard>
</default>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,3 @@
<clickhouse>
<disable_insertion_and_mutation>true</disable_insertion_and_mutation>
</clickhouse>

View File

@ -0,0 +1,21 @@
<clickhouse>
<storage_configuration>
<disks>
<s3_with_keeper>
<type>s3_with_keeper</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_with_keeper>
</disks>
<policies>
<s3_with_keeper>
<volumes>
<main>
<disk>s3_with_keeper</disk>
</main>
</volumes>
</s3_with_keeper>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,3 @@
<clickhouse>
<disable_insertion_and_mutation>false</disable_insertion_and_mutation>
</clickhouse>

View File

@ -0,0 +1,84 @@
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
import time
cluster = ClickHouseCluster(__file__)
writing_node = cluster.add_instance(
"writing_node",
main_configs=["config/writing_node.xml", "config/storage_policy.xml", "config/cluster.xml"],
with_zookeeper=True,
with_minio=True,
stay_alive=True,
macros={"shard": 1, "replica": 1},
)
reading_node = cluster.add_instance(
"reading_node",
main_configs=["config/reading_node.xml", "config/storage_policy.xml", "config/cluster.xml"],
with_zookeeper=True,
with_minio=True,
stay_alive=True,
macros={"shard": 1, "replica": 2},
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_disable_insertion_and_mutation(started_cluster):
writing_node.query("""CREATE TABLE my_table on cluster default (key UInt64, value String) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/default.my_table', '{replica}') ORDER BY key partition by (key % 5) SETTINGS storage_policy='s3_with_keeper' """)
assert (
"QUERY_IS_PROHIBITED"
in reading_node.query_and_get_error("INSERT INTO my_table VALUES (1, 'hello')")
)
assert (
"QUERY_IS_PROHIBITED"
in reading_node.query_and_get_error("INSERT INTO my_table SETTINGS async_insert = 1 VALUES (1, 'hello')")
)
assert (
"QUERY_IS_PROHIBITED"
in reading_node.query_and_get_error("ALTER TABLE my_table delete where 1")
)
assert (
"QUERY_IS_PROHIBITED"
in reading_node.query_and_get_error("ALTER table my_table update key = 1 where 1")
)
assert (
"QUERY_IS_PROHIBITED"
in reading_node.query_and_get_error("ALTER TABLE my_table drop partition 0")
)
reading_node.query("SELECT * from my_table");
writing_node.query("INSERT INTO my_table VALUES (1, 'hello')")
writing_node.query("ALTER TABLE my_table delete where 1")
writing_node.query("ALTER table my_table update value = 'no hello' where 1")
reading_node.query("ALTER TABLE my_table ADD COLUMN new_column UInt64")
writing_node.query("SELECT new_column from my_table")
reading_node.query("SELECT new_column from my_table")
reading_node.query("ALter Table my_table MODIFY COLUMN new_column String")
assert(
"new_column\tString"
in reading_node.query("DESC my_table")
)
assert(
"new_column\tString"
in writing_node.query("DESC my_table")
)