Merge pull request #2032 from yandex/system-table-macro

System table macro
This commit is contained in:
alexey-milovidov 2018-03-14 02:10:30 +03:00 committed by GitHub
commit 04cc8a52f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 157 additions and 10 deletions

View File

@ -73,6 +73,8 @@ void ConfigReloader::run()
void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallback_to_preprocessed)
{
std::lock_guard<std::mutex> lock(reload_mutex);
FilesChangesTracker new_files = getNewFileList();
if (force || new_files.isDifferOrNewerThan(files))
{

View File

@ -42,6 +42,9 @@ public:
/// Call this method to run the backround thread.
void start();
/// Reload immediately. For SYSTEM RELOAD CONFIG query.
void reload() { reloadIfNewer(/* force */ true, /* throw_on_error */ true, /* fallback_to_preprocessed */ false); }
private:
void run();
@ -74,6 +77,9 @@ private:
std::atomic<bool> quit{false};
std::thread thread;
/// Locked inside reloadIfNewer.
std::mutex reload_mutex;
};
}

View File

@ -3,6 +3,7 @@
#include <Core/Types.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <map>
#include <memory>
namespace DB
@ -21,10 +22,13 @@ public:
*/
String expand(const String & s, size_t level = 0) const;
private:
using MacroMap = std::map<String, String>;
const MacroMap getMacroMap() const { return macros; }
private:
MacroMap macros;
};
using MacrosPtr = std::shared_ptr<Macros>;
}

View File

@ -130,7 +130,8 @@ struct ContextShared
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
BackgroundProcessingPoolPtr background_pool; /// The thread pool for the background work performed by the tables.
Macros macros; /// Substitutions extracted from config.
MacrosPtr macros; /// Substitutions extracted from config.
mutable std::mutex macros_mutex;
std::unique_ptr<Compiler> compiler; /// Used for dynamic compilation of queries' parts if it necessary.
std::shared_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
/// Rules for selecting the compression settings, depending on the size of the part.
@ -185,6 +186,8 @@ struct ContextShared
pcg64 rng{randomSeed()};
Context::ConfigReloadCallback config_reload_callback;
ContextShared(std::shared_ptr<IRuntimeComponentsFactory> runtime_components_factory_)
: runtime_components_factory(std::move(runtime_components_factory_))
{
@ -1042,15 +1045,16 @@ void Context::setDefaultFormat(const String & name)
default_format = name;
}
const Macros & Context::getMacros() const
const MacrosPtr & Context::getMacros() const
{
std::unique_lock<std::mutex> lock(shared->macros_mutex);
return shared->macros;
}
void Context::setMacros(Macros && macros)
{
/// We assume that this assignment occurs once when the server starts. If this is not the case, you need to use a mutex.
shared->macros = macros;
std::unique_lock<std::mutex> lock(shared->macros_mutex);
shared->macros = std::make_shared<Macros>(std::move(macros));
}
const Context & Context::getQueryContext() const
@ -1622,6 +1626,22 @@ time_t Context::getUptimeSeconds() const
}
void Context::setConfigReloadCallback(ConfigReloadCallback && callback)
{
/// Is initialized at server startup, so lock isn't required. Otherwise use mutex.
shared->config_reload_callback = std::move(callback);
}
void Context::reloadConfig() const
{
/// Use mutex if callback may be changed after startup.
if (!shared->config_reload_callback)
throw Exception("Can't reload config beacuse config_reload_callback is not set.", ErrorCodes::LOGICAL_ERROR);
shared->config_reload_callback();
}
void Context::shutdown()
{
system_logs.reset();

View File

@ -80,6 +80,7 @@ using Dependencies = std::vector<DatabaseAndTableName>;
using TableAndCreateAST = std::pair<StoragePtr, ASTPtr>;
using TableAndCreateASTs = std::map<String, TableAndCreateAST>;
using MacrosPtr = std::shared_ptr<Macros>;
/** A set of known objects that can be used in the query.
* Consists of a shared part (always common to all sessions and queries)
@ -206,7 +207,7 @@ public:
String getDefaultFormat() const; /// If default_format is not specified, some global default format is returned.
void setDefaultFormat(const String & name);
const Macros & getMacros() const;
const MacrosPtr & getMacros() const;
void setMacros(Macros && macros);
Settings getSettings() const;
@ -360,6 +361,10 @@ public:
/// Get the server uptime in seconds.
time_t getUptimeSeconds() const;
using ConfigReloadCallback = std::function<void()>;
void setConfigReloadCallback(ConfigReloadCallback && callback);
void reloadConfig() const;
void shutdown();
enum class ApplicationType

View File

@ -1132,7 +1132,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont
}
}
query->cluster = context.getMacros().expand(query->cluster);
query->cluster = context.getMacros()->expand(query->cluster);
ClusterPtr cluster = context.getCluster(query->cluster);
DDLWorker & ddl_worker = context.getDDLWorker();

View File

@ -97,6 +97,9 @@ BlockIO InterpreterSystemQuery::execute()
throw Exception(status.message, status.code);
break;
}
case Type::RELOAD_CONFIG:
context.reloadConfig();
break;
case Type::STOP_LISTEN_QUERIES:
case Type::START_LISTEN_QUERIES:
case Type::RESTART_REPLICAS:

View File

@ -39,6 +39,8 @@ const char * ASTSystemQuery::typeToString(Type type)
return "RELOAD DICTIONARY";
case Type::RELOAD_DICTIONARIES:
return "RELOAD DICTIONARIES";
case Type::RELOAD_CONFIG:
return "RELOAD CONFIG";
case Type::STOP_MERGES:
return "STOP MERGES";
case Type::START_MERGES:

View File

@ -24,6 +24,7 @@ public:
SYNC_REPLICA,
RELOAD_DICTIONARY,
RELOAD_DICTIONARIES,
RELOAD_CONFIG,
STOP_MERGES,
START_MERGES,
STOP_REPLICATION_QUEUES,

View File

@ -231,6 +231,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
[&](ConfigurationPtr config)
{
global_context->setClustersConfig(config);
global_context->setMacros(Macros(*config, "macros"));
},
/* already_loaded = */ true);
@ -250,6 +251,12 @@ int Server::main(const std::vector<std::string> & /*args*/)
[&](ConfigurationPtr config) { global_context->setUsersConfig(config); },
/* already_loaded = */ false);
/// Reload config in SYSTEM RELOAD CONFIG query.
global_context->setConfigReloadCallback([&]() {
main_config_reloader->reload();
users_config_reloader->reload();
});
/// Limit on total number of concurrently executed queries.
global_context->getProcessList().setMaxSize(config().getInt("max_concurrent_queries", 0));

