diff --git a/CHANGELOG.md b/CHANGELOG.md
index 23062ae1e73..ed71baf8046 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,36 @@
+## ClickHouse release 18.1.0, 2018-07-23
+
+### New features:
+
+* Support for the `ALTER TABLE t DELETE WHERE` query for non-replicated MergeTree tables ([#2634](https://github.com/yandex/ClickHouse/pull/2634)).
+* Support for arbitrary types for the `uniq*` family of aggregate functions ([#2010](https://github.com/yandex/ClickHouse/issues/2010)).
+* Support for arbitrary types in comparison operators ([#2026](https://github.com/yandex/ClickHouse/issues/2026)).
+* The `users.xml` file allows setting a subnet mask in the format `10.0.0.1/255.255.255.0`. This is necessary for using masks for IPv6 networks with zeros in the middle ([#2637](https://github.com/yandex/ClickHouse/pull/2637)).
+* Added the `arrayDistinct` function ([#2670](https://github.com/yandex/ClickHouse/pull/2670)).
+* The SummingMergeTree engine can now work with AggregateFunction type columns ([Constantin S. Pan](https://github.com/yandex/ClickHouse/pull/2566)).
+
+### Improvements:
+
+* Changed the numbering scheme for release versions. Now the first part contains the year of release (A.D., Moscow timezone, minus 2000), the second part contains the number for major changes (increases for most releases), and the third part is the patch version. Releases are still backwards compatible, unless otherwise stated in the changelog.
+* Faster conversions of floating-point numbers to a string ([Amos Bird](https://github.com/yandex/ClickHouse/pull/2664)).
+* If some rows were skipped during an insert due to parsing errors (this is possible with the `input_allow_errors_num` and `input_allow_errors_ratio` settings enabled), the number of skipped rows is now written to the server log ([Leonardo Cecchi](https://github.com/yandex/ClickHouse/pull/2669)).
+
+### Bug fixes:
+
+* Fixed the TRUNCATE command for temporary tables ([Amos Bird](https://github.com/yandex/ClickHouse/pull/2624)).
+* Fixed a rare deadlock in the ZooKeeper client library that occurred when there was a network error while reading the response ([c315200](https://github.com/yandex/ClickHouse/commit/c315200e64b87e44bdf740707fc857d1fdf7e947)).
+* Fixed an error during a CAST to Nullable types ([#1322](https://github.com/yandex/ClickHouse/issues/1322)).
+* Fixed the incorrect result of the `maxIntersection()` function when the boundaries of intervals coincided ([Michael Furmur](https://github.com/yandex/ClickHouse/pull/2657)).
+* Fixed incorrect transformation of the OR expression chain in a function argument ([chenxing-xc](https://github.com/yandex/ClickHouse/pull/2663)).
+* Fixed performance degradation for queries containing `IN (subquery)` expressions inside another subquery ([#2571](https://github.com/yandex/ClickHouse/issues/2571)).
+* Fixed incompatibility between servers with different versions in distributed queries that use a `CAST` function that isn't in uppercase letters ([fe8c4d6](https://github.com/yandex/ClickHouse/commit/fe8c4d64e434cacd4ceef34faa9005129f2190a5)).
+* Added missing quoting of identifiers for queries to an external DBMS ([#2635](https://github.com/yandex/ClickHouse/issues/2635)).
+
+### Backward incompatible changes:
+
+* Converting a string containing the number zero to DateTime does not work. Example: `SELECT toDateTime('0')`. This is also the reason that `DateTime DEFAULT '0'` does not work in tables, as well as `0` in dictionaries. Solution: replace `0` with `0000-00-00 00:00:00`.
+
+
## ClickHouse release 1.1.54394, 2018-07-12
### New features:
diff --git a/CHANGELOG_RU.md b/CHANGELOG_RU.md
index 5282e10a556..8150e1f5a57 100644
--- a/CHANGELOG_RU.md
+++ b/CHANGELOG_RU.md
@@ -1,3 +1,32 @@
+## ClickHouse release 18.1.0, 2018-07-23
+
+### Новые возможности:
+* Поддержка запроса `ALTER TABLE t DELETE WHERE` для нереплицированных MergeTree-таблиц ([#2634](https://github.com/yandex/ClickHouse/pull/2634)).
+* Поддержка произвольных типов для семейства агрегатных функций `uniq*` ([#2010](https://github.com/yandex/ClickHouse/issues/2010)).
+* Поддержка произвольных типов в операторах сравнения ([#2026](https://github.com/yandex/ClickHouse/issues/2026)).
+* Возможность в `users.xml` указывать маску подсети в формате `10.0.0.1/255.255.255.0`. Это необходимо для использования "дырявых" масок IPv6 сетей ([#2637](https://github.com/yandex/ClickHouse/pull/2637)).
+* Добавлена функция `arrayDistinct` ([#2670](https://github.com/yandex/ClickHouse/pull/2670)).
+* Движок SummingMergeTree теперь может работать со столбцами типа AggregateFunction ([Constantin S. Pan](https://github.com/yandex/ClickHouse/pull/2566)).
+
+### Улучшения:
+* Изменена схема версионирования релизов. Теперь первый компонент содержит год релиза (A.D.; по московскому времени; из номера вычитается 2000), второй - номер крупных изменений (увеличивается для большинства релизов), третий - патч-версия. Релизы по-прежнему обратно совместимы, если другое не указано в changelog.
+* Ускорено преобразование чисел с плавающей точкой в строку ([Amos Bird](https://github.com/yandex/ClickHouse/pull/2664)).
+* Теперь, если при вставке из-за ошибок парсинга пропущено некоторое количество строк (такое возможно при включённых настройках `input_allow_errors_num`, `input_allow_errors_ratio`), это количество пишется в лог сервера ([Leonardo Cecchi](https://github.com/yandex/ClickHouse/pull/2669)).
+
+### Исправление ошибок:
+* Исправлена работа команды TRUNCATE для временных таблиц ([Amos Bird](https://github.com/yandex/ClickHouse/pull/2624)).
+* Исправлен редкий deadlock в клиентской библиотеке ZooKeeper, который возникал при сетевой ошибке во время вычитывания ответа ([c315200](https://github.com/yandex/ClickHouse/commit/c315200e64b87e44bdf740707fc857d1fdf7e947)).
+* Исправлена ошибка при CAST в Nullable типы ([#1322](https://github.com/yandex/ClickHouse/issues/1322)).
+* Исправлен неправильный результат функции `maxIntersection()` в случае совпадения границ отрезков ([Michael Furmur](https://github.com/yandex/ClickHouse/pull/2657)).
+* Исправлено неверное преобразование цепочки OR-выражений в аргументе функции ([chenxing-xc](https://github.com/yandex/ClickHouse/pull/2663)).
+* Исправлена деградация производительности запросов, содержащих выражение `IN (подзапрос)` внутри другого подзапроса ([#2571](https://github.com/yandex/ClickHouse/issues/2571)).
+* Исправлена несовместимость серверов разных версий при распределённых запросах, использующих функцию `CAST` не в верхнем регистре ([fe8c4d6](https://github.com/yandex/ClickHouse/commit/fe8c4d64e434cacd4ceef34faa9005129f2190a5)).
+* Добавлено недостающее квотирование идентификаторов при запросах к внешним СУБД ([#2635](https://github.com/yandex/ClickHouse/issues/2635)).
+
+### Обратно несовместимые изменения:
+* Не работает преобразование строки, содержащей число ноль, в DateTime. Пример: `SELECT toDateTime('0')`. По той же причине не работает `DateTime DEFAULT '0'` в таблицах, а также `0` в словарях. Решение: заменить `0` на `0000-00-00 00:00:00`.
+
+
## ClickHouse release 1.1.54394, 2018-07-12
### Новые возможности:
diff --git a/README.md b/README.md
index 905e6e5ba90..8cb9fa3379e 100644
--- a/README.md
+++ b/README.md
@@ -2,6 +2,8 @@
ClickHouse is an open-source column-oriented database management system that allows generating analytical data reports in real time.
+[![Build Status](https://travis-ci.org/yandex/ClickHouse.svg?branch=master)](https://travis-ci.org/yandex/ClickHouse)
+
## Useful links
* [Official website](https://clickhouse.yandex/) has quick high-level overview of ClickHouse on main page.
@@ -9,5 +11,3 @@ ClickHouse is an open-source column-oriented database management system that all
* [Documentation](https://clickhouse.yandex/docs/en/) provides more in-depth information.
* [Contacts](https://clickhouse.yandex/#contacts) can help to get your questions answered if there are any.
-
-[![Build Status](https://travis-ci.org/yandex/ClickHouse.svg?branch=master)](https://travis-ci.org/yandex/ClickHouse)
diff --git a/cmake/find_llvm.cmake b/cmake/find_llvm.cmake
index a398255d5d7..b10a8cb87d4 100644
--- a/cmake/find_llvm.cmake
+++ b/cmake/find_llvm.cmake
@@ -26,10 +26,10 @@ if (ENABLE_EMBEDDED_COMPILER)
if (LLVM_FOUND)
find_library (LLD_LIBRARY_TEST lldCore PATHS ${LLVM_LIBRARY_DIRS})
- find_path (LLD_INCLUDE_DIR_TEST NAMES lld/Common/Driver.h PATHS ${LLVM_INCLUDE_DIRS})
+ find_path (LLD_INCLUDE_DIR_TEST NAMES lld/Core/AbsoluteAtom.h PATHS ${LLVM_INCLUDE_DIRS})
if (NOT LLD_LIBRARY_TEST OR NOT LLD_INCLUDE_DIR_TEST)
set (LLVM_FOUND 0)
- message(WARNING "liblld not found in ${LLVM_INCLUDE_DIRS} ${LLVM_LIBRARY_DIRS}. Disabling internal compiler.")
+ message(WARNING "liblld (${LLD_LIBRARY_TEST}, ${LLD_INCLUDE_DIR_TEST}) not found in ${LLVM_INCLUDE_DIRS} ${LLVM_LIBRARY_DIRS}. Disabling internal compiler.")
endif ()
endif ()
diff --git a/dbms/cmake/version.cmake b/dbms/cmake/version.cmake
index 131e6f26aaa..9b8de90f6b7 100644
--- a/dbms/cmake/version.cmake
+++ b/dbms/cmake/version.cmake
@@ -1,11 +1,11 @@
# This strings autochanged from release_lib.sh:
-set(VERSION_REVISION 54396 CACHE STRING "")
+set(VERSION_REVISION 54397 CACHE STRING "")
set(VERSION_MAJOR 18 CACHE STRING "")
-set(VERSION_MINOR 1 CACHE STRING "")
+set(VERSION_MINOR 2 CACHE STRING "")
set(VERSION_PATCH 0 CACHE STRING "")
-set(VERSION_GITHASH 550f41bc65cb03201acad489e7b96ea346ed8259 CACHE STRING "")
-set(VERSION_DESCRIBE v18.1.0-testing CACHE STRING "")
-set(VERSION_STRING 18.1.0 CACHE STRING "")
+set(VERSION_GITHASH 6ad677d7d6961a0c9088ccd9eff55779cfdaa654 CACHE STRING "")
+set(VERSION_DESCRIBE v18.2.0-testing CACHE STRING "")
+set(VERSION_STRING 18.2.0 CACHE STRING "")
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")
diff --git a/dbms/programs/obfuscator/Obfuscator.cpp b/dbms/programs/obfuscator/Obfuscator.cpp
index 854771b3b26..3ba6d76179e 100644
--- a/dbms/programs/obfuscator/Obfuscator.cpp
+++ b/dbms/programs/obfuscator/Obfuscator.cpp
@@ -58,13 +58,13 @@ It is designed to retain the following properties of data:
Most of the properties above are viable for performance testing:
- reading data, filtering, aggregation and sorting will work at almost the same speed
- as on original data due to saved cardinalities, magnitudes, compression ratios, etc.
+ as on original data due to saved cardinalities, magnitudes, compression ratios, etc.
It works in deterministic fashion: you define a seed value and transform is totally determined by input data and by seed.
Some transforms are one to one and could be reversed, so you need to have large enough seed and keep it in secret.
It use some cryptographic primitives to transform data, but from the cryptographic point of view,
- it doesn't do anything properly and you should never consider the result as secure, unless you have other reasons for it.
+ it doesn't do anything properly and you should never consider the result as secure, unless you have other reasons for it.
It may retain some data you don't want to publish.
@@ -74,7 +74,7 @@ So, the user will be able to count exact ratio of mobile traffic.
Another example, suppose you have some private data in your table, like user email and you don't want to publish any single email address.
If your table is large enough and contain multiple different emails and there is no email that have very high frequency than all others,
- it will perfectly anonymize all data. But if you have small amount of different values in a column, it can possibly reproduce some of them.
+ it will perfectly anonymize all data. But if you have small amount of different values in a column, it can possibly reproduce some of them.
And you should take care and look at exact algorithm, how this tool works, and probably fine tune some of it command line parameters.
This tool works fine only with reasonable amount of data (at least 1000s of rows).
diff --git a/dbms/programs/server/InterserverIOHTTPHandler.cpp b/dbms/programs/server/InterserverIOHTTPHandler.cpp
index 3cdbaa69b64..39d214503ba 100644
--- a/dbms/programs/server/InterserverIOHTTPHandler.cpp
+++ b/dbms/programs/server/InterserverIOHTTPHandler.cpp
@@ -1,3 +1,4 @@
+#include <Poco/Net/HTTPBasicCredentials.h>
#include
#include
@@ -23,14 +24,40 @@ namespace ErrorCodes
extern const int TOO_MANY_SIMULTANEOUS_QUERIES;
}
+std::pair<String, bool> InterserverIOHTTPHandler::checkAuthentication(Poco::Net::HTTPServerRequest & request) const
+{
+    const auto & config = server.config();
+
+    if (config.has("interserver_http_credentials.user"))
+    {
+        if (!request.hasCredentials())
+            return {"Server requires HTTP Basic authentication, but client doesn't provide it", false};
+        String scheme, info;
+        request.getCredentials(scheme, info);
+
+        if (scheme != "Basic")
+            return {"Server requires HTTP Basic authentication but client provides another method", false};
+
+        String user = config.getString("interserver_http_credentials.user");
+        String password = config.getString("interserver_http_credentials.password", "");
+
+        Poco::Net::HTTPBasicCredentials credentials(info);
+        if (std::make_pair(user, password) != std::make_pair(credentials.getUsername(), credentials.getPassword()))
+            return {"Incorrect user or password in HTTP Basic authentication", false};
+    }
+    else if (request.hasCredentials())
+    {
+        return {"Client provides HTTP Basic authentication, but server doesn't require it", false};
+    }
+    return {"", true};
+}
+
void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
HTMLForm params(request);
LOG_TRACE(log, "Request URI: " << request.getURI());
- /// NOTE: You can do authentication here if you need to.
-
String endpoint_name = params.get("endpoint");
bool compress = params.get("compress") == "true";
@@ -65,8 +92,18 @@ void InterserverIOHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & requ
try
{
- processQuery(request, response);
- LOG_INFO(log, "Done processing query");
+ if (auto [msg, success] = checkAuthentication(request); success)
+ {
+ processQuery(request, response);
+ LOG_INFO(log, "Done processing query");
+ }
+ else
+ {
+ response.setStatusAndReason(Poco::Net::HTTPServerResponse::HTTP_UNAUTHORIZED);
+ if (!response.sent())
+ response.send() << msg << std::endl;
+            LOG_WARNING(log, "Query processing failed for request '" << request.getURI() << "': authentication failed");
+ }
}
catch (Exception & e)
{
diff --git a/dbms/programs/server/InterserverIOHTTPHandler.h b/dbms/programs/server/InterserverIOHTTPHandler.h
index bf9fef59982..fbaf432d4f9 100644
--- a/dbms/programs/server/InterserverIOHTTPHandler.h
+++ b/dbms/programs/server/InterserverIOHTTPHandler.h
@@ -34,6 +34,8 @@ private:
CurrentMetrics::Increment metric_increment{CurrentMetrics::InterserverConnection};
void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
+
+    std::pair<String, bool> checkAuthentication(Poco::Net::HTTPServerRequest & request) const;
};
}
diff --git a/dbms/programs/server/Server.cpp b/dbms/programs/server/Server.cpp
index 9a3db8bdb12..c4b0c77a026 100644
--- a/dbms/programs/server/Server.cpp
+++ b/dbms/programs/server/Server.cpp
@@ -230,6 +230,17 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setInterserverIOAddress(this_host, port);
}
+ if (config().has("interserver_http_credentials"))
+ {
+ String user = config().getString("interserver_http_credentials.user", "");
+ String password = config().getString("interserver_http_credentials.password", "");
+
+ if (user.empty())
+ throw Exception("Configuration parameter interserver_http_credentials user can't be empty", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
+
+ global_context->setInterverserCredentials(user, password);
+ }
+
if (config().has("macros"))
        global_context->setMacros(std::make_unique<Macros>(config(), "macros"));
diff --git a/dbms/src/AggregateFunctions/AggregateFunctionArray.cpp b/dbms/src/AggregateFunctions/AggregateFunctionArray.cpp
index f42c5b6d142..9cb7d03bf69 100644
--- a/dbms/src/AggregateFunctions/AggregateFunctionArray.cpp
+++ b/dbms/src/AggregateFunctions/AggregateFunctionArray.cpp
@@ -18,6 +18,9 @@ public:
DataTypes transformArguments(const DataTypes & arguments) const override
{
+ if (0 == arguments.size())
+ throw Exception("-Array aggregate functions require at least one argument", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
+
DataTypes nested_arguments;
for (const auto & type : arguments)
{
diff --git a/dbms/src/AggregateFunctions/AggregateFunctionBitwise.cpp b/dbms/src/AggregateFunctions/AggregateFunctionBitwise.cpp
index 762baf2451b..8c188bcbb8e 100644
--- a/dbms/src/AggregateFunctions/AggregateFunctionBitwise.cpp
+++ b/dbms/src/AggregateFunctions/AggregateFunctionBitwise.cpp
@@ -38,9 +38,9 @@ void registerAggregateFunctionsBitwise(AggregateFunctionFactory & factory)
factory.registerFunction("groupBitXor", createAggregateFunctionBitwise);
/// Aliases for compatibility with MySQL.
- factory.registerFunction("BIT_OR", createAggregateFunctionBitwise, AggregateFunctionFactory::CaseInsensitive);
- factory.registerFunction("BIT_AND", createAggregateFunctionBitwise, AggregateFunctionFactory::CaseInsensitive);
- factory.registerFunction("BIT_XOR", createAggregateFunctionBitwise, AggregateFunctionFactory::CaseInsensitive);
+ factory.registerAlias("BIT_OR", "groupBitOr", AggregateFunctionFactory::CaseInsensitive);
+ factory.registerAlias("BIT_AND", "groupBitAnd", AggregateFunctionFactory::CaseInsensitive);
+ factory.registerAlias("BIT_XOR", "groupBitXor", AggregateFunctionFactory::CaseInsensitive);
}
}
diff --git a/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp b/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp
index eca854a031b..353b5a213b3 100644
--- a/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp
+++ b/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp
@@ -78,11 +78,12 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
AggregateFunctionPtr AggregateFunctionFactory::getImpl(
- const String & name,
+ const String & name_param,
const DataTypes & argument_types,
const Array & parameters,
int recursion_level) const
{
+ String name = getAliasToOrName(name_param);
/// Find by exact match.
auto it = aggregate_functions.find(name);
if (it != aggregate_functions.end())
@@ -103,8 +104,8 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl(
if (AggregateFunctionCombinatorPtr combinator = AggregateFunctionCombinatorFactory::instance().tryFindSuffix(name))
{
- if (combinator->getName() == "Null")
- throw Exception("Aggregate function combinator 'Null' is only for internal usage", ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION);
+ if (combinator->isForInternalUsageOnly())
+ throw Exception("Aggregate function combinator '" + combinator->getName() + "' is only for internal usage", ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION);
String nested_name = name.substr(0, name.size() - combinator->getName().size());
DataTypes nested_types = combinator->transformArguments(argument_types);
@@ -126,10 +127,11 @@ AggregateFunctionPtr AggregateFunctionFactory::tryGet(const String & name, const
bool AggregateFunctionFactory::isAggregateFunctionName(const String & name, int recursion_level) const
{
- if (aggregate_functions.count(name))
+ if (aggregate_functions.count(name) || isAlias(name))
return true;
- if (recursion_level == 0 && case_insensitive_aggregate_functions.count(Poco::toLower(name)))
+ String name_lowercase = Poco::toLower(name);
+ if (recursion_level == 0 && (case_insensitive_aggregate_functions.count(name_lowercase) || isAlias(name_lowercase)))
return true;
if (AggregateFunctionCombinatorPtr combinator = AggregateFunctionCombinatorFactory::instance().tryFindSuffix(name))
diff --git a/dbms/src/AggregateFunctions/AggregateFunctionFactory.h b/dbms/src/AggregateFunctions/AggregateFunctionFactory.h
index bc36e76c11f..92598e52509 100644
--- a/dbms/src/AggregateFunctions/AggregateFunctionFactory.h
+++ b/dbms/src/AggregateFunctions/AggregateFunctionFactory.h
@@ -1,6 +1,7 @@
#pragma once
#include
+#include <Common/IFactoryWithAliases.h>
#include
@@ -20,27 +21,18 @@ class IDataType;
using DataTypePtr = std::shared_ptr<const IDataType>;
using DataTypes = std::vector<DataTypePtr>;
+/** Creator has arguments: name of aggregate function, types of arguments, values of parameters.
+  * Parameters are for "parametric" aggregate functions.
+  * For example, in quantileWeighted(0.9)(x, weight), 0.9 is "parameter" and x, weight are "arguments".
+  */
+using AggregateFunctionCreator = std::function<AggregateFunctionPtr(const String &, const DataTypes &, const Array &)>;
+
/** Creates an aggregate function by name.
*/
-class AggregateFunctionFactory final : public ext::singleton<AggregateFunctionFactory>
+class AggregateFunctionFactory final : public ext::singleton<AggregateFunctionFactory>, public IFactoryWithAliases<AggregateFunctionCreator>
{
- friend class StorageSystemFunctions;
-
public:
- /** Creator have arguments: name of aggregate function, types of arguments, values of parameters.
- * Parameters are for "parametric" aggregate functions.
- * For example, in quantileWeighted(0.9)(x, weight), 0.9 is "parameter" and x, weight are "arguments".
- */
-    using Creator = std::function<AggregateFunctionPtr(const String &, const DataTypes &, const Array &)>;
-
- /// For compatibility with SQL, it's possible to specify that certain aggregate function name is case insensitive.
- enum CaseSensitiveness
- {
- CaseSensitive,
- CaseInsensitive
- };
-
/// Register a function by its name.
/// No locking, you must register all functions before usage of get.
void registerFunction(
@@ -77,6 +69,13 @@ private:
/// Case insensitive aggregate functions will be additionally added here with lowercased name.
AggregateFunctions case_insensitive_aggregate_functions;
+
+ const AggregateFunctions & getCreatorMap() const override { return aggregate_functions; }
+
+ const AggregateFunctions & getCaseInsensitiveCreatorMap() const override { return case_insensitive_aggregate_functions; }
+
+ String getFactoryName() const override { return "AggregateFunctionFactory"; }
+
};
}
diff --git a/dbms/src/AggregateFunctions/AggregateFunctionNull.cpp b/dbms/src/AggregateFunctions/AggregateFunctionNull.cpp
index 46a46a2370a..6ce7d94d970 100644
--- a/dbms/src/AggregateFunctions/AggregateFunctionNull.cpp
+++ b/dbms/src/AggregateFunctions/AggregateFunctionNull.cpp
@@ -18,6 +18,8 @@ class AggregateFunctionCombinatorNull final : public IAggregateFunctionCombinato
public:
String getName() const override { return "Null"; }
+ bool isForInternalUsageOnly() const override { return true; }
+
DataTypes transformArguments(const DataTypes & arguments) const override
{
size_t size = arguments.size();
diff --git a/dbms/src/AggregateFunctions/AggregateFunctionQuantile.cpp b/dbms/src/AggregateFunctions/AggregateFunctionQuantile.cpp
index 250ee422e8b..62455af6353 100644
--- a/dbms/src/AggregateFunctions/AggregateFunctionQuantile.cpp
+++ b/dbms/src/AggregateFunctions/AggregateFunctionQuantile.cpp
@@ -93,30 +93,14 @@ void registerAggregateFunctionsQuantile(AggregateFunctionFactory & factory)
createAggregateFunctionQuantile);
/// 'median' is an alias for 'quantile'
-
- factory.registerFunction("median",
- createAggregateFunctionQuantile);
-
- factory.registerFunction("medianDeterministic",
- createAggregateFunctionQuantile);
-
- factory.registerFunction("medianExact",
- createAggregateFunctionQuantile);
-
- factory.registerFunction("medianExactWeighted",
- createAggregateFunctionQuantile);
-
- factory.registerFunction("medianTiming",
- createAggregateFunctionQuantile);
-
- factory.registerFunction("medianTimingWeighted",
- createAggregateFunctionQuantile);
-
- factory.registerFunction("medianTDigest",
- createAggregateFunctionQuantile);
-
- factory.registerFunction("medianTDigestWeighted",
- createAggregateFunctionQuantile);
+ factory.registerAlias("median", NameQuantile::name);
+ factory.registerAlias("medianDeterministic", NameQuantileDeterministic::name);
+ factory.registerAlias("medianExact", NameQuantileExact::name);
+ factory.registerAlias("medianExactWeighted", NameQuantileExactWeighted::name);
+ factory.registerAlias("medianTiming", NameQuantileTiming::name);
+ factory.registerAlias("medianTimingWeighted", NameQuantileTimingWeighted::name);
+ factory.registerAlias("medianTDigest", NameQuantileTDigest::name);
+ factory.registerAlias("medianTDigestWeighted", NameQuantileTDigestWeighted::name);
}
}
diff --git a/dbms/src/AggregateFunctions/AggregateFunctionWindowFunnel.h b/dbms/src/AggregateFunctions/AggregateFunctionWindowFunnel.h
index 4ad0400d160..b62755ef00c 100644
--- a/dbms/src/AggregateFunctions/AggregateFunctionWindowFunnel.h
+++ b/dbms/src/AggregateFunctions/AggregateFunctionWindowFunnel.h
@@ -116,7 +116,7 @@ struct AggregateFunctionWindowFunnelData
/// TODO Protection against huge size
events_list.clear();
- events_list.resize(size);
+ events_list.reserve(size);
UInt32 timestamp;
UInt8 event;
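The fix above swaps `resize` for `reserve` before the deserialization loop fills `events_list`. The difference matters: `reserve` allocates capacity but leaves the size at zero, so subsequent appends start at index 0, whereas `resize` first creates `size` value-initialized elements and appends land after them. A quick sketch:

```cpp
#include <cassert>
#include <vector>

int main()
{
    std::vector<int> a, b;

    a.resize(3);      // size == 3: three value-initialized (zero) elements
    b.reserve(3);     // size == 0, capacity >= 3: no elements yet

    a.push_back(42);  // appended at index 3, after the zeros
    b.push_back(42);  // appended at index 0

    assert(a.size() == 4 && a[0] == 0 && a[3] == 42);
    assert(b.size() == 1 && b[0] == 42);
    return 0;
}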
diff --git a/dbms/src/AggregateFunctions/AggregateFunctionsStatisticsSimple.cpp b/dbms/src/AggregateFunctions/AggregateFunctionsStatisticsSimple.cpp
index 089ea59cd79..c42372187bc 100644
--- a/dbms/src/AggregateFunctions/AggregateFunctionsStatisticsSimple.cpp
+++ b/dbms/src/AggregateFunctions/AggregateFunctionsStatisticsSimple.cpp
@@ -56,12 +56,12 @@ void registerAggregateFunctionsStatisticsSimple(AggregateFunctionFactory & facto
factory.registerFunction("corr", createAggregateFunctionStatisticsBinary, AggregateFunctionFactory::CaseInsensitive);
/// Synonims for compatibility.
- factory.registerFunction("VAR_SAMP", createAggregateFunctionStatisticsUnary, AggregateFunctionFactory::CaseInsensitive);
- factory.registerFunction("VAR_POP", createAggregateFunctionStatisticsUnary, AggregateFunctionFactory::CaseInsensitive);
- factory.registerFunction("STDDEV_SAMP", createAggregateFunctionStatisticsUnary, AggregateFunctionFactory::CaseInsensitive);
- factory.registerFunction("STDDEV_POP", createAggregateFunctionStatisticsUnary, AggregateFunctionFactory::CaseInsensitive);
- factory.registerFunction("COVAR_SAMP", createAggregateFunctionStatisticsBinary, AggregateFunctionFactory::CaseInsensitive);
- factory.registerFunction("COVAR_POP", createAggregateFunctionStatisticsBinary, AggregateFunctionFactory::CaseInsensitive);
+ factory.registerAlias("VAR_SAMP", "varSamp", AggregateFunctionFactory::CaseInsensitive);
+ factory.registerAlias("VAR_POP", "varPop", AggregateFunctionFactory::CaseInsensitive);
+ factory.registerAlias("STDDEV_SAMP", "stddevSamp", AggregateFunctionFactory::CaseInsensitive);
+ factory.registerAlias("STDDEV_POP", "stddevPop", AggregateFunctionFactory::CaseInsensitive);
+ factory.registerAlias("COVAR_SAMP", "covarSamp", AggregateFunctionFactory::CaseInsensitive);
+ factory.registerAlias("COVAR_POP", "covarPop", AggregateFunctionFactory::CaseInsensitive);
}
}
diff --git a/dbms/src/AggregateFunctions/IAggregateFunctionCombinator.h b/dbms/src/AggregateFunctions/IAggregateFunctionCombinator.h
index ba28026b1cd..0ac9a3d41cd 100644
--- a/dbms/src/AggregateFunctions/IAggregateFunctionCombinator.h
+++ b/dbms/src/AggregateFunctions/IAggregateFunctionCombinator.h
@@ -32,6 +32,8 @@ class IAggregateFunctionCombinator
public:
virtual String getName() const = 0;
+ virtual bool isForInternalUsageOnly() const { return false; }
+
/** From the arguments for combined function (ex: UInt64, UInt8 for sumIf),
* get the arguments for nested function (ex: UInt64 for sum).
* If arguments are not suitable for combined function, throw an exception.
diff --git a/dbms/src/Client/ConnectionPoolWithFailover.cpp b/dbms/src/Client/ConnectionPoolWithFailover.cpp
index ee8c3607c43..a311dac95b1 100644
--- a/dbms/src/Client/ConnectionPoolWithFailover.cpp
+++ b/dbms/src/Client/ConnectionPoolWithFailover.cpp
@@ -83,6 +83,16 @@ std::vector ConnectionPoolWithFailover::getMany(const Se
return entries;
}
+std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyForTableFunction(const Settings * settings, PoolMode pool_mode)
+{
+ TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
+ {
+ return tryGetEntry(pool, fail_message, settings);
+ };
+
+ return getManyImpl(settings, pool_mode, try_get_entry);
+}
+
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyChecked(
const Settings * settings, PoolMode pool_mode, const QualifiedTableName & table_to_check)
{
@@ -90,6 +100,7 @@ std::vector ConnectionPoolWithFailover::g
{
return tryGetEntry(pool, fail_message, settings, &table_to_check);
};
+
return getManyImpl(settings, pool_mode, try_get_entry);
}
diff --git a/dbms/src/Client/ConnectionPoolWithFailover.h b/dbms/src/Client/ConnectionPoolWithFailover.h
index b61fa03d711..62ca75859ba 100644
--- a/dbms/src/Client/ConnectionPoolWithFailover.h
+++ b/dbms/src/Client/ConnectionPoolWithFailover.h
@@ -47,6 +47,9 @@ public:
*/
    std::vector<Entry> getMany(const Settings * settings, PoolMode pool_mode);
+    /// The same as getMany(), but return std::vector<TryResult>.
+    std::vector<TryResult> getManyForTableFunction(const Settings * settings, PoolMode pool_mode);
+
    using Base = PoolWithFailoverBase<IConnectionPool>;
using TryResult = Base::TryResult;
diff --git a/dbms/src/Common/BackgroundSchedulePool.cpp b/dbms/src/Common/BackgroundSchedulePool.cpp
index 84eecdad7ff..9556c9a037b 100644
--- a/dbms/src/Common/BackgroundSchedulePool.cpp
+++ b/dbms/src/Common/BackgroundSchedulePool.cpp
@@ -128,7 +128,8 @@ void BackgroundSchedulePool::TaskInfo::execute()
zkutil::WatchCallback BackgroundSchedulePool::TaskInfo::getWatchCallback()
{
- return [t=shared_from_this()](const ZooKeeperImpl::ZooKeeper::WatchResponse &) {
+ return [t = shared_from_this()](const ZooKeeperImpl::ZooKeeper::WatchResponse &)
+ {
t->schedule();
};
}
diff --git a/dbms/src/Common/IFactoryWithAliases.h b/dbms/src/Common/IFactoryWithAliases.h
new file mode 100644
index 00000000000..9006a3c7cfd
--- /dev/null
+++ b/dbms/src/Common/IFactoryWithAliases.h
@@ -0,0 +1,125 @@
+#pragma once
+
+#include <Common/Exception.h>
+#include <Core/Types.h>
+#include <Poco/String.h>
+
+#include <unordered_map>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int LOGICAL_ERROR;
+}
+
+/** If stored objects may have several names (aliases),
+  * this interface may be helpful;
+  * the template parameter is available as Creator.
+  */
+template <typename CreatorFunc>
+class IFactoryWithAliases
+{
+protected:
+ using Creator = CreatorFunc;
+
+ String getAliasToOrName(const String & name) const
+ {
+ if (aliases.count(name))
+ return aliases.at(name);
+ else if (String name_lowercase = Poco::toLower(name); case_insensitive_aliases.count(name_lowercase))
+ return case_insensitive_aliases.at(name_lowercase);
+ else
+ return name;
+ }
+
+public:
+ /// For compatibility with SQL, it's possible to specify that certain function name is case insensitive.
+ enum CaseSensitiveness
+ {
+ CaseSensitive,
+ CaseInsensitive
+ };
+
+ /** Register additional name for creator
+ * real_name have to be already registered.
+ */
+ void registerAlias(const String & alias_name, const String & real_name, CaseSensitiveness case_sensitiveness = CaseSensitive)
+ {
+ const auto & creator_map = getCreatorMap();
+ const auto & case_insensitive_creator_map = getCaseInsensitiveCreatorMap();
+ const String factory_name = getFactoryName();
+
+ String real_dict_name;
+ if (creator_map.count(real_name))
+ real_dict_name = real_name;
+ else if (auto real_name_lowercase = Poco::toLower(real_name); case_insensitive_creator_map.count(real_name_lowercase))
+ real_dict_name = real_name_lowercase;
+ else
+ throw Exception(factory_name + ": can't create alias '" + alias_name + "', the real name '" + real_name + "' is not registered",
+ ErrorCodes::LOGICAL_ERROR);
+
+ String alias_name_lowercase = Poco::toLower(alias_name);
+
+ if (creator_map.count(alias_name) || case_insensitive_creator_map.count(alias_name_lowercase))
+ throw Exception(
+ factory_name + ": the alias name '" + alias_name + "' is already registered as real name", ErrorCodes::LOGICAL_ERROR);
+
+ if (case_sensitiveness == CaseInsensitive)
+ if (!case_insensitive_aliases.emplace(alias_name_lowercase, real_dict_name).second)
+ throw Exception(
+ factory_name + ": case insensitive alias name '" + alias_name + "' is not unique", ErrorCodes::LOGICAL_ERROR);
+
+ if (!aliases.emplace(alias_name, real_dict_name).second)
+ throw Exception(factory_name + ": alias name '" + alias_name + "' is not unique", ErrorCodes::LOGICAL_ERROR);
+ }
+
+ std::vector<String> getAllRegisteredNames() const
+ {
+ std::vector<String> result;
+ auto getter = [](const auto & pair) { return pair.first; };
+ std::transform(getCreatorMap().begin(), getCreatorMap().end(), std::back_inserter(result), getter);
+ std::transform(aliases.begin(), aliases.end(), std::back_inserter(result), getter);
+ return result;
+ }
+
+ bool isCaseInsensitive(const String & name) const
+ {
+ String name_lowercase = Poco::toLower(name);
+ return getCaseInsensitiveCreatorMap().count(name_lowercase) || case_insensitive_aliases.count(name_lowercase);
+ }
+
+ const String & aliasTo(const String & name) const
+ {
+ if (auto it = aliases.find(name); it != aliases.end())
+ return it->second;
+ else if (auto it = case_insensitive_aliases.find(Poco::toLower(name)); it != case_insensitive_aliases.end())
+ return it->second;
+
+ throw Exception(getFactoryName() + ": name '" + name + "' is not an alias", ErrorCodes::LOGICAL_ERROR);
+ }
+
+ bool isAlias(const String & name) const
+ {
+ return aliases.count(name) || case_insensitive_aliases.count(name);
+ }
+
+ virtual ~IFactoryWithAliases() {}
+
+private:
+ using InnerMap = std::unordered_map<String, Creator>; // name -> creator
+ using AliasMap = std::unordered_map<String, String>; // alias -> original name
+
+ virtual const InnerMap & getCreatorMap() const = 0;
+ virtual const InnerMap & getCaseInsensitiveCreatorMap() const = 0;
+ virtual String getFactoryName() const = 0;
+
+ /// Alias map to data_types from previous two maps
+ AliasMap aliases;
+
+ /// Case insensitive aliases
+ AliasMap case_insensitive_aliases;
+};
+
+}
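The lookup order implemented by `IFactoryWithAliases` above (exact-case map first, then the lowercased case-insensitive map, otherwise the name itself) can be sketched standalone. The struct, map, and helper names below are illustrative stand-ins, not the actual ClickHouse or Poco API:

```cpp
#include <algorithm>
#include <cctype>
#include <string>
#include <unordered_map>

// Stand-in for Poco::toLower.
static std::string toLower(std::string s)
{
    std::transform(s.begin(), s.end(), s.begin(),
                   [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
    return s;
}

// Minimal model of the two alias maps used by IFactoryWithAliases.
struct AliasResolver
{
    std::unordered_map<std::string, std::string> aliases;                  // exact-case alias -> real name
    std::unordered_map<std::string, std::string> case_insensitive_aliases; // lowercase alias -> real name

    // Mirrors getAliasToOrName: exact match first, then the case-insensitive map,
    // and finally the name unchanged if it is not an alias at all.
    const std::string & resolve(const std::string & name) const
    {
        if (auto it = aliases.find(name); it != aliases.end())
            return it->second;
        if (auto it = case_insensitive_aliases.find(toLower(name)); it != case_insensitive_aliases.end())
            return it->second;
        return name;
    }
};
```

With a case-insensitive registration such as `BINARY -> FixedString`, the spellings `BINARY`, `Binary`, and `binary` all resolve to `FixedString`, while unregistered names pass through untouched.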
diff --git a/dbms/src/DataTypes/DataTypeFactory.cpp b/dbms/src/DataTypes/DataTypeFactory.cpp
index f1a12d75868..9706ecf4944 100644
--- a/dbms/src/DataTypes/DataTypeFactory.cpp
+++ b/dbms/src/DataTypes/DataTypeFactory.cpp
@@ -51,16 +51,19 @@ DataTypePtr DataTypeFactory::get(const ASTPtr & ast) const
throw Exception("Unexpected AST element for data type.", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
}
-DataTypePtr DataTypeFactory::get(const String & family_name, const ASTPtr & parameters) const
+DataTypePtr DataTypeFactory::get(const String & family_name_param, const ASTPtr & parameters) const
{
+ String family_name = getAliasToOrName(family_name_param);
+
{
DataTypesDictionary::const_iterator it = data_types.find(family_name);
if (data_types.end() != it)
return it->second(parameters);
}
+ String family_name_lowercase = Poco::toLower(family_name);
+
{
- String family_name_lowercase = Poco::toLower(family_name);
DataTypesDictionary::const_iterator it = case_insensitive_data_types.find(family_name_lowercase);
if (case_insensitive_data_types.end() != it)
return it->second(parameters);
@@ -76,11 +79,16 @@ void DataTypeFactory::registerDataType(const String & family_name, Creator creat
throw Exception("DataTypeFactory: the data type family " + family_name + " has been provided "
" a null constructor", ErrorCodes::LOGICAL_ERROR);
+ String family_name_lowercase = Poco::toLower(family_name);
+
+ if (isAlias(family_name) || isAlias(family_name_lowercase))
+ throw Exception("DataTypeFactory: the data type family name '" + family_name + "' is already registered as alias",
+ ErrorCodes::LOGICAL_ERROR);
+
if (!data_types.emplace(family_name, creator).second)
throw Exception("DataTypeFactory: the data type family name '" + family_name + "' is not unique",
ErrorCodes::LOGICAL_ERROR);
- String family_name_lowercase = Poco::toLower(family_name);
if (case_sensitiveness == CaseInsensitive
&& !case_insensitive_data_types.emplace(family_name_lowercase, creator).second)
@@ -88,7 +96,6 @@ void DataTypeFactory::registerDataType(const String & family_name, Creator creat
ErrorCodes::LOGICAL_ERROR);
}
-
void DataTypeFactory::registerSimpleDataType(const String & name, SimpleCreator creator, CaseSensitiveness case_sensitiveness)
{
if (creator == nullptr)
@@ -103,7 +110,6 @@ void DataTypeFactory::registerSimpleDataType(const String & name, SimpleCreator
}, case_sensitiveness);
}
-
void registerDataTypeNumbers(DataTypeFactory & factory);
void registerDataTypeDate(DataTypeFactory & factory);
void registerDataTypeDateTime(DataTypeFactory & factory);
diff --git a/dbms/src/DataTypes/DataTypeFactory.h b/dbms/src/DataTypes/DataTypeFactory.h
index e6c873ba724..21d22cf932e 100644
--- a/dbms/src/DataTypes/DataTypeFactory.h
+++ b/dbms/src/DataTypes/DataTypeFactory.h
@@ -3,6 +3,7 @@
#include
#include
#include
+#include
#include
#include
@@ -19,10 +20,9 @@ using ASTPtr = std::shared_ptr;
/** Creates a data type by name of data type family and parameters.
*/
-class DataTypeFactory final : public ext::singleton<DataTypeFactory>
+class DataTypeFactory final : public ext::singleton<DataTypeFactory>, public IFactoryWithAliases<std::function<DataTypePtr(const ASTPtr & parameters)>>
{
private:
- using Creator = std::function;
using SimpleCreator = std::function;
using DataTypesDictionary = std::unordered_map;
@@ -31,24 +31,12 @@ public:
DataTypePtr get(const String & family_name, const ASTPtr & parameters) const;
DataTypePtr get(const ASTPtr & ast) const;
- /// For compatibility with SQL, it's possible to specify that certain data type name is case insensitive.
- enum CaseSensitiveness
- {
- CaseSensitive,
- CaseInsensitive
- };
-
/// Register a type family by its name.
void registerDataType(const String & family_name, Creator creator, CaseSensitiveness case_sensitiveness = CaseSensitive);
/// Register a simple data type, that have no parameters.
void registerSimpleDataType(const String & name, SimpleCreator creator, CaseSensitiveness case_sensitiveness = CaseSensitive);
- const DataTypesDictionary & getAllDataTypes() const
- {
- return data_types;
- }
-
private:
DataTypesDictionary data_types;
@@ -56,6 +44,13 @@ private:
DataTypesDictionary case_insensitive_data_types;
DataTypeFactory();
+
+ const DataTypesDictionary & getCreatorMap() const override { return data_types; }
+
+ const DataTypesDictionary & getCaseInsensitiveCreatorMap() const override { return case_insensitive_data_types; }
+
+ String getFactoryName() const override { return "DataTypeFactory"; }
+
friend class ext::singleton<DataTypeFactory>;
};
diff --git a/dbms/src/DataTypes/DataTypeFixedString.cpp b/dbms/src/DataTypes/DataTypeFixedString.cpp
index 05fdd34c464..ad875c4f85e 100644
--- a/dbms/src/DataTypes/DataTypeFixedString.cpp
+++ b/dbms/src/DataTypes/DataTypeFixedString.cpp
@@ -231,7 +231,7 @@ void registerDataTypeFixedString(DataTypeFactory & factory)
factory.registerDataType("FixedString", create);
/// Compatibility alias.
- factory.registerDataType("BINARY", create, DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("BINARY", "FixedString", DataTypeFactory::CaseInsensitive);
}
}
diff --git a/dbms/src/DataTypes/DataTypeString.cpp b/dbms/src/DataTypes/DataTypeString.cpp
index 4ffda6f2099..671d1b2d3a5 100644
--- a/dbms/src/DataTypes/DataTypeString.cpp
+++ b/dbms/src/DataTypes/DataTypeString.cpp
@@ -312,16 +312,16 @@ void registerDataTypeString(DataTypeFactory & factory)
/// These synonyms are added for compatibility.
- factory.registerSimpleDataType("CHAR", creator, DataTypeFactory::CaseInsensitive);
- factory.registerSimpleDataType("VARCHAR", creator, DataTypeFactory::CaseInsensitive);
- factory.registerSimpleDataType("TEXT", creator, DataTypeFactory::CaseInsensitive);
- factory.registerSimpleDataType("TINYTEXT", creator, DataTypeFactory::CaseInsensitive);
- factory.registerSimpleDataType("MEDIUMTEXT", creator, DataTypeFactory::CaseInsensitive);
- factory.registerSimpleDataType("LONGTEXT", creator, DataTypeFactory::CaseInsensitive);
- factory.registerSimpleDataType("BLOB", creator, DataTypeFactory::CaseInsensitive);
- factory.registerSimpleDataType("TINYBLOB", creator, DataTypeFactory::CaseInsensitive);
- factory.registerSimpleDataType("MEDIUMBLOB", creator, DataTypeFactory::CaseInsensitive);
- factory.registerSimpleDataType("LONGBLOB", creator, DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("CHAR", "String", DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("VARCHAR", "String", DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("TEXT", "String", DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("TINYTEXT", "String", DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("MEDIUMTEXT", "String", DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("LONGTEXT", "String", DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("BLOB", "String", DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("TINYBLOB", "String", DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("MEDIUMBLOB", "String", DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("LONGBLOB", "String", DataTypeFactory::CaseInsensitive);
}
}
diff --git a/dbms/src/DataTypes/DataTypesNumber.cpp b/dbms/src/DataTypes/DataTypesNumber.cpp
index 72861eff3ac..254d6ba6852 100644
--- a/dbms/src/DataTypes/DataTypesNumber.cpp
+++ b/dbms/src/DataTypes/DataTypesNumber.cpp
@@ -22,13 +22,13 @@ void registerDataTypeNumbers(DataTypeFactory & factory)
/// These synonyms are added for compatibility.
- factory.registerSimpleDataType("TINYINT", [] { return DataTypePtr(std::make_shared()); }, DataTypeFactory::CaseInsensitive);
- factory.registerSimpleDataType("SMALLINT", [] { return DataTypePtr(std::make_shared()); }, DataTypeFactory::CaseInsensitive);
- factory.registerSimpleDataType("INT", [] { return DataTypePtr(std::make_shared()); }, DataTypeFactory::CaseInsensitive);
- factory.registerSimpleDataType("INTEGER", [] { return DataTypePtr(std::make_shared()); }, DataTypeFactory::CaseInsensitive);
- factory.registerSimpleDataType("BIGINT", [] { return DataTypePtr(std::make_shared()); }, DataTypeFactory::CaseInsensitive);
- factory.registerSimpleDataType("FLOAT", [] { return DataTypePtr(std::make_shared()); }, DataTypeFactory::CaseInsensitive);
- factory.registerSimpleDataType("DOUBLE", [] { return DataTypePtr(std::make_shared()); }, DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("TINYINT", "Int8", DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("SMALLINT", "Int16", DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("INT", "Int32", DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("INTEGER", "Int32", DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("BIGINT", "Int64", DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("FLOAT", "Float32", DataTypeFactory::CaseInsensitive);
+ factory.registerAlias("DOUBLE", "Float64", DataTypeFactory::CaseInsensitive);
}
}
diff --git a/dbms/src/Dictionaries/Embedded/RegionsHierarchy.cpp b/dbms/src/Dictionaries/Embedded/RegionsHierarchy.cpp
index 2dbab26acc1..978d7b9e496 100644
--- a/dbms/src/Dictionaries/Embedded/RegionsHierarchy.cpp
+++ b/dbms/src/Dictionaries/Embedded/RegionsHierarchy.cpp
@@ -41,7 +41,6 @@ void RegionsHierarchy::reload()
RegionID max_region_id = 0;
-
auto regions_reader = data_source->createReader();
RegionEntry region_entry;
diff --git a/dbms/src/Formats/ValuesRowInputStream.cpp b/dbms/src/Formats/ValuesRowInputStream.cpp
index c291f147184..559ac658a6a 100644
--- a/dbms/src/Formats/ValuesRowInputStream.cpp
+++ b/dbms/src/Formats/ValuesRowInputStream.cpp
@@ -1,5 +1,6 @@
#include
#include
+#include
#include
#include
#include
@@ -29,7 +30,7 @@ namespace ErrorCodes
ValuesRowInputStream::ValuesRowInputStream(ReadBuffer & istr_, const Block & header_, const Context & context_, const FormatSettings & format_settings)
- : istr(istr_), header(header_), context(context_), format_settings(format_settings)
+ : istr(istr_), header(header_), context(std::make_unique(context_)), format_settings(format_settings)
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(istr);
@@ -112,7 +113,7 @@ bool ValuesRowInputStream::read(MutableColumns & columns)
istr.position() = const_cast(token_iterator->begin);
- std::pair value_raw = evaluateConstantExpression(ast, context);
+ std::pair value_raw = evaluateConstantExpression(ast, *context);
Field value = convertFieldToType(value_raw.first, type, value_raw.second.get());
if (value.isNull())
diff --git a/dbms/src/Formats/ValuesRowInputStream.h b/dbms/src/Formats/ValuesRowInputStream.h
index 00fa9071947..49775861746 100644
--- a/dbms/src/Formats/ValuesRowInputStream.h
+++ b/dbms/src/Formats/ValuesRowInputStream.h
@@ -28,7 +28,7 @@ public:
private:
ReadBuffer & istr;
Block header;
- const Context & context;
+ std::unique_ptr context; /// pimpl
const FormatSettings format_settings;
};
diff --git a/dbms/src/Functions/FunctionFactory.cpp b/dbms/src/Functions/FunctionFactory.cpp
index 9bb2abbb013..0b2f042089d 100644
--- a/dbms/src/Functions/FunctionFactory.cpp
+++ b/dbms/src/Functions/FunctionFactory.cpp
@@ -6,7 +6,6 @@
#include
-
namespace DB
{
@@ -26,8 +25,13 @@ void FunctionFactory::registerFunction(const
throw Exception("FunctionFactory: the function name '" + name + "' is not unique",
ErrorCodes::LOGICAL_ERROR);
+ String function_name_lowercase = Poco::toLower(name);
+ if (isAlias(name) || isAlias(function_name_lowercase))
+ throw Exception("FunctionFactory: the function name '" + name + "' is already registered as alias",
+ ErrorCodes::LOGICAL_ERROR);
+
if (case_sensitiveness == CaseInsensitive
- && !case_insensitive_functions.emplace(Poco::toLower(name), creator).second)
+ && !case_insensitive_functions.emplace(function_name_lowercase, creator).second)
throw Exception("FunctionFactory: the case insensitive function name '" + name + "' is not unique",
ErrorCodes::LOGICAL_ERROR);
}
@@ -45,9 +49,11 @@ FunctionBuilderPtr FunctionFactory::get(
FunctionBuilderPtr FunctionFactory::tryGet(
- const std::string & name,
+ const std::string & name_param,
const Context & context) const
{
+ String name = getAliasToOrName(name_param);
+
auto it = functions.find(name);
if (functions.end() != it)
return it->second(context);
diff --git a/dbms/src/Functions/FunctionFactory.h b/dbms/src/Functions/FunctionFactory.h
index a061c3103fd..7fa0f81f475 100644
--- a/dbms/src/Functions/FunctionFactory.h
+++ b/dbms/src/Functions/FunctionFactory.h
@@ -1,6 +1,7 @@
#pragma once
#include
+#include
#include
@@ -20,19 +21,9 @@ class Context;
* Function could use for initialization (take ownership of shared_ptr, for example)
* some dictionaries from Context.
*/
-class FunctionFactory : public ext::singleton<FunctionFactory>
+class FunctionFactory : public ext::singleton<FunctionFactory>, public IFactoryWithAliases<std::function<FunctionBuilderPtr(const Context &)>>
{
- friend class StorageSystemFunctions;
-
public:
- using Creator = std::function;
-
- /// For compatibility with SQL, it's possible to specify that certain function name is case insensitive.
- enum CaseSensitiveness
- {
- CaseSensitive,
- CaseInsensitive
- };
template
void registerFunction(CaseSensitiveness case_sensitiveness = CaseSensitive)
@@ -67,6 +58,12 @@ private:
return std::make_shared(Function::create(context));
}
+ const Functions & getCreatorMap() const override { return functions; }
+
+ const Functions & getCaseInsensitiveCreatorMap() const override { return case_insensitive_functions; }
+
+ String getFactoryName() const override { return "FunctionFactory"; }
+
/// Register a function by its name.
/// No locking, you must register all functions before usage of get.
void registerFunction(
diff --git a/dbms/src/Functions/FunctionsArray.cpp b/dbms/src/Functions/FunctionsArray.cpp
index d72dcf6f670..466610bcd45 100644
--- a/dbms/src/Functions/FunctionsArray.cpp
+++ b/dbms/src/Functions/FunctionsArray.cpp
@@ -1286,12 +1286,12 @@ DataTypePtr FunctionArrayDistinct::getReturnTypeImpl(const DataTypes & arguments
{
const DataTypeArray * array_type = checkAndGetDataType(arguments[0].get());
if (!array_type)
- throw Exception("Argument for function " + getName() + " must be array but it "
+ throw Exception("Argument for function " + getName() + " must be array but it "
" has type " + arguments[0]->getName() + ".",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
-
+
auto nested_type = removeNullable(array_type->getNestedType());
-
+
return std::make_shared(nested_type);
}
@@ -1307,7 +1307,7 @@ void FunctionArrayDistinct::executeImpl(Block & block, const ColumnNumbers & arg
const IColumn & src_data = array->getData();
const ColumnArray::Offsets & offsets = array->getOffsets();
-
+
ColumnRawPtrs original_data_columns;
original_data_columns.push_back(&src_data);
@@ -1416,7 +1416,7 @@ bool FunctionArrayDistinct::executeString(
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(StringRef)>>;
const PaddedPODArray * src_null_map = nullptr;
-
+
if (nullable_col)
{
src_null_map = &static_cast(&nullable_col->getNullMapColumn())->getData();
@@ -1471,7 +1471,7 @@ void FunctionArrayDistinct::executeHashed(
res_data_col.insertFrom(*columns[0], j);
}
}
-
+
res_offsets.emplace_back(set.size() + prev_off);
prev_off = off;
}
diff --git a/dbms/src/Functions/FunctionsArray.h b/dbms/src/Functions/FunctionsArray.h
index 3cd1a8968f7..15fd5b420e2 100644
--- a/dbms/src/Functions/FunctionsArray.h
+++ b/dbms/src/Functions/FunctionsArray.h
@@ -1011,10 +1011,11 @@ public:
DataTypePtr observed_type0 = removeNullable(array_type->getNestedType());
DataTypePtr observed_type1 = removeNullable(arguments[1]);
- if (!(observed_type0->isNumber() && observed_type1->isNumber())
+ /// We also support arrays of Enum type (that are represented by number) to search numeric values.
+ if (!(observed_type0->isValueRepresentedByNumber() && observed_type1->isNumber())
&& !observed_type0->equals(*observed_type1))
throw Exception("Types of array and 2nd argument of function "
- + getName() + " must be identical up to nullability. Passed: "
+ + getName() + " must be identical up to nullability or numeric types or Enum and numeric type. Passed: "
+ arguments[0]->getName() + " and " + arguments[1]->getName() + ".",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
@@ -1249,7 +1250,7 @@ private:
IColumn & res_data_col,
ColumnArray::Offsets & res_offsets,
const ColumnNullable * nullable_col);
-
+
void executeHashed(
const ColumnArray::Offsets & offsets,
const ColumnRawPtrs & columns,
diff --git a/dbms/src/Functions/FunctionsRound.cpp b/dbms/src/Functions/FunctionsRound.cpp
index 7bf7eb791ad..9cb9e1001ae 100644
--- a/dbms/src/Functions/FunctionsRound.cpp
+++ b/dbms/src/Functions/FunctionsRound.cpp
@@ -16,8 +16,8 @@ void registerFunctionsRound(FunctionFactory & factory)
factory.registerFunction("trunc", FunctionFactory::CaseInsensitive);
/// Compatibility aliases.
- factory.registerFunction("ceiling", FunctionFactory::CaseInsensitive);
- factory.registerFunction("truncate", FunctionFactory::CaseInsensitive);
+ factory.registerAlias("ceiling", "ceil", FunctionFactory::CaseInsensitive);
+ factory.registerAlias("truncate", "trunc", FunctionFactory::CaseInsensitive);
}
}
diff --git a/dbms/src/IO/ReadWriteBufferFromHTTP.cpp b/dbms/src/IO/ReadWriteBufferFromHTTP.cpp
index af0f34babbf..52ec808bd68 100644
--- a/dbms/src/IO/ReadWriteBufferFromHTTP.cpp
+++ b/dbms/src/IO/ReadWriteBufferFromHTTP.cpp
@@ -18,6 +18,7 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(const Poco::URI & uri,
const std::string & method_,
OutStreamCallback out_stream_callback,
const ConnectionTimeouts & timeouts,
+ const Poco::Net::HTTPBasicCredentials & credentials,
size_t buffer_size_)
: ReadBuffer(nullptr, 0),
uri{uri},
@@ -30,6 +31,9 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(const Poco::URI & uri,
if (out_stream_callback)
request.setChunkedTransferEncoding(true);
+ if (!credentials.getUsername().empty())
+ credentials.authenticate(request);
+
Poco::Net::HTTPResponse response;
LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString());
diff --git a/dbms/src/IO/ReadWriteBufferFromHTTP.h b/dbms/src/IO/ReadWriteBufferFromHTTP.h
index 93a8232f93d..d370bb3d4c7 100644
--- a/dbms/src/IO/ReadWriteBufferFromHTTP.h
+++ b/dbms/src/IO/ReadWriteBufferFromHTTP.h
@@ -1,6 +1,7 @@
#pragma once
#include
+#include
#include
#include
#include
@@ -32,6 +33,7 @@ public:
const std::string & method = {},
OutStreamCallback out_stream_callback = {},
const ConnectionTimeouts & timeouts = {},
+ const Poco::Net::HTTPBasicCredentials & credentials = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
bool nextImpl() override;
diff --git a/dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp b/dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp
index c5ffd0ef4f7..eb1d54c457e 100644
--- a/dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp
+++ b/dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp
@@ -6,6 +6,7 @@
#include
#include
#include
+#include
#include
@@ -28,13 +29,26 @@ namespace ClusterProxy
{
SelectStreamFactory::SelectStreamFactory(
- const Block & header,
+ const Block & header_,
QueryProcessingStage::Enum processed_stage_,
QualifiedTableName main_table_,
const Tables & external_tables_)
- : header(header),
+ : header(header_),
processed_stage{processed_stage_},
main_table(std::move(main_table_)),
+ table_func_ptr{nullptr},
+ external_tables{external_tables_}
+{
+}
+
+SelectStreamFactory::SelectStreamFactory(
+ const Block & header_,
+ QueryProcessingStage::Enum processed_stage_,
+ ASTPtr table_func_ptr_,
+ const Tables & external_tables_)
+ : header(header_),
+ processed_stage{processed_stage_},
+ table_func_ptr{table_func_ptr_},
external_tables{external_tables_}
{
}
@@ -71,13 +85,24 @@ void SelectStreamFactory::createForShard(
{
auto stream = std::make_shared(shard_info.pool, query, header, context, nullptr, throttler, external_tables, processed_stage);
stream->setPoolMode(PoolMode::GET_MANY);
- stream->setMainTable(main_table);
+ if (!table_func_ptr)
+ stream->setMainTable(main_table);
res.emplace_back(std::move(stream));
};
if (shard_info.isLocal())
{
- StoragePtr main_table_storage = context.tryGetTable(main_table.database, main_table.table);
+ StoragePtr main_table_storage;
+
+ if (table_func_ptr)
+ {
+ auto table_function = static_cast<const ASTFunction *>(table_func_ptr.get());
+ main_table_storage = TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context);
+ }
+ else
+ main_table_storage = context.tryGetTable(main_table.database, main_table.table);
+
+
if (!main_table_storage) /// Table is absent on a local server.
{
ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);
@@ -158,14 +183,17 @@ void SelectStreamFactory::createForShard(
auto lazily_create_stream = [
pool = shard_info.pool, shard_num = shard_info.shard_num, query, header = header, query_ast, context, throttler,
- main_table = main_table, external_tables = external_tables, stage = processed_stage,
+ main_table = main_table, table_func_ptr = table_func_ptr, external_tables = external_tables, stage = processed_stage,
local_delay]()
-> BlockInputStreamPtr
{
std::vector try_results;
try
{
- try_results = pool->getManyChecked(&context.getSettingsRef(), PoolMode::GET_MANY, main_table);
+ if (table_func_ptr)
+ try_results = pool->getManyForTableFunction(&context.getSettingsRef(), PoolMode::GET_MANY);
+ else
+ try_results = pool->getManyChecked(&context.getSettingsRef(), PoolMode::GET_MANY, main_table);
}
catch (const Exception & ex)
{
diff --git a/dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.h b/dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.h
index 5325e5d463c..38dabf82dcc 100644
--- a/dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.h
+++ b/dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.h
@@ -13,11 +13,19 @@ namespace ClusterProxy
class SelectStreamFactory final : public IStreamFactory
{
public:
+ /// Database in a query.
SelectStreamFactory(
- const Block & header,
- QueryProcessingStage::Enum processed_stage,
- QualifiedTableName main_table,
+ const Block & header_,
+ QueryProcessingStage::Enum processed_stage_,
+ QualifiedTableName main_table_,
const Tables & external_tables);
+
+ /// TableFunction in a query.
+ SelectStreamFactory(
+ const Block & header_,
+ QueryProcessingStage::Enum processed_stage_,
+ ASTPtr table_func_ptr_,
+ const Tables & external_tables_);
void createForShard(
const Cluster::ShardInfo & shard_info,
@@ -29,6 +37,7 @@ private:
const Block header;
QueryProcessingStage::Enum processed_stage;
QualifiedTableName main_table;
+ ASTPtr table_func_ptr;
Tables external_tables;
};
diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp
index 9fed370cfbc..0561c2f11c2 100644
--- a/dbms/src/Interpreters/Context.cpp
+++ b/dbms/src/Interpreters/Context.cpp
@@ -109,6 +109,8 @@ struct ContextShared
String interserver_io_host; /// The host name by which this server is available for other servers.
UInt16 interserver_io_port = 0; /// and port.
+ String interserver_io_user;
+ String interserver_io_password;
String path; /// Path to the data directory, with a slash at the end.
String tmp_path; /// The path to the temporary files that occur when processing the request.
@@ -1378,6 +1380,17 @@ void Context::setInterserverIOAddress(const String & host, UInt16 port)
shared->interserver_io_port = port;
}
+void Context::setInterverserCredentials(const String & user, const String & password)
+{
+ shared->interserver_io_user = user;
+ shared->interserver_io_password = password;
+}
+
+std::pair Context::getInterserverCredentials() const
+{
+ return { shared->interserver_io_user, shared->interserver_io_password };
+}
+
std::pair Context::getInterserverIOAddress() const
{
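The effect of `credentials.authenticate(request)` in `ReadWriteBufferFromHTTP` is simply to attach a standard `Authorization: Basic` header built from `user:password`. A self-contained sketch of that encoding (the Poco call is the real API; the helpers below are illustrative):

```cpp
#include <string>

// Standard base64 encoding, just enough for HTTP basic auth.
static std::string base64Encode(const std::string & in)
{
    static const char table[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    std::string out;
    size_t i = 0;
    while (i + 2 < in.size())
    {
        unsigned v = (unsigned char)in[i] << 16 | (unsigned char)in[i + 1] << 8 | (unsigned char)in[i + 2];
        out += table[v >> 18]; out += table[(v >> 12) & 63]; out += table[(v >> 6) & 63]; out += table[v & 63];
        i += 3;
    }
    if (i + 1 == in.size())       // one trailing byte
    {
        unsigned v = (unsigned char)in[i] << 16;
        out += table[v >> 18]; out += table[(v >> 12) & 63]; out += "==";
    }
    else if (i + 2 == in.size())  // two trailing bytes
    {
        unsigned v = (unsigned char)in[i] << 16 | (unsigned char)in[i + 1] << 8;
        out += table[v >> 18]; out += table[(v >> 12) & 63]; out += table[(v >> 6) & 63]; out += '=';
    }
    return out;
}

// Header-level equivalent of HTTPBasicCredentials::authenticate(request).
static std::string basicAuthHeader(const std::string & user, const std::string & password)
{
    return "Basic " + base64Encode(user + ":" + password);
}
```

So with interserver credentials `user` / `pass`, each replication HTTP request carries `Authorization: Basic dXNlcjpwYXNz`.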
diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h
index 1c867d65e8f..38a0e7cb4bc 100644
--- a/dbms/src/Interpreters/Context.h
+++ b/dbms/src/Interpreters/Context.h
@@ -249,6 +249,11 @@ public:
/// How other servers can access this for downloading replicated data.
void setInterserverIOAddress(const String & host, UInt16 port);
std::pair getInterserverIOAddress() const;
+
+ /// Credentials that this server uses to communicate with other servers.
+ void setInterverserCredentials(const String & user, const String & password);
+ std::pair getInterserverCredentials() const;
+
/// The port that the server listens for executing SQL queries.
UInt16 getTCPPort() const;
diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp
index 31a6a566ae8..b23160f133f 100644
--- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp
+++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp
@@ -64,6 +64,7 @@
#include
#include
#include
+#include
namespace DB
diff --git a/dbms/src/Interpreters/InterpreterKillQueryQuery.cpp b/dbms/src/Interpreters/InterpreterKillQueryQuery.cpp
index 336b45e7a5c..f0add31dc38 100644
--- a/dbms/src/Interpreters/InterpreterKillQueryQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterKillQueryQuery.cpp
@@ -2,6 +2,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -172,6 +173,9 @@ BlockIO InterpreterKillQueryQuery::execute()
{
ASTKillQueryQuery & query = typeid_cast(*query_ptr);
+ if (!query.cluster.empty())
+ return executeDDLQueryOnCluster(query_ptr, context, {"system"});
+
BlockIO res_io;
Block processes_block = getSelectFromSystemProcessesResult();
if (!processes_block)
diff --git a/dbms/src/Interpreters/InterpreterOptimizeQuery.cpp b/dbms/src/Interpreters/InterpreterOptimizeQuery.cpp
index 2472cff1876..80a64d83f90 100644
--- a/dbms/src/Interpreters/InterpreterOptimizeQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterOptimizeQuery.cpp
@@ -1,6 +1,7 @@
#include
#include
#include
+#include
#include
#include
@@ -18,6 +19,9 @@ BlockIO InterpreterOptimizeQuery::execute()
{
const ASTOptimizeQuery & ast = typeid_cast(*query_ptr);
+ if (!ast.cluster.empty())
+ return executeDDLQueryOnCluster(query_ptr, context, {ast.database});
+
StoragePtr table = context.getTable(ast.database, ast.table);
auto table_lock = table->lockStructure(true, __PRETTY_FUNCTION__);
table->optimize(query_ptr, ast.partition, ast.final, ast.deduplicate, context);
diff --git a/dbms/src/Interpreters/evaluateConstantExpression.cpp b/dbms/src/Interpreters/evaluateConstantExpression.cpp
index 8ab3ca7bf1a..6dcff35e6a4 100644
--- a/dbms/src/Interpreters/evaluateConstantExpression.cpp
+++ b/dbms/src/Interpreters/evaluateConstantExpression.cpp
@@ -3,6 +3,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -10,6 +11,7 @@
#include
#include
#include
+#include
namespace DB
@@ -52,13 +54,19 @@ std::pair> evaluateConstantExpression(co
ASTPtr evaluateConstantExpressionAsLiteral(const ASTPtr & node, const Context & context)
{
+ /// Branch with a string literal in the query.
if (typeid_cast(node.get()))
return node;
-
+
+ /// Branch with TableFunction in query.
+ if (auto table_func_ptr = typeid_cast<const ASTFunction *>(node.get()))
+ if (TableFunctionFactory::instance().isTableFunctionName(table_func_ptr->name))
+ return node;
+
return std::make_shared(evaluateConstantExpression(node, context).first);
}
-
ASTPtr evaluateConstantExpressionOrIdentifierAsLiteral(const ASTPtr & node, const Context & context)
{
if (auto id = typeid_cast(node.get()))
diff --git a/dbms/src/Parsers/ASTKillQueryQuery.cpp b/dbms/src/Parsers/ASTKillQueryQuery.cpp
index 8be944e8481..0f3e5406fdd 100644
--- a/dbms/src/Parsers/ASTKillQueryQuery.cpp
+++ b/dbms/src/Parsers/ASTKillQueryQuery.cpp
@@ -8,9 +8,22 @@ String ASTKillQueryQuery::getID() const
return "KillQueryQuery_" + (where_expression ? where_expression->getID() : "") + "_" + String(sync ? "SYNC" : "ASYNC");
}
+ASTPtr ASTKillQueryQuery::getRewrittenASTWithoutOnCluster(const std::string & /*new_database*/) const
+{
+ auto query_ptr = clone();
+ ASTKillQueryQuery & query = static_cast(*query_ptr);
+
+ query.cluster.clear();
+
+ return query_ptr;
+}
+
void ASTKillQueryQuery::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
- settings.ostr << (settings.hilite ? hilite_keyword : "") << "KILL QUERY WHERE " << (settings.hilite ? hilite_none : "");
+ settings.ostr << (settings.hilite ? hilite_keyword : "") << "KILL QUERY ";
+
+ formatOnCluster(settings);
+ settings.ostr << " WHERE " << (settings.hilite ? hilite_none : "");
if (where_expression)
where_expression->formatImpl(settings, state, frame);
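The clone-and-strip pattern that `getRewrittenASTWithoutOnCluster` applies here (and in `ASTOptimizeQuery` below) can be shown with a toy node; the struct and field names are illustrative, not the real `IAST` interface:

```cpp
#include <memory>
#include <string>

// Toy stand-in for an AST node carrying an ON CLUSTER clause.
struct ToyQuery
{
    std::string database;
    std::string cluster;

    std::shared_ptr<ToyQuery> clone() const { return std::make_shared<ToyQuery>(*this); }

    // Same idea as getRewrittenASTWithoutOnCluster: copy the query, drop the
    // cluster clause, and fill in the database the receiving shard should use.
    std::shared_ptr<ToyQuery> rewrittenWithoutOnCluster(const std::string & new_database) const
    {
        auto copy = clone();
        copy->cluster.clear();
        if (copy->database.empty())
            copy->database = new_database;
        return copy;
    }
};
```

Working on a clone matters: the original AST is still being executed locally, so only the copy sent to the other shards loses its `ON CLUSTER` clause.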
diff --git a/dbms/src/Parsers/ASTKillQueryQuery.h b/dbms/src/Parsers/ASTKillQueryQuery.h
index 4df1f28f733..086ee55e3bd 100644
--- a/dbms/src/Parsers/ASTKillQueryQuery.h
+++ b/dbms/src/Parsers/ASTKillQueryQuery.h
@@ -1,10 +1,11 @@
#include
#include
+#include
namespace DB
{
-class ASTKillQueryQuery : public ASTQueryWithOutput
+class ASTKillQueryQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster
{
public:
ASTPtr where_expression; // expression to filter processes from system.processes table
@@ -22,6 +23,8 @@ public:
String getID() const override;
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
+
+ ASTPtr getRewrittenASTWithoutOnCluster(const std::string & new_database) const override;
};
}
diff --git a/dbms/src/Parsers/ASTOptimizeQuery.cpp b/dbms/src/Parsers/ASTOptimizeQuery.cpp
new file mode 100644
index 00000000000..dd37b665173
--- /dev/null
+++ b/dbms/src/Parsers/ASTOptimizeQuery.cpp
@@ -0,0 +1,39 @@
+#include
+
+namespace DB
+{
+
+
+ASTPtr ASTOptimizeQuery::getRewrittenASTWithoutOnCluster(const std::string & new_database) const
+{
+ auto query_ptr = clone();
+ ASTOptimizeQuery & query = static_cast(*query_ptr);
+
+ query.cluster.clear();
+ if (query.database.empty())
+ query.database = new_database;
+
+ return query_ptr;
+}
+
+void ASTOptimizeQuery::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
+{
+ settings.ostr << (settings.hilite ? hilite_keyword : "") << "OPTIMIZE TABLE " << (settings.hilite ? hilite_none : "")
+ << (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
+
+ formatOnCluster(settings);
+
+ if (partition)
+ {
+ settings.ostr << (settings.hilite ? hilite_keyword : "") << " PARTITION " << (settings.hilite ? hilite_none : "");
+ partition->formatImpl(settings, state, frame);
+ }
+
+ if (final)
+ settings.ostr << (settings.hilite ? hilite_keyword : "") << " FINAL" << (settings.hilite ? hilite_none : "");
+
+ if (deduplicate)
+ settings.ostr << (settings.hilite ? hilite_keyword : "") << " DEDUPLICATE" << (settings.hilite ? hilite_none : "");
+}
+
+}
diff --git a/dbms/src/Parsers/ASTOptimizeQuery.h b/dbms/src/Parsers/ASTOptimizeQuery.h
index 571b04d22ef..0b329d59559 100644
--- a/dbms/src/Parsers/ASTOptimizeQuery.h
+++ b/dbms/src/Parsers/ASTOptimizeQuery.h
@@ -1,7 +1,8 @@
#pragma once
#include <Parsers/IAST.h>
-
+#include <Parsers/ASTQueryWithOutput.h>
+#include <Parsers/ASTQueryWithOnCluster.h>
namespace DB
{
@@ -9,7 +10,7 @@ namespace DB
/** OPTIMIZE query
*/
-class ASTOptimizeQuery : public IAST
+class ASTOptimizeQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster
{
public:
String database;
@@ -23,7 +24,8 @@ public:
bool deduplicate;
/** Get the text that identifies this element. */
- String getID() const override { return "OptimizeQuery_" + database + "_" + table + (final ? "_final" : "") + (deduplicate ? "_deduplicate" : ""); }
+ String getID() const override
+ { return "OptimizeQuery_" + database + "_" + table + (final ? "_final" : "") + (deduplicate ? "_deduplicate" : ""); }
ASTPtr clone() const override
{
@@ -39,24 +41,10 @@ public:
return res;
}
-protected:
- void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
- {
- settings.ostr << (settings.hilite ? hilite_keyword : "") << "OPTIMIZE TABLE " << (settings.hilite ? hilite_none : "")
- << (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
+ void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
- if (partition)
- {
- settings.ostr << (settings.hilite ? hilite_keyword : "") << " PARTITION " << (settings.hilite ? hilite_none : "");
- partition->formatImpl(settings, state, frame);
- }
+ ASTPtr getRewrittenASTWithoutOnCluster(const std::string &new_database) const override;
- if (final)
- settings.ostr << (settings.hilite ? hilite_keyword : "") << " FINAL" << (settings.hilite ? hilite_none : "");
-
- if (deduplicate)
- settings.ostr << (settings.hilite ? hilite_keyword : "") << " DEDUPLICATE" << (settings.hilite ? hilite_none : "");
- }
};
}
diff --git a/dbms/src/Parsers/ASTSelectQuery.cpp b/dbms/src/Parsers/ASTSelectQuery.cpp
index f234b0ae4b5..8bb5f2488d8 100644
--- a/dbms/src/Parsers/ASTSelectQuery.cpp
+++ b/dbms/src/Parsers/ASTSelectQuery.cpp
@@ -372,9 +372,9 @@ void ASTSelectQuery::replaceDatabaseAndTable(const String & database_name, const
children.emplace_back(tables_list);
table_expression = table_expr.get();
}
-
+
ASTPtr table = std::make_shared<ASTIdentifier>(table_name, ASTIdentifier::Table);
-
+
if (!database_name.empty())
{
ASTPtr database = std::make_shared<ASTIdentifier>(database_name, ASTIdentifier::Database);
@@ -388,5 +388,27 @@ void ASTSelectQuery::replaceDatabaseAndTable(const String & database_name, const
}
}
+
+void ASTSelectQuery::addTableFunction(ASTPtr & table_function_ptr)
+{
+ ASTTableExpression * table_expression = getFirstTableExpression(*this);
+
+ if (!table_expression)
+ {
+ auto tables_list = std::make_shared<ASTTablesInSelectQuery>();
+ auto element = std::make_shared<ASTTablesInSelectQueryElement>();
+ auto table_expr = std::make_shared<ASTTableExpression>();
+ element->table_expression = table_expr;
+ element->children.emplace_back(table_expr);
+ tables_list->children.emplace_back(element);
+ tables = tables_list;
+ children.emplace_back(tables_list);
+ table_expression = table_expr.get();
+ }
+
+ table_expression->table_function = table_function_ptr;
+ table_expression->database_and_table_name = nullptr;
+}
+
};
diff --git a/dbms/src/Parsers/ASTSelectQuery.h b/dbms/src/Parsers/ASTSelectQuery.h
index d45f45c34d8..91d8d52172c 100644
--- a/dbms/src/Parsers/ASTSelectQuery.h
+++ b/dbms/src/Parsers/ASTSelectQuery.h
@@ -47,6 +47,7 @@ public:
bool final() const;
void setDatabaseIfNeeded(const String & database_name);
void replaceDatabaseAndTable(const String & database_name, const String & table_name);
+ void addTableFunction(ASTPtr & table_function_ptr);
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
diff --git a/dbms/src/Parsers/ParserKillQueryQuery.cpp b/dbms/src/Parsers/ParserKillQueryQuery.cpp
index e6d1bae2e05..5e674d9da83 100644
--- a/dbms/src/Parsers/ParserKillQueryQuery.cpp
+++ b/dbms/src/Parsers/ParserKillQueryQuery.cpp
@@ -11,29 +11,36 @@ namespace DB
bool ParserKillQueryQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
+ String cluster_str;
auto query = std::make_shared<ASTKillQueryQuery>();
- if (!ParserKeyword{"KILL QUERY"}.ignore(pos, expected))
- return false;
-
- if (!ParserKeyword{"WHERE"}.ignore(pos, expected))
- return false;
-
+ ParserKeyword p_on{"ON"};
+ ParserKeyword p_test{"TEST"};
+ ParserKeyword p_sync{"SYNC"};
+ ParserKeyword p_async{"ASYNC"};
+ ParserKeyword p_where{"WHERE"};
+ ParserKeyword p_kill_query{"KILL QUERY"};
ParserExpression p_where_expression;
- if (!p_where_expression.parse(pos, query->where_expression, expected))
+
+ if (!p_kill_query.ignore(pos, expected))
return false;
- query->children.emplace_back(query->where_expression);
+ if (p_on.ignore(pos, expected) && !ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
+ return false;
- if (ParserKeyword{"SYNC"}.ignore(pos))
+ if (p_where.ignore(pos, expected) && !p_where_expression.parse(pos, query->where_expression, expected))
+ return false;
+
+ if (p_sync.ignore(pos, expected))
query->sync = true;
- else if (ParserKeyword{"ASYNC"}.ignore(pos))
+ else if (p_async.ignore(pos, expected))
query->sync = false;
- else if (ParserKeyword{"TEST"}.ignore(pos))
+ else if (p_test.ignore(pos, expected))
query->test = true;
+ query->cluster = cluster_str;
+ query->children.emplace_back(query->where_expression);
node = std::move(query);
-
return true;
}
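The rewritten `parseImpl` above accepts the clauses in a fixed order. A toy Python model of that order (assuming single-token cluster names and WHERE expressions, which the real expression parser does not require):

```python
def parse_kill_query(tokens):
    """Model of ParserKillQueryQuery::parseImpl clause order:
    KILL QUERY [ON CLUSTER c] [WHERE expr] [SYNC|ASYNC|TEST]."""
    q = {"cluster": "", "where": None, "sync": False, "test": False}
    i = 0
    if tokens[i:i + 2] != ["KILL", "QUERY"]:
        return None
    i += 2
    if i < len(tokens) and tokens[i] == "ON":    # optional ON CLUSTER
        if tokens[i + 1] != "CLUSTER":
            return None
        q["cluster"] = tokens[i + 2]
        i += 3
    if i < len(tokens) and tokens[i] == "WHERE":  # optional filter over system.processes
        q["where"] = tokens[i + 1]
        i += 2
    if i < len(tokens):                           # optional execution mode
        if tokens[i] == "SYNC":
            q["sync"] = True
        elif tokens[i] == "ASYNC":
            q["sync"] = False
        elif tokens[i] == "TEST":
            q["test"] = True
    return q
```

Note `ON CLUSTER` must come before `WHERE`, mirroring the parser's keyword order.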
diff --git a/dbms/src/Parsers/ParserOptimizeQuery.cpp b/dbms/src/Parsers/ParserOptimizeQuery.cpp
index c01a1a7b5df..e0dcf7ffb47 100644
--- a/dbms/src/Parsers/ParserOptimizeQuery.cpp
+++ b/dbms/src/Parsers/ParserOptimizeQuery.cpp
@@ -28,6 +28,7 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
ASTPtr partition;
bool final = false;
bool deduplicate = false;
+ String cluster_str;
if (!s_optimize_table.ignore(pos, expected))
return false;
@@ -42,6 +43,9 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
return false;
}
+ if (ParserKeyword{"ON"}.ignore(pos, expected) && !ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
+ return false;
+
if (s_partition.ignore(pos, expected))
{
if (!partition_p.parse(pos, partition, expected))
@@ -61,6 +65,8 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
query->database = typeid_cast<const ASTIdentifier &>(*database).name;
if (table)
query->table = typeid_cast<const ASTIdentifier &>(*table).name;
+
+ query->cluster = cluster_str;
query->partition = partition;
query->final = final;
query->deduplicate = deduplicate;
diff --git a/dbms/src/Parsers/ParserQuery.cpp b/dbms/src/Parsers/ParserQuery.cpp
index efdac16d74c..7285e03bad7 100644
--- a/dbms/src/Parsers/ParserQuery.cpp
+++ b/dbms/src/Parsers/ParserQuery.cpp
@@ -21,14 +21,12 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserInsertQuery insert_p(end);
ParserUseQuery use_p;
ParserSetQuery set_p;
- ParserOptimizeQuery optimize_p;
ParserSystemQuery system_p;
bool res = query_with_output_p.parse(pos, node, expected)
|| insert_p.parse(pos, node, expected)
|| use_p.parse(pos, node, expected)
|| set_p.parse(pos, node, expected)
- || optimize_p.parse(pos, node, expected)
|| system_p.parse(pos, node, expected);
return res;
diff --git a/dbms/src/Parsers/ParserQueryWithOutput.cpp b/dbms/src/Parsers/ParserQueryWithOutput.cpp
index e7fdc390dd6..3ec71de5f0c 100644
--- a/dbms/src/Parsers/ParserQueryWithOutput.cpp
+++ b/dbms/src/Parsers/ParserQueryWithOutput.cpp
@@ -10,6 +10,7 @@
#include
#include
#include
+#include <Parsers/ParserOptimizeQuery.h>
namespace DB
@@ -27,6 +28,7 @@ bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
ParserRenameQuery rename_p;
ParserDropQuery drop_p;
ParserCheckQuery check_p;
+ ParserOptimizeQuery optimize_p;
ParserKillQueryQuery kill_query_p;
ASTPtr query;
@@ -41,7 +43,8 @@ bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
|| rename_p.parse(pos, query, expected)
|| drop_p.parse(pos, query, expected)
|| check_p.parse(pos, query, expected)
- || kill_query_p.parse(pos, query, expected);
+ || kill_query_p.parse(pos, query, expected)
+ || optimize_p.parse(pos, query, expected);
if (!parsed)
return false;
diff --git a/dbms/src/Storages/ITableDeclaration.h b/dbms/src/Storages/ITableDeclaration.h
index 74d5b6db6d7..5f15ad626f7 100644
--- a/dbms/src/Storages/ITableDeclaration.h
+++ b/dbms/src/Storages/ITableDeclaration.h
@@ -39,9 +39,9 @@ public:
*/
void check(const NamesAndTypesList & columns, const Names & column_names) const;
- /** Check that the data block for the record contains all the columns of the table with the correct types,
+ /** Check that the data block contains all the columns of the table with the correct types,
* contains only the columns of the table, and all the columns are different.
- * If need_all, still checks that all the columns of the table are in the block.
+ * If need_all, checks that all the columns of the table are in the block.
*/
void check(const Block & block, bool need_all = false) const;
diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp
index 15d1c56b051..39db6142605 100644
--- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp
+++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp
@@ -161,6 +161,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
const String & host,
int port,
const ConnectionTimeouts & timeouts,
+ const String & user,
+ const String & password,
bool to_detached,
const String & tmp_prefix_)
{
@@ -175,7 +177,14 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
{"compress", "false"}
});
- ReadWriteBufferFromHTTP in{uri, Poco::Net::HTTPRequest::HTTP_POST, {}, timeouts};
+ Poco::Net::HTTPBasicCredentials creds{};
+ if (!user.empty())
+ {
+ creds.setUsername(user);
+ creds.setPassword(password);
+ }
+
+ ReadWriteBufferFromHTTP in{uri, Poco::Net::HTTPRequest::HTTP_POST, {}, timeouts, creds};
static const String TMP_PREFIX = "tmp_fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
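The credential handling added to `fetchPart` amounts to standard HTTP Basic auth, applied only when an interserver user is configured. A minimal sketch of the equivalent header construction (illustrative only; the real code passes `Poco::Net::HTTPBasicCredentials` into `ReadWriteBufferFromHTTP`):

```python
import base64

def interserver_headers(user, password):
    """Attach HTTP Basic auth only when an interserver user is set,
    mirroring the `if (!user.empty())` guard above."""
    if not user:
        return {}  # anonymous fetch, no Authorization header
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return {"Authorization": "Basic " + token}
```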
diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.h b/dbms/src/Storages/MergeTree/DataPartsExchange.h
index 0ebc2ec358a..32eb80e96ca 100644
--- a/dbms/src/Storages/MergeTree/DataPartsExchange.h
+++ b/dbms/src/Storages/MergeTree/DataPartsExchange.h
@@ -54,6 +54,8 @@ public:
const String & host,
int port,
const ConnectionTimeouts & timeouts,
+ const String & user,
+ const String & password,
bool to_detached = false,
const String & tmp_prefix_ = "");
diff --git a/dbms/src/Storages/MergeTree/MergeTreeSettings.h b/dbms/src/Storages/MergeTree/MergeTreeSettings.h
index aa29dccc195..43276a6dd34 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeSettings.h
+++ b/dbms/src/Storages/MergeTree/MergeTreeSettings.h
@@ -139,7 +139,7 @@ struct MergeTreeSettings
* instead of ordinary ones (dozens KB). \
* Before enabling check that all replicas support new format. \
*/ \
- M(SettingBool, use_minimalistic_checksums_in_zookeeper, false)
+ M(SettingBool, use_minimalistic_checksums_in_zookeeper, true)
/// Settings that should not change after the creation of a table.
#define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \
diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp
index 53ec36fe2c4..5805ea439f3 100644
--- a/dbms/src/Storages/StorageDistributed.cpp
+++ b/dbms/src/Storages/StorageDistributed.cpp
@@ -65,12 +65,15 @@ namespace ErrorCodes
namespace
{
-/// select query has database and table names as AST pointers
-/// Creates a copy of query, changes database and table names.
-ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table)
+/// select query has database, table and table function names as AST pointers
+/// Creates a copy of query, changes database, table and table function names.
+ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table, ASTPtr table_function_ptr = nullptr)
{
auto modified_query_ast = query->clone();
- typeid_cast<ASTSelectQuery &>(*modified_query_ast).replaceDatabaseAndTable(database, table);
+ if (table_function_ptr)
+ typeid_cast<ASTSelectQuery &>(*modified_query_ast).addTableFunction(table_function_ptr);
+ else
+ typeid_cast<ASTSelectQuery &>(*modified_query_ast).replaceDatabaseAndTable(database, table);
return modified_query_ast;
}
@@ -170,16 +173,48 @@ StorageDistributed::StorageDistributed(
}
-StoragePtr StorageDistributed::createWithOwnCluster(
- const std::string & name_,
+StorageDistributed::StorageDistributed(
+ const String & database_name,
+ const String & table_name_,
const ColumnsDescription & columns_,
- const String & remote_database_,
- const String & remote_table_,
+ ASTPtr remote_table_function_ptr_,
+ const String & cluster_name_,
+ const Context & context_,
+ const ASTPtr & sharding_key_,
+ const String & data_path_,
+ bool attach)
+ : StorageDistributed(database_name, table_name_, columns_, String{}, String{}, cluster_name_, context_, sharding_key_, data_path_, attach)
+{
+ remote_table_function_ptr = remote_table_function_ptr_;
+}
+
+
+StoragePtr StorageDistributed::createWithOwnCluster(
+ const std::string & table_name_,
+ const ColumnsDescription & columns_,
+ const String & remote_database_, /// database on remote servers.
+ const String & remote_table_, /// The name of the table on the remote servers.
+ ClusterPtr owned_cluster_,
+ const Context & context_)
+{
+ auto res = ext::shared_ptr_helper<StorageDistributed>::create(
+ String{}, table_name_, columns_, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), false);
+
+ res->owned_cluster = owned_cluster_;
+
+ return res;
+}
+
+
+StoragePtr StorageDistributed::createWithOwnCluster(
+ const std::string & table_name_,
+ const ColumnsDescription & columns_,
+ ASTPtr & remote_table_function_ptr_,
ClusterPtr & owned_cluster_,
const Context & context_)
{
auto res = ext::shared_ptr_helper<StorageDistributed>::create(
- String{}, name_, columns_, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), false);
+ String{}, table_name_, columns_, remote_table_function_ptr_, String{}, context_, ASTPtr(), String(), false);
res->owned_cluster = owned_cluster_;
@@ -209,15 +244,19 @@ BlockInputStreams StorageDistributed::read(
processed_stage = result_size == 1
? QueryProcessingStage::Complete
: QueryProcessingStage::WithMergeableState;
+
const auto & modified_query_ast = rewriteSelectQuery(
- query_info.query, remote_database, remote_table);
+ query_info.query, remote_database, remote_table, remote_table_function_ptr);
Block header = materializeBlock(InterpreterSelectQuery(query_info.query, context, Names{}, processed_stage).getSampleBlock());
-
- ClusterProxy::SelectStreamFactory select_stream_factory(
- header, processed_stage, QualifiedTableName{remote_database, remote_table}, context.getExternalTables());
-
+
+ ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr ?
+ ClusterProxy::SelectStreamFactory(
+ header, processed_stage, remote_table_function_ptr, context.getExternalTables())
+ : ClusterProxy::SelectStreamFactory(
+ header, processed_stage, QualifiedTableName{remote_database, remote_table}, context.getExternalTables());
+
return ClusterProxy::executeQuery(
select_stream_factory, cluster, modified_query_ast, context, settings);
}
diff --git a/dbms/src/Storages/StorageDistributed.h b/dbms/src/Storages/StorageDistributed.h
index bdfd654ea6e..dda56bb3312 100644
--- a/dbms/src/Storages/StorageDistributed.h
+++ b/dbms/src/Storages/StorageDistributed.h
@@ -9,6 +9,7 @@
#include
#include
#include
+#include
#include
@@ -36,8 +37,15 @@ public:
static StoragePtr createWithOwnCluster(
const std::string & table_name_,
const ColumnsDescription & columns_,
- const String & remote_database_, /// database on remote servers.
- const String & remote_table_, /// The name of the table on the remote servers.
+ const String & remote_database_, /// database on remote servers.
+ const String & remote_table_, /// The name of the table on the remote servers.
+ ClusterPtr owned_cluster_,
+ const Context & context_);
+
+ static StoragePtr createWithOwnCluster(
+ const std::string & table_name_,
+ const ColumnsDescription & columns_,
+ ASTPtr & remote_table_function_ptr_, /// Table function ptr.
ClusterPtr & owned_cluster_,
const Context & context_);
@@ -101,6 +109,7 @@ public:
String table_name;
String remote_database;
String remote_table;
+ ASTPtr remote_table_function_ptr;
const Context & context;
Logger * log = &Logger::get("StorageDistributed");
@@ -146,6 +155,17 @@ protected:
const ASTPtr & sharding_key_,
const String & data_path_,
bool attach);
+
+ StorageDistributed(
+ const String & database_name,
+ const String & table_name_,
+ const ColumnsDescription & columns_,
+ ASTPtr remote_table_function_ptr_,
+ const String & cluster_name_,
+ const Context & context_,
+ const ASTPtr & sharding_key_,
+ const String & data_path_,
+ bool attach);
};
}
diff --git a/dbms/src/Storages/StorageFactory.h b/dbms/src/Storages/StorageFactory.h
index 2acb9fb7c00..4addfcd9794 100644
--- a/dbms/src/Storages/StorageFactory.h
+++ b/dbms/src/Storages/StorageFactory.h
@@ -53,6 +53,11 @@ public:
/// No locking, you must register all engines before usage of get.
void registerStorage(const std::string & name, Creator creator);
+ const auto & getAllStorages() const
+ {
+ return storages;
+ }
+
private:
using Storages = std::unordered_map<std::string, Creator>;
Storages storages;
diff --git a/dbms/src/Storages/StorageKafka.cpp b/dbms/src/Storages/StorageKafka.cpp
index a9666bab22c..7823dfdd65a 100644
--- a/dbms/src/Storages/StorageKafka.cpp
+++ b/dbms/src/Storages/StorageKafka.cpp
@@ -62,12 +62,19 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
{
rd_kafka_t * consumer;
rd_kafka_message_t * current;
+ bool current_pending;
Poco::Logger * log;
size_t read_messages;
+ char row_delimiter;
bool nextImpl() override
{
- reset();
+ if (current_pending)
+ {
+ BufferBase::set(reinterpret_cast<char *>(current->payload), current->len, 0);
+ current_pending = false;
+ return true;
+ }
// Process next buffered message
rd_kafka_message_t * msg = rd_kafka_consumer_poll(consumer, READ_POLL_MS);
@@ -88,13 +95,24 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
rd_kafka_message_destroy(msg);
return nextImpl();
}
+ ++read_messages;
+
+ // Now we've received a new message. Check if we need to produce a delimiter
+ if (row_delimiter != '\0' && current != nullptr)
+ {
+ BufferBase::set(&row_delimiter, 1, 0);
+ reset();
+ current = msg;
+ current_pending = true;
+ return true;
+ }
// Consume message and mark the topic/partition offset
- // The offsets will be committed in the insertSuffix() method after the block is completed
- // If an exception is thrown before that would occur, the client will rejoin without comitting offsets
- BufferBase::set(reinterpret_cast<char *>(msg->payload), msg->len, 0);
+ // The offsets will be committed in the readSuffix() method after the block is completed
+ // If an exception is thrown before that would occur, the client will rejoin without committing offsets
+ reset();
current = msg;
- ++read_messages;
+ BufferBase::set(reinterpret_cast<char *>(current->payload), current->len, 0);
return true;
}
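The buffer hand-off in `nextImpl()` above interleaves the delimiter between consecutive messages: the delimiter chunk is emitted first, the freshly polled message is stashed as "pending", and the next call emits its payload. The net effect on the byte stream can be sketched like this (a model, not the actual buffer code):

```python
def kafka_stream(messages, row_delimiter=""):
    """Model of ReadBufferFromKafkaConsumer::nextImpl(): a delimiter is
    emitted between consecutive messages, never before the first one."""
    out = []
    current = None
    for msg in messages:
        if row_delimiter and current is not None:
            out.append(row_delimiter)  # delimiter chunk; msg becomes "pending"
        out.append(msg)                # pending payload emitted on the next call
        current = msg
    return "".join(out)
```

This is what lets row-oriented input formats parse a topic whose messages lack trailing newlines.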
@@ -108,8 +126,11 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
}
public:
- ReadBufferFromKafkaConsumer(rd_kafka_t * consumer_, Poco::Logger * log_)
- : ReadBuffer(nullptr, 0), consumer(consumer_), current(nullptr), log(log_), read_messages(0) {}
+ ReadBufferFromKafkaConsumer(rd_kafka_t * consumer_, Poco::Logger * log_, char row_delimiter_)
+ : ReadBuffer(nullptr, 0), consumer(consumer_), current(nullptr),
+ current_pending(false), log(log_), read_messages(0), row_delimiter(row_delimiter_)
+ {
+ LOG_TRACE(log, "Row delimiter is: " << row_delimiter);
+ }
~ReadBufferFromKafkaConsumer() { reset(); }
@@ -143,7 +164,7 @@ public:
// Create a formatted reader on Kafka messages
LOG_TRACE(storage.log, "Creating formatted reader");
- read_buf = std::make_unique<ReadBufferFromKafkaConsumer>(consumer->stream, storage.log);
+ read_buf = std::make_unique<ReadBufferFromKafkaConsumer>(consumer->stream, storage.log, storage.row_delimiter);
reader = FormatFactory::instance().getInput(storage.format_name, *read_buf, storage.getSampleBlock(), context, max_block_size);
}
@@ -226,13 +247,14 @@ StorageKafka::StorageKafka(
Context & context_,
const ColumnsDescription & columns_,
const String & brokers_, const String & group_, const Names & topics_,
- const String & format_name_, const String & schema_name_, size_t num_consumers_)
+ const String & format_name_, char row_delimiter_, const String & schema_name_, size_t num_consumers_)
: IStorage{columns_},
table_name(table_name_), database_name(database_name_), context(context_),
topics(context.getMacros()->expand(topics_)),
brokers(context.getMacros()->expand(brokers_)),
group(context.getMacros()->expand(group_)),
format_name(context.getMacros()->expand(format_name_)),
+ row_delimiter(row_delimiter_),
schema_name(context.getMacros()->expand(schema_name_)),
num_consumers(num_consumers_), log(&Logger::get("StorageKafka (" + table_name_ + ")")),
semaphore(0, num_consumers_), mutex(), consumers(), event_update()
@@ -552,10 +574,10 @@ void registerStorageKafka(StorageFactory & factory)
* - Schema (optional, if the format supports it)
*/
- if (engine_args.size() < 3 || engine_args.size() > 6)
+ if (engine_args.size() < 3 || engine_args.size() > 7)
throw Exception(
- "Storage Kafka requires 3-6 parameters"
- " - Kafka broker list, list of topics to consume, consumer group ID, message format, schema, number of consumers",
+ "Storage Kafka requires 3-7 parameters"
+ " - Kafka broker list, list of topics to consume, consumer group ID, message format, row delimiter, schema, number of consumers",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String brokers;
@@ -569,13 +591,33 @@ void registerStorageKafka(StorageFactory & factory)
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context);
- // Parse format schema if supported (optional)
- String schema;
+ // Parse row delimiter (optional)
+ char row_delimiter = '\0';
if (engine_args.size() >= 5)
{
engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);
auto ast = typeid_cast<const ASTLiteral *>(engine_args[4].get());
+ String arg;
+ if (ast && ast->value.getType() == Field::Types::String)
+ arg = safeGet<String>(ast->value);
+ else
+ throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
+ if (arg.size() > 1)
+ throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
+ else if (arg.size() == 0)
+ row_delimiter = '\0';
+ else
+ row_delimiter = arg[0];
+ }
+
+ // Parse format schema if supported (optional)
+ String schema;
+ if (engine_args.size() >= 6)
+ {
+ engine_args[5] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[5], args.local_context);
+
+ auto ast = typeid_cast<const ASTLiteral *>(engine_args[5].get());
if (ast && ast->value.getType() == Field::Types::String)
schema = safeGet<String>(ast->value);
else
@@ -584,9 +626,9 @@ void registerStorageKafka(StorageFactory & factory)
// Parse number of consumers (optional)
UInt64 num_consumers = 1;
- if (engine_args.size() >= 6)
+ if (engine_args.size() >= 7)
{
- auto ast = typeid_cast<const ASTLiteral *>(engine_args[5].get());
+ auto ast = typeid_cast<const ASTLiteral *>(engine_args[6].get());
if (ast && ast->value.getType() == Field::Types::UInt64)
num_consumers = safeGet<UInt64>(ast->value);
else
@@ -613,7 +655,7 @@ void registerStorageKafka(StorageFactory & factory)
return StorageKafka::create(
args.table_name, args.database_name, args.context, args.columns,
- brokers, group, topics, format, schema, num_consumers);
+ brokers, group, topics, format, row_delimiter, schema, num_consumers);
});
}
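The row-delimiter engine argument parsed above accepts only a zero- or one-character string (empty meaning "no delimiter", stored as `'\0'`). A small model of that validation:

```python
def parse_row_delimiter(arg):
    """Model of the Kafka engine-argument check: the row delimiter must be
    a string of length 0 (no delimiter, i.e. '\\0') or exactly 1."""
    if not isinstance(arg, str) or len(arg) > 1:
        raise ValueError("Row delimiter must be a char")
    return "\0" if len(arg) == 0 else arg
```

With the new optional argument in position 5, the engine now takes 3 to 7 parameters, which is why the consumer-count argument moved from index 5 to index 6.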
diff --git a/dbms/src/Storages/StorageKafka.h b/dbms/src/Storages/StorageKafka.h
index 45530517e94..9652d1d6a46 100644
--- a/dbms/src/Storages/StorageKafka.h
+++ b/dbms/src/Storages/StorageKafka.h
@@ -75,6 +75,9 @@ private:
const String brokers;
const String group;
const String format_name;
+ // Optional row delimiter for generating char delimited stream
+ // in order to make various input stream parsers happy.
+ char row_delimiter;
const String schema_name;
/// Total number of consumers
size_t num_consumers;
@@ -109,7 +112,7 @@ protected:
Context & context_,
const ColumnsDescription & columns_,
const String & brokers_, const String & group_, const Names & topics_,
- const String & format_name_, const String & schema_name_, size_t num_consumers_);
+ const String & format_name_, char row_delimiter_, const String & schema_name_, size_t num_consumers_);
};
}
diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp
index e3f78e746b5..42774b06f1d 100644
--- a/dbms/src/Storages/StorageMergeTree.cpp
+++ b/dbms/src/Storages/StorageMergeTree.cpp
@@ -311,6 +311,7 @@ std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() cons
part_data_versions.reserve(data_parts.size());
for (const auto & part : data_parts)
part_data_versions.push_back(part->info.getDataVersion());
+ std::sort(part_data_versions.begin(), part_data_versions.end());
std::vector<MergeTreeMutationStatus> result;
for (const auto & kv : current_mutations_by_version)
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
index c8b8b6d9706..09fd8bbba8a 100644
--- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
@@ -1971,9 +1971,10 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
String replica_path = zookeeper_path + "/replicas/" + part_desc->replica;
ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef());
+ auto [user, password] = context.getInterserverCredentials();
part_desc->res_part = fetcher.fetchPart(part_desc->found_new_part_name, replica_path,
- address.host, address.replication_port, timeouts, false, TMP_PREFIX + "fetch_");
+ address.host, address.replication_port, timeouts, user, password, false, TMP_PREFIX + "fetch_");
/// TODO: check columns_version of fetched part
@@ -2706,10 +2707,11 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef());
+ auto [user, password] = context.getInterserverCredentials();
try
{
- part = fetcher.fetchPart(part_name, replica_path, address.host, address.replication_port, timeouts, to_detached);
+ part = fetcher.fetchPart(part_name, replica_path, address.host, address.replication_port, timeouts, user, password, to_detached);
if (!to_detached)
{
diff --git a/dbms/src/Storages/System/IStorageSystemWithStringColumns.h b/dbms/src/Storages/System/IStorageSystemOneBlock.h
similarity index 63%
rename from dbms/src/Storages/System/IStorageSystemWithStringColumns.h
rename to dbms/src/Storages/System/IStorageSystemOneBlock.h
index 08e2f0a7bf5..96286f56eee 100644
--- a/dbms/src/Storages/System/IStorageSystemWithStringColumns.h
+++ b/dbms/src/Storages/System/IStorageSystemOneBlock.h
@@ -14,21 +14,15 @@ class Context;
/** Base class for system tables whose all columns have String type.
*/
template <typename Self>
-class IStorageSystemWithStringColumns : public IStorage
+class IStorageSystemOneBlock : public IStorage
{
protected:
- virtual void fillData(MutableColumns & res_columns) const = 0;
+ virtual void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const = 0;
public:
- IStorageSystemWithStringColumns(const String & name_) : name(name_)
+ IStorageSystemOneBlock(const String & name_) : name(name_)
{
- auto names = Self::getColumnNames();
- NamesAndTypesList name_list;
- for (const auto & name : names)
- {
- name_list.push_back(NameAndTypePair{name, std::make_shared<DataTypeString>()});
- }
- setColumns(ColumnsDescription(name_list));
+ setColumns(ColumnsDescription(Self::getNamesAndTypes()));
}
std::string getTableName() const override
@@ -37,8 +31,8 @@ public:
}
BlockInputStreams read(const Names & column_names,
- const SelectQueryInfo & /*query_info*/,
- const Context & /*context*/,
+ const SelectQueryInfo & query_info,
+ const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t /*max_block_size*/,
unsigned /*num_streams*/) override
@@ -48,7 +42,7 @@ public:
Block sample_block = getSampleBlock();
MutableColumns res_columns = sample_block.cloneEmptyColumns();
- fillData(res_columns);
+ fillData(res_columns, context, query_info);
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(sample_block.cloneWithColumns(std::move(res_columns))));
}
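The refactored base class above captures a simple pattern: a subclass declares a typed schema and fills a single block; the base builds the columns and wraps the result. A toy Python model of that contract (names are illustrative, not the C++ API):

```python
class IStorageSystemOneBlock:
    """Toy model of the one-block system-table base: subclasses supply
    get_names_and_types() and fill_data(); the base assembles the block."""
    def __init__(self, name):
        self.name = name
        self.columns = type(self).get_names_and_types()

    def read(self, context=None, query_info=None):
        res = {name: [] for name, _type in self.columns}  # empty columns from the schema
        self.fill_data(res, context, query_info)          # subclass fills one block
        return res

class SystemMetrics(IStorageSystemOneBlock):
    @staticmethod
    def get_names_and_types():
        return [("metric", "String"), ("value", "Float64")]

    def fill_data(self, res, context, query_info):
        res["metric"].append("uptime")
        res["value"].append(3600.0)
```

Passing `context` and `query_info` through to `fillData` is what lets tables like `system.asynchronous_metrics` drop their hand-written `read()` implementations.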
diff --git a/dbms/src/Storages/System/StorageSystemAggregateFunctionCombinators.cpp b/dbms/src/Storages/System/StorageSystemAggregateFunctionCombinators.cpp
index 9dd106ce2d7..8fa335faceb 100644
--- a/dbms/src/Storages/System/StorageSystemAggregateFunctionCombinators.cpp
+++ b/dbms/src/Storages/System/StorageSystemAggregateFunctionCombinators.cpp
@@ -3,12 +3,23 @@
namespace DB
{
-void StorageSystemAggregateFunctionCombinators::fillData(MutableColumns & res_columns) const
+
+NamesAndTypesList StorageSystemAggregateFunctionCombinators::getNamesAndTypes()
+{
+ return {
+ {"name", std::make_shared<DataTypeString>()},
+ {"is_internal", std::make_shared<DataTypeUInt8>()},
+ };
+}
+
+void StorageSystemAggregateFunctionCombinators::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
const auto & combinators = AggregateFunctionCombinatorFactory::instance().getAllAggregateFunctionCombinators();
for (const auto & pair : combinators)
{
res_columns[0]->insert(pair.first);
+ res_columns[1]->insert(UInt64(pair.second->isForInternalUsageOnly()));
}
}
+
}
diff --git a/dbms/src/Storages/System/StorageSystemAggregateFunctionCombinators.h b/dbms/src/Storages/System/StorageSystemAggregateFunctionCombinators.h
index 097fe93666e..1d7226eda8b 100644
--- a/dbms/src/Storages/System/StorageSystemAggregateFunctionCombinators.h
+++ b/dbms/src/Storages/System/StorageSystemAggregateFunctionCombinators.h
@@ -1,26 +1,25 @@
#pragma once
-#include <Storages/System/IStorageSystemWithStringColumns.h>
+#include <Storages/System/IStorageSystemOneBlock.h>
+#include
+#include
#include
namespace DB
{
class StorageSystemAggregateFunctionCombinators : public ext::shared_ptr_helper<StorageSystemAggregateFunctionCombinators>,
- public IStorageSystemWithStringColumns<StorageSystemAggregateFunctionCombinators>
+ public IStorageSystemOneBlock<StorageSystemAggregateFunctionCombinators>
{
protected:
- void fillData(MutableColumns & res_columns) const override;
+ void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
+ using IStorageSystemOneBlock<StorageSystemAggregateFunctionCombinators>::IStorageSystemOneBlock;
public:
- using IStorageSystemWithStringColumns<StorageSystemAggregateFunctionCombinators>::IStorageSystemWithStringColumns;
std::string getName() const override
{
return "SystemAggregateFunctionCombinators";
}
- static std::vector<String> getColumnNames()
- {
- return {"name"};
- }
+ static NamesAndTypesList getNamesAndTypes();
};
}
diff --git a/dbms/src/Storages/System/StorageSystemAsynchronousMetrics.cpp b/dbms/src/Storages/System/StorageSystemAsynchronousMetrics.cpp
index bc2f76379e9..059ef708a81 100644
--- a/dbms/src/Storages/System/StorageSystemAsynchronousMetrics.cpp
+++ b/dbms/src/Storages/System/StorageSystemAsynchronousMetrics.cpp
@@ -1,51 +1,34 @@
-#include
-
-#include
-#include
-#include
#include
#include
-#include
+#include
+#include
namespace DB
{
-
-StorageSystemAsynchronousMetrics::StorageSystemAsynchronousMetrics(const std::string & name_, const AsynchronousMetrics & async_metrics_)
- : name(name_),
- async_metrics(async_metrics_)
+NamesAndTypesList StorageSystemAsynchronousMetrics::getNamesAndTypes()
{
- setColumns(ColumnsDescription({
+ return {
{"metric", std::make_shared<DataTypeString>()},
{"value", std::make_shared<DataTypeFloat64>()},
- }));
+ };
}
-BlockInputStreams StorageSystemAsynchronousMetrics::read(
- const Names & column_names,
- const SelectQueryInfo &,
- const Context &,
- QueryProcessingStage::Enum & processed_stage,
- const size_t /*max_block_size*/,
- const unsigned /*num_streams*/)
+StorageSystemAsynchronousMetrics::StorageSystemAsynchronousMetrics(const std::string & name_, const AsynchronousMetrics & async_metrics_)
+ : IStorageSystemOneBlock(name_), async_metrics(async_metrics_)
{
- check(column_names);
- processed_stage = QueryProcessingStage::FetchColumns;
-
- MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
+}
+void StorageSystemAsynchronousMetrics::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
+{
auto async_metrics_values = async_metrics.getValues();
-
for (const auto & name_value : async_metrics_values)
{
res_columns[0]->insert(name_value.first);
res_columns[1]->insert(name_value.second);
}
-
- return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}
-
}
diff --git a/dbms/src/Storages/System/StorageSystemAsynchronousMetrics.h b/dbms/src/Storages/System/StorageSystemAsynchronousMetrics.h
index 60e50096143..853cb97c974 100644
--- a/dbms/src/Storages/System/StorageSystemAsynchronousMetrics.h
+++ b/dbms/src/Storages/System/StorageSystemAsynchronousMetrics.h
@@ -1,8 +1,7 @@
#pragma once
#include <ext/shared_ptr_helper.h>
-#include <Storages/IStorage.h>
-
+#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
@@ -13,26 +12,20 @@ class Context;
/** Implements system table asynchronous_metrics, which allows to get values of periodically (asynchronously) updated metrics.
*/
-class StorageSystemAsynchronousMetrics : public ext::shared_ptr_helper<StorageSystemAsynchronousMetrics>, public IStorage
+class StorageSystemAsynchronousMetrics : public ext::shared_ptr_helper<StorageSystemAsynchronousMetrics>, public IStorageSystemOneBlock<StorageSystemAsynchronousMetrics>
{
public:
std::string getName() const override { return "SystemAsynchronousMetrics"; }
- std::string getTableName() const override { return name; }
- BlockInputStreams read(
- const Names & column_names,
- const SelectQueryInfo & query_info,
- const Context & context,
- QueryProcessingStage::Enum & processed_stage,
- size_t max_block_size,
- unsigned num_streams) override;
+ static NamesAndTypesList getNamesAndTypes();
private:
- const std::string name;
const AsynchronousMetrics & async_metrics;
protected:
StorageSystemAsynchronousMetrics(const std::string & name_, const AsynchronousMetrics & async_metrics_);
+
+ void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
};
}
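This file shows the second flavour of the refactor: a table that carries extra state (here, a reference to the shared metrics collector) cannot simply inherit the base constructor with `using Base::Base;`, so it declares its own constructor and forwards the table name up. A standalone sketch of that shape (the collector and class names below are hypothetical, not the real `AsynchronousMetrics`):

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the metrics collector the real table holds a
// reference to: a snapshot of metric name -> value.
struct MetricsSource
{
    std::map<std::string, double> getValues() const
    {
        return {{"MemoryResident", 1024.0}, {"Uptime", 42.0}};
    }
};

// A table with extra state: its own constructor keeps the table name and a
// reference to the shared collector, instead of inheriting the constructor.
class AsyncMetricsTable
{
public:
    AsyncMetricsTable(std::string name_, const MetricsSource & metrics_)
        : name(std::move(name_)), metrics(metrics_) {}

    // Analogue of fillData(): one row per entry of the current snapshot.
    void fillData(std::vector<std::string> & names, std::vector<double> & values) const
    {
        for (const auto & [metric_name, value] : metrics.getValues())
        {
            names.push_back(metric_name);
            values.push_back(value);
        }
    }

    std::string getTableName() const { return name; }

private:
    std::string name;
    const MetricsSource & metrics;
};
```

Note that the reference member is the reason the constructor must be hand-written: the base only knows about the table name, and the collector outlives the table.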
diff --git a/dbms/src/Storages/System/StorageSystemBuildOptions.cpp b/dbms/src/Storages/System/StorageSystemBuildOptions.cpp
index e62e6e9bbfd..2a8ffc947be 100644
--- a/dbms/src/Storages/System/StorageSystemBuildOptions.cpp
+++ b/dbms/src/Storages/System/StorageSystemBuildOptions.cpp
@@ -1,46 +1,26 @@
-#include <DataStreams/OneBlockInputStream.h>
+#include <Common/config_build.h>
#include <DataTypes/DataTypeString.h>
-#include <Columns/ColumnString.h>
-#include <Interpreters/Context.h>
#include <Storages/System/StorageSystemBuildOptions.h>
#include <ext/shared_ptr_helper.h>
-#include <Common/config_build.h>
namespace DB
{
-
-StorageSystemBuildOptions::StorageSystemBuildOptions(const std::string & name_)
- : name(name_)
+NamesAndTypesList StorageSystemBuildOptions::getNamesAndTypes()
{
- setColumns(ColumnsDescription({
- { "name", std::make_shared<DataTypeString>() },
- { "value", std::make_shared<DataTypeString>() },
- }));
+ return {
+ {"name", std::make_shared<DataTypeString>()},
+ {"value", std::make_shared<DataTypeString>()},
+ };
}
-
-BlockInputStreams StorageSystemBuildOptions::read(
- const Names & column_names,
- const SelectQueryInfo &,
- const Context &,
- QueryProcessingStage::Enum & processed_stage,
- const size_t /*max_block_size*/,
- const unsigned /*num_streams*/)
+void StorageSystemBuildOptions::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
- check(column_names);
- processed_stage = QueryProcessingStage::FetchColumns;
-
- MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
-
for (auto it = auto_config_build; *it; it += 2)
{
res_columns[0]->insert(String(it[0]));
res_columns[1]->insert(String(it[1]));
}
-
- return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}
-
}
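The surviving fill loop in this file, `for (auto it = auto_config_build; *it; it += 2)`, walks a flat, null-terminated array of alternating key/value C strings. A small sketch of that traversal (the array below is a hypothetical stand-in for the build-time generated `auto_config_build`):

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for auto_config_build: alternating key/value
// C strings, terminated by a nullptr sentinel.
static const char * fake_config_build[]
    = {"VERSION_FULL", "ClickHouse 18.1.0", "CXX_COMPILER", "clang++", nullptr};

// Same traversal as fillData() above: step two entries at a time until the
// nullptr sentinel, emitting one (name, value) row per pair.
std::vector<std::pair<std::string, std::string>> readBuildOptions(const char ** it)
{
    std::vector<std::pair<std::string, std::string>> rows;
    for (; *it; it += 2)
        rows.emplace_back(it[0], it[1]);
    return rows;
}
```

The sentinel makes the array self-describing, so the table needs no separate length constant; the only requirement is that the generator always emits an even number of strings before the terminator.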
diff --git a/dbms/src/Storages/System/StorageSystemBuildOptions.h b/dbms/src/Storages/System/StorageSystemBuildOptions.h
index d772b255383..749ffbddbaf 100644
--- a/dbms/src/Storages/System/StorageSystemBuildOptions.h
+++ b/dbms/src/Storages/System/StorageSystemBuildOptions.h
@@ -1,7 +1,7 @@
#pragma once
#include <ext/shared_ptr_helper.h>
-#include <Storages/IStorage.h>
+#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
@@ -12,25 +12,18 @@ class Context;
/** System table "build_options" with many params used for clickhouse building
*/
-class StorageSystemBuildOptions : public ext::shared_ptr_helper<StorageSystemBuildOptions>, public IStorage
+class StorageSystemBuildOptions : public ext::shared_ptr_helper<StorageSystemBuildOptions>, public IStorageSystemOneBlock<StorageSystemBuildOptions>
{
-public:
- std::string getName() const override { return "SystemBuildOptions"; }
- std::string getTableName() const override { return name; }
-
- BlockInputStreams read(
- const Names & column_names,
- const SelectQueryInfo & query_info,
- const Context & context,
- QueryProcessingStage::Enum & processed_stage,
- size_t max_block_size,
- unsigned num_streams) override;
-
-private:
- const std::string name;
-
protected:
- StorageSystemBuildOptions(const std::string & name_);
+ void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
+
+ using IStorageSystemOneBlock::IStorageSystemOneBlock;
+
+public:
+
+ std::string getName() const override { return "SystemBuildOptions"; }
+
+ static NamesAndTypesList getNamesAndTypes();
};
}
diff --git a/dbms/src/Storages/System/StorageSystemClusters.cpp b/dbms/src/Storages/System/StorageSystemClusters.cpp
index fb5c4e41b82..3527de302a1 100644
--- a/dbms/src/Storages/System/StorageSystemClusters.cpp
+++ b/dbms/src/Storages/System/StorageSystemClusters.cpp
@@ -1,50 +1,32 @@
-#include <Columns/ColumnString.h>
-#include <DataStreams/OneBlockInputStream.h>
-#include <DataTypes/DataTypeString.h>