mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge pull request #60314 from ClickHouse/fix-db-shutdown-with-async-loader
Fix database iterator waiting code
This commit is contained in:
commit
948ec2ba86
@ -39,7 +39,7 @@ void logAboutProgress(LoggerPtr log, size_t processed, size_t total, AtomicStopw
|
||||
{
|
||||
if (total && (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)))
|
||||
{
|
||||
LOG_INFO(log, "Processed: {}%", static_cast<Int64>(processed * 1000.0 / total) * 0.1);
|
||||
LOG_INFO(log, "Processed: {:.1f}%", static_cast<double>(processed) * 100.0 / total);
|
||||
watch.restart();
|
||||
}
|
||||
}
|
||||
|
@ -440,10 +440,22 @@ void DatabaseOrdinary::stopLoading()
|
||||
|
||||
DatabaseTablesIteratorPtr DatabaseOrdinary::getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
|
||||
{
|
||||
auto result = DatabaseWithOwnTablesBase::getTablesIterator(local_context, filter_by_table_name);
|
||||
std::scoped_lock lock(mutex);
|
||||
typeid_cast<DatabaseTablesSnapshotIterator &>(*result).setLoadTasks(startup_table);
|
||||
return result;
|
||||
// Wait for every table (matching the filter) to be loaded and started up before we make the snapshot.
|
||||
// It is important, because otherwise table might be:
|
||||
// - not attached and thus will be missed in the snapshot;
|
||||
// - not started, which is not good for DDL operations.
|
||||
LoadTaskPtrs tasks_to_wait;
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
if (!filter_by_table_name)
|
||||
tasks_to_wait.reserve(startup_table.size());
|
||||
for (const auto & [table_name, task] : startup_table)
|
||||
if (!filter_by_table_name || filter_by_table_name(table_name))
|
||||
tasks_to_wait.emplace_back(task);
|
||||
}
|
||||
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), tasks_to_wait);
|
||||
|
||||
return DatabaseWithOwnTablesBase::getTablesIterator(local_context, filter_by_table_name);
|
||||
}
|
||||
|
||||
void DatabaseOrdinary::alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
|
||||
|
@ -77,17 +77,12 @@ private:
|
||||
Tables tables;
|
||||
Tables::iterator it;
|
||||
|
||||
// Tasks to wait before returning a table
|
||||
using Tasks = std::unordered_map<String, LoadTaskPtr>;
|
||||
Tasks tasks;
|
||||
|
||||
protected:
|
||||
DatabaseTablesSnapshotIterator(DatabaseTablesSnapshotIterator && other) noexcept
|
||||
: IDatabaseTablesIterator(std::move(other.database_name))
|
||||
{
|
||||
size_t idx = std::distance(other.tables.begin(), other.it);
|
||||
std::swap(tables, other.tables);
|
||||
std::swap(tasks, other.tasks);
|
||||
other.it = other.tables.end();
|
||||
it = tables.begin();
|
||||
std::advance(it, idx);
|
||||
@ -110,17 +105,7 @@ public:
|
||||
|
||||
const String & name() const override { return it->first; }
|
||||
|
||||
const StoragePtr & table() const override
|
||||
{
|
||||
if (auto task = tasks.find(it->first); task != tasks.end())
|
||||
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), task->second);
|
||||
return it->second;
|
||||
}
|
||||
|
||||
void setLoadTasks(const Tasks & tasks_)
|
||||
{
|
||||
tasks = tasks_;
|
||||
}
|
||||
const StoragePtr & table() const override { return it->second; }
|
||||
};
|
||||
|
||||
using DatabaseTablesIteratorPtr = std::unique_ptr<IDatabaseTablesIterator>;
|
||||
|
@ -417,7 +417,7 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
|
||||
uuids_to_wait.push_back(table_to_wait);
|
||||
}
|
||||
}
|
||||
// only if operation is DETACH
|
||||
// only if operation is DETACH
|
||||
if ((!drop || !truncate) && query.sync)
|
||||
{
|
||||
/// Avoid "some tables are still in use" when sync mode is enabled
|
||||
|
Loading…
Reference in New Issue
Block a user