Merge pull request #68249 from ClickHouse/rset

Fix 'Refresh set entry already exists'
This commit is contained in:
Michael Kolupaev 2024-08-13 20:07:31 +00:00 committed by GitHub
commit f4b8a98d06
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 77 additions and 63 deletions

View File

@ -663,13 +663,16 @@ BlockIO InterpreterSystemQuery::execute()
startStopAction(ActionLocks::ViewRefresh, false);
break;
case Type::REFRESH_VIEW:
getRefreshTask()->run();
for (const auto & task : getRefreshTasks())
task->run();
break;
case Type::CANCEL_VIEW:
getRefreshTask()->cancel();
for (const auto & task : getRefreshTasks())
task->cancel();
break;
case Type::TEST_VIEW:
getRefreshTask()->setFakeTime(query.fake_time_for_view);
for (const auto & task : getRefreshTasks())
task->setFakeTime(query.fake_time_for_view);
break;
case Type::DROP_REPLICA:
dropReplica(query);
@ -1242,15 +1245,15 @@ void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "SYSTEM RESTART DISK is not supported");
}
/// Looks up the refresh task(s) registered for `table_id` in the context's RefreshSet.
/// Calls checkAccess(AccessType::SYSTEM_VIEWS) first, and throws BAD_ARGUMENTS
/// ("Refreshable view ... doesn't exist") when the lookup comes back empty.
/// NOTE(review): this span is a rendered diff hunk without +/- markers. The `getRefreshTask`
/// signature, the `getTask` lookup with its `!task` check, and `return task;` appear to be the
/// removed pre-change lines, superseded by the `getRefreshTasks` / `findTasks` lines next to
/// them - confirm against the real file.
RefreshTaskHolder InterpreterSystemQuery::getRefreshTask()
RefreshTaskList InterpreterSystemQuery::getRefreshTasks()
{
auto ctx = getContext();
ctx->checkAccess(AccessType::SYSTEM_VIEWS);
auto task = ctx->getRefreshSet().getTask(table_id);
if (!task)
auto tasks = ctx->getRefreshSet().findTasks(table_id);
if (tasks.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Refreshable view {} doesn't exist", table_id.getNameForLogs());
return task;
return tasks;
}

View File

@ -74,7 +74,7 @@ private:
void flushDistributed(ASTSystemQuery & query);
[[noreturn]] void restartDisk(String & name);
RefreshTaskHolder getRefreshTask();
RefreshTaskList getRefreshTasks();
AccessRightsElements getRequiredAccessForDDLOnCluster() const;
void startStopAction(StorageActionBlockType action_type, bool start);

View File

@ -9,11 +9,6 @@ namespace CurrentMetrics
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
RefreshSet::Handle::Handle(Handle && other) noexcept
{
*this = std::move(other);
@ -27,6 +22,7 @@ RefreshSet::Handle & RefreshSet::Handle::operator=(Handle && other) noexcept
parent_set = std::exchange(other.parent_set, nullptr);
id = std::move(other.id);
dependencies = std::move(other.dependencies);
iter = std::move(other.iter);
metric_increment = std::move(other.metric_increment);
return *this;
}
@ -39,21 +35,21 @@ RefreshSet::Handle::~Handle()
/// Moves this handle's registration from the current `id` to `new_id`.
/// Under parent_set->mutex: the task's dependency edges and its entry in `tasks` are removed
/// under the old id, `id` is updated, and both are added back under the new id.
/// Dereferences `parent_set` unconditionally, so the handle must be attached to a set.
/// NOTE(review): rendered diff hunk without +/- markers. The lines that use
/// `parent_set->tasks.find/erase/emplace` and pass `id` to the *DependenciesLocked helpers appear
/// to be the removed pre-change version; the `iter`-based lines appear to replace them - confirm.
void RefreshSet::Handle::rename(StorageID new_id)
{
std::lock_guard lock(parent_set->mutex);
parent_set->removeDependenciesLocked(id, dependencies);
auto it = parent_set->tasks.find(id);
auto task = it->second;
parent_set->tasks.erase(it);
/// Copy the shared_ptr out first: removeTaskLocked() below erases the list node `iter` points to.
RefreshTaskHolder task = *iter;
parent_set->removeDependenciesLocked(task, dependencies);
parent_set->removeTaskLocked(id, iter);
id = new_id;
parent_set->tasks.emplace(id, task);
parent_set->addDependenciesLocked(id, dependencies);
/// Remember the position in the list stored under the new id.
iter = parent_set->addTaskLocked(id, task);
parent_set->addDependenciesLocked(task, dependencies);
}
/// Replaces this handle's dependency list with `deps`.
/// Under parent_set->mutex: the edges for the old `dependencies` are removed from the set,
/// `dependencies` takes ownership of `deps`, and edges for the new list are added.
/// NOTE(review): rendered diff hunk without +/- markers. The two calls that pass `id` appear to
/// be the removed pre-change lines; the calls that pass `task` appear to replace them - confirm.
void RefreshSet::Handle::changeDependencies(std::vector<StorageID> deps)
{
std::lock_guard lock(parent_set->mutex);
parent_set->removeDependenciesLocked(id, dependencies);
RefreshTaskHolder task = *iter;
parent_set->removeDependenciesLocked(task, dependencies);
dependencies = std::move(deps);
parent_set->addDependenciesLocked(id, dependencies);
parent_set->addDependenciesLocked(task, dependencies);
}
void RefreshSet::Handle::reset()
@ -63,8 +59,8 @@ void RefreshSet::Handle::reset()
{
std::lock_guard lock(parent_set->mutex);
parent_set->removeDependenciesLocked(id, dependencies);
parent_set->tasks.erase(id);
parent_set->removeDependenciesLocked(*iter, dependencies);
parent_set->removeTaskLocked(id, iter);
}
parent_set = nullptr;
@ -76,37 +72,50 @@ RefreshSet::RefreshSet() = default;
/// Registers `task` under `id` together with its `dependencies`, then gives the task a Handle
/// that refers back to this registration (via setRefreshSetHandleUnlock).
/// Runs entirely under `mutex`.
/// NOTE(review): rendered diff hunk without +/- markers. The `tasks.emplace` / `is_inserted`
/// check that throws LOGICAL_ERROR, `addDependenciesLocked(id, ...)` and the three-argument
/// `Handle(this, id, dependencies)` appear to be the removed pre-change lines. The
/// `addTaskLocked`-based lines, which accept several tasks under one id, appear to replace
/// them - confirm.
void RefreshSet::emplace(StorageID id, const std::vector<StorageID> & dependencies, RefreshTaskHolder task)
{
std::lock_guard guard(mutex);
auto [it, is_inserted] = tasks.emplace(id, task);
if (!is_inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Refresh set entry already exists for table {}", id.getFullTableName());
addDependenciesLocked(id, dependencies);
/// The returned iterator is stored in the Handle so the entry can be found again later.
const auto iter = addTaskLocked(id, task);
addDependenciesLocked(task, dependencies);
task->setRefreshSetHandleUnlock(Handle(this, id, dependencies));
task->setRefreshSetHandleUnlock(Handle(this, id, iter, dependencies));
}
void RefreshSet::addDependenciesLocked(const StorageID & id, const std::vector<StorageID> & dependencies)
/// Appends `task` to the list of tasks registered under `id`. `tasks[id]` creates an empty list
/// the first time a given id is seen, so several tasks may share one id.
///
/// Returns an iterator to the newly inserted element. std::list iterators stay valid until the
/// element they point to is erased, so the caller can keep the iterator for later removal.
///
/// The callers visible in this file take `mutex` before calling, as the "Locked" suffix suggests.
RefreshTaskList::iterator RefreshSet::addTaskLocked(StorageID id, RefreshTaskHolder task)
{
    RefreshTaskList & list = tasks[id];
    /// `task` is already our own by-value copy: move it into the list node rather than copying
    /// the shared_ptr a second time, and use the iterator insert() hands back directly.
    return list.insert(list.end(), std::move(task));
}
/// Unregisters the task that `iter` points to from the list stored under `id`, and drops the
/// whole map entry once that list becomes empty.
///
/// `id` must currently have an entry in `tasks` and `iter` must point into its list; neither is
/// checked here. The callers visible in this file hold `mutex` while calling.
void RefreshSet::removeTaskLocked(StorageID id, RefreshTaskList::iterator iter)
{
    const auto entry = tasks.find(id);
    RefreshTaskList & registered = entry->second;

    registered.erase(iter);

    /// Don't leave empty lists behind in the map.
    if (registered.empty())
        tasks.erase(entry);
}
/// Records `task` as a dependent of every table listed in `dependencies`
/// (`dependents[dep]` creates the per-table set on first use).
/// The callers visible in this file hold `mutex` while calling.
/// NOTE(review): rendered diff hunk without +/- markers. `insert(id)` appears to be the removed
/// pre-change line (there is no `id` in this signature); `insert(task)` appears to replace
/// it - confirm.
void RefreshSet::addDependenciesLocked(RefreshTaskHolder task, const std::vector<StorageID> & dependencies)
{
for (const StorageID & dep : dependencies)
dependents[dep].insert(id);
dependents[dep].insert(task);
}
/// Removes `task` from the dependents set of every table listed in `dependencies`, erasing a
/// table's entry from `dependents` once its set is empty.
/// The callers visible in this file hold `mutex` while calling.
/// NOTE(review): rendered diff hunk without +/- markers. The first signature (taking
/// `const StorageID & id`) and `set.erase(id)` appear to be the removed pre-change lines; the
/// `RefreshTaskHolder task` signature and `set.erase(task)` appear to replace them - confirm.
void RefreshSet::removeDependenciesLocked(const StorageID & id, const std::vector<StorageID> & dependencies)
void RefreshSet::removeDependenciesLocked(RefreshTaskHolder task, const std::vector<StorageID> & dependencies)
{
for (const StorageID & dep : dependencies)
{
/// operator[] inserts an empty set if `dep` was absent; the empty() check below erases it again.
auto & set = dependents[dep];
set.erase(id);
set.erase(task);
if (set.empty())
dependents.erase(dep);
}
}
/// Returns the tasks currently registered under `id`, looked up while holding `mutex`.
/// The result is returned by value, so it is a copy of the stored list; it is empty when `id`
/// has no entry.
/// NOTE(review): rendered diff hunk without +/- markers. The `getTask` signature, the lookup
/// bound to `task`, and `return nullptr;` appear to be the removed pre-change lines; the
/// `findTasks` signature, the lookup bound to `it`, and `return {};` appear to replace
/// them - confirm.
RefreshTaskHolder RefreshSet::getTask(const StorageID & id) const
RefreshTaskList RefreshSet::findTasks(const StorageID & id) const
{
std::lock_guard lock(mutex);
if (auto task = tasks.find(id); task != tasks.end())
return task->second;
return nullptr;
if (auto it = tasks.find(id); it != tasks.end())
return it->second;
return {};
}
RefreshSet::InfoContainer RefreshSet::getInfo() const
@ -116,26 +125,23 @@ RefreshSet::InfoContainer RefreshSet::getInfo() const
lock.unlock();
InfoContainer res;
for (const auto & [id, task] : tasks_copy)
res.push_back(task->getInfo());
for (const auto & [id, list] : tasks_copy)
for (const auto & task : list)
res.push_back(task->getInfo());
return res;
}
/// Returns the tasks registered as dependents of table `id`, copied into a vector while holding
/// `mutex`. Returns an empty vector when `id` has no entry in `dependents`.
/// NOTE(review): rendered diff hunk without +/- markers. The `res` vector, the loop that maps
/// each dependent StorageID back through `tasks`, and `return res;` appear to be the removed
/// pre-change lines; the single `return std::vector<...>(begin, end)` appears to replace
/// them - confirm.
std::vector<RefreshTaskHolder> RefreshSet::getDependents(const StorageID & id) const
{
std::lock_guard lock(mutex);
std::vector<RefreshTaskHolder> res;
auto it = dependents.find(id);
if (it == dependents.end())
return {};
for (const StorageID & dep_id : it->second)
if (auto task = tasks.find(dep_id); task != tasks.end())
res.push_back(task->second);
return res;
return std::vector<RefreshTaskHolder>(it->second.begin(), it->second.end());
}
/// Builds a handle bound to `parent_set_` for the task registered under `id_`, storing its
/// dependency list and a CurrentMetrics::Increment for the RefreshableViews metric.
/// NOTE(review): rendered diff hunk without +/- markers. The signature without `iter_` and the
/// initializer line without `iter(iter_)` appear to be the removed pre-change lines; the
/// variants carrying the list iterator appear to replace them - confirm.
RefreshSet::Handle::Handle(RefreshSet * parent_set_, StorageID id_, std::vector<StorageID> dependencies_)
RefreshSet::Handle::Handle(RefreshSet * parent_set_, StorageID id_, RefreshTaskList::iterator iter_, std::vector<StorageID> dependencies_)
: parent_set(parent_set_), id(std::move(id_)), dependencies(std::move(dependencies_))
, metric_increment(CurrentMetrics::Increment(CurrentMetrics::RefreshableViews)) {}
, iter(iter_), metric_increment(CurrentMetrics::Increment(CurrentMetrics::RefreshableViews)) {}
}

