Merge remote-tracking branch 'origin/master' into mv4

This commit is contained in:
Michael Kolupaev 2024-08-14 00:29:48 +00:00
commit ab843c3cf5
12 changed files with 107 additions and 66 deletions

View File

@ -103,7 +103,7 @@ LIMIT 2;
└─────────┴─────────┴─────────┘
```
### Inserting data from a file into a table:
### Inserting data from a file into a table
``` sql
INSERT INTO FUNCTION

View File

@ -663,16 +663,19 @@ BlockIO InterpreterSystemQuery::execute()
startStopAction(ActionLocks::ViewRefresh, false);
break;
case Type::REFRESH_VIEW:
getRefreshTask()->run();
for (const auto & task : getRefreshTasks())
task->run();
break;
case Type::WAIT_VIEW:
getRefreshTask()->wait();
break;
case Type::CANCEL_VIEW:
getRefreshTask()->cancel();
for (const auto & task : getRefreshTasks())
task->cancel();
break;
case Type::TEST_VIEW:
getRefreshTask()->setFakeTime(query.fake_time_for_view);
for (const auto & task : getRefreshTasks())
task->setFakeTime(query.fake_time_for_view);
break;
case Type::DROP_REPLICA:
dropReplica(query);
@ -1245,15 +1248,15 @@ void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "SYSTEM RESTART DISK is not supported");
}
RefreshTaskHolder InterpreterSystemQuery::getRefreshTask()
RefreshTaskList InterpreterSystemQuery::getRefreshTasks()
{
auto ctx = getContext();
ctx->checkAccess(AccessType::SYSTEM_VIEWS);
auto task = ctx->getRefreshSet().getTask(table_id);
if (!task)
auto tasks = ctx->getRefreshSet().findTasks(table_id);
if (tasks.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Refreshable view {} doesn't exist", table_id.getNameForLogs());
return task;
return tasks;
}

View File

@ -74,7 +74,7 @@ private:
void flushDistributed(ASTSystemQuery & query);
[[noreturn]] void restartDisk(String & name);
RefreshTaskHolder getRefreshTask();
RefreshTaskList getRefreshTasks();
AccessRightsElements getRequiredAccessForDDLOnCluster() const;
void startStopAction(StorageActionBlockType action_type, bool start);

View File

@ -57,8 +57,15 @@ LocalFileHolder::~LocalFileHolder()
{
if (original_readbuffer)
{
assert_cast<SeekableReadBuffer *>(original_readbuffer.get())->seek(0, SEEK_SET);
file_cache_controller->value().startBackgroundDownload(std::move(original_readbuffer), *thread_pool);
try
{
assert_cast<SeekableReadBuffer *>(original_readbuffer.get())->seek(0, SEEK_SET);
file_cache_controller->value().startBackgroundDownload(std::move(original_readbuffer), *thread_pool);
}
catch (...)
{
tryLogCurrentException(getLogger("LocalFileHolder"), "Exception during destructor of LocalFileHolder.");
}
}
}

View File

@ -9,11 +9,6 @@ namespace CurrentMetrics
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
RefreshSet::Handle::Handle(Handle && other) noexcept
{
*this = std::move(other);
@ -27,6 +22,7 @@ RefreshSet::Handle & RefreshSet::Handle::operator=(Handle && other) noexcept
parent_set = std::exchange(other.parent_set, nullptr);
id = std::move(other.id);
dependencies = std::move(other.dependencies);
iter = std::move(other.iter);
metric_increment = std::move(other.metric_increment);
return *this;
}
@ -39,21 +35,21 @@ RefreshSet::Handle::~Handle()
void RefreshSet::Handle::rename(StorageID new_id)
{
std::lock_guard lock(parent_set->mutex);
parent_set->removeDependenciesLocked(id, dependencies);
auto it = parent_set->tasks.find(id);
auto task = it->second;
parent_set->tasks.erase(it);
RefreshTaskHolder task = *iter;
parent_set->removeDependenciesLocked(task, dependencies);
parent_set->removeTaskLocked(id, iter);
id = new_id;
parent_set->tasks.emplace(id, task);
parent_set->addDependenciesLocked(id, dependencies);
iter = parent_set->addTaskLocked(id, task);
parent_set->addDependenciesLocked(task, dependencies);
}
void RefreshSet::Handle::changeDependencies(std::vector<StorageID> deps)
{
std::lock_guard lock(parent_set->mutex);
parent_set->removeDependenciesLocked(id, dependencies);
RefreshTaskHolder task = *iter;
parent_set->removeDependenciesLocked(task, dependencies);
dependencies = std::move(deps);
parent_set->addDependenciesLocked(id, dependencies);
parent_set->addDependenciesLocked(task, dependencies);
}
void RefreshSet::Handle::reset()
@ -63,8 +59,8 @@ void RefreshSet::Handle::reset()
{
std::lock_guard lock(parent_set->mutex);
parent_set->removeDependenciesLocked(id, dependencies);
parent_set->tasks.erase(id);
parent_set->removeDependenciesLocked(*iter, dependencies);
parent_set->removeTaskLocked(id, iter);
}
parent_set = nullptr;
@ -76,37 +72,50 @@ RefreshSet::RefreshSet() = default;
void RefreshSet::emplace(StorageID id, const std::vector<StorageID> & dependencies, RefreshTaskHolder task)
{
std::lock_guard guard(mutex);
auto [it, is_inserted] = tasks.emplace(id, task);
if (!is_inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Refresh set entry already exists for table {}", id.getFullTableName());
addDependenciesLocked(id, dependencies);
const auto iter = addTaskLocked(id, task);
addDependenciesLocked(task, dependencies);
task->setRefreshSetHandleUnlock(Handle(this, id, dependencies));
task->setRefreshSetHandleUnlock(Handle(this, id, iter, dependencies));
}
void RefreshSet::addDependenciesLocked(const StorageID & id, const std::vector<StorageID> & dependencies)
/// Registers `task` under `id` and returns its position inside the per-name list.
/// Several tasks may share one `id`; a new task is appended after the ones already present.
/// The returned iterator is what removeTaskLocked() later needs to unregister this exact task.
/// NOTE(review): judging by the "Locked" suffix and the callers, `mutex` is expected to be held here.
RefreshTaskList::iterator RefreshSet::addTaskLocked(StorageID id, RefreshTaskHolder task)
{
    /// operator[] creates an empty list the first time this name is seen.
    RefreshTaskList & same_name_tasks = tasks[id];
    /// std::list::insert returns an iterator to the element it has just inserted.
    return same_name_tasks.insert(same_name_tasks.end(), task);
}
/// Unregisters the task at position `iter` from the list stored under `id`,
/// and drops the whole map entry once that list becomes empty.
/// Preconditions (not checked here): `id` is present in `tasks` and `iter` points into its list.
/// NOTE(review): judging by the "Locked" suffix and the callers, `mutex` is expected to be held here.
void RefreshSet::removeTaskLocked(StorageID id, RefreshTaskList::iterator iter)
{
    const auto entry = tasks.find(id);
    RefreshTaskList & same_name_tasks = entry->second;
    same_name_tasks.erase(iter);
    if (same_name_tasks.empty())
        tasks.erase(entry);
}
void RefreshSet::addDependenciesLocked(RefreshTaskHolder task, const std::vector<StorageID> & dependencies)
{
for (const StorageID & dep : dependencies)
dependents[dep].insert(id);
dependents[dep].insert(task);
}
void RefreshSet::removeDependenciesLocked(const StorageID & id, const std::vector<StorageID> & dependencies)
void RefreshSet::removeDependenciesLocked(RefreshTaskHolder task, const std::vector<StorageID> & dependencies)
{
for (const StorageID & dep : dependencies)
{
auto & set = dependents[dep];
set.erase(id);
set.erase(task);
if (set.empty())
dependents.erase(dep);
}
}
RefreshTaskHolder RefreshSet::getTask(const StorageID & id) const
RefreshTaskList RefreshSet::findTasks(const StorageID & id) const
{
std::lock_guard lock(mutex);
if (auto task = tasks.find(id); task != tasks.end())
return task->second;
return nullptr;
if (auto it = tasks.find(id); it != tasks.end())
return it->second;
return {};
}
RefreshSet::InfoContainer RefreshSet::getInfo() const
@ -116,26 +125,23 @@ RefreshSet::InfoContainer RefreshSet::getInfo() const
lock.unlock();
InfoContainer res;
for (const auto & [id, task] : tasks_copy)
res.push_back(task->getInfo());
for (const auto & [id, list] : tasks_copy)
for (const auto & task : list)
res.push_back(task->getInfo());
return res;
}
std::vector<RefreshTaskHolder> RefreshSet::getDependents(const StorageID & id) const
{
std::lock_guard lock(mutex);
std::vector<RefreshTaskHolder> res;
auto it = dependents.find(id);
if (it == dependents.end())
return {};
for (const StorageID & dep_id : it->second)
if (auto task = tasks.find(dep_id); task != tasks.end())
res.push_back(task->second);
return res;
return std::vector<RefreshTaskHolder>(it->second.begin(), it->second.end());
}
RefreshSet::Handle::Handle(RefreshSet * parent_set_, StorageID id_, std::vector<StorageID> dependencies_)
RefreshSet::Handle::Handle(RefreshSet * parent_set_, StorageID id_, RefreshTaskList::iterator iter_, std::vector<StorageID> dependencies_)
: parent_set(parent_set_), id(std::move(id_)), dependencies(std::move(dependencies_))
, metric_increment(CurrentMetrics::Increment(CurrentMetrics::RefreshableViews)) {}
, iter(iter_), metric_increment(CurrentMetrics::Increment(CurrentMetrics::RefreshableViews)) {}
}

