DDLWorkerQueueTable - add intial scaffolding

This commit is contained in:
bharatnc 2020-11-23 23:58:21 -08:00
parent 80d88a7b17
commit 2c1f9e2a77
6 changed files with 64 additions and 0 deletions

View File

@ -1128,6 +1128,13 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
return node_path;
}
std::vector<String> DDLWorker::dumpQueriesInQueue()
{
std::vector<String> queries;
// TODO:
return queries;
}
void DDLWorker::runMainThread()
{
@ -1517,6 +1524,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont
else
context.checkAccess(query_requires_access);
// enqueue the queries into the DDL worker queue
DDLLogEntry entry;
entry.hosts = std::move(hosts);
entry.query = queryToString(query_ptr);

View File

@ -45,6 +45,9 @@ public:
/// Pushes query into DDL queue, returns path to created node
String enqueueQuery(DDLLogEntry & entry);
/// Fetches the queries that are currently in the DDL queue
std::vector<String> dumpQueriesInQueue();
/// Host ID (name:port) for logging purposes
/// Note that in each task hosts are identified individually by name:port from initiator server cluster config
std::string getCommonHostID() const

View File

@ -0,0 +1,20 @@
#include "StorageSystemDDLWorkerQueue.h"
#include <algorithm>
#include <DataTypes/DataTypeString.h>
namespace DB
{
NamesAndTypesList StorageSystemDDLWorkerQueue::getNamesAndTypes()
{
return {
{"query", std::make_shared<DataTypeString>()},
};
}
void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
// TODO: insert into table
}
}

View File

@ -0,0 +1,29 @@
#pragma once
#include <Storages/System/IStorageSystemOneBlock.h>
#include <ext/shared_ptr_helper.h>
namespace DB
{
class Context;
/** System table "ddl_worker_queue" with list of queries that are currently in the DDL worker queue.
*/
class StorageSystemDDLWorkerQueue final : public ext::shared_ptr_helper<StorageSystemDDLWorkerQueue>,
public IStorageSystemOneBlock<StorageSystemDDLWorkerQueue>
{
friend struct ext::shared_ptr_helper<StorageSystemDDLWorkerQueue>;
protected:
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
using IStorageSystemOneBlock::IStorageSystemOneBlock;
public:
std::string getName() const override { return "SystemDDLWorkerQueue"; }
static NamesAndTypesList getNamesAndTypes();
};
}

View File

@ -39,6 +39,8 @@
#include <Storages/System/StorageSystemZooKeeper.h>
#include <Storages/System/StorageSystemContributors.h>
#include <Storages/System/StorageSystemErrors.h>
#include <Storages/System/StorageSystemDDLWorkerQueue.h>
#if !defined(ARCADIA_BUILD)
#include <Storages/System/StorageSystemLicenses.h>
#include <Storages/System/StorageSystemTimeZones.h>
@ -134,6 +136,7 @@ void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper)
attach<StorageSystemMutations>(system_database, "mutations");
attach<StorageSystemReplicas>(system_database, "replicas");
attach<StorageSystemReplicationQueue>(system_database, "replication_queue");
attach<StorageSystemDDLWorkerQueue>(system_database, "ddl_worker_queue");
attach<StorageSystemDistributionQueue>(system_database, "distribution_queue");
attach<StorageSystemDictionaries>(system_database, "dictionaries");
attach<StorageSystemModels>(system_database, "models");

View File

@ -147,6 +147,7 @@ SRCS(
System/StorageSystemContributors.cpp
System/StorageSystemContributors.generated.cpp
System/StorageSystemCurrentRoles.cpp
System/StorageSystemDDLWorkerQueue.cpp
System/StorageSystemDataTypeFamilies.cpp
System/StorageSystemDatabases.cpp
System/StorageSystemDetachedParts.cpp