View File

@ -5,12 +5,11 @@
#include <Storages/IStorage.h>
#include <Storages/MaterializedView/RefreshTask_fwd.h>
#include <Common/CurrentMetrics.h>
#include <list>
namespace DB
{
using DatabaseAndTableNameSet = std::unordered_set<StorageID, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
enum class RefreshState : RefreshTaskStateUnderlying
{
Disabled = 0,
@ -46,8 +45,7 @@ struct RefreshInfo
class RefreshSet
{
public:
/// RAII thing that unregisters a task and its dependencies in destructor.
/// Storage IDs must be unique. Not thread safe.
/// RAII thing that unregisters a task and its dependencies in destructor. Not thread safe.
class Handle
{
friend class RefreshSet;
@ -73,9 +71,10 @@ public:
RefreshSet * parent_set = nullptr;
StorageID id = StorageID::createEmpty();
std::vector<StorageID> dependencies;
RefreshTaskList::iterator iter; // in parent_set->tasks[id]
std::optional<CurrentMetrics::Increment> metric_increment;
Handle(RefreshSet * parent_set_, StorageID id_, std::vector<StorageID> dependencies_);
Handle(RefreshSet * parent_set_, StorageID id_, RefreshTaskList::iterator iter_, std::vector<StorageID> dependencies_);
};
using InfoContainer = std::vector<RefreshInfo>;
@ -84,7 +83,9 @@ public:
void emplace(StorageID id, const std::vector<StorageID> & dependencies, RefreshTaskHolder task);
RefreshTaskHolder getTask(const StorageID & id) const;
/// Finds active refreshable view(s) by database and table name.
/// Normally there's at most one, but we allow name collisions here, just in case.
RefreshTaskList findTasks(const StorageID & id) const;
InfoContainer getInfo() const;
@ -92,8 +93,8 @@ public:
std::vector<RefreshTaskHolder> getDependents(const StorageID & id) const;
private:
using TaskMap = std::unordered_map<StorageID, RefreshTaskHolder, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
using DependentsMap = std::unordered_map<StorageID, DatabaseAndTableNameSet, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
using TaskMap = std::unordered_map<StorageID, RefreshTaskList, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
using DependentsMap = std::unordered_map<StorageID, std::unordered_set<RefreshTaskHolder>, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
/// Protects the two maps below, not locked for any nontrivial operations (e.g. operations that
/// block or lock other mutexes).
@ -102,8 +103,10 @@ private:
TaskMap tasks;
DependentsMap dependents;
void addDependenciesLocked(const StorageID & id, const std::vector<StorageID> & dependencies);
void removeDependenciesLocked(const StorageID & id, const std::vector<StorageID> & dependencies);
RefreshTaskList::iterator addTaskLocked(StorageID id, RefreshTaskHolder task);
void removeTaskLocked(StorageID id, RefreshTaskList::iterator iter);
void addDependenciesLocked(RefreshTaskHolder task, const std::vector<StorageID> & dependencies);
void removeDependenciesLocked(RefreshTaskHolder task, const std::vector<StorageID> & dependencies);
};
}

View File

@ -33,7 +33,6 @@ RefreshTask::RefreshTask(
{}
RefreshTaskHolder RefreshTask::create(
const StorageMaterializedView & view,
ContextMutablePtr context,
const DB::ASTRefreshStrategy & strategy)
{
@ -46,12 +45,9 @@ RefreshTaskHolder RefreshTask::create(
t->refreshTask();
});
std::vector<StorageID> deps;
if (strategy.dependencies)
for (auto && dependency : strategy.dependencies->children)
deps.emplace_back(dependency->as<const ASTTableIdentifier &>());
context->getRefreshSet().emplace(view.getStorageID(), deps, task);
task->initial_dependencies.emplace_back(dependency->as<const ASTTableIdentifier &>());
return task;
}
@ -61,6 +57,7 @@ void RefreshTask::initializeAndStart(std::shared_ptr<StorageMaterializedView> vi
view_to_refresh = view;
if (view->getContext()->getSettingsRef().stop_refreshable_materialized_views_on_startup)
stop_requested = true;
view->getContext()->getRefreshSet().emplace(view->getStorageID(), initial_dependencies, shared_from_this());
populateDependencies();
advanceNextRefreshTime(currentTime());
refresh_task->schedule();
@ -69,7 +66,8 @@ void RefreshTask::initializeAndStart(std::shared_ptr<StorageMaterializedView> vi
/// Forwards a rename of the view to the task's RefreshSet handle, under the task's own `mutex`.
/// NOTE(review): rendered diff hunk without +/- markers. The first, unconditional
/// `set_handle.rename(new_id);` appears to be the removed pre-change line; the version guarded
/// by `if (set_handle)` appears to replace it - confirm. Elsewhere in this diff the registration
/// is moved into initializeAndStart(), which would explain the handle possibly being unset here.
void RefreshTask::rename(StorageID new_id)
{
std::lock_guard guard(mutex);
set_handle.rename(new_id);
if (set_handle)
set_handle.rename(new_id);
}
void RefreshTask::alterRefreshParams(const DB::ASTRefreshStrategy & new_strategy)

View File

@ -26,7 +26,6 @@ public:
/// The only proper way to construct a task.
static RefreshTaskHolder create(
const StorageMaterializedView & view,
ContextMutablePtr context,
const DB::ASTRefreshStrategy & strategy);
@ -84,9 +83,11 @@ private:
RefreshSchedule refresh_schedule;
RefreshSettings refresh_settings; // TODO: populate, use, update on alter
std::vector<StorageID> initial_dependencies;
RefreshSet::Handle set_handle;
/// StorageIDs of our dependencies that we're waiting for.
using DatabaseAndTableNameSet = std::unordered_set<StorageID, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
DatabaseAndTableNameSet remaining_dependencies;
bool time_arrived = false;

View File

@ -11,5 +11,6 @@ class RefreshTask;
using RefreshTaskStateUnderlying = UInt8;
using RefreshTaskHolder = std::shared_ptr<RefreshTask>;
using RefreshTaskObserver = std::weak_ptr<RefreshTask>;
using RefreshTaskList = std::list<RefreshTaskHolder>;
}

View File

@ -203,7 +203,6 @@ StorageMaterializedView::StorageMaterializedView(
{
fixed_uuid = false;
refresher = RefreshTask::create(
*this,
getContext(),
*query.refresh_strategy);
refresh_on_start = mode < LoadingStrictnessLevel::ATTACH && !query.is_create_empty;

View File

@ -5,6 +5,7 @@
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <Storages/MaterializedView/RefreshSet.h>
@ -19,6 +20,7 @@ ColumnsDescription StorageSystemViewRefreshes::getColumnsDescription()
{
{"database", std::make_shared<DataTypeString>(), "The name of the database the table is in."},
{"view", std::make_shared<DataTypeString>(), "Table name."},
{"uuid", std::make_shared<DataTypeUUID>(), "Table uuid (Atomic database)."},
{"status", std::make_shared<DataTypeString>(), "Current state of the refresh."},
{"last_refresh_result", std::make_shared<DataTypeString>(), "Outcome of the latest refresh attempt."},
{"last_refresh_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()),
@ -63,6 +65,7 @@ void StorageSystemViewRefreshes::fillData(
std::size_t i = 0;
res_columns[i++]->insert(refresh.view_id.getDatabaseName());
res_columns[i++]->insert(refresh.view_id.getTableName());
res_columns[i++]->insert(refresh.view_id.uuid);
res_columns[i++]->insert(toString(refresh.state));
res_columns[i++]->insert(toString(refresh.last_refresh_result));