Merge pull request #1750 from yandex/CLICKHOUSE-3346

Fault-tolerant cluster copier util
This commit is contained in:
Vitaliy Lyudvichenko 2018-01-31 13:21:51 +03:00 committed by GitHub
commit 94f38d744e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
130 changed files with 2666 additions and 358 deletions

View File

@ -4,6 +4,7 @@
#include <Poco/Net/DNS.h>
#include <Common/getFQDNOrHostName.h>
#include <Common/isLocalAddress.h>
#include <Common/ProfileEvents.h>
#include <Interpreters/Settings.h>
@ -39,14 +40,7 @@ ConnectionPoolWithFailover::ConnectionPoolWithFailover(
for (size_t i = 0; i < nested_pools.size(); ++i)
{
ConnectionPool & connection_pool = dynamic_cast<ConnectionPool &>(*nested_pools[i]);
const std::string & host = connection_pool.getHost();
size_t hostname_difference = 0;
for (size_t i = 0; i < std::min(local_hostname.length(), host.length()); ++i)
if (local_hostname[i] != host[i])
++hostname_difference;
hostname_differences[i] = hostname_difference;
hostname_differences[i] = getHostNameDifference(local_hostname, connection_pool.getHost());
}
}

View File

@ -138,11 +138,12 @@ private:
struct OpResult : public zoo_op_result_t
{
/// Указатели в этой структуре указывают на поля в классе Op.
/// Поэтому деструктор не нужен
/// Pointers in this class point to fields of class Op.
/// Op instances have the same (or longer lifetime), therefore destructor is not required.
};
using Ops = std::vector<std::unique_ptr<Op>>;
using OpPtr = std::unique_ptr<Op>;
using Ops = std::vector<OpPtr>;
using OpResults = std::vector<OpResult>;
using OpResultsPtr = std::shared_ptr<OpResults>;
using Strings = std::vector<std::string>;

View File

@ -2,6 +2,7 @@
#include <string>
/** Get the FQDN for the local server by resolving DNS hostname - similar to calling the 'hostname' tool with the -f flag.
* If it does not work, return hostname - similar to calling 'hostname' without flags or 'uname -n'.
*/

View File

@ -10,25 +10,35 @@
namespace DB
{
bool isLocalAddress(const Poco::Net::SocketAddress & address, UInt16 clickhouse_port)
bool isLocalAddress(const Poco::Net::SocketAddress & address)
{
static auto interfaces = Poco::Net::NetworkInterface::list();
if (clickhouse_port == address.port())
{
return interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
[&] (const Poco::Net::NetworkInterface & interface)
{
/** Compare the addresses without taking into account `scope`.
* Theoretically, this may not be correct - depends on `route` setting
* - through which interface we will actually access the specified address.
*/
return interface.address().length() == address.host().length()
&& 0 == memcmp(interface.address().addr(), address.host().addr(), address.host().length());
});
}
return interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
[&] (const Poco::Net::NetworkInterface & interface)
{
/** Compare the addresses without taking into account `scope`.
* Theoretically, this may not be correct - depends on `route` setting
* - through which interface we will actually access the specified address.
*/
return interface.address().length() == address.host().length()
&& 0 == memcmp(interface.address().addr(), address.host().addr(), address.host().length());
});
}
return false;
bool isLocalAddress(const Poco::Net::SocketAddress & address, UInt16 clickhouse_port)
{
return clickhouse_port == address.port() && isLocalAddress(address);
}
size_t getHostNameDifference(const std::string & local_hostname, const std::string & host)
{
size_t hostname_difference = 0;
for (size_t i = 0; i < std::min(local_hostname.length(), host.length()); ++i)
if (local_hostname[i] != host[i])
++hostname_difference;
return hostname_difference;
}
}

View File

@ -23,4 +23,8 @@ namespace DB
*/
bool isLocalAddress(const Poco::Net::SocketAddress & address, UInt16 clickhouse_port);
bool isLocalAddress(const Poco::Net::SocketAddress & address);
/// Returns number of different bytes in hostnames, used for load balancing
size_t getHostNameDifference(const std::string & local_hostname, const std::string & host);
}

View File

@ -4,7 +4,8 @@
namespace DB
{
SquashingBlockInputStream::SquashingBlockInputStream(BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes)
SquashingBlockInputStream::SquashingBlockInputStream(const BlockInputStreamPtr & src,
size_t min_block_size_rows, size_t min_block_size_bytes)
: transform(min_block_size_rows, min_block_size_bytes)
{
children.emplace_back(src);

View File

@ -12,7 +12,7 @@ namespace DB
class SquashingBlockInputStream : public IProfilingBlockInputStream
{
public:
SquashingBlockInputStream(BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes);
SquashingBlockInputStream(const BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes);
String getName() const override { return "Squashing"; }

View File

@ -16,20 +16,21 @@ bool isAtomicSet(std::atomic<bool> * val)
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
template <typename Pred>
void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, Pred && is_cancelled)
{
from.readPrefix();
to.writePrefix();
while (Block block = from.read())
{
if (isAtomicSet(is_cancelled))
if (is_cancelled())
break;
to.write(block);
}
if (isAtomicSet(is_cancelled))
if (is_cancelled())
return;
/// For outputting additional information in some formats.
@ -42,11 +43,28 @@ void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<boo
to.setExtremes(input->getExtremes());
}
if (isAtomicSet(is_cancelled))
if (is_cancelled())
return;
from.readSuffix();
to.writeSuffix();
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
{
auto is_cancelled_pred = [is_cancelled] ()
{
return isAtomicSet(is_cancelled);
};
copyDataImpl(from, to, is_cancelled_pred);
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled)
{
copyDataImpl(from, to, is_cancelled);
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <atomic>
#include <functional>
namespace DB
@ -14,4 +15,6 @@ class IBlockOutputStream;
*/
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled);
}

View File

@ -173,4 +173,9 @@ void DatabaseDictionary::drop()
/// Additional actions to delete database are not required.
}
String DatabaseDictionary::getDataPath(const Context &) const
{
return {};
}
}

View File

@ -93,6 +93,8 @@ public:
const Context & context,
const String & table_name) const override;
String getDataPath(const Context & context) const override;
void shutdown() override;
void drop() override;
};

View File

@ -15,11 +15,11 @@ namespace ErrorCodes
DatabasePtr DatabaseFactory::get(
const String & engine_name,
const String & database_name,
const String & path,
const String & metadata_path,
Context & context)
{
if (engine_name == "Ordinary")
return std::make_shared<DatabaseOrdinary>(database_name, path);
return std::make_shared<DatabaseOrdinary>(database_name, metadata_path, context);
else if (engine_name == "Memory")
return std::make_shared<DatabaseMemory>(database_name);
else if (engine_name == "Dictionary")

View File

@ -13,7 +13,7 @@ public:
static DatabasePtr get(
const String & engine_name,
const String & database_name,
const String & path,
const String & metadata_path,
Context & context);
};

View File

@ -152,4 +152,9 @@ void DatabaseMemory::drop()
/// Additional actions to delete database are not required.
}
String DatabaseMemory::getDataPath(const Context &) const
{
return {};
}
}

View File

@ -84,6 +84,8 @@ public:
const Context & context,
const String & table_name) const override;
String getDataPath(const Context & context) const override;
void shutdown() override;
void drop() override;
};

View File

@ -90,10 +90,10 @@ static void loadTable(
}
DatabaseOrdinary::DatabaseOrdinary(
const String & name_, const String & path_)
: DatabaseMemory(name_), path(path_)
DatabaseOrdinary::DatabaseOrdinary(const String & name_, const String & metadata_path_, const Context & context)
: DatabaseMemory(name_), metadata_path(metadata_path_), data_path(context.getPath() + "data/" + escapeForFileName(name_) + "/")
{
Poco::File(data_path).createDirectory();
}
@ -108,7 +108,7 @@ void DatabaseOrdinary::loadTables(
FileNames file_names;
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it(path); dir_it != dir_end; ++dir_it)
for (Poco::DirectoryIterator dir_it(metadata_path); dir_it != dir_end; ++dir_it)
{
/// For '.svn', '.gitignore' directory and similar.
if (dir_it.name().at(0) == '.')
@ -130,7 +130,7 @@ void DatabaseOrdinary::loadTables(
if (endsWith(dir_it.name(), ".sql"))
file_names.push_back(dir_it.name());
else
throw Exception("Incorrect file extension: " + dir_it.name() + " in metadata directory " + path,
throw Exception("Incorrect file extension: " + dir_it.name() + " in metadata directory " + metadata_path,
ErrorCodes::INCORRECT_FILE_NAME);
}
@ -162,7 +162,7 @@ void DatabaseOrdinary::loadTables(
watch.restart();
}
loadTable(context, path, *this, name, data_path, table, has_force_restore_data_flag);
loadTable(context, metadata_path, *this, name, data_path, table, has_force_restore_data_flag);
}
};
@ -269,7 +269,7 @@ void DatabaseOrdinary::createTable(
throw Exception("Table " + name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
String table_metadata_path = getTableMetadataPath(path, table_name);
String table_metadata_path = getTableMetadataPath(metadata_path, table_name);
String table_metadata_tmp_path = table_metadata_path + ".tmp";
String statement;
@ -312,7 +312,7 @@ void DatabaseOrdinary::removeTable(
{
StoragePtr res = detachTable(table_name);
String table_metadata_path = getTableMetadataPath(path, table_name);
String table_metadata_path = getTableMetadataPath(metadata_path, table_name);
try
{
@ -374,7 +374,7 @@ void DatabaseOrdinary::renameTable(
throw Exception{e};
}
ASTPtr ast = getCreateQueryImpl(path, table_name);
ASTPtr ast = getCreateQueryImpl(metadata_path, table_name);
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
ast_create_query.table = to_table_name;
@ -388,7 +388,7 @@ time_t DatabaseOrdinary::getTableMetadataModificationTime(
const Context & /*context*/,
const String & table_name)
{
String table_metadata_path = getTableMetadataPath(path, table_name);
String table_metadata_path = getTableMetadataPath(metadata_path, table_name);
Poco::File meta_file(table_metadata_path);
if (meta_file.exists())
@ -406,7 +406,7 @@ ASTPtr DatabaseOrdinary::getCreateQuery(
const Context & /*context*/,
const String & table_name) const
{
ASTPtr ast = getCreateQueryImpl(path, table_name);
ASTPtr ast = getCreateQueryImpl(metadata_path, table_name);
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
ast_create_query.attach = false;
@ -455,8 +455,8 @@ void DatabaseOrdinary::alterTable(
/// Read the definition of the table and replace the necessary parts with new ones.
String table_name_escaped = escapeForFileName(name);
String table_metadata_tmp_path = path + "/" + table_name_escaped + ".sql.tmp";
String table_metadata_path = path + "/" + table_name_escaped + ".sql";
String table_metadata_tmp_path = metadata_path + "/" + table_name_escaped + ".sql.tmp";
String table_metadata_path = metadata_path + "/" + table_name_escaped + ".sql";
String statement;
{
@ -499,4 +499,9 @@ void DatabaseOrdinary::alterTable(
}
}
String DatabaseOrdinary::getDataPath(const Context &) const
{
return data_path;
}
}

View File

@ -13,10 +13,11 @@ namespace DB
class DatabaseOrdinary : public DatabaseMemory
{
protected:
const String path;
const String metadata_path;
const String data_path;
public:
DatabaseOrdinary(const String & name_, const String & path_);
DatabaseOrdinary(const String & name_, const String & metadata_path_, const Context & context);
String getEngineName() const override { return "Ordinary"; }
@ -58,6 +59,8 @@ public:
const Context & context,
const String & table_name) const override;
String getDataPath(const Context & context) const override;
void shutdown() override;
void drop() override;

View File

@ -129,6 +129,9 @@ public:
const Context & context,
const String & name) const = 0;
/// Returns path for persistent data storage if the database supports it, empty string otherwise
virtual String getDataPath(const Context & context) const = 0;
/// Ask all tables to complete the background threads they are using and delete all table objects.
virtual void shutdown() = 0;

View File

@ -132,19 +132,26 @@ Clusters::Clusters(Poco::Util::AbstractConfiguration & config, const Settings &
ClusterPtr Clusters::getCluster(const std::string & cluster_name) const
{
std::lock_guard<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
auto it = impl.find(cluster_name);
return (it != impl.end()) ? it->second : nullptr;
}
void Clusters::setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster)
{
std::lock_guard lock(mutex);
impl[cluster_name] = cluster;
}
void Clusters::updateClusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_name, config_keys);
std::lock_guard<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
for (const auto & key : config_keys)
{
@ -163,11 +170,12 @@ void Clusters::updateClusters(Poco::Util::AbstractConfiguration & config, const
Clusters::Impl Clusters::getContainer() const
{
std::lock_guard<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
/// The following line copies container of shared_ptrs to return value under lock
return impl;
}
/// Implementation of `Cluster` class
Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name)

View File

@ -97,6 +97,7 @@ public:
UInt32 shard_num;
UInt32 weight;
Addresses local_addresses;
/// nullptr if there are no remote addresses
ConnectionPoolWithFailoverPtr pool;
bool has_internal_replication;
};
@ -168,8 +169,9 @@ public:
Clusters & operator=(const Clusters &) = delete;
ClusterPtr getCluster(const std::string & cluster_name) const;
void setCluster(const String & cluster_name, const ClusterPtr & cluster);
void updateClusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name = "remote_servers");
void updateClusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name);
public:
using Impl = std::map<String, ClusterPtr>;

View File

@ -1370,13 +1370,27 @@ Clusters & Context::getClusters() const
/// On repeating calls updates existing clusters and adds new clusters, doesn't delete old clusters
void Context::setClustersConfig(const ConfigurationPtr & config)
void Context::setClustersConfig(const ConfigurationPtr & config, const String & config_name)
{
std::lock_guard<std::mutex> lock(shared->clusters_mutex);
shared->clusters_config = config;
if (shared->clusters)
shared->clusters->updateClusters(*shared->clusters_config, settings);
if (!shared->clusters)
shared->clusters = std::make_unique<Clusters>(*shared->clusters_config, settings, config_name);
else
shared->clusters->updateClusters(*shared->clusters_config, settings, config_name);
}
void Context::setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster)
{
std::lock_guard<std::mutex> lock(shared->clusters_mutex);
if (!shared->clusters)
throw Exception("Clusters are not set", ErrorCodes::LOGICAL_ERROR);
shared->clusters->setCluster(cluster_name, cluster);
}

View File

@ -318,8 +318,10 @@ public:
Clusters & getClusters() const;
std::shared_ptr<Cluster> getCluster(const std::string & cluster_name) const;
std::shared_ptr<Cluster> tryGetCluster(const std::string & cluster_name) const;
void setClustersConfig(const ConfigurationPtr & config, const String & config_name = "remote_servers");
/// Sets custom cluster, but doesn't update configuration
void setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster);
void reloadClusterConfig();
void setClustersConfig(const ConfigurationPtr & config);
Compiler & getCompiler();
QueryLog & getQueryLog();

View File

@ -7,11 +7,13 @@ namespace DB
{
class Context;
class Cluster;
class InterpreterCheckQuery : public IInterpreter
{
public:
InterpreterCheckQuery(const ASTPtr & query_ptr_, const Context & context_);
BlockIO execute() override;
private:
@ -19,6 +21,7 @@ private:
private:
ASTPtr query_ptr;
const Context & context;
Block result;
};

View File

@ -103,13 +103,10 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
String database_name_escaped = escapeForFileName(database_name);
/// Create directories for tables data and metadata.
/// Create directories for tables metadata.
String path = context.getPath();
String data_path = path + "data/" + database_name_escaped + "/";
String metadata_path = path + "metadata/" + database_name_escaped + "/";
Poco::File(metadata_path).createDirectory();
Poco::File(data_path).createDirectory();
DatabasePtr database = DatabaseFactory::get(database_engine_name, database_name, metadata_path, context);
@ -458,13 +455,9 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
String current_database = context.getCurrentDatabase();
String database_name = create.database.empty() ? current_database : create.database;
String database_name_escaped = escapeForFileName(database_name);
String table_name = create.table;
String table_name_escaped = escapeForFileName(table_name);
String data_path = path + "data/" + database_name_escaped + "/";
String metadata_path = path + "metadata/" + database_name_escaped + "/" + table_name_escaped + ".sql";
// If this is a stub ATTACH query, read the query definition from the database
if (create.attach && !create.storage && !create.columns)
{
@ -511,9 +504,13 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
{
std::unique_ptr<DDLGuard> guard;
String data_path;
DatabasePtr database;
if (!create.is_temporary)
{
context.assertDatabaseExists(database_name);
database = context.getDatabase(database_name);
data_path = database->getDataPath(context);
/** If the table already exists, and the request specifies IF NOT EXISTS,
* then we allow concurrent CREATE queries (which do nothing).
@ -548,7 +545,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
if (create.is_temporary)
context.getSessionContext().addExternalTable(table_name, res);
else
context.getDatabase(database_name)->createTable(context, table_name, res, query_ptr);
database->createTable(context, table_name, res, query_ptr);
}
res->startup();

View File

@ -66,6 +66,7 @@ BlockIO InterpreterDropQuery::execute()
String data_path = path + "data/" + database_name_escaped + "/";
String metadata_path = path + "metadata/" + database_name_escaped + "/";
String database_metadata_path = path + "metadata/" + database_name_escaped + ".sql";
auto database = context.tryGetDatabase(database_name);
if (!database && !drop.if_exists)
@ -163,6 +164,11 @@ BlockIO InterpreterDropQuery::execute()
Poco::File(data_path).remove(false);
Poco::File(metadata_path).remove(false);
/// Old ClickHouse versions did not store database.sql files
Poco::File database_metadata_file(database_metadata_path);
if (database_metadata_file.exists())
database_metadata_file.remove(false);
}
return {};

View File

@ -136,7 +136,7 @@ void loadMetadataSystem(Context & context)
Poco::File(global_path + "data/" SYSTEM_DATABASE).createDirectories();
Poco::File(global_path + "metadata/" SYSTEM_DATABASE).createDirectories();
auto system_database = std::make_shared<DatabaseOrdinary>(SYSTEM_DATABASE, global_path + "metadata/" SYSTEM_DATABASE);
auto system_database = std::make_shared<DatabaseOrdinary>(SYSTEM_DATABASE, global_path + "metadata/" SYSTEM_DATABASE, context);
context.addDatabase(SYSTEM_DATABASE, system_database);
}

View File

@ -82,7 +82,7 @@ bool ParserList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
auto list = std::make_shared<ASTExpressionList>();
node = list;
while (1)
while (true)
{
if (first)
{

View File

@ -317,6 +317,21 @@ ASTPtr parseQuery(
}
ASTPtr parseQuery(
IParser & parser,
const std::string & query,
const std::string & query_description)
{
return parseQuery(parser, query.data(), query.data() + query.size(), query_description);
}
ASTPtr parseQuery(IParser & parser, const std::string & query)
{
return parseQuery(parser, query.data(), query.data() + query.size(), parser.getName());
}
std::pair<const char *, bool> splitMultipartQuery(const std::string & queries, std::vector<std::string> & queries_list)
{
ASTPtr ast;
@ -357,4 +372,5 @@ std::pair<const char *, bool> splitMultipartQuery(const std::string & queries, s
return std::make_pair(begin, pos == end);
}
}

View File

@ -32,6 +32,15 @@ ASTPtr parseQuery(
const char * end,
const std::string & description);
ASTPtr parseQuery(
IParser & parser,
const std::string & query,
const std::string & query_description);
ASTPtr parseQuery(
IParser & parser,
const std::string & query);
/** Split queries separated by ; on to list of single queries
* Returns pointer to the end of last sucessfuly parsed query (first), and true if all queries are sucessfuly parsed (second)

View File

@ -44,6 +44,9 @@ target_link_libraries (clickhouse-compressor-lib clickhouse_common_io ${Boost_PR
add_library (clickhouse-format-lib ${SPLIT_SHARED} Format.cpp)
target_link_libraries (clickhouse-format-lib clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
add_library (clickhouse-cluster-copier-lib ClusterCopier.cpp)
target_link_libraries (clickhouse-cluster-copier-lib clickhouse-server-lib clickhouse_functions clickhouse_aggregate_functions clickhouse_table_functions)
if (USE_EMBEDDED_COMPILER)
link_directories (${LLVM_LIBRARY_DIRS})
add_subdirectory ("Compiler-${LLVM_VERSION}")
@ -67,8 +70,11 @@ if (CLICKHOUSE_SPLIT_BINARY)
target_link_libraries (clickhouse-compressor clickhouse-compressor-lib)
add_executable (clickhouse-format clickhouse-format.cpp)
target_link_libraries (clickhouse-format clickhouse-format-lib dbms)
add_executable (clickhouse-cluster-copier clickhouse-cluster-copier.cpp)
target_link_libraries (clickhouse-cluster-copier clickhouse-cluster-copier-lib)
set (CLICKHOUSE_ALL_TARGETS clickhouse-server clickhouse-client clickhouse-local clickhouse-benchmark clickhouse-performance-test clickhouse-extract-from-config clickhouse-format)
set (CLICKHOUSE_ALL_TARGETS clickhouse-server clickhouse-client clickhouse-local clickhouse-benchmark clickhouse-performance-test
clickhouse-extract-from-config clickhouse-format clickhouse-cluster-copier)
if (USE_EMBEDDED_COMPILER)
add_executable (clickhouse-clang clickhouse-clang.cpp)
@ -100,6 +106,7 @@ else ()
clickhouse-extract-from-config-lib
clickhouse-compressor-lib
clickhouse-format-lib
clickhouse-cluster-copier-lib
)
add_custom_target (clickhouse-server ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-server DEPENDS clickhouse)
@ -110,6 +117,7 @@ else ()
add_custom_target (clickhouse-extract-from-config ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-extract-from-config DEPENDS clickhouse)
add_custom_target (clickhouse-compressor ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-compressor DEPENDS clickhouse)
add_custom_target (clickhouse-format ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-format DEPENDS clickhouse)
add_custom_target (clickhouse-cluster-copier ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-cluster-copier DEPENDS clickhouse)
# install always because depian package want this files:
add_custom_target (clickhouse-clang ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-clang DEPENDS clickhouse)
add_custom_target (clickhouse-lld ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-lld DEPENDS clickhouse)
@ -124,6 +132,7 @@ else ()
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-extract-from-config
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-compressor
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-format
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-cluster-copier
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-clang
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-lld
DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,205 @@
#pragma once
#include <Poco/Util/ServerApplication.h>
/* = clickhouse-cluster-copier util =
* Copies tables data from one cluster to new tables of other (possibly the same) cluster in distributed fault-tolerant manner.
*
* Configuration of copying tasks is set in special ZooKeeper node (called the description node).
* A ZooKeeper path to the description node is specified via --task-path </task/path> parameter.
* So, node /task/path/description should contain special XML content describing copying tasks.
*
* Simultaneously many clickhouse-cluster-copier processes located on any servers could execute the same task.
* ZooKeeper node /task/path/ is used by the processes to coordinate their work.
* You must not add additional child nodes to /task/path/.
*
* Currently you are responsible for launching cluster-copier processes.
* You can launch as many processes as you want, whenever and wherever you want.
* Each process try to select nearest available shard of source cluster and copy some part of data (partition) from it to the whole
* destination cluster with resharding.
* Therefore it makes sense to launch cluster-copier processes on the source cluster nodes to reduce the network usage.
*
* Since the workers coordinate their work via ZooKeeper, in addition to --task-path </task/path> you have to specify ZooKeeper
* configuration via --config-file <zookeeper.xml> parameter. Example of zookeeper.xml:
<yandex>
<zookeeper>
<node index="1">
<host>127.0.0.1</host>
<port>2181</port>
</node>
</zookeeper>
</yandex>
* When you run clickhouse-cluster-copier --config-file <zookeeper.xml> --task-path </task/path>
* the process connects to ZooKeeper, reads tasks config from /task/path/description and executes them.
*
*
* = Format of task config =
<yandex>
<!-- Configuration of clusters as in an ordinary server config -->
<remote_servers>
<source_cluster>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
</shard>
...
</source_cluster>
<destination_cluster>
...
</destination_cluster>
</remote_servers>
<!-- How many simultaneously active workers are possible. If you run more workers superfluous workers will sleep. -->
<max_workers>2</max_workers>
<!-- Setting used to fetch (pull) data from source cluster tables -->
<settings_pull>
<readonly>1</readonly>
</settings_pull>
<!-- Setting used to insert (push) data to destination cluster tables -->
<settings_push>
<readonly>0</readonly>
</settings_push>
<!-- Common setting for fetch (pull) and insert (push) operations.
They are overlaid by <settings_pull/> and <settings_push/> respectively -->
<settings>
<insert_distributed_sync>1</insert_distributed_sync>
</settings>
<!-- Copying tasks description.
You could specify several table task in the same task description (in the same ZooKeeper node), they will be performed
sequentially.
-->
<tables>
<!-- Name of the table task, it must be an unique name suitable for ZooKeeper node name -->
<table_hits>
<-- Source cluster name (from <remote_servers/> section) and tables in it that should be copied -->
<cluster_pull>source_cluster</cluster_pull>
<database_pull>test</database_pull>
<table_pull>hits</table_pull>
<-- Destination cluster name and tables in which the data should be inserted -->
<cluster_push>destination_cluster</cluster_push>
<database_push>test</database_push>
<table_push>hits2</table_push>
<!-- Engine of destination tables.
If destination tables have not be created, workers create them using columns definition from source tables and engine
definition from here.
NOTE: If the first worker starts insert data and detects that destination partition is not empty then the partition will
be dropped and refilled, take it into account if you already have some data in destination tables. You could directly
specify partitions that should be copied in <enabled_partitions/>.
NOTE: Currently partition key of source and destination tables should be the same.
-->
<engine>ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/hits2/{shard}/hits2', '{replica}', EventDate, (CounterID, EventDate), 8192)</engine>
<!-- Sharding key used to insert data to destination cluster -->
<sharding_key>intHash32(UserID)</sharding_key>
<!-- Optional expression that filter data while pull them from source servers -->
<where_condition>CounterID != 0</where_condition>
<!-- Optional section, it specifies partitions that should be copied, other partition will be ignored -->
<enabled_partitions>
<partition>201712</partition>
<partition>201801</partition>
...
</enabled_partitions>
</table_hits>
</table_visits>
...
</table_visits>
...
</tables>
</yandex>
* = Implementation details =
*
* cluster-copier workers pull each partition of each shard of the source cluster and push it to the destination cluster through
* Distributed table (to preform data resharding). So, worker job is a partition of a source shard.
* A job has three states: Active, Finished and Abandoned. Abandoned means that worker died and did not finish the job.
*
* If an error occurred during the copying (a worker failed or a worker did not finish the INSERT), then the whole partition (on
* all destination servers) should be dropped and refilled. So, copying entity is a partition of all destination shards.
* If a failure is detected a special /is_dirty node is created in ZooKeeper signalling that other workers copying the same partition
* should stop, after a refilling procedure should start.
*
* ZooKeeper task node has the following structure:
* /task/path_root - path passed in --task-path parameter
* /description - contains user-defined XML config of the task
* /task_active_workers - contains ephemeral nodes of all currently active workers, used to implement max_workers limitation
* /server_fqdn#PID_timestamp - cluster-copier worker ID
* ...
* /tables - directory with table tasks
* /table_hits - directory of table_hits task
* /partition1 - directory for partition1
* /shards - directory for source cluster shards
* /1 - worker job for the first shard of partition1 of table test.hits
* Contains info about current status (Active or Finished) and worker ID.
* /2
* ...
* /partition_active_workers
* /1 - for each job in /shards a corresponding ephemeral node created in /partition_active_workers
* It is used to detect Abandoned jobs (if there is Active node in /shards and there is no node in
* /partition_active_workers).
* Also, it is used to track active workers in the partition (when we need to refill the partition we do
* not DROP PARTITION while there are active workers)
* /2
* ...
* /is_dirty - the node is set if some worker detected that an error occurred (the INSERT is failed or an Abandoned node is
* detected). If the node appeared workers in this partition should stop and start cleaning and refilling
* partition procedure.
* During this procedure a single 'cleaner' worker is selected. The worker waits for stopping all partition
* workers, removes /shards node, executes DROP PARTITION on each destination node and removes /is_dirty node.
* /cleaner- An ephemeral node used to select 'cleaner' worker. Contains ID of the worker.
* /test_visits
* ...
*/
namespace DB
{
class ClusterCopierApp : public Poco::Util::ServerApplication
{
public:
void initialize(Poco::Util::Application & self) override;
void handleHelp(const std::string &, const std::string &);
void defineOptions(Poco::Util::OptionSet & options) override;
int main(const std::vector<std::string> &) override;
private:
void mainImpl();
void setupLogging();
std::string config_xml_path;
std::string task_path;
std::string log_level = "debug";
bool is_safe_mode = false;
double copy_fault_probability = 0;
bool is_help = false;
std::string base_dir;
std::string process_path;
std::string process_id;
std::string host_id;
};
}

View File

@ -463,6 +463,14 @@ static const char * minimal_default_user_xml =
"</yandex>";
static ConfigurationPtr getConfigurationFromXMLString(const char * xml_data)
{
std::stringstream ss{std::string{xml_data}};
Poco::XML::InputSource input_source{ss};
return {new Poco::Util::XMLConfiguration{&input_source}};
}
void LocalServer::setupUsers()
{
ConfigurationPtr users_config;
@ -477,11 +485,7 @@ void LocalServer::setupUsers()
}
else
{
std::stringstream default_user_stream;
default_user_stream << minimal_default_user_xml;
Poco::XML::InputSource default_user_source(default_user_stream);
users_config = ConfigurationPtr(new Poco::Util::XMLConfiguration(&default_user_source));
users_config = getConfigurationFromXMLString(minimal_default_user_xml);
}
if (users_config)

View File

@ -0,0 +1,2 @@
int mainEntryClickHouseClusterCopier(int argc, char ** argv);
int main(int argc_, char ** argv_) { return mainEntryClickHouseClusterCopier(argc_, argv_); }

View File

@ -18,6 +18,7 @@ int mainEntryClickHousePerformanceTest(int argc, char ** argv);
int mainEntryClickHouseExtractFromConfig(int argc, char ** argv);
int mainEntryClickHouseCompressor(int argc, char ** argv);
int mainEntryClickHouseFormat(int argc, char ** argv);
int mainEntryClickHouseClusterCopier(int argc, char ** argv);
#if USE_EMBEDDED_COMPILER
int mainEntryClickHouseClang(int argc, char ** argv);
@ -41,6 +42,7 @@ std::pair<const char *, MainFunc> clickhouse_applications[] =
{"extract-from-config", mainEntryClickHouseExtractFromConfig},
{"compressor", mainEntryClickHouseCompressor},
{"format", mainEntryClickHouseFormat},
{"copier", mainEntryClickHouseClusterCopier},
#if USE_EMBEDDED_COMPILER
{"clang", mainEntryClickHouseClang},
{"lld", mainEntryClickHouseLLD},

View File

@ -53,9 +53,10 @@ namespace ErrorCodes
}
DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast,
const ClusterPtr & cluster_, bool insert_sync_, UInt64 insert_timeout_)
: storage(storage), query_ast(query_ast), cluster(cluster_), insert_sync(insert_sync_), insert_timeout(insert_timeout_)
DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_,
const Settings & settings_, bool insert_sync_, UInt64 insert_timeout_)
: storage(storage), query_ast(query_ast), cluster(cluster_), settings(settings_), insert_sync(insert_sync_),
insert_timeout(insert_timeout_)
{
}
@ -392,9 +393,21 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const siz
void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats)
{
InterpreterInsertQuery interp{query_ast, storage.context};
std::unique_ptr<Context> local_context;
std::optional<InterpreterInsertQuery> interp;
auto block_io = interp.execute();
/// Async insert does not support settings forwarding yet whereas sync one supports
if (insert_sync)
interp.emplace(query_ast, storage.context);
else
{
/// Overwrite global settings by user settings
local_context = std::make_unique<Context>(storage.context);
local_context->setSettings(settings);
interp.emplace(query_ast, *local_context);
}
auto block_io = interp->execute();
block_io.out->writePrefix();
for (size_t i = 0; i < repeats; ++i)
@ -410,7 +423,7 @@ void DistributedBlockOutputStream::writeToShardSync(const Block & block, const s
auto connection = pool->get();
const auto & query_string = queryToString(query_ast);
RemoteBlockOutputStream remote{*connection, query_string};
RemoteBlockOutputStream remote{*connection, query_string, &settings};
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};

View File

@ -32,7 +32,8 @@ class StorageDistributed;
class DistributedBlockOutputStream : public IBlockOutputStream
{
public:
DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_, bool insert_sync_, UInt64 insert_timeout_);
DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_,
const Settings & settings_, bool insert_sync_, UInt64 insert_timeout_);
void write(const Block & block) override;
@ -88,6 +89,7 @@ private:
StorageDistributed & storage;
ASTPtr query_ast;
ClusterPtr cluster;
const Settings & settings;
bool insert_sync;
UInt64 insert_timeout;
size_t blocks_inserted = 0;

View File

@ -21,6 +21,7 @@ namespace ErrorCodes
extern const int TYPE_MISMATCH;
extern const int DUPLICATE_COLUMN;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
}
@ -289,4 +290,15 @@ void ITableDeclaration::check(const Block & block, bool need_all) const
}
}
ITableDeclaration::ITableDeclaration(const NamesAndTypesList & columns, const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns, const ColumnDefaults & column_defaults)
: columns{columns},
materialized_columns{materialized_columns},
alias_columns{alias_columns},
column_defaults{column_defaults}
{
if (columns.empty())
throw Exception("Empty list of columns passed to storage constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
}
}

View File

@ -84,20 +84,21 @@ public:
ITableDeclaration() = default;
ITableDeclaration(
const NamesAndTypesList & columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults)
: materialized_columns{materialized_columns},
alias_columns{alias_columns},
column_defaults{column_defaults}
{}
const ColumnDefaults & column_defaults);
NamesAndTypesList columns;
NamesAndTypesList materialized_columns{};
NamesAndTypesList alias_columns{};
ColumnDefaults column_defaults{};
private:
virtual const NamesAndTypesList & getColumnsListImpl() const = 0;
virtual const NamesAndTypesList & getColumnsListImpl() const
{
return columns;
}
using ColumnsListRange = boost::range::joined_range<const NamesAndTypesList, const NamesAndTypesList>;
/// Returns a lazily joined range of table's ordinary and materialized columns, without unnecessary copying

View File

@ -93,7 +93,8 @@ MergeTreeData::MergeTreeData(
bool require_part_metadata_,
bool attach,
BrokenPartCallback broken_part_callback_)
: ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
: ITableDeclaration{columns_, materialized_columns_, alias_columns_, column_defaults_},
context(context_),
sampling_expression(sampling_expression_),
index_granularity(settings_.index_granularity),
merging_params(merging_params_),
@ -102,7 +103,7 @@ MergeTreeData::MergeTreeData(
partition_expr_ast(partition_expr_ast_),
require_part_metadata(require_part_metadata_),
database_name(database_), table_name(table_),
full_path(full_path_), columns(columns_),
full_path(full_path_),
broken_part_callback(broken_part_callback_),
log_name(database_name + "." + table_name), log(&Logger::get(log_name + " (Data)")),
data_parts_by_name(data_parts_indexes.get<TagByName>()),

View File

@ -550,8 +550,6 @@ private:
String table_name;
String full_path;
NamesAndTypesList columns;
/// Current column sizes in compressed and uncompressed form.
ColumnSizes column_sizes;

View File

@ -56,8 +56,8 @@ StorageBuffer::StorageBuffer(const std::string & name_, const NamesAndTypesList
Context & context_,
size_t num_shards_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
const String & destination_database_, const String & destination_table_, bool allow_materialized_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
name(name_), columns(columns_), context(context_),
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
name(name_), context(context_),
num_shards(num_shards_), buffers(num_shards_),
min_thresholds(min_thresholds_), max_thresholds(max_thresholds_),
destination_database(destination_database_), destination_table(destination_table_),

View File

@ -53,8 +53,6 @@ public:
std::string getName() const override { return "Buffer"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
@ -82,7 +80,6 @@ public:
private:
String name;
NamesAndTypesList columns;
Context & context;

View File

@ -14,8 +14,6 @@ public:
std::string getTableName() const override { return table_name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -25,7 +23,7 @@ public:
private:
String table_name;
NamesAndTypesList columns;
String column_description_file_name;
String data_description_file_name;
Block sample_block;

View File

@ -30,8 +30,8 @@ StorageDictionary::StorageDictionary(
const ColumnDefaults & column_defaults_,
const DictionaryStructure & dictionary_structure_,
const String & dictionary_name_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
columns(columns_), dictionary_name(dictionary_name_),
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
dictionary_name(dictionary_name_),
logger(&Poco::Logger::get("StorageDictionary"))
{
checkNamesAndTypesCompatibleWithDictionary(dictionary_structure_);

View File

@ -37,7 +37,6 @@ private:
using Ptr = MultiVersion<IDictionaryBase>::Version;
String table_name;
NamesAndTypesList columns;
String dictionary_name;
Poco::Logger * logger;

View File

@ -50,6 +50,8 @@ namespace DB
namespace ErrorCodes
{
extern const int STORAGE_REQUIRES_PARAMETER;
extern const int BAD_ARGUMENTS;
extern const int READONLY;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
}
@ -141,8 +143,8 @@ StorageDistributed::StorageDistributed(
const Context & context_,
const ASTPtr & sharding_key_,
const String & data_path_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
name(name_), columns(columns_),
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
name(name_),
remote_database(remote_database_), remote_table(remote_table_),
context(context_), cluster_name(context.getMacros().expand(cluster_name_)), has_sharding_key(sharding_key_),
sharding_key_expr(sharding_key_ ? ExpressionAnalyzer(sharding_key_, context, nullptr, columns).getActions(false) : nullptr),
@ -197,7 +199,7 @@ BlockInputStreams StorageDistributed::read(
external_tables = context.getExternalTables();
ClusterProxy::SelectStreamFactory select_stream_factory(
processed_stage, QualifiedTableName{remote_database, remote_table}, external_tables);
processed_stage, QualifiedTableName{remote_database, remote_table}, external_tables);
return ClusterProxy::executeQuery(
select_stream_factory, cluster, modified_query_ast, context, settings);
@ -206,25 +208,29 @@ BlockInputStreams StorageDistributed::read(
BlockOutputStreamPtr StorageDistributed::write(const ASTPtr & query, const Settings & settings)
{
auto cluster = owned_cluster ? owned_cluster : context.getCluster(cluster_name);
auto cluster = (owned_cluster) ? owned_cluster : context.getCluster(cluster_name);
/// TODO: !path.empty() can be replaced by !owned_cluster or !cluster_name.empty() ?
/// owned_cluster for remote table function use sync insertion => doesn't need a path.
bool write_enabled = (!path.empty() || owned_cluster)
&& (((cluster->getLocalShardCount() + cluster->getRemoteShardCount()) < 2) || has_sharding_key);
/// Ban an attempt to make async insert into the table belonging to DatabaseMemory
if (path.empty() && !owned_cluster && !settings.insert_distributed_sync.value)
{
throw Exception("Storage " + getName() + " must has own data directory to enable asynchronous inserts",
ErrorCodes::BAD_ARGUMENTS);
}
if (!write_enabled)
throw Exception{
"Method write is not supported by storage " + getName() +
" with more than one shard and no sharding key provided",
ErrorCodes::STORAGE_REQUIRES_PARAMETER};
/// If sharding key is not specified, then you can only write to a shard containing only one shard
if (!has_sharding_key && ((cluster->getLocalShardCount() + cluster->getRemoteShardCount()) >= 2))
{
throw Exception("Method write is not supported by storage " + getName() + " with more than one shard and no sharding key provided",
ErrorCodes::STORAGE_REQUIRES_PARAMETER);
}
/// Force sync insertion if it is remote() table function
bool insert_sync = settings.insert_distributed_sync || owned_cluster;
auto timeout = settings.insert_distributed_timeout;
/// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
return std::make_shared<DistributedBlockOutputStream>(
*this, rewriteInsertQuery(query, remote_database, remote_table), cluster, insert_sync, timeout);
*this, rewriteInsertQuery(query, remote_database, remote_table), cluster, settings, insert_sync, timeout);
}

View File

@ -94,7 +94,6 @@ public:
String name;
NamesAndTypesList columns;
String remote_database;
String remote_table;

View File

@ -31,6 +31,8 @@ namespace ErrorCodes
extern const int DATABASE_ACCESS_DENIED;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int UNKNOWN_IDENTIFIER;
extern const int INCORRECT_FILE_NAME;
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
};
@ -57,8 +59,8 @@ StorageFile::StorageFile(
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
Context & context_)
: IStorage(materialized_columns_, alias_columns_, column_defaults_),
table_name(table_name_), format_name(format_name_), columns(columns_), context_global(context_), table_fd(table_fd_)
: IStorage(columns_, materialized_columns_, alias_columns_, column_defaults_),
table_name(table_name_), format_name(format_name_), context_global(context_), table_fd(table_fd_)
{
if (table_fd < 0) /// Will use file
{
@ -72,6 +74,9 @@ StorageFile::StorageFile(
}
else /// Is DB's file
{
if (db_dir_path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
path = getTablePath(db_dir_path, table_name, format_name);
is_db_table = true;
Poco::File(Poco::Path(path).parent()).createDirectories();

View File

@ -77,7 +77,6 @@ private:
std::string table_name;
std::string format_name;
NamesAndTypesList columns;
Context & context_global;
std::string path;

View File

@ -228,9 +228,9 @@ StorageKafka::StorageKafka(
const ColumnDefaults & column_defaults_,
const String & brokers_, const String & group_, const Names & topics_,
const String & format_name_, const String & schema_name_, size_t num_consumers_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
table_name(table_name_), database_name(database_name_), context(context_),
columns(columns_), topics(topics_), brokers(brokers_), group(group_), format_name(format_name_), schema_name(schema_name_),
topics(topics_), brokers(brokers_), group(group_), format_name(format_name_), schema_name(schema_name_),
num_consumers(num_consumers_), log(&Logger::get("StorageKafka (" + table_name_ + ")")),
semaphore(0, num_consumers_), mutex(), consumers(), event_update()
{

View File

@ -32,8 +32,6 @@ public:
std::string getTableName() const override { return table_name; }
std::string getDatabaseName() const { return database_name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
void startup() override;
void shutdown() override;
@ -73,7 +71,6 @@ private:
String table_name;
String database_name;
Context & context;
NamesAndTypesList columns;
Names topics;
const String brokers;
const String group;

View File

@ -41,6 +41,7 @@ namespace ErrorCodes
extern const int DUPLICATE_COLUMN;
extern const int SIZES_OF_MARKS_FILES_ARE_INCONSISTENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INCORRECT_FILE_NAME;
}
@ -363,15 +364,15 @@ StorageLog::StorageLog(
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
size_t max_compress_block_size_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
path(path_), name(name_), columns(columns_),
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
path(path_), name(name_),
max_compress_block_size(max_compress_block_size_),
file_checker(path + escapeForFileName(name) + '/' + "sizes.json")
{
if (columns.empty())
throw Exception("Empty list of columns passed to StorageLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
if (path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
/// create files if they do not exist
/// create files if they do not exist
Poco::File(path + escapeForFileName(name) + '/').createDirectories();
for (const auto & column : getColumnsList())

View File

@ -26,8 +26,6 @@ public:
std::string getName() const override { return "Log"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
@ -59,7 +57,6 @@ protected:
private:
String path;
String name;
NamesAndTypesList columns;
mutable std::shared_mutex rwlock;

View File

@ -65,8 +65,8 @@ StorageMaterializedView::StorageMaterializedView(
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
bool attach_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
database_name(database_name_), global_context(local_context.getGlobalContext()), columns(columns_)
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
database_name(database_name_), global_context(local_context.getGlobalContext())
{
if (!query.select)
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);

View File

@ -51,7 +51,6 @@ private:
String database_name;
ASTPtr inner_query;
Context & global_context;
NamesAndTypesList columns;
bool has_inner_table = false;
protected:

View File

@ -87,8 +87,8 @@ StorageMemory::StorageMemory(
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
name(name_), columns(columns_)
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
name(name_)
{
}

View File

@ -26,8 +26,6 @@ public:
std::string getName() const override { return "Memory"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
size_t getSize() const { return data.size(); }
BlockInputStreams read(
@ -45,7 +43,6 @@ public:
private:
String name;
NamesAndTypesList columns;
/// The data itself. `list` - so that when inserted to the end, the existing iterators are not invalidated.
BlocksList data;

View File

@ -42,8 +42,8 @@ StorageMerge::StorageMerge(
const String & source_database_,
const String & table_name_regexp_,
const Context & context_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
name(name_), columns(columns_), source_database(source_database_),
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
name(name_), source_database(source_database_),
table_name_regexp(table_name_regexp_), context(context_)
{
}

View File

@ -47,7 +47,6 @@ public:
private:
String name;
NamesAndTypesList columns;
String source_database;
OptimizedRegularExpression table_name_regexp;
const Context & context;

View File

@ -27,6 +27,7 @@ namespace ErrorCodes
extern const int ABORTED;
extern const int BAD_ARGUMENTS;
extern const int INCORRECT_DATA;
extern const int INCORRECT_FILE_NAME;
extern const int CANNOT_ASSIGN_OPTIMIZE;
}
@ -48,7 +49,7 @@ StorageMergeTree::StorageMergeTree(
const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'),
context(context_), background_pool(context_.getBackgroundPool()),
data(database_name, table_name,
@ -60,6 +61,9 @@ StorageMergeTree::StorageMergeTree(
reader(data), writer(data), merger(data, context.getBackgroundPool()),
log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)"))
{
if (path_.empty())
throw Exception("MergeTree storages require data path", ErrorCodes::INCORRECT_FILE_NAME);
data.loadDataParts(has_force_restore_data_flag);
if (!attach)

View File

@ -23,11 +23,11 @@ StorageMySQL::StorageMySQL(
mysqlxx::Pool && pool,
const std::string & remote_database_name,
const std::string & remote_table_name,
const NamesAndTypesList & columns)
: name(name)
const NamesAndTypesList & columns_)
: IStorage{columns_, {}, {}, {}}
, name(name)
, remote_database_name(remote_database_name)
, remote_table_name(remote_table_name)
, columns(columns)
, pool(std::move(pool))
{
}

View File

@ -24,11 +24,10 @@ public:
mysqlxx::Pool && pool,
const std::string & remote_database_name,
const std::string & remote_table_name,
const NamesAndTypesList & columns);
const NamesAndTypesList & columns_);
std::string getName() const override { return "MySQL"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
@ -44,7 +43,7 @@ private:
std::string remote_database_name;
std::string remote_table_name;
NamesAndTypesList columns;
mysqlxx::Pool pool;
};

View File

@ -20,8 +20,6 @@ public:
std::string getName() const override { return "Null"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names &,
const SelectQueryInfo &,
@ -47,7 +45,6 @@ public:
private:
String name;
NamesAndTypesList columns;
protected:
StorageNull(
@ -56,7 +53,7 @@ protected:
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, name(name_), columns(columns_) {}
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_}, name(name_) {}
};
}

View File

@ -21,11 +21,11 @@ StorageODBC::StorageODBC(
const std::string & connection_string,
const std::string & remote_database_name,
const std::string & remote_table_name,
const NamesAndTypesList & columns)
: name(name)
const NamesAndTypesList & columns_)
: IStorage{columns_, {}, {}, {}}
, name(name)
, remote_database_name(remote_database_name)
, remote_table_name(remote_table_name)
, columns(columns)
{
pool = createAndCheckResizePocoSessionPool([&]
{

View File

@ -30,11 +30,10 @@ public:
const std::string & connection_string,
const std::string & remote_database_name,
const std::string & remote_table_name,
const NamesAndTypesList & columns);
const NamesAndTypesList & columns_);
std::string getName() const override { return "ODBC"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
@ -46,12 +45,9 @@ public:
private:
std::string name;
std::string remote_database_name;
std::string remote_table_name;
NamesAndTypesList columns;
std::shared_ptr<Poco::Data::SessionPool> pool;
};
}

View File

@ -95,6 +95,7 @@ namespace ErrorCodes
extern const int TOO_MUCH_FETCHES;
extern const int BAD_DATA_PART_NAME;
extern const int PART_IS_TEMPORARILY_LOCKED;
extern const int INCORRECT_FILE_NAME;
extern const int CANNOT_ASSIGN_OPTIMIZE;
}
@ -178,7 +179,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_}, context(context_),
current_zookeeper(context.getZooKeeper()), database_name(database_name_),
table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'),
zookeeper_path(context.getMacros().expand(zookeeper_path_)),
@ -195,6 +196,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
shutdown_event(false), part_check_thread(*this),
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
{
if (path_.empty())
throw Exception("ReplicatedMergeTree storages require data path", ErrorCodes::INCORRECT_FILE_NAME);
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
zookeeper_path.resize(zookeeper_path.size() - 1);
/// If zookeeper chroot prefix is used, path should starts with '/', because chroot concatenates without it.

View File

@ -21,6 +21,12 @@ namespace ErrorCodes
}
namespace ErrorCodes
{
extern const int INCORRECT_FILE_NAME;
}
class SetOrJoinBlockOutputStream : public IBlockOutputStream
{
public:
@ -86,9 +92,13 @@ StorageSetOrJoinBase::StorageSetOrJoinBase(
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
path(path_ + escapeForFileName(name_) + '/'), name(name_), columns(columns_)
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
name(name_)
{
if (path_.empty())
throw Exception("Join and Set storages require data path", ErrorCodes::INCORRECT_FILE_NAME);
path = path_ + escapeForFileName(name_) + '/';
}

View File

@ -20,7 +20,6 @@ class StorageSetOrJoinBase : public IStorage
public:
String getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
@ -37,7 +36,6 @@ protected:
String path;
String name;
NamesAndTypesList columns;
UInt64 increment = 0; /// For the backup file names.

View File

@ -39,6 +39,7 @@ namespace ErrorCodes
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
extern const int CANNOT_CREATE_DIRECTORY;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INCORRECT_FILE_NAME;
}
@ -183,14 +184,14 @@ StorageStripeLog::StorageStripeLog(
const ColumnDefaults & column_defaults_,
bool attach,
size_t max_compress_block_size_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
path(path_), name(name_), columns(columns_),
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
path(path_), name(name_),
max_compress_block_size(max_compress_block_size_),
file_checker(path + escapeForFileName(name) + '/' + "sizes.json"),
log(&Logger::get("StorageStripeLog"))
{
if (columns.empty())
throw Exception("Empty list of columns passed to StorageStripeLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
if (path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
String full_path = path + escapeForFileName(name) + '/';
if (!attach)

View File

@ -28,8 +28,6 @@ public:
std::string getName() const override { return "StripeLog"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
@ -56,7 +54,6 @@ public:
private:
String path;
String name;
NamesAndTypesList columns;
size_t max_compress_block_size;

View File

@ -46,6 +46,7 @@ namespace ErrorCodes
extern const int CANNOT_READ_ALL_DATA;
extern const int DUPLICATE_COLUMN;
extern const int LOGICAL_ERROR;
extern const int INCORRECT_FILE_NAME;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
@ -284,14 +285,14 @@ StorageTinyLog::StorageTinyLog(
const ColumnDefaults & column_defaults_,
bool attach,
size_t max_compress_block_size_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
path(path_), name(name_), columns(columns_),
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
path(path_), name(name_),
max_compress_block_size(max_compress_block_size_),
file_checker(path + escapeForFileName(name) + '/' + "sizes.json"),
log(&Logger::get("StorageTinyLog"))
{
if (columns.empty())
throw Exception("Empty list of columns passed to StorageTinyLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
if (path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
String full_path = path + escapeForFileName(name) + '/';
if (!attach)

View File

@ -27,8 +27,6 @@ public:
std::string getName() const override { return "TinyLog"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
@ -55,7 +53,6 @@ public:
private:
String path;
String name;
NamesAndTypesList columns;
size_t max_compress_block_size;

View File

@ -25,8 +25,8 @@ StorageView::StorageView(
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
database_name(database_name_), columns(columns_)
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
database_name(database_name_)
{
if (!query.select)
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);

View File

@ -16,7 +16,6 @@ class StorageView : public ext::shared_ptr_helper<StorageView>, public IStorage
public:
std::string getName() const override { return "View"; }
std::string getTableName() const override { return table_name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
/// It is passed inside the query and solved at its level.
bool supportsSampling() const override { return true; }
@ -36,7 +35,6 @@ private:
String table_name;
String database_name;
ASTPtr inner_query;
NamesAndTypesList columns;
protected:
StorageView(

View File

@ -14,13 +14,12 @@ namespace DB
StorageSystemAsynchronousMetrics::StorageSystemAsynchronousMetrics(const std::string & name_, const AsynchronousMetrics & async_metrics_)
: name(name_),
columns
{
{"metric", std::make_shared<DataTypeString>()},
{"value", std::make_shared<DataTypeFloat64>()},
},
async_metrics(async_metrics_)
{
columns = NamesAndTypesList{
{"metric", std::make_shared<DataTypeString>()},
{"value", std::make_shared<DataTypeFloat64>()},
};
}

View File

@ -19,8 +19,6 @@ public:
std::string getName() const override { return "SystemAsynchronousMetrics"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
@ -31,7 +29,6 @@ public:
private:
const std::string name;
NamesAndTypesList columns;
const AsynchronousMetrics & async_metrics;
protected:

View File

@ -12,12 +12,11 @@ namespace DB
StorageSystemBuildOptions::StorageSystemBuildOptions(const std::string & name_)
: name(name_)
, columns
{
{
columns = NamesAndTypesList{
{ "name", std::make_shared<DataTypeString>() },
{ "value", std::make_shared<DataTypeString>() },
}
{
};
}

View File

@ -18,8 +18,6 @@ public:
std::string getName() const override { return "SystemBuildOptions"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
@ -30,7 +28,6 @@ public:
private:
const std::string name;
NamesAndTypesList columns;
protected:
StorageSystemBuildOptions(const std::string & name_);

View File

@ -13,7 +13,8 @@ namespace DB
StorageSystemClusters::StorageSystemClusters(const std::string & name_)
: name(name_)
, columns{
{
columns = NamesAndTypesList{
{ "cluster", std::make_shared<DataTypeString>() },
{ "shard_num", std::make_shared<DataTypeUInt32>() },
{ "shard_weight", std::make_shared<DataTypeUInt32>() },
@ -24,8 +25,7 @@ StorageSystemClusters::StorageSystemClusters(const std::string & name_)
{ "is_local", std::make_shared<DataTypeUInt8>() },
{ "user", std::make_shared<DataTypeString>() },
{ "default_database", std::make_shared<DataTypeString>() }
}
{
};
}

View File

@ -18,7 +18,6 @@ class StorageSystemClusters : public ext::shared_ptr_helper<StorageSystemCluster
public:
std::string getName() const override { return "SystemClusters"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
@ -30,7 +29,6 @@ public:
private:
const std::string name;
NamesAndTypesList columns;
protected:
StorageSystemClusters(const std::string & name_);

View File

@ -17,7 +17,8 @@ namespace DB
StorageSystemColumns::StorageSystemColumns(const std::string & name_)
: name(name_)
, columns{
{
columns = NamesAndTypesList{
{ "database", std::make_shared<DataTypeString>() },
{ "table", std::make_shared<DataTypeString>() },
{ "name", std::make_shared<DataTypeString>() },
@ -27,8 +28,7 @@ StorageSystemColumns::StorageSystemColumns(const std::string & name_)
{ "data_compressed_bytes", std::make_shared<DataTypeUInt64>() },
{ "data_uncompressed_bytes", std::make_shared<DataTypeUInt64>() },
{ "marks_bytes", std::make_shared<DataTypeUInt64>() },
}
{
};
}

View File

@ -16,7 +16,6 @@ class StorageSystemColumns : public ext::shared_ptr_helper<StorageSystemColumns>
public:
std::string getName() const override { return "SystemColumns"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
@ -31,7 +30,6 @@ protected:
private:
const std::string name;
NamesAndTypesList columns;
};
}

View File

@ -11,13 +11,12 @@ namespace DB
StorageSystemDatabases::StorageSystemDatabases(const std::string & name_)
: name(name_),
columns
{
{"name", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()},
}
: name(name_)
{
columns = NamesAndTypesList{
{"name", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()},
};
}

View File

@ -18,8 +18,6 @@ public:
std::string getName() const override { return "SystemDatabases"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
@ -30,7 +28,6 @@ public:
private:
const std::string name;
NamesAndTypesList columns;
protected:
StorageSystemDatabases(const std::string & name_);

View File

@ -19,24 +19,24 @@ namespace DB
{
StorageSystemDictionaries::StorageSystemDictionaries(const std::string & name)
: name{name},
columns{
{ "name", std::make_shared<DataTypeString>() },
{ "origin", std::make_shared<DataTypeString>() },
{ "type", std::make_shared<DataTypeString>() },
{ "key", std::make_shared<DataTypeString>() },
{ "attribute.names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()) },
{ "attribute.types", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()) },
{ "bytes_allocated", std::make_shared<DataTypeUInt64>() },
{ "query_count", std::make_shared<DataTypeUInt64>() },
{ "hit_rate", std::make_shared<DataTypeFloat64>() },
{ "element_count", std::make_shared<DataTypeUInt64>() },
{ "load_factor", std::make_shared<DataTypeFloat64>() },
{ "creation_time", std::make_shared<DataTypeDateTime>() },
{ "source", std::make_shared<DataTypeString>() },
{ "last_exception", std::make_shared<DataTypeString>() }
}
: name{name}
{
columns = NamesAndTypesList{
{ "name", std::make_shared<DataTypeString>() },
{ "origin", std::make_shared<DataTypeString>() },
{ "type", std::make_shared<DataTypeString>() },
{ "key", std::make_shared<DataTypeString>() },
{ "attribute.names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()) },
{ "attribute.types", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()) },
{ "bytes_allocated", std::make_shared<DataTypeUInt64>() },
{ "query_count", std::make_shared<DataTypeUInt64>() },
{ "hit_rate", std::make_shared<DataTypeFloat64>() },
{ "element_count", std::make_shared<DataTypeUInt64>() },
{ "load_factor", std::make_shared<DataTypeFloat64>() },
{ "creation_time", std::make_shared<DataTypeDateTime>() },
{ "source", std::make_shared<DataTypeString>() },
{ "last_exception", std::make_shared<DataTypeString>() }
};
}

View File

@ -26,9 +26,6 @@ public:
private:
const std::string name;
const NamesAndTypesList columns;
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
protected:
StorageSystemDictionaries(const std::string & name);

View File

@ -11,13 +11,13 @@ namespace DB
StorageSystemEvents::StorageSystemEvents(const std::string & name_)
: name(name_),
columns
: name(name_)
{
columns = NamesAndTypesList
{
{"event", std::make_shared<DataTypeString>()},
{"value", std::make_shared<DataTypeUInt64>()}
}
{
};
}

View File

@ -18,8 +18,6 @@ public:
std::string getName() const override { return "SystemEvents"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
@ -30,7 +28,6 @@ public:
private:
const std::string name;
NamesAndTypesList columns;
protected:
StorageSystemEvents(const std::string & name_);

View File

@ -15,11 +15,11 @@ namespace DB
StorageSystemFunctions::StorageSystemFunctions(const std::string & name_)
: name(name_)
, columns{
{
columns = NamesAndTypesList{
{ "name", std::make_shared<DataTypeString>() },
{ "is_aggregate", std::make_shared<DataTypeUInt8>() }
}
{
};
}

View File

@ -18,7 +18,6 @@ class StorageSystemFunctions : public ext::shared_ptr_helper<StorageSystemFuncti
public:
std::string getName() const override { return "SystemFunctions"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
@ -33,7 +32,6 @@ protected:
private:
const std::string name;
NamesAndTypesList columns;
};
}

View File

@ -124,16 +124,16 @@ static Strings getAllGraphiteSections(const AbstractConfiguration & config)
StorageSystemGraphite::StorageSystemGraphite(const std::string & name_)
: name(name_)
, columns
{
{
columns = NamesAndTypesList{
{"config_name", std::make_shared<DataTypeString>()},
{"regexp", std::make_shared<DataTypeString>()},
{"function", std::make_shared<DataTypeString>()},
{"age", std::make_shared<DataTypeUInt64>()},
{"precision", std::make_shared<DataTypeUInt64>()},
{"priority", std::make_shared<DataTypeUInt16>()},
{"is_default", std::make_shared<DataTypeUInt8>()}}
{
{"is_default", std::make_shared<DataTypeUInt8>()}
};
}

View File

@ -12,7 +12,6 @@ class StorageSystemGraphite : public ext::shared_ptr_helper<StorageSystemGraphit
public:
std::string getName() const override { return "SystemGraphite"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
@ -24,7 +23,6 @@ public:
private:
const std::string name;
NamesAndTypesList columns;
protected:
StorageSystemGraphite(const std::string & name_);

View File

@ -13,7 +13,8 @@ namespace DB
StorageSystemMerges::StorageSystemMerges(const std::string & name)
: name{name}
, columns{
{
columns = NamesAndTypesList{
{ "database", std::make_shared<DataTypeString>() },
{ "table", std::make_shared<DataTypeString>() },
{ "elapsed", std::make_shared<DataTypeFloat64>() },
@ -30,8 +31,7 @@ StorageSystemMerges::StorageSystemMerges(const std::string & name)
{ "columns_written", std::make_shared<DataTypeUInt64>() },
{ "memory_usage", std::make_shared<DataTypeUInt64>() },
{ "thread_number", std::make_shared<DataTypeUInt64>() },
}
{
};
}

View File

@ -16,7 +16,6 @@ public:
std::string getName() const override { return "SystemMerges"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
@ -27,7 +26,6 @@ public:
private:
const std::string name;
NamesAndTypesList columns;
protected:
StorageSystemMerges(const std::string & name);

View File

@ -12,13 +12,12 @@ namespace DB
StorageSystemMetrics::StorageSystemMetrics(const std::string & name_)
: name(name_),
columns
{
: name(name_)
{
columns = NamesAndTypesList{
{"metric", std::make_shared<DataTypeString>()},
{"value", std::make_shared<DataTypeInt64>()},
}
{
};
}

View File

@ -18,8 +18,6 @@ public:
std::string getName() const override { return "SystemMetrics"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
@ -30,7 +28,6 @@ public:
private:
const std::string name;
NamesAndTypesList columns;
protected:
StorageSystemMetrics(const std::string & name_);

View File

@ -12,15 +12,15 @@ namespace DB
{
StorageSystemModels::StorageSystemModels(const std::string & name)
: name{name},
columns{
: name{name}
{
columns = NamesAndTypesList{
{ "name", std::make_shared<DataTypeString>() },
{ "origin", std::make_shared<DataTypeString>() },
{ "type", std::make_shared<DataTypeString>() },
{ "creation_time", std::make_shared<DataTypeDateTime>() },
{ "last_exception", std::make_shared<DataTypeString>() }
}
{
};
}

View File

@ -26,9 +26,6 @@ public:
private:
const std::string name;
const NamesAndTypesList columns;
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
protected:
StorageSystemModels(const std::string & name);

View File

@ -41,8 +41,9 @@ private:
StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multithreaded_, size_t limit_)
: name(name_), columns{{"number", std::make_shared<DataTypeUInt64>()}}, multithreaded(multithreaded_), limit(limit_)
: name(name_), multithreaded(multithreaded_), limit(limit_)
{
columns = NamesAndTypesList{{"number", std::make_shared<DataTypeUInt64>()}};
}

Some files were not shown because too many files have changed in this diff Show More