Merge remote-tracking branch 'upstream/master'

This commit is contained in:
BayoNet 2018-07-27 14:30:45 +03:00
commit cf1380fc77
49 changed files with 869 additions and 131 deletions

View File

@ -1,3 +1,36 @@
## ClickHouse release 18.1.0, 2018-07-23
### New features:
* Support for the `ALTER TABLE t DELETE WHERE` query for non-replicated MergeTree tables ([#2634](https://github.com/yandex/ClickHouse/pull/2634)).
* Support for arbitrary types for the `uniq*` family of aggregate functions ([#2010](https://github.com/yandex/ClickHouse/issues/2010)).
* Support for arbitrary types in comparison operators ([#2026](https://github.com/yandex/ClickHouse/issues/2026)).
* The `users.xml` file allows setting a subnet mask in the format `10.0.0.1/255.255.255.0`. This is necessary for using masks for IPv6 networks with zeros in the middle ([#2637](https://github.com/yandex/ClickHouse/pull/2637)).
* Added the `arrayDistinct` function ([#2670](https://github.com/yandex/ClickHouse/pull/2670)); see the example after this list.
* The SummingMergeTree engine can now work with AggregateFunction type columns ([Constantin S. Pan](https://github.com/yandex/ClickHouse/pull/2566)).
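For illustration, a minimal query using the new `arrayDistinct` function (the literal array below is only an example):

```sql
SELECT arrayDistinct([1, 2, 2, 3]) AS distinct_values  -- expected result: [1, 2, 3]
```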
### Improvements:
* Changed the numbering scheme for release versions. Now the first part contains the year of release (A.D., Moscow timezone, minus 2000), the second part contains the number for major changes (increases for most releases), and the third part is the patch version. Releases are still backwards compatible, unless otherwise stated in the changelog.
* Faster conversions of floating-point numbers to a string ([Amos Bird](https://github.com/yandex/ClickHouse/pull/2664)).
* If some rows were skipped during an insert due to parsing errors (this is possible with the `input_allow_errors_num` and `input_allow_errors_ratio` settings enabled), the number of skipped rows is now written to the server log ([Leonardo Cecchi](https://github.com/yandex/ClickHouse/pull/2669)).
### Bug fixes:
* Fixed the TRUNCATE command for temporary tables ([Amos Bird](https://github.com/yandex/ClickHouse/pull/2624)).
* Fixed a rare deadlock in the ZooKeeper client library that occurred when there was a network error while reading the response ([c315200](https://github.com/yandex/ClickHouse/commit/c315200e64b87e44bdf740707fc857d1fdf7e947)).
* Fixed an error during a CAST to Nullable types ([#1322](https://github.com/yandex/ClickHouse/issues/1322)).
* Fixed the incorrect result of the `maxIntersection()` function when the boundaries of intervals coincided ([Michael Furmur](https://github.com/yandex/ClickHouse/pull/2657)).
* Fixed incorrect transformation of the OR expression chain in a function argument ([chenxing-xc](https://github.com/yandex/ClickHouse/pull/2663)).
* Fixed performance degradation for queries containing `IN (subquery)` expressions inside another subquery ([#2571](https://github.com/yandex/ClickHouse/issues/2571)).
* Fixed incompatibility between servers with different versions in distributed queries that use a `CAST` function that isn't in uppercase letters ([fe8c4d6](https://github.com/yandex/ClickHouse/commit/fe8c4d64e434cacd4ceef34faa9005129f2190a5)).
* Added missing quoting of identifiers for queries to an external DBMS ([#2635](https://github.com/yandex/ClickHouse/issues/2635)).
### Backward incompatible changes:
* Converting a string containing the number zero to DateTime does not work. Example: `SELECT toDateTime('0')`. This is also the reason that `DateTime DEFAULT '0'` does not work in tables, as well as `<null_value>0</null_value>` in dictionaries. Solution: replace `0` with `0000-00-00 00:00:00`.
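To illustrate the note above, a sketch contrasting the old usage with the suggested replacement:

```sql
SELECT toDateTime('0');                    -- no longer works
SELECT toDateTime('0000-00-00 00:00:00');  -- use the zero date-time literal instead
```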
## ClickHouse release 1.1.54394, 2018-07-12
### New features:

View File

@ -1,3 +1,4 @@
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
@ -23,14 +24,40 @@ namespace ErrorCodes
extern const int TOO_MANY_SIMULTANEOUS_QUERIES;
}
std::pair<String, bool> InterserverIOHTTPHandler::checkAuthentication(Poco::Net::HTTPServerRequest & request) const
{
const auto & config = server.config();
if (config.has("interserver_http_credentials.user"))
{
if (!request.hasCredentials())
return {"Server requires HTTP Basic authentification, but client doesn't provide it", false};
String scheme, info;
request.getCredentials(scheme, info);
if (scheme != "Basic")
return {"Server requires HTTP Basic authentification but client provides another method", false};
String user = config.getString("interserver_http_credentials.user");
String password = config.getString("interserver_http_credentials.password", "");
Poco::Net::HTTPBasicCredentials credentials(info);
if (std::make_pair(user, password) != std::make_pair(credentials.getUsername(), credentials.getPassword()))
return {"Incorrect user or password in HTTP Basic authentification", false};
}
else if (request.hasCredentials())
{
return {"Client requires HTTP Basic authentification, but server doesn't provide it", false};
}
return {"", true};
}
void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
HTMLForm params(request);
LOG_TRACE(log, "Request URI: " << request.getURI());
/// NOTE: You can do authentication here if you need to.
String endpoint_name = params.get("endpoint");
bool compress = params.get("compress") == "true";
@ -65,8 +92,18 @@ void InterserverIOHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & requ
try
{
processQuery(request, response);
LOG_INFO(log, "Done processing query");
if (auto [msg, success] = checkAuthentication(request); success)
{
processQuery(request, response);
LOG_INFO(log, "Done processing query");
}
else
{
response.setStatusAndReason(Poco::Net::HTTPServerResponse::HTTP_UNAUTHORIZED);
if (!response.sent())
response.send() << msg << std::endl;
LOG_WARNING(log, "Query processing failed request: '" << request.getURI() << "' authentication failed");
}
}
catch (Exception & e)
{

View File

@ -34,6 +34,8 @@ private:
CurrentMetrics::Increment metric_increment{CurrentMetrics::InterserverConnection};
void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
std::pair<String, bool> checkAuthentication(Poco::Net::HTTPServerRequest & request) const;
};
}

View File

@ -230,6 +230,17 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setInterserverIOAddress(this_host, port);
}
if (config().has("interserver_http_credentials"))
{
String user = config().getString("interserver_http_credentials.user", "");
String password = config().getString("interserver_http_credentials.password", "");
if (user.empty())
throw Exception("Configuration parameter interserver_http_credentials user can't be empty", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
global_context->setInterverserCredentials(user, password);
}
if (config().has("macros"))
global_context->setMacros(std::make_unique<Macros>(config(), "macros"));

View File

@ -116,7 +116,7 @@ struct AggregateFunctionWindowFunnelData
/// TODO Protection against huge size
events_list.clear();
events_list.resize(size);
events_list.reserve(size);
UInt32 timestamp;
UInt8 event;

View File

@ -83,6 +83,16 @@ std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const Se
return entries;
}
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyForTableFunction(const Settings * settings, PoolMode pool_mode)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, fail_message, settings);
};
return getManyImpl(settings, pool_mode, try_get_entry);
}
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyChecked(
const Settings * settings, PoolMode pool_mode, const QualifiedTableName & table_to_check)
{
@ -90,6 +100,7 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
{
return tryGetEntry(pool, fail_message, settings, &table_to_check);
};
return getManyImpl(settings, pool_mode, try_get_entry);
}

View File

@ -47,6 +47,9 @@ public:
*/
std::vector<Entry> getMany(const Settings * settings, PoolMode pool_mode);
/// The same as getMany(), but return std::vector<TryResult>.
std::vector<TryResult> getManyForTableFunction(const Settings * settings, PoolMode pool_mode);
using Base = PoolWithFailoverBase<IConnectionPool>;
using TryResult = Base::TryResult;

View File

@ -18,6 +18,7 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(const Poco::URI & uri,
const std::string & method_,
OutStreamCallback out_stream_callback,
const ConnectionTimeouts & timeouts,
const Poco::Net::HTTPBasicCredentials & credentials,
size_t buffer_size_)
: ReadBuffer(nullptr, 0),
uri{uri},
@ -30,6 +31,9 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(const Poco::URI & uri,
if (out_stream_callback)
request.setChunkedTransferEncoding(true);
if (!credentials.getUsername().empty())
credentials.authenticate(request);
Poco::Net::HTTPResponse response;
LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString());

View File

@ -1,6 +1,7 @@
#pragma once
#include <functional>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/URI.h>
#include <IO/ReadBuffer.h>
@ -32,6 +33,7 @@ public:
const std::string & method = {},
OutStreamCallback out_stream_callback = {},
const ConnectionTimeouts & timeouts = {},
const Poco::Net::HTTPBasicCredentials & credentials = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
bool nextImpl() override;

View File

@ -6,6 +6,7 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <common/logger_useful.h>
@ -28,13 +29,26 @@ namespace ClusterProxy
{
SelectStreamFactory::SelectStreamFactory(
const Block & header,
const Block & header_,
QueryProcessingStage::Enum processed_stage_,
QualifiedTableName main_table_,
const Tables & external_tables_)
: header(header),
: header(header_),
processed_stage{processed_stage_},
main_table(std::move(main_table_)),
table_func_ptr{nullptr},
external_tables{external_tables_}
{
}
SelectStreamFactory::SelectStreamFactory(
const Block & header_,
QueryProcessingStage::Enum processed_stage_,
ASTPtr table_func_ptr_,
const Tables & external_tables_)
: header(header_),
processed_stage{processed_stage_},
table_func_ptr{table_func_ptr_},
external_tables{external_tables_}
{
}
@ -71,13 +85,24 @@ void SelectStreamFactory::createForShard(
{
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, header, context, nullptr, throttler, external_tables, processed_stage);
stream->setPoolMode(PoolMode::GET_MANY);
stream->setMainTable(main_table);
if (!table_func_ptr)
stream->setMainTable(main_table);
res.emplace_back(std::move(stream));
};
if (shard_info.isLocal())
{
StoragePtr main_table_storage = context.tryGetTable(main_table.database, main_table.table);
StoragePtr main_table_storage;
if (table_func_ptr)
{
auto table_function = static_cast<ASTFunction *>(table_func_ptr.get());
main_table_storage = TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context);
}
else
main_table_storage = context.tryGetTable(main_table.database, main_table.table);
if (!main_table_storage) /// Table is absent on a local server.
{
ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);
@ -158,14 +183,17 @@ void SelectStreamFactory::createForShard(
auto lazily_create_stream = [
pool = shard_info.pool, shard_num = shard_info.shard_num, query, header = header, query_ast, context, throttler,
main_table = main_table, external_tables = external_tables, stage = processed_stage,
main_table = main_table, table_func_ptr = table_func_ptr, external_tables = external_tables, stage = processed_stage,
local_delay]()
-> BlockInputStreamPtr
{
std::vector<ConnectionPoolWithFailover::TryResult> try_results;
try
{
try_results = pool->getManyChecked(&context.getSettingsRef(), PoolMode::GET_MANY, main_table);
if (table_func_ptr)
try_results = pool->getManyForTableFunction(&context.getSettingsRef(), PoolMode::GET_MANY);
else
try_results = pool->getManyChecked(&context.getSettingsRef(), PoolMode::GET_MANY, main_table);
}
catch (const Exception & ex)
{

View File

@ -13,11 +13,19 @@ namespace ClusterProxy
class SelectStreamFactory final : public IStreamFactory
{
public:
/// Database in a query.
SelectStreamFactory(
const Block & header,
QueryProcessingStage::Enum processed_stage,
QualifiedTableName main_table,
const Block & header_,
QueryProcessingStage::Enum processed_stage_,
QualifiedTableName main_table_,
const Tables & external_tables);
/// TableFunction in a query.
SelectStreamFactory(
const Block & header_,
QueryProcessingStage::Enum processed_stage_,
ASTPtr table_func_ptr_,
const Tables & external_tables_);
void createForShard(
const Cluster::ShardInfo & shard_info,
@ -29,6 +37,7 @@ private:
const Block header;
QueryProcessingStage::Enum processed_stage;
QualifiedTableName main_table;
ASTPtr table_func_ptr;
Tables external_tables;
};

View File

@ -109,6 +109,8 @@ struct ContextShared
String interserver_io_host; /// The host name by which this server is available for other servers.
UInt16 interserver_io_port = 0; /// and port.
String interserver_io_user;
String interserver_io_password;
String path; /// Path to the data directory, with a slash at the end.
String tmp_path; /// The path to the temporary files that occur when processing the request.
@ -1378,6 +1380,17 @@ void Context::setInterserverIOAddress(const String & host, UInt16 port)
shared->interserver_io_port = port;
}
void Context::setInterverserCredentials(const String & user, const String & password)
{
shared->interserver_io_user = user;
shared->interserver_io_password = password;
}
std::pair<String, String> Context::getInterserverCredentials() const
{
return { shared->interserver_io_user, shared->interserver_io_password };
}
std::pair<String, UInt16> Context::getInterserverIOAddress() const
{

View File

@ -249,6 +249,11 @@ public:
/// How other servers can access this for downloading replicated data.
void setInterserverIOAddress(const String & host, UInt16 port);
std::pair<String, UInt16> getInterserverIOAddress() const;
// Credentials that this server will use to communicate with other servers
void setInterverserCredentials(const String & user, const String & password);
std::pair<String, String> getInterserverCredentials() const;
/// The port that the server listens for executing SQL queries.
UInt16 getTCPPort() const;

View File

@ -3,6 +3,7 @@
#include <Columns/ColumnsNumber.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ExpressionElementParsers.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
@ -10,6 +11,7 @@
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Common/typeid_cast.h>
#include <TableFunctions/TableFunctionFactory.h>
namespace DB
@ -52,13 +54,19 @@ std::pair<Field, std::shared_ptr<const IDataType>> evaluateConstantExpression(co
ASTPtr evaluateConstantExpressionAsLiteral(const ASTPtr & node, const Context & context)
{
/// Branch with string in query.
if (typeid_cast<const ASTLiteral *>(node.get()))
return node;
/// Branch with TableFunction in query.
if (auto table_func_ptr = typeid_cast<ASTFunction *>(node.get()))
if (TableFunctionFactory::instance().isTableFunctionName(table_func_ptr->name))
return node;
return std::make_shared<ASTLiteral>(evaluateConstantExpression(node, context).first);
}
ASTPtr evaluateConstantExpressionOrIdentifierAsLiteral(const ASTPtr & node, const Context & context)
{
if (auto id = typeid_cast<const ASTIdentifier *>(node.get()))

View File

@ -372,9 +372,9 @@ void ASTSelectQuery::replaceDatabaseAndTable(const String & database_name, const
children.emplace_back(tables_list);
table_expression = table_expr.get();
}
ASTPtr table = std::make_shared<ASTIdentifier>(table_name, ASTIdentifier::Table);
if (!database_name.empty())
{
ASTPtr database = std::make_shared<ASTIdentifier>(database_name, ASTIdentifier::Database);
@ -388,5 +388,27 @@ void ASTSelectQuery::replaceDatabaseAndTable(const String & database_name, const
}
}
void ASTSelectQuery::addTableFunction(ASTPtr & table_function_ptr)
{
ASTTableExpression * table_expression = getFirstTableExpression(*this);
if (!table_expression)
{
auto tables_list = std::make_shared<ASTTablesInSelectQuery>();
auto element = std::make_shared<ASTTablesInSelectQueryElement>();
auto table_expr = std::make_shared<ASTTableExpression>();
element->table_expression = table_expr;
element->children.emplace_back(table_expr);
tables_list->children.emplace_back(element);
tables = tables_list;
children.emplace_back(tables_list);
table_expression = table_expr.get();
}
table_expression->table_function = table_function_ptr;
table_expression->database_and_table_name = nullptr;
}
};

View File

@ -47,6 +47,7 @@ public:
bool final() const;
void setDatabaseIfNeeded(const String & database_name);
void replaceDatabaseAndTable(const String & database_name, const String & table_name);
void addTableFunction(ASTPtr & table_function_ptr);
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

View File

@ -161,6 +161,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
const String & host,
int port,
const ConnectionTimeouts & timeouts,
const String & user,
const String & password,
bool to_detached,
const String & tmp_prefix_)
{
@ -175,7 +177,14 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
{"compress", "false"}
});
ReadWriteBufferFromHTTP in{uri, Poco::Net::HTTPRequest::HTTP_POST, {}, timeouts};
Poco::Net::HTTPBasicCredentials creds{};
if (!user.empty())
{
creds.setUsername(user);
creds.setPassword(password);
}
ReadWriteBufferFromHTTP in{uri, Poco::Net::HTTPRequest::HTTP_POST, {}, timeouts, creds};
static const String TMP_PREFIX = "tmp_fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;

View File

@ -54,6 +54,8 @@ public:
const String & host,
int port,
const ConnectionTimeouts & timeouts,
const String & user,
const String & password,
bool to_detached = false,
const String & tmp_prefix_ = "");

View File

@ -65,12 +65,15 @@ namespace ErrorCodes
namespace
{
/// select query has database and table names as AST pointers
/// Creates a copy of query, changes database and table names.
ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table)
/// select query has database, table and table function names as AST pointers
/// Creates a copy of query, changes database, table and table function names.
ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table, ASTPtr table_function_ptr = nullptr)
{
auto modified_query_ast = query->clone();
typeid_cast<ASTSelectQuery &>(*modified_query_ast).replaceDatabaseAndTable(database, table);
if (table_function_ptr)
typeid_cast<ASTSelectQuery &>(*modified_query_ast).addTableFunction(table_function_ptr);
else
typeid_cast<ASTSelectQuery &>(*modified_query_ast).replaceDatabaseAndTable(database, table);
return modified_query_ast;
}
@ -170,16 +173,48 @@ StorageDistributed::StorageDistributed(
}
StoragePtr StorageDistributed::createWithOwnCluster(
const std::string & name_,
StorageDistributed::StorageDistributed(
const String & database_name,
const String & table_name_,
const ColumnsDescription & columns_,
const String & remote_database_,
const String & remote_table_,
ASTPtr remote_table_function_ptr_,
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & data_path_,
bool attach)
: StorageDistributed(database_name, table_name_, columns_, String{}, String{}, cluster_name_, context_, sharding_key_, data_path_, attach)
{
remote_table_function_ptr = remote_table_function_ptr_;
}
StoragePtr StorageDistributed::createWithOwnCluster(
const std::string & table_name_,
const ColumnsDescription & columns_,
const String & remote_database_, /// database on remote servers.
const String & remote_table_, /// The name of the table on the remote servers.
ClusterPtr owned_cluster_,
const Context & context_)
{
auto res = ext::shared_ptr_helper<StorageDistributed>::create(
String{}, table_name_, columns_, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), false);
res->owned_cluster = owned_cluster_;
return res;
}
StoragePtr StorageDistributed::createWithOwnCluster(
const std::string & table_name_,
const ColumnsDescription & columns_,
ASTPtr & remote_table_function_ptr_,
ClusterPtr & owned_cluster_,
const Context & context_)
{
auto res = ext::shared_ptr_helper<StorageDistributed>::create(
String{}, name_, columns_, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), false);
String{}, table_name_, columns_, remote_table_function_ptr_, String{}, context_, ASTPtr(), String(), false);
res->owned_cluster = owned_cluster_;
@ -209,15 +244,19 @@ BlockInputStreams StorageDistributed::read(
processed_stage = result_size == 1
? QueryProcessingStage::Complete
: QueryProcessingStage::WithMergeableState;
const auto & modified_query_ast = rewriteSelectQuery(
query_info.query, remote_database, remote_table);
query_info.query, remote_database, remote_table, remote_table_function_ptr);
Block header = materializeBlock(InterpreterSelectQuery(query_info.query, context, Names{}, processed_stage).getSampleBlock());
ClusterProxy::SelectStreamFactory select_stream_factory(
header, processed_stage, QualifiedTableName{remote_database, remote_table}, context.getExternalTables());
ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr ?
ClusterProxy::SelectStreamFactory(
header, processed_stage, remote_table_function_ptr, context.getExternalTables())
: ClusterProxy::SelectStreamFactory(
header, processed_stage, QualifiedTableName{remote_database, remote_table}, context.getExternalTables());
return ClusterProxy::executeQuery(
select_stream_factory, cluster, modified_query_ast, context, settings);
}

View File

@ -9,6 +9,7 @@
#include <Interpreters/Settings.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/ExpressionActions.h>
#include <Parsers/ASTFunction.h>
#include <common/logger_useful.h>
@ -36,8 +37,15 @@ public:
static StoragePtr createWithOwnCluster(
const std::string & table_name_,
const ColumnsDescription & columns_,
const String & remote_database_, /// database on remote servers.
const String & remote_table_, /// The name of the table on the remote servers.
const String & remote_database_, /// database on remote servers.
const String & remote_table_, /// The name of the table on the remote servers.
ClusterPtr owned_cluster_,
const Context & context_);
static StoragePtr createWithOwnCluster(
const std::string & table_name_,
const ColumnsDescription & columns_,
ASTPtr & remote_table_function_ptr_, /// Table function ptr.
ClusterPtr & owned_cluster_,
const Context & context_);
@ -101,6 +109,7 @@ public:
String table_name;
String remote_database;
String remote_table;
ASTPtr remote_table_function_ptr;
const Context & context;
Logger * log = &Logger::get("StorageDistributed");
@ -146,6 +155,17 @@ protected:
const ASTPtr & sharding_key_,
const String & data_path_,
bool attach);
StorageDistributed(
const String & database_name,
const String & table_name_,
const ColumnsDescription & columns_,
ASTPtr remote_table_function_ptr_,
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & data_path_,
bool attach);
};
}

View File

@ -1971,9 +1971,10 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
String replica_path = zookeeper_path + "/replicas/" + part_desc->replica;
ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef());
auto [user, password] = context.getInterserverCredentials();
part_desc->res_part = fetcher.fetchPart(part_desc->found_new_part_name, replica_path,
address.host, address.replication_port, timeouts, false, TMP_PREFIX + "fetch_");
address.host, address.replication_port, timeouts, user, password, false, TMP_PREFIX + "fetch_");
/// TODO: check columns_version of fetched part
@ -2706,10 +2707,11 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef());
auto [user, password] = context.getInterserverCredentials();
try
{
part = fetcher.fetchPart(part_name, replica_path, address.host, address.replication_port, timeouts, to_detached);
part = fetcher.fetchPart(part_name, replica_path, address.host, address.replication_port, timeouts, user, password, to_detached);
if (!to_detached)
{

View File

@ -13,9 +13,9 @@ NamesAndTypesList StorageSystemTableEngines::getNamesAndTypes()
void StorageSystemTableEngines::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
const auto & storages = StorageFactory::instance().getAllStorages();
for (const auto & [name, creator] : storages)
for (const auto & pair : storages)
{
res_columns[0]->insert(name);
res_columns[0]->insert(pair.first);
}
}

View File

@ -7,6 +7,8 @@
#include <Storages/IStorage.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
namespace DB
@ -22,21 +24,40 @@ ColumnsDescription getStructureOfRemoteTable(
const Cluster & cluster,
const std::string & database,
const std::string & table,
const Context & context)
const Context & context,
const ASTPtr & table_func_ptr)
{
/// Send the request to an arbitrary remote shard.
const auto & shard_info = cluster.getAnyShardInfo();
String query;
if (shard_info.isLocal())
return context.getTable(database, table)->getColumns();
if (table_func_ptr)
{
if (shard_info.isLocal())
{
auto table_function = static_cast<ASTFunction *>(table_func_ptr.get());
return TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context)->getColumns();
}
auto table_func_name = queryToString(table_func_ptr);
query = "DESC TABLE " + table_func_name;
}
else
{
if (shard_info.isLocal())
return context.getTable(database, table)->getColumns();
/// Request for a table description
query = "DESC TABLE " + backQuoteIfNeed(database) + "." + backQuoteIfNeed(table);
}
/// Request for a table description
String query = "DESC TABLE " + backQuoteIfNeed(database) + "." + backQuoteIfNeed(table);
ColumnsDescription res;
auto input = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, InterpreterDescribeQuery::getSampleBlock(), context);
input->setPoolMode(PoolMode::GET_ONE);
input->setMainTable(QualifiedTableName{database, table});
if (!table_func_ptr)
input->setMainTable(QualifiedTableName{database, table});
input->readPrefix();
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();

View File

@ -1,6 +1,8 @@
#pragma once
#include <Storages/ColumnsDescription.h>
#include <Parsers/IAST.h>
#include <Parsers/queryToString.h>
namespace DB
@ -15,6 +17,7 @@ ColumnsDescription getStructureOfRemoteTable(
const Cluster & cluster,
const std::string & database,
const std::string & table,
const Context & context);
const Context & context,
const ASTPtr & table_func_ptr = nullptr);
}

View File

@ -37,4 +37,9 @@ TableFunctionPtr TableFunctionFactory::get(
return it->second();
}
bool TableFunctionFactory::isTableFunctionName(const std::string & name) const
{
return functions.count(name);
}
}

View File

@ -42,6 +42,8 @@ public:
TableFunctionPtr get(
const std::string & name,
const Context & context) const;
bool isTableFunctionName(const std::string & name) const;
const TableFunctions & getAllTableFunctions() const
{

View File

@ -198,6 +198,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
String cluster_description;
String remote_database;
String remote_table;
ASTPtr remote_table_function_ptr;
String username;
String password;
@ -230,25 +231,39 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
++arg_num;
args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context);
remote_database = static_cast<const ASTLiteral &>(*args[arg_num]).value.safeGet<String>();
++arg_num;
size_t dot = remote_database.find('.');
if (dot != String::npos)
const auto table_function = static_cast<ASTFunction *>(args[arg_num].get());
if (TableFunctionFactory::instance().isTableFunctionName(table_function->name))
{
/// NOTE Bad - do not support identifiers in backquotes.
remote_table = remote_database.substr(dot + 1);
remote_database = remote_database.substr(0, dot);
}
else
{
if (arg_num >= args.size())
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context);
remote_table = static_cast<const ASTLiteral &>(*args[arg_num]).value.safeGet<String>();
remote_table_function_ptr = args[arg_num];
++arg_num;
}
else
{
remote_database = static_cast<const ASTLiteral &>(*args[arg_num]).value.safeGet<String>();
++arg_num;
size_t dot = remote_database.find('.');
if (dot != String::npos)
{
/// NOTE Bad - do not support identifiers in backquotes.
remote_table = remote_database.substr(dot + 1);
remote_database = remote_database.substr(0, dot);
}
else
{
if (arg_num >= args.size())
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
else
{
args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context);
remote_table = static_cast<const ASTLiteral &>(*args[arg_num]).value.safeGet<String>();
++arg_num;
}
}
}
/// Username and password parameters are prohibited in cluster version of the function
if (!is_cluster_function)
@ -299,18 +314,28 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
cluster = std::make_shared<Cluster>(context.getSettings(), names, username, password, context.getTCPPort(), false);
}
auto res = StorageDistributed::createWithOwnCluster(
getName(),
getStructureOfRemoteTable(*cluster, remote_database, remote_table, context),
remote_database,
remote_table,
cluster,
context);
auto structure_remote_table = getStructureOfRemoteTable(*cluster, remote_database, remote_table, context, remote_table_function_ptr);
StoragePtr res = remote_table_function_ptr ?
StorageDistributed::createWithOwnCluster(
getName(),
structure_remote_table,
remote_table_function_ptr,
cluster,
context)
: StorageDistributed::createWithOwnCluster(
getName(),
structure_remote_table,
remote_database,
remote_table,
cluster,
context);
res->startup();
return res;
}
TableFunctionRemote::TableFunctionRemote(const std::string & name_)
: name(name_)
{

View File

@ -14,9 +14,9 @@ Don't use Docker from your system repository.
* [pip](https://pypi.python.org/pypi/pip). To install: `sudo apt-get install python-pip`
* [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest`
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL kafka-python`
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL`
(strongly discouraged) If you really want to use OS packages on modern Debian/Ubuntu instead of "pip": `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-kazoo python-kafka`
(strongly discouraged) If you really want to use OS packages on modern Debian/Ubuntu instead of "pip": `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-kazoo`
If you want to run the tests under a non-privileged user, you must add this user to `docker` group: `sudo usermod -aG docker $USER` and re-login.
(You must close all your sessions (for example, restart your computer))

View File

@ -144,6 +144,7 @@ class ClickHouseCluster:
if self.with_kafka and self.base_kafka_cmd:
subprocess.check_call(self.base_kafka_cmd + ['up', '-d', '--no-recreate'])
self.kafka_docker_id = self.get_instance_docker_id('kafka1')
# Uncomment for debugging
#print ' '.join(self.base_cmd + ['up', '--no-recreate'])

View File

@ -22,13 +22,3 @@ services:
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- kafka_zookeeper
dns-proxy-server:
image: defreitas/dns-proxy-server
hostname: dns.mageddo
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /etc/resolv.conf:/etc/resolv.conf
ports:
- 5380:5380
network_mode: bridge

View File

@ -0,0 +1,7 @@
<yandex>
<interserver_http_port>9009</interserver_http_port>
<interserver_http_credentials>
<user>admin</user>
<password>222</password>
</interserver_http_credentials>
</yandex>

View File

@ -0,0 +1,7 @@
<yandex>
<interserver_http_port>9009</interserver_http_port>
<interserver_http_credentials>
<user>root</user>
<password>111</password>
</interserver_http_credentials>
</yandex>

View File

@ -0,0 +1,3 @@
<yandex>
<interserver_http_port>9009</interserver_http_port>
</yandex>

View File

@ -0,0 +1,58 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<default_database>test</default_database>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<default_database>test</default_database>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<default_database>test</default_database>
<host>node3</host>
<port>9000</port>
</replica>
<replica>
<default_database>test</default_database>
<host>node4</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<default_database>test</default_database>
<host>node5</host>
<port>9000</port>
</replica>
<replica>
<default_database>test</default_database>
<host>node7</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<default_database>test</default_database>
<host>node7</host>
<port>9000</port>
</replica>
<replica>
<default_database>test</default_database>
<host>node8</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,132 @@
import time
import pytest
from helpers.cluster import ClickHouseCluster
def _fill_nodes(nodes, shard):
for node in nodes:
node.query(
'''
CREATE DATABASE test;
CREATE TABLE test_table(date Date, id UInt32, dummy UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}', date, id, 8192);
'''.format(shard=shard, replica=node.name))
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml', 'configs/credentials1.xml'], with_zookeeper=True)
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml', 'configs/credentials1.xml'], with_zookeeper=True)
@pytest.fixture(scope="module")
def same_credentials_cluster():
try:
cluster.start()
_fill_nodes([node1, node2], 1)
yield cluster
finally:
cluster.shutdown()
def test_same_credentials(same_credentials_cluster):
node1.query("insert into test_table values ('2017-06-16', 111, 0)")
time.sleep(1)
assert node1.query("SELECT id FROM test_table order by id") == '111\n'
assert node2.query("SELECT id FROM test_table order by id") == '111\n'
node2.query("insert into test_table values ('2017-06-17', 222, 1)")
time.sleep(1)
assert node1.query("SELECT id FROM test_table order by id") == '111\n222\n'
assert node2.query("SELECT id FROM test_table order by id") == '111\n222\n'
node3 = cluster.add_instance('node3', main_configs=['configs/remote_servers.xml', 'configs/no_credentials.xml'], with_zookeeper=True)
node4 = cluster.add_instance('node4', main_configs=['configs/remote_servers.xml', 'configs/no_credentials.xml'], with_zookeeper=True)
@pytest.fixture(scope="module")
def no_credentials_cluster():
try:
cluster.start()
_fill_nodes([node3, node4], 2)
yield cluster
finally:
cluster.shutdown()
def test_no_credentials(no_credentials_cluster):
node3.query("insert into test_table values ('2017-06-18', 111, 0)")
time.sleep(1)
assert node3.query("SELECT id FROM test_table order by id") == '111\n'
assert node4.query("SELECT id FROM test_table order by id") == '111\n'
node4.query("insert into test_table values ('2017-06-19', 222, 1)")
time.sleep(1)
assert node3.query("SELECT id FROM test_table order by id") == '111\n222\n'
assert node4.query("SELECT id FROM test_table order by id") == '111\n222\n'
node5 = cluster.add_instance('node5', main_configs=['configs/remote_servers.xml', 'configs/credentials1.xml'], with_zookeeper=True)
node6 = cluster.add_instance('node6', main_configs=['configs/remote_servers.xml', 'configs/credentials2.xml'], with_zookeeper=True)
@pytest.fixture(scope="module")
def different_credentials_cluster():
try:
cluster.start()
_fill_nodes([node5, node6], 3)
yield cluster
finally:
cluster.shutdown()
def test_different_credentials(different_credentials_cluster):
node5.query("insert into test_table values ('2017-06-20', 111, 0)")
time.sleep(1)
assert node5.query("SELECT id FROM test_table order by id") == '111\n'
assert node6.query("SELECT id FROM test_table order by id") == ''
node6.query("insert into test_table values ('2017-06-21', 222, 1)")
time.sleep(1)
assert node5.query("SELECT id FROM test_table order by id") == '111\n'
assert node6.query("SELECT id FROM test_table order by id") == '222\n'
node7 = cluster.add_instance('node7', main_configs=['configs/remote_servers.xml', 'configs/credentials1.xml'], with_zookeeper=True)
node8 = cluster.add_instance('node8', main_configs=['configs/remote_servers.xml', 'configs/no_credentials.xml'], with_zookeeper=True)
@pytest.fixture(scope="module")
def credentials_and_no_credentials_cluster():
try:
cluster.start()
_fill_nodes([node7, node8], 4)
yield cluster
finally:
cluster.shutdown()
def test_credentials_and_no_credentials(credentials_and_no_credentials_cluster):
node7.query("insert into test_table values ('2017-06-21', 111, 0)")
time.sleep(1)
assert node7.query("SELECT id FROM test_table order by id") == '111\n'
assert node8.query("SELECT id FROM test_table order by id") == ''
node8.query("insert into test_table values ('2017-06-22', 222, 1)")
time.sleep(1)
assert node7.query("SELECT id FROM test_table order by id") == '111\n'
assert node8.query("SELECT id FROM test_table order by id") == '222\n'

View File

@ -6,8 +6,8 @@ import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from kafka import KafkaProducer
import json
import subprocess
@ -23,7 +23,17 @@ def started_cluster():
yield cluster
finally:
cluster.shutdown(False)
cluster.shutdown()
def kafka_is_available(started_cluster):
p = subprocess.Popen(('docker', 'exec', '-i', started_cluster.kafka_docker_id, '/usr/bin/kafka-broker-api-versions', '--bootstrap-server', 'PLAINTEXT://localhost:9092'), stdout=subprocess.PIPE)
streamdata = p.communicate()[0]
return p.returncode == 0
def kafka_produce(started_cluster, topic, messages):
p = subprocess.Popen(('docker', 'exec', '-i', started_cluster.kafka_docker_id, '/usr/bin/kafka-console-producer', '--broker-list', 'localhost:9092', '--topic', topic), stdin=subprocess.PIPE)
p.communicate(messages)
p.stdin.close()
def test_kafka_json(started_cluster):
instance.query('''
@ -34,18 +44,18 @@ CREATE TABLE test.kafka (key UInt64, value UInt64)
retries = 0
while True:
try:
producer = KafkaProducer()
if kafka_is_available(started_cluster):
break
except:
else:
retries += 1
if retries > 50:
raise
raise Exception('Cannot connect to kafka.')
print("Waiting for kafka to be available...")
time.sleep(1)
messages = ''
for i in xrange(50):
producer.send('json', json.dumps({'key': i, 'value': i}))
producer.flush()
messages += json.dumps({'key': i, 'value': i}) + '\n'
kafka_produce(started_cluster, 'json', messages)
time.sleep(3)
result = instance.query('SELECT * FROM test.kafka;')
with open(p.join(p.dirname(__file__), 'test_kafka_json.reference')) as reference:

View File

@ -0,0 +1,9 @@
DROP TABLE IF EXISTS test.remote_test;
CREATE TABLE test.remote_test(a1 UInt8) ENGINE=Memory;
INSERT INTO FUNCTION remote('127.0.0.1', test.remote_test) VALUES(1);
INSERT INTO FUNCTION remote('127.0.0.1', test.remote_test) VALUES(2);
INSERT INTO FUNCTION remote('127.0.0.1', test.remote_test) VALUES(3);
INSERT INTO FUNCTION remote('127.0.0.1', test.remote_test) VALUES(4);
SELECT COUNT(*) FROM remote('127.0.0.1', test.remote_test);
SELECT count(*) FROM remote('127.0.0.{1,2}', merge(test, '^remote_test'));
DROP TABLE test.remote_test;

View File

@ -2,27 +2,27 @@
## True column-oriented DBMS
In a true column-oriented DBMS, there isn't any "garbage" stored with the values. Among other things, this means that constant-length values must be supported, to avoid storing their length "number" next to the values. As an example, a billion UInt8-type values should actually consume around 1 GB uncompressed, or this will strongly affect the CPU use. It is very important to store data compactly (without any "garbage") even when uncompressed, since the speed of decompression (CPU usage) depends mainly on the volume of uncompressed data.
In a true column-oriented DBMS, no excessive data is stored with the values. For example, this means that constant-length values must be supported, to avoid storing their length as an additional integer next to the values. In this case, a billion UInt8 values should actually consume around 1 GB uncompressed; otherwise this strongly affects CPU use. It is very important to store data compactly even when uncompressed, since the speed of decompression (CPU usage) depends mainly on the volume of uncompressed data.
This is worth noting because there are systems that can store values of separate columns separately, but that can't effectively process analytical queries due to their optimization for other scenarios. Examples are HBase, BigTable, Cassandra, and HyperTable. In these systems, you will get throughput around a hundred thousand rows per second, but not hundreds of millions of rows per second.
This is worth noting because there are systems that can store values of different columns separately, but that can't effectively process analytical queries due to their optimization for other scenarios. Examples are HBase, BigTable, Cassandra, and HyperTable. In these systems, you will get throughput around a hundred thousand rows per second, but not hundreds of millions of rows per second.
Also note that ClickHouse is a DBMS, not a single database. ClickHouse allows creating tables and databases in runtime, loading data, and running queries without reconfiguring and restarting the server.
Also note that ClickHouse is a database management system, not a single database. ClickHouse allows creating tables and databases in runtime, loading data, and running queries without reconfiguring and restarting the server.
## Data compression
Some column-oriented DBMSs (InfiniDB CE and MonetDB) do not use data compression. However, data compression really improves performance.
Some column-oriented DBMSs (InfiniDB CE and MonetDB) do not use data compression. However, data compression is crucial to achieving excellent performance.
## Disk storage of data
Many column-oriented DBMSs (such as SAP HANA and Google PowerDrill) can only work in RAM. But even on thousands of servers, the RAM is too small for storing all the pageviews and sessions in Yandex.Metrica.
Many column-oriented DBMSs (such as SAP HANA and Google PowerDrill) can only work in RAM. This approach encourages allocating a larger hardware budget than is actually necessary for real-time analysis. ClickHouse is designed to work on regular hard drives, which ensures a low cost of ownership per gigabyte of data, but SSDs and additional RAM are also utilized fully if available.
## Parallel processing on multiple cores
Large queries are parallelized in a natural way.
Large queries are parallelized in a natural way, utilizing all necessary resources that are available on the current server.
## Distributed processing on multiple servers
Almost none of the columnar DBMSs listed above have support for distributed processing.
Almost none of the columnar DBMSs mentioned above have support for distributed query processing.
In ClickHouse, data can reside on different shards. Each shard can be a group of replicas that are used for fault tolerance. The query is processed on all the shards in parallel. This is transparent for the user.
## SQL support
@ -33,30 +33,37 @@ However, this is a declarative query language based on SQL that can't be differe
JOINs are supported. Subqueries are supported in FROM, IN, and JOIN clauses, as well as scalar subqueries.
Dependent subqueries are not supported.
ClickHouse supports a declarative query language based on SQL that complies with the SQL standard in many cases.
GROUP BY, ORDER BY, scalar subqueries, and subqueries in the FROM, IN, and JOIN clauses are supported.
Correlated subqueries and window functions are not supported.
## Vector engine
Data is not only stored by columns, but is processed by vectors (parts of columns). This allows us to achieve high CPU performance.
Data is not only stored by columns, but is also processed by vectors (parts of columns). This makes it possible to achieve high CPU efficiency.
## Real-time data updates
ClickHouse supports primary key tables. In order to quickly perform queries on the range of the primary key, the data is sorted incrementally using the merge tree. Due to this, data can continually be added to the table. There is no locking when adding data.
ClickHouse supports tables with a primary key. In order to quickly perform queries on a range of the primary key, the data is sorted incrementally using a merge tree. Due to this, data can continually be added to the table. No locks are taken when new data is ingested.
## Indexes
## Index
Having a primary key makes it possible to extract data for specific clients (for instance, Yandex.Metrica tracking tags) for a specific time range, with low latency less than several dozen milliseconds.
Having the data physically sorted by primary key makes it possible to extract data for specific values or value ranges with low latency, less than a few dozen milliseconds.
## Suitable for online queries
This lets us use the system as the back-end for a web interface. Low latency means queries can be processed without delay, while the Yandex.Metrica interface page is loading. In other words, in online mode.
Low latency means that queries can be processed without delay and without trying to prepare the answer in advance, right at the moment the user interface page is loading. In other words, online.
## Support for approximated calculations
1. The system contains aggregate functions for approximated calculation of the number of various values, medians, and quantiles.
2. Supports running a query based on a part (sample) of data and getting an approximated result. In this case, proportionally less data is retrieved from the disk.
3. Supports running an aggregation for a limited number of random keys, instead of for all keys. Under certain conditions for key distribution in the data, this provides a reasonably accurate result while using fewer resources.
ClickHouse provides various ways to trade accuracy for performance:
## Data replication and support for data integrity on replicas
1. Aggregate functions for approximated calculation of the number of distinct values, medians, and quantiles.
2. Running a query based on a part (sample) of data and getting an approximated result (see the sketch after this list). In this case, proportionally less data is retrieved from the disk.
3. Running an aggregation for a limited number of random keys, instead of for all keys. Under certain conditions for key distribution in the data, this provides a reasonably accurate result while using fewer resources.
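A sketch of point 2 (sampling); the table name `hits` and the sampling ratio are assumptions, and the table must have been created with a `SAMPLE BY` clause:

```sql
-- Reads roughly one tenth of the data and extrapolates an approximate total
SELECT count() * 10 FROM hits SAMPLE 1 / 10
```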
Uses asynchronous multimaster replication. After being written to any available replica, data is distributed to all the remaining replicas. The system maintains identical data on different replicas. Data is restored automatically after a failure, or using a "button" for complex cases.
For more information, see the section [Data replication](../operations/table_engines/replication.md#table_engines-replication).
## Data replication and integrity
ClickHouse uses asynchronous multimaster replication. After being written to any available replica, data is distributed to all the other replicas in the background. The system maintains identical data on different replicas. Data is restored automatically after most failures, or semi-automatically in complicated cases.
For more information, see the [Data replication](../operations/table_engines/replication.md#table_engines-replication) section.

View File

@ -1,6 +1,6 @@
# ClickHouse features that can be considered disadvantages
1. No transactions.
2. For aggregation, query results must fit in the RAM on a single server. However, the volume of source data for a query may be indefinitely large.
3. Lack of full-fledged UPDATE/DELETE implementation.
1. No full-fledged transactions.
2. Lack of ability to modify or delete already inserted data at a high rate and with low latency. Batch deletes are available to clean up data that is no longer needed or to comply with [GDPR](https://gdpr-info.eu) (see the example after this list). Batch updates are in development as of July 2018.
3. The sparse index makes ClickHouse poorly suited for point queries that retrieve single rows by their keys.
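As a sketch of the batch deletes mentioned in point 2, assuming a hypothetical non-replicated MergeTree table `visits` with a `user_id` column:

```sql
-- Asynchronously removes all rows matching the predicate
ALTER TABLE visits DELETE WHERE user_id = 42
```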

View File

@ -0,0 +1,77 @@
<a name="table_engines-url"></a>
# URL(URL, Format)
Manages data on a remote HTTP/HTTPS server. This engine is
similar to the [`File`](./file.md#) engine.
## Usage in ClickHouse server
```
URL(URL, Format)
```
`Format` must be a format that ClickHouse supports for `SELECT` and/or `INSERT` queries. For the full list of
supported formats, see [Formats](../../interfaces/formats.md#formats).
`URL` must conform to the Uniform Resource Locator format. The specified
URL must point to a server that uses HTTP or HTTPS. The server must not
require any additional HTTP headers.
`INSERT` and `SELECT` queries are transformed into `POST` and `GET` requests,
respectively. To handle `POST` requests correctly, the remote server must support
[chunked transfer encoding](https://en.wikipedia.org/wiki/Chunked_transfer_encoding).
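For instance, an `INSERT` into a table with this engine is sent as a `POST` request. The sketch below assumes the remote server implements `POST` handling (the sample server in the example that follows does not) and uses the `url_engine_table` table created there:

```sql
INSERT INTO url_engine_table (word, value) VALUES ('Goodbye', 3)
```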
**Example:**
**1.** Create the `url_engine_table` table:
```sql
CREATE TABLE url_engine_table (word String, value UInt64)
ENGINE=URL('http://127.0.0.1:12345/', CSV)
```
**2.** Implement a simple HTTP server using Python 3:
```python3
from http.server import BaseHTTPRequestHandler, HTTPServer
class CSVHTTPServer(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header('Content-type', 'text/csv')
self.end_headers()
self.wfile.write(bytes('Hello,1\nWorld,2\n', "utf-8"))
if __name__ == "__main__":
server_address = ('127.0.0.1', 12345)
HTTPServer(server_address, CSVHTTPServer).serve_forever()
```
```bash
python3 server.py
```
**3.** Query the data:
```sql
SELECT * FROM url_engine_table
```
```text
┌─word──┬─value─┐
│ Hello │ 1 │
│ World │ 2 │
└───────┴───────┘
```
## Details of implementation
- Reads and writes can be performed in parallel
- Not supported:
- `ALTER`
- `SELECT ... SAMPLE`
- Indices
- Replication

View File

@ -0,0 +1,19 @@
<a name="table_functions-url"></a>
# url
`url(URL, format, structure)` - returns a table created from the data at `URL`, with the given
`format` and `structure`.
URL - HTTP or HTTPS server address, which can accept `GET` and/or `POST` requests.
format - [format](../../interfaces/formats.md#formats) of the data.
structure - table structure in `'UserID UInt64, Name String'` format. Determines column names and types.
**Example**
```sql
-- Get the first 3 rows of a table that contains columns of String and UInt32 type, from an HTTP server that responds in CSV format.
SELECT * FROM url('http://127.0.0.1:12345/', CSV, 'column1 String, column2 UInt32') LIMIT 3
```
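Writing through the table function may also be possible if the target server accepts `POST` requests; the following is only a sketch under that assumption and is not guaranteed to be supported in every version:

```sql
-- Hypothetical: send one row to the same HTTP server as CSV via a POST request
INSERT INTO FUNCTION url('http://127.0.0.1:12345/', CSV, 'column1 String, column2 UInt32') VALUES ('test', 3)
```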

View File

@ -2,23 +2,23 @@
## A truly column-oriented DBMS
In a truly column-oriented DBMS, no "garbage" is stored alongside the values. For example, constant-length values must be supported, to avoid storing their length next to values of a "number" type. As an example, a billion UInt8 values should actually occupy around 1 GB uncompressed, otherwise this strongly hurts CPU efficiency. It is very important to store data compactly (without "garbage") even in uncompressed form, since the speed of decompression (CPU usage) depends mainly on the volume of uncompressed data.
In a truly column-oriented DBMS, no excessive data is stored alongside the values. For example, constant-length values must be supported, to avoid storing their length next to values of a "number" type. As an example, a billion UInt8 values should actually occupy around 1 GB uncompressed, otherwise this strongly hurts CPU efficiency. It is very important to store data compactly (without "garbage") even in uncompressed form, since the speed of decompression (CPU usage) depends mainly on the volume of uncompressed data.
This point had to be singled out because there are systems that can store the values of individual columns separately, but that cannot execute analytical queries effectively due to being optimized for a different workload. Examples: HBase, BigTable, Cassandra, HyperTable. In these systems you will get a throughput of around hundreds of thousands of rows per second, but not hundreds of millions of rows per second.
It is also worth noting that ClickHouse is a DBMS, not a single database. That is, ClickHouse allows creating tables and databases at runtime, loading data, and running queries without reconfiguring and restarting the server.
It is also worth noting that ClickHouse is a database management system, not a single database. That is, ClickHouse allows creating tables and databases at runtime, loading data, and running queries without reconfiguring and restarting the server.
## Data compression
Some column-oriented DBMSs (InfiniDB CE, MonetDB) do not use data compression. However, data compression really does improve performance significantly.
Some column-oriented DBMSs (InfiniDB CE, MonetDB) do not use data compression. However, data compression truly plays one of the key roles in delivering excellent performance.
## Disk storage of data
Many column-oriented DBMSs (SAP HANA, Google PowerDrill) can only work in RAM. But RAM, even on thousands of servers, is too small for storing all the hits and sessions in Yandex.Metrica.
Many column-oriented DBMSs (SAP HANA, Google PowerDrill) can only work in RAM. This approach encourages allocating a larger hardware budget than is actually required for real-time analysis. ClickHouse is designed to work on regular hard drives, which ensures a low cost per gigabyte of stored data, but SSDs and additional RAM are also used fully if available.
## Parallel query processing on multiple processor cores
Large queries are parallelized in a natural way.
Large queries are parallelized in a natural way, using all the necessary resources available on the server.
## Distributed query processing on multiple servers
@ -27,11 +27,9 @@
## SQL support
If you know standard SQL, then strictly speaking you cannot really talk about SQL support.
All the functions have different names.
Nevertheless, it is a declarative query language based on SQL and in many cases indistinguishable from SQL.
JOINs are supported. Subqueries are supported in the FROM, IN, and JOIN clauses, as well as scalar subqueries.
Dependent subqueries are not supported.
ClickHouse supports a declarative query language based on SQL that in many cases matches the SQL standard.
GROUP BY, ORDER BY, subqueries in the FROM, IN, and JOIN clauses, and scalar subqueries are supported.
Correlated subqueries and window functions are not supported.
## Vector engine
@ -41,21 +39,24 @@
ClickHouse поддерживает таблицы с первичным ключом. Для того, чтобы можно было быстро выполнять запросы по диапазону первичного ключа, данные инкрементально сортируются с помощью merge дерева. За счёт этого, поддерживается постоянное добавление данных в таблицу. Блокировки при добавлении данных отсутствуют.
## Index
Physically sorting data by primary key makes it possible to extract data for specific values of the key or for ranges of values (for example, for specific Metrica counters over a given time range) with low latency of less than a few dozen milliseconds.
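Continuing with the hypothetical `hits` table above, a query restricted to one counter and one month touches only the corresponding range of the sorted data:

```sql
-- Reads only the granules whose primary key range matches the condition.
SELECT count()
FROM hits
WHERE CounterID = 34
    AND EventDate BETWEEN '2018-07-01' AND '2018-07-31'
```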
## Suitable for online queries
Low latency means that queries can be processed without delay and without preparing the answer in advance, right at the moment the user interface page is loading. In other words, online. This makes it possible to use the system as the backend for a web interface such as Yandex.Metrica.
## Support for approximated calculations
ClickHouse provides various ways to trade accuracy for performance (a sketch follows the list below):
1. The system contains aggregate functions for approximated calculation of the number of distinct values, medians, and quantiles.
2. A query can be run on a part (sample) of the data to get an approximated result. In this case, proportionally less data is read from the disk.
3. An aggregation can be run not for all keys, but for a limited number of the first keys encountered. Under certain conditions on the key distribution in the data, this provides a reasonably accurate result while using fewer resources.
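As a minimal sketch, assuming the hypothetical `hits` table above (which declares a sampling key), the first two techniques might look like this:

```sql
-- uniqCombined gives an approximate distinct count;
-- SAMPLE 0.1 processes roughly 10% of the data and reads proportionally less from disk.
SELECT uniqCombined(UserID) AS approx_users
FROM hits
SAMPLE 0.1
WHERE EventDate = '2018-07-01'
```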
## Data replication and integrity support
ClickHouse uses asynchronous multi-master replication. After being written to any available replica, the data is distributed to all the remaining replicas in the background. The system maintains identical data on different replicas. Recovery after most failures is performed automatically, and in complex cases semi-automatically.
For details, see the [Data replication](../operations/table_engines/replication.md#table_engines-replication) section.
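As a hedged sketch (the ZooKeeper path, replica macro, table, and columns below are hypothetical), a replicated table is declared with one of the Replicated* engines:

```sql
-- Every replica that creates this table under the same ZooKeeper path joins the
-- replica set; inserts made on any of them are fetched by the others in the background.
CREATE TABLE hits_replicated
(
    EventDate Date,
    CounterID UInt32,
    UserID UInt64
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/01/hits', '{replica}')
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate)
```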

View File

@@ -1,6 +1,6 @@
# ClickHouse features that can be considered disadvantages
1. No full-fledged transactions.
2. There is no way to modify or delete previously inserted data with a high rate and low latency. Batch deletes are available to clean up data that is no longer needed or to comply with [GDPR](https://gdpr-info.eu) (see the sketch after this list); batch updates are under development (as of July 2018).
3. The sparse index makes ClickHouse poorly suited for point reads of single rows by their keys.
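For reference, the batch delete mentioned in item 2 is issued as an asynchronous mutation; the table name and predicate below are purely illustrative:

```sql
-- Removes all matching rows; the mutation is applied in the background, not instantly.
ALTER TABLE hits DELETE WHERE UserID = 12345
```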

View File

@@ -0,0 +1,74 @@
<a name="table_engines-url"></a>
# URL(URL, Format)
Manages data on a remote HTTP/HTTPS server. This engine is similar to the [`File`](./file.md#) engine.
## Using the engine in the ClickHouse server
`Format` must be a format that ClickHouse can use in `SELECT` queries and, if necessary, in `INSERT` queries. For the full list of supported formats, see the [Formats](../../interfaces/formats.md#formats) section.
`URL` must conform to the structure of a Uniform Resource Locator. The specified URL must point to a server that uses the HTTP or HTTPS protocol, and no additional headers may be required to get a response from that server.
`INSERT` and `SELECT` queries are transformed into `POST` and `GET` requests, respectively. To process `POST` requests, the remote server must support [chunked transfer encoding](https://ru.wikipedia.org/wiki/Chunked_transfer_encoding).
**Example:**
**1.** Create the `url_engine_table` table on the server:
```sql
CREATE TABLE url_engine_table (word String, value UInt64)
ENGINE=URL('http://127.0.0.1:12345/', CSV)
```
**2.** Create a basic HTTP server using the standard Python 3 tools and start it:
```python3
from http.server import BaseHTTPRequestHandler, HTTPServer

class CSVHTTPServer(BaseHTTPRequestHandler):
    def do_GET(self):
        # Return a small CSV payload for any GET request.
        self.send_response(200)
        self.send_header('Content-type', 'text/csv')
        self.end_headers()
        self.wfile.write(bytes('Hello,1\nWorld,2\n', "utf-8"))

if __name__ == "__main__":
    # Listen on the address used in the table definition above.
    server_address = ('127.0.0.1', 12345)
    HTTPServer(server_address, CSVHTTPServer).serve_forever()
```
```bash
python3 server.py
```
**3.** Request the data:
```sql
SELECT * FROM url_engine_table
```
```text
┌─word──┬─value─┐
│ Hello │     1 │
│ World │     2 │
└───────┴───────┘
```
## Details of implementation
- Reads and writes can be parallel (multithreaded).
- Not supported:
    - `ALTER` and `SELECT...SAMPLE` operations;
    - indexes;
    - replication.

View File

@@ -0,0 +1,20 @@
<a name="table_functions-url"></a>
# url
`url(URL, format, structure)` returns a table with the columns specified in `structure`, created from data located at `URL` in the given `format`.
URL is the address of a server that accepts `GET` and/or `POST` requests over the HTTP or HTTPS protocol.
format is the [format](../../interfaces/formats.md#formats) of the data.
structure is the table structure in the form `'UserID UInt64, Name String'`. It determines the column names and types.
**Example**
```sql
-- getting the first 3 rows of a table consisting of two columns of types String and UInt32 from a server returning data in CSV format
SELECT * FROM url('http://127.0.0.1:12345/', CSV, 'column1 String, column2 UInt32') LIMIT 3
```

View File

@@ -94,6 +94,7 @@ pages:
- 'merge': 'query_language/table_functions/merge.md'
- 'numbers': 'query_language/table_functions/numbers.md'
- 'remote': 'query_language/table_functions/remote.md'
- 'url': 'query_language/table_functions/url.md'
- 'Dictionaries':
- 'Introduction': 'query_language/dicts/index.md'
- 'External dictionaries':
@@ -134,6 +135,7 @@ pages:
- 'Null': 'operations/table_engines/null.md'
- 'Set': 'operations/table_engines/set.md'
- 'Join': 'operations/table_engines/join.md'
- 'URL': 'operations/table_engines/url.md'
- 'View': 'operations/table_engines/view.md'
- 'MaterializedView': 'operations/table_engines/materializedview.md'
- 'Integrations':

View File

@@ -100,6 +100,7 @@ pages:
- 'merge': 'query_language/table_functions/merge.md'
- 'numbers': 'query_language/table_functions/numbers.md'
- 'remote': 'query_language/table_functions/remote.md'
- 'url': 'query_language/table_functions/url.md'
- 'Словари':
- 'Введение': 'query_language/dicts/index.md'
- 'Внешние словари':
@@ -141,6 +142,7 @@ pages:
- 'Null': 'operations/table_engines/null.md'
- 'Set': 'operations/table_engines/set.md'
- 'Join': 'operations/table_engines/join.md'
- 'URL': 'operations/table_engines/url.md'
- 'View': 'operations/table_engines/view.md'
- 'MaterializedView': 'operations/table_engines/materializedview.md'
- 'Интеграции':