mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Merge pull request #24067 from azat/buffer-destroy-first
Flush Buffer tables before shutting down tables (within one database)
This commit is contained in:
commit
40d8d95fd2
@ -203,7 +203,7 @@ void DatabaseLazy::shutdown()
|
||||
for (const auto & kv : tables_snapshot)
|
||||
{
|
||||
if (kv.second.table)
|
||||
kv.second.table->shutdown();
|
||||
kv.second.table->flushAndShutdown();
|
||||
}
|
||||
|
||||
std::lock_guard lock(mutex);
|
||||
|
@ -529,7 +529,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
|
||||
dropped_tables.push_back(tryGetTableUUID(table_name));
|
||||
dropped_dictionaries += table->isDictionary();
|
||||
|
||||
table->shutdown();
|
||||
table->flushAndShutdown();
|
||||
DatabaseAtomic::dropTable(getContext(), table_name, true);
|
||||
}
|
||||
else
|
||||
|
@ -125,10 +125,15 @@ void DatabaseWithOwnTablesBase::shutdown()
|
||||
tables_snapshot = tables;
|
||||
}
|
||||
|
||||
for (const auto & kv : tables_snapshot)
|
||||
{
|
||||
kv.second->flush();
|
||||
}
|
||||
|
||||
for (const auto & kv : tables_snapshot)
|
||||
{
|
||||
auto table_id = kv.second->getStorageID();
|
||||
kv.second->shutdown();
|
||||
kv.second->flushAndShutdown();
|
||||
if (table_id.hasUUID())
|
||||
{
|
||||
assert(getDatabaseName() == DatabaseCatalog::TEMPORARY_DATABASE || getUUID() != UUIDHelpers::Nil);
|
||||
|
@ -316,7 +316,7 @@ void DatabaseConnectionMySQL::shutdown()
|
||||
}
|
||||
|
||||
for (const auto & [table_name, modify_time_and_storage] : tables_snapshot)
|
||||
modify_time_and_storage.second->shutdown();
|
||||
modify_time_and_storage.second->flushAndShutdown();
|
||||
|
||||
std::lock_guard lock(mutex);
|
||||
local_tables_cache.clear();
|
||||
@ -343,7 +343,7 @@ void DatabaseConnectionMySQL::cleanOutdatedTables()
|
||||
{
|
||||
const auto table_lock = (*iterator)->lockExclusively(RWLockImpl::NO_QUERY, lock_acquire_timeout);
|
||||
|
||||
(*iterator)->shutdown();
|
||||
(*iterator)->flushAndShutdown();
|
||||
(*iterator)->is_dropped = true;
|
||||
iterator = outdated_tables.erase(iterator);
|
||||
}
|
||||
|
@ -171,7 +171,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
|
||||
else
|
||||
table->checkTableCanBeDetached();
|
||||
|
||||
table->shutdown();
|
||||
table->flushAndShutdown();
|
||||
TableExclusiveLockHolder table_lock;
|
||||
|
||||
if (database->getUUID() == UUIDHelpers::Nil)
|
||||
@ -215,7 +215,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
|
||||
else
|
||||
table->checkTableCanBeDropped();
|
||||
|
||||
table->shutdown();
|
||||
table->flushAndShutdown();
|
||||
|
||||
TableExclusiveLockHolder table_lock;
|
||||
if (database->getUUID() == UUIDHelpers::Nil)
|
||||
@ -253,7 +253,7 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(const String & table_name,
|
||||
else if (kind == ASTDropQuery::Kind::Drop)
|
||||
{
|
||||
context_handle->removeExternalTable(table_name);
|
||||
table->shutdown();
|
||||
table->flushAndShutdown();
|
||||
auto table_lock = table->lockExclusively(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
|
||||
/// Delete table data
|
||||
table->drop();
|
||||
@ -328,6 +328,14 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
|
||||
query_for_table.database = database_name;
|
||||
query_for_table.no_delay = query.no_delay;
|
||||
|
||||
/// Flush should not be done if shouldBeEmptyOnDetach() == false,
|
||||
/// since in this case getTablesIterator() may do some additional work,
|
||||
/// see DatabaseMaterializeMySQL<>::getTablesIterator()
|
||||
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
|
||||
{
|
||||
iterator->table()->flush();
|
||||
}
|
||||
|
||||
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
|
||||
{
|
||||
DatabasePtr db;
|
||||
|
@ -436,7 +436,7 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica,
|
||||
if (!table || !dynamic_cast<const StorageReplicatedMergeTree *>(table.get()))
|
||||
return nullptr;
|
||||
|
||||
table->shutdown();
|
||||
table->flushAndShutdown();
|
||||
{
|
||||
/// If table was already dropped by anyone, an exception will be thrown
|
||||
auto table_lock = table->lockExclusively(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
|
||||
|
@ -153,7 +153,7 @@ public:
|
||||
{
|
||||
stopFlushThread();
|
||||
if (table)
|
||||
table->shutdown();
|
||||
table->flushAndShutdown();
|
||||
}
|
||||
|
||||
String getName() override
|
||||
|
@ -444,6 +444,21 @@ public:
|
||||
*/
|
||||
virtual void startup() {}
|
||||
|
||||
/**
|
||||
* If the storage requires some complicated work on destroying,
|
||||
* then you have two virtual methods:
|
||||
* - flush()
|
||||
* - shutdown()
|
||||
*
|
||||
* @see shutdown()
|
||||
* @see flush()
|
||||
*/
|
||||
void flushAndShutdown()
|
||||
{
|
||||
flush();
|
||||
shutdown();
|
||||
}
|
||||
|
||||
/** If the table have to do some complicated work when destroying an object - do it in advance.
|
||||
* For example, if the table contains any threads for background work - ask them to complete and wait for completion.
|
||||
* By default, does nothing.
|
||||
@ -451,6 +466,10 @@ public:
|
||||
*/
|
||||
virtual void shutdown() {}
|
||||
|
||||
/// Called before shutdown() to flush data to underlying storage
|
||||
/// (for Buffer)
|
||||
virtual void flush() {}
|
||||
|
||||
/// Asks table to stop executing some action identified by action_type
|
||||
/// If table does not support such type of lock, and empty lock is returned
|
||||
virtual ActionLock getActionLock(StorageActionBlockType /* action_type */)
|
||||
|
@ -674,7 +674,7 @@ void StorageBuffer::startup()
|
||||
}
|
||||
|
||||
|
||||
void StorageBuffer::shutdown()
|
||||
void StorageBuffer::flush()
|
||||
{
|
||||
if (!flush_handle)
|
||||
return;
|
||||
|
@ -88,7 +88,7 @@ public:
|
||||
|
||||
void startup() override;
|
||||
/// Flush all buffers into the subordinate table and stop background thread.
|
||||
void shutdown() override;
|
||||
void flush() override;
|
||||
bool optimize(
|
||||
const ASTPtr & query,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
|
@ -133,6 +133,7 @@ public:
|
||||
|
||||
void startup() override { getNested()->startup(); }
|
||||
void shutdown() override { getNested()->shutdown(); }
|
||||
void flush() override { getNested()->flush(); }
|
||||
|
||||
ActionLock getActionLock(StorageActionBlockType action_type) override { return getNested()->getActionLock(action_type); }
|
||||
|
||||
|
@ -62,6 +62,13 @@ public:
|
||||
nested->shutdown();
|
||||
}
|
||||
|
||||
void flush() override
|
||||
{
|
||||
std::lock_guard lock{nested_mutex};
|
||||
if (nested)
|
||||
nested->flush();
|
||||
}
|
||||
|
||||
void drop() override
|
||||
{
|
||||
std::lock_guard lock{nested_mutex};
|
||||
|
@ -53,7 +53,7 @@ public:
|
||||
|
||||
void TearDown() override
|
||||
{
|
||||
table->shutdown();
|
||||
table->flushAndShutdown();
|
||||
destroyDisk<T>(disk);
|
||||
}
|
||||
|
||||
|
2
tests/queries/0_stateless/01870_buffer_flush.reference
Normal file
2
tests/queries/0_stateless/01870_buffer_flush.reference
Normal file
@ -0,0 +1,2 @@
|
||||
0
|
||||
5
|
26
tests/queries/0_stateless/01870_buffer_flush.sql
Normal file
26
tests/queries/0_stateless/01870_buffer_flush.sql
Normal file
@ -0,0 +1,26 @@
|
||||
-- Check that Buffer will be flushed before shutdown
|
||||
-- (via DETACH DATABASE)
|
||||
|
||||
drop database if exists db_01870;
|
||||
create database db_01870;
|
||||
|
||||
-- Right now the order for shutdown is defined and it is:
|
||||
-- (prefixes are important, to define the order)
|
||||
-- - a_data_01870
|
||||
-- - z_buffer_01870
|
||||
-- so on DETACH DATABASE the following error will be printed:
|
||||
--
|
||||
-- Destination table default.a_data_01870 doesn't exist. Block of data is discarded.
|
||||
create table db_01870.a_data_01870 as system.numbers Engine=TinyLog();
|
||||
create table db_01870.z_buffer_01870 as system.numbers Engine=Buffer(db_01870, a_data_01870, 1,
|
||||
100, 100, /* time */
|
||||
100, 100, /* rows */
|
||||
100, 1e6 /* bytes */
|
||||
);
|
||||
insert into db_01870.z_buffer_01870 select * from system.numbers limit 5;
|
||||
select count() from db_01870.a_data_01870;
|
||||
detach database db_01870;
|
||||
attach database db_01870;
|
||||
select count() from db_01870.a_data_01870;
|
||||
|
||||
drop database db_01870;
|
@ -721,6 +721,7 @@
|
||||
"01804_dictionary_decimal256_type",
|
||||
"01850_dist_INSERT_preserve_error", // uses cluster with different static databases shard_0/shard_1
|
||||
"01821_table_comment",
|
||||
"01710_projection_fetch"
|
||||
"01710_projection_fetch",
|
||||
"01870_buffer_flush" // creates database
|
||||
]
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user