mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 11:02:08 +00:00
Merge branch 'master' into fix/ISSUES-117
This commit is contained in:
commit
4875a80825
@ -4,6 +4,7 @@
|
|||||||
#include <Poco/Net/DNS.h>
|
#include <Poco/Net/DNS.h>
|
||||||
|
|
||||||
#include <Common/getFQDNOrHostName.h>
|
#include <Common/getFQDNOrHostName.h>
|
||||||
|
#include <Common/isLocalAddress.h>
|
||||||
#include <Common/ProfileEvents.h>
|
#include <Common/ProfileEvents.h>
|
||||||
#include <Interpreters/Settings.h>
|
#include <Interpreters/Settings.h>
|
||||||
|
|
||||||
@ -39,14 +40,7 @@ ConnectionPoolWithFailover::ConnectionPoolWithFailover(
|
|||||||
for (size_t i = 0; i < nested_pools.size(); ++i)
|
for (size_t i = 0; i < nested_pools.size(); ++i)
|
||||||
{
|
{
|
||||||
ConnectionPool & connection_pool = dynamic_cast<ConnectionPool &>(*nested_pools[i]);
|
ConnectionPool & connection_pool = dynamic_cast<ConnectionPool &>(*nested_pools[i]);
|
||||||
const std::string & host = connection_pool.getHost();
|
hostname_differences[i] = getHostNameDifference(local_hostname, connection_pool.getHost());
|
||||||
|
|
||||||
size_t hostname_difference = 0;
|
|
||||||
for (size_t i = 0; i < std::min(local_hostname.length(), host.length()); ++i)
|
|
||||||
if (local_hostname[i] != host[i])
|
|
||||||
++hostname_difference;
|
|
||||||
|
|
||||||
hostname_differences[i] = hostname_difference;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -138,11 +138,12 @@ private:
|
|||||||
|
|
||||||
struct OpResult : public zoo_op_result_t
|
struct OpResult : public zoo_op_result_t
|
||||||
{
|
{
|
||||||
/// Указатели в этой структуре указывают на поля в классе Op.
|
/// Pointers in this class point to fields of class Op.
|
||||||
/// Поэтому деструктор не нужен
|
/// Op instances have the same (or longer lifetime), therefore destructor is not required.
|
||||||
};
|
};
|
||||||
|
|
||||||
using Ops = std::vector<std::unique_ptr<Op>>;
|
using OpPtr = std::unique_ptr<Op>;
|
||||||
|
using Ops = std::vector<OpPtr>;
|
||||||
using OpResults = std::vector<OpResult>;
|
using OpResults = std::vector<OpResult>;
|
||||||
using OpResultsPtr = std::shared_ptr<OpResults>;
|
using OpResultsPtr = std::shared_ptr<OpResults>;
|
||||||
using Strings = std::vector<std::string>;
|
using Strings = std::vector<std::string>;
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
|
|
||||||
/** Get the FQDN for the local server by resolving DNS hostname - similar to calling the 'hostname' tool with the -f flag.
|
/** Get the FQDN for the local server by resolving DNS hostname - similar to calling the 'hostname' tool with the -f flag.
|
||||||
* If it does not work, return hostname - similar to calling 'hostname' without flags or 'uname -n'.
|
* If it does not work, return hostname - similar to calling 'hostname' without flags or 'uname -n'.
|
||||||
*/
|
*/
|
||||||
|
@ -10,25 +10,35 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
bool isLocalAddress(const Poco::Net::SocketAddress & address, UInt16 clickhouse_port)
|
bool isLocalAddress(const Poco::Net::SocketAddress & address)
|
||||||
{
|
{
|
||||||
static auto interfaces = Poco::Net::NetworkInterface::list();
|
static auto interfaces = Poco::Net::NetworkInterface::list();
|
||||||
|
|
||||||
if (clickhouse_port == address.port())
|
return interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
|
||||||
{
|
[&] (const Poco::Net::NetworkInterface & interface)
|
||||||
return interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
|
{
|
||||||
[&] (const Poco::Net::NetworkInterface & interface)
|
/** Compare the addresses without taking into account `scope`.
|
||||||
{
|
* Theoretically, this may not be correct - depends on `route` setting
|
||||||
/** Compare the addresses without taking into account `scope`.
|
* - through which interface we will actually access the specified address.
|
||||||
* Theoretically, this may not be correct - depends on `route` setting
|
*/
|
||||||
* - through which interface we will actually access the specified address.
|
return interface.address().length() == address.host().length()
|
||||||
*/
|
&& 0 == memcmp(interface.address().addr(), address.host().addr(), address.host().length());
|
||||||
return interface.address().length() == address.host().length()
|
});
|
||||||
&& 0 == memcmp(interface.address().addr(), address.host().addr(), address.host().length());
|
}
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
bool isLocalAddress(const Poco::Net::SocketAddress & address, UInt16 clickhouse_port)
|
||||||
|
{
|
||||||
|
return clickhouse_port == address.port() && isLocalAddress(address);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
size_t getHostNameDifference(const std::string & local_hostname, const std::string & host)
|
||||||
|
{
|
||||||
|
size_t hostname_difference = 0;
|
||||||
|
for (size_t i = 0; i < std::min(local_hostname.length(), host.length()); ++i)
|
||||||
|
if (local_hostname[i] != host[i])
|
||||||
|
++hostname_difference;
|
||||||
|
return hostname_difference;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -23,4 +23,8 @@ namespace DB
|
|||||||
*/
|
*/
|
||||||
bool isLocalAddress(const Poco::Net::SocketAddress & address, UInt16 clickhouse_port);
|
bool isLocalAddress(const Poco::Net::SocketAddress & address, UInt16 clickhouse_port);
|
||||||
|
|
||||||
|
bool isLocalAddress(const Poco::Net::SocketAddress & address);
|
||||||
|
|
||||||
|
/// Returns number of different bytes in hostnames, used for load balancing
|
||||||
|
size_t getHostNameDifference(const std::string & local_hostname, const std::string & host);
|
||||||
}
|
}
|
||||||
|
@ -4,7 +4,8 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
SquashingBlockInputStream::SquashingBlockInputStream(BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes)
|
SquashingBlockInputStream::SquashingBlockInputStream(const BlockInputStreamPtr & src,
|
||||||
|
size_t min_block_size_rows, size_t min_block_size_bytes)
|
||||||
: transform(min_block_size_rows, min_block_size_bytes)
|
: transform(min_block_size_rows, min_block_size_bytes)
|
||||||
{
|
{
|
||||||
children.emplace_back(src);
|
children.emplace_back(src);
|
||||||
|
@ -12,7 +12,7 @@ namespace DB
|
|||||||
class SquashingBlockInputStream : public IProfilingBlockInputStream
|
class SquashingBlockInputStream : public IProfilingBlockInputStream
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
SquashingBlockInputStream(BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes);
|
SquashingBlockInputStream(const BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes);
|
||||||
|
|
||||||
String getName() const override { return "Squashing"; }
|
String getName() const override { return "Squashing"; }
|
||||||
|
|
||||||
|
@ -16,20 +16,21 @@ bool isAtomicSet(std::atomic<bool> * val)
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
|
template <typename Pred>
|
||||||
|
void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, Pred && is_cancelled)
|
||||||
{
|
{
|
||||||
from.readPrefix();
|
from.readPrefix();
|
||||||
to.writePrefix();
|
to.writePrefix();
|
||||||
|
|
||||||
while (Block block = from.read())
|
while (Block block = from.read())
|
||||||
{
|
{
|
||||||
if (isAtomicSet(is_cancelled))
|
if (is_cancelled())
|
||||||
break;
|
break;
|
||||||
|
|
||||||
to.write(block);
|
to.write(block);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isAtomicSet(is_cancelled))
|
if (is_cancelled())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
/// For outputting additional information in some formats.
|
/// For outputting additional information in some formats.
|
||||||
@ -42,11 +43,28 @@ void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<boo
|
|||||||
to.setExtremes(input->getExtremes());
|
to.setExtremes(input->getExtremes());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isAtomicSet(is_cancelled))
|
if (is_cancelled())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
from.readSuffix();
|
from.readSuffix();
|
||||||
to.writeSuffix();
|
to.writeSuffix();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
|
||||||
|
{
|
||||||
|
auto is_cancelled_pred = [is_cancelled] ()
|
||||||
|
{
|
||||||
|
return isAtomicSet(is_cancelled);
|
||||||
|
};
|
||||||
|
|
||||||
|
copyDataImpl(from, to, is_cancelled_pred);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled)
|
||||||
|
{
|
||||||
|
copyDataImpl(from, to, is_cancelled);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
|
#include <functional>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -14,4 +15,6 @@ class IBlockOutputStream;
|
|||||||
*/
|
*/
|
||||||
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr);
|
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr);
|
||||||
|
|
||||||
|
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -173,4 +173,9 @@ void DatabaseDictionary::drop()
|
|||||||
/// Additional actions to delete database are not required.
|
/// Additional actions to delete database are not required.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String DatabaseDictionary::getDataPath(const Context &) const
|
||||||
|
{
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -93,6 +93,8 @@ public:
|
|||||||
const Context & context,
|
const Context & context,
|
||||||
const String & table_name) const override;
|
const String & table_name) const override;
|
||||||
|
|
||||||
|
String getDataPath(const Context & context) const override;
|
||||||
|
|
||||||
void shutdown() override;
|
void shutdown() override;
|
||||||
void drop() override;
|
void drop() override;
|
||||||
};
|
};
|
||||||
|
@ -15,11 +15,11 @@ namespace ErrorCodes
|
|||||||
DatabasePtr DatabaseFactory::get(
|
DatabasePtr DatabaseFactory::get(
|
||||||
const String & engine_name,
|
const String & engine_name,
|
||||||
const String & database_name,
|
const String & database_name,
|
||||||
const String & path,
|
const String & metadata_path,
|
||||||
Context & context)
|
Context & context)
|
||||||
{
|
{
|
||||||
if (engine_name == "Ordinary")
|
if (engine_name == "Ordinary")
|
||||||
return std::make_shared<DatabaseOrdinary>(database_name, path);
|
return std::make_shared<DatabaseOrdinary>(database_name, metadata_path, context);
|
||||||
else if (engine_name == "Memory")
|
else if (engine_name == "Memory")
|
||||||
return std::make_shared<DatabaseMemory>(database_name);
|
return std::make_shared<DatabaseMemory>(database_name);
|
||||||
else if (engine_name == "Dictionary")
|
else if (engine_name == "Dictionary")
|
||||||
|
@ -13,7 +13,7 @@ public:
|
|||||||
static DatabasePtr get(
|
static DatabasePtr get(
|
||||||
const String & engine_name,
|
const String & engine_name,
|
||||||
const String & database_name,
|
const String & database_name,
|
||||||
const String & path,
|
const String & metadata_path,
|
||||||
Context & context);
|
Context & context);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -152,4 +152,9 @@ void DatabaseMemory::drop()
|
|||||||
/// Additional actions to delete database are not required.
|
/// Additional actions to delete database are not required.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String DatabaseMemory::getDataPath(const Context &) const
|
||||||
|
{
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -84,6 +84,8 @@ public:
|
|||||||
const Context & context,
|
const Context & context,
|
||||||
const String & table_name) const override;
|
const String & table_name) const override;
|
||||||
|
|
||||||
|
String getDataPath(const Context & context) const override;
|
||||||
|
|
||||||
void shutdown() override;
|
void shutdown() override;
|
||||||
void drop() override;
|
void drop() override;
|
||||||
};
|
};
|
||||||
|
@ -90,10 +90,10 @@ static void loadTable(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
DatabaseOrdinary::DatabaseOrdinary(
|
DatabaseOrdinary::DatabaseOrdinary(const String & name_, const String & metadata_path_, const Context & context)
|
||||||
const String & name_, const String & path_)
|
: DatabaseMemory(name_), metadata_path(metadata_path_), data_path(context.getPath() + "data/" + escapeForFileName(name_) + "/")
|
||||||
: DatabaseMemory(name_), path(path_)
|
|
||||||
{
|
{
|
||||||
|
Poco::File(data_path).createDirectory();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -108,7 +108,7 @@ void DatabaseOrdinary::loadTables(
|
|||||||
FileNames file_names;
|
FileNames file_names;
|
||||||
|
|
||||||
Poco::DirectoryIterator dir_end;
|
Poco::DirectoryIterator dir_end;
|
||||||
for (Poco::DirectoryIterator dir_it(path); dir_it != dir_end; ++dir_it)
|
for (Poco::DirectoryIterator dir_it(metadata_path); dir_it != dir_end; ++dir_it)
|
||||||
{
|
{
|
||||||
/// For '.svn', '.gitignore' directory and similar.
|
/// For '.svn', '.gitignore' directory and similar.
|
||||||
if (dir_it.name().at(0) == '.')
|
if (dir_it.name().at(0) == '.')
|
||||||
@ -130,7 +130,7 @@ void DatabaseOrdinary::loadTables(
|
|||||||
if (endsWith(dir_it.name(), ".sql"))
|
if (endsWith(dir_it.name(), ".sql"))
|
||||||
file_names.push_back(dir_it.name());
|
file_names.push_back(dir_it.name());
|
||||||
else
|
else
|
||||||
throw Exception("Incorrect file extension: " + dir_it.name() + " in metadata directory " + path,
|
throw Exception("Incorrect file extension: " + dir_it.name() + " in metadata directory " + metadata_path,
|
||||||
ErrorCodes::INCORRECT_FILE_NAME);
|
ErrorCodes::INCORRECT_FILE_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -162,7 +162,7 @@ void DatabaseOrdinary::loadTables(
|
|||||||
watch.restart();
|
watch.restart();
|
||||||
}
|
}
|
||||||
|
|
||||||
loadTable(context, path, *this, name, data_path, table, has_force_restore_data_flag);
|
loadTable(context, metadata_path, *this, name, data_path, table, has_force_restore_data_flag);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -269,7 +269,7 @@ void DatabaseOrdinary::createTable(
|
|||||||
throw Exception("Table " + name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
|
throw Exception("Table " + name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
|
||||||
}
|
}
|
||||||
|
|
||||||
String table_metadata_path = getTableMetadataPath(path, table_name);
|
String table_metadata_path = getTableMetadataPath(metadata_path, table_name);
|
||||||
String table_metadata_tmp_path = table_metadata_path + ".tmp";
|
String table_metadata_tmp_path = table_metadata_path + ".tmp";
|
||||||
String statement;
|
String statement;
|
||||||
|
|
||||||
@ -312,7 +312,7 @@ void DatabaseOrdinary::removeTable(
|
|||||||
{
|
{
|
||||||
StoragePtr res = detachTable(table_name);
|
StoragePtr res = detachTable(table_name);
|
||||||
|
|
||||||
String table_metadata_path = getTableMetadataPath(path, table_name);
|
String table_metadata_path = getTableMetadataPath(metadata_path, table_name);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -374,7 +374,7 @@ void DatabaseOrdinary::renameTable(
|
|||||||
throw Exception{e};
|
throw Exception{e};
|
||||||
}
|
}
|
||||||
|
|
||||||
ASTPtr ast = getCreateQueryImpl(path, table_name);
|
ASTPtr ast = getCreateQueryImpl(metadata_path, table_name);
|
||||||
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
|
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
|
||||||
ast_create_query.table = to_table_name;
|
ast_create_query.table = to_table_name;
|
||||||
|
|
||||||
@ -388,7 +388,7 @@ time_t DatabaseOrdinary::getTableMetadataModificationTime(
|
|||||||
const Context & /*context*/,
|
const Context & /*context*/,
|
||||||
const String & table_name)
|
const String & table_name)
|
||||||
{
|
{
|
||||||
String table_metadata_path = getTableMetadataPath(path, table_name);
|
String table_metadata_path = getTableMetadataPath(metadata_path, table_name);
|
||||||
Poco::File meta_file(table_metadata_path);
|
Poco::File meta_file(table_metadata_path);
|
||||||
|
|
||||||
if (meta_file.exists())
|
if (meta_file.exists())
|
||||||
@ -406,7 +406,7 @@ ASTPtr DatabaseOrdinary::getCreateQuery(
|
|||||||
const Context & /*context*/,
|
const Context & /*context*/,
|
||||||
const String & table_name) const
|
const String & table_name) const
|
||||||
{
|
{
|
||||||
ASTPtr ast = getCreateQueryImpl(path, table_name);
|
ASTPtr ast = getCreateQueryImpl(metadata_path, table_name);
|
||||||
|
|
||||||
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
|
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
|
||||||
ast_create_query.attach = false;
|
ast_create_query.attach = false;
|
||||||
@ -455,8 +455,8 @@ void DatabaseOrdinary::alterTable(
|
|||||||
/// Read the definition of the table and replace the necessary parts with new ones.
|
/// Read the definition of the table and replace the necessary parts with new ones.
|
||||||
|
|
||||||
String table_name_escaped = escapeForFileName(name);
|
String table_name_escaped = escapeForFileName(name);
|
||||||
String table_metadata_tmp_path = path + "/" + table_name_escaped + ".sql.tmp";
|
String table_metadata_tmp_path = metadata_path + "/" + table_name_escaped + ".sql.tmp";
|
||||||
String table_metadata_path = path + "/" + table_name_escaped + ".sql";
|
String table_metadata_path = metadata_path + "/" + table_name_escaped + ".sql";
|
||||||
String statement;
|
String statement;
|
||||||
|
|
||||||
{
|
{
|
||||||
@ -499,4 +499,9 @@ void DatabaseOrdinary::alterTable(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String DatabaseOrdinary::getDataPath(const Context &) const
|
||||||
|
{
|
||||||
|
return data_path;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -13,10 +13,11 @@ namespace DB
|
|||||||
class DatabaseOrdinary : public DatabaseMemory
|
class DatabaseOrdinary : public DatabaseMemory
|
||||||
{
|
{
|
||||||
protected:
|
protected:
|
||||||
const String path;
|
const String metadata_path;
|
||||||
|
const String data_path;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
DatabaseOrdinary(const String & name_, const String & path_);
|
DatabaseOrdinary(const String & name_, const String & metadata_path_, const Context & context);
|
||||||
|
|
||||||
String getEngineName() const override { return "Ordinary"; }
|
String getEngineName() const override { return "Ordinary"; }
|
||||||
|
|
||||||
@ -58,6 +59,8 @@ public:
|
|||||||
const Context & context,
|
const Context & context,
|
||||||
const String & table_name) const override;
|
const String & table_name) const override;
|
||||||
|
|
||||||
|
String getDataPath(const Context & context) const override;
|
||||||
|
|
||||||
void shutdown() override;
|
void shutdown() override;
|
||||||
void drop() override;
|
void drop() override;
|
||||||
|
|
||||||
|
@ -129,6 +129,9 @@ public:
|
|||||||
const Context & context,
|
const Context & context,
|
||||||
const String & name) const = 0;
|
const String & name) const = 0;
|
||||||
|
|
||||||
|
/// Returns path for persistent data storage if the database supports it, empty string otherwise
|
||||||
|
virtual String getDataPath(const Context & context) const = 0;
|
||||||
|
|
||||||
/// Ask all tables to complete the background threads they are using and delete all table objects.
|
/// Ask all tables to complete the background threads they are using and delete all table objects.
|
||||||
virtual void shutdown() = 0;
|
virtual void shutdown() = 0;
|
||||||
|
|
||||||
|
@ -132,19 +132,26 @@ Clusters::Clusters(Poco::Util::AbstractConfiguration & config, const Settings &
|
|||||||
|
|
||||||
ClusterPtr Clusters::getCluster(const std::string & cluster_name) const
|
ClusterPtr Clusters::getCluster(const std::string & cluster_name) const
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
|
|
||||||
auto it = impl.find(cluster_name);
|
auto it = impl.find(cluster_name);
|
||||||
return (it != impl.end()) ? it->second : nullptr;
|
return (it != impl.end()) ? it->second : nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Clusters::setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster)
|
||||||
|
{
|
||||||
|
std::lock_guard lock(mutex);
|
||||||
|
impl[cluster_name] = cluster;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void Clusters::updateClusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
|
void Clusters::updateClusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
|
||||||
{
|
{
|
||||||
Poco::Util::AbstractConfiguration::Keys config_keys;
|
Poco::Util::AbstractConfiguration::Keys config_keys;
|
||||||
config.keys(config_name, config_keys);
|
config.keys(config_name, config_keys);
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
|
|
||||||
for (const auto & key : config_keys)
|
for (const auto & key : config_keys)
|
||||||
{
|
{
|
||||||
@ -163,11 +170,12 @@ void Clusters::updateClusters(Poco::Util::AbstractConfiguration & config, const
|
|||||||
|
|
||||||
Clusters::Impl Clusters::getContainer() const
|
Clusters::Impl Clusters::getContainer() const
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
/// The following line copies container of shared_ptrs to return value under lock
|
/// The following line copies container of shared_ptrs to return value under lock
|
||||||
return impl;
|
return impl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Implementation of `Cluster` class
|
/// Implementation of `Cluster` class
|
||||||
|
|
||||||
Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name)
|
Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name)
|
||||||
|
@ -97,6 +97,7 @@ public:
|
|||||||
UInt32 shard_num;
|
UInt32 shard_num;
|
||||||
UInt32 weight;
|
UInt32 weight;
|
||||||
Addresses local_addresses;
|
Addresses local_addresses;
|
||||||
|
/// nullptr if there are no remote addresses
|
||||||
ConnectionPoolWithFailoverPtr pool;
|
ConnectionPoolWithFailoverPtr pool;
|
||||||
bool has_internal_replication;
|
bool has_internal_replication;
|
||||||
};
|
};
|
||||||
@ -168,8 +169,9 @@ public:
|
|||||||
Clusters & operator=(const Clusters &) = delete;
|
Clusters & operator=(const Clusters &) = delete;
|
||||||
|
|
||||||
ClusterPtr getCluster(const std::string & cluster_name) const;
|
ClusterPtr getCluster(const std::string & cluster_name) const;
|
||||||
|
void setCluster(const String & cluster_name, const ClusterPtr & cluster);
|
||||||
|
|
||||||
void updateClusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name = "remote_servers");
|
void updateClusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
using Impl = std::map<String, ClusterPtr>;
|
using Impl = std::map<String, ClusterPtr>;
|
||||||
|
@ -1370,13 +1370,27 @@ Clusters & Context::getClusters() const
|
|||||||
|
|
||||||
|
|
||||||
/// On repeating calls updates existing clusters and adds new clusters, doesn't delete old clusters
|
/// On repeating calls updates existing clusters and adds new clusters, doesn't delete old clusters
|
||||||
void Context::setClustersConfig(const ConfigurationPtr & config)
|
void Context::setClustersConfig(const ConfigurationPtr & config, const String & config_name)
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(shared->clusters_mutex);
|
std::lock_guard<std::mutex> lock(shared->clusters_mutex);
|
||||||
|
|
||||||
shared->clusters_config = config;
|
shared->clusters_config = config;
|
||||||
if (shared->clusters)
|
|
||||||
shared->clusters->updateClusters(*shared->clusters_config, settings);
|
if (!shared->clusters)
|
||||||
|
shared->clusters = std::make_unique<Clusters>(*shared->clusters_config, settings, config_name);
|
||||||
|
else
|
||||||
|
shared->clusters->updateClusters(*shared->clusters_config, settings, config_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Context::setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster)
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(shared->clusters_mutex);
|
||||||
|
|
||||||
|
if (!shared->clusters)
|
||||||
|
throw Exception("Clusters are not set", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
|
shared->clusters->setCluster(cluster_name, cluster);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -318,8 +318,10 @@ public:
|
|||||||
Clusters & getClusters() const;
|
Clusters & getClusters() const;
|
||||||
std::shared_ptr<Cluster> getCluster(const std::string & cluster_name) const;
|
std::shared_ptr<Cluster> getCluster(const std::string & cluster_name) const;
|
||||||
std::shared_ptr<Cluster> tryGetCluster(const std::string & cluster_name) const;
|
std::shared_ptr<Cluster> tryGetCluster(const std::string & cluster_name) const;
|
||||||
|
void setClustersConfig(const ConfigurationPtr & config, const String & config_name = "remote_servers");
|
||||||
|
/// Sets custom cluster, but doesn't update configuration
|
||||||
|
void setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster);
|
||||||
void reloadClusterConfig();
|
void reloadClusterConfig();
|
||||||
void setClustersConfig(const ConfigurationPtr & config);
|
|
||||||
|
|
||||||
Compiler & getCompiler();
|
Compiler & getCompiler();
|
||||||
QueryLog & getQueryLog();
|
QueryLog & getQueryLog();
|
||||||
|
@ -1570,7 +1570,9 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node, const Block &
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
makeExplicitSet(func, sample_block, true);
|
ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(columns, settings);
|
||||||
|
getRootActions(func->arguments->children.at(0), true, false, temp_actions);
|
||||||
|
makeExplicitSet(func, temp_actions->getSampleBlock(), true);
|
||||||
}
|
}
|
||||||
catch (const Exception & e)
|
catch (const Exception & e)
|
||||||
{
|
{
|
||||||
|
@ -7,11 +7,13 @@ namespace DB
|
|||||||
{
|
{
|
||||||
|
|
||||||
class Context;
|
class Context;
|
||||||
|
class Cluster;
|
||||||
|
|
||||||
class InterpreterCheckQuery : public IInterpreter
|
class InterpreterCheckQuery : public IInterpreter
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
InterpreterCheckQuery(const ASTPtr & query_ptr_, const Context & context_);
|
InterpreterCheckQuery(const ASTPtr & query_ptr_, const Context & context_);
|
||||||
|
|
||||||
BlockIO execute() override;
|
BlockIO execute() override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -19,6 +21,7 @@ private:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
ASTPtr query_ptr;
|
ASTPtr query_ptr;
|
||||||
|
|
||||||
const Context & context;
|
const Context & context;
|
||||||
Block result;
|
Block result;
|
||||||
};
|
};
|
||||||
|
@ -103,13 +103,10 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
|
|||||||
|
|
||||||
String database_name_escaped = escapeForFileName(database_name);
|
String database_name_escaped = escapeForFileName(database_name);
|
||||||
|
|
||||||
/// Create directories for tables data and metadata.
|
/// Create directories for tables metadata.
|
||||||
String path = context.getPath();
|
String path = context.getPath();
|
||||||
String data_path = path + "data/" + database_name_escaped + "/";
|
|
||||||
String metadata_path = path + "metadata/" + database_name_escaped + "/";
|
String metadata_path = path + "metadata/" + database_name_escaped + "/";
|
||||||
|
|
||||||
Poco::File(metadata_path).createDirectory();
|
Poco::File(metadata_path).createDirectory();
|
||||||
Poco::File(data_path).createDirectory();
|
|
||||||
|
|
||||||
DatabasePtr database = DatabaseFactory::get(database_engine_name, database_name, metadata_path, context);
|
DatabasePtr database = DatabaseFactory::get(database_engine_name, database_name, metadata_path, context);
|
||||||
|
|
||||||
@ -458,13 +455,9 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
|
|||||||
String current_database = context.getCurrentDatabase();
|
String current_database = context.getCurrentDatabase();
|
||||||
|
|
||||||
String database_name = create.database.empty() ? current_database : create.database;
|
String database_name = create.database.empty() ? current_database : create.database;
|
||||||
String database_name_escaped = escapeForFileName(database_name);
|
|
||||||
String table_name = create.table;
|
String table_name = create.table;
|
||||||
String table_name_escaped = escapeForFileName(table_name);
|
String table_name_escaped = escapeForFileName(table_name);
|
||||||
|
|
||||||
String data_path = path + "data/" + database_name_escaped + "/";
|
|
||||||
String metadata_path = path + "metadata/" + database_name_escaped + "/" + table_name_escaped + ".sql";
|
|
||||||
|
|
||||||
// If this is a stub ATTACH query, read the query definition from the database
|
// If this is a stub ATTACH query, read the query definition from the database
|
||||||
if (create.attach && !create.storage && !create.columns)
|
if (create.attach && !create.storage && !create.columns)
|
||||||
{
|
{
|
||||||
@ -511,9 +504,13 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
|
|||||||
{
|
{
|
||||||
std::unique_ptr<DDLGuard> guard;
|
std::unique_ptr<DDLGuard> guard;
|
||||||
|
|
||||||
|
String data_path;
|
||||||
|
DatabasePtr database;
|
||||||
|
|
||||||
if (!create.is_temporary)
|
if (!create.is_temporary)
|
||||||
{
|
{
|
||||||
context.assertDatabaseExists(database_name);
|
database = context.getDatabase(database_name);
|
||||||
|
data_path = database->getDataPath(context);
|
||||||
|
|
||||||
/** If the table already exists, and the request specifies IF NOT EXISTS,
|
/** If the table already exists, and the request specifies IF NOT EXISTS,
|
||||||
* then we allow concurrent CREATE queries (which do nothing).
|
* then we allow concurrent CREATE queries (which do nothing).
|
||||||
@ -548,7 +545,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
|
|||||||
if (create.is_temporary)
|
if (create.is_temporary)
|
||||||
context.getSessionContext().addExternalTable(table_name, res);
|
context.getSessionContext().addExternalTable(table_name, res);
|
||||||
else
|
else
|
||||||
context.getDatabase(database_name)->createTable(context, table_name, res, query_ptr);
|
database->createTable(context, table_name, res, query_ptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
res->startup();
|
res->startup();
|
||||||
|
@ -66,6 +66,7 @@ BlockIO InterpreterDropQuery::execute()
|
|||||||
|
|
||||||
String data_path = path + "data/" + database_name_escaped + "/";
|
String data_path = path + "data/" + database_name_escaped + "/";
|
||||||
String metadata_path = path + "metadata/" + database_name_escaped + "/";
|
String metadata_path = path + "metadata/" + database_name_escaped + "/";
|
||||||
|
String database_metadata_path = path + "metadata/" + database_name_escaped + ".sql";
|
||||||
|
|
||||||
auto database = context.tryGetDatabase(database_name);
|
auto database = context.tryGetDatabase(database_name);
|
||||||
if (!database && !drop.if_exists)
|
if (!database && !drop.if_exists)
|
||||||
@ -163,6 +164,11 @@ BlockIO InterpreterDropQuery::execute()
|
|||||||
|
|
||||||
Poco::File(data_path).remove(false);
|
Poco::File(data_path).remove(false);
|
||||||
Poco::File(metadata_path).remove(false);
|
Poco::File(metadata_path).remove(false);
|
||||||
|
|
||||||
|
/// Old ClickHouse versions did not store database.sql files
|
||||||
|
Poco::File database_metadata_file(database_metadata_path);
|
||||||
|
if (database_metadata_file.exists())
|
||||||
|
database_metadata_file.remove(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
return {};
|
return {};
|
||||||
|
@ -136,7 +136,7 @@ void loadMetadataSystem(Context & context)
|
|||||||
Poco::File(global_path + "data/" SYSTEM_DATABASE).createDirectories();
|
Poco::File(global_path + "data/" SYSTEM_DATABASE).createDirectories();
|
||||||
Poco::File(global_path + "metadata/" SYSTEM_DATABASE).createDirectories();
|
Poco::File(global_path + "metadata/" SYSTEM_DATABASE).createDirectories();
|
||||||
|
|
||||||
auto system_database = std::make_shared<DatabaseOrdinary>(SYSTEM_DATABASE, global_path + "metadata/" SYSTEM_DATABASE);
|
auto system_database = std::make_shared<DatabaseOrdinary>(SYSTEM_DATABASE, global_path + "metadata/" SYSTEM_DATABASE, context);
|
||||||
context.addDatabase(SYSTEM_DATABASE, system_database);
|
context.addDatabase(SYSTEM_DATABASE, system_database);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,7 +82,7 @@ bool ParserList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
|||||||
auto list = std::make_shared<ASTExpressionList>();
|
auto list = std::make_shared<ASTExpressionList>();
|
||||||
node = list;
|
node = list;
|
||||||
|
|
||||||
while (1)
|
while (true)
|
||||||
{
|
{
|
||||||
if (first)
|
if (first)
|
||||||
{
|
{
|
||||||
|
@ -317,6 +317,21 @@ ASTPtr parseQuery(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ASTPtr parseQuery(
|
||||||
|
IParser & parser,
|
||||||
|
const std::string & query,
|
||||||
|
const std::string & query_description)
|
||||||
|
{
|
||||||
|
return parseQuery(parser, query.data(), query.data() + query.size(), query_description);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ASTPtr parseQuery(IParser & parser, const std::string & query)
|
||||||
|
{
|
||||||
|
return parseQuery(parser, query.data(), query.data() + query.size(), parser.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
std::pair<const char *, bool> splitMultipartQuery(const std::string & queries, std::vector<std::string> & queries_list)
|
std::pair<const char *, bool> splitMultipartQuery(const std::string & queries, std::vector<std::string> & queries_list)
|
||||||
{
|
{
|
||||||
ASTPtr ast;
|
ASTPtr ast;
|
||||||
@ -357,4 +372,5 @@ std::pair<const char *, bool> splitMultipartQuery(const std::string & queries, s
|
|||||||
return std::make_pair(begin, pos == end);
|
return std::make_pair(begin, pos == end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -32,6 +32,15 @@ ASTPtr parseQuery(
|
|||||||
const char * end,
|
const char * end,
|
||||||
const std::string & description);
|
const std::string & description);
|
||||||
|
|
||||||
|
ASTPtr parseQuery(
|
||||||
|
IParser & parser,
|
||||||
|
const std::string & query,
|
||||||
|
const std::string & query_description);
|
||||||
|
|
||||||
|
ASTPtr parseQuery(
|
||||||
|
IParser & parser,
|
||||||
|
const std::string & query);
|
||||||
|
|
||||||
|
|
||||||
/** Split queries separated by ; on to list of single queries
|
/** Split queries separated by ; on to list of single queries
|
||||||
* Returns pointer to the end of last sucessfuly parsed query (first), and true if all queries are sucessfuly parsed (second)
|
* Returns pointer to the end of last sucessfuly parsed query (first), and true if all queries are sucessfuly parsed (second)
|
||||||
|
@ -44,6 +44,9 @@ target_link_libraries (clickhouse-compressor-lib clickhouse_common_io ${Boost_PR
|
|||||||
add_library (clickhouse-format-lib ${SPLIT_SHARED} Format.cpp)
|
add_library (clickhouse-format-lib ${SPLIT_SHARED} Format.cpp)
|
||||||
target_link_libraries (clickhouse-format-lib clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
|
target_link_libraries (clickhouse-format-lib clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
|
||||||
|
|
||||||
|
add_library (clickhouse-cluster-copier-lib ClusterCopier.cpp)
|
||||||
|
target_link_libraries (clickhouse-cluster-copier-lib clickhouse-server-lib clickhouse_functions clickhouse_aggregate_functions clickhouse_table_functions)
|
||||||
|
|
||||||
if (USE_EMBEDDED_COMPILER)
|
if (USE_EMBEDDED_COMPILER)
|
||||||
link_directories (${LLVM_LIBRARY_DIRS})
|
link_directories (${LLVM_LIBRARY_DIRS})
|
||||||
add_subdirectory ("Compiler-${LLVM_VERSION}")
|
add_subdirectory ("Compiler-${LLVM_VERSION}")
|
||||||
@ -67,8 +70,11 @@ if (CLICKHOUSE_SPLIT_BINARY)
|
|||||||
target_link_libraries (clickhouse-compressor clickhouse-compressor-lib)
|
target_link_libraries (clickhouse-compressor clickhouse-compressor-lib)
|
||||||
add_executable (clickhouse-format clickhouse-format.cpp)
|
add_executable (clickhouse-format clickhouse-format.cpp)
|
||||||
target_link_libraries (clickhouse-format clickhouse-format-lib dbms)
|
target_link_libraries (clickhouse-format clickhouse-format-lib dbms)
|
||||||
|
add_executable (clickhouse-cluster-copier clickhouse-cluster-copier.cpp)
|
||||||
|
target_link_libraries (clickhouse-cluster-copier clickhouse-cluster-copier-lib)
|
||||||
|
|
||||||
set (CLICKHOUSE_ALL_TARGETS clickhouse-server clickhouse-client clickhouse-local clickhouse-benchmark clickhouse-performance-test clickhouse-extract-from-config clickhouse-format)
|
set (CLICKHOUSE_ALL_TARGETS clickhouse-server clickhouse-client clickhouse-local clickhouse-benchmark clickhouse-performance-test
|
||||||
|
clickhouse-extract-from-config clickhouse-format clickhouse-cluster-copier)
|
||||||
|
|
||||||
if (USE_EMBEDDED_COMPILER)
|
if (USE_EMBEDDED_COMPILER)
|
||||||
add_executable (clickhouse-clang clickhouse-clang.cpp)
|
add_executable (clickhouse-clang clickhouse-clang.cpp)
|
||||||
@ -100,6 +106,7 @@ else ()
|
|||||||
clickhouse-extract-from-config-lib
|
clickhouse-extract-from-config-lib
|
||||||
clickhouse-compressor-lib
|
clickhouse-compressor-lib
|
||||||
clickhouse-format-lib
|
clickhouse-format-lib
|
||||||
|
clickhouse-cluster-copier-lib
|
||||||
)
|
)
|
||||||
|
|
||||||
add_custom_target (clickhouse-server ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-server DEPENDS clickhouse)
|
add_custom_target (clickhouse-server ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-server DEPENDS clickhouse)
|
||||||
@ -110,6 +117,7 @@ else ()
|
|||||||
add_custom_target (clickhouse-extract-from-config ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-extract-from-config DEPENDS clickhouse)
|
add_custom_target (clickhouse-extract-from-config ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-extract-from-config DEPENDS clickhouse)
|
||||||
add_custom_target (clickhouse-compressor ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-compressor DEPENDS clickhouse)
|
add_custom_target (clickhouse-compressor ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-compressor DEPENDS clickhouse)
|
||||||
add_custom_target (clickhouse-format ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-format DEPENDS clickhouse)
|
add_custom_target (clickhouse-format ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-format DEPENDS clickhouse)
|
||||||
|
add_custom_target (clickhouse-cluster-copier ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-cluster-copier DEPENDS clickhouse)
|
||||||
# install always because depian package want this files:
|
# install always because depian package want this files:
|
||||||
add_custom_target (clickhouse-clang ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-clang DEPENDS clickhouse)
|
add_custom_target (clickhouse-clang ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-clang DEPENDS clickhouse)
|
||||||
add_custom_target (clickhouse-lld ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-lld DEPENDS clickhouse)
|
add_custom_target (clickhouse-lld ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-lld DEPENDS clickhouse)
|
||||||
@ -124,6 +132,7 @@ else ()
|
|||||||
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-extract-from-config
|
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-extract-from-config
|
||||||
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-compressor
|
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-compressor
|
||||||
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-format
|
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-format
|
||||||
|
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-cluster-copier
|
||||||
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-clang
|
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-clang
|
||||||
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-lld
|
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-lld
|
||||||
DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
|
DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
|
||||||
|
1641
dbms/src/Server/ClusterCopier.cpp
Normal file
1641
dbms/src/Server/ClusterCopier.cpp
Normal file
File diff suppressed because it is too large
Load Diff
205
dbms/src/Server/ClusterCopier.h
Normal file
205
dbms/src/Server/ClusterCopier.h
Normal file
@ -0,0 +1,205 @@
|
|||||||
|
#pragma once
|
||||||
|
#include <Poco/Util/ServerApplication.h>
|
||||||
|
|
||||||
|
/* = clickhouse-cluster-copier util =
|
||||||
|
* Copies tables data from one cluster to new tables of other (possibly the same) cluster in distributed fault-tolerant manner.
|
||||||
|
*
|
||||||
|
* Configuration of copying tasks is set in special ZooKeeper node (called the description node).
|
||||||
|
* A ZooKeeper path to the description node is specified via --task-path </task/path> parameter.
|
||||||
|
* So, node /task/path/description should contain special XML content describing copying tasks.
|
||||||
|
*
|
||||||
|
* Simultaneously many clickhouse-cluster-copier processes located on any servers could execute the same task.
|
||||||
|
* ZooKeeper node /task/path/ is used by the processes to coordinate their work.
|
||||||
|
* You must not add additional child nodes to /task/path/.
|
||||||
|
*
|
||||||
|
* Currently you are responsible for launching cluster-copier processes.
|
||||||
|
* You can launch as many processes as you want, whenever and wherever you want.
|
||||||
|
* Each process try to select nearest available shard of source cluster and copy some part of data (partition) from it to the whole
|
||||||
|
* destination cluster with resharding.
|
||||||
|
* Therefore it makes sense to launch cluster-copier processes on the source cluster nodes to reduce the network usage.
|
||||||
|
*
|
||||||
|
* Since the workers coordinate their work via ZooKeeper, in addition to --task-path </task/path> you have to specify ZooKeeper
|
||||||
|
* configuration via --config-file <zookeeper.xml> parameter. Example of zookeeper.xml:
|
||||||
|
|
||||||
|
<yandex>
|
||||||
|
<zookeeper>
|
||||||
|
<node index="1">
|
||||||
|
<host>127.0.0.1</host>
|
||||||
|
<port>2181</port>
|
||||||
|
</node>
|
||||||
|
</zookeeper>
|
||||||
|
</yandex>
|
||||||
|
|
||||||
|
* When you run clickhouse-cluster-copier --config-file <zookeeper.xml> --task-path </task/path>
|
||||||
|
* the process connects to ZooKeeper, reads tasks config from /task/path/description and executes them.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* = Format of task config =
|
||||||
|
|
||||||
|
<yandex>
|
||||||
|
<!-- Configuration of clusters as in an ordinary server config -->
|
||||||
|
<remote_servers>
|
||||||
|
<source_cluster>
|
||||||
|
<shard>
|
||||||
|
<internal_replication>false</internal_replication>
|
||||||
|
<replica>
|
||||||
|
<host>127.0.0.1</host>
|
||||||
|
<port>9000</port>
|
||||||
|
</replica>
|
||||||
|
</shard>
|
||||||
|
...
|
||||||
|
</source_cluster>
|
||||||
|
|
||||||
|
<destination_cluster>
|
||||||
|
...
|
||||||
|
</destination_cluster>
|
||||||
|
</remote_servers>
|
||||||
|
|
||||||
|
<!-- How many simultaneously active workers are possible. If you run more workers superfluous workers will sleep. -->
|
||||||
|
<max_workers>2</max_workers>
|
||||||
|
|
||||||
|
<!-- Setting used to fetch (pull) data from source cluster tables -->
|
||||||
|
<settings_pull>
|
||||||
|
<readonly>1</readonly>
|
||||||
|
</settings_pull>
|
||||||
|
|
||||||
|
<!-- Setting used to insert (push) data to destination cluster tables -->
|
||||||
|
<settings_push>
|
||||||
|
<readonly>0</readonly>
|
||||||
|
</settings_push>
|
||||||
|
|
||||||
|
<!-- Common setting for fetch (pull) and insert (push) operations.
|
||||||
|
They are overlaid by <settings_pull/> and <settings_push/> respectively -->
|
||||||
|
<settings>
|
||||||
|
<insert_distributed_sync>1</insert_distributed_sync>
|
||||||
|
</settings>
|
||||||
|
|
||||||
|
<!-- Copying tasks description.
|
||||||
|
You could specify several table task in the same task description (in the same ZooKeeper node), they will be performed
|
||||||
|
sequentially.
|
||||||
|
-->
|
||||||
|
<tables>
|
||||||
|
<!-- Name of the table task, it must be an unique name suitable for ZooKeeper node name -->
|
||||||
|
<table_hits>
|
||||||
|
<-- Source cluster name (from <remote_servers/> section) and tables in it that should be copied -->
|
||||||
|
<cluster_pull>source_cluster</cluster_pull>
|
||||||
|
<database_pull>test</database_pull>
|
||||||
|
<table_pull>hits</table_pull>
|
||||||
|
|
||||||
|
<-- Destination cluster name and tables in which the data should be inserted -->
|
||||||
|
<cluster_push>destination_cluster</cluster_push>
|
||||||
|
<database_push>test</database_push>
|
||||||
|
<table_push>hits2</table_push>
|
||||||
|
|
||||||
|
<!-- Engine of destination tables.
|
||||||
|
If destination tables have not be created, workers create them using columns definition from source tables and engine
|
||||||
|
definition from here.
|
||||||
|
|
||||||
|
NOTE: If the first worker starts insert data and detects that destination partition is not empty then the partition will
|
||||||
|
be dropped and refilled, take it into account if you already have some data in destination tables. You could directly
|
||||||
|
specify partitions that should be copied in <enabled_partitions/>.
|
||||||
|
|
||||||
|
NOTE: Currently partition key of source and destination tables should be the same.
|
||||||
|
-->
|
||||||
|
<engine>ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/hits2/{shard}/hits2', '{replica}', EventDate, (CounterID, EventDate), 8192)</engine>
|
||||||
|
|
||||||
|
<!-- Sharding key used to insert data to destination cluster -->
|
||||||
|
<sharding_key>intHash32(UserID)</sharding_key>
|
||||||
|
|
||||||
|
<!-- Optional expression that filter data while pull them from source servers -->
|
||||||
|
<where_condition>CounterID != 0</where_condition>
|
||||||
|
|
||||||
|
<!-- Optional section, it specifies partitions that should be copied, other partition will be ignored -->
|
||||||
|
<enabled_partitions>
|
||||||
|
<partition>201712</partition>
|
||||||
|
<partition>201801</partition>
|
||||||
|
...
|
||||||
|
</enabled_partitions>
|
||||||
|
</table_hits>
|
||||||
|
|
||||||
|
</table_visits>
|
||||||
|
...
|
||||||
|
</table_visits>
|
||||||
|
...
|
||||||
|
</tables>
|
||||||
|
</yandex>
|
||||||
|
|
||||||
|
|
||||||
|
* = Implementation details =
|
||||||
|
*
|
||||||
|
* cluster-copier workers pull each partition of each shard of the source cluster and push it to the destination cluster through
|
||||||
|
* Distributed table (to preform data resharding). So, worker job is a partition of a source shard.
|
||||||
|
* A job has three states: Active, Finished and Abandoned. Abandoned means that worker died and did not finish the job.
|
||||||
|
*
|
||||||
|
* If an error occurred during the copying (a worker failed or a worker did not finish the INSERT), then the whole partition (on
|
||||||
|
* all destination servers) should be dropped and refilled. So, copying entity is a partition of all destination shards.
|
||||||
|
* If a failure is detected a special /is_dirty node is created in ZooKeeper signalling that other workers copying the same partition
|
||||||
|
* should stop, after a refilling procedure should start.
|
||||||
|
*
|
||||||
|
* ZooKeeper task node has the following structure:
|
||||||
|
* /task/path_root - path passed in --task-path parameter
|
||||||
|
* /description - contains user-defined XML config of the task
|
||||||
|
* /task_active_workers - contains ephemeral nodes of all currently active workers, used to implement max_workers limitation
|
||||||
|
* /server_fqdn#PID_timestamp - cluster-copier worker ID
|
||||||
|
* ...
|
||||||
|
* /tables - directory with table tasks
|
||||||
|
* /table_hits - directory of table_hits task
|
||||||
|
* /partition1 - directory for partition1
|
||||||
|
* /shards - directory for source cluster shards
|
||||||
|
* /1 - worker job for the first shard of partition1 of table test.hits
|
||||||
|
* Contains info about current status (Active or Finished) and worker ID.
|
||||||
|
* /2
|
||||||
|
* ...
|
||||||
|
* /partition_active_workers
|
||||||
|
* /1 - for each job in /shards a corresponding ephemeral node created in /partition_active_workers
|
||||||
|
* It is used to detect Abandoned jobs (if there is Active node in /shards and there is no node in
|
||||||
|
* /partition_active_workers).
|
||||||
|
* Also, it is used to track active workers in the partition (when we need to refill the partition we do
|
||||||
|
* not DROP PARTITION while there are active workers)
|
||||||
|
* /2
|
||||||
|
* ...
|
||||||
|
* /is_dirty - the node is set if some worker detected that an error occurred (the INSERT is failed or an Abandoned node is
|
||||||
|
* detected). If the node appeared workers in this partition should stop and start cleaning and refilling
|
||||||
|
* partition procedure.
|
||||||
|
* During this procedure a single 'cleaner' worker is selected. The worker waits for stopping all partition
|
||||||
|
* workers, removes /shards node, executes DROP PARTITION on each destination node and removes /is_dirty node.
|
||||||
|
* /cleaner- An ephemeral node used to select 'cleaner' worker. Contains ID of the worker.
|
||||||
|
* /test_visits
|
||||||
|
* ...
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
class ClusterCopierApp : public Poco::Util::ServerApplication
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
|
||||||
|
void initialize(Poco::Util::Application & self) override;
|
||||||
|
|
||||||
|
void handleHelp(const std::string &, const std::string &);
|
||||||
|
|
||||||
|
void defineOptions(Poco::Util::OptionSet & options) override;
|
||||||
|
|
||||||
|
int main(const std::vector<std::string> &) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
|
||||||
|
void mainImpl();
|
||||||
|
|
||||||
|
void setupLogging();
|
||||||
|
|
||||||
|
std::string config_xml_path;
|
||||||
|
std::string task_path;
|
||||||
|
std::string log_level = "debug";
|
||||||
|
bool is_safe_mode = false;
|
||||||
|
double copy_fault_probability = 0;
|
||||||
|
bool is_help = false;
|
||||||
|
|
||||||
|
std::string base_dir;
|
||||||
|
std::string process_path;
|
||||||
|
std::string process_id;
|
||||||
|
std::string host_id;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
@ -463,6 +463,14 @@ static const char * minimal_default_user_xml =
|
|||||||
"</yandex>";
|
"</yandex>";
|
||||||
|
|
||||||
|
|
||||||
|
static ConfigurationPtr getConfigurationFromXMLString(const char * xml_data)
|
||||||
|
{
|
||||||
|
std::stringstream ss{std::string{xml_data}};
|
||||||
|
Poco::XML::InputSource input_source{ss};
|
||||||
|
return {new Poco::Util::XMLConfiguration{&input_source}};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void LocalServer::setupUsers()
|
void LocalServer::setupUsers()
|
||||||
{
|
{
|
||||||
ConfigurationPtr users_config;
|
ConfigurationPtr users_config;
|
||||||
@ -477,11 +485,7 @@ void LocalServer::setupUsers()
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
std::stringstream default_user_stream;
|
users_config = getConfigurationFromXMLString(minimal_default_user_xml);
|
||||||
default_user_stream << minimal_default_user_xml;
|
|
||||||
|
|
||||||
Poco::XML::InputSource default_user_source(default_user_stream);
|
|
||||||
users_config = ConfigurationPtr(new Poco::Util::XMLConfiguration(&default_user_source));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (users_config)
|
if (users_config)
|
||||||
|
2
dbms/src/Server/clickhouse-cluster-copier.cpp
Normal file
2
dbms/src/Server/clickhouse-cluster-copier.cpp
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
int mainEntryClickHouseClusterCopier(int argc, char ** argv);
|
||||||
|
int main(int argc_, char ** argv_) { return mainEntryClickHouseClusterCopier(argc_, argv_); }
|
@ -18,6 +18,7 @@ int mainEntryClickHousePerformanceTest(int argc, char ** argv);
|
|||||||
int mainEntryClickHouseExtractFromConfig(int argc, char ** argv);
|
int mainEntryClickHouseExtractFromConfig(int argc, char ** argv);
|
||||||
int mainEntryClickHouseCompressor(int argc, char ** argv);
|
int mainEntryClickHouseCompressor(int argc, char ** argv);
|
||||||
int mainEntryClickHouseFormat(int argc, char ** argv);
|
int mainEntryClickHouseFormat(int argc, char ** argv);
|
||||||
|
int mainEntryClickHouseClusterCopier(int argc, char ** argv);
|
||||||
|
|
||||||
#if USE_EMBEDDED_COMPILER
|
#if USE_EMBEDDED_COMPILER
|
||||||
int mainEntryClickHouseClang(int argc, char ** argv);
|
int mainEntryClickHouseClang(int argc, char ** argv);
|
||||||
@ -41,6 +42,7 @@ std::pair<const char *, MainFunc> clickhouse_applications[] =
|
|||||||
{"extract-from-config", mainEntryClickHouseExtractFromConfig},
|
{"extract-from-config", mainEntryClickHouseExtractFromConfig},
|
||||||
{"compressor", mainEntryClickHouseCompressor},
|
{"compressor", mainEntryClickHouseCompressor},
|
||||||
{"format", mainEntryClickHouseFormat},
|
{"format", mainEntryClickHouseFormat},
|
||||||
|
{"copier", mainEntryClickHouseClusterCopier},
|
||||||
#if USE_EMBEDDED_COMPILER
|
#if USE_EMBEDDED_COMPILER
|
||||||
{"clang", mainEntryClickHouseClang},
|
{"clang", mainEntryClickHouseClang},
|
||||||
{"lld", mainEntryClickHouseLLD},
|
{"lld", mainEntryClickHouseLLD},
|
||||||
|
@ -53,9 +53,10 @@ namespace ErrorCodes
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast,
|
DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_,
|
||||||
const ClusterPtr & cluster_, bool insert_sync_, UInt64 insert_timeout_)
|
const Settings & settings_, bool insert_sync_, UInt64 insert_timeout_)
|
||||||
: storage(storage), query_ast(query_ast), cluster(cluster_), insert_sync(insert_sync_), insert_timeout(insert_timeout_)
|
: storage(storage), query_ast(query_ast), cluster(cluster_), settings(settings_), insert_sync(insert_sync_),
|
||||||
|
insert_timeout(insert_timeout_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -392,9 +393,21 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const siz
|
|||||||
|
|
||||||
void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats)
|
void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats)
|
||||||
{
|
{
|
||||||
InterpreterInsertQuery interp{query_ast, storage.context};
|
std::unique_ptr<Context> local_context;
|
||||||
|
std::optional<InterpreterInsertQuery> interp;
|
||||||
|
|
||||||
auto block_io = interp.execute();
|
/// Async insert does not support settings forwarding yet whereas sync one supports
|
||||||
|
if (insert_sync)
|
||||||
|
interp.emplace(query_ast, storage.context);
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/// Overwrite global settings by user settings
|
||||||
|
local_context = std::make_unique<Context>(storage.context);
|
||||||
|
local_context->setSettings(settings);
|
||||||
|
interp.emplace(query_ast, *local_context);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto block_io = interp->execute();
|
||||||
block_io.out->writePrefix();
|
block_io.out->writePrefix();
|
||||||
|
|
||||||
for (size_t i = 0; i < repeats; ++i)
|
for (size_t i = 0; i < repeats; ++i)
|
||||||
@ -410,7 +423,7 @@ void DistributedBlockOutputStream::writeToShardSync(const Block & block, const s
|
|||||||
auto connection = pool->get();
|
auto connection = pool->get();
|
||||||
|
|
||||||
const auto & query_string = queryToString(query_ast);
|
const auto & query_string = queryToString(query_ast);
|
||||||
RemoteBlockOutputStream remote{*connection, query_string};
|
RemoteBlockOutputStream remote{*connection, query_string, &settings};
|
||||||
|
|
||||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
|
||||||
|
|
||||||
|
@ -32,7 +32,8 @@ class StorageDistributed;
|
|||||||
class DistributedBlockOutputStream : public IBlockOutputStream
|
class DistributedBlockOutputStream : public IBlockOutputStream
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_, bool insert_sync_, UInt64 insert_timeout_);
|
DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_,
|
||||||
|
const Settings & settings_, bool insert_sync_, UInt64 insert_timeout_);
|
||||||
|
|
||||||
void write(const Block & block) override;
|
void write(const Block & block) override;
|
||||||
|
|
||||||
@ -88,6 +89,7 @@ private:
|
|||||||
StorageDistributed & storage;
|
StorageDistributed & storage;
|
||||||
ASTPtr query_ast;
|
ASTPtr query_ast;
|
||||||
ClusterPtr cluster;
|
ClusterPtr cluster;
|
||||||
|
const Settings & settings;
|
||||||
bool insert_sync;
|
bool insert_sync;
|
||||||
UInt64 insert_timeout;
|
UInt64 insert_timeout;
|
||||||
size_t blocks_inserted = 0;
|
size_t blocks_inserted = 0;
|
||||||
|
@ -21,6 +21,7 @@ namespace ErrorCodes
|
|||||||
extern const int TYPE_MISMATCH;
|
extern const int TYPE_MISMATCH;
|
||||||
extern const int DUPLICATE_COLUMN;
|
extern const int DUPLICATE_COLUMN;
|
||||||
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
|
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
|
||||||
|
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -289,4 +290,15 @@ void ITableDeclaration::check(const Block & block, bool need_all) const
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ITableDeclaration::ITableDeclaration(const NamesAndTypesList & columns, const NamesAndTypesList & materialized_columns,
|
||||||
|
const NamesAndTypesList & alias_columns, const ColumnDefaults & column_defaults)
|
||||||
|
: columns{columns},
|
||||||
|
materialized_columns{materialized_columns},
|
||||||
|
alias_columns{alias_columns},
|
||||||
|
column_defaults{column_defaults}
|
||||||
|
{
|
||||||
|
if (columns.empty())
|
||||||
|
throw Exception("Empty list of columns passed to storage constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -84,20 +84,21 @@ public:
|
|||||||
|
|
||||||
ITableDeclaration() = default;
|
ITableDeclaration() = default;
|
||||||
ITableDeclaration(
|
ITableDeclaration(
|
||||||
|
const NamesAndTypesList & columns,
|
||||||
const NamesAndTypesList & materialized_columns,
|
const NamesAndTypesList & materialized_columns,
|
||||||
const NamesAndTypesList & alias_columns,
|
const NamesAndTypesList & alias_columns,
|
||||||
const ColumnDefaults & column_defaults)
|
const ColumnDefaults & column_defaults);
|
||||||
: materialized_columns{materialized_columns},
|
|
||||||
alias_columns{alias_columns},
|
|
||||||
column_defaults{column_defaults}
|
|
||||||
{}
|
|
||||||
|
|
||||||
|
NamesAndTypesList columns;
|
||||||
NamesAndTypesList materialized_columns{};
|
NamesAndTypesList materialized_columns{};
|
||||||
NamesAndTypesList alias_columns{};
|
NamesAndTypesList alias_columns{};
|
||||||
ColumnDefaults column_defaults{};
|
ColumnDefaults column_defaults{};
|
||||||
|
|
||||||
private:
|
private:
|
||||||
virtual const NamesAndTypesList & getColumnsListImpl() const = 0;
|
virtual const NamesAndTypesList & getColumnsListImpl() const
|
||||||
|
{
|
||||||
|
return columns;
|
||||||
|
}
|
||||||
|
|
||||||
using ColumnsListRange = boost::range::joined_range<const NamesAndTypesList, const NamesAndTypesList>;
|
using ColumnsListRange = boost::range::joined_range<const NamesAndTypesList, const NamesAndTypesList>;
|
||||||
/// Returns a lazily joined range of table's ordinary and materialized columns, without unnecessary copying
|
/// Returns a lazily joined range of table's ordinary and materialized columns, without unnecessary copying
|
||||||
|
@ -93,7 +93,8 @@ MergeTreeData::MergeTreeData(
|
|||||||
bool require_part_metadata_,
|
bool require_part_metadata_,
|
||||||
bool attach,
|
bool attach,
|
||||||
BrokenPartCallback broken_part_callback_)
|
BrokenPartCallback broken_part_callback_)
|
||||||
: ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
|
: ITableDeclaration{columns_, materialized_columns_, alias_columns_, column_defaults_},
|
||||||
|
context(context_),
|
||||||
sampling_expression(sampling_expression_),
|
sampling_expression(sampling_expression_),
|
||||||
index_granularity(settings_.index_granularity),
|
index_granularity(settings_.index_granularity),
|
||||||
merging_params(merging_params_),
|
merging_params(merging_params_),
|
||||||
@ -102,7 +103,7 @@ MergeTreeData::MergeTreeData(
|
|||||||
partition_expr_ast(partition_expr_ast_),
|
partition_expr_ast(partition_expr_ast_),
|
||||||
require_part_metadata(require_part_metadata_),
|
require_part_metadata(require_part_metadata_),
|
||||||
database_name(database_), table_name(table_),
|
database_name(database_), table_name(table_),
|
||||||
full_path(full_path_), columns(columns_),
|
full_path(full_path_),
|
||||||
broken_part_callback(broken_part_callback_),
|
broken_part_callback(broken_part_callback_),
|
||||||
log_name(database_name + "." + table_name), log(&Logger::get(log_name + " (Data)")),
|
log_name(database_name + "." + table_name), log(&Logger::get(log_name + " (Data)")),
|
||||||
data_parts_by_name(data_parts_indexes.get<TagByName>()),
|
data_parts_by_name(data_parts_indexes.get<TagByName>()),
|
||||||
|
@ -550,8 +550,6 @@ private:
|
|||||||
String table_name;
|
String table_name;
|
||||||
String full_path;
|
String full_path;
|
||||||
|
|
||||||
NamesAndTypesList columns;
|
|
||||||
|
|
||||||
/// Current column sizes in compressed and uncompressed form.
|
/// Current column sizes in compressed and uncompressed form.
|
||||||
ColumnSizes column_sizes;
|
ColumnSizes column_sizes;
|
||||||
|
|
||||||
|
@ -56,8 +56,8 @@ StorageBuffer::StorageBuffer(const std::string & name_, const NamesAndTypesList
|
|||||||
Context & context_,
|
Context & context_,
|
||||||
size_t num_shards_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
|
size_t num_shards_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
|
||||||
const String & destination_database_, const String & destination_table_, bool allow_materialized_)
|
const String & destination_database_, const String & destination_table_, bool allow_materialized_)
|
||||||
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
|
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
|
||||||
name(name_), columns(columns_), context(context_),
|
name(name_), context(context_),
|
||||||
num_shards(num_shards_), buffers(num_shards_),
|
num_shards(num_shards_), buffers(num_shards_),
|
||||||
min_thresholds(min_thresholds_), max_thresholds(max_thresholds_),
|
min_thresholds(min_thresholds_), max_thresholds(max_thresholds_),
|
||||||
destination_database(destination_database_), destination_table(destination_table_),
|
destination_database(destination_database_), destination_table(destination_table_),
|
||||||
|
@ -53,8 +53,6 @@ public:
|
|||||||
std::string getName() const override { return "Buffer"; }
|
std::string getName() const override { return "Buffer"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
|
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const SelectQueryInfo & query_info,
|
const SelectQueryInfo & query_info,
|
||||||
@ -82,7 +80,6 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
String name;
|
String name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
|
|
||||||
Context & context;
|
Context & context;
|
||||||
|
|
||||||
|
@ -14,8 +14,6 @@ public:
|
|||||||
|
|
||||||
std::string getTableName() const override { return table_name; }
|
std::string getTableName() const override { return table_name; }
|
||||||
|
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
BlockInputStreams read(const Names & column_names,
|
BlockInputStreams read(const Names & column_names,
|
||||||
const SelectQueryInfo & query_info,
|
const SelectQueryInfo & query_info,
|
||||||
const Context & context,
|
const Context & context,
|
||||||
@ -25,7 +23,7 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
String table_name;
|
String table_name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
String column_description_file_name;
|
String column_description_file_name;
|
||||||
String data_description_file_name;
|
String data_description_file_name;
|
||||||
Block sample_block;
|
Block sample_block;
|
||||||
|
@ -30,8 +30,8 @@ StorageDictionary::StorageDictionary(
|
|||||||
const ColumnDefaults & column_defaults_,
|
const ColumnDefaults & column_defaults_,
|
||||||
const DictionaryStructure & dictionary_structure_,
|
const DictionaryStructure & dictionary_structure_,
|
||||||
const String & dictionary_name_)
|
const String & dictionary_name_)
|
||||||
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
|
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
|
||||||
columns(columns_), dictionary_name(dictionary_name_),
|
dictionary_name(dictionary_name_),
|
||||||
logger(&Poco::Logger::get("StorageDictionary"))
|
logger(&Poco::Logger::get("StorageDictionary"))
|
||||||
{
|
{
|
||||||
checkNamesAndTypesCompatibleWithDictionary(dictionary_structure_);
|
checkNamesAndTypesCompatibleWithDictionary(dictionary_structure_);
|
||||||
|
@ -37,7 +37,6 @@ private:
|
|||||||
using Ptr = MultiVersion<IDictionaryBase>::Version;
|
using Ptr = MultiVersion<IDictionaryBase>::Version;
|
||||||
|
|
||||||
String table_name;
|
String table_name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
String dictionary_name;
|
String dictionary_name;
|
||||||
Poco::Logger * logger;
|
Poco::Logger * logger;
|
||||||
|
|
||||||
|
@ -50,6 +50,8 @@ namespace DB
|
|||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
{
|
{
|
||||||
extern const int STORAGE_REQUIRES_PARAMETER;
|
extern const int STORAGE_REQUIRES_PARAMETER;
|
||||||
|
extern const int BAD_ARGUMENTS;
|
||||||
|
extern const int READONLY;
|
||||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||||
extern const int INCORRECT_NUMBER_OF_COLUMNS;
|
extern const int INCORRECT_NUMBER_OF_COLUMNS;
|
||||||
}
|
}
|
||||||
@ -141,8 +143,8 @@ StorageDistributed::StorageDistributed(
|
|||||||
const Context & context_,
|
const Context & context_,
|
||||||
const ASTPtr & sharding_key_,
|
const ASTPtr & sharding_key_,
|
||||||
const String & data_path_)
|
const String & data_path_)
|
||||||
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
|
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
|
||||||
name(name_), columns(columns_),
|
name(name_),
|
||||||
remote_database(remote_database_), remote_table(remote_table_),
|
remote_database(remote_database_), remote_table(remote_table_),
|
||||||
context(context_), cluster_name(context.getMacros().expand(cluster_name_)), has_sharding_key(sharding_key_),
|
context(context_), cluster_name(context.getMacros().expand(cluster_name_)), has_sharding_key(sharding_key_),
|
||||||
sharding_key_expr(sharding_key_ ? ExpressionAnalyzer(sharding_key_, context, nullptr, columns).getActions(false) : nullptr),
|
sharding_key_expr(sharding_key_ ? ExpressionAnalyzer(sharding_key_, context, nullptr, columns).getActions(false) : nullptr),
|
||||||
@ -197,7 +199,7 @@ BlockInputStreams StorageDistributed::read(
|
|||||||
external_tables = context.getExternalTables();
|
external_tables = context.getExternalTables();
|
||||||
|
|
||||||
ClusterProxy::SelectStreamFactory select_stream_factory(
|
ClusterProxy::SelectStreamFactory select_stream_factory(
|
||||||
processed_stage, QualifiedTableName{remote_database, remote_table}, external_tables);
|
processed_stage, QualifiedTableName{remote_database, remote_table}, external_tables);
|
||||||
|
|
||||||
return ClusterProxy::executeQuery(
|
return ClusterProxy::executeQuery(
|
||||||
select_stream_factory, cluster, modified_query_ast, context, settings);
|
select_stream_factory, cluster, modified_query_ast, context, settings);
|
||||||
@ -206,25 +208,29 @@ BlockInputStreams StorageDistributed::read(
|
|||||||
|
|
||||||
BlockOutputStreamPtr StorageDistributed::write(const ASTPtr & query, const Settings & settings)
|
BlockOutputStreamPtr StorageDistributed::write(const ASTPtr & query, const Settings & settings)
|
||||||
{
|
{
|
||||||
auto cluster = owned_cluster ? owned_cluster : context.getCluster(cluster_name);
|
auto cluster = (owned_cluster) ? owned_cluster : context.getCluster(cluster_name);
|
||||||
|
|
||||||
/// TODO: !path.empty() can be replaced by !owned_cluster or !cluster_name.empty() ?
|
/// Ban an attempt to make async insert into the table belonging to DatabaseMemory
|
||||||
/// owned_cluster for remote table function use sync insertion => doesn't need a path.
|
if (path.empty() && !owned_cluster && !settings.insert_distributed_sync.value)
|
||||||
bool write_enabled = (!path.empty() || owned_cluster)
|
{
|
||||||
&& (((cluster->getLocalShardCount() + cluster->getRemoteShardCount()) < 2) || has_sharding_key);
|
throw Exception("Storage " + getName() + " must has own data directory to enable asynchronous inserts",
|
||||||
|
ErrorCodes::BAD_ARGUMENTS);
|
||||||
|
}
|
||||||
|
|
||||||
if (!write_enabled)
|
/// If sharding key is not specified, then you can only write to a shard containing only one shard
|
||||||
throw Exception{
|
if (!has_sharding_key && ((cluster->getLocalShardCount() + cluster->getRemoteShardCount()) >= 2))
|
||||||
"Method write is not supported by storage " + getName() +
|
{
|
||||||
" with more than one shard and no sharding key provided",
|
throw Exception("Method write is not supported by storage " + getName() + " with more than one shard and no sharding key provided",
|
||||||
ErrorCodes::STORAGE_REQUIRES_PARAMETER};
|
ErrorCodes::STORAGE_REQUIRES_PARAMETER);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Force sync insertion if it is remote() table function
|
||||||
bool insert_sync = settings.insert_distributed_sync || owned_cluster;
|
bool insert_sync = settings.insert_distributed_sync || owned_cluster;
|
||||||
auto timeout = settings.insert_distributed_timeout;
|
auto timeout = settings.insert_distributed_timeout;
|
||||||
|
|
||||||
/// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
|
/// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
|
||||||
return std::make_shared<DistributedBlockOutputStream>(
|
return std::make_shared<DistributedBlockOutputStream>(
|
||||||
*this, rewriteInsertQuery(query, remote_database, remote_table), cluster, insert_sync, timeout);
|
*this, rewriteInsertQuery(query, remote_database, remote_table), cluster, settings, insert_sync, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -94,7 +94,6 @@ public:
|
|||||||
|
|
||||||
|
|
||||||
String name;
|
String name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
String remote_database;
|
String remote_database;
|
||||||
String remote_table;
|
String remote_table;
|
||||||
|
|
||||||
|
@ -31,6 +31,8 @@ namespace ErrorCodes
|
|||||||
extern const int DATABASE_ACCESS_DENIED;
|
extern const int DATABASE_ACCESS_DENIED;
|
||||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||||
extern const int UNKNOWN_IDENTIFIER;
|
extern const int UNKNOWN_IDENTIFIER;
|
||||||
|
extern const int INCORRECT_FILE_NAME;
|
||||||
|
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@ -57,8 +59,8 @@ StorageFile::StorageFile(
|
|||||||
const NamesAndTypesList & alias_columns_,
|
const NamesAndTypesList & alias_columns_,
|
||||||
const ColumnDefaults & column_defaults_,
|
const ColumnDefaults & column_defaults_,
|
||||||
Context & context_)
|
Context & context_)
|
||||||
: IStorage(materialized_columns_, alias_columns_, column_defaults_),
|
: IStorage(columns_, materialized_columns_, alias_columns_, column_defaults_),
|
||||||
table_name(table_name_), format_name(format_name_), columns(columns_), context_global(context_), table_fd(table_fd_)
|
table_name(table_name_), format_name(format_name_), context_global(context_), table_fd(table_fd_)
|
||||||
{
|
{
|
||||||
if (table_fd < 0) /// Will use file
|
if (table_fd < 0) /// Will use file
|
||||||
{
|
{
|
||||||
@ -72,6 +74,9 @@ StorageFile::StorageFile(
|
|||||||
}
|
}
|
||||||
else /// Is DB's file
|
else /// Is DB's file
|
||||||
{
|
{
|
||||||
|
if (db_dir_path.empty())
|
||||||
|
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
|
||||||
|
|
||||||
path = getTablePath(db_dir_path, table_name, format_name);
|
path = getTablePath(db_dir_path, table_name, format_name);
|
||||||
is_db_table = true;
|
is_db_table = true;
|
||||||
Poco::File(Poco::Path(path).parent()).createDirectories();
|
Poco::File(Poco::Path(path).parent()).createDirectories();
|
||||||
|
@ -77,7 +77,6 @@ private:
|
|||||||
|
|
||||||
std::string table_name;
|
std::string table_name;
|
||||||
std::string format_name;
|
std::string format_name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
Context & context_global;
|
Context & context_global;
|
||||||
|
|
||||||
std::string path;
|
std::string path;
|
||||||
|
@ -228,9 +228,9 @@ StorageKafka::StorageKafka(
|
|||||||
const ColumnDefaults & column_defaults_,
|
const ColumnDefaults & column_defaults_,
|
||||||
const String & brokers_, const String & group_, const Names & topics_,
|
const String & brokers_, const String & group_, const Names & topics_,
|
||||||
const String & format_name_, const String & schema_name_, size_t num_consumers_)
|
const String & format_name_, const String & schema_name_, size_t num_consumers_)
|
||||||
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
|
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
|
||||||
table_name(table_name_), database_name(database_name_), context(context_),
|
table_name(table_name_), database_name(database_name_), context(context_),
|
||||||
columns(columns_), topics(topics_), brokers(brokers_), group(group_), format_name(format_name_), schema_name(schema_name_),
|
topics(topics_), brokers(brokers_), group(group_), format_name(format_name_), schema_name(schema_name_),
|
||||||
num_consumers(num_consumers_), log(&Logger::get("StorageKafka (" + table_name_ + ")")),
|
num_consumers(num_consumers_), log(&Logger::get("StorageKafka (" + table_name_ + ")")),
|
||||||
semaphore(0, num_consumers_), mutex(), consumers(), event_update()
|
semaphore(0, num_consumers_), mutex(), consumers(), event_update()
|
||||||
{
|
{
|
||||||
|
@ -32,8 +32,6 @@ public:
|
|||||||
std::string getTableName() const override { return table_name; }
|
std::string getTableName() const override { return table_name; }
|
||||||
std::string getDatabaseName() const { return database_name; }
|
std::string getDatabaseName() const { return database_name; }
|
||||||
|
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
void startup() override;
|
void startup() override;
|
||||||
void shutdown() override;
|
void shutdown() override;
|
||||||
|
|
||||||
@ -73,7 +71,6 @@ private:
|
|||||||
String table_name;
|
String table_name;
|
||||||
String database_name;
|
String database_name;
|
||||||
Context & context;
|
Context & context;
|
||||||
NamesAndTypesList columns;
|
|
||||||
Names topics;
|
Names topics;
|
||||||
const String brokers;
|
const String brokers;
|
||||||
const String group;
|
const String group;
|
||||||
|
@ -41,6 +41,7 @@ namespace ErrorCodes
|
|||||||
extern const int DUPLICATE_COLUMN;
|
extern const int DUPLICATE_COLUMN;
|
||||||
extern const int SIZES_OF_MARKS_FILES_ARE_INCONSISTENT;
|
extern const int SIZES_OF_MARKS_FILES_ARE_INCONSISTENT;
|
||||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||||
|
extern const int INCORRECT_FILE_NAME;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -363,15 +364,15 @@ StorageLog::StorageLog(
|
|||||||
const NamesAndTypesList & alias_columns_,
|
const NamesAndTypesList & alias_columns_,
|
||||||
const ColumnDefaults & column_defaults_,
|
const ColumnDefaults & column_defaults_,
|
||||||
size_t max_compress_block_size_)
|
size_t max_compress_block_size_)
|
||||||
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
|
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
|
||||||
path(path_), name(name_), columns(columns_),
|
path(path_), name(name_),
|
||||||
max_compress_block_size(max_compress_block_size_),
|
max_compress_block_size(max_compress_block_size_),
|
||||||
file_checker(path + escapeForFileName(name) + '/' + "sizes.json")
|
file_checker(path + escapeForFileName(name) + '/' + "sizes.json")
|
||||||
{
|
{
|
||||||
if (columns.empty())
|
if (path.empty())
|
||||||
throw Exception("Empty list of columns passed to StorageLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
|
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
|
||||||
|
|
||||||
/// create files if they do not exist
|
/// create files if they do not exist
|
||||||
Poco::File(path + escapeForFileName(name) + '/').createDirectories();
|
Poco::File(path + escapeForFileName(name) + '/').createDirectories();
|
||||||
|
|
||||||
for (const auto & column : getColumnsList())
|
for (const auto & column : getColumnsList())
|
||||||
|
@ -26,8 +26,6 @@ public:
|
|||||||
std::string getName() const override { return "Log"; }
|
std::string getName() const override { return "Log"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
|
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const SelectQueryInfo & query_info,
|
const SelectQueryInfo & query_info,
|
||||||
@ -59,7 +57,6 @@ protected:
|
|||||||
private:
|
private:
|
||||||
String path;
|
String path;
|
||||||
String name;
|
String name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
|
|
||||||
mutable std::shared_mutex rwlock;
|
mutable std::shared_mutex rwlock;
|
||||||
|
|
||||||
|
@ -65,8 +65,8 @@ StorageMaterializedView::StorageMaterializedView(
|
|||||||
const NamesAndTypesList & alias_columns_,
|
const NamesAndTypesList & alias_columns_,
|
||||||
const ColumnDefaults & column_defaults_,
|
const ColumnDefaults & column_defaults_,
|
||||||
bool attach_)
|
bool attach_)
|
||||||
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
|
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
|
||||||
database_name(database_name_), global_context(local_context.getGlobalContext()), columns(columns_)
|
database_name(database_name_), global_context(local_context.getGlobalContext())
|
||||||
{
|
{
|
||||||
if (!query.select)
|
if (!query.select)
|
||||||
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);
|
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);
|
||||||
|
@ -51,7 +51,6 @@ private:
|
|||||||
String database_name;
|
String database_name;
|
||||||
ASTPtr inner_query;
|
ASTPtr inner_query;
|
||||||
Context & global_context;
|
Context & global_context;
|
||||||
NamesAndTypesList columns;
|
|
||||||
bool has_inner_table = false;
|
bool has_inner_table = false;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
@ -87,8 +87,8 @@ StorageMemory::StorageMemory(
|
|||||||
const NamesAndTypesList & materialized_columns_,
|
const NamesAndTypesList & materialized_columns_,
|
||||||
const NamesAndTypesList & alias_columns_,
|
const NamesAndTypesList & alias_columns_,
|
||||||
const ColumnDefaults & column_defaults_)
|
const ColumnDefaults & column_defaults_)
|
||||||
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
|
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
|
||||||
name(name_), columns(columns_)
|
name(name_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,8 +26,6 @@ public:
|
|||||||
std::string getName() const override { return "Memory"; }
|
std::string getName() const override { return "Memory"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
|
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
size_t getSize() const { return data.size(); }
|
size_t getSize() const { return data.size(); }
|
||||||
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
@ -45,7 +43,6 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
String name;
|
String name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
|
|
||||||
/// The data itself. `list` - so that when inserted to the end, the existing iterators are not invalidated.
|
/// The data itself. `list` - so that when inserted to the end, the existing iterators are not invalidated.
|
||||||
BlocksList data;
|
BlocksList data;
|
||||||
|
@ -42,8 +42,8 @@ StorageMerge::StorageMerge(
|
|||||||
const String & source_database_,
|
const String & source_database_,
|
||||||
const String & table_name_regexp_,
|
const String & table_name_regexp_,
|
||||||
const Context & context_)
|
const Context & context_)
|
||||||
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
|
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
|
||||||
name(name_), columns(columns_), source_database(source_database_),
|
name(name_), source_database(source_database_),
|
||||||
table_name_regexp(table_name_regexp_), context(context_)
|
table_name_regexp(table_name_regexp_), context(context_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
@ -47,7 +47,6 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
String name;
|
String name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
String source_database;
|
String source_database;
|
||||||
OptimizedRegularExpression table_name_regexp;
|
OptimizedRegularExpression table_name_regexp;
|
||||||
const Context & context;
|
const Context & context;
|
||||||
|
@ -27,6 +27,7 @@ namespace ErrorCodes
|
|||||||
extern const int ABORTED;
|
extern const int ABORTED;
|
||||||
extern const int BAD_ARGUMENTS;
|
extern const int BAD_ARGUMENTS;
|
||||||
extern const int INCORRECT_DATA;
|
extern const int INCORRECT_DATA;
|
||||||
|
extern const int INCORRECT_FILE_NAME;
|
||||||
extern const int CANNOT_ASSIGN_OPTIMIZE;
|
extern const int CANNOT_ASSIGN_OPTIMIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -48,7 +49,7 @@ StorageMergeTree::StorageMergeTree(
|
|||||||
const MergeTreeData::MergingParams & merging_params_,
|
const MergeTreeData::MergingParams & merging_params_,
|
||||||
const MergeTreeSettings & settings_,
|
const MergeTreeSettings & settings_,
|
||||||
bool has_force_restore_data_flag)
|
bool has_force_restore_data_flag)
|
||||||
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
|
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
|
||||||
path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'),
|
path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'),
|
||||||
context(context_), background_pool(context_.getBackgroundPool()),
|
context(context_), background_pool(context_.getBackgroundPool()),
|
||||||
data(database_name, table_name,
|
data(database_name, table_name,
|
||||||
@ -60,6 +61,9 @@ StorageMergeTree::StorageMergeTree(
|
|||||||
reader(data), writer(data), merger(data, context.getBackgroundPool()),
|
reader(data), writer(data), merger(data, context.getBackgroundPool()),
|
||||||
log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)"))
|
log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)"))
|
||||||
{
|
{
|
||||||
|
if (path_.empty())
|
||||||
|
throw Exception("MergeTree storages require data path", ErrorCodes::INCORRECT_FILE_NAME);
|
||||||
|
|
||||||
data.loadDataParts(has_force_restore_data_flag);
|
data.loadDataParts(has_force_restore_data_flag);
|
||||||
|
|
||||||
if (!attach)
|
if (!attach)
|
||||||
|
@ -23,11 +23,11 @@ StorageMySQL::StorageMySQL(
|
|||||||
mysqlxx::Pool && pool,
|
mysqlxx::Pool && pool,
|
||||||
const std::string & remote_database_name,
|
const std::string & remote_database_name,
|
||||||
const std::string & remote_table_name,
|
const std::string & remote_table_name,
|
||||||
const NamesAndTypesList & columns)
|
const NamesAndTypesList & columns_)
|
||||||
: name(name)
|
: IStorage{columns_, {}, {}, {}}
|
||||||
|
, name(name)
|
||||||
, remote_database_name(remote_database_name)
|
, remote_database_name(remote_database_name)
|
||||||
, remote_table_name(remote_table_name)
|
, remote_table_name(remote_table_name)
|
||||||
, columns(columns)
|
|
||||||
, pool(std::move(pool))
|
, pool(std::move(pool))
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
@ -24,11 +24,10 @@ public:
|
|||||||
mysqlxx::Pool && pool,
|
mysqlxx::Pool && pool,
|
||||||
const std::string & remote_database_name,
|
const std::string & remote_database_name,
|
||||||
const std::string & remote_table_name,
|
const std::string & remote_table_name,
|
||||||
const NamesAndTypesList & columns);
|
const NamesAndTypesList & columns_);
|
||||||
|
|
||||||
std::string getName() const override { return "MySQL"; }
|
std::string getName() const override { return "MySQL"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
@ -44,7 +43,7 @@ private:
|
|||||||
std::string remote_database_name;
|
std::string remote_database_name;
|
||||||
std::string remote_table_name;
|
std::string remote_table_name;
|
||||||
|
|
||||||
NamesAndTypesList columns;
|
|
||||||
mysqlxx::Pool pool;
|
mysqlxx::Pool pool;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -20,8 +20,6 @@ public:
|
|||||||
std::string getName() const override { return "Null"; }
|
std::string getName() const override { return "Null"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
|
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
const Names &,
|
const Names &,
|
||||||
const SelectQueryInfo &,
|
const SelectQueryInfo &,
|
||||||
@ -47,7 +45,6 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
String name;
|
String name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
StorageNull(
|
StorageNull(
|
||||||
@ -56,7 +53,7 @@ protected:
|
|||||||
const NamesAndTypesList & materialized_columns_,
|
const NamesAndTypesList & materialized_columns_,
|
||||||
const NamesAndTypesList & alias_columns_,
|
const NamesAndTypesList & alias_columns_,
|
||||||
const ColumnDefaults & column_defaults_)
|
const ColumnDefaults & column_defaults_)
|
||||||
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, name(name_), columns(columns_) {}
|
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_}, name(name_) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -21,11 +21,11 @@ StorageODBC::StorageODBC(
|
|||||||
const std::string & connection_string,
|
const std::string & connection_string,
|
||||||
const std::string & remote_database_name,
|
const std::string & remote_database_name,
|
||||||
const std::string & remote_table_name,
|
const std::string & remote_table_name,
|
||||||
const NamesAndTypesList & columns)
|
const NamesAndTypesList & columns_)
|
||||||
: name(name)
|
: IStorage{columns_, {}, {}, {}}
|
||||||
|
, name(name)
|
||||||
, remote_database_name(remote_database_name)
|
, remote_database_name(remote_database_name)
|
||||||
, remote_table_name(remote_table_name)
|
, remote_table_name(remote_table_name)
|
||||||
, columns(columns)
|
|
||||||
{
|
{
|
||||||
pool = createAndCheckResizePocoSessionPool([&]
|
pool = createAndCheckResizePocoSessionPool([&]
|
||||||
{
|
{
|
||||||
|
@ -30,11 +30,10 @@ public:
|
|||||||
const std::string & connection_string,
|
const std::string & connection_string,
|
||||||
const std::string & remote_database_name,
|
const std::string & remote_database_name,
|
||||||
const std::string & remote_table_name,
|
const std::string & remote_table_name,
|
||||||
const NamesAndTypesList & columns);
|
const NamesAndTypesList & columns_);
|
||||||
|
|
||||||
std::string getName() const override { return "ODBC"; }
|
std::string getName() const override { return "ODBC"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
@ -46,12 +45,9 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
std::string name;
|
std::string name;
|
||||||
|
|
||||||
std::string remote_database_name;
|
std::string remote_database_name;
|
||||||
std::string remote_table_name;
|
std::string remote_table_name;
|
||||||
|
|
||||||
NamesAndTypesList columns;
|
|
||||||
|
|
||||||
std::shared_ptr<Poco::Data::SessionPool> pool;
|
std::shared_ptr<Poco::Data::SessionPool> pool;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -95,6 +95,7 @@ namespace ErrorCodes
|
|||||||
extern const int TOO_MUCH_FETCHES;
|
extern const int TOO_MUCH_FETCHES;
|
||||||
extern const int BAD_DATA_PART_NAME;
|
extern const int BAD_DATA_PART_NAME;
|
||||||
extern const int PART_IS_TEMPORARILY_LOCKED;
|
extern const int PART_IS_TEMPORARILY_LOCKED;
|
||||||
|
extern const int INCORRECT_FILE_NAME;
|
||||||
extern const int CANNOT_ASSIGN_OPTIMIZE;
|
extern const int CANNOT_ASSIGN_OPTIMIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -178,7 +179,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
|||||||
const MergeTreeData::MergingParams & merging_params_,
|
const MergeTreeData::MergingParams & merging_params_,
|
||||||
const MergeTreeSettings & settings_,
|
const MergeTreeSettings & settings_,
|
||||||
bool has_force_restore_data_flag)
|
bool has_force_restore_data_flag)
|
||||||
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
|
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_}, context(context_),
|
||||||
current_zookeeper(context.getZooKeeper()), database_name(database_name_),
|
current_zookeeper(context.getZooKeeper()), database_name(database_name_),
|
||||||
table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'),
|
table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'),
|
||||||
zookeeper_path(context.getMacros().expand(zookeeper_path_)),
|
zookeeper_path(context.getMacros().expand(zookeeper_path_)),
|
||||||
@ -195,6 +196,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
|||||||
shutdown_event(false), part_check_thread(*this),
|
shutdown_event(false), part_check_thread(*this),
|
||||||
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
|
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
|
||||||
{
|
{
|
||||||
|
if (path_.empty())
|
||||||
|
throw Exception("ReplicatedMergeTree storages require data path", ErrorCodes::INCORRECT_FILE_NAME);
|
||||||
|
|
||||||
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
|
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
|
||||||
zookeeper_path.resize(zookeeper_path.size() - 1);
|
zookeeper_path.resize(zookeeper_path.size() - 1);
|
||||||
/// If zookeeper chroot prefix is used, path should starts with '/', because chroot concatenates without it.
|
/// If zookeeper chroot prefix is used, path should starts with '/', because chroot concatenates without it.
|
||||||
|
@ -21,6 +21,12 @@ namespace ErrorCodes
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int INCORRECT_FILE_NAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class SetOrJoinBlockOutputStream : public IBlockOutputStream
|
class SetOrJoinBlockOutputStream : public IBlockOutputStream
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -86,9 +92,13 @@ StorageSetOrJoinBase::StorageSetOrJoinBase(
|
|||||||
const NamesAndTypesList & materialized_columns_,
|
const NamesAndTypesList & materialized_columns_,
|
||||||
const NamesAndTypesList & alias_columns_,
|
const NamesAndTypesList & alias_columns_,
|
||||||
const ColumnDefaults & column_defaults_)
|
const ColumnDefaults & column_defaults_)
|
||||||
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
|
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
|
||||||
path(path_ + escapeForFileName(name_) + '/'), name(name_), columns(columns_)
|
name(name_)
|
||||||
{
|
{
|
||||||
|
if (path_.empty())
|
||||||
|
throw Exception("Join and Set storages require data path", ErrorCodes::INCORRECT_FILE_NAME);
|
||||||
|
|
||||||
|
path = path_ + escapeForFileName(name_) + '/';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -20,7 +20,6 @@ class StorageSetOrJoinBase : public IStorage
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
String getTableName() const override { return name; }
|
String getTableName() const override { return name; }
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
|
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
|
||||||
|
|
||||||
@ -37,7 +36,6 @@ protected:
|
|||||||
|
|
||||||
String path;
|
String path;
|
||||||
String name;
|
String name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
|
|
||||||
UInt64 increment = 0; /// For the backup file names.
|
UInt64 increment = 0; /// For the backup file names.
|
||||||
|
|
||||||
|
@ -39,6 +39,7 @@ namespace ErrorCodes
|
|||||||
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
|
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
|
||||||
extern const int CANNOT_CREATE_DIRECTORY;
|
extern const int CANNOT_CREATE_DIRECTORY;
|
||||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||||
|
extern const int INCORRECT_FILE_NAME;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -183,14 +184,14 @@ StorageStripeLog::StorageStripeLog(
|
|||||||
const ColumnDefaults & column_defaults_,
|
const ColumnDefaults & column_defaults_,
|
||||||
bool attach,
|
bool attach,
|
||||||
size_t max_compress_block_size_)
|
size_t max_compress_block_size_)
|
||||||
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
|
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
|
||||||
path(path_), name(name_), columns(columns_),
|
path(path_), name(name_),
|
||||||
max_compress_block_size(max_compress_block_size_),
|
max_compress_block_size(max_compress_block_size_),
|
||||||
file_checker(path + escapeForFileName(name) + '/' + "sizes.json"),
|
file_checker(path + escapeForFileName(name) + '/' + "sizes.json"),
|
||||||
log(&Logger::get("StorageStripeLog"))
|
log(&Logger::get("StorageStripeLog"))
|
||||||
{
|
{
|
||||||
if (columns.empty())
|
if (path.empty())
|
||||||
throw Exception("Empty list of columns passed to StorageStripeLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
|
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
|
||||||
|
|
||||||
String full_path = path + escapeForFileName(name) + '/';
|
String full_path = path + escapeForFileName(name) + '/';
|
||||||
if (!attach)
|
if (!attach)
|
||||||
|
@ -28,8 +28,6 @@ public:
|
|||||||
std::string getName() const override { return "StripeLog"; }
|
std::string getName() const override { return "StripeLog"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
|
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const SelectQueryInfo & query_info,
|
const SelectQueryInfo & query_info,
|
||||||
@ -56,7 +54,6 @@ public:
|
|||||||
private:
|
private:
|
||||||
String path;
|
String path;
|
||||||
String name;
|
String name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
|
|
||||||
size_t max_compress_block_size;
|
size_t max_compress_block_size;
|
||||||
|
|
||||||
|
@ -46,6 +46,7 @@ namespace ErrorCodes
|
|||||||
extern const int CANNOT_READ_ALL_DATA;
|
extern const int CANNOT_READ_ALL_DATA;
|
||||||
extern const int DUPLICATE_COLUMN;
|
extern const int DUPLICATE_COLUMN;
|
||||||
extern const int LOGICAL_ERROR;
|
extern const int LOGICAL_ERROR;
|
||||||
|
extern const int INCORRECT_FILE_NAME;
|
||||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -284,14 +285,14 @@ StorageTinyLog::StorageTinyLog(
|
|||||||
const ColumnDefaults & column_defaults_,
|
const ColumnDefaults & column_defaults_,
|
||||||
bool attach,
|
bool attach,
|
||||||
size_t max_compress_block_size_)
|
size_t max_compress_block_size_)
|
||||||
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
|
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_},
|
||||||
path(path_), name(name_), columns(columns_),
|
path(path_), name(name_),
|
||||||
max_compress_block_size(max_compress_block_size_),
|
max_compress_block_size(max_compress_block_size_),
|
||||||
file_checker(path + escapeForFileName(name) + '/' + "sizes.json"),
|
file_checker(path + escapeForFileName(name) + '/' + "sizes.json"),
|
||||||
log(&Logger::get("StorageTinyLog"))
|
log(&Logger::get("StorageTinyLog"))
|
||||||
{
|
{
|
||||||
if (columns.empty())
|
if (path.empty())
|
||||||
throw Exception("Empty list of columns passed to StorageTinyLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
|
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
|
||||||
|
|
||||||
String full_path = path + escapeForFileName(name) + '/';
|
String full_path = path + escapeForFileName(name) + '/';
|
||||||
if (!attach)
|
if (!attach)
|
||||||
|
@ -27,8 +27,6 @@ public:
|
|||||||
std::string getName() const override { return "TinyLog"; }
|
std::string getName() const override { return "TinyLog"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
|
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const SelectQueryInfo & query_info,
|
const SelectQueryInfo & query_info,
|
||||||
@ -55,7 +53,6 @@ public:
|
|||||||
private:
|
private:
|
||||||
String path;
|
String path;
|
||||||
String name;
|
String name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
|
|
||||||
size_t max_compress_block_size;
|
size_t max_compress_block_size;
|
||||||
|
|
||||||
|
@ -25,8 +25,8 @@ StorageView::StorageView(
|
|||||||
const NamesAndTypesList & materialized_columns_,
|
const NamesAndTypesList & materialized_columns_,
|
||||||
const NamesAndTypesList & alias_columns_,
|
const NamesAndTypesList & alias_columns_,
|
||||||
const ColumnDefaults & column_defaults_)
|
const ColumnDefaults & column_defaults_)
|
||||||
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
|
: IStorage{columns_, materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
|
||||||
database_name(database_name_), columns(columns_)
|
database_name(database_name_)
|
||||||
{
|
{
|
||||||
if (!query.select)
|
if (!query.select)
|
||||||
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);
|
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);
|
||||||
|
@ -16,7 +16,6 @@ class StorageView : public ext::shared_ptr_helper<StorageView>, public IStorage
|
|||||||
public:
|
public:
|
||||||
std::string getName() const override { return "View"; }
|
std::string getName() const override { return "View"; }
|
||||||
std::string getTableName() const override { return table_name; }
|
std::string getTableName() const override { return table_name; }
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
/// It is passed inside the query and solved at its level.
|
/// It is passed inside the query and solved at its level.
|
||||||
bool supportsSampling() const override { return true; }
|
bool supportsSampling() const override { return true; }
|
||||||
@ -36,7 +35,6 @@ private:
|
|||||||
String table_name;
|
String table_name;
|
||||||
String database_name;
|
String database_name;
|
||||||
ASTPtr inner_query;
|
ASTPtr inner_query;
|
||||||
NamesAndTypesList columns;
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
StorageView(
|
StorageView(
|
||||||
|
@ -14,13 +14,12 @@ namespace DB
|
|||||||
|
|
||||||
StorageSystemAsynchronousMetrics::StorageSystemAsynchronousMetrics(const std::string & name_, const AsynchronousMetrics & async_metrics_)
|
StorageSystemAsynchronousMetrics::StorageSystemAsynchronousMetrics(const std::string & name_, const AsynchronousMetrics & async_metrics_)
|
||||||
: name(name_),
|
: name(name_),
|
||||||
columns
|
|
||||||
{
|
|
||||||
{"metric", std::make_shared<DataTypeString>()},
|
|
||||||
{"value", std::make_shared<DataTypeFloat64>()},
|
|
||||||
},
|
|
||||||
async_metrics(async_metrics_)
|
async_metrics(async_metrics_)
|
||||||
{
|
{
|
||||||
|
columns = NamesAndTypesList{
|
||||||
|
{"metric", std::make_shared<DataTypeString>()},
|
||||||
|
{"value", std::make_shared<DataTypeFloat64>()},
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -19,8 +19,6 @@ public:
|
|||||||
std::string getName() const override { return "SystemAsynchronousMetrics"; }
|
std::string getName() const override { return "SystemAsynchronousMetrics"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
|
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const SelectQueryInfo & query_info,
|
const SelectQueryInfo & query_info,
|
||||||
@ -31,7 +29,6 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
const std::string name;
|
const std::string name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
const AsynchronousMetrics & async_metrics;
|
const AsynchronousMetrics & async_metrics;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
@ -12,12 +12,11 @@ namespace DB
|
|||||||
|
|
||||||
StorageSystemBuildOptions::StorageSystemBuildOptions(const std::string & name_)
|
StorageSystemBuildOptions::StorageSystemBuildOptions(const std::string & name_)
|
||||||
: name(name_)
|
: name(name_)
|
||||||
, columns
|
{
|
||||||
{
|
columns = NamesAndTypesList{
|
||||||
{ "name", std::make_shared<DataTypeString>() },
|
{ "name", std::make_shared<DataTypeString>() },
|
||||||
{ "value", std::make_shared<DataTypeString>() },
|
{ "value", std::make_shared<DataTypeString>() },
|
||||||
}
|
};
|
||||||
{
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -18,8 +18,6 @@ public:
|
|||||||
std::string getName() const override { return "SystemBuildOptions"; }
|
std::string getName() const override { return "SystemBuildOptions"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
|
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const SelectQueryInfo & query_info,
|
const SelectQueryInfo & query_info,
|
||||||
@ -30,7 +28,6 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
const std::string name;
|
const std::string name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
StorageSystemBuildOptions(const std::string & name_);
|
StorageSystemBuildOptions(const std::string & name_);
|
||||||
|
@ -13,7 +13,8 @@ namespace DB
|
|||||||
|
|
||||||
StorageSystemClusters::StorageSystemClusters(const std::string & name_)
|
StorageSystemClusters::StorageSystemClusters(const std::string & name_)
|
||||||
: name(name_)
|
: name(name_)
|
||||||
, columns{
|
{
|
||||||
|
columns = NamesAndTypesList{
|
||||||
{ "cluster", std::make_shared<DataTypeString>() },
|
{ "cluster", std::make_shared<DataTypeString>() },
|
||||||
{ "shard_num", std::make_shared<DataTypeUInt32>() },
|
{ "shard_num", std::make_shared<DataTypeUInt32>() },
|
||||||
{ "shard_weight", std::make_shared<DataTypeUInt32>() },
|
{ "shard_weight", std::make_shared<DataTypeUInt32>() },
|
||||||
@ -24,8 +25,7 @@ StorageSystemClusters::StorageSystemClusters(const std::string & name_)
|
|||||||
{ "is_local", std::make_shared<DataTypeUInt8>() },
|
{ "is_local", std::make_shared<DataTypeUInt8>() },
|
||||||
{ "user", std::make_shared<DataTypeString>() },
|
{ "user", std::make_shared<DataTypeString>() },
|
||||||
{ "default_database", std::make_shared<DataTypeString>() }
|
{ "default_database", std::make_shared<DataTypeString>() }
|
||||||
}
|
};
|
||||||
{
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -18,7 +18,6 @@ class StorageSystemClusters : public ext::shared_ptr_helper<StorageSystemCluster
|
|||||||
public:
|
public:
|
||||||
std::string getName() const override { return "SystemClusters"; }
|
std::string getName() const override { return "SystemClusters"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
@ -30,7 +29,6 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
const std::string name;
|
const std::string name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
StorageSystemClusters(const std::string & name_);
|
StorageSystemClusters(const std::string & name_);
|
||||||
|
@ -17,7 +17,8 @@ namespace DB
|
|||||||
|
|
||||||
StorageSystemColumns::StorageSystemColumns(const std::string & name_)
|
StorageSystemColumns::StorageSystemColumns(const std::string & name_)
|
||||||
: name(name_)
|
: name(name_)
|
||||||
, columns{
|
{
|
||||||
|
columns = NamesAndTypesList{
|
||||||
{ "database", std::make_shared<DataTypeString>() },
|
{ "database", std::make_shared<DataTypeString>() },
|
||||||
{ "table", std::make_shared<DataTypeString>() },
|
{ "table", std::make_shared<DataTypeString>() },
|
||||||
{ "name", std::make_shared<DataTypeString>() },
|
{ "name", std::make_shared<DataTypeString>() },
|
||||||
@ -27,8 +28,7 @@ StorageSystemColumns::StorageSystemColumns(const std::string & name_)
|
|||||||
{ "data_compressed_bytes", std::make_shared<DataTypeUInt64>() },
|
{ "data_compressed_bytes", std::make_shared<DataTypeUInt64>() },
|
||||||
{ "data_uncompressed_bytes", std::make_shared<DataTypeUInt64>() },
|
{ "data_uncompressed_bytes", std::make_shared<DataTypeUInt64>() },
|
||||||
{ "marks_bytes", std::make_shared<DataTypeUInt64>() },
|
{ "marks_bytes", std::make_shared<DataTypeUInt64>() },
|
||||||
}
|
};
|
||||||
{
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -16,7 +16,6 @@ class StorageSystemColumns : public ext::shared_ptr_helper<StorageSystemColumns>
|
|||||||
public:
|
public:
|
||||||
std::string getName() const override { return "SystemColumns"; }
|
std::string getName() const override { return "SystemColumns"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
@ -31,7 +30,6 @@ protected:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
const std::string name;
|
const std::string name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -11,13 +11,12 @@ namespace DB
|
|||||||
|
|
||||||
|
|
||||||
StorageSystemDatabases::StorageSystemDatabases(const std::string & name_)
|
StorageSystemDatabases::StorageSystemDatabases(const std::string & name_)
|
||||||
: name(name_),
|
: name(name_)
|
||||||
columns
|
|
||||||
{
|
|
||||||
{"name", std::make_shared<DataTypeString>()},
|
|
||||||
{"engine", std::make_shared<DataTypeString>()},
|
|
||||||
}
|
|
||||||
{
|
{
|
||||||
|
columns = NamesAndTypesList{
|
||||||
|
{"name", std::make_shared<DataTypeString>()},
|
||||||
|
{"engine", std::make_shared<DataTypeString>()},
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -18,8 +18,6 @@ public:
|
|||||||
std::string getName() const override { return "SystemDatabases"; }
|
std::string getName() const override { return "SystemDatabases"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
|
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const SelectQueryInfo & query_info,
|
const SelectQueryInfo & query_info,
|
||||||
@ -30,7 +28,6 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
const std::string name;
|
const std::string name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
StorageSystemDatabases(const std::string & name_);
|
StorageSystemDatabases(const std::string & name_);
|
||||||
|
@ -19,24 +19,24 @@ namespace DB
|
|||||||
{
|
{
|
||||||
|
|
||||||
StorageSystemDictionaries::StorageSystemDictionaries(const std::string & name)
|
StorageSystemDictionaries::StorageSystemDictionaries(const std::string & name)
|
||||||
: name{name},
|
: name{name}
|
||||||
columns{
|
|
||||||
{ "name", std::make_shared<DataTypeString>() },
|
|
||||||
{ "origin", std::make_shared<DataTypeString>() },
|
|
||||||
{ "type", std::make_shared<DataTypeString>() },
|
|
||||||
{ "key", std::make_shared<DataTypeString>() },
|
|
||||||
{ "attribute.names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()) },
|
|
||||||
{ "attribute.types", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()) },
|
|
||||||
{ "bytes_allocated", std::make_shared<DataTypeUInt64>() },
|
|
||||||
{ "query_count", std::make_shared<DataTypeUInt64>() },
|
|
||||||
{ "hit_rate", std::make_shared<DataTypeFloat64>() },
|
|
||||||
{ "element_count", std::make_shared<DataTypeUInt64>() },
|
|
||||||
{ "load_factor", std::make_shared<DataTypeFloat64>() },
|
|
||||||
{ "creation_time", std::make_shared<DataTypeDateTime>() },
|
|
||||||
{ "source", std::make_shared<DataTypeString>() },
|
|
||||||
{ "last_exception", std::make_shared<DataTypeString>() }
|
|
||||||
}
|
|
||||||
{
|
{
|
||||||
|
columns = NamesAndTypesList{
|
||||||
|
{ "name", std::make_shared<DataTypeString>() },
|
||||||
|
{ "origin", std::make_shared<DataTypeString>() },
|
||||||
|
{ "type", std::make_shared<DataTypeString>() },
|
||||||
|
{ "key", std::make_shared<DataTypeString>() },
|
||||||
|
{ "attribute.names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()) },
|
||||||
|
{ "attribute.types", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()) },
|
||||||
|
{ "bytes_allocated", std::make_shared<DataTypeUInt64>() },
|
||||||
|
{ "query_count", std::make_shared<DataTypeUInt64>() },
|
||||||
|
{ "hit_rate", std::make_shared<DataTypeFloat64>() },
|
||||||
|
{ "element_count", std::make_shared<DataTypeUInt64>() },
|
||||||
|
{ "load_factor", std::make_shared<DataTypeFloat64>() },
|
||||||
|
{ "creation_time", std::make_shared<DataTypeDateTime>() },
|
||||||
|
{ "source", std::make_shared<DataTypeString>() },
|
||||||
|
{ "last_exception", std::make_shared<DataTypeString>() }
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -26,9 +26,6 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
const std::string name;
|
const std::string name;
|
||||||
const NamesAndTypesList columns;
|
|
||||||
|
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
StorageSystemDictionaries(const std::string & name);
|
StorageSystemDictionaries(const std::string & name);
|
||||||
|
@ -11,13 +11,13 @@ namespace DB
|
|||||||
|
|
||||||
|
|
||||||
StorageSystemEvents::StorageSystemEvents(const std::string & name_)
|
StorageSystemEvents::StorageSystemEvents(const std::string & name_)
|
||||||
: name(name_),
|
: name(name_)
|
||||||
columns
|
{
|
||||||
|
columns = NamesAndTypesList
|
||||||
{
|
{
|
||||||
{"event", std::make_shared<DataTypeString>()},
|
{"event", std::make_shared<DataTypeString>()},
|
||||||
{"value", std::make_shared<DataTypeUInt64>()}
|
{"value", std::make_shared<DataTypeUInt64>()}
|
||||||
}
|
};
|
||||||
{
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -18,8 +18,6 @@ public:
|
|||||||
std::string getName() const override { return "SystemEvents"; }
|
std::string getName() const override { return "SystemEvents"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
|
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const SelectQueryInfo & query_info,
|
const SelectQueryInfo & query_info,
|
||||||
@ -30,7 +28,6 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
const std::string name;
|
const std::string name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
StorageSystemEvents(const std::string & name_);
|
StorageSystemEvents(const std::string & name_);
|
||||||
|
@ -15,11 +15,11 @@ namespace DB
|
|||||||
|
|
||||||
StorageSystemFunctions::StorageSystemFunctions(const std::string & name_)
|
StorageSystemFunctions::StorageSystemFunctions(const std::string & name_)
|
||||||
: name(name_)
|
: name(name_)
|
||||||
, columns{
|
{
|
||||||
|
columns = NamesAndTypesList{
|
||||||
{ "name", std::make_shared<DataTypeString>() },
|
{ "name", std::make_shared<DataTypeString>() },
|
||||||
{ "is_aggregate", std::make_shared<DataTypeUInt8>() }
|
{ "is_aggregate", std::make_shared<DataTypeUInt8>() }
|
||||||
}
|
};
|
||||||
{
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -18,7 +18,6 @@ class StorageSystemFunctions : public ext::shared_ptr_helper<StorageSystemFuncti
|
|||||||
public:
|
public:
|
||||||
std::string getName() const override { return "SystemFunctions"; }
|
std::string getName() const override { return "SystemFunctions"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
@ -33,7 +32,6 @@ protected:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
const std::string name;
|
const std::string name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -124,16 +124,16 @@ static Strings getAllGraphiteSections(const AbstractConfiguration & config)
|
|||||||
|
|
||||||
StorageSystemGraphite::StorageSystemGraphite(const std::string & name_)
|
StorageSystemGraphite::StorageSystemGraphite(const std::string & name_)
|
||||||
: name(name_)
|
: name(name_)
|
||||||
, columns
|
{
|
||||||
{
|
columns = NamesAndTypesList{
|
||||||
{"config_name", std::make_shared<DataTypeString>()},
|
{"config_name", std::make_shared<DataTypeString>()},
|
||||||
{"regexp", std::make_shared<DataTypeString>()},
|
{"regexp", std::make_shared<DataTypeString>()},
|
||||||
{"function", std::make_shared<DataTypeString>()},
|
{"function", std::make_shared<DataTypeString>()},
|
||||||
{"age", std::make_shared<DataTypeUInt64>()},
|
{"age", std::make_shared<DataTypeUInt64>()},
|
||||||
{"precision", std::make_shared<DataTypeUInt64>()},
|
{"precision", std::make_shared<DataTypeUInt64>()},
|
||||||
{"priority", std::make_shared<DataTypeUInt16>()},
|
{"priority", std::make_shared<DataTypeUInt16>()},
|
||||||
{"is_default", std::make_shared<DataTypeUInt8>()}}
|
{"is_default", std::make_shared<DataTypeUInt8>()}
|
||||||
{
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -12,7 +12,6 @@ class StorageSystemGraphite : public ext::shared_ptr_helper<StorageSystemGraphit
|
|||||||
public:
|
public:
|
||||||
std::string getName() const override { return "SystemGraphite"; }
|
std::string getName() const override { return "SystemGraphite"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
@ -24,7 +23,6 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
const std::string name;
|
const std::string name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
StorageSystemGraphite(const std::string & name_);
|
StorageSystemGraphite(const std::string & name_);
|
||||||
|
@ -13,7 +13,8 @@ namespace DB
|
|||||||
|
|
||||||
StorageSystemMerges::StorageSystemMerges(const std::string & name)
|
StorageSystemMerges::StorageSystemMerges(const std::string & name)
|
||||||
: name{name}
|
: name{name}
|
||||||
, columns{
|
{
|
||||||
|
columns = NamesAndTypesList{
|
||||||
{ "database", std::make_shared<DataTypeString>() },
|
{ "database", std::make_shared<DataTypeString>() },
|
||||||
{ "table", std::make_shared<DataTypeString>() },
|
{ "table", std::make_shared<DataTypeString>() },
|
||||||
{ "elapsed", std::make_shared<DataTypeFloat64>() },
|
{ "elapsed", std::make_shared<DataTypeFloat64>() },
|
||||||
@ -30,8 +31,7 @@ StorageSystemMerges::StorageSystemMerges(const std::string & name)
|
|||||||
{ "columns_written", std::make_shared<DataTypeUInt64>() },
|
{ "columns_written", std::make_shared<DataTypeUInt64>() },
|
||||||
{ "memory_usage", std::make_shared<DataTypeUInt64>() },
|
{ "memory_usage", std::make_shared<DataTypeUInt64>() },
|
||||||
{ "thread_number", std::make_shared<DataTypeUInt64>() },
|
{ "thread_number", std::make_shared<DataTypeUInt64>() },
|
||||||
}
|
};
|
||||||
{
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -16,7 +16,6 @@ public:
|
|||||||
std::string getName() const override { return "SystemMerges"; }
|
std::string getName() const override { return "SystemMerges"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
|
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const SelectQueryInfo & query_info,
|
const SelectQueryInfo & query_info,
|
||||||
@ -27,7 +26,6 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
const std::string name;
|
const std::string name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
StorageSystemMerges(const std::string & name);
|
StorageSystemMerges(const std::string & name);
|
||||||
|
@ -12,13 +12,12 @@ namespace DB
|
|||||||
|
|
||||||
|
|
||||||
StorageSystemMetrics::StorageSystemMetrics(const std::string & name_)
|
StorageSystemMetrics::StorageSystemMetrics(const std::string & name_)
|
||||||
: name(name_),
|
: name(name_)
|
||||||
columns
|
{
|
||||||
{
|
columns = NamesAndTypesList{
|
||||||
{"metric", std::make_shared<DataTypeString>()},
|
{"metric", std::make_shared<DataTypeString>()},
|
||||||
{"value", std::make_shared<DataTypeInt64>()},
|
{"value", std::make_shared<DataTypeInt64>()},
|
||||||
}
|
};
|
||||||
{
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -18,8 +18,6 @@ public:
|
|||||||
std::string getName() const override { return "SystemMetrics"; }
|
std::string getName() const override { return "SystemMetrics"; }
|
||||||
std::string getTableName() const override { return name; }
|
std::string getTableName() const override { return name; }
|
||||||
|
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
BlockInputStreams read(
|
BlockInputStreams read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const SelectQueryInfo & query_info,
|
const SelectQueryInfo & query_info,
|
||||||
@ -30,7 +28,6 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
const std::string name;
|
const std::string name;
|
||||||
NamesAndTypesList columns;
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
StorageSystemMetrics(const std::string & name_);
|
StorageSystemMetrics(const std::string & name_);
|
||||||
|
@ -12,15 +12,15 @@ namespace DB
|
|||||||
{
|
{
|
||||||
|
|
||||||
StorageSystemModels::StorageSystemModels(const std::string & name)
|
StorageSystemModels::StorageSystemModels(const std::string & name)
|
||||||
: name{name},
|
: name{name}
|
||||||
columns{
|
{
|
||||||
|
columns = NamesAndTypesList{
|
||||||
{ "name", std::make_shared<DataTypeString>() },
|
{ "name", std::make_shared<DataTypeString>() },
|
||||||
{ "origin", std::make_shared<DataTypeString>() },
|
{ "origin", std::make_shared<DataTypeString>() },
|
||||||
{ "type", std::make_shared<DataTypeString>() },
|
{ "type", std::make_shared<DataTypeString>() },
|
||||||
{ "creation_time", std::make_shared<DataTypeDateTime>() },
|
{ "creation_time", std::make_shared<DataTypeDateTime>() },
|
||||||
{ "last_exception", std::make_shared<DataTypeString>() }
|
{ "last_exception", std::make_shared<DataTypeString>() }
|
||||||
}
|
};
|
||||||
{
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -26,9 +26,6 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
const std::string name;
|
const std::string name;
|
||||||
const NamesAndTypesList columns;
|
|
||||||
|
|
||||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
StorageSystemModels(const std::string & name);
|
StorageSystemModels(const std::string & name);
|
||||||
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue
Block a user