View File

@ -5,12 +5,11 @@
#include <Storages/IStorage.h>
#include <Storages/MaterializedView/RefreshTask_fwd.h>
#include <Common/CurrentMetrics.h>
#include <list>
namespace DB
{
using DatabaseAndTableNameSet = std::unordered_set<StorageID, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
enum class RefreshState
{
Disabled = 0,
@ -47,8 +46,7 @@ struct RefreshInfo
class RefreshSet
{
public:
/// RAII thing that unregisters a task and its dependencies in destructor.
/// Storage IDs must be unique. Not thread safe.
/// RAII thing that unregisters a task and its dependencies in destructor. Not thread safe.
class Handle
{
friend class RefreshSet;
@ -74,9 +72,10 @@ public:
RefreshSet * parent_set = nullptr;
StorageID id = StorageID::createEmpty();
std::vector<StorageID> dependencies;
RefreshTaskList::iterator iter; // in parent_set->tasks[id]
std::optional<CurrentMetrics::Increment> metric_increment;
Handle(RefreshSet * parent_set_, StorageID id_, std::vector<StorageID> dependencies_);
Handle(RefreshSet * parent_set_, StorageID id_, RefreshTaskList::iterator iter_, std::vector<StorageID> dependencies_);
};
using InfoContainer = std::vector<RefreshInfo>;
@ -85,7 +84,9 @@ public:
void emplace(StorageID id, const std::vector<StorageID> & dependencies, RefreshTaskHolder task);
RefreshTaskHolder getTask(const StorageID & id) const;
/// Finds active refreshable view(s) by database and table name.
/// Normally there's at most one, but we allow name collisions here, just in case.
RefreshTaskList findTasks(const StorageID & id) const;
InfoContainer getInfo() const;
@ -93,8 +94,8 @@ public:
std::vector<RefreshTaskHolder> getDependents(const StorageID & id) const;
private:
using TaskMap = std::unordered_map<StorageID, RefreshTaskHolder, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
using DependentsMap = std::unordered_map<StorageID, DatabaseAndTableNameSet, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
using TaskMap = std::unordered_map<StorageID, RefreshTaskList, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
using DependentsMap = std::unordered_map<StorageID, std::unordered_set<RefreshTaskHolder>, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
/// Protects the two maps below, not locked for any nontrivial operations (e.g. operations that
/// block or lock other mutexes).
@ -103,8 +104,10 @@ private:
TaskMap tasks;
DependentsMap dependents;
void addDependenciesLocked(const StorageID & id, const std::vector<StorageID> & dependencies);
void removeDependenciesLocked(const StorageID & id, const std::vector<StorageID> & dependencies);
RefreshTaskList::iterator addTaskLocked(StorageID id, RefreshTaskHolder task);
void removeTaskLocked(StorageID id, RefreshTaskList::iterator iter);
void addDependenciesLocked(RefreshTaskHolder task, const std::vector<StorageID> & dependencies);
void removeDependenciesLocked(RefreshTaskHolder task, const std::vector<StorageID> & dependencies);
};
}

View File

@ -37,7 +37,7 @@ RefreshTask::RefreshTask(
refresh_settings.applyChanges(strategy.settings->changes);
}
OwnedRefreshTask RefreshTask::create(
RefreshTaskHolder RefreshTask::create(
StorageMaterializedView * view,
ContextMutablePtr context,
const DB::ASTRefreshStrategy & strategy)
@ -47,12 +47,9 @@ OwnedRefreshTask RefreshTask::create(
task->refresh_task = context->getSchedulePool().createTask("RefreshTask",
[self = task.get()] { self->refreshTask(); });
std::vector<StorageID> deps;
if (strategy.dependencies)
for (auto && dependency : strategy.dependencies->children)
deps.emplace_back(dependency->as<const ASTTableIdentifier &>());
context->getRefreshSet().emplace(view->getStorageID(), deps, task);
task->initial_dependencies.emplace_back(dependency->as<const ASTTableIdentifier &>());
return OwnedRefreshTask(task);
}
@ -61,6 +58,7 @@ void RefreshTask::initializeAndStart()
{
if (view->getContext()->getSettingsRef().stop_refreshable_materialized_views_on_startup)
stop_requested = true;
view->getContext()->getRefreshSet().emplace(view->getStorageID(), initial_dependencies, shared_from_this());
populateDependencies();
advanceNextRefreshTime(currentTime());
refresh_task->schedule();
@ -69,7 +67,8 @@ void RefreshTask::initializeAndStart()
void RefreshTask::rename(StorageID new_id)
{
std::lock_guard guard(mutex);
set_handle.rename(new_id);
if (set_handle)
set_handle.rename(new_id);
}
void RefreshTask::alterRefreshParams(const DB::ASTRefreshStrategy & new_strategy)

View File

@ -26,8 +26,7 @@ public:
RefreshTask(StorageMaterializedView * view_, const ASTRefreshStrategy & strategy);
/// The only proper way to construct task
static OwnedRefreshTask create(
StorageMaterializedView * view,
static RefreshTaskHolder create(
ContextMutablePtr context,
const DB::ASTRefreshStrategy & strategy);
@ -92,11 +91,13 @@ private:
RefreshSchedule refresh_schedule;
RefreshSettings refresh_settings;
std::vector<StorageID> initial_dependencies;
bool refresh_append;
RefreshSet::Handle set_handle;
/// StorageIDs of our dependencies that we're waiting for.
using DatabaseAndTableNameSet = std::unordered_set<StorageID, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
DatabaseAndTableNameSet remaining_dependencies;
bool time_arrived = false;

View File

@ -9,5 +9,6 @@ namespace DB
class RefreshTask;
using RefreshTaskHolder = std::shared_ptr<RefreshTask>;
using RefreshTaskList = std::list<RefreshTaskHolder>;
}

View File

@ -5,6 +5,7 @@
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <Storages/MaterializedView/RefreshSet.h>
@ -19,6 +20,7 @@ ColumnsDescription StorageSystemViewRefreshes::getColumnsDescription()
{
{"database", std::make_shared<DataTypeString>(), "The name of the database the table is in."},
{"view", std::make_shared<DataTypeString>(), "Table name."},
{"uuid", std::make_shared<DataTypeUUID>(), "Table uuid (Atomic database)."},
{"status", std::make_shared<DataTypeString>(), "Current state of the refresh."},
{"last_refresh_result", std::make_shared<DataTypeString>(), "Outcome of the latest refresh attempt."},
{"last_refresh_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()),
@ -64,6 +66,7 @@ void StorageSystemViewRefreshes::fillData(
std::size_t i = 0;
res_columns[i++]->insert(refresh.view_id.getDatabaseName());
res_columns[i++]->insert(refresh.view_id.getTableName());
res_columns[i++]->insert(refresh.view_id.uuid);
res_columns[i++]->insert(toString(refresh.state));
res_columns[i++]->insert(toString(refresh.last_refresh_result));

View File

@ -16,4 +16,7 @@
<query>SELECT keys.key, value1 FROM keys ANY LEFT JOIN dict AS d ON (keys.key = d.key) FORMAT Null;</query>
<query>SELECT keys.key, value1 FROM keys ANY LEFT JOIN dict AS d ON (keys.key = d.key) FORMAT Null SETTINGS
allow_experimental_analyzer=1</query>
<drop_query>DROP TABLE IF EXISTS keys</drop_query>
<drop_query>DROP TABLE IF EXISTS dict</drop_query>
</test>

View File

@ -70,6 +70,21 @@ sleep 1
${CLICKHOUSE_CLIENT} --query="INSERT INTO mutations_cleaner(x) VALUES (4)"
sleep 0.1
# Poll for up to 10 attempts (about one second apart) until only 2 mutation
# records remain for mutations_cleaner, i.e. the outdated record has been deleted.
for i in {1..10}
do
    if [ "$(${CLICKHOUSE_CLIENT} --query="SELECT count() FROM system.mutations WHERE database = '$CLICKHOUSE_DATABASE' and table = 'mutations_cleaner'")" -eq 2 ]; then
        break
    fi

    # Compare against the upper bound of the loop range (10): any larger value
    # can never match, and the timeout would then go unreported.
    if [[ $i -eq 10 ]]; then
        echo "Timed out while waiting for outdated mutation record to be deleted!"
    fi

    sleep 1
    # Presumably each insert gives the table another chance to run its cleanup - verify.
    ${CLICKHOUSE_CLIENT} --query="INSERT INTO mutations_cleaner(x) VALUES (4)"
done
# Check that the first mutation is cleaned
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, command, is_done FROM system.mutations WHERE database = '$CLICKHOUSE_DATABASE' and table = 'mutations_cleaner' ORDER BY mutation_id"