View File

@ -144,7 +144,7 @@ StorageDistributed::StorageDistributed(
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
name(name_),
remote_database(remote_database_), remote_table(remote_table_),
context(context_), cluster_name(context.getMacros().expand(cluster_name_)), has_sharding_key(sharding_key_),
context(context_), cluster_name(context.getMacros()->expand(cluster_name_)), has_sharding_key(sharding_key_),
sharding_key_expr(sharding_key_ ? ExpressionAnalyzer(sharding_key_, context, nullptr, columns).getActions(false) : nullptr),
sharding_key_column_name(sharding_key_ ? sharding_key_->getColumnName() : String{}),
path(data_path_.empty() ? "" : (data_path_ + escapeForFileName(name) + '/'))

View File

@ -190,8 +190,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_}, context(context_),
current_zookeeper(context.getZooKeeper()), database_name(database_name_),
table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'),
zookeeper_path(context.getMacros().expand(zookeeper_path_)),
replica_name(context.getMacros().expand(replica_name_)),
zookeeper_path(context.getMacros()->expand(zookeeper_path_)),
replica_name(context.getMacros()->expand(replica_name_)),
data(database_name, table_name,
full_path, columns_,
materialized_columns_, alias_columns_, column_defaults_,

View File

@ -0,0 +1,48 @@
#include <Common/Macros.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemMacros.h>
#include <Interpreters/Context.h>
namespace DB
{
StorageSystemMacros::StorageSystemMacros(const std::string & name_)
: name(name_)
{
columns = NamesAndTypesList{
{"macro", std::make_shared<DataTypeString>()},
{"substitution", std::make_shared<DataTypeString>()},
};
}
BlockInputStreams StorageSystemMacros::read(
const Names & column_names,
const SelectQueryInfo &,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
auto macros = context.getMacros();
for (const auto & macro : macros->getMacroMap())
{
res_columns[0]->insert(macro.first);
res_columns[1]->insert(macro.second);
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
namespace DB
{
class Context;
/** Information about macros for introspection.
*/
class StorageSystemMacros : public ext::shared_ptr_helper<StorageSystemMacros>, public IStorage
{
public:
std::string getName() const override { return "SystemMacros"; }
std::string getTableName() const override { return name; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
private:
const std::string name;
protected:
StorageSystemMacros(const std::string & name_);
};
}

View File

@ -10,6 +10,7 @@
#include <Storages/System/StorageSystemEvents.h>
#include <Storages/System/StorageSystemFunctions.h>
#include <Storages/System/StorageSystemGraphite.h>
#include <Storages/System/StorageSystemMacros.h>
#include <Storages/System/StorageSystemMerges.h>
#include <Storages/System/StorageSystemMetrics.h>
#include <Storages/System/StorageSystemModels.h>
@ -56,6 +57,7 @@ void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper)
system_database.attachTable("models", StorageSystemModels::create("models"));
system_database.attachTable("clusters", StorageSystemClusters::create("clusters"));
system_database.attachTable("graphite_retentions", StorageSystemGraphite::create("graphite_retentions"));
system_database.attachTable("macros", StorageSystemMacros::create("macros"));
if (has_zookeeper)
system_database.attachTable("zookeeper", StorageSystemZooKeeper::create("zookeeper"));

View File

@ -74,6 +74,17 @@ def test_DROP_DNS_CACHE(started_cluster):
assert TSV(instance.query("SELECT DISTINCT host_name, host_address FROM system.clusters WHERE cluster='lost_host_cluster'")) == TSV("lost_host\t127.0.0.1\n")
def test_RELOAD_CONFIG_AND_MACROS(started_cluster):
macros = "<yandex><macros><mac>ro</mac></macros></yandex>"
create_macros = 'echo "{}" > /etc/clickhouse-server/config.d/macros.xml'.format(macros)
instance = cluster.instances['ch1']
instance.exec_in_container(['bash', '-c', create_macros], privileged=True, user='root')
instance.query("SYSTEM RELOAD CONFIG")
assert TSV(instance.query("select * from system.macros")) == TSV("mac\tro\n")
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:
for name, instance in cluster.instances.items():