Merge branch 'fix_force_drop_materialized_view' into just_another_fix_for_ddl_worker

This commit is contained in:
Alexander Tokmakov 2021-03-05 15:59:27 +03:00
commit e8987f799e
11 changed files with 81 additions and 49 deletions

View File

@ -715,7 +715,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
config().getString("path", ""),
std::move(main_config_zk_node_cache),
main_config_zk_changed_event,
[&](ConfigurationPtr config)
[&](ConfigurationPtr config, bool initial_loading)
{
Settings::checkNoSettingNamesAtTopLevel(*config, config_path);
@ -765,14 +765,19 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (config->has("max_partition_size_to_drop"))
global_context->setMaxPartitionSizeToDrop(config->getUInt64("max_partition_size_to_drop"));
if (config->has("zookeeper"))
global_context->reloadZooKeeperIfChanged(config);
if (!initial_loading)
{
/// We do not load ZooKeeper configuration on the first config loading
/// because TestKeeper server is not started yet.
if (config->has("zookeeper"))
global_context->reloadZooKeeperIfChanged(config);
global_context->reloadAuxiliaryZooKeepersConfigIfChanged(config);
global_context->reloadAuxiliaryZooKeepersConfigIfChanged(config);
}
global_context->updateStorageConfiguration(*config);
},
/* already_loaded = */ true);
/* already_loaded = */ false); /// Reload it right now (initial loading)
auto & access_control = global_context->getAccessControlManager();
if (config().has("custom_settings_prefixes"))

View File

@ -518,7 +518,7 @@ void UsersConfigAccessStorage::load(
preprocessed_dir,
zkutil::ZooKeeperNodeCache(get_zookeeper_function),
std::make_shared<Poco::Event>(),
[&](Poco::AutoPtr<Poco::Util::AbstractConfiguration> new_config)
[&](Poco::AutoPtr<Poco::Util::AbstractConfiguration> new_config, bool /*initial_loading*/)
{
parseFromConfig(*new_config);
Settings::checkNoSettingNamesAtTopLevel(*new_config, users_config_path);

View File

@ -27,7 +27,7 @@ ConfigReloader::ConfigReloader(
, updater(std::move(updater_))
{
if (!already_loaded)
reloadIfNewer(/* force = */ true, /* throw_on_error = */ true, /* fallback_to_preprocessed = */ true);
reloadIfNewer(/* force = */ true, /* throw_on_error = */ true, /* fallback_to_preprocessed = */ true, /* initial_loading = */ true);
}
@ -66,7 +66,7 @@ void ConfigReloader::run()
if (quit)
return;
reloadIfNewer(zk_changed, /* throw_on_error = */ false, /* fallback_to_preprocessed = */ false);
reloadIfNewer(zk_changed, /* throw_on_error = */ false, /* fallback_to_preprocessed = */ false, /* initial_loading = */ false);
}
catch (...)
{
@ -76,7 +76,7 @@ void ConfigReloader::run()
}
}
void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallback_to_preprocessed)
void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallback_to_preprocessed, bool initial_loading)
{
std::lock_guard lock(reload_mutex);
@ -131,7 +131,7 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
try
{
updater(loaded_config.configuration);
updater(loaded_config.configuration, initial_loading);
}
catch (...)
{

View File

@ -27,7 +27,7 @@ class Context;
class ConfigReloader
{
public:
using Updater = std::function<void(ConfigurationPtr)>;
using Updater = std::function<void(ConfigurationPtr, bool)>;
/** include_from_path is usually /etc/metrika.xml (i.e. value of <include_from> tag)
*/
@ -46,12 +46,12 @@ public:
void start();
/// Reload immediately. For SYSTEM RELOAD CONFIG query.
void reload() { reloadIfNewer(/* force */ true, /* throw_on_error */ true, /* fallback_to_preprocessed */ false); }
void reload() { reloadIfNewer(/* force */ true, /* throw_on_error */ true, /* fallback_to_preprocessed */ false, /* initial_loading = */ false); }
private:
void run();
void reloadIfNewer(bool force, bool throw_on_error, bool fallback_to_preprocessed);
void reloadIfNewer(bool force, bool throw_on_error, bool fallback_to_preprocessed, bool initial_loading);
struct FileWithTimestamp;

View File

@ -108,6 +108,13 @@ StoragePtr DatabaseAtomic::detachTable(const String & name)
void DatabaseAtomic::dropTable(const Context & context, const String & table_name, bool no_delay)
{
if (auto * mv = dynamic_cast<StorageMaterializedView *>(tryGetTable(table_name, context).get()))
{
/// Remove the inner table (if any) to avoid deadlock
/// (due to attempt to execute DROP from the worker thread)
mv->dropInnerTable(no_delay, context);
}
String table_metadata_path = getObjectMetadataPath(table_name);
String table_metadata_path_drop;
StoragePtr table;
@ -131,10 +138,7 @@ void DatabaseAtomic::dropTable(const Context & context, const String & table_nam
}
if (table->storesDataOnDisk())
tryRemoveSymlink(table_name);
/// Remove the inner table (if any) to avoid deadlock
/// (due to attempt to execute DROP from the worker thread)
if (auto * mv = dynamic_cast<StorageMaterializedView *>(table.get()))
mv->dropInnerTable(no_delay, context);
/// Notify DatabaseCatalog that table was dropped. It will remove table data in background.
/// Cleanup is performed outside of database to allow easily DROP DATABASE without waiting for cleanup to complete.
DatabaseCatalog::instance().enqueueDroppedTableCleanup(table->getStorageID(), table, table_metadata_path_drop, no_delay);

View File

@ -409,32 +409,6 @@ Strings StorageMaterializedView::getDataPaths() const
return {};
}
void StorageMaterializedView::checkTableCanBeDropped() const
{
/// Don't drop the target table if it was created manually via 'TO inner_table' statement
if (!has_inner_table)
return;
auto target_table = tryGetTargetTable();
if (!target_table)
return;
target_table->checkTableCanBeDropped();
}
void StorageMaterializedView::checkPartitionCanBeDropped(const ASTPtr & partition)
{
/// Don't drop the partition in target table if it was created manually via 'TO inner_table' statement
if (!has_inner_table)
return;
auto target_table = tryGetTargetTable();
if (!target_table)
return;
target_table->checkPartitionCanBeDropped(partition);
}
ActionLock StorageMaterializedView::getActionLock(StorageActionBlockType type)
{
return has_inner_table ? getTargetTable()->getActionLock(type) : ActionLock{};

View File

@ -66,9 +66,6 @@ public:
void shutdown() override;
void checkTableCanBeDropped() const override;
void checkPartitionCanBeDropped(const ASTPtr & partition) override;
QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const override;
StoragePtr getTargetTable() const;

View File

@ -0,0 +1,4 @@
<yandex>
<max_table_size_to_drop>1</max_table_size_to_drop>
<max_partition_size_to_drop>1</max_partition_size_to_drop>
</yandex>

View File

@ -0,0 +1,49 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', main_configs=["configs/config.xml"], with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def create_force_drop_flag(node):
force_drop_flag_path = "/var/lib/clickhouse/flags/force_drop_table"
node.exec_in_container(["bash", "-c", "touch {} && chmod a=rw {}".format(force_drop_flag_path, force_drop_flag_path)], user="root")
@pytest.mark.parametrize("engine", ['Ordinary', 'Atomic'])
def test_drop_materialized_view(started_cluster, engine):
node.query("CREATE DATABASE d ENGINE={}".format(engine))
node.query("CREATE TABLE d.rmt (n UInt64) ENGINE=ReplicatedMergeTree('/test/rmt', 'r1') ORDER BY n PARTITION BY n % 2")
node.query("CREATE MATERIALIZED VIEW d.mv (n UInt64, s String) ENGINE=MergeTree ORDER BY n PARTITION BY n % 2 AS SELECT n, toString(n) AS s FROM d.rmt")
node.query("INSERT INTO d.rmt VALUES (1), (2)")
assert "is greater than max" in node.query_and_get_error("DROP TABLE d.rmt")
assert "is greater than max" in node.query_and_get_error("DROP TABLE d.mv")
assert "is greater than max" in node.query_and_get_error("TRUNCATE TABLE d.rmt")
assert "is greater than max" in node.query_and_get_error("TRUNCATE TABLE d.mv")
assert "is greater than max" in node.query_and_get_error("ALTER TABLE d.rmt DROP PARTITION '0'")
assert node.query("SELECT * FROM d.rmt ORDER BY n") == "1\n2\n"
assert node.query("SELECT * FROM d.mv ORDER BY n") == "1\t1\n2\t2\n"
create_force_drop_flag(node)
node.query("ALTER TABLE d.rmt DROP PARTITION '0'")
assert node.query("SELECT * FROM d.rmt ORDER BY n") == "1\n"
assert "is greater than max" in node.query_and_get_error("ALTER TABLE d.mv DROP PARTITION '0'")
create_force_drop_flag(node)
node.query("ALTER TABLE d.mv DROP PARTITION '0'")
assert node.query("SELECT * FROM d.mv ORDER BY n") == "1\t1\n"
assert "is greater than max" in node.query_and_get_error("DROP TABLE d.rmt SYNC")
create_force_drop_flag(node)
node.query("DROP TABLE d.rmt SYNC")
assert "is greater than max" in node.query_and_get_error("DROP TABLE d.mv SYNC")
create_force_drop_flag(node)
node.query("DROP TABLE d.mv SYNC")
node.query("DROP DATABASE d")

View File

@ -62,8 +62,7 @@ def test_reload_auxiliary_zookeepers(start_cluster):
</yandex>"""
node.replace_config("/etc/clickhouse-server/conf.d/zookeeper.xml", new_config)
# Hopefully it has finished the configuration reload
time.sleep(2)
node.query("SYSTEM RELOAD CONFIG")
node.query(
"ALTER TABLE simple2 FETCH PARTITION '2020-08-27' FROM 'zookeeper2:/clickhouse/tables/0/simple';"
@ -81,7 +80,7 @@ def test_reload_auxiliary_zookeepers(start_cluster):
</zookeeper>
</yandex>"""
node.replace_config("/etc/clickhouse-server/conf.d/zookeeper.xml", new_config)
time.sleep(2)
node.query("SYSTEM RELOAD CONFIG")
with pytest.raises(QueryRuntimeException):
node.query(
"ALTER TABLE simple2 FETCH PARTITION '2020-08-27' FROM 'zookeeper2:/clickhouse/tables/0/simple';"