mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
dbms: added system.zookeeper table [#METR-2944].
This commit is contained in:
parent
2c2d4ea921
commit
0e923da9b3
44
dbms/include/DB/Storages/StorageSystemSettings.h
Normal file
44
dbms/include/DB/Storages/StorageSystemSettings.h
Normal file
@ -0,0 +1,44 @@
|
||||
#pragma once
|
||||
|
||||
#include <Poco/SharedPtr.h>
|
||||
|
||||
#include <DB/Storages/IStorage.h>
|
||||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||
#include <DB/Interpreters/Context.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
using Poco::SharedPtr;
|
||||
|
||||
|
||||
/** Реализует системную таблицу settings, которая позволяет получить информацию о текущих настройках.
|
||||
*/
|
||||
class StorageSystemSettings : public IStorage
|
||||
{
|
||||
public:
|
||||
static StoragePtr create(const std::string & name_, const Context & context_);
|
||||
|
||||
std::string getName() const override { return "SystemSettings"; }
|
||||
std::string getTableName() const override { return name; }
|
||||
|
||||
const NamesAndTypesList & getColumnsList() const override { return columns; }
|
||||
|
||||
BlockInputStreams read(
|
||||
const Names & column_names,
|
||||
ASTPtr query,
|
||||
const Settings & settings,
|
||||
QueryProcessingStage::Enum & processed_stage,
|
||||
size_t max_block_size = DEFAULT_BLOCK_SIZE,
|
||||
unsigned threads = 1) override;
|
||||
|
||||
private:
|
||||
const std::string name;
|
||||
const Context & context;
|
||||
NamesAndTypesList columns;
|
||||
|
||||
StorageSystemSettings(const std::string & name_, const Context & context_);
|
||||
};
|
||||
|
||||
}
|
44
dbms/include/DB/Storages/StorageSystemZooKeeper.h
Normal file
44
dbms/include/DB/Storages/StorageSystemZooKeeper.h
Normal file
@ -0,0 +1,44 @@
|
||||
#pragma once
|
||||
|
||||
#include <Poco/SharedPtr.h>
|
||||
|
||||
#include <DB/Storages/IStorage.h>
|
||||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||
#include <DB/Interpreters/Context.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
using Poco::SharedPtr;
|
||||
|
||||
|
||||
/** Реализует системную таблицу zookeeper, которая позволяет просматривать данные в ZooKeeper в целях отладки.
|
||||
*/
|
||||
class StorageSystemZooKeeper : public IStorage
|
||||
{
|
||||
public:
|
||||
static StoragePtr create(const std::string & name_, const Context & context_);
|
||||
|
||||
std::string getName() const override { return "SystemZooKeeper"; }
|
||||
std::string getTableName() const override { return name; }
|
||||
|
||||
const NamesAndTypesList & getColumnsList() const override { return columns; }
|
||||
|
||||
BlockInputStreams read(
|
||||
const Names & column_names,
|
||||
ASTPtr query,
|
||||
const Settings & settings,
|
||||
QueryProcessingStage::Enum & processed_stage,
|
||||
size_t max_block_size = DEFAULT_BLOCK_SIZE,
|
||||
unsigned threads = 1) override;
|
||||
|
||||
private:
|
||||
const std::string name;
|
||||
const Context & context;
|
||||
NamesAndTypesList columns;
|
||||
|
||||
StorageSystemZooKeeper(const std::string & name_, const Context & context_);
|
||||
};
|
||||
|
||||
}
|
@ -97,7 +97,7 @@ static bool isValidFunction(ASTPtr expression, const NameSet & columns)
|
||||
if (!isValidFunction(expression->children[i], columns))
|
||||
return false;
|
||||
|
||||
if (const ASTIdentifier * identifier = typeid_cast<const ASTIdentifier *>(&* expression))
|
||||
if (const ASTIdentifier * identifier = typeid_cast<const ASTIdentifier *>(&*expression))
|
||||
{
|
||||
if (identifier->kind == ASTIdentifier::Kind::Column)
|
||||
return columns.count(identifier->name);
|
||||
@ -108,7 +108,7 @@ static bool isValidFunction(ASTPtr expression, const NameSet & columns)
|
||||
/// Извлечь все подфункции главной конъюнкции, но зависящие только от заданных столбцов
|
||||
static void extractFunctions(ASTPtr expression, const NameSet & columns, std::vector<ASTPtr> & result)
|
||||
{
|
||||
const ASTFunction * function = typeid_cast<const ASTFunction *>(&* expression);
|
||||
const ASTFunction * function = typeid_cast<const ASTFunction *>(&*expression);
|
||||
if (function && function->name == "and")
|
||||
{
|
||||
for (size_t i = 0; i < function->arguments->children.size(); ++i)
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include <DB/Storages/StorageSystemOne.h>
|
||||
#include <DB/Storages/StorageSystemMerges.h>
|
||||
#include <DB/Storages/StorageSystemSettings.h>
|
||||
#include <DB/Storages/StorageSystemZooKeeper.h>
|
||||
|
||||
#include "Server.h"
|
||||
#include "HTTPHandler.h"
|
||||
@ -353,8 +354,12 @@ int Server::main(const std::vector<std::string> & args)
|
||||
global_context->setGlobalContext(*global_context);
|
||||
global_context->setPath(config().getString("path"));
|
||||
|
||||
bool has_zookeeper = false;
|
||||
if (config().has("zookeeper"))
|
||||
{
|
||||
global_context->setZooKeeper(new zkutil::ZooKeeper(config(), "zookeeper"));
|
||||
has_zookeeper = true;
|
||||
}
|
||||
|
||||
if (config().has("interserver_http_port"))
|
||||
{
|
||||
@ -411,6 +416,9 @@ int Server::main(const std::vector<std::string> & args)
|
||||
global_context->addTable("system", "events", StorageSystemEvents::create("events"));
|
||||
global_context->addTable("system", "merges", StorageSystemMerges::create("merges", *global_context));
|
||||
|
||||
if (has_zookeeper)
|
||||
global_context->addTable("system", "zookeeper", StorageSystemZooKeeper::create("zookeeper", *global_context));
|
||||
|
||||
global_context->setCurrentDatabase(config().getString("default_database", "default"));
|
||||
|
||||
{
|
||||
|
@ -128,8 +128,6 @@ BlockInputStreams StorageSystemParts::read(
|
||||
block.insert(ColumnWithNameAndType(active_column, new DataTypeUInt8, "active"));
|
||||
}
|
||||
|
||||
std::cerr << block.dumpStructure();
|
||||
|
||||
/// Отфильтруем блок со столбцами database, table, engine, replicated и active.
|
||||
VirtualColumnUtils::filterBlockWithQuery(query->clone(), block, context);
|
||||
|
||||
|
65
dbms/src/Storages/StorageSystemSettings.cpp
Normal file
65
dbms/src/Storages/StorageSystemSettings.cpp
Normal file
@ -0,0 +1,65 @@
|
||||
#include <DB/Columns/ColumnString.h>
|
||||
#include <DB/DataTypes/DataTypeString.h>
|
||||
#include <DB/DataTypes/DataTypesNumberFixed.h>
|
||||
#include <DB/DataStreams/OneBlockInputStream.h>
|
||||
#include <DB/Storages/StorageSystemSettings.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
||||
StorageSystemSettings::StorageSystemSettings(const std::string & name_, const Context & context_)
|
||||
: name(name_), context(context_)
|
||||
, columns{
|
||||
{ "name", new DataTypeString },
|
||||
{ "value", new DataTypeString },
|
||||
{ "changed", new DataTypeUInt8 },
|
||||
}
|
||||
{
|
||||
}
|
||||
|
||||
StoragePtr StorageSystemSettings::create(const std::string & name_, const Context & context_)
|
||||
{
|
||||
return (new StorageSystemSettings(name_, context_))->thisPtr();
|
||||
}
|
||||
|
||||
|
||||
BlockInputStreams StorageSystemSettings::read(
|
||||
const Names & column_names, ASTPtr query, const Settings & settings,
|
||||
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
|
||||
{
|
||||
check(column_names);
|
||||
processed_stage = QueryProcessingStage::FetchColumns;
|
||||
|
||||
ColumnWithNameAndType col_name{new ColumnString, new DataTypeString, "name"};
|
||||
ColumnWithNameAndType col_value{new ColumnString, new DataTypeString, "value"};
|
||||
ColumnWithNameAndType col_changed{new ColumnUInt8, new DataTypeUInt8, "changed"};
|
||||
|
||||
#define ADD_SETTING(TYPE, NAME, DEFAULT) \
|
||||
col_name.column->insert(String(#NAME)); \
|
||||
col_value.column->insert(settings.NAME.toString()); \
|
||||
col_changed.column->insert(UInt64(settings.NAME.changed));
|
||||
|
||||
APPLY_FOR_SETTINGS(ADD_SETTING)
|
||||
#undef ADD_SETTING
|
||||
|
||||
#define ADD_LIMIT(TYPE, NAME, DEFAULT) \
|
||||
col_name.column->insert(String(#NAME)); \
|
||||
col_value.column->insert(settings.limits.NAME.toString()); \
|
||||
col_changed.column->insert(UInt64(settings.limits.NAME.changed));
|
||||
|
||||
APPLY_FOR_LIMITS(ADD_LIMIT)
|
||||
#undef ADD_LIMIT
|
||||
|
||||
Block block{
|
||||
col_name,
|
||||
col_value,
|
||||
col_changed,
|
||||
};
|
||||
|
||||
return BlockInputStreams(1, new OneBlockInputStream(block));
|
||||
}
|
||||
|
||||
|
||||
}
|
179
dbms/src/Storages/StorageSystemZooKeeper.cpp
Normal file
179
dbms/src/Storages/StorageSystemZooKeeper.cpp
Normal file
@ -0,0 +1,179 @@
|
||||
#include <DB/Columns/ColumnString.h>
|
||||
#include <DB/DataTypes/DataTypeString.h>
|
||||
#include <DB/DataTypes/DataTypesNumberFixed.h>
|
||||
#include <DB/DataTypes/DataTypeDateTime.h>
|
||||
#include <DB/DataStreams/OneBlockInputStream.h>
|
||||
#include <DB/Storages/StorageSystemZooKeeper.h>
|
||||
#include <DB/Parsers/ASTSelectQuery.h>
|
||||
#include <DB/Parsers/ASTIdentifier.h>
|
||||
#include <DB/Parsers/ASTLiteral.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
||||
StorageSystemZooKeeper::StorageSystemZooKeeper(const std::string & name_, const Context & context_)
|
||||
: name(name_), context(context_)
|
||||
, columns{
|
||||
{ "name", new DataTypeString },
|
||||
{ "value", new DataTypeString },
|
||||
{ "czxid", new DataTypeInt64 },
|
||||
{ "mzxid", new DataTypeInt64 },
|
||||
{ "ctime", new DataTypeDateTime},
|
||||
{ "mtime", new DataTypeDateTime},
|
||||
{ "version", new DataTypeInt32 },
|
||||
{ "cversion", new DataTypeInt32 },
|
||||
{ "aversion", new DataTypeInt32 },
|
||||
{ "ephemeralOwner", new DataTypeInt64 },
|
||||
{ "dataLength", new DataTypeInt32 },
|
||||
{ "numChildren", new DataTypeInt32 },
|
||||
{ "pzxid", new DataTypeInt64 },
|
||||
{ "path", new DataTypeString },
|
||||
}
|
||||
{
|
||||
}
|
||||
|
||||
StoragePtr StorageSystemZooKeeper::create(const std::string & name_, const Context & context_)
|
||||
{
|
||||
return (new StorageSystemZooKeeper(name_, context_))->thisPtr();
|
||||
}
|
||||
|
||||
|
||||
static bool extractPathImpl(const IAST & elem, String & res)
|
||||
{
|
||||
const ASTFunction * function = typeid_cast<const ASTFunction *>(&elem);
|
||||
if (!function)
|
||||
return false;
|
||||
|
||||
if (function->name == "and")
|
||||
{
|
||||
for (size_t i = 0; i < function->arguments->children.size(); ++i)
|
||||
if (extractPathImpl(*function->arguments->children[i], res))
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
if (function->name == "equals")
|
||||
{
|
||||
const ASTExpressionList & args = typeid_cast<const ASTExpressionList &>(*function->arguments);
|
||||
const IAST * value;
|
||||
|
||||
if (args.children.size() != 2)
|
||||
return false;
|
||||
|
||||
const ASTIdentifier * ident;
|
||||
if ((ident = typeid_cast<const ASTIdentifier *>(&*args.children.at(0))))
|
||||
value = &*args.children.at(1);
|
||||
else if ((ident = typeid_cast<const ASTIdentifier *>(&*args.children.at(1))))
|
||||
value = &*args.children.at(0);
|
||||
else
|
||||
return false;
|
||||
|
||||
if (ident->name != "path")
|
||||
return false;
|
||||
|
||||
const ASTLiteral * literal = typeid_cast<const ASTLiteral *>(value);
|
||||
if (!literal)
|
||||
return false;
|
||||
|
||||
if (literal->value.getType() != Field::Types::String)
|
||||
return false;
|
||||
|
||||
res = literal->value.safeGet<String>();
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/** Вынимает из запроса условие вида path = 'path', из конъюнкций в секции WHERE.
|
||||
*/
|
||||
static String extractPath(const ASTPtr & query)
|
||||
{
|
||||
const ASTSelectQuery & select = typeid_cast<const ASTSelectQuery &>(*query);
|
||||
if (!select.where_expression)
|
||||
return "";
|
||||
|
||||
String res;
|
||||
return extractPathImpl(*select.where_expression, res) ? res : "";
|
||||
}
|
||||
|
||||
|
||||
BlockInputStreams StorageSystemZooKeeper::read(
|
||||
const Names & column_names, ASTPtr query, const Settings & settings,
|
||||
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
|
||||
{
|
||||
check(column_names);
|
||||
processed_stage = QueryProcessingStage::FetchColumns;
|
||||
|
||||
String path = extractPath(query);
|
||||
if (path.empty())
|
||||
throw Exception("SELECT from system.zookeeper table must contain condition like path = 'path' in WHERE clause.");
|
||||
|
||||
ColumnWithNameAndType col_name { new ColumnString, new DataTypeString, "name" };
|
||||
ColumnWithNameAndType col_value { new ColumnString, new DataTypeString, "value" };
|
||||
ColumnWithNameAndType col_czxid { new ColumnInt64, new DataTypeInt64, "czxid" };
|
||||
ColumnWithNameAndType col_mzxid { new ColumnInt64, new DataTypeInt64, "mzxid" };
|
||||
ColumnWithNameAndType col_ctime { new ColumnUInt32, new DataTypeDateTime, "ctime" };
|
||||
ColumnWithNameAndType col_mtime { new ColumnUInt32, new DataTypeDateTime, "mtime" };
|
||||
ColumnWithNameAndType col_version { new ColumnInt32, new DataTypeInt32, "version" };
|
||||
ColumnWithNameAndType col_cversion { new ColumnInt32, new DataTypeInt32, "cversion" };
|
||||
ColumnWithNameAndType col_aversion { new ColumnInt32, new DataTypeInt32, "aversion" };
|
||||
ColumnWithNameAndType col_ephemeralOwner{ new ColumnInt64, new DataTypeInt64, "ephemeralOwner" };
|
||||
ColumnWithNameAndType col_dataLength { new ColumnInt32, new DataTypeInt32, "dataLength" };
|
||||
ColumnWithNameAndType col_numChildren { new ColumnInt32, new DataTypeInt32, "numChildren" };
|
||||
ColumnWithNameAndType col_pzxid { new ColumnInt64, new DataTypeInt64, "pzxid" };
|
||||
ColumnWithNameAndType col_path { new ColumnString, new DataTypeString, "path" };
|
||||
|
||||
zkutil::ZooKeeperPtr zookeeper = context.getZooKeeper();
|
||||
|
||||
zkutil::Strings nodes = zookeeper->getChildren(path);
|
||||
|
||||
for (const String & node : nodes)
|
||||
{
|
||||
String value;
|
||||
zkutil::Stat stat;
|
||||
if (!zookeeper->tryGet(path + '/' + node, value, &stat))
|
||||
continue; /// Ноду успели удалить.
|
||||
|
||||
col_name.column->insert(node);
|
||||
col_value.column->insert(value);
|
||||
col_czxid.column->insert(stat.czxid);
|
||||
col_mzxid.column->insert(stat.mzxid);
|
||||
col_ctime.column->insert(UInt64(stat.ctime / 1000));
|
||||
col_mtime.column->insert(UInt64(stat.mtime / 1000));
|
||||
col_version.column->insert(Int64(stat.version));
|
||||
col_cversion.column->insert(Int64(stat.cversion));
|
||||
col_aversion.column->insert(Int64(stat.aversion));
|
||||
col_ephemeralOwner.column->insert(stat.ephemeralOwner);
|
||||
col_dataLength.column->insert(Int64(stat.dataLength));
|
||||
col_numChildren.column->insert(Int64(stat.numChildren));
|
||||
col_pzxid.column->insert(stat.pzxid);
|
||||
col_path.column->insert(path);
|
||||
}
|
||||
|
||||
Block block{
|
||||
col_name,
|
||||
col_value,
|
||||
col_czxid,
|
||||
col_mzxid,
|
||||
col_ctime,
|
||||
col_mtime,
|
||||
col_version,
|
||||
col_cversion,
|
||||
col_aversion,
|
||||
col_ephemeralOwner,
|
||||
col_dataLength,
|
||||
col_numChildren,
|
||||
col_pzxid,
|
||||
col_path,
|
||||
};
|
||||
|
||||
return BlockInputStreams(1, new OneBlockInputStream(block));
|
||||
}
|
||||
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user