Merge branch 'CLICKHOUSE-3346-v310' into CLICKHOUSE-3346

Vitaliy Lyudvichenko 2018-01-11 23:51:30 +03:00
commit 388d47bbbc
54 changed files with 2286 additions and 93 deletions

View File

@ -4,6 +4,7 @@
#include <Poco/Net/DNS.h>
#include <Common/getFQDNOrHostName.h>
#include <Common/isLocalAddress.h>
#include <Common/ProfileEvents.h>
#include <Interpreters/Settings.h>
@ -39,14 +40,7 @@ ConnectionPoolWithFailover::ConnectionPoolWithFailover(
for (size_t i = 0; i < nested_pools.size(); ++i)
{
ConnectionPool & connection_pool = dynamic_cast<ConnectionPool &>(*nested_pools[i]);
const std::string & host = connection_pool.getHost();
size_t hostname_difference = 0;
for (size_t i = 0; i < std::min(local_hostname.length(), host.length()); ++i)
if (local_hostname[i] != host[i])
++hostname_difference;
hostname_differences[i] = hostname_difference;
hostname_differences[i] = getHostNameDifference(local_hostname, connection_pool.getHost());
}
}

View File

@ -131,11 +131,12 @@ private:
struct OpResult : public zoo_op_result_t
{
/// Pointers in this structure point to fields in class Op.
/// Therefore a destructor is not needed
/// Pointers in this class point to fields of class Op.
/// Op instances have the same (or longer lifetime), therefore destructor is not required.
};
using Ops = std::vector<std::unique_ptr<Op>>;
using OpPtr = std::unique_ptr<Op>;
using Ops = std::vector<OpPtr>;
using OpResults = std::vector<OpResult>;
using OpResultsPtr = std::shared_ptr<OpResults>;
using Strings = std::vector<std::string>;

View File

@ -28,6 +28,15 @@ namespace CurrentMetrics
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
namespace zkutil
{
@ -1028,4 +1037,19 @@ ZooKeeper::MultiFuture ZooKeeper::asyncMulti(const zkutil::Ops & ops)
return asyncMultiImpl(ops, true);
}
size_t getFailedOpIndex(const OpResultsPtr & op_results)
{
if (!op_results)
throw DB::Exception("OpResults is nullptr", DB::ErrorCodes::LOGICAL_ERROR);
for (size_t index = 0; index < op_results->size(); ++index)
{
if ((*op_results)[index].err != ZOK)
return index;
}
throw DB::Exception("There is no failed OpResult", DB::ErrorCodes::LOGICAL_ERROR);
}
}
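The new helper encodes a simple scan-for-the-first-error pattern over the multi-op results. Below is a minimal standalone sketch of the same logic with plain types (FakeOpResult and kOk are invented stand-ins for zkutil::OpResult and ZOK, so the snippet compiles without the ZooKeeper headers):

#include <cstddef>
#include <stdexcept>
#include <vector>

/// Invented stand-in for zkutil::OpResult; only the error code matters here.
struct FakeOpResult { int err; };
constexpr int kOk = 0; /// stand-in for ZOK

/// Mirrors getFailedOpIndex(): return the index of the first failed result, throw if none failed.
size_t firstFailedIndex(const std::vector<FakeOpResult> & results)
{
    for (size_t index = 0; index < results.size(); ++index)
        if (results[index].err != kOk)
            return index;
    throw std::logic_error("There is no failed OpResult");
}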

View File

@ -429,6 +429,11 @@ private:
bool is_dirty = false;
};
/// Returns the index of the first op whose result is not ZOK, or throws an exception if there is none
size_t getFailedOpIndex(const OpResultsPtr & op_results);
using ZooKeeperPtr = ZooKeeper::Ptr;

View File

@ -22,4 +22,5 @@ const std::string & getFQDNOrHostName()
{
static std::string result = getFQDNOrHostNameImpl();
return result;
}

View File

@ -2,6 +2,7 @@
#include <string>
/** Get the FQDN for the local server by resolving DNS hostname - similar to calling the 'hostname' tool with the -f flag.
* If it does not work, return hostname - similar to calling 'hostname' without flags or 'uname -n'.
*/
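As a rough illustration of that lookup order (FQDN first, bare hostname as the fallback), here is a self-contained sketch using plain POSIX gethostname()/getaddrinfo() rather than the Poco calls the real implementation relies on; it is not the actual getFQDNOrHostNameImpl:

#include <netdb.h>
#include <unistd.h>
#include <string>

/// Illustrative only: resolve the canonical (FQDN) name, fall back to the bare hostname.
std::string fqdnOrHostnameSketch()
{
    char hostname[256] = {};
    if (gethostname(hostname, sizeof(hostname) - 1) != 0)
        return "localhost";

    addrinfo hints = {};
    hints.ai_flags = AI_CANONNAME;
    addrinfo * result = nullptr;

    if (getaddrinfo(hostname, nullptr, &hints, &result) == 0 && result && result->ai_canonname)
    {
        std::string fqdn = result->ai_canonname;  /// like 'hostname -f'
        freeaddrinfo(result);
        return fqdn;
    }
    if (result)
        freeaddrinfo(result);
    return hostname;  /// like 'hostname' without flags
}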

View File

@ -10,25 +10,35 @@
namespace DB
{
bool isLocalAddress(const Poco::Net::SocketAddress & address, UInt16 clickhouse_port)
bool isLocalAddress(const Poco::Net::SocketAddress & address)
{
static auto interfaces = Poco::Net::NetworkInterface::list();
if (clickhouse_port == address.port())
{
return interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
[&] (const Poco::Net::NetworkInterface & interface)
{
/** Compare the addresses without taking into account `scope`.
* Theoretically, this may not be correct - depends on `route` setting
* - through which interface we will actually access the specified address.
*/
return interface.address().length() == address.host().length()
&& 0 == memcmp(interface.address().addr(), address.host().addr(), address.host().length());
});
}
return interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
[&] (const Poco::Net::NetworkInterface & interface)
{
/** Compare the addresses without taking into account `scope`.
* Theoretically, this may not be correct - depends on `route` setting
* - through which interface we will actually access the specified address.
*/
return interface.address().length() == address.host().length()
&& 0 == memcmp(interface.address().addr(), address.host().addr(), address.host().length());
});
}
return false;
bool isLocalAddress(const Poco::Net::SocketAddress & address, UInt16 clickhouse_port)
{
return clickhouse_port == address.port() && isLocalAddress(address);
}
size_t getHostNameDifference(const std::string & local_hostname, const std::string & host)
{
size_t hostname_difference = 0;
for (size_t i = 0; i < std::min(local_hostname.length(), host.length()); ++i)
if (local_hostname[i] != host[i])
++hostname_difference;
return hostname_difference;
}
}
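Note that getHostNameDifference() compares the two names position by position only over the length of the shorter one, so a host that is a pure prefix of another counts as identical. A small self-contained check of that behaviour, derived by hand from the code above (not from running the server):

#include <algorithm>
#include <cassert>
#include <string>

/// Same loop as DB::getHostNameDifference, repeated so the check is self-contained.
static size_t hostNameDifference(const std::string & a, const std::string & b)
{
    size_t diff = 0;
    for (size_t i = 0; i < std::min(a.length(), b.length()); ++i)
        if (a[i] != b[i])
            ++diff;
    return diff;
}

int main()
{
    assert(hostNameDifference("clickhouse-01", "clickhouse-02") == 1); /// one differing position
    assert(hostNameDifference("host", "host.example.com") == 0);       /// extra suffix is ignored
    return 0;
}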

View File

@ -23,4 +23,8 @@ namespace DB
*/
bool isLocalAddress(const Poco::Net::SocketAddress & address, UInt16 clickhouse_port);
bool isLocalAddress(const Poco::Net::SocketAddress & address);
/// Returns the number of positions at which the two hostnames differ; used for load balancing
size_t getHostNameDifference(const std::string & local_hostname, const std::string & host);
}

View File

@ -4,6 +4,7 @@
#include <vector>
#include <memory>
#include <boost/noncopyable.hpp>
#include "IBlockInputStream.h"
namespace DB

View File

@ -4,7 +4,8 @@
namespace DB
{
SquashingBlockInputStream::SquashingBlockInputStream(BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes)
SquashingBlockInputStream::SquashingBlockInputStream(const BlockInputStreamPtr & src,
size_t min_block_size_rows, size_t min_block_size_bytes)
: transform(min_block_size_rows, min_block_size_bytes)
{
children.emplace_back(src);

View File

@ -12,7 +12,7 @@ namespace DB
class SquashingBlockInputStream : public IProfilingBlockInputStream
{
public:
SquashingBlockInputStream(BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes);
SquashingBlockInputStream(const BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes);
String getName() const override { return "Squashing"; }

View File

@ -16,20 +16,21 @@ bool isAtomicSet(std::atomic<bool> * val)
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
template <typename Pred>
void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, Pred && is_cancelled)
{
from.readPrefix();
to.writePrefix();
while (Block block = from.read())
{
if (isAtomicSet(is_cancelled))
if (is_cancelled())
break;
to.write(block);
}
if (isAtomicSet(is_cancelled))
if (is_cancelled())
return;
/// For outputting additional information in some formats.
@ -42,11 +43,28 @@ void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<boo
to.setExtremes(input->getExtremes());
}
if (isAtomicSet(is_cancelled))
if (is_cancelled())
return;
from.readSuffix();
to.writeSuffix();
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
{
auto is_cancelled_pred = [is_cancelled] ()
{
return isAtomicSet(is_cancelled);
};
copyDataImpl(from, to, is_cancelled_pred);
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled)
{
copyDataImpl(from, to, is_cancelled);
}
}
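The refactoring keeps one templated copy loop and adapts both cancellation styles to it. A condensed standalone analogue of the same pattern, using std::vector in place of the block streams (the container types and function names here are stand-ins, not the DataStreams API):

#include <atomic>
#include <functional>
#include <vector>

/// Generic copy loop with a cancellation predicate, mirroring copyDataImpl above.
template <typename T, typename Pred>
void copyValuesImpl(const std::vector<T> & from, std::vector<T> & to, Pred && is_cancelled)
{
    for (const auto & value : from)
    {
        if (is_cancelled())
            break;
        to.push_back(value);
    }
}

/// Adapter that keeps the old atomic-flag interface, like the first copyData overload.
template <typename T>
void copyValues(const std::vector<T> & from, std::vector<T> & to, std::atomic<bool> * is_cancelled = nullptr)
{
    copyValuesImpl(from, to, [is_cancelled] { return is_cancelled && is_cancelled->load(); });
}

/// New overload that accepts an arbitrary predicate, like the std::function overload.
template <typename T>
void copyValues(const std::vector<T> & from, std::vector<T> & to, const std::function<bool()> & is_cancelled)
{
    copyValuesImpl(from, to, is_cancelled);
}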

View File

@ -1,6 +1,7 @@
#pragma once
#include <atomic>
#include <functional>
namespace DB
@ -14,4 +15,6 @@ class IBlockOutputStream;
*/
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled);
}

View File

@ -173,4 +173,9 @@ void DatabaseDictionary::drop()
/// Additional actions to delete database are not required.
}
String DatabaseDictionary::getDataPath(const Context &) const
{
return {};
}
}

View File

@ -93,6 +93,8 @@ public:
const Context & context,
const String & table_name) const override;
String getDataPath(const Context & context) const override;
void shutdown() override;
void drop() override;
};

View File

@ -15,11 +15,11 @@ namespace ErrorCodes
DatabasePtr DatabaseFactory::get(
const String & engine_name,
const String & database_name,
const String & path,
const String & metadata_path,
Context & context)
{
if (engine_name == "Ordinary")
return std::make_shared<DatabaseOrdinary>(database_name, path);
return std::make_shared<DatabaseOrdinary>(database_name, metadata_path, context);
else if (engine_name == "Memory")
return std::make_shared<DatabaseMemory>(database_name);
else if (engine_name == "Dictionary")

View File

@ -13,7 +13,7 @@ public:
static DatabasePtr get(
const String & engine_name,
const String & database_name,
const String & path,
const String & metadata_path,
Context & context);
};

View File

@ -152,4 +152,9 @@ void DatabaseMemory::drop()
/// Additional actions to delete database are not required.
}
String DatabaseMemory::getDataPath(const Context &) const
{
return {};
}
}

View File

@ -84,6 +84,8 @@ public:
const Context & context,
const String & table_name) const override;
String getDataPath(const Context & context) const override;
void shutdown() override;
void drop() override;
};

View File

@ -90,10 +90,11 @@ static void loadTable(
}
DatabaseOrdinary::DatabaseOrdinary(
const String & name_, const String & path_)
: DatabaseMemory(name_), path(path_)
DatabaseOrdinary::DatabaseOrdinary(const String & name_, const String & metadata_path, const Context & context)
: DatabaseMemory(name_), metadata_path(metadata_path)
{
data_path = context.getPath() + "data/" + escapeForFileName(name) + "/";
Poco::File(data_path).createDirectory();
}
@ -108,7 +109,7 @@ void DatabaseOrdinary::loadTables(
FileNames file_names;
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it(path); dir_it != dir_end; ++dir_it)
for (Poco::DirectoryIterator dir_it(metadata_path); dir_it != dir_end; ++dir_it)
{
/// For '.svn', '.gitignore' directory and similar.
if (dir_it.name().at(0) == '.')
@ -130,7 +131,7 @@ void DatabaseOrdinary::loadTables(
if (endsWith(dir_it.name(), ".sql"))
file_names.push_back(dir_it.name());
else
throw Exception("Incorrect file extension: " + dir_it.name() + " in metadata directory " + path,
throw Exception("Incorrect file extension: " + dir_it.name() + " in metadata directory " + metadata_path,
ErrorCodes::INCORRECT_FILE_NAME);
}
@ -162,7 +163,7 @@ void DatabaseOrdinary::loadTables(
watch.restart();
}
loadTable(context, path, *this, name, data_path, table, has_force_restore_data_flag);
loadTable(context, metadata_path, *this, name, data_path, table, has_force_restore_data_flag);
}
};
@ -269,7 +270,7 @@ void DatabaseOrdinary::createTable(
throw Exception("Table " + name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
String table_metadata_path = getTableMetadataPath(path, table_name);
String table_metadata_path = getTableMetadataPath(metadata_path, table_name);
String table_metadata_tmp_path = table_metadata_path + ".tmp";
String statement;
@ -312,7 +313,7 @@ void DatabaseOrdinary::removeTable(
{
StoragePtr res = detachTable(table_name);
String table_metadata_path = getTableMetadataPath(path, table_name);
String table_metadata_path = getTableMetadataPath(metadata_path, table_name);
try
{
@ -374,7 +375,7 @@ void DatabaseOrdinary::renameTable(
throw Exception{e};
}
ASTPtr ast = getCreateQueryImpl(path, table_name);
ASTPtr ast = getCreateQueryImpl(metadata_path, table_name);
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
ast_create_query.table = to_table_name;
@ -388,7 +389,7 @@ time_t DatabaseOrdinary::getTableMetadataModificationTime(
const Context & /*context*/,
const String & table_name)
{
String table_metadata_path = getTableMetadataPath(path, table_name);
String table_metadata_path = getTableMetadataPath(metadata_path, table_name);
Poco::File meta_file(table_metadata_path);
if (meta_file.exists())
@ -406,7 +407,7 @@ ASTPtr DatabaseOrdinary::getCreateQuery(
const Context & /*context*/,
const String & table_name) const
{
ASTPtr ast = getCreateQueryImpl(path, table_name);
ASTPtr ast = getCreateQueryImpl(metadata_path, table_name);
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
ast_create_query.attach = false;
@ -455,8 +456,8 @@ void DatabaseOrdinary::alterTable(
/// Read the definition of the table and replace the necessary parts with new ones.
String table_name_escaped = escapeForFileName(name);
String table_metadata_tmp_path = path + "/" + table_name_escaped + ".sql.tmp";
String table_metadata_path = path + "/" + table_name_escaped + ".sql";
String table_metadata_tmp_path = metadata_path + "/" + table_name_escaped + ".sql.tmp";
String table_metadata_path = metadata_path + "/" + table_name_escaped + ".sql";
String statement;
{
@ -499,4 +500,9 @@ void DatabaseOrdinary::alterTable(
}
}
String DatabaseOrdinary::getDataPath(const Context &) const
{
return data_path;
}
}
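After this change DatabaseOrdinary keeps two separate directories: metadata_path, which the CREATE DATABASE code passes in, and data_path, which the constructor derives from the server root and getDataPath() now exposes. A tiny sketch of how the two paths relate, assuming a hypothetical server root of /var/lib/clickhouse/ and ignoring escapeForFileName() for brevity:

#include <string>

/// Illustrative only; the real code escapes the database name with escapeForFileName().
struct DatabasePaths
{
    std::string metadata_path; /// holds the table *.sql definitions
    std::string data_path;     /// returned by getDataPath(), holds the table data
};

DatabasePaths pathsFor(const std::string & server_root, const std::string & database)
{
    return { server_root + "metadata/" + database + "/",
             server_root + "data/" + database + "/" };
}

/// pathsFor("/var/lib/clickhouse/", "test")
///   -> metadata_path = "/var/lib/clickhouse/metadata/test/"
///   -> data_path     = "/var/lib/clickhouse/data/test/"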

View File

@ -13,10 +13,11 @@ namespace DB
class DatabaseOrdinary : public DatabaseMemory
{
protected:
const String path;
const String metadata_path;
String data_path;
public:
DatabaseOrdinary(const String & name_, const String & path_);
DatabaseOrdinary(const String & name_, const String & metadata_path, const Context & context);
String getEngineName() const override { return "Ordinary"; }
@ -58,6 +59,8 @@ public:
const Context & context,
const String & table_name) const override;
String getDataPath(const Context & context) const override;
void shutdown() override;
void drop() override;

View File

@ -129,6 +129,9 @@ public:
const Context & context,
const String & name) const = 0;
/// Returns path for persistent data storage if the database supports it, empty string otherwise
virtual String getDataPath(const Context & context) const = 0;
/// Ask all tables to complete the background threads they are using and delete all table objects.
virtual void shutdown() = 0;

View File

@ -0,0 +1,24 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
namespace DB
{
/// Doesn't do anything, just resets the working buffer
/// Assume that we almost never write to it
class WriteBufferNull : public BufferWithOwnMemory<WriteBuffer>
{
public:
explicit WriteBufferNull(size_t dummy_buffer_size = 32) /// DBMS_DEFAULT_BUFFER_SIZE is too much
: BufferWithOwnMemory<WriteBuffer>(dummy_buffer_size) {}
void nextImpl() override
{
set(internal_buffer.begin(), internal_buffer.size());
}
};
}
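A short hedged usage sketch: everything written to this buffer is discarded, because nextImpl() just rewinds the working buffer. write() and next() are the ordinary WriteBuffer interface; the include path and the payload below are assumptions for illustration:

#include <IO/WriteBufferNull.h>  /// assumed location, next to the headers included above
#include <string>

void discardOutput()
{
    DB::WriteBufferNull out;

    std::string payload = "this text goes nowhere";  /// made-up data
    out.write(payload.data(), payload.size());       /// overflows the tiny buffer, nextImpl() resets it
    out.next();                                       /// flushing is a no-op as well
}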

View File

@ -139,6 +139,13 @@ ClusterPtr Clusters::getCluster(const std::string & cluster_name) const
}
void Clusters::setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster)
{
std::lock_guard<std::mutex> lock(mutex);
impl[cluster_name] = cluster;
}
void Clusters::updateClusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
@ -168,6 +175,7 @@ Clusters::Impl Clusters::getContainer() const
return impl;
}
/// Implementation of `Cluster` class
Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name)

View File

@ -97,6 +97,7 @@ public:
UInt32 shard_num;
UInt32 weight;
Addresses local_addresses;
/// nullptr if there are no remote addresses
ConnectionPoolWithFailoverPtr pool;
bool has_internal_replication;
};
@ -168,8 +169,9 @@ public:
Clusters & operator=(const Clusters &) = delete;
ClusterPtr getCluster(const std::string & cluster_name) const;
void setCluster(const String & cluster_name, const ClusterPtr & cluster);
void updateClusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name = "remote_servers");
void updateClusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name);
public:
using Impl = std::map<String, ClusterPtr>;

View File

@ -1364,13 +1364,27 @@ Clusters & Context::getClusters() const
/// On repeated calls, updates existing clusters and adds new ones; old clusters are not deleted
void Context::setClustersConfig(const ConfigurationPtr & config)
void Context::setClustersConfig(const ConfigurationPtr & config, const String & config_name)
{
std::lock_guard<std::mutex> lock(shared->clusters_mutex);
shared->clusters_config = config;
if (shared->clusters)
shared->clusters->updateClusters(*shared->clusters_config, settings);
if (!shared->clusters)
shared->clusters = std::make_unique<Clusters>(*shared->clusters_config, settings, config_name);
else
shared->clusters->updateClusters(*shared->clusters_config, settings, config_name);
}
void Context::setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster)
{
std::lock_guard<std::mutex> lock(shared->clusters_mutex);
if (!shared->clusters)
throw Exception("Clusters are not set", ErrorCodes::LOGICAL_ERROR);
shared->clusters->setCluster(cluster_name, cluster);
}
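Both methods serialise on shared->clusters_mutex and differ only in how they treat missing state: setClustersConfig() lazily creates the Clusters object, while setCluster() treats its absence as a logic error. A standalone sketch of that guard/lazy-init split (all names here are invented, this is not the Context API):

#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>

/// Invented stand-in for the container that clusters_mutex guards.
struct Registry
{
    std::map<std::string, std::string> entries;
};

class ContextSketch
{
    std::mutex mutex;
    std::unique_ptr<Registry> registry;

public:
    /// Like setClustersConfig(): create the container lazily, then update it under the lock.
    void setConfig(const std::string & name, const std::string & value)
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (!registry)
            registry = std::make_unique<Registry>();
        registry->entries[name] = value;
    }

    /// Like setCluster(): requires the container to exist already.
    void setEntry(const std::string & name, const std::string & value)
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (!registry)
            throw std::logic_error("Clusters are not set");
        registry->entries[name] = value;
    }
};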

View File

@ -318,7 +318,9 @@ public:
Clusters & getClusters() const;
std::shared_ptr<Cluster> getCluster(const std::string & cluster_name) const;
std::shared_ptr<Cluster> tryGetCluster(const std::string & cluster_name) const;
void setClustersConfig(const ConfigurationPtr & config);
void setClustersConfig(const ConfigurationPtr & config, const String & config_name = "remote_servers");
/// Sets custom cluster, but doesn't update configuration
void setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster);
Compiler & getCompiler();
QueryLog & getQueryLog();

View File

@ -7,11 +7,13 @@ namespace DB
{
class Context;
class Cluster;
class InterpreterCheckQuery : public IInterpreter
{
public:
InterpreterCheckQuery(const ASTPtr & query_ptr_, const Context & context_);
BlockIO execute() override;
private:
@ -19,6 +21,7 @@ private:
private:
ASTPtr query_ptr;
const Context & context;
Block result;
};

View File

@ -103,13 +103,10 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
String database_name_escaped = escapeForFileName(database_name);
/// Create directories for tables data and metadata.
/// Create directories for tables metadata.
String path = context.getPath();
String data_path = path + "data/" + database_name_escaped + "/";
String metadata_path = path + "metadata/" + database_name_escaped + "/";
Poco::File(metadata_path).createDirectory();
Poco::File(data_path).createDirectory();
DatabasePtr database = DatabaseFactory::get(database_engine_name, database_name, metadata_path, context);
@ -458,13 +455,9 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
String current_database = context.getCurrentDatabase();
String database_name = create.database.empty() ? current_database : create.database;
String database_name_escaped = escapeForFileName(database_name);
String table_name = create.table;
String table_name_escaped = escapeForFileName(table_name);
String data_path = path + "data/" + database_name_escaped + "/";
String metadata_path = path + "metadata/" + database_name_escaped + "/" + table_name_escaped + ".sql";
// If this is a stub ATTACH query, read the query definition from the database
if (create.attach && !create.storage && !create.columns)
{
@ -511,9 +504,13 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
{
std::unique_ptr<DDLGuard> guard;
String data_path;
DatabasePtr database;
if (!create.is_temporary)
{
context.assertDatabaseExists(database_name);
database = context.getDatabase(database_name);
data_path = database->getDataPath(context);
/** If the table already exists, and the request specifies IF NOT EXISTS,
* then we allow concurrent CREATE queries (which do nothing).
@ -548,7 +545,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
if (create.is_temporary)
context.getSessionContext().addExternalTable(table_name, res);
else
context.getDatabase(database_name)->createTable(context, table_name, res, query_ptr);
database->createTable(context, table_name, res, query_ptr);
}
res->startup();

View File

@ -135,7 +135,7 @@ void loadMetadataSystem(Context & context)
Poco::File(global_path + "data/" SYSTEM_DATABASE).createDirectories();
Poco::File(global_path + "metadata/" SYSTEM_DATABASE).createDirectories();
auto system_database = std::make_shared<DatabaseOrdinary>(SYSTEM_DATABASE, global_path + "metadata/" SYSTEM_DATABASE);
auto system_database = std::make_shared<DatabaseOrdinary>(SYSTEM_DATABASE, global_path + "metadata/" SYSTEM_DATABASE, context);
context.addDatabase(SYSTEM_DATABASE, system_database);
}

View File

@ -82,7 +82,7 @@ bool ParserList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
auto list = std::make_shared<ASTExpressionList>();
node = list;
while (1)
while (true)
{
if (first)
{

View File

@ -317,6 +317,21 @@ ASTPtr parseQuery(
}
ASTPtr parseQuery(
IParser & parser,
const std::string & query,
const std::string & query_description)
{
return parseQuery(parser, query.data(), query.data() + query.size(), query_description);
}
ASTPtr parseQuery(IParser & parser, const std::string & query)
{
return parseQuery(parser, query.data(), query.data() + query.size(), parser.getName());
}
std::pair<const char *, bool> splitMultipartQuery(const std::string & queries, std::vector<std::string> & queries_list)
{
ASTPtr ast;
@ -357,4 +372,5 @@ std::pair<const char *, bool> splitMultipartQuery(const std::string & queries, s
return std::make_pair(begin, pos == end);
}
}

View File

@ -32,6 +32,15 @@ ASTPtr parseQuery(
const char * end,
const std::string & description);
ASTPtr parseQuery(
IParser & parser,
const std::string & query,
const std::string & query_description);
ASTPtr parseQuery(
IParser & parser,
const std::string & query);
/** Split queries separated by ';' into a list of single queries
* Returns a pointer to the end of the last successfully parsed query (first), and true if all queries were successfully parsed (second)

View File

@ -43,6 +43,9 @@ target_link_libraries (clickhouse-compressor-lib clickhouse_common_io ${Boost_PR
add_library (clickhouse-format-lib ${SPLIT_SHARED} Format.cpp)
target_link_libraries (clickhouse-format-lib clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
add_library (clickhouse-cluster-copier-lib ClusterCopier.cpp)
target_link_libraries (clickhouse-cluster-copier-lib dbms ${Boost_PROGRAM_OPTIONS_LIBRARY})
if (USE_EMBEDDED_COMPILER)
link_directories (${LLVM_LIBRARY_DIRS})
add_subdirectory ("Compiler-${LLVM_VERSION}")
@ -99,6 +102,7 @@ else ()
clickhouse-extract-from-config-lib
clickhouse-compressor-lib
clickhouse-format-lib
clickhouse-cluster-copier-lib
dbms
)
@ -110,6 +114,7 @@ else ()
add_custom_target (clickhouse-extract-from-config ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-extract-from-config DEPENDS clickhouse)
add_custom_target (clickhouse-compressor ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-compressor DEPENDS clickhouse)
add_custom_target (clickhouse-format ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-format DEPENDS clickhouse)
add_custom_target (clickhouse-cluster-copier ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-cluster-copier DEPENDS clickhouse)
# install always because debian package wants these files:
add_custom_target (clickhouse-clang ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-clang DEPENDS clickhouse)
add_custom_target (clickhouse-lld ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-lld DEPENDS clickhouse)
@ -124,6 +129,7 @@ else ()
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-extract-from-config
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-compressor
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-format
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-cluster-copier
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-clang
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-lld
DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)

File diff suppressed because it is too large

View File

@ -0,0 +1 @@
#pragma once

View File

@ -66,14 +66,14 @@ void LocalServer::defineOptions(Poco::Util::OptionSet& _options)
/// Arguments that define first query creating initial table:
/// (If structure argument is omitted then initial query is not generated)
_options.addOption(
Poco::Util::Option("structure", "S", "Structe of initial table(list columns names with their types)")
Poco::Util::Option("structure", "S", "Structure of initial table(list columns names with their types)")
.required(false)
.repeatable(false)
.argument("[name Type]")
.binding("table-structure"));
_options.addOption(
Poco::Util::Option("table", "N", "Name of intial table")
Poco::Util::Option("table", "N", "Name of initial table")
.required(false)
.repeatable(false)
.argument("[table]")
@ -87,7 +87,7 @@ void LocalServer::defineOptions(Poco::Util::OptionSet& _options)
.binding("table-file"));
_options.addOption(
Poco::Util::Option("input-format", "if", "Input format of intial table data")
Poco::Util::Option("input-format", "if", "Input format of initial table data")
.required(false)
.repeatable(false)
.argument("[TSV]")
@ -447,6 +447,19 @@ static const char * minimal_default_user_xml =
"</yandex>";
template <typename T>
static ConfigurationPtr getConfigurationFromXMLString(T && xml_string)
{
std::stringstream ss;
ss << std::forward<T>(xml_string);
Poco::XML::InputSource input_source(ss);
ConfigurationPtr res{new Poco::Util::XMLConfiguration(&input_source)};
return res;
}
void LocalServer::setupUsers()
{
ConfigurationPtr users_config;
@ -461,11 +474,7 @@ void LocalServer::setupUsers()
}
else
{
std::stringstream default_user_stream;
default_user_stream << minimal_default_user_xml;
Poco::XML::InputSource default_user_source(default_user_stream);
users_config = ConfigurationPtr(new Poco::Util::XMLConfiguration(&default_user_source));
users_config = getConfigurationFromXMLString(minimal_default_user_xml);
}
if (users_config)

View File

@ -18,6 +18,7 @@ int mainEntryClickHousePerformanceTest(int argc, char ** argv);
int mainEntryClickHouseExtractFromConfig(int argc, char ** argv);
int mainEntryClickHouseCompressor(int argc, char ** argv);
int mainEntryClickHouseFormat(int argc, char ** argv);
int mainEntryClickHouseClusterCopier(int argc, char ** argv);
#if USE_EMBEDDED_COMPILER
int mainEntryClickHouseClang(int argc, char ** argv);
@ -41,6 +42,7 @@ std::pair<const char *, MainFunc> clickhouse_applications[] =
{"extract-from-config", mainEntryClickHouseExtractFromConfig},
{"compressor", mainEntryClickHouseCompressor},
{"format", mainEntryClickHouseFormat},
{"copier", mainEntryClickHouseClusterCopier},
#if USE_EMBEDDED_COMPILER
{"clang", mainEntryClickHouseClang},
{"lld", mainEntryClickHouseLLD},

View File

@ -49,6 +49,8 @@ namespace DB
namespace ErrorCodes
{
extern const int STORAGE_REQUIRES_PARAMETER;
extern const int BAD_ARGUMENTS;
extern const int READONLY;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
}
@ -196,7 +198,7 @@ BlockInputStreams StorageDistributed::read(
external_tables = context.getExternalTables();
ClusterProxy::SelectStreamFactory select_stream_factory(
processed_stage, QualifiedTableName{remote_database, remote_table}, external_tables);
processed_stage, QualifiedTableName{remote_database, remote_table}, external_tables);
return ClusterProxy::executeQuery(
select_stream_factory, cluster, modified_query_ast, context, settings);
@ -205,18 +207,24 @@ BlockInputStreams StorageDistributed::read(
BlockOutputStreamPtr StorageDistributed::write(const ASTPtr & query, const Settings & settings)
{
auto cluster = owned_cluster ? owned_cluster : context.getCluster(cluster_name);
if (owned_cluster && context.getApplicationType() != Context::ApplicationType::LOCAL)
throw Exception(
"Method write is not supported by storage " + getName() +
" created via a table function", ErrorCodes::READONLY);
/// TODO: !path.empty() can be replaced by !owned_cluster or !cluster_name.empty() ?
/// owned_cluster for remote table function use sync insertion => doesn't need a path.
bool write_enabled = (!path.empty() || owned_cluster)
&& (((cluster->getLocalShardCount() + cluster->getRemoteShardCount()) < 2) || has_sharding_key);
auto cluster = (owned_cluster) ? owned_cluster : context.getCluster(cluster_name);
if (!write_enabled)
throw Exception{
bool is_sharding_key_ok = has_sharding_key || ((cluster->getLocalShardCount() + cluster->getRemoteShardCount()) < 2);
if (!is_sharding_key_ok)
throw Exception(
"Method write is not supported by storage " + getName() +
" with more than one shard and no sharding key provided",
ErrorCodes::STORAGE_REQUIRES_PARAMETER};
ErrorCodes::STORAGE_REQUIRES_PARAMETER);
if (path.empty() && !settings.insert_distributed_sync.value)
throw Exception(
"Data path should be set for storage " + getName() +
" to enable asynchronous inserts", ErrorCodes::BAD_ARGUMENTS);
bool insert_sync = settings.insert_distributed_sync || owned_cluster;
auto timeout = settings.insert_distributed_timeout;
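Taken together, the rewritten write() performs three early checks before picking synchronous or asynchronous insertion. A hedged, condensed restatement of that decision logic as a free function (parameter names are invented and the exceptions are simplified; this is not the actual member function):

#include <stdexcept>
#include <string>

/// Condensed restatement of the checks in StorageDistributed::write() above.
bool chooseSyncInsert(
    bool owned_cluster,            /// table was created via the remote() table function
    bool is_local_application,     /// Context::ApplicationType::LOCAL
    size_t total_shards,           /// local + remote shards of the target cluster
    bool has_sharding_key,
    const std::string & data_path,
    bool insert_distributed_sync)
{
    if (owned_cluster && !is_local_application)
        throw std::runtime_error("Method write is not supported for a table created via a table function");

    if (total_shards >= 2 && !has_sharding_key)
        throw std::runtime_error("More than one shard and no sharding key provided");

    if (data_path.empty() && !insert_distributed_sync)
        throw std::runtime_error("Data path should be set to enable asynchronous inserts");

    /// Sync insertion is forced for tables from table functions, otherwise follows the setting.
    return insert_distributed_sync || owned_cluster;
}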

View File

@ -31,6 +31,8 @@ namespace ErrorCodes
extern const int DATABASE_ACCESS_DENIED;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int UNKNOWN_IDENTIFIER;
extern const int INCORRECT_FILE_NAME;
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
};
@ -60,6 +62,9 @@ StorageFile::StorageFile(
: IStorage(materialized_columns_, alias_columns_, column_defaults_),
table_name(table_name_), format_name(format_name_), columns(columns_), context_global(context_), table_fd(table_fd_)
{
if (columns.empty())
throw Exception("Empty list of columns passed to storage " + getName() + " constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
if (table_fd < 0) /// Will use file
{
use_table_fd = false;
@ -72,6 +77,9 @@ StorageFile::StorageFile(
}
else /// Is DB's file
{
if (db_dir_path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
path = getTablePath(db_dir_path, table_name, format_name);
is_db_table = true;
Poco::File(Poco::Path(path).parent()).createDirectories();

View File

@ -41,6 +41,7 @@ namespace ErrorCodes
extern const int DUPLICATE_COLUMN;
extern const int SIZES_OF_MARKS_FILES_ARE_INCONSISTENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INCORRECT_FILE_NAME;
}
@ -371,7 +372,10 @@ StorageLog::StorageLog(
if (columns.empty())
throw Exception("Empty list of columns passed to StorageLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
/// create files if they do not exist
if (path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
/// create files if they do not exist
Poco::File(path + escapeForFileName(name) + '/').createDirectories();
for (const auto & column : getColumnsList())

View File

@ -27,6 +27,7 @@ namespace ErrorCodes
extern const int ABORTED;
extern const int BAD_ARGUMENTS;
extern const int INCORRECT_DATA;
extern const int INCORRECT_FILE_NAME;
}
@ -59,6 +60,9 @@ StorageMergeTree::StorageMergeTree(
reader(data), writer(data), merger(data, context.getBackgroundPool()),
log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)"))
{
if (path_.empty())
throw Exception("MergeTree storages require data path", ErrorCodes::INCORRECT_FILE_NAME);
data.loadDataParts(has_force_restore_data_flag);
if (!attach)

View File

@ -95,6 +95,7 @@ namespace ErrorCodes
extern const int TOO_MUCH_FETCHES;
extern const int BAD_DATA_PART_NAME;
extern const int PART_IS_TEMPORARILY_LOCKED;
extern const int INCORRECT_FILE_NAME;
}
@ -194,6 +195,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
shutdown_event(false), part_check_thread(*this),
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
{
if (path_.empty())
throw Exception("ReplicatedMergeTree storages require data path", ErrorCodes::INCORRECT_FILE_NAME);
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
zookeeper_path.resize(zookeeper_path.size() - 1);
/// If zookeeper chroot prefix is used, path should starts with '/', because chroot concatenates without it.

View File

@ -21,6 +21,12 @@ namespace ErrorCodes
}
namespace ErrorCodes
{
extern const int INCORRECT_FILE_NAME;
}
class SetOrJoinBlockOutputStream : public IBlockOutputStream
{
public:
@ -87,8 +93,12 @@ StorageSetOrJoinBase::StorageSetOrJoinBase(
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
path(path_ + escapeForFileName(name_) + '/'), name(name_), columns(columns_)
name(name_), columns(columns_)
{
if (path_.empty())
throw Exception("Join and Set storages require data path", ErrorCodes::INCORRECT_FILE_NAME);
path = path_ + escapeForFileName(name_) + '/';
}

View File

@ -39,6 +39,7 @@ namespace ErrorCodes
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
extern const int CANNOT_CREATE_DIRECTORY;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INCORRECT_FILE_NAME;
}
@ -192,6 +193,9 @@ StorageStripeLog::StorageStripeLog(
if (columns.empty())
throw Exception("Empty list of columns passed to StorageStripeLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
if (path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
String full_path = path + escapeForFileName(name) + '/';
if (!attach)
{

View File

@ -46,6 +46,7 @@ namespace ErrorCodes
extern const int CANNOT_READ_ALL_DATA;
extern const int DUPLICATE_COLUMN;
extern const int LOGICAL_ERROR;
extern const int INCORRECT_FILE_NAME;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
@ -293,6 +294,9 @@ StorageTinyLog::StorageTinyLog(
if (columns.empty())
throw Exception("Empty list of columns passed to StorageTinyLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
if (path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
String full_path = path + escapeForFileName(name) + '/';
if (!attach)
{

View File

@ -46,7 +46,7 @@ class ClickHouseCluster:
self.base_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.project_name]
self.base_zookeeper_cmd = None
self.pre_zookkeeper_commands = []
self.pre_zookeeper_commands = []
self.instances = {}
self.with_zookeeper = False
@ -113,9 +113,12 @@ class ClickHouseCluster:
if self.with_zookeeper and self.base_zookeeper_cmd:
subprocess.check_call(self.base_zookeeper_cmd + ['up', '-d', '--no-recreate'])
for command in self.pre_zookkeeper_commands:
for command in self.pre_zookeeper_commands:
self.run_zookeeper_client_command(command, repeats=5)
# Uncomment for debugging
# print ' '.join(self.base_cmd + ['up', '--no-recreate'])
subprocess.check_call(self.base_cmd + ['up', '-d', '--no-recreate'])
start_deadline = time.time() + 20.0 # seconds
@ -159,7 +162,7 @@ class ClickHouseCluster:
return self.docker_client.containers.run('zookeeper', cli_cmd, remove=True, network_mode=network_mode)
def add_zookeeper_startup_command(self, command):
self.pre_zookkeeper_commands.append(command)
self.pre_zookeeper_commands.append(command)
DOCKER_COMPOSE_TEMPLATE = '''
@ -224,11 +227,13 @@ class ClickHouseInstance:
def exec_in_container(self, cmd, **kwargs):
container = self.get_docker_handle()
handle = self.docker_client.api.exec_create(container.id, cmd, **kwargs)
output = self.docker_client.api.exec_start(handle).decode('utf8')
exit_code = self.docker_client.api.exec_inspect(handle)['ExitCode']
exec_id = self.docker_client.api.exec_create(container.id, cmd, **kwargs)
output = self.docker_client.api.exec_start(exec_id, detach=False)
output = output.decode('utf8')
exit_code = self.docker_client.api.exec_inspect(exec_id)['ExitCode']
if exit_code:
raise Exception('Cmd {} failed! Return code {}. Output {}'.format(' '.join(cmd), exit_code, output))
raise Exception('Cmd "{}" failed! Return code {}. Output: {}'.format(' '.join(cmd), exit_code, output))
return output

View File

@ -0,0 +1,47 @@
<yandex>
<remote_servers>
<cluster0>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>s0_0_0</host>
<port>9000</port>
</replica>
<replica>
<host>s0_0_1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>s0_1_0</host>
<port>9000</port>
</replica>
</shard>
</cluster0>
<cluster1>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>s1_0_0</host>
<port>9000</port>
</replica>
<replica>
<host>s1_0_1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>s1_1_0</host>
<port>9000</port>
</replica>
</shard>
</cluster1>
</remote_servers>
</yandex>

View File

@ -0,0 +1,5 @@
<yandex>
<distributed_ddl>
<path>/clickhouse/task_queue/ddl</path>
</distributed_ddl>
</yandex>

View File

@ -0,0 +1,14 @@
<yandex>
<!-- Query log. Used only for queries with setting log_queries = 1. -->
<query_log>
<!-- Which table to insert data into. If the table does not exist, it will be created.
When the query log structure changes after a system update,
the old table will be renamed and a new table will be created automatically.
-->
<database>system</database>
<table>query_log</table>
<!-- Interval of flushing data. -->
<flush_interval_milliseconds>1000</flush_interval_milliseconds>
</query_log>
</yandex>

View File

@ -0,0 +1,27 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<log_queries>1</log_queries>
<!-- Remove this setting after superfluous deduplication after DROP PARTITION is fixed -->
<insert_deduplicate>0</insert_deduplicate>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@ -0,0 +1,88 @@
<?xml version="1.0"?>
<yandex>
<!-- Configuration of clusters -->
<remote_servers>
<cluster0>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>s0_0_0</host>
<port>9000</port>
</replica>
<replica>
<host>s0_0_1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>s0_1_0</host>
<port>9000</port>
</replica>
</shard>
</cluster0>
<cluster1>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>s1_0_0</host>
<port>9000</port>
</replica>
<replica>
<host>s1_0_1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>s1_1_0</host>
<port>9000</port>
</replica>
</shard>
</cluster1>
</remote_servers>
<!-- How many simultaneous workers are possible -->
<max_workers>4</max_workers>
<!-- Common setting for pull and push operations -->
<settings>
</settings>
<!-- Setting used to fetch data -->
<settings_pull>
</settings_pull>
<!-- Setting used to insert data -->
<settings_push>
</settings_push>
<!-- Tasks -->
<tables>
<hits>
<cluster_pull>cluster0</cluster_pull>
<database_pull>default</database_pull>
<table_pull>hits</table_pull>
<cluster_push>cluster1</cluster_push>
<database_push>default</database_push>
<table_push>hits</table_push>
<enabled_partitions> 0 1 2</enabled_partitions>
<!-- Engine of destination tables -->
<engine>ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}', '{replica}') PARTITION BY d % 3 ORDER BY d SETTINGS index_granularity = 16</engine>
<!-- Which sharding key to use while copying -->
<sharding_key>d + 1</sharding_key>
<!-- Optional expression that filters the data being copied -->
<where_condition>d - d = 0</where_condition>
</hits>
</tables>
</yandex>

View File

@ -0,0 +1,126 @@
import os
import os.path as p
import sys
import time
import datetime
import pytest
from contextlib import contextmanager
import docker
from kazoo.client import KazooClient
CURRENT_TEST_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(CURRENT_TEST_DIR))
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
COPYING_FAIL_PROBABILITY = 0.33
cluster = None
@pytest.fixture(scope="module")
def started_cluster():
global cluster
try:
clusters_schema = {
"0" : {
"0" : ["0", "1"],
"1" : ["0"]
},
"1" : {
"0" : ["0", "1"],
"1" : ["0"]
}
}
cluster = ClickHouseCluster(__file__)
for cluster_name, shards in clusters_schema.iteritems():
for shard_name, replicas in shards.iteritems():
for replica_name in replicas:
name = "s{}_{}_{}".format(cluster_name, shard_name, replica_name)
cluster.add_instance(name,
config_dir="configs",
macroses={"cluster": cluster_name, "shard": shard_name, "replica": replica_name},
with_zookeeper=True)
cluster.start()
yield cluster
finally:
pass
cluster.shutdown()
def _test_copying(cmd_options):
instance = cluster.instances['s0_0_0']
instance.query("CREATE TABLE hits ON CLUSTER cluster0 (d UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}', '{replica}') PARTITION BY d % 3 ORDER BY d SETTINGS index_granularity = 16")
instance.query("CREATE TABLE hits_all ON CLUSTER cluster0 (d UInt64) ENGINE=Distributed(cluster0, default, hits, d)")
instance.query("CREATE TABLE hits_all ON CLUSTER cluster1 (d UInt64) ENGINE=Distributed(cluster1, default, hits, d + 1)")
instance.query("INSERT INTO hits_all SELECT * FROM system.numbers LIMIT 1002")
zoo_id = cluster.get_instance_docker_id('zoo1')
zoo_handle = cluster.docker_client.containers.get(zoo_id)
zoo_ip = zoo_handle.attrs['NetworkSettings']['Networks'].values()[0]['IPAddress']
print "Use ZooKeeper server: {} ({})".format(zoo_id, zoo_ip)
zk = KazooClient(hosts=zoo_ip)
zk.start()
zk_task_path = "/clickhouse-copier/task_simple"
zk.ensure_path(zk_task_path)
copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task0_description.xml'), 'r').read()
zk.create(zk_task_path + "/description", copier_task_config)
docker_api = docker.from_env().api
copiers_exec_ids = []
cmd = ['/usr/bin/clickhouse', 'copier',
'--config', '/etc/clickhouse-server/config-preprocessed.xml',
'--task-path', '/clickhouse-copier/task_simple',
'--base-dir', '/var/log/clickhouse-server/copier']
cmd += cmd_options
for instance_name, instance in cluster.instances.iteritems():
container = instance.get_docker_handle()
exec_id = docker_api.exec_create(container.id, cmd, stderr=True)
docker_api.exec_start(exec_id, detach=True)
copiers_exec_ids.append(exec_id)
print "Copier for {} ({}) has started".format(instance.name, instance.ip_address)
# Wait for copiers finalizing and check their return codes
for exec_id, instance in zip(copiers_exec_ids, cluster.instances.itervalues()):
while True:
res = docker_api.exec_inspect(exec_id)
if not res['Running']:
break
time.sleep(1)
assert res['ExitCode'] == 0, "Instance: {} ({}). Info: {}".format(instance.name, instance.ip_address, repr(res))
assert TSV(cluster.instances['s0_0_0'].query("SELECT count() FROM hits_all")) == TSV("1002\n")
assert TSV(cluster.instances['s1_0_0'].query("SELECT count() FROM hits_all")) == TSV("1002\n")
assert TSV(cluster.instances['s1_0_0'].query("SELECT DISTINCT d % 2 FROM hits")) == TSV("1\n")
assert TSV(cluster.instances['s1_1_0'].query("SELECT DISTINCT d % 2 FROM hits")) == TSV("0\n")
zk.delete(zk_task_path, recursive=True)
instance.query("DROP TABLE hits_all ON CLUSTER cluster1")
instance.query("DROP TABLE hits_all ON CLUSTER cluster1")
instance.query("DROP TABLE hits ON CLUSTER cluster0")
instance.query("DROP TABLE hits ON CLUSTER cluster1")
def test_copy_simple(started_cluster):
_test_copying([])
def test_copy_with_recovering(started_cluster):
_test_copying(['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:
for name, instance in cluster.instances.items():
print name, instance.ip_address
raw_input("Cluster created, press any key to destroy...")