Merged with master

Nikolai Kochetov 2018-07-27 19:21:43 +03:00
commit b5d6240646
178 changed files with 2127 additions and 1320 deletions

View File

@ -1,3 +1,36 @@
## ClickHouse release 18.1.0, 2018-07-23
### New features:
* Support for the `ALTER TABLE t DELETE WHERE` query for non-replicated MergeTree tables ([#2634](https://github.com/yandex/ClickHouse/pull/2634)).
* Support for arbitrary types for the `uniq*` family of aggregate functions ([#2010](https://github.com/yandex/ClickHouse/issues/2010)).
* Support for arbitrary types in comparison operators ([#2026](https://github.com/yandex/ClickHouse/issues/2026)).
* The `users.xml` file allows setting a subnet mask in the format `10.0.0.1/255.255.255.0`. This is necessary for using masks for IPv6 networks with zeros in the middle ([#2637](https://github.com/yandex/ClickHouse/pull/2637)).
* Added the `arrayDistinct` function ([#2670](https://github.com/yandex/ClickHouse/pull/2670)).
* The SummingMergeTree engine can now work with AggregateFunction type columns ([Constantin S. Pan](https://github.com/yandex/ClickHouse/pull/2566)).
### Improvements:
* Changed the numbering scheme for release versions. Now the first part contains the year of release (A.D., Moscow timezone, minus 2000), the second part contains the number for major changes (increases for most releases), and the third part is the patch version. Releases are still backwards compatible, unless otherwise stated in the changelog.
* Faster conversions of floating-point numbers to a string ([Amos Bird](https://github.com/yandex/ClickHouse/pull/2664)).
* If some rows were skipped during an insert due to parsing errors (this is possible with the `input_allow_errors_num` and `input_allow_errors_ratio` settings enabled), the number of skipped rows is now written to the server log ([Leonardo Cecchi](https://github.com/yandex/ClickHouse/pull/2669)).
### Bug fixes:
* Fixed the TRUNCATE command for temporary tables ([Amos Bird](https://github.com/yandex/ClickHouse/pull/2624)).
* Fixed a rare deadlock in the ZooKeeper client library that occurred when there was a network error while reading the response ([c315200](https://github.com/yandex/ClickHouse/commit/c315200e64b87e44bdf740707fc857d1fdf7e947)).
* Fixed an error during a CAST to Nullable types ([#1322](https://github.com/yandex/ClickHouse/issues/1322)).
* Fixed the incorrect result of the `maxIntersection()` function when the boundaries of intervals coincided ([Michael Furmur](https://github.com/yandex/ClickHouse/pull/2657)).
* Fixed incorrect transformation of the OR expression chain in a function argument ([chenxing-xc](https://github.com/yandex/ClickHouse/pull/2663)).
* Fixed performance degradation for queries containing `IN (subquery)` expressions inside another subquery ([#2571](https://github.com/yandex/ClickHouse/issues/2571)).
* Fixed incompatibility between servers with different versions in distributed queries that use a `CAST` function that isn't in uppercase letters ([fe8c4d6](https://github.com/yandex/ClickHouse/commit/fe8c4d64e434cacd4ceef34faa9005129f2190a5)).
* Added missing quoting of identifiers for queries to an external DBMS ([#2635](https://github.com/yandex/ClickHouse/issues/2635)).
### Backward incompatible changes:
* Converting a string containing the number zero to DateTime does not work. Example: `SELECT toDateTime('0')`. This is also the reason that `DateTime DEFAULT '0'` does not work in tables, as well as `<null_value>0</null_value>` in dictionaries. Solution: replace `0` with `0000-00-00 00:00:00`.
## ClickHouse release 1.1.54394, 2018-07-12
### New features:

View File

@ -1,3 +1,32 @@
## ClickHouse release 18.1.0, 2018-07-23
### New features:
* Support for the `ALTER TABLE t DELETE WHERE` query for non-replicated MergeTree tables ([#2634](https://github.com/yandex/ClickHouse/pull/2634)).
* Support for arbitrary types for the `uniq*` family of aggregate functions ([#2010](https://github.com/yandex/ClickHouse/issues/2010)).
* Support for arbitrary types in comparison operators ([#2026](https://github.com/yandex/ClickHouse/issues/2026)).
* The `users.xml` file allows setting a subnet mask in the format `10.0.0.1/255.255.255.0`. This is necessary for using masks for IPv6 networks with zeros in the middle ([#2637](https://github.com/yandex/ClickHouse/pull/2637)).
* Added the `arrayDistinct` function ([#2670](https://github.com/yandex/ClickHouse/pull/2670)).
* The SummingMergeTree engine can now work with AggregateFunction type columns ([Constantin S. Pan](https://github.com/yandex/ClickHouse/pull/2566)).
### Improvements:
* Changed the numbering scheme for release versions. Now the first part contains the year of release (A.D., Moscow timezone, minus 2000), the second part contains the number for major changes (increases for most releases), and the third part is the patch version. Releases are still backwards compatible, unless otherwise stated in the changelog.
* Faster conversions of floating-point numbers to a string ([Amos Bird](https://github.com/yandex/ClickHouse/pull/2664)).
* If some rows were skipped during an insert due to parsing errors (this is possible with the `input_allow_errors_num` and `input_allow_errors_ratio` settings enabled), the number of skipped rows is now written to the server log ([Leonardo Cecchi](https://github.com/yandex/ClickHouse/pull/2669)).
### Bug fixes:
* Fixed the TRUNCATE command for temporary tables ([Amos Bird](https://github.com/yandex/ClickHouse/pull/2624)).
* Fixed a rare deadlock in the ZooKeeper client library that occurred when there was a network error while reading the response ([c315200](https://github.com/yandex/ClickHouse/commit/c315200e64b87e44bdf740707fc857d1fdf7e947)).
* Fixed an error during a CAST to Nullable types ([#1322](https://github.com/yandex/ClickHouse/issues/1322)).
* Fixed the incorrect result of the `maxIntersection()` function when the boundaries of intervals coincided ([Michael Furmur](https://github.com/yandex/ClickHouse/pull/2657)).
* Fixed incorrect transformation of the OR expression chain in a function argument ([chenxing-xc](https://github.com/yandex/ClickHouse/pull/2663)).
* Fixed performance degradation for queries containing `IN (subquery)` expressions inside another subquery ([#2571](https://github.com/yandex/ClickHouse/issues/2571)).
* Fixed incompatibility between servers with different versions in distributed queries that use a `CAST` function that isn't in uppercase letters ([fe8c4d6](https://github.com/yandex/ClickHouse/commit/fe8c4d64e434cacd4ceef34faa9005129f2190a5)).
* Added missing quoting of identifiers for queries to an external DBMS ([#2635](https://github.com/yandex/ClickHouse/issues/2635)).
### Backward incompatible changes:
* Converting a string containing the number zero to DateTime does not work. Example: `SELECT toDateTime('0')`. This is also the reason that `DateTime DEFAULT '0'` does not work in tables, as well as `<null_value>0</null_value>` in dictionaries. Solution: replace `0` with `0000-00-00 00:00:00`.
## ClickHouse release 1.1.54394, 2018-07-12
### New features:

View File

@ -2,6 +2,8 @@
ClickHouse is an open-source column-oriented database management system that allows generating analytical data reports in real time.
[![Build Status](https://travis-ci.org/yandex/ClickHouse.svg?branch=master)](https://travis-ci.org/yandex/ClickHouse)
## Useful links
* [Official website](https://clickhouse.yandex/) has a quick high-level overview of ClickHouse on its main page.
@ -9,5 +11,3 @@ ClickHouse is an open-source column-oriented database management system that all
* [Documentation](https://clickhouse.yandex/docs/en/) provides more in-depth information.
* [Contacts](https://clickhouse.yandex/#contacts) can help you get your questions answered if there are any.
[![Build Status](https://travis-ci.org/yandex/ClickHouse.svg?branch=master)](https://travis-ci.org/yandex/ClickHouse)

View File

@ -26,10 +26,10 @@ if (ENABLE_EMBEDDED_COMPILER)
if (LLVM_FOUND)
find_library (LLD_LIBRARY_TEST lldCore PATHS ${LLVM_LIBRARY_DIRS})
find_path (LLD_INCLUDE_DIR_TEST NAMES lld/Common/Driver.h PATHS ${LLVM_INCLUDE_DIRS})
find_path (LLD_INCLUDE_DIR_TEST NAMES lld/Core/AbsoluteAtom.h PATHS ${LLVM_INCLUDE_DIRS})
if (NOT LLD_LIBRARY_TEST OR NOT LLD_INCLUDE_DIR_TEST)
set (LLVM_FOUND 0)
message(WARNING "liblld not found in ${LLVM_INCLUDE_DIRS} ${LLVM_LIBRARY_DIRS}. Disabling internal compiler.")
message(WARNING "liblld (${LLD_LIBRARY_TEST}, ${LLD_INCLUDE_DIR_TEST}) not found in ${LLVM_INCLUDE_DIRS} ${LLVM_LIBRARY_DIRS}. Disabling internal compiler.")
endif ()
endif ()

View File

@ -1,11 +1,11 @@
# This strings autochanged from release_lib.sh:
set(VERSION_REVISION 54396 CACHE STRING "")
set(VERSION_REVISION 54397 CACHE STRING "")
set(VERSION_MAJOR 18 CACHE STRING "")
set(VERSION_MINOR 1 CACHE STRING "")
set(VERSION_MINOR 2 CACHE STRING "")
set(VERSION_PATCH 0 CACHE STRING "")
set(VERSION_GITHASH 550f41bc65cb03201acad489e7b96ea346ed8259 CACHE STRING "")
set(VERSION_DESCRIBE v18.1.0-testing CACHE STRING "")
set(VERSION_STRING 18.1.0 CACHE STRING "")
set(VERSION_GITHASH 6ad677d7d6961a0c9088ccd9eff55779cfdaa654 CACHE STRING "")
set(VERSION_DESCRIBE v18.2.0-testing CACHE STRING "")
set(VERSION_STRING 18.2.0 CACHE STRING "")
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")

View File

@ -58,13 +58,13 @@ It is designed to retain the following properties of data:
Most of the properties above make the transformed data viable for performance testing:
- reading data, filtering, aggregation and sorting will work at almost the same speed
as on original data due to saved cardinalities, magnitudes, compression ratios, etc.
It works in a deterministic fashion: you define a seed value, and the transform is totally determined by the input data and by the seed.
Some transforms are one-to-one and could be reversed, so you need to have a large enough seed and keep it secret.
It uses some cryptographic primitives to transform data, but from the cryptographic point of view,
it doesn't do anything properly, and you should never consider the result as secure unless you have other reasons for it.
It may retain some data you don't want to publish.
@ -74,7 +74,7 @@ So, the user will be able to count exact ratio of mobile traffic.
Another example: suppose you have some private data in your table, like user emails, and you don't want to publish any single email address.
If your table is large enough and contains multiple different emails, and there is no email that has a much higher frequency than all the others,
it will perfectly anonymize all data. But if you have a small number of distinct values in a column, it can possibly reproduce some of them.
You should take care and look at the exact algorithm of how this tool works, and probably fine-tune some of its command line parameters.
This tool works well only with a reasonable amount of data (at least thousands of rows).
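
To make the seed-driven behaviour above concrete, here is a minimal sketch (this is not the actual clickhouse-obfuscator algorithm; the hash, mixing constants, and alphabet are assumptions chosen only for illustration) of a transform whose output is a pure function of the seed and the input value:

```cpp
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>

/// Minimal sketch of a deterministic, seeded transform.
/// NOT the real clickhouse-obfuscator algorithm; it only illustrates
/// that the output is fully determined by (seed, input value).
static uint64_t keyedHash(uint64_t seed, const std::string & value)
{
    uint64_t h = std::hash<std::string>{}(value);
    /// Mix the seed in (simple, non-cryptographic mixing).
    return h ^ (seed + 0x9e3779b97f4a7c15ULL + (h << 6) + (h >> 2));
}

/// Map a string to a same-length pseudorandom token.
static std::string obfuscate(uint64_t seed, const std::string & value)
{
    static const char alphabet[] = "abcdefghijklmnopqrstuvwxyz";
    uint64_t state = keyedHash(seed, value);
    std::string result;
    result.reserve(value.size());
    for (size_t i = 0; i < value.size(); ++i)
    {
        state = state * 6364136223846793005ULL + 1442695040888963407ULL;  /// LCG step
        result += alphabet[(state >> 33) % (sizeof(alphabet) - 1)];
    }
    return result;
}

int main()
{
    /// Same seed and same input always give the same output.
    std::cout << obfuscate(42, "user@example.com") << '\n';
    std::cout << obfuscate(42, "user@example.com") << '\n';
    /// A different seed gives an unrelated mapping.
    std::cout << obfuscate(43, "user@example.com") << '\n';
    return 0;
}
```

Because the mapping is repeatable across runs for a fixed seed, cardinalities and frequencies are preserved (useful for performance testing); but anyone who can guess candidate inputs can check them against the output, which is exactly the low-cardinality caveat above.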

View File

@ -1,3 +1,4 @@
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
@ -23,14 +24,40 @@ namespace ErrorCodes
extern const int TOO_MANY_SIMULTANEOUS_QUERIES;
}
std::pair<String, bool> InterserverIOHTTPHandler::checkAuthentication(Poco::Net::HTTPServerRequest & request) const
{
const auto & config = server.config();
if (config.has("interserver_http_credentials.user"))
{
if (!request.hasCredentials())
return {"Server requires HTTP Basic authentification, but client doesn't provide it", false};
String scheme, info;
request.getCredentials(scheme, info);
if (scheme != "Basic")
return {"Server requires HTTP Basic authentification but client provides another method", false};
String user = config.getString("interserver_http_credentials.user");
String password = config.getString("interserver_http_credentials.password", "");
Poco::Net::HTTPBasicCredentials credentials(info);
if (std::make_pair(user, password) != std::make_pair(credentials.getUsername(), credentials.getPassword()))
return {"Incorrect user or password in HTTP Basic authentification", false};
}
else if (request.hasCredentials())
{
return {"Client requires HTTP Basic authentification, but server doesn't provide it", false};
}
return {"", true};
}
void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
HTMLForm params(request);
LOG_TRACE(log, "Request URI: " << request.getURI());
/// NOTE: You can do authentication here if you need to.
String endpoint_name = params.get("endpoint");
bool compress = params.get("compress") == "true";
@ -65,8 +92,18 @@ void InterserverIOHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & requ
try
{
processQuery(request, response);
LOG_INFO(log, "Done processing query");
if (auto [msg, success] = checkAuthentication(request); success)
{
processQuery(request, response);
LOG_INFO(log, "Done processing query");
}
else
{
response.setStatusAndReason(Poco::Net::HTTPServerResponse::HTTP_UNAUTHORIZED);
if (!response.sent())
response.send() << msg << std::endl;
LOG_WARNING(log, "Query processing failed for request '" << request.getURI() << "': authentication failed");
}
}
catch (Exception & e)
{

View File

@ -34,6 +34,8 @@ private:
CurrentMetrics::Increment metric_increment{CurrentMetrics::InterserverConnection};
void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
std::pair<String, bool> checkAuthentication(Poco::Net::HTTPServerRequest & request) const;
};
}

View File

@ -230,6 +230,17 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setInterserverIOAddress(this_host, port);
}
if (config().has("interserver_http_credentials"))
{
String user = config().getString("interserver_http_credentials.user", "");
String password = config().getString("interserver_http_credentials.password", "");
if (user.empty())
throw Exception("Configuration parameter interserver_http_credentials user can't be empty", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
global_context->setInterverserCredentials(user, password);
}
if (config().has("macros"))
global_context->setMacros(std::make_unique<Macros>(config(), "macros"));
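
Judging only from the keys read above, enabling this feature presumably means adding an `interserver_http_credentials` section with `user` and `password` entries to the server configuration (an assumption based on `config().getString("interserver_http_credentials.user")`, not a documented example). On the sending side, a replica attaches the same credentials as HTTP Basic authentication, as the `ReadWriteBufferFromHTTP` change later in this commit does. A minimal Poco sketch of that client-side step (endpoint and credential values are made up):

```cpp
#include <iostream>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPRequest.h>

int main()
{
    /// Hypothetical values; the real ones come from the server configuration.
    Poco::Net::HTTPBasicCredentials credentials("interserver", "secret");

    /// The interserver handler reads "endpoint" and "compress" from the query string
    /// (see InterserverIOHTTPHandler::processQuery above); the endpoint value is illustrative.
    Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST,
                                   "/?endpoint=some_endpoint&compress=false",
                                   Poco::Net::HTTPRequest::HTTP_1_1);

    /// Adds the "Authorization: Basic ..." header that
    /// InterserverIOHTTPHandler::checkAuthentication() verifies on the receiving side.
    credentials.authenticate(request);

    std::cout << request.get("Authorization") << std::endl;
    return 0;
}
```

If the header is missing or the user/password pair does not match, the handler responds with HTTP 401 instead of processing the query.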

View File

@ -18,6 +18,9 @@ public:
DataTypes transformArguments(const DataTypes & arguments) const override
{
if (0 == arguments.size())
throw Exception("-Array aggregate functions require at least one argument", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
DataTypes nested_arguments;
for (const auto & type : arguments)
{

View File

@ -38,9 +38,9 @@ void registerAggregateFunctionsBitwise(AggregateFunctionFactory & factory)
factory.registerFunction("groupBitXor", createAggregateFunctionBitwise<AggregateFunctionGroupBitXorData>);
/// Aliases for compatibility with MySQL.
factory.registerFunction("BIT_OR", createAggregateFunctionBitwise<AggregateFunctionGroupBitOrData>, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("BIT_AND", createAggregateFunctionBitwise<AggregateFunctionGroupBitAndData>, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("BIT_XOR", createAggregateFunctionBitwise<AggregateFunctionGroupBitXorData>, AggregateFunctionFactory::CaseInsensitive);
factory.registerAlias("BIT_OR", "groupBitOr", AggregateFunctionFactory::CaseInsensitive);
factory.registerAlias("BIT_AND", "groupBitAnd", AggregateFunctionFactory::CaseInsensitive);
factory.registerAlias("BIT_XOR", "groupBitXor", AggregateFunctionFactory::CaseInsensitive);
}
}

View File

@ -78,11 +78,12 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
AggregateFunctionPtr AggregateFunctionFactory::getImpl(
const String & name,
const String & name_param,
const DataTypes & argument_types,
const Array & parameters,
int recursion_level) const
{
String name = getAliasToOrName(name_param);
/// Find by exact match.
auto it = aggregate_functions.find(name);
if (it != aggregate_functions.end())
@ -103,8 +104,8 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl(
if (AggregateFunctionCombinatorPtr combinator = AggregateFunctionCombinatorFactory::instance().tryFindSuffix(name))
{
if (combinator->getName() == "Null")
throw Exception("Aggregate function combinator 'Null' is only for internal usage", ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION);
if (combinator->isForInternalUsageOnly())
throw Exception("Aggregate function combinator '" + combinator->getName() + "' is only for internal usage", ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION);
String nested_name = name.substr(0, name.size() - combinator->getName().size());
DataTypes nested_types = combinator->transformArguments(argument_types);
@ -126,10 +127,11 @@ AggregateFunctionPtr AggregateFunctionFactory::tryGet(const String & name, const
bool AggregateFunctionFactory::isAggregateFunctionName(const String & name, int recursion_level) const
{
if (aggregate_functions.count(name))
if (aggregate_functions.count(name) || isAlias(name))
return true;
if (recursion_level == 0 && case_insensitive_aggregate_functions.count(Poco::toLower(name)))
String name_lowercase = Poco::toLower(name);
if (recursion_level == 0 && (case_insensitive_aggregate_functions.count(name_lowercase) || isAlias(name_lowercase)))
return true;
if (AggregateFunctionCombinatorPtr combinator = AggregateFunctionCombinatorFactory::instance().tryFindSuffix(name))

View File

@ -1,6 +1,7 @@
#pragma once
#include <AggregateFunctions/IAggregateFunction.h>
#include <Common/IFactoryWithAliases.h>
#include <ext/singleton.h>
@ -20,27 +21,18 @@ class IDataType;
using DataTypePtr = std::shared_ptr<const IDataType>;
using DataTypes = std::vector<DataTypePtr>;
/** Creator have arguments: name of aggregate function, types of arguments, values of parameters.
* Parameters are for "parametric" aggregate functions.
* For example, in quantileWeighted(0.9)(x, weight), 0.9 is "parameter" and x, weight are "arguments".
*/
using AggregateFunctionCreator = std::function<AggregateFunctionPtr(const String &, const DataTypes &, const Array &)>;
/** Creates an aggregate function by name.
*/
class AggregateFunctionFactory final : public ext::singleton<AggregateFunctionFactory>
class AggregateFunctionFactory final : public ext::singleton<AggregateFunctionFactory>, public IFactoryWithAliases<AggregateFunctionCreator>
{
friend class StorageSystemFunctions;
public:
/** Creator have arguments: name of aggregate function, types of arguments, values of parameters.
* Parameters are for "parametric" aggregate functions.
* For example, in quantileWeighted(0.9)(x, weight), 0.9 is "parameter" and x, weight are "arguments".
*/
using Creator = std::function<AggregateFunctionPtr(const String &, const DataTypes &, const Array &)>;
/// For compatibility with SQL, it's possible to specify that certain aggregate function name is case insensitive.
enum CaseSensitiveness
{
CaseSensitive,
CaseInsensitive
};
/// Register a function by its name.
/// No locking, you must register all functions before usage of get.
void registerFunction(
@ -77,6 +69,13 @@ private:
/// Case insensitive aggregate functions will be additionally added here with lowercased name.
AggregateFunctions case_insensitive_aggregate_functions;
const AggregateFunctions & getCreatorMap() const override { return aggregate_functions; }
const AggregateFunctions & getCaseInsensitiveCreatorMap() const override { return case_insensitive_aggregate_functions; }
String getFactoryName() const override { return "AggregateFunctionFactory"; }
};
}

View File

@ -18,6 +18,8 @@ class AggregateFunctionCombinatorNull final : public IAggregateFunctionCombinato
public:
String getName() const override { return "Null"; }
bool isForInternalUsageOnly() const override { return true; }
DataTypes transformArguments(const DataTypes & arguments) const override
{
size_t size = arguments.size();

View File

@ -93,30 +93,14 @@ void registerAggregateFunctionsQuantile(AggregateFunctionFactory & factory)
createAggregateFunctionQuantile<QuantileTDigest, NameQuantilesTDigestWeighted, true, Float32, true>);
/// 'median' is an alias for 'quantile'
factory.registerFunction("median",
createAggregateFunctionQuantile<QuantileReservoirSampler, NameQuantile, false, Float64, false>);
factory.registerFunction("medianDeterministic",
createAggregateFunctionQuantile<QuantileReservoirSamplerDeterministic, NameQuantileDeterministic, true, Float64, false>);
factory.registerFunction("medianExact",
createAggregateFunctionQuantile<QuantileExact, NameQuantileExact, false, void, false>);
factory.registerFunction("medianExactWeighted",
createAggregateFunctionQuantile<QuantileExactWeighted, NameQuantileExactWeighted, true, void, false>);
factory.registerFunction("medianTiming",
createAggregateFunctionQuantile<QuantileTiming, NameQuantileTiming, false, Float32, false>);
factory.registerFunction("medianTimingWeighted",
createAggregateFunctionQuantile<QuantileTiming, NameQuantileTimingWeighted, true, Float32, false>);
factory.registerFunction("medianTDigest",
createAggregateFunctionQuantile<QuantileTDigest, NameQuantileTDigest, false, Float32, false>);
factory.registerFunction("medianTDigestWeighted",
createAggregateFunctionQuantile<QuantileTDigest, NameQuantileTDigestWeighted, true, Float32, false>);
factory.registerAlias("median", NameQuantile::name);
factory.registerAlias("medianDeterministic", NameQuantileDeterministic::name);
factory.registerAlias("medianExact", NameQuantileExact::name);
factory.registerAlias("medianExactWeighted", NameQuantileExactWeighted::name);
factory.registerAlias("medianTiming", NameQuantileTiming::name);
factory.registerAlias("medianTimingWeighted", NameQuantileTimingWeighted::name);
factory.registerAlias("medianTDigest", NameQuantileTDigest::name);
factory.registerAlias("medianTDigestWeighted", NameQuantileTDigestWeighted::name);
}
}

View File

@ -116,7 +116,7 @@ struct AggregateFunctionWindowFunnelData
/// TODO Protection against huge size
events_list.clear();
events_list.resize(size);
events_list.reserve(size);
UInt32 timestamp;
UInt8 event;

View File

@ -56,12 +56,12 @@ void registerAggregateFunctionsStatisticsSimple(AggregateFunctionFactory & facto
factory.registerFunction("corr", createAggregateFunctionStatisticsBinary<AggregateFunctionCorrSimple>, AggregateFunctionFactory::CaseInsensitive);
/// Synonyms for compatibility.
factory.registerFunction("VAR_SAMP", createAggregateFunctionStatisticsUnary<AggregateFunctionVarSampSimple>, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("VAR_POP", createAggregateFunctionStatisticsUnary<AggregateFunctionVarPopSimple>, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("STDDEV_SAMP", createAggregateFunctionStatisticsUnary<AggregateFunctionStddevSampSimple>, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("STDDEV_POP", createAggregateFunctionStatisticsUnary<AggregateFunctionStddevPopSimple>, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("COVAR_SAMP", createAggregateFunctionStatisticsBinary<AggregateFunctionCovarSampSimple>, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("COVAR_POP", createAggregateFunctionStatisticsBinary<AggregateFunctionCovarPopSimple>, AggregateFunctionFactory::CaseInsensitive);
factory.registerAlias("VAR_SAMP", "varSamp", AggregateFunctionFactory::CaseInsensitive);
factory.registerAlias("VAR_POP", "varPop", AggregateFunctionFactory::CaseInsensitive);
factory.registerAlias("STDDEV_SAMP", "stddevSamp", AggregateFunctionFactory::CaseInsensitive);
factory.registerAlias("STDDEV_POP", "stddevPop", AggregateFunctionFactory::CaseInsensitive);
factory.registerAlias("COVAR_SAMP", "covarSamp", AggregateFunctionFactory::CaseInsensitive);
factory.registerAlias("COVAR_POP", "covarPop", AggregateFunctionFactory::CaseInsensitive);
}
}

View File

@ -32,6 +32,8 @@ class IAggregateFunctionCombinator
public:
virtual String getName() const = 0;
virtual bool isForInternalUsageOnly() const { return false; }
/** From the arguments for combined function (ex: UInt64, UInt8 for sumIf),
* get the arguments for nested function (ex: UInt64 for sum).
* If arguments are not suitable for combined function, throw an exception.

View File

@ -83,6 +83,16 @@ std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const Se
return entries;
}
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyForTableFunction(const Settings * settings, PoolMode pool_mode)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, fail_message, settings);
};
return getManyImpl(settings, pool_mode, try_get_entry);
}
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyChecked(
const Settings * settings, PoolMode pool_mode, const QualifiedTableName & table_to_check)
{
@ -90,6 +100,7 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
{
return tryGetEntry(pool, fail_message, settings, &table_to_check);
};
return getManyImpl(settings, pool_mode, try_get_entry);
}

View File

@ -47,6 +47,9 @@ public:
*/
std::vector<Entry> getMany(const Settings * settings, PoolMode pool_mode);
/// The same as getMany(), but return std::vector<TryResult>.
std::vector<TryResult> getManyForTableFunction(const Settings * settings, PoolMode pool_mode);
using Base = PoolWithFailoverBase<IConnectionPool>;
using TryResult = Base::TryResult;

View File

@ -128,7 +128,8 @@ void BackgroundSchedulePool::TaskInfo::execute()
zkutil::WatchCallback BackgroundSchedulePool::TaskInfo::getWatchCallback()
{
return [t=shared_from_this()](const ZooKeeperImpl::ZooKeeper::WatchResponse &) {
return [t = shared_from_this()](const ZooKeeperImpl::ZooKeeper::WatchResponse &)
{
t->schedule();
};
}

View File

@ -0,0 +1,125 @@
#pragma once
#include <Common/Exception.h>
#include <Core/Types.h>
#include <Poco/String.h>
#include <unordered_map>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/** If stored objects may have several names (aliases),
* this interface may be helpful.
* The template parameter is available as Creator.
*/
template <typename CreatorFunc>
class IFactoryWithAliases
{
protected:
using Creator = CreatorFunc;
String getAliasToOrName(const String & name) const
{
if (aliases.count(name))
return aliases.at(name);
else if (String name_lowercase = Poco::toLower(name); case_insensitive_aliases.count(name_lowercase))
return case_insensitive_aliases.at(name_lowercase);
else
return name;
}
public:
/// For compatibility with SQL, it's possible to specify that certain function name is case insensitive.
enum CaseSensitiveness
{
CaseSensitive,
CaseInsensitive
};
/** Register an additional name (alias) for a creator.
* real_name has to be registered already.
*/
void registerAlias(const String & alias_name, const String & real_name, CaseSensitiveness case_sensitiveness = CaseSensitive)
{
const auto & creator_map = getCreatorMap();
const auto & case_insensitive_creator_map = getCaseInsensitiveCreatorMap();
const String factory_name = getFactoryName();
String real_dict_name;
if (creator_map.count(real_name))
real_dict_name = real_name;
else if (auto real_name_lowercase = Poco::toLower(real_name); case_insensitive_creator_map.count(real_name_lowercase))
real_dict_name = real_name_lowercase;
else
throw Exception(factory_name + ": can't create alias '" + alias_name + "', the real name '" + real_name + "' is not registered",
ErrorCodes::LOGICAL_ERROR);
String alias_name_lowercase = Poco::toLower(alias_name);
if (creator_map.count(alias_name) || case_insensitive_creator_map.count(alias_name_lowercase))
throw Exception(
factory_name + ": the alias name '" + alias_name + "' is already registered as real name", ErrorCodes::LOGICAL_ERROR);
if (case_sensitiveness == CaseInsensitive)
if (!case_insensitive_aliases.emplace(alias_name_lowercase, real_dict_name).second)
throw Exception(
factory_name + ": case insensitive alias name '" + alias_name + "' is not unique", ErrorCodes::LOGICAL_ERROR);
if (!aliases.emplace(alias_name, real_dict_name).second)
throw Exception(factory_name + ": alias name '" + alias_name + "' is not unique", ErrorCodes::LOGICAL_ERROR);
}
std::vector<String> getAllRegisteredNames() const
{
std::vector<String> result;
auto getter = [](const auto & pair) { return pair.first; };
std::transform(getCreatorMap().begin(), getCreatorMap().end(), std::back_inserter(result), getter);
std::transform(aliases.begin(), aliases.end(), std::back_inserter(result), getter);
return result;
}
bool isCaseInsensitive(const String & name) const
{
String name_lowercase = Poco::toLower(name);
return getCaseInsensitiveCreatorMap().count(name_lowercase) || case_insensitive_aliases.count(name_lowercase);
}
const String & aliasTo(const String & name) const
{
if (auto it = aliases.find(name); it != aliases.end())
return it->second;
else if (auto it = case_insensitive_aliases.find(Poco::toLower(name)); it != case_insensitive_aliases.end())
return it->second;
throw Exception(getFactoryName() + ": name '" + name + "' is not alias", ErrorCodes::LOGICAL_ERROR);
}
bool isAlias(const String & name) const
{
return aliases.count(name) || case_insensitive_aliases.count(name);
}
virtual ~IFactoryWithAliases() {}
private:
using InnerMap = std::unordered_map<String, Creator>; // name -> creator
using AliasMap = std::unordered_map<String, String>; // alias -> original type
virtual const InnerMap & getCreatorMap() const = 0;
virtual const InnerMap & getCaseInsensitiveCreatorMap() const = 0;
virtual String getFactoryName() const = 0;
/// Alias map to data_types from previous two maps
AliasMap aliases;
/// Case insensitive aliases
AliasMap case_insensitive_aliases;
};
}
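
For readers unfamiliar with the pattern, a hypothetical toy factory shows how this mixin is meant to be used (the real users in this commit are AggregateFunctionFactory, DataTypeFactory and FunctionFactory; `GreetingFactory` and its methods are invented for illustration and assume the header above and its dependencies are on the include path):

```cpp
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <Common/IFactoryWithAliases.h>

/// Toy factory: maps a name to a creator returning a string.
class GreetingFactory : public DB::IFactoryWithAliases<std::function<std::string()>>
{
public:
    void registerGreeting(const DB::String & name, Creator creator)
    {
        greetings[name] = std::move(creator);
    }

    std::string get(const DB::String & name_param) const
    {
        /// Resolve aliases first, the same way AggregateFunctionFactory::getImpl does.
        DB::String name = getAliasToOrName(name_param);
        return greetings.at(name)();
    }

private:
    using Map = std::unordered_map<DB::String, Creator>;
    Map greetings;
    Map case_insensitive_greetings;  /// empty: this toy registers no case insensitive names

    const Map & getCreatorMap() const override { return greetings; }
    const Map & getCaseInsensitiveCreatorMap() const override { return case_insensitive_greetings; }
    DB::String getFactoryName() const override { return "GreetingFactory"; }
};

int main()
{
    GreetingFactory factory;
    factory.registerGreeting("hello", [] { return std::string("hello world"); });
    factory.registerAlias("hi", "hello");            /// alias -> already registered real name
    std::cout << factory.get("hi") << '\n';          /// resolves the alias, prints "hello world"
    std::cout << factory.isAlias("hi") << '\n';      /// prints 1
    return 0;
}
```

The diff wires the same three overrides into the real factories, so `registerAlias`, `isAlias`, and `getAliasToOrName` behave uniformly across them.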

View File

@ -51,16 +51,19 @@ DataTypePtr DataTypeFactory::get(const ASTPtr & ast) const
throw Exception("Unexpected AST element for data type.", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
}
DataTypePtr DataTypeFactory::get(const String & family_name, const ASTPtr & parameters) const
DataTypePtr DataTypeFactory::get(const String & family_name_param, const ASTPtr & parameters) const
{
String family_name = getAliasToOrName(family_name_param);
{
DataTypesDictionary::const_iterator it = data_types.find(family_name);
if (data_types.end() != it)
return it->second(parameters);
}
String family_name_lowercase = Poco::toLower(family_name);
{
String family_name_lowercase = Poco::toLower(family_name);
DataTypesDictionary::const_iterator it = case_insensitive_data_types.find(family_name_lowercase);
if (case_insensitive_data_types.end() != it)
return it->second(parameters);
@ -76,11 +79,16 @@ void DataTypeFactory::registerDataType(const String & family_name, Creator creat
throw Exception("DataTypeFactory: the data type family " + family_name + " has been provided "
" a null constructor", ErrorCodes::LOGICAL_ERROR);
String family_name_lowercase = Poco::toLower(family_name);
if (isAlias(family_name) || isAlias(family_name_lowercase))
throw Exception("DataTypeFactory: the data type family name '" + family_name + "' is already registered as alias",
ErrorCodes::LOGICAL_ERROR);
if (!data_types.emplace(family_name, creator).second)
throw Exception("DataTypeFactory: the data type family name '" + family_name + "' is not unique",
ErrorCodes::LOGICAL_ERROR);
String family_name_lowercase = Poco::toLower(family_name);
if (case_sensitiveness == CaseInsensitive
&& !case_insensitive_data_types.emplace(family_name_lowercase, creator).second)
@ -88,7 +96,6 @@ void DataTypeFactory::registerDataType(const String & family_name, Creator creat
ErrorCodes::LOGICAL_ERROR);
}
void DataTypeFactory::registerSimpleDataType(const String & name, SimpleCreator creator, CaseSensitiveness case_sensitiveness)
{
if (creator == nullptr)
@ -103,7 +110,6 @@ void DataTypeFactory::registerSimpleDataType(const String & name, SimpleCreator
}, case_sensitiveness);
}
void registerDataTypeNumbers(DataTypeFactory & factory);
void registerDataTypeDate(DataTypeFactory & factory);
void registerDataTypeDateTime(DataTypeFactory & factory);

View File

@ -3,6 +3,7 @@
#include <memory>
#include <functional>
#include <unordered_map>
#include <Common/IFactoryWithAliases.h>
#include <DataTypes/IDataType.h>
#include <ext/singleton.h>
@ -19,10 +20,9 @@ using ASTPtr = std::shared_ptr<IAST>;
/** Creates a data type by name of data type family and parameters.
*/
class DataTypeFactory final : public ext::singleton<DataTypeFactory>
class DataTypeFactory final : public ext::singleton<DataTypeFactory>, public IFactoryWithAliases<std::function<DataTypePtr(const ASTPtr & parameters)>>
{
private:
using Creator = std::function<DataTypePtr(const ASTPtr & parameters)>;
using SimpleCreator = std::function<DataTypePtr()>;
using DataTypesDictionary = std::unordered_map<String, Creator>;
@ -31,24 +31,12 @@ public:
DataTypePtr get(const String & family_name, const ASTPtr & parameters) const;
DataTypePtr get(const ASTPtr & ast) const;
/// For compatibility with SQL, it's possible to specify that certain data type name is case insensitive.
enum CaseSensitiveness
{
CaseSensitive,
CaseInsensitive
};
/// Register a type family by its name.
void registerDataType(const String & family_name, Creator creator, CaseSensitiveness case_sensitiveness = CaseSensitive);
/// Register a simple data type, that have no parameters.
void registerSimpleDataType(const String & name, SimpleCreator creator, CaseSensitiveness case_sensitiveness = CaseSensitive);
const DataTypesDictionary & getAllDataTypes() const
{
return data_types;
}
private:
DataTypesDictionary data_types;
@ -56,6 +44,13 @@ private:
DataTypesDictionary case_insensitive_data_types;
DataTypeFactory();
const DataTypesDictionary & getCreatorMap() const override { return data_types; }
const DataTypesDictionary & getCaseInsensitiveCreatorMap() const override { return case_insensitive_data_types; }
String getFactoryName() const override { return "DataTypeFactory"; }
friend class ext::singleton<DataTypeFactory>;
};

View File

@ -231,7 +231,7 @@ void registerDataTypeFixedString(DataTypeFactory & factory)
factory.registerDataType("FixedString", create);
/// Compatibility alias.
factory.registerDataType("BINARY", create, DataTypeFactory::CaseInsensitive);
factory.registerAlias("BINARY", "FixedString", DataTypeFactory::CaseInsensitive);
}
}

View File

@ -312,16 +312,16 @@ void registerDataTypeString(DataTypeFactory & factory)
/// These synonyms are added for compatibility.
factory.registerSimpleDataType("CHAR", creator, DataTypeFactory::CaseInsensitive);
factory.registerSimpleDataType("VARCHAR", creator, DataTypeFactory::CaseInsensitive);
factory.registerSimpleDataType("TEXT", creator, DataTypeFactory::CaseInsensitive);
factory.registerSimpleDataType("TINYTEXT", creator, DataTypeFactory::CaseInsensitive);
factory.registerSimpleDataType("MEDIUMTEXT", creator, DataTypeFactory::CaseInsensitive);
factory.registerSimpleDataType("LONGTEXT", creator, DataTypeFactory::CaseInsensitive);
factory.registerSimpleDataType("BLOB", creator, DataTypeFactory::CaseInsensitive);
factory.registerSimpleDataType("TINYBLOB", creator, DataTypeFactory::CaseInsensitive);
factory.registerSimpleDataType("MEDIUMBLOB", creator, DataTypeFactory::CaseInsensitive);
factory.registerSimpleDataType("LONGBLOB", creator, DataTypeFactory::CaseInsensitive);
factory.registerAlias("CHAR", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("VARCHAR", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("TEXT", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("TINYTEXT", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("MEDIUMTEXT", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("LONGTEXT", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("BLOB", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("TINYBLOB", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("MEDIUMBLOB", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("LONGBLOB", "String", DataTypeFactory::CaseInsensitive);
}
}

View File

@ -22,13 +22,13 @@ void registerDataTypeNumbers(DataTypeFactory & factory)
/// These synonyms are added for compatibility.
factory.registerSimpleDataType("TINYINT", [] { return DataTypePtr(std::make_shared<DataTypeInt8>()); }, DataTypeFactory::CaseInsensitive);
factory.registerSimpleDataType("SMALLINT", [] { return DataTypePtr(std::make_shared<DataTypeInt16>()); }, DataTypeFactory::CaseInsensitive);
factory.registerSimpleDataType("INT", [] { return DataTypePtr(std::make_shared<DataTypeInt32>()); }, DataTypeFactory::CaseInsensitive);
factory.registerSimpleDataType("INTEGER", [] { return DataTypePtr(std::make_shared<DataTypeInt32>()); }, DataTypeFactory::CaseInsensitive);
factory.registerSimpleDataType("BIGINT", [] { return DataTypePtr(std::make_shared<DataTypeInt64>()); }, DataTypeFactory::CaseInsensitive);
factory.registerSimpleDataType("FLOAT", [] { return DataTypePtr(std::make_shared<DataTypeFloat32>()); }, DataTypeFactory::CaseInsensitive);
factory.registerSimpleDataType("DOUBLE", [] { return DataTypePtr(std::make_shared<DataTypeFloat64>()); }, DataTypeFactory::CaseInsensitive);
factory.registerAlias("TINYINT", "Int8", DataTypeFactory::CaseInsensitive);
factory.registerAlias("SMALLINT", "Int16", DataTypeFactory::CaseInsensitive);
factory.registerAlias("INT", "Int32", DataTypeFactory::CaseInsensitive);
factory.registerAlias("INTEGER", "Int32", DataTypeFactory::CaseInsensitive);
factory.registerAlias("BIGINT", "Int64", DataTypeFactory::CaseInsensitive);
factory.registerAlias("FLOAT", "Float32", DataTypeFactory::CaseInsensitive);
factory.registerAlias("DOUBLE", "Float64", DataTypeFactory::CaseInsensitive);
}
}

View File

@ -41,7 +41,6 @@ void RegionsHierarchy::reload()
RegionID max_region_id = 0;
auto regions_reader = data_source->createReader();
RegionEntry region_entry;

View File

@ -1,5 +1,6 @@
#include <IO/ReadHelpers.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/TokenIterator.h>
#include <Parsers/ExpressionListParsers.h>
@ -29,7 +30,7 @@ namespace ErrorCodes
ValuesRowInputStream::ValuesRowInputStream(ReadBuffer & istr_, const Block & header_, const Context & context_, const FormatSettings & format_settings)
: istr(istr_), header(header_), context(context_), format_settings(format_settings)
: istr(istr_), header(header_), context(std::make_unique<Context>(context_)), format_settings(format_settings)
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(istr);
@ -112,7 +113,7 @@ bool ValuesRowInputStream::read(MutableColumns & columns)
istr.position() = const_cast<char *>(token_iterator->begin);
std::pair<Field, DataTypePtr> value_raw = evaluateConstantExpression(ast, context);
std::pair<Field, DataTypePtr> value_raw = evaluateConstantExpression(ast, *context);
Field value = convertFieldToType(value_raw.first, type, value_raw.second.get());
if (value.isNull())

View File

@ -28,7 +28,7 @@ public:
private:
ReadBuffer & istr;
Block header;
const Context & context;
std::unique_ptr<Context> context; /// pimpl
const FormatSettings format_settings;
};

View File

@ -6,7 +6,6 @@
#include <Poco/String.h>
namespace DB
{
@ -26,8 +25,13 @@ void FunctionFactory::registerFunction(const
throw Exception("FunctionFactory: the function name '" + name + "' is not unique",
ErrorCodes::LOGICAL_ERROR);
String function_name_lowercase = Poco::toLower(name);
if (isAlias(name) || isAlias(function_name_lowercase))
throw Exception("FunctionFactory: the function name '" + name + "' is already registered as alias",
ErrorCodes::LOGICAL_ERROR);
if (case_sensitiveness == CaseInsensitive
&& !case_insensitive_functions.emplace(Poco::toLower(name), creator).second)
&& !case_insensitive_functions.emplace(function_name_lowercase, creator).second)
throw Exception("FunctionFactory: the case insensitive function name '" + name + "' is not unique",
ErrorCodes::LOGICAL_ERROR);
}
@ -45,9 +49,11 @@ FunctionBuilderPtr FunctionFactory::get(
FunctionBuilderPtr FunctionFactory::tryGet(
const std::string & name,
const std::string & name_param,
const Context & context) const
{
String name = getAliasToOrName(name_param);
auto it = functions.find(name);
if (functions.end() != it)
return it->second(context);

View File

@ -1,6 +1,7 @@
#pragma once
#include <Functions/IFunction.h>
#include <Common/IFactoryWithAliases.h>
#include <ext/singleton.h>
@ -20,19 +21,9 @@ class Context;
* Function could use for initialization (take ownership of shared_ptr, for example)
* some dictionaries from Context.
*/
class FunctionFactory : public ext::singleton<FunctionFactory>
class FunctionFactory : public ext::singleton<FunctionFactory>, public IFactoryWithAliases<std::function<FunctionBuilderPtr(const Context &)>>
{
friend class StorageSystemFunctions;
public:
using Creator = std::function<FunctionBuilderPtr(const Context &)>;
/// For compatibility with SQL, it's possible to specify that certain function name is case insensitive.
enum CaseSensitiveness
{
CaseSensitive,
CaseInsensitive
};
template <typename Function>
void registerFunction(CaseSensitiveness case_sensitiveness = CaseSensitive)
@ -67,6 +58,12 @@ private:
return std::make_shared<DefaultFunctionBuilder>(Function::create(context));
}
const Functions & getCreatorMap() const override { return functions; }
const Functions & getCaseInsensitiveCreatorMap() const override { return case_insensitive_functions; }
String getFactoryName() const override { return "FunctionFactory"; }
/// Register a function by its name.
/// No locking, you must register all functions before usage of get.
void registerFunction(

View File

@ -1286,12 +1286,12 @@ DataTypePtr FunctionArrayDistinct::getReturnTypeImpl(const DataTypes & arguments
{
const DataTypeArray * array_type = checkAndGetDataType<DataTypeArray>(arguments[0].get());
if (!array_type)
throw Exception("Argument for function " + getName() + " must be array but it "
throw Exception("Argument for function " + getName() + " must be array but it "
" has type " + arguments[0]->getName() + ".",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
auto nested_type = removeNullable(array_type->getNestedType());
return std::make_shared<DataTypeArray>(nested_type);
}
@ -1307,7 +1307,7 @@ void FunctionArrayDistinct::executeImpl(Block & block, const ColumnNumbers & arg
const IColumn & src_data = array->getData();
const ColumnArray::Offsets & offsets = array->getOffsets();
ColumnRawPtrs original_data_columns;
original_data_columns.push_back(&src_data);
@ -1416,7 +1416,7 @@ bool FunctionArrayDistinct::executeString(
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(StringRef)>>;
const PaddedPODArray<UInt8> * src_null_map = nullptr;
if (nullable_col)
{
src_null_map = &static_cast<const ColumnUInt8 *>(&nullable_col->getNullMapColumn())->getData();
@ -1471,7 +1471,7 @@ void FunctionArrayDistinct::executeHashed(
res_data_col.insertFrom(*columns[0], j);
}
}
res_offsets.emplace_back(set.size() + prev_off);
prev_off = off;
}

View File

@ -1011,10 +1011,11 @@ public:
DataTypePtr observed_type0 = removeNullable(array_type->getNestedType());
DataTypePtr observed_type1 = removeNullable(arguments[1]);
if (!(observed_type0->isNumber() && observed_type1->isNumber())
/// We also support arrays of Enum type (that are represented by number) to search numeric values.
if (!(observed_type0->isValueRepresentedByNumber() && observed_type1->isNumber())
&& !observed_type0->equals(*observed_type1))
throw Exception("Types of array and 2nd argument of function "
+ getName() + " must be identical up to nullability. Passed: "
+ getName() + " must be identical up to nullability or numeric types or Enum and numeric type. Passed: "
+ arguments[0]->getName() + " and " + arguments[1]->getName() + ".",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
@ -1249,7 +1250,7 @@ private:
IColumn & res_data_col,
ColumnArray::Offsets & res_offsets,
const ColumnNullable * nullable_col);
void executeHashed(
const ColumnArray::Offsets & offsets,
const ColumnRawPtrs & columns,

View File

@ -16,8 +16,8 @@ void registerFunctionsRound(FunctionFactory & factory)
factory.registerFunction<FunctionTrunc>("trunc", FunctionFactory::CaseInsensitive);
/// Compatibility aliases.
factory.registerFunction<FunctionCeil>("ceiling", FunctionFactory::CaseInsensitive);
factory.registerFunction<FunctionTrunc>("truncate", FunctionFactory::CaseInsensitive);
factory.registerAlias("ceiling", "ceil", FunctionFactory::CaseInsensitive);
factory.registerAlias("truncate", "trunc", FunctionFactory::CaseInsensitive);
}
}

View File

@ -18,6 +18,7 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(const Poco::URI & uri,
const std::string & method_,
OutStreamCallback out_stream_callback,
const ConnectionTimeouts & timeouts,
const Poco::Net::HTTPBasicCredentials & credentials,
size_t buffer_size_)
: ReadBuffer(nullptr, 0),
uri{uri},
@ -30,6 +31,9 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(const Poco::URI & uri,
if (out_stream_callback)
request.setChunkedTransferEncoding(true);
if (!credentials.getUsername().empty())
credentials.authenticate(request);
Poco::Net::HTTPResponse response;
LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString());

View File

@ -1,6 +1,7 @@
#pragma once
#include <functional>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/URI.h>
#include <IO/ReadBuffer.h>
@ -32,6 +33,7 @@ public:
const std::string & method = {},
OutStreamCallback out_stream_callback = {},
const ConnectionTimeouts & timeouts = {},
const Poco::Net::HTTPBasicCredentials & credentials = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
bool nextImpl() override;

View File

@ -6,6 +6,7 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <common/logger_useful.h>
@ -28,13 +29,26 @@ namespace ClusterProxy
{
SelectStreamFactory::SelectStreamFactory(
const Block & header,
const Block & header_,
QueryProcessingStage::Enum processed_stage_,
QualifiedTableName main_table_,
const Tables & external_tables_)
: header(header),
: header(header_),
processed_stage{processed_stage_},
main_table(std::move(main_table_)),
table_func_ptr{nullptr},
external_tables{external_tables_}
{
}
SelectStreamFactory::SelectStreamFactory(
const Block & header_,
QueryProcessingStage::Enum processed_stage_,
ASTPtr table_func_ptr_,
const Tables & external_tables_)
: header(header_),
processed_stage{processed_stage_},
table_func_ptr{table_func_ptr_},
external_tables{external_tables_}
{
}
@ -71,13 +85,24 @@ void SelectStreamFactory::createForShard(
{
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, header, context, nullptr, throttler, external_tables, processed_stage);
stream->setPoolMode(PoolMode::GET_MANY);
stream->setMainTable(main_table);
if (!table_func_ptr)
stream->setMainTable(main_table);
res.emplace_back(std::move(stream));
};
if (shard_info.isLocal())
{
StoragePtr main_table_storage = context.tryGetTable(main_table.database, main_table.table);
StoragePtr main_table_storage;
if (table_func_ptr)
{
auto table_function = static_cast<ASTFunction *>(table_func_ptr.get());
main_table_storage = TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context);
}
else
main_table_storage = context.tryGetTable(main_table.database, main_table.table);
if (!main_table_storage) /// Table is absent on a local server.
{
ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);
@ -158,14 +183,17 @@ void SelectStreamFactory::createForShard(
auto lazily_create_stream = [
pool = shard_info.pool, shard_num = shard_info.shard_num, query, header = header, query_ast, context, throttler,
main_table = main_table, external_tables = external_tables, stage = processed_stage,
main_table = main_table, table_func_ptr = table_func_ptr, external_tables = external_tables, stage = processed_stage,
local_delay]()
-> BlockInputStreamPtr
{
std::vector<ConnectionPoolWithFailover::TryResult> try_results;
try
{
try_results = pool->getManyChecked(&context.getSettingsRef(), PoolMode::GET_MANY, main_table);
if (table_func_ptr)
try_results = pool->getManyForTableFunction(&context.getSettingsRef(), PoolMode::GET_MANY);
else
try_results = pool->getManyChecked(&context.getSettingsRef(), PoolMode::GET_MANY, main_table);
}
catch (const Exception & ex)
{

View File

@ -13,11 +13,19 @@ namespace ClusterProxy
class SelectStreamFactory final : public IStreamFactory
{
public:
/// Database in a query.
SelectStreamFactory(
const Block & header,
QueryProcessingStage::Enum processed_stage,
QualifiedTableName main_table,
const Block & header_,
QueryProcessingStage::Enum processed_stage_,
QualifiedTableName main_table_,
const Tables & external_tables);
/// TableFunction in a query.
SelectStreamFactory(
const Block & header_,
QueryProcessingStage::Enum processed_stage_,
ASTPtr table_func_ptr_,
const Tables & external_tables_);
void createForShard(
const Cluster::ShardInfo & shard_info,
@ -29,6 +37,7 @@ private:
const Block header;
QueryProcessingStage::Enum processed_stage;
QualifiedTableName main_table;
ASTPtr table_func_ptr;
Tables external_tables;
};

View File

@ -109,6 +109,8 @@ struct ContextShared
String interserver_io_host; /// The host name by which this server is available for other servers.
UInt16 interserver_io_port = 0; /// and port.
String interserver_io_user;
String interserver_io_password;
String path; /// Path to the data directory, with a slash at the end.
String tmp_path; /// The path to the temporary files that occur when processing the request.
@ -1378,6 +1380,17 @@ void Context::setInterserverIOAddress(const String & host, UInt16 port)
shared->interserver_io_port = port;
}
void Context::setInterverserCredentials(const String & user, const String & password)
{
shared->interserver_io_user = user;
shared->interserver_io_password = password;
}
std::pair<String, String> Context::getInterserverCredentials() const
{
return { shared->interserver_io_user, shared->interserver_io_password };
}
std::pair<String, UInt16> Context::getInterserverIOAddress() const
{

View File

@ -249,6 +249,11 @@ public:
/// How other servers can access this for downloading replicated data.
void setInterserverIOAddress(const String & host, UInt16 port);
std::pair<String, UInt16> getInterserverIOAddress() const;
// Credentials which server will use to communicate with others
void setInterverserCredentials(const String & user, const String & password);
std::pair<String, String> getInterserverCredentials() const;
/// The port that the server listens for executing SQL queries.
UInt16 getTCPPort() const;

View File

@ -64,6 +64,7 @@
#include <Parsers/queryToString.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
namespace DB

View File

@ -2,6 +2,7 @@
#include <Parsers/ASTKillQueryQuery.h>
#include <Parsers/queryToString.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <Columns/ColumnString.h>
@ -172,6 +173,9 @@ BlockIO InterpreterKillQueryQuery::execute()
{
ASTKillQueryQuery & query = typeid_cast<ASTKillQueryQuery &>(*query_ptr);
if (!query.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, context, {"system"});
BlockIO res_io;
Block processes_block = getSelectFromSystemProcessesResult();
if (!processes_block)

View File

@ -1,6 +1,7 @@
#include <Storages/IStorage.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/InterpreterOptimizeQuery.h>
#include <Common/typeid_cast.h>
@ -18,6 +19,9 @@ BlockIO InterpreterOptimizeQuery::execute()
{
const ASTOptimizeQuery & ast = typeid_cast<const ASTOptimizeQuery &>(*query_ptr);
if (!ast.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, context, {ast.database});
StoragePtr table = context.getTable(ast.database, ast.table);
auto table_lock = table->lockStructure(true, __PRETTY_FUNCTION__);
table->optimize(query_ptr, ast.partition, ast.final, ast.deduplicate, context);

View File

@ -3,6 +3,7 @@
#include <Columns/ColumnsNumber.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ExpressionElementParsers.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
@ -10,6 +11,7 @@
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Common/typeid_cast.h>
#include <TableFunctions/TableFunctionFactory.h>
namespace DB
@ -52,13 +54,19 @@ std::pair<Field, std::shared_ptr<const IDataType>> evaluateConstantExpression(co
ASTPtr evaluateConstantExpressionAsLiteral(const ASTPtr & node, const Context & context)
{
/// Branch with a string literal in the query.
if (typeid_cast<const ASTLiteral *>(node.get()))
return node;
/// Branch with TableFunction in query.
if (auto table_func_ptr = typeid_cast<ASTFunction *>(node.get()))
if (TableFunctionFactory::instance().isTableFunctionName(table_func_ptr->name))
return node;
return std::make_shared<ASTLiteral>(evaluateConstantExpression(node, context).first);
}
ASTPtr evaluateConstantExpressionOrIdentifierAsLiteral(const ASTPtr & node, const Context & context)
{
if (auto id = typeid_cast<const ASTIdentifier *>(node.get()))

View File

@ -8,9 +8,22 @@ String ASTKillQueryQuery::getID() const
return "KillQueryQuery_" + (where_expression ? where_expression->getID() : "") + "_" + String(sync ? "SYNC" : "ASYNC");
}
ASTPtr ASTKillQueryQuery::getRewrittenASTWithoutOnCluster(const std::string & /*new_database*/) const
{
auto query_ptr = clone();
ASTKillQueryQuery & query = static_cast<ASTKillQueryQuery &>(*query_ptr);
query.cluster.clear();
return query_ptr;
}
void ASTKillQueryQuery::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "KILL QUERY WHERE " << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_keyword : "") << "KILL QUERY ";
formatOnCluster(settings);
settings.ostr << " WHERE " << (settings.hilite ? hilite_none : "");
if (where_expression)
where_expression->formatImpl(settings, state, frame);

View File

@ -1,10 +1,11 @@
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithOnCluster.h>
namespace DB
{
class ASTKillQueryQuery : public ASTQueryWithOutput
class ASTKillQueryQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster
{
public:
ASTPtr where_expression; // expression to filter processes from system.processes table
@ -22,6 +23,8 @@ public:
String getID() const override;
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
ASTPtr getRewrittenASTWithoutOnCluster(const std::string &new_database) const override;
};
}

View File

@ -0,0 +1,39 @@
#include <Parsers/ASTOptimizeQuery.h>
namespace DB
{
ASTPtr ASTOptimizeQuery::getRewrittenASTWithoutOnCluster(const std::string & new_database) const
{
auto query_ptr = clone();
ASTOptimizeQuery & query = static_cast<ASTOptimizeQuery &>(*query_ptr);
query.cluster.clear();
if (query.database.empty())
query.database = new_database;
return query_ptr;
}
void ASTOptimizeQuery::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "OPTIMIZE TABLE " << (settings.hilite ? hilite_none : "")
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
formatOnCluster(settings);
if (partition)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " PARTITION " << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}
if (final)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FINAL" << (settings.hilite ? hilite_none : "");
if (deduplicate)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " DEDUPLICATE" << (settings.hilite ? hilite_none : "");
}
}

View File

@ -1,7 +1,8 @@
#pragma once
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithOnCluster.h>
namespace DB
{
@ -9,7 +10,7 @@ namespace DB
/** OPTIMIZE query
*/
class ASTOptimizeQuery : public IAST
class ASTOptimizeQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster
{
public:
String database;
@ -23,7 +24,8 @@ public:
bool deduplicate;
/** Get the text that identifies this element. */
String getID() const override { return "OptimizeQuery_" + database + "_" + table + (final ? "_final" : "") + (deduplicate ? "_deduplicate" : ""); }
String getID() const override
{ return "OptimizeQuery_" + database + "_" + table + (final ? "_final" : "") + (deduplicate ? "_deduplicate" : ""); }
ASTPtr clone() const override
{
@ -39,24 +41,10 @@ public:
return res;
}
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "OPTIMIZE TABLE " << (settings.hilite ? hilite_none : "")
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
if (partition)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " PARTITION " << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}
ASTPtr getRewrittenASTWithoutOnCluster(const std::string &new_database) const override;
if (final)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FINAL" << (settings.hilite ? hilite_none : "");
if (deduplicate)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " DEDUPLICATE" << (settings.hilite ? hilite_none : "");
}
};
}

View File

@ -372,9 +372,9 @@ void ASTSelectQuery::replaceDatabaseAndTable(const String & database_name, const
children.emplace_back(tables_list);
table_expression = table_expr.get();
}
ASTPtr table = std::make_shared<ASTIdentifier>(table_name, ASTIdentifier::Table);
if (!database_name.empty())
{
ASTPtr database = std::make_shared<ASTIdentifier>(database_name, ASTIdentifier::Database);
@ -388,5 +388,27 @@ void ASTSelectQuery::replaceDatabaseAndTable(const String & database_name, const
}
}
void ASTSelectQuery::addTableFunction(ASTPtr & table_function_ptr)
{
ASTTableExpression * table_expression = getFirstTableExpression(*this);
if (!table_expression)
{
auto tables_list = std::make_shared<ASTTablesInSelectQuery>();
auto element = std::make_shared<ASTTablesInSelectQueryElement>();
auto table_expr = std::make_shared<ASTTableExpression>();
element->table_expression = table_expr;
element->children.emplace_back(table_expr);
tables_list->children.emplace_back(element);
tables = tables_list;
children.emplace_back(tables_list);
table_expression = table_expr.get();
}
table_expression->table_function = table_function_ptr;
table_expression->database_and_table_name = nullptr;
}
};

View File

@ -47,6 +47,7 @@ public:
bool final() const;
void setDatabaseIfNeeded(const String & database_name);
void replaceDatabaseAndTable(const String & database_name, const String & table_name);
void addTableFunction(ASTPtr & table_function_ptr);
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

View File

@ -11,29 +11,36 @@ namespace DB
bool ParserKillQueryQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
String cluster_str;
auto query = std::make_shared<ASTKillQueryQuery>();
if (!ParserKeyword{"KILL QUERY"}.ignore(pos, expected))
return false;
if (!ParserKeyword{"WHERE"}.ignore(pos, expected))
return false;
ParserKeyword p_on{"ON"};
ParserKeyword p_test{"TEST"};
ParserKeyword p_sync{"SYNC"};
ParserKeyword p_async{"ASYNC"};
ParserKeyword p_where{"WHERE"};
ParserKeyword p_kill_query{"KILL QUERY"};
ParserExpression p_where_expression;
if (!p_where_expression.parse(pos, query->where_expression, expected))
if (!p_kill_query.ignore(pos, expected))
return false;
query->children.emplace_back(query->where_expression);
if (p_on.ignore(pos, expected) && !ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
if (ParserKeyword{"SYNC"}.ignore(pos))
if (p_where.ignore(pos, expected) && !p_where_expression.parse(pos, query->where_expression, expected))
return false;
if (p_sync.ignore(pos, expected))
query->sync = true;
else if (ParserKeyword{"ASYNC"}.ignore(pos))
else if (p_async.ignore(pos, expected))
query->sync = false;
else if (ParserKeyword{"TEST"}.ignore(pos))
else if (p_test.ignore(pos, expected))
query->test = true;
query->cluster = cluster_str;
query->children.emplace_back(query->where_expression);
node = std::move(query);
return true;
}

View File

@ -28,6 +28,7 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
ASTPtr partition;
bool final = false;
bool deduplicate = false;
String cluster_str;
if (!s_optimize_table.ignore(pos, expected))
return false;
@ -42,6 +43,9 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
return false;
}
if (ParserKeyword{"ON"}.ignore(pos, expected) && !ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
if (s_partition.ignore(pos, expected))
{
if (!partition_p.parse(pos, partition, expected))
@ -61,6 +65,8 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
query->database = typeid_cast<const ASTIdentifier &>(*database).name;
if (table)
query->table = typeid_cast<const ASTIdentifier &>(*table).name;
query->cluster = cluster_str;
query->partition = partition;
query->final = final;
query->deduplicate = deduplicate;

View File

@ -21,14 +21,12 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserInsertQuery insert_p(end);
ParserUseQuery use_p;
ParserSetQuery set_p;
ParserOptimizeQuery optimize_p;
ParserSystemQuery system_p;
bool res = query_with_output_p.parse(pos, node, expected)
|| insert_p.parse(pos, node, expected)
|| use_p.parse(pos, node, expected)
|| set_p.parse(pos, node, expected)
|| optimize_p.parse(pos, node, expected)
|| system_p.parse(pos, node, expected);
return res;

View File

@ -10,6 +10,7 @@
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/ParserDropQuery.h>
#include <Parsers/ParserKillQueryQuery.h>
#include <Parsers/ParserOptimizeQuery.h>
namespace DB
@ -27,6 +28,7 @@ bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
ParserRenameQuery rename_p;
ParserDropQuery drop_p;
ParserCheckQuery check_p;
ParserOptimizeQuery optimize_p;
ParserKillQueryQuery kill_query_p;
ASTPtr query;
@ -41,7 +43,8 @@ bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
|| rename_p.parse(pos, query, expected)
|| drop_p.parse(pos, query, expected)
|| check_p.parse(pos, query, expected)
|| kill_query_p.parse(pos, query, expected);
|| kill_query_p.parse(pos, query, expected)
|| optimize_p.parse(pos, query, expected);
if (!parsed)
return false;

View File

@ -39,9 +39,9 @@ public:
*/
void check(const NamesAndTypesList & columns, const Names & column_names) const;
/** Check that the data block for the record contains all the columns of the table with the correct types,
/** Check that the data block contains all the columns of the table with the correct types,
* contains only the columns of the table, and all the columns are different.
* If need_all, still checks that all the columns of the table are in the block.
* If need_all, checks that all the columns of the table are in the block.
*/
void check(const Block & block, bool need_all = false) const;

View File

@ -161,6 +161,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
const String & host,
int port,
const ConnectionTimeouts & timeouts,
const String & user,
const String & password,
bool to_detached,
const String & tmp_prefix_)
{
@ -175,7 +177,14 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
{"compress", "false"}
});
ReadWriteBufferFromHTTP in{uri, Poco::Net::HTTPRequest::HTTP_POST, {}, timeouts};
Poco::Net::HTTPBasicCredentials creds{};
if (!user.empty())
{
creds.setUsername(user);
creds.setPassword(password);
}
ReadWriteBufferFromHTTP in{uri, Poco::Net::HTTPRequest::HTTP_POST, {}, timeouts, creds};
static const String TMP_PREFIX = "tmp_fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;

View File

@ -54,6 +54,8 @@ public:
const String & host,
int port,
const ConnectionTimeouts & timeouts,
const String & user,
const String & password,
bool to_detached = false,
const String & tmp_prefix_ = "");

View File

@ -139,7 +139,7 @@ struct MergeTreeSettings
* instead of ordinary ones (dozens KB). \
* Before enabling, check that all replicas support the new format. \
*/ \
M(SettingBool, use_minimalistic_checksums_in_zookeeper, false)
M(SettingBool, use_minimalistic_checksums_in_zookeeper, true)
/// Settings that should not change after the creation of a table.
#define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \

View File

@ -65,12 +65,15 @@ namespace ErrorCodes
namespace
{
/// select query has database and table names as AST pointers
/// Creates a copy of query, changes database and table names.
ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table)
/// select query has database, table and table function names as AST pointers
/// Creates a copy of query, changes database, table and table function names.
ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table, ASTPtr table_function_ptr = nullptr)
{
auto modified_query_ast = query->clone();
typeid_cast<ASTSelectQuery &>(*modified_query_ast).replaceDatabaseAndTable(database, table);
if (table_function_ptr)
typeid_cast<ASTSelectQuery &>(*modified_query_ast).addTableFunction(table_function_ptr);
else
typeid_cast<ASTSelectQuery &>(*modified_query_ast).replaceDatabaseAndTable(database, table);
return modified_query_ast;
}
@ -170,16 +173,48 @@ StorageDistributed::StorageDistributed(
}
StoragePtr StorageDistributed::createWithOwnCluster(
const std::string & name_,
StorageDistributed::StorageDistributed(
const String & database_name,
const String & table_name_,
const ColumnsDescription & columns_,
const String & remote_database_,
const String & remote_table_,
ASTPtr remote_table_function_ptr_,
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & data_path_,
bool attach)
: StorageDistributed(database_name, table_name_, columns_, String{}, String{}, cluster_name_, context_, sharding_key_, data_path_, attach)
{
remote_table_function_ptr = remote_table_function_ptr_;
}
StoragePtr StorageDistributed::createWithOwnCluster(
const std::string & table_name_,
const ColumnsDescription & columns_,
const String & remote_database_, /// database on remote servers.
const String & remote_table_, /// The name of the table on the remote servers.
ClusterPtr owned_cluster_,
const Context & context_)
{
auto res = ext::shared_ptr_helper<StorageDistributed>::create(
String{}, table_name_, columns_, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), false);
res->owned_cluster = owned_cluster_;
return res;
}
StoragePtr StorageDistributed::createWithOwnCluster(
const std::string & table_name_,
const ColumnsDescription & columns_,
ASTPtr & remote_table_function_ptr_,
ClusterPtr & owned_cluster_,
const Context & context_)
{
auto res = ext::shared_ptr_helper<StorageDistributed>::create(
String{}, name_, columns_, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), false);
String{}, table_name_, columns_, remote_table_function_ptr_, String{}, context_, ASTPtr(), String(), false);
res->owned_cluster = owned_cluster_;
@ -209,15 +244,19 @@ BlockInputStreams StorageDistributed::read(
processed_stage = result_size == 1
? QueryProcessingStage::Complete
: QueryProcessingStage::WithMergeableState;
const auto & modified_query_ast = rewriteSelectQuery(
query_info.query, remote_database, remote_table);
query_info.query, remote_database, remote_table, remote_table_function_ptr);
Block header = materializeBlock(InterpreterSelectQuery(query_info.query, context, Names{}, processed_stage).getSampleBlock());
ClusterProxy::SelectStreamFactory select_stream_factory(
header, processed_stage, QualifiedTableName{remote_database, remote_table}, context.getExternalTables());
ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr ?
ClusterProxy::SelectStreamFactory(
header, processed_stage, remote_table_function_ptr, context.getExternalTables())
: ClusterProxy::SelectStreamFactory(
header, processed_stage, QualifiedTableName{remote_database, remote_table}, context.getExternalTables());
return ClusterProxy::executeQuery(
select_stream_factory, cluster, modified_query_ast, context, settings);
}

View File

@ -9,6 +9,7 @@
#include <Interpreters/Settings.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/ExpressionActions.h>
#include <Parsers/ASTFunction.h>
#include <common/logger_useful.h>
@ -36,8 +37,15 @@ public:
static StoragePtr createWithOwnCluster(
const std::string & table_name_,
const ColumnsDescription & columns_,
const String & remote_database_, /// database on remote servers.
const String & remote_table_, /// The name of the table on the remote servers.
const String & remote_database_, /// database on remote servers.
const String & remote_table_, /// The name of the table on the remote servers.
ClusterPtr owned_cluster_,
const Context & context_);
static StoragePtr createWithOwnCluster(
const std::string & table_name_,
const ColumnsDescription & columns_,
ASTPtr & remote_table_function_ptr_, /// Table function ptr.
ClusterPtr & owned_cluster_,
const Context & context_);
@ -101,6 +109,7 @@ public:
String table_name;
String remote_database;
String remote_table;
ASTPtr remote_table_function_ptr;
const Context & context;
Logger * log = &Logger::get("StorageDistributed");
@ -146,6 +155,17 @@ protected:
const ASTPtr & sharding_key_,
const String & data_path_,
bool attach);
StorageDistributed(
const String & database_name,
const String & table_name_,
const ColumnsDescription & columns_,
ASTPtr remote_table_function_ptr_,
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & data_path_,
bool attach);
};
}
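
To make the new overload concrete, here is a hedged usage sketch. The helper name, the table name and the `numbers(10)` placeholder are invented for illustration; only `createWithOwnCluster` and `makeASTFunction` (reachable through the `ASTFunction.h` include added above) come from the actual code.

```cpp
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/StorageDistributed.h>

namespace DB
{

/// Hypothetical helper: run a table function on every shard of an ad-hoc cluster
/// through the new createWithOwnCluster() overload declared above.
StoragePtr makeDistributedOverTableFunction(
    const ColumnsDescription & columns,   /// columns returned by the table function
    ClusterPtr cluster,                   /// cluster built by the caller
    const Context & context)
{
    /// Table function to execute remotely; numbers(10) is only a placeholder.
    ASTPtr remote_table_function = makeASTFunction("numbers", std::make_shared<ASTLiteral>(Field(UInt64(10))));

    return StorageDistributed::createWithOwnCluster(
        "distributed_over_numbers",       /// hypothetical name for the resulting table
        columns,
        remote_table_function,
        cluster,
        context);
}

}
```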

View File

@ -53,6 +53,11 @@ public:
/// No locking, you must register all engines before usage of get.
void registerStorage(const std::string & name, Creator creator);
const auto & getAllStorages() const
{
return storages;
}
private:
using Storages = std::unordered_map<std::string, Creator>;
Storages storages;

View File

@ -62,12 +62,19 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
{
rd_kafka_t * consumer;
rd_kafka_message_t * current;
bool current_pending;
Poco::Logger * log;
size_t read_messages;
char row_delimiter;
bool nextImpl() override
{
reset();
if (current_pending)
{
BufferBase::set(reinterpret_cast<char *>(current->payload), current->len, 0);
current_pending = false;
return true;
}
// Process next buffered message
rd_kafka_message_t * msg = rd_kafka_consumer_poll(consumer, READ_POLL_MS);
@ -88,13 +95,24 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
rd_kafka_message_destroy(msg);
return nextImpl();
}
++read_messages;
// Now we've received a new message. Check if we need to produce a delimiter
if (row_delimiter != '\0' && current != nullptr)
{
BufferBase::set(&row_delimiter, 1, 0);
reset();
current = msg;
current_pending = true;
return true;
}
// Consume message and mark the topic/partition offset
// The offsets will be committed in the insertSuffix() method after the block is completed
// If an exception is thrown before that would occur, the client will rejoin without comitting offsets
BufferBase::set(reinterpret_cast<char *>(msg->payload), msg->len, 0);
// The offsets will be committed in the readSuffix() method after the block is completed
// If an exception is thrown before that would occur, the client will rejoin without committing offsets
reset();
current = msg;
++read_messages;
BufferBase::set(reinterpret_cast<char *>(current->payload), current->len, 0);
return true;
}
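
The `current`/`current_pending` dance above is easy to lose in a diff. Below is a tiny standalone sketch of the same idea, with no librdkafka involved and all names invented: when a row delimiter is configured, each newly polled payload after the first is stashed, and a one-byte delimiter buffer is handed to the parser first.

```cpp
#include <iostream>
#include <optional>
#include <string>
#include <vector>

/// Illustration only: yields payloads separated by a one-character delimiter,
/// mirroring how ReadBufferFromKafkaConsumer keeps the freshly polled message
/// pending and returns the delimiter first.
class DelimitedChunks
{
public:
    DelimitedChunks(std::vector<std::string> messages_, char row_delimiter_)
        : messages(std::move(messages_)), row_delimiter(row_delimiter_) {}

    std::optional<std::string> next()
    {
        if (pending)
        {
            /// The delimiter was returned on the previous call; now return the stashed payload.
            std::string out = *pending;
            pending.reset();
            return out;
        }

        if (index >= messages.size())
            return {};

        std::string payload = messages[index++];
        if (row_delimiter != '\0' && produced_any)
        {
            /// Stash the payload and return the delimiter first.
            pending = std::move(payload);
            return std::string(1, row_delimiter);
        }

        produced_any = true;
        return payload;
    }

private:
    std::vector<std::string> messages;
    char row_delimiter;
    std::optional<std::string> pending;
    size_t index = 0;
    bool produced_any = false;
};

int main()
{
    DelimitedChunks chunks({"1,a", "2,b", "3,c"}, '\n');
    while (auto chunk = chunks.next())
        std::cout << *chunk;   /// prints "1,a\n2,b\n3,c"
}
```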
@ -108,8 +126,11 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
}
public:
ReadBufferFromKafkaConsumer(rd_kafka_t * consumer_, Poco::Logger * log_)
: ReadBuffer(nullptr, 0), consumer(consumer_), current(nullptr), log(log_), read_messages(0) {}
ReadBufferFromKafkaConsumer(rd_kafka_t * consumer_, Poco::Logger * log_, char row_delimiter_)
: ReadBuffer(nullptr, 0), consumer(consumer_), current(nullptr),
current_pending(false), log(log_), read_messages(0), row_delimiter(row_delimiter_) {
LOG_TRACE(log, "row delimiter is: " << row_delimiter);
}
~ReadBufferFromKafkaConsumer() { reset(); }
@ -143,7 +164,7 @@ public:
// Create a formatted reader on Kafka messages
LOG_TRACE(storage.log, "Creating formatted reader");
read_buf = std::make_unique<ReadBufferFromKafkaConsumer>(consumer->stream, storage.log);
read_buf = std::make_unique<ReadBufferFromKafkaConsumer>(consumer->stream, storage.log, storage.row_delimiter);
reader = FormatFactory::instance().getInput(storage.format_name, *read_buf, storage.getSampleBlock(), context, max_block_size);
}
@ -226,13 +247,14 @@ StorageKafka::StorageKafka(
Context & context_,
const ColumnsDescription & columns_,
const String & brokers_, const String & group_, const Names & topics_,
const String & format_name_, const String & schema_name_, size_t num_consumers_)
const String & format_name_, char row_delimiter_, const String & schema_name_, size_t num_consumers_)
: IStorage{columns_},
table_name(table_name_), database_name(database_name_), context(context_),
topics(context.getMacros()->expand(topics_)),
brokers(context.getMacros()->expand(brokers_)),
group(context.getMacros()->expand(group_)),
format_name(context.getMacros()->expand(format_name_)),
row_delimiter(row_delimiter_),
schema_name(context.getMacros()->expand(schema_name_)),
num_consumers(num_consumers_), log(&Logger::get("StorageKafka (" + table_name_ + ")")),
semaphore(0, num_consumers_), mutex(), consumers(), event_update()
@ -552,10 +574,10 @@ void registerStorageKafka(StorageFactory & factory)
* - Schema (optional, if the format supports it)
*/
if (engine_args.size() < 3 || engine_args.size() > 6)
if (engine_args.size() < 3 || engine_args.size() > 7)
throw Exception(
"Storage Kafka requires 3-6 parameters"
" - Kafka broker list, list of topics to consume, consumer group ID, message format, schema, number of consumers",
"Storage Kafka requires 3-7 parameters"
" - Kafka broker list, list of topics to consume, consumer group ID, message format, row delimiter, schema, number of consumers",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String brokers;
@ -569,13 +591,33 @@ void registerStorageKafka(StorageFactory & factory)
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context);
// Parse format schema if supported (optional)
String schema;
// Parse row delimiter (optional)
char row_delimiter = '\0';
if (engine_args.size() >= 5)
{
engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);
auto ast = typeid_cast<const ASTLiteral *>(engine_args[4].get());
String arg;
if (ast && ast->value.getType() == Field::Types::String)
arg = safeGet<String>(ast->value);
else
throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
if (arg.size() > 1)
throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
else if (arg.size() == 0)
row_delimiter = '\0';
else
row_delimiter = arg[0];
}
// Parse format schema if supported (optional)
String schema;
if (engine_args.size() >= 6)
{
engine_args[5] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[5], args.local_context);
auto ast = typeid_cast<const ASTLiteral *>(engine_args[5].get());
if (ast && ast->value.getType() == Field::Types::String)
schema = safeGet<String>(ast->value);
else
@ -584,9 +626,9 @@ void registerStorageKafka(StorageFactory & factory)
// Parse number of consumers (optional)
UInt64 num_consumers = 1;
if (engine_args.size() >= 6)
if (engine_args.size() >= 7)
{
auto ast = typeid_cast<const ASTLiteral *>(engine_args[5].get());
auto ast = typeid_cast<const ASTLiteral *>(engine_args[6].get());
if (ast && ast->value.getType() == Field::Types::UInt64)
num_consumers = safeGet<UInt64>(ast->value);
else
@ -613,7 +655,7 @@ void registerStorageKafka(StorageFactory & factory)
return StorageKafka::create(
args.table_name, args.database_name, args.context, args.columns,
brokers, group, topics, format, schema, num_consumers);
brokers, group, topics, format, row_delimiter, schema, num_consumers);
});
}
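
For reference, the positional layout that the checks above enforce can be summarized as follows (constant names are invented here; the real code indexes `engine_args` numerically):

```cpp
/// Sketch only: named indices for the ENGINE = Kafka(...) argument list
/// validated by registerStorageKafka() above.
enum KafkaEngineArg
{
    BROKER_LIST    = 0,  /// 'host:port,...'
    TOPIC_LIST     = 1,  /// 'topic1,topic2,...'
    CONSUMER_GROUP = 2,
    FORMAT_NAME    = 3,  /// e.g. 'JSONEachRow'
    ROW_DELIMITER  = 4,  /// new: empty string or a single character
    FORMAT_SCHEMA  = 5,  /// optional
    NUM_CONSUMERS  = 6   /// optional
};
```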

View File

@ -75,6 +75,9 @@ private:
const String brokers;
const String group;
const String format_name;
// Optional row delimiter for generating a char-delimited stream
// in order to make various input stream parsers happy.
char row_delimiter;
const String schema_name;
/// Total number of consumers
size_t num_consumers;
@ -109,7 +112,7 @@ protected:
Context & context_,
const ColumnsDescription & columns_,
const String & brokers_, const String & group_, const Names & topics_,
const String & format_name_, const String & schema_name_, size_t num_consumers_);
const String & format_name_, char row_delimiter_, const String & schema_name_, size_t num_consumers_);
};
}

View File

@ -311,6 +311,7 @@ std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() cons
part_data_versions.reserve(data_parts.size());
for (const auto & part : data_parts)
part_data_versions.push_back(part->info.getDataVersion());
std::sort(part_data_versions.begin(), part_data_versions.end());
std::vector<MergeTreeMutationStatus> result;
for (const auto & kv : current_mutations_by_version)
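
The added `std::sort` looks gratuitous in isolation because the hunk is cut short. A plausible reading, stated here as an assumption rather than a quote of the surrounding code, is that sorting lets the per-mutation progress be computed with one binary search:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

int main()
{
    /// Data versions of the current parts (hypothetical values).
    std::vector<int64_t> part_data_versions = {7, 3, 12, 3, 9};
    std::sort(part_data_versions.begin(), part_data_versions.end());

    /// A mutation still has to rewrite every part whose data version is below its own version.
    int64_t mutation_version = 9;
    size_t parts_to_do = std::lower_bound(part_data_versions.begin(), part_data_versions.end(), mutation_version)
        - part_data_versions.begin();

    std::printf("parts_to_do = %zu\n", parts_to_do);   /// prints 3
}
```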

View File

@ -1971,9 +1971,10 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
String replica_path = zookeeper_path + "/replicas/" + part_desc->replica;
ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef());
auto [user, password] = context.getInterserverCredentials();
part_desc->res_part = fetcher.fetchPart(part_desc->found_new_part_name, replica_path,
address.host, address.replication_port, timeouts, false, TMP_PREFIX + "fetch_");
address.host, address.replication_port, timeouts, user, password, false, TMP_PREFIX + "fetch_");
/// TODO: check columns_version of fetched part
@ -2706,10 +2707,11 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef());
auto [user, password] = context.getInterserverCredentials();
try
{
part = fetcher.fetchPart(part_name, replica_path, address.host, address.replication_port, timeouts, to_detached);
part = fetcher.fetchPart(part_name, replica_path, address.host, address.replication_port, timeouts, user, password, to_detached);
if (!to_detached)
{

View File

@ -14,21 +14,15 @@ class Context;
/** Base class for system tables that return all of their data as a single block.
*/
template <typename Self>
class IStorageSystemWithStringColumns : public IStorage
class IStorageSystemOneBlock : public IStorage
{
protected:
virtual void fillData(MutableColumns & res_columns) const = 0;
virtual void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const = 0;
public:
IStorageSystemWithStringColumns(const String & name_) : name(name_)
IStorageSystemOneBlock(const String & name_) : name(name_)
{
auto names = Self::getColumnNames();
NamesAndTypesList name_list;
for (const auto & name : names)
{
name_list.push_back(NameAndTypePair{name, std::make_shared<DataTypeString>()});
}
setColumns(ColumnsDescription(name_list));
setColumns(ColumnsDescription(Self::getNamesAndTypes()));
}
std::string getTableName() const override
@ -37,8 +31,8 @@ public:
}
BlockInputStreams read(const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t /*max_block_size*/,
unsigned /*num_streams*/) override
@ -48,7 +42,7 @@ public:
Block sample_block = getSampleBlock();
MutableColumns res_columns = sample_block.cloneEmptyColumns();
fillData(res_columns);
fillData(res_columns, context, query_info);
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(sample_block.cloneWithColumns(std::move(res_columns))));
}
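
The conversions that follow all repeat the same pattern, so a minimal hypothetical subclass is worth spelling out once (the class, table and column names below are invented; only the base-class API comes from the header above):

```cpp
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/System/IStorageSystemOneBlock.h>
#include <ext/shared_ptr_helper.h>

namespace DB
{

/// Hypothetical system table used only to illustrate the new base class.
class StorageSystemExample : public ext::shared_ptr_helper<StorageSystemExample>,
                             public IStorageSystemOneBlock<StorageSystemExample>
{
protected:
    using IStorageSystemOneBlock::IStorageSystemOneBlock;

    void fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const override
    {
        /// One row; columns are filled in the order declared in getNamesAndTypes().
        res_columns[0]->insert(String("example"));
        res_columns[1]->insert(UInt64(1));
    }

public:
    std::string getName() const override { return "SystemExample"; }

    static NamesAndTypesList getNamesAndTypes()
    {
        return {
            {"name", std::make_shared<DataTypeString>()},
            {"value", std::make_shared<DataTypeUInt64>()},
        };
    }
};

}
```

Each converted storage below reduces to exactly this shape: a static `getNamesAndTypes()` plus a `fillData()` override, with the old per-table `read()` boilerplate gone.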

View File

@ -3,12 +3,23 @@
namespace DB
{
void StorageSystemAggregateFunctionCombinators::fillData(MutableColumns & res_columns) const
NamesAndTypesList StorageSystemAggregateFunctionCombinators::getNamesAndTypes()
{
return {
{"name", std::make_shared<DataTypeString>()},
{"is_internal", std::make_shared<DataTypeUInt8>()},
};
}
void StorageSystemAggregateFunctionCombinators::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
const auto & combinators = AggregateFunctionCombinatorFactory::instance().getAllAggregateFunctionCombinators();
for (const auto & pair : combinators)
{
res_columns[0]->insert(pair.first);
res_columns[1]->insert(UInt64(pair.second->isForInternalUsageOnly()));
}
}
}

View File

@ -1,26 +1,25 @@
#pragma once
#include <Storages/System/IStorageSystemWithStringColumns.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/System/IStorageSystemOneBlock.h>
#include <ext/shared_ptr_helper.h>
namespace DB
{
class StorageSystemAggregateFunctionCombinators : public ext::shared_ptr_helper<StorageSystemAggregateFunctionCombinators>,
public IStorageSystemWithStringColumns<StorageSystemAggregateFunctionCombinators>
public IStorageSystemOneBlock<StorageSystemAggregateFunctionCombinators>
{
protected:
void fillData(MutableColumns & res_columns) const override;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
using IStorageSystemOneBlock::IStorageSystemOneBlock;
public:
using IStorageSystemWithStringColumns::IStorageSystemWithStringColumns;
std::string getName() const override
{
return "SystemAggregateFunctionCombinators";
}
static std::vector<String> getColumnNames()
{
return {"name"};
}
static NamesAndTypesList getNamesAndTypes();
};
}

View File

@ -1,51 +1,34 @@
#include <Storages/System/StorageSystemAsynchronousMetrics.h>
#include <Interpreters/AsynchronousMetrics.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Interpreters/AsynchronousMetrics.h>
#include <Storages/System/StorageSystemAsynchronousMetrics.h>
namespace DB
{
StorageSystemAsynchronousMetrics::StorageSystemAsynchronousMetrics(const std::string & name_, const AsynchronousMetrics & async_metrics_)
: name(name_),
async_metrics(async_metrics_)
NamesAndTypesList StorageSystemAsynchronousMetrics::getNamesAndTypes()
{
setColumns(ColumnsDescription({
return {
{"metric", std::make_shared<DataTypeString>()},
{"value", std::make_shared<DataTypeFloat64>()},
}));
};
}
BlockInputStreams StorageSystemAsynchronousMetrics::read(
const Names & column_names,
const SelectQueryInfo &,
const Context &,
QueryProcessingStage::Enum & processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
StorageSystemAsynchronousMetrics::StorageSystemAsynchronousMetrics(const std::string & name_, const AsynchronousMetrics & async_metrics_)
: IStorageSystemOneBlock(name_), async_metrics(async_metrics_)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
}
void StorageSystemAsynchronousMetrics::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
auto async_metrics_values = async_metrics.getValues();
for (const auto & name_value : async_metrics_values)
{
res_columns[0]->insert(name_value.first);
res_columns[1]->insert(name_value.second);
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}
}

View File

@ -1,8 +1,7 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
@ -13,26 +12,20 @@ class Context;
/** Implements the system table asynchronous_metrics, which allows you to get values of periodically (asynchronously) updated metrics.
*/
class StorageSystemAsynchronousMetrics : public ext::shared_ptr_helper<StorageSystemAsynchronousMetrics>, public IStorage
class StorageSystemAsynchronousMetrics : public ext::shared_ptr_helper<StorageSystemAsynchronousMetrics>, public IStorageSystemOneBlock<StorageSystemAsynchronousMetrics>
{
public:
std::string getName() const override { return "SystemAsynchronousMetrics"; }
std::string getTableName() const override { return name; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
static NamesAndTypesList getNamesAndTypes();
private:
const std::string name;
const AsynchronousMetrics & async_metrics;
protected:
StorageSystemAsynchronousMetrics(const std::string & name_, const AsynchronousMetrics & async_metrics_);
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
};
}

View File

@ -1,46 +1,26 @@
#include <Columns/ColumnString.h>
#include <Common/config_build.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Interpreters/Settings.h>
#include <Storages/System/StorageSystemBuildOptions.h>
#include <Common/config_build.h>
namespace DB
{
StorageSystemBuildOptions::StorageSystemBuildOptions(const std::string & name_)
: name(name_)
NamesAndTypesList StorageSystemBuildOptions::getNamesAndTypes()
{
setColumns(ColumnsDescription({
{ "name", std::make_shared<DataTypeString>() },
{ "value", std::make_shared<DataTypeString>() },
}));
return {
{"name", std::make_shared<DataTypeString>()},
{"value", std::make_shared<DataTypeString>()},
};
}
BlockInputStreams StorageSystemBuildOptions::read(
const Names & column_names,
const SelectQueryInfo &,
const Context &,
QueryProcessingStage::Enum & processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
void StorageSystemBuildOptions::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
for (auto it = auto_config_build; *it; it += 2)
{
res_columns[0]->insert(String(it[0]));
res_columns[1]->insert(String(it[1]));
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
@ -12,25 +12,18 @@ class Context;
/** System table "build_options" that contains the parameters used to build ClickHouse.
*/
class StorageSystemBuildOptions : public ext::shared_ptr_helper<StorageSystemBuildOptions>, public IStorage
class StorageSystemBuildOptions : public ext::shared_ptr_helper<StorageSystemBuildOptions>, public IStorageSystemOneBlock<StorageSystemBuildOptions>
{
public:
std::string getName() const override { return "SystemBuildOptions"; }
std::string getTableName() const override { return name; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
private:
const std::string name;
protected:
StorageSystemBuildOptions(const std::string & name_);
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
using IStorageSystemOneBlock::IStorageSystemOneBlock;
public:
std::string getName() const override { return "SystemBuildOptions"; }
static NamesAndTypesList getNamesAndTypes();
};
}

View File

@ -1,50 +1,32 @@
#include <Storages/System/StorageSystemClusters.h>
#include <Interpreters/Cluster.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Common/DNSResolver.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Common/DNSResolver.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Storages/System/StorageSystemClusters.h>
namespace DB
{
StorageSystemClusters::StorageSystemClusters(const std::string & name_)
: name(name_)
NamesAndTypesList StorageSystemClusters::getNamesAndTypes()
{
setColumns(ColumnsDescription({
{ "cluster", std::make_shared<DataTypeString>() },
{ "shard_num", std::make_shared<DataTypeUInt32>() },
{ "shard_weight", std::make_shared<DataTypeUInt32>() },
{ "replica_num", std::make_shared<DataTypeUInt32>() },
{ "host_name", std::make_shared<DataTypeString>() },
{ "host_address", std::make_shared<DataTypeString>() },
{ "port", std::make_shared<DataTypeUInt16>() },
{ "is_local", std::make_shared<DataTypeUInt8>() },
{ "user", std::make_shared<DataTypeString>() },
{ "default_database", std::make_shared<DataTypeString>() },
}));
return {
{"cluster", std::make_shared<DataTypeString>()},
{"shard_num", std::make_shared<DataTypeUInt32>()},
{"shard_weight", std::make_shared<DataTypeUInt32>()},
{"replica_num", std::make_shared<DataTypeUInt32>()},
{"host_name", std::make_shared<DataTypeString>()},
{"host_address", std::make_shared<DataTypeString>()},
{"port", std::make_shared<DataTypeUInt16>()},
{"is_local", std::make_shared<DataTypeUInt8>()},
{"user", std::make_shared<DataTypeString>()},
{"default_database", std::make_shared<DataTypeString>()},
};
}
BlockInputStreams StorageSystemClusters::read(
const Names & column_names,
const SelectQueryInfo &,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
void StorageSystemClusters::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
auto updateColumns = [&](const std::string & cluster_name, const Cluster::ShardInfo & shard_info,
const Cluster::Address & address)
auto updateColumns = [&](const std::string & cluster_name, const Cluster::ShardInfo & shard_info, const Cluster::Address & address)
{
size_t i = 0;
res_columns[i++]->insert(cluster_name);
@ -85,8 +67,5 @@ BlockInputStreams StorageSystemClusters::read(
}
}
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}
}

View File

@ -1,7 +1,9 @@
#pragma once
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
@ -13,25 +15,17 @@ class Context;
* that allows you to obtain information about available clusters
* (which may be specified in Distributed tables).
*/
class StorageSystemClusters : public ext::shared_ptr_helper<StorageSystemClusters>, public IStorage
class StorageSystemClusters : public ext::shared_ptr_helper<StorageSystemClusters>, public IStorageSystemOneBlock<StorageSystemClusters>
{
public:
std::string getName() const override { return "SystemClusters"; }
std::string getTableName() const override { return name; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
private:
const std::string name;
static NamesAndTypesList getNamesAndTypes();
protected:
StorageSystemClusters(const std::string & name_);
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
};
}

View File

@ -3,11 +3,18 @@
namespace DB
{
void StorageSystemCollations::fillData(MutableColumns & res_columns) const
NamesAndTypesList StorageSystemCollations::getNamesAndTypes()
{
for (const auto & collation : Collator::getAvailableCollations())
{
res_columns[0]->insert(collation);
}
return {
{"name", std::make_shared<DataTypeString>()},
};
}
void StorageSystemCollations::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
for (const auto & collation_name : Collator::getAvailableCollations())
res_columns[0]->insert(collation_name);
}
}

View File

@ -1,26 +1,22 @@
#pragma once
#include <Storages/System/IStorageSystemWithStringColumns.h>
#include <Storages/System/IStorageSystemOneBlock.h>
#include <ext/shared_ptr_helper.h>
namespace DB
{
class StorageSystemCollations : public ext::shared_ptr_helper<StorageSystemCollations>,
public IStorageSystemWithStringColumns<StorageSystemCollations>
public IStorageSystemOneBlock<StorageSystemCollations>
{
protected:
void fillData(MutableColumns & res_columns) const override;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
using IStorageSystemOneBlock::IStorageSystemOneBlock;
public:
using IStorageSystemWithStringColumns::IStorageSystemWithStringColumns;
std::string getName() const override
{
return "SystemTableCollations";
}
std::string getName() const override { return "SystemTableCollations"; }
static std::vector<String> getColumnNames()
{
return {"name"};
}
static NamesAndTypesList getNamesAndTypes();
};
}

View File

@ -15,10 +15,9 @@
namespace DB
{
StorageSystemColumns::StorageSystemColumns(const std::string & name_)
: name(name_)
NamesAndTypesList StorageSystemColumns::getNamesAndTypes()
{
setColumns(ColumnsDescription({
return {
{ "database", std::make_shared<DataTypeString>() },
{ "table", std::make_shared<DataTypeString>() },
{ "name", std::make_shared<DataTypeString>() },
@ -28,21 +27,11 @@ StorageSystemColumns::StorageSystemColumns(const std::string & name_)
{ "data_compressed_bytes", std::make_shared<DataTypeUInt64>() },
{ "data_uncompressed_bytes", std::make_shared<DataTypeUInt64>() },
{ "marks_bytes", std::make_shared<DataTypeUInt64>() },
}));
};
}
BlockInputStreams StorageSystemColumns::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
void StorageSystemColumns::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
Block block_to_filter;
std::map<std::pair<std::string, std::string>, StoragePtr> storages;
@ -60,7 +49,7 @@ BlockInputStreams StorageSystemColumns::read(
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
if (!block_to_filter.rows())
return BlockInputStreams();
return;
ColumnPtr database_column = block_to_filter.getByName("database").column;
size_t rows = database_column->size();
@ -98,14 +87,12 @@ BlockInputStreams StorageSystemColumns::read(
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
if (!block_to_filter.rows())
return BlockInputStreams();
return;
ColumnPtr filtered_database_column = block_to_filter.getByName("database").column;
ColumnPtr filtered_table_column = block_to_filter.getByName("table").column;
/// We compose the result.
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
size_t rows = filtered_database_column->size();
for (size_t i = 0; i < rows; ++i)
{
@ -193,8 +180,6 @@ BlockInputStreams StorageSystemColumns::read(
}
}
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
@ -11,25 +11,17 @@ class Context;
/** Implements the system table 'columns', which allows you to get information about the columns of every table.
*/
class StorageSystemColumns : public ext::shared_ptr_helper<StorageSystemColumns>, public IStorage
class StorageSystemColumns : public ext::shared_ptr_helper<StorageSystemColumns>, public IStorageSystemOneBlock<StorageSystemColumns>
{
public:
std::string getName() const override { return "SystemColumns"; }
std::string getTableName() const override { return name; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
static NamesAndTypesList getNamesAndTypes();
protected:
StorageSystemColumns(const std::string & name_);
using IStorageSystemOneBlock::IStorageSystemOneBlock;
private:
const std::string name;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
};
}

View File

@ -1,91 +1,35 @@
#include <Core/Field.h>
#include <DataTypes/DataTypeFactory.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/System/StorageSystemDataTypeFamilies.h>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <sstream>
namespace DB
{
namespace
NamesAndTypesList StorageSystemDataTypeFamilies::getNamesAndTypes()
{
String getPropertiesAsString(const DataTypePtr data_type)
{
std::vector<std::string> properties;
if (data_type->isParametric())
properties.push_back("parametric");
if (data_type->haveSubtypes())
properties.push_back("have_subtypes");
if (data_type->cannotBeStoredInTables())
properties.push_back("cannot_be_stored_in_tables");
if (data_type->isComparable())
properties.push_back("comparable");
if (data_type->canBeComparedWithCollation())
properties.push_back("can_be_compared_with_collation");
if (data_type->canBeUsedAsVersion())
properties.push_back("can_be_used_as_version");
if (data_type->isSummable())
properties.push_back("summable");
if (data_type->canBeUsedInBitOperations())
properties.push_back("can_be_used_in_bit_operations");
if (data_type->canBeUsedInBooleanContext())
properties.push_back("can_be_used_in_boolean_context");
if (data_type->isValueRepresentedByNumber())
properties.push_back("value_represented_by_number");
if (data_type->isCategorial())
properties.push_back("categorial");
if (data_type->isNullable())
properties.push_back("nullable");
if (data_type->onlyNull())
properties.push_back("only_null");
if (data_type->canBeInsideNullable())
properties.push_back("can_be_inside_nullable");
return boost::algorithm::join(properties, ",");
}
ASTPtr createFakeEnumCreationAst()
{
String fakename{"e"};
ASTPtr name = std::make_shared<ASTLiteral>(Field(fakename.c_str(), fakename.size()));
ASTPtr value = std::make_shared<ASTLiteral>(Field(UInt64(1)));
ASTPtr ast_func = makeASTFunction("equals", name, value);
ASTPtr clone = ast_func->clone();
clone->children.clear();
clone->children.push_back(ast_func);
return clone;
}
return {
{"name", std::make_shared<DataTypeString>()},
{"case_insensitive", std::make_shared<DataTypeUInt8>()},
{"alias_to", std::make_shared<DataTypeString>()},
};
}
void StorageSystemDataTypeFamilies::fillData(MutableColumns & res_columns) const
void StorageSystemDataTypeFamilies::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
const auto & factory = DataTypeFactory::instance();
const auto & data_types = factory.getAllDataTypes();
for (const auto & pair : data_types)
auto names = factory.getAllRegisteredNames();
for (const auto & name : names)
{
res_columns[0]->insert(pair.first);
res_columns[0]->insert(name);
res_columns[1]->insert(UInt64(factory.isCaseInsensitive(name)));
try
{
DataTypePtr type_ptr;
//special case with enum, because it has arguments but it's properties doesn't
//depend on arguments
if (boost::starts_with(pair.first, "Enum"))
{
type_ptr = factory.get(pair.first, createFakeEnumCreationAst());
}
else
{
type_ptr = factory.get(pair.first);
}
res_columns[1]->insert(getPropertiesAsString(type_ptr));
}
catch (Exception & ex)
{
res_columns[1]->insert(String{"depends_on_arguments"});
}
if (factory.isAlias(name))
res_columns[2]->insert(factory.aliasTo(name));
else
res_columns[2]->insert(String(""));
}
}
}

View File

@ -1,25 +1,23 @@
#pragma once
#include <Storages/System/IStorageSystemWithStringColumns.h>
#include <ext/shared_ptr_helper.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
class StorageSystemDataTypeFamilies : public ext::shared_ptr_helper<StorageSystemDataTypeFamilies>,
public IStorageSystemWithStringColumns<StorageSystemDataTypeFamilies>
public IStorageSystemOneBlock<StorageSystemDataTypeFamilies>
{
protected:
void fillData(MutableColumns & res_columns) const override;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
using IStorageSystemOneBlock::IStorageSystemOneBlock;
public:
using IStorageSystemWithStringColumns::IStorageSystemWithStringColumns;
std::string getName() const override { return "SystemTableDataTypeFamilies"; }
std::string getName() const override
{
return "SystemTableDataTypeFamilies";
}
static std::vector<String> getColumnNames()
{
return {"name", "properties"};
}
static NamesAndTypesList getNamesAndTypes();
};
}

View File

@ -1,40 +1,24 @@
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Databases/IDatabase.h>
#include <Storages/System/StorageSystemDatabases.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/Context.h>
#include <Storages/System/StorageSystemDatabases.h>
namespace DB
{
StorageSystemDatabases::StorageSystemDatabases(const std::string & name_)
: name(name_)
NamesAndTypesList StorageSystemDatabases::getNamesAndTypes()
{
setColumns(ColumnsDescription({
return {
{"name", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()},
{"data_path", std::make_shared<DataTypeString>()},
{"metadata_path", std::make_shared<DataTypeString>()},
}));
};
}
BlockInputStreams StorageSystemDatabases::read(
const Names & column_names,
const SelectQueryInfo &,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
void StorageSystemDatabases::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
auto databases = context.getDatabases();
for (const auto & database : databases)
{
@ -43,9 +27,6 @@ BlockInputStreams StorageSystemDatabases::read(
res_columns[2]->insert(database.second->getDataPath());
res_columns[3]->insert(database.second->getMetadataPath());
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
@ -12,25 +12,20 @@ class Context;
/** Implements `databases` system table, which allows you to get information about all databases.
*/
class StorageSystemDatabases : public ext::shared_ptr_helper<StorageSystemDatabases>, public IStorage
class StorageSystemDatabases : public ext::shared_ptr_helper<StorageSystemDatabases>, public IStorageSystemOneBlock<StorageSystemDatabases>
{
public:
std::string getName() const override { return "SystemDatabases"; }
std::string getTableName() const override { return name; }
std::string getName() const override
{
return "SystemDatabases";
}
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
private:
const std::string name;
static NamesAndTypesList getNamesAndTypes();
protected:
StorageSystemDatabases(const std::string & name_);
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const override;
};
}

View File

@ -1,27 +1,23 @@
#include <Storages/System/StorageSystemDictionaries.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Interpreters/Context.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionaries.h>
#include <Storages/System/StorageSystemDictionaries.h>
#include <ext/map.h>
#include <mutex>
namespace DB
{
StorageSystemDictionaries::StorageSystemDictionaries(const std::string & name)
: name{name}
NamesAndTypesList StorageSystemDictionaries::getNamesAndTypes()
{
setColumns(ColumnsDescription({
return {
{ "name", std::make_shared<DataTypeString>() },
{ "origin", std::make_shared<DataTypeString>() },
{ "type", std::make_shared<DataTypeString>() },
@ -36,27 +32,14 @@ StorageSystemDictionaries::StorageSystemDictionaries(const std::string & name)
{ "creation_time", std::make_shared<DataTypeDateTime>() },
{ "source", std::make_shared<DataTypeString>() },
{ "last_exception", std::make_shared<DataTypeString>() },
}));
};
}
BlockInputStreams StorageSystemDictionaries::read(
const Names & column_names,
const SelectQueryInfo &,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t,
const unsigned)
void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
const auto & external_dictionaries = context.getExternalDictionaries();
auto objects_map = external_dictionaries.getObjectsMap();
const auto & dictionaries = objects_map.get();
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
for (const auto & dict_info : dictionaries)
{
size_t i = 0;
@ -102,8 +85,6 @@ BlockInputStreams StorageSystemDictionaries::read(
else
res_columns[i++]->insertDefault();
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
@ -10,25 +10,17 @@ namespace DB
class Context;
class StorageSystemDictionaries : public ext::shared_ptr_helper<StorageSystemDictionaries>, public IStorage
class StorageSystemDictionaries : public ext::shared_ptr_helper<StorageSystemDictionaries>, public IStorageSystemOneBlock<StorageSystemDictionaries>
{
public:
std::string getName() const override { return "SystemDictionaries"; }
std::string getTableName() const override { return name; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
private:
const std::string name;
static NamesAndTypesList getNamesAndTypes();
protected:
StorageSystemDictionaries(const std::string & name);
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
};
}

View File

@ -1,39 +1,21 @@
#include <Common/ProfileEvents.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemEvents.h>
namespace DB
{
StorageSystemEvents::StorageSystemEvents(const std::string & name_)
: name(name_)
NamesAndTypesList StorageSystemEvents::getNamesAndTypes()
{
setColumns(ColumnsDescription(
{
return {
{"event", std::make_shared<DataTypeString>()},
{"value", std::make_shared<DataTypeUInt64>()},
}));
};
}
BlockInputStreams StorageSystemEvents::read(
const Names & column_names,
const SelectQueryInfo &,
const Context &,
QueryProcessingStage::Enum & processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
void StorageSystemEvents::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
{
UInt64 value = ProfileEvents::counters[i];
@ -44,9 +26,6 @@ BlockInputStreams StorageSystemEvents::read(
res_columns[1]->insert(value);
}
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}
}

View File

@ -1,8 +1,7 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
@ -12,25 +11,17 @@ class Context;
/** Implements `events` system table, which allows you to obtain information for profiling.
*/
class StorageSystemEvents : public ext::shared_ptr_helper<StorageSystemEvents>, public IStorage
class StorageSystemEvents : public ext::shared_ptr_helper<StorageSystemEvents>, public IStorageSystemOneBlock<StorageSystemEvents>
{
public:
std::string getName() const override { return "SystemEvents"; }
std::string getTableName() const override { return name; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
private:
const std::string name;
static NamesAndTypesList getNamesAndTypes();
protected:
StorageSystemEvents(const std::string & name_);
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
};
}

View File

@ -1,30 +1,32 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/FormatFactory.h>
#include <Storages/System/StorageSystemFormats.h>
namespace DB
{
void StorageSystemFormats::fillData(MutableColumns & res_columns) const
NamesAndTypesList StorageSystemFormats::getNamesAndTypes()
{
return {
{"name", std::make_shared<DataTypeString>()},
{"is_input", std::make_shared<DataTypeUInt8>()},
{"is_output", std::make_shared<DataTypeUInt8>()},
};
}
void StorageSystemFormats::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
const auto & formats = FormatFactory::instance().getAllFormats();
for (const auto & pair : formats)
{
const auto & [name, creator_pair] = pair;
bool has_input_format = (creator_pair.first != nullptr);
bool has_output_format = (creator_pair.second != nullptr);
UInt64 has_input_format(creator_pair.first != nullptr);
UInt64 has_output_format(creator_pair.second != nullptr);
res_columns[0]->insert(name);
std::string format_type;
if (has_input_format)
format_type = "input";
if (has_output_format)
{
if (!format_type.empty())
format_type += "/output";
else
format_type = "output";
}
res_columns[1]->insert(format_type);
res_columns[1]->insert(has_input_format);
res_columns[2]->insert(has_output_format);
}
}
}

View File

@ -1,26 +1,23 @@
#pragma once
#include <Storages/System/IStorageSystemWithStringColumns.h>
#include <Storages/System/IStorageSystemOneBlock.h>
#include <ext/shared_ptr_helper.h>
namespace DB
{
class StorageSystemFormats : public ext::shared_ptr_helper<StorageSystemFormats>, public IStorageSystemWithStringColumns<StorageSystemFormats>
class StorageSystemFormats : public ext::shared_ptr_helper<StorageSystemFormats>, public IStorageSystemOneBlock<StorageSystemFormats>
{
protected:
void fillData(MutableColumns & res_columns) const override;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
using IStorageSystemOneBlock::IStorageSystemOneBlock;
public:
using IStorageSystemWithStringColumns::IStorageSystemWithStringColumns;
std::string getName() const override
{
return "SystemFormats";
}
static std::vector<String> getColumnNames()
{
return {"name", "type"};
}
static NamesAndTypesList getNamesAndTypes();
};
}
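With the inherited constructor (`using IStorageSystemOneBlock::IStorageSystemOneBlock;`) and the static `create()` provided by `ext::shared_ptr_helper`, instantiating such a table stays a one-liner. The snippet below is a hypothetical illustration of attaching it to the `system` database; the helper function and its call site are assumptions, not part of this diff.

```cpp
#include <Databases/IDatabase.h>
#include <Storages/System/StorageSystemFormats.h>

namespace DB
{

/// Hypothetical call site (not part of this diff): the helper name is illustrative.
/// StorageSystemFormats::create() comes from ext::shared_ptr_helper, and the string
/// argument is forwarded to the inherited IStorageSystemOneBlock constructor.
void attachFormatsTable(IDatabase & system_database)
{
    system_database.attachTable("formats", StorageSystemFormats::create("formats"));
}

}
```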

Storages/System/StorageSystemFunctions.cpp

@ -1,56 +1,53 @@
#include <Storages/System/StorageSystemFunctions.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
#include <Storages/System/StorageSystemFunctions.h>
namespace DB
{
StorageSystemFunctions::StorageSystemFunctions(const std::string & name_)
: name(name_)
namespace
{
setColumns(ColumnsDescription({
{ "name", std::make_shared<DataTypeString>() },
{ "is_aggregate", std::make_shared<DataTypeUInt8>() },
}));
template <typename Factory>
void fillRow(MutableColumns & res_columns, const String & name, UInt64 is_aggregate, const Factory & f)
{
res_columns[0]->insert(name);
res_columns[1]->insert(is_aggregate);
res_columns[2]->insert(UInt64(f.isCaseInsensitive(name)));
if (f.isAlias(name))
res_columns[3]->insert(f.aliasTo(name));
else
res_columns[3]->insert(String{});
}
}
BlockInputStreams StorageSystemFunctions::read(
const Names & column_names,
const SelectQueryInfo &,
const Context &,
QueryProcessingStage::Enum & processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
NamesAndTypesList StorageSystemFunctions::getNamesAndTypes()
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
return {
{"name", std::make_shared<DataTypeString>()},
{"is_aggregate", std::make_shared<DataTypeUInt8>()},
{"case_insensitive", std::make_shared<DataTypeUInt8>()},
{"alias_to", std::make_shared<DataTypeString>()},
};
}
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
const auto & functions = FunctionFactory::instance().functions;
for (const auto & it : functions)
void StorageSystemFunctions::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
const auto & functions_factory = FunctionFactory::instance();
const auto & function_names = functions_factory.getAllRegisteredNames();
for (const auto & name : function_names)
{
res_columns[0]->insert(it.first);
res_columns[1]->insert(UInt64(0));
fillRow(res_columns, name, UInt64(0), functions_factory);
}
const auto & aggregate_functions = AggregateFunctionFactory::instance().aggregate_functions;
for (const auto & it : aggregate_functions)
const auto & aggregate_functions_factory = AggregateFunctionFactory::instance();
const auto & aggregate_function_names = aggregate_functions_factory.getAllRegisteredNames();
for (const auto & name : aggregate_function_names)
{
res_columns[0]->insert(it.first);
res_columns[1]->insert(UInt64(1));
fillRow(res_columns, name, UInt64(1), aggregate_functions_factory);
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}
}
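`fillRow()` above is templated on the factory type, so it only needs the two factories to expose the same small surface: `getAllRegisteredNames()`, `isCaseInsensitive()`, `isAlias()` and `aliasTo()`. Below is a self-contained toy registry that models that assumed interface; the class itself is invented for illustration, and only the four member names come from the diff.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Toy stand-in (not ClickHouse code) for the factory surface that fillRow() consumes:
// getAllRegisteredNames(), isCaseInsensitive(), isAlias() and aliasTo().
class ExampleFunctionRegistry
{
public:
    void registerName(const std::string & name, bool case_insensitive = false)
    {
        entries[name] = Entry{case_insensitive, ""};
    }

    void registerAlias(const std::string & alias, const std::string & target)
    {
        entries[alias] = Entry{false, target};
    }

    std::vector<std::string> getAllRegisteredNames() const
    {
        std::vector<std::string> names;
        for (const auto & kv : entries)
            names.push_back(kv.first);
        return names;
    }

    bool isCaseInsensitive(const std::string & name) const { return entries.at(name).case_insensitive; }
    bool isAlias(const std::string & name) const { return !entries.at(name).alias_to.empty(); }
    const std::string & aliasTo(const std::string & name) const { return entries.at(name).alias_to; }

private:
    struct Entry
    {
        bool case_insensitive = false;
        std::string alias_to;   // empty means the name is not an alias
    };

    std::unordered_map<std::string, Entry> entries;
};
```

In the diff itself, `FunctionFactory` and `AggregateFunctionFactory` both expose this surface, which is why a single templated `fillRow` covers ordinary and aggregate functions alike.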

Storages/System/StorageSystemFunctions.h

@ -1,7 +1,7 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
@ -13,25 +13,17 @@ class Context;
/** Implements `functions` system table, which allows you to get a list
* of all normal and aggregate functions.
*/
class StorageSystemFunctions : public ext::shared_ptr_helper<StorageSystemFunctions>, public IStorage
class StorageSystemFunctions : public ext::shared_ptr_helper<StorageSystemFunctions>, public IStorageSystemOneBlock<StorageSystemFunctions>
{
public:
std::string getName() const override { return "SystemFunctions"; }
std::string getTableName() const override { return name; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
static NamesAndTypesList getNamesAndTypes();
protected:
StorageSystemFunctions(const std::string & name_);
using IStorageSystemOneBlock::IStorageSystemOneBlock;
private:
const std::string name;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
};
}

Storages/System/StorageSystemGraphite.cpp

@ -124,10 +124,9 @@ static Strings getAllGraphiteSections(const AbstractConfiguration & config)
} // namespace
StorageSystemGraphite::StorageSystemGraphite(const std::string & name_)
: name(name_)
NamesAndTypesList StorageSystemGraphite::getNamesAndTypes()
{
setColumns(ColumnsDescription({
return {
{"config_name", std::make_shared<DataTypeString>()},
{"regexp", std::make_shared<DataTypeString>()},
{"function", std::make_shared<DataTypeString>()},
@ -135,23 +134,12 @@ StorageSystemGraphite::StorageSystemGraphite(const std::string & name_)
{"precision", std::make_shared<DataTypeUInt64>()},
{"priority", std::make_shared<DataTypeUInt16>()},
{"is_default", std::make_shared<DataTypeUInt8>()},
}));
};
}
BlockInputStreams StorageSystemGraphite::read(
const Names & column_names,
const SelectQueryInfo &,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t /*max_block_size*/,
unsigned /*num_streams*/)
void StorageSystemGraphite::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
const auto & config = context.getConfigRef();
Strings sections = getAllGraphiteSections(config);
@ -172,8 +160,6 @@ BlockInputStreams StorageSystemGraphite::read(
}
}
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}
}

Storages/System/StorageSystemGraphite.h

@ -1,31 +1,24 @@
#pragma once
#include <Storages/IStorage.h>
#include <DataTypes/DataTypeString.h>
#include <Storages/System/IStorageSystemOneBlock.h>
#include <ext/shared_ptr_helper.h>
namespace DB
{
/// Provides information about Graphite configuration.
class StorageSystemGraphite : public ext::shared_ptr_helper<StorageSystemGraphite>, public IStorage
class StorageSystemGraphite : public ext::shared_ptr_helper<StorageSystemGraphite>, public IStorageSystemOneBlock<StorageSystemGraphite>
{
public:
std::string getName() const override { return "SystemGraphite"; }
std::string getTableName() const override { return name; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
private:
const std::string name;
static NamesAndTypesList getNamesAndTypes();
protected:
StorageSystemGraphite(const std::string & name_);
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
};
}

Storages/System/StorageSystemMacros.cpp

@ -1,38 +1,21 @@
#include <Common/Macros.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemMacros.h>
#include <Interpreters/Context.h>
#include <Storages/System/StorageSystemMacros.h>
namespace DB
{
StorageSystemMacros::StorageSystemMacros(const std::string & name_)
: name(name_)
NamesAndTypesList StorageSystemMacros::getNamesAndTypes()
{
setColumns(ColumnsDescription({
{"macro", std::make_shared<DataTypeString>()},
{"substitution", std::make_shared<DataTypeString>()},
}));
return {
{"macro", std::make_shared<DataTypeString>()},
{"substitution", std::make_shared<DataTypeString>()},
};
}
BlockInputStreams StorageSystemMacros::read(
const Names & column_names,
const SelectQueryInfo &,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
void StorageSystemMacros::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
auto macros = context.getMacros();
for (const auto & macro : macros->getMacroMap())
@ -40,9 +23,6 @@ BlockInputStreams StorageSystemMacros::read(
res_columns[0]->insert(macro.first);
res_columns[1]->insert(macro.second);
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}
}

Storages/System/StorageSystemMacros.h

@ -1,7 +1,8 @@
#pragma once
#include <DataTypes/DataTypeString.h>
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
@ -12,25 +13,17 @@ class Context;
/** Information about macros for introspection.
*/
class StorageSystemMacros : public ext::shared_ptr_helper<StorageSystemMacros>, public IStorage
class StorageSystemMacros : public ext::shared_ptr_helper<StorageSystemMacros>, public IStorageSystemOneBlock<StorageSystemMacros>
{
public:
std::string getName() const override { return "SystemMacros"; }
std::string getTableName() const override { return name; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
private:
const std::string name;
static NamesAndTypesList getNamesAndTypes();
protected:
StorageSystemMacros(const std::string & name_);
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
};
}

Storages/System/StorageSystemMerges.cpp

@ -1,53 +1,36 @@
#include <Storages/System/StorageSystemMerges.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <Columns/ColumnString.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/System/StorageSystemMerges.h>
namespace DB
{
StorageSystemMerges::StorageSystemMerges(const std::string & name)
: name{name}
NamesAndTypesList StorageSystemMerges::getNamesAndTypes()
{
setColumns(ColumnsDescription({
{ "database", std::make_shared<DataTypeString>() },
{ "table", std::make_shared<DataTypeString>() },
{ "elapsed", std::make_shared<DataTypeFloat64>() },
{ "progress", std::make_shared<DataTypeFloat64>() },
{ "num_parts", std::make_shared<DataTypeUInt64>() },
{ "source_part_names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()) },
{ "result_part_name", std::make_shared<DataTypeString>() },
{ "total_size_bytes_compressed", std::make_shared<DataTypeUInt64>() },
{ "total_size_marks", std::make_shared<DataTypeUInt64>() },
{ "bytes_read_uncompressed", std::make_shared<DataTypeUInt64>() },
{ "rows_read", std::make_shared<DataTypeUInt64>() },
{ "bytes_written_uncompressed", std::make_shared<DataTypeUInt64>() },
{ "rows_written", std::make_shared<DataTypeUInt64>() },
{ "columns_written", std::make_shared<DataTypeUInt64>() },
{ "memory_usage", std::make_shared<DataTypeUInt64>() },
{ "thread_number", std::make_shared<DataTypeUInt64>() },
}));
return {
{"database", std::make_shared<DataTypeString>()},
{"table", std::make_shared<DataTypeString>()},
{"elapsed", std::make_shared<DataTypeFloat64>()},
{"progress", std::make_shared<DataTypeFloat64>()},
{"num_parts", std::make_shared<DataTypeUInt64>()},
{"source_part_names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"result_part_name", std::make_shared<DataTypeString>()},
{"total_size_bytes_compressed", std::make_shared<DataTypeUInt64>()},
{"total_size_marks", std::make_shared<DataTypeUInt64>()},
{"bytes_read_uncompressed", std::make_shared<DataTypeUInt64>()},
{"rows_read", std::make_shared<DataTypeUInt64>()},
{"bytes_written_uncompressed", std::make_shared<DataTypeUInt64>()},
{"rows_written", std::make_shared<DataTypeUInt64>()},
{"columns_written", std::make_shared<DataTypeUInt64>()},
{"memory_usage", std::make_shared<DataTypeUInt64>()},
{"thread_number", std::make_shared<DataTypeUInt64>()},
};
}
BlockInputStreams StorageSystemMerges::read(
const Names & column_names,
const SelectQueryInfo &,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t,
const unsigned)
void StorageSystemMerges::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
for (const auto & merge : context.getMergeList().get())
{
size_t i = 0;
@ -68,8 +51,6 @@ BlockInputStreams StorageSystemMerges::read(
res_columns[i++]->insert(merge.memory_usage);
res_columns[i++]->insert(merge.thread_number);
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}
}

Storages/System/StorageSystemMerges.h

@ -1,7 +1,10 @@
#pragma once
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
@ -10,25 +13,17 @@ namespace DB
class Context;
class StorageSystemMerges : public ext::shared_ptr_helper<StorageSystemMerges>, public IStorage
class StorageSystemMerges : public ext::shared_ptr_helper<StorageSystemMerges>, public IStorageSystemOneBlock<StorageSystemMerges>
{
public:
std::string getName() const override { return "SystemMerges"; }
std::string getTableName() const override { return name; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
private:
const std::string name;
static NamesAndTypesList getNamesAndTypes();
protected:
StorageSystemMerges(const std::string & name);
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
};
}

Storages/System/StorageSystemMetrics.cpp

@ -1,39 +1,23 @@
#include <Common/CurrentMetrics.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemMetrics.h>
namespace DB
{
StorageSystemMetrics::StorageSystemMetrics(const std::string & name_)
: name(name_)
NamesAndTypesList StorageSystemMetrics::getNamesAndTypes()
{
setColumns(ColumnsDescription({
return {
{"metric", std::make_shared<DataTypeString>()},
{"value", std::make_shared<DataTypeInt64>()},
}));
{"value", std::make_shared<DataTypeInt64>()},
};
}
BlockInputStreams StorageSystemMetrics::read(
const Names & column_names,
const SelectQueryInfo &,
const Context &,
QueryProcessingStage::Enum & processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
void StorageSystemMetrics::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
{
Int64 value = CurrentMetrics::values[i].load(std::memory_order_relaxed);
@ -41,9 +25,6 @@ BlockInputStreams StorageSystemMetrics::read(
res_columns[0]->insert(String(CurrentMetrics::getDescription(CurrentMetrics::Metric(i))));
res_columns[1]->insert(value);
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}
}

Storages/System/StorageSystemMetrics.h

@ -1,7 +1,7 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
@ -12,25 +12,17 @@ class Context;
/** Implements `metrics` system table, which provides information about the operation of the server.
*/
class StorageSystemMetrics : public ext::shared_ptr_helper<StorageSystemMetrics>, public IStorage
class StorageSystemMetrics : public ext::shared_ptr_helper<StorageSystemMetrics>, public IStorageSystemOneBlock<StorageSystemMetrics>
{
public:
std::string getName() const override { return "SystemMetrics"; }
std::string getTableName() const override { return name; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
private:
const std::string name;
static NamesAndTypesList getNamesAndTypes();
protected:
StorageSystemMetrics(const std::string & name_);
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
};
}

Storages/System/StorageSystemModels.cpp

@ -2,45 +2,29 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExternalModels.h>
#include <Dictionaries/CatBoostModel.h>
namespace DB
{
StorageSystemModels::StorageSystemModels(const std::string & name)
: name{name}
NamesAndTypesList StorageSystemModels::getNamesAndTypes()
{
setColumns(ColumnsDescription({
return {
{ "name", std::make_shared<DataTypeString>() },
{ "origin", std::make_shared<DataTypeString>() },
{ "type", std::make_shared<DataTypeString>() },
{ "creation_time", std::make_shared<DataTypeDateTime>() },
{ "last_exception", std::make_shared<DataTypeString>() },
}));
};
}
BlockInputStreams StorageSystemModels::read(
const Names & column_names,
const SelectQueryInfo &,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t,
const unsigned)
void StorageSystemModels::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
const auto & external_models = context.getExternalModels();
auto objects_map = external_models.getObjectsMap();
const auto & models = objects_map.get();
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
for (const auto & model_info : models)
{
res_columns[0]->insert(model_info.first);
@ -73,8 +57,6 @@ BlockInputStreams StorageSystemModels::read(
else
res_columns[4]->insertDefault();
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}
}

Some files were not shown because too many files have changed in this diff.