Merge branch 'master' into parallelize_insert_from_infile_by_file

Max Kainov 2023-09-13 12:17:08 +00:00
commit 15094e58c9
124 changed files with 1567 additions and 1471 deletions

View File

@ -79,7 +79,7 @@ IndentWidth: 4
IndentWrappedFunctionNames: false
MacroBlockBegin: ''
MacroBlockEnd: ''
NamespaceIndentation: Inner
NamespaceIndentation: None
ObjCBlockIndentWidth: 4
ObjCSpaceAfterProperty: true
ObjCSpaceBeforeProtocolList: true
@ -89,6 +89,7 @@ PenaltyBreakFirstLessLess: 120
PenaltyBreakString: 1000
PenaltyExcessCharacter: 1000000
PenaltyReturnTypeOnItsOwnLine: 60
RemoveBracesLLVM: true
SpaceAfterCStyleCast: false
SpaceBeforeAssignmentOperators: true
SpaceBeforeParens: ControlStatements

View File

@ -3,12 +3,13 @@ slug: /en/operations/system-tables/information_schema
---
# INFORMATION_SCHEMA
`INFORMATION_SCHEMA` (`information_schema`) is a system database that contains views. Using these views, you can get information about the metadata of database objects. These views read data from the columns of the [system.columns](../../operations/system-tables/columns.md), [system.databases](../../operations/system-tables/databases.md) and [system.tables](../../operations/system-tables/tables.md) system tables.
The structure and composition of system tables may change in different versions of the product, but the support of the `information_schema` makes it possible to change the structure of system tables without changing the method of access to metadata. Metadata requests do not depend on the DBMS used.
`INFORMATION_SCHEMA` (or: `information_schema`) is a system database which provides a (somewhat) standardized, [DBMS-agnostic view](https://en.wikipedia.org/wiki/Information_schema) on metadata of database objects. The views in `INFORMATION_SCHEMA` are generally inferior to normal system tables, but tools can use them to obtain basic information in a cross-DBMS manner. The structure and content of views in `INFORMATION_SCHEMA` are supposed to evolve in a backwards-compatible way, i.e. only new functionality is added but existing functionality is not changed or removed. In terms of internal implementation, views in `INFORMATION_SCHEMA` usually map to normal system tables like [system.columns](../../operations/system-tables/columns.md), [system.databases](../../operations/system-tables/databases.md) and [system.tables](../../operations/system-tables/tables.md).
``` sql
SHOW TABLES FROM INFORMATION_SCHEMA;
-- or:
SHOW TABLES FROM information_schema;
```
``` text
@ -17,6 +18,10 @@ SHOW TABLES FROM INFORMATION_SCHEMA;
│ SCHEMATA │
│ TABLES │
│ VIEWS │
│ columns │
│ schemata │
│ tables │
│ views │
└──────────┘
```
@ -27,6 +32,8 @@ SHOW TABLES FROM INFORMATION_SCHEMA;
- [TABLES](#tables)
- [VIEWS](#views)
Case-insensitive equivalent views, e.g. `INFORMATION_SCHEMA.columns`, are provided for compatibility with other databases.
## COLUMNS {#columns}
Contains columns read from the [system.columns](../../operations/system-tables/columns.md) system table and columns that are not supported in ClickHouse or do not make sense (always `NULL`), but are required by the standard.
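A quick illustration of addressing the same metadata through either spelling (the `default.t` table is hypothetical and used only for this sketch):
``` sql
CREATE TABLE default.t (id UInt64, s Nullable(String)) ENGINE = Memory;

SELECT table_name, column_name, is_nullable
FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_schema = 'default' AND table_name = 't';

-- The case-insensitive equivalent view returns the same rows:
SELECT table_name, column_name, is_nullable
FROM information_schema.columns
WHERE table_schema = 'default' AND table_name = 't';
```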

View File

@ -101,7 +101,8 @@ Columns:
- `ProfileEvents` ([Map(String, UInt64)](../../sql-reference/data-types/map.md)) — ProfileEvents that measure different metrics. The description of them could be found in the table [system.events](../../operations/system-tables/events.md#system_tables-events)
- `Settings` ([Map(String, String)](../../sql-reference/data-types/map.md)) — Settings that were changed when the client ran the query. To enable logging changes to settings, set the `log_query_settings` parameter to 1.
- `log_comment` ([String](../../sql-reference/data-types/string.md)) — Log comment. It can be set to arbitrary string no longer than [max_query_size](../../operations/settings/settings.md#settings-max_query_size). An empty string if it is not defined.
- `thread_ids` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — Thread ids that are participating in query execution.
- `thread_ids` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — Thread ids that are participating in query execution. These threads may not have run simultaneously.
- `peak_threads_usage` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Maximum count of simultaneous threads executing the query.
- `used_aggregate_functions` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `aggregate functions`, which were used during query execution.
- `used_aggregate_function_combinators` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `aggregate functions combinators`, which were used during query execution.
- `used_database_engines` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `database engines`, which were used during query execution.
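A short sketch of inspecting the two thread-related columns documented above (assumes query logging is enabled so that `system.query_log` is populated):
``` sql
SELECT
    query_id,
    length(thread_ids) AS threads_total,
    peak_threads_usage
FROM system.query_log
WHERE type = 'QueryFinish'
ORDER BY event_time DESC
LIMIT 10;
```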

View File

@ -16,6 +16,8 @@ A client application to interact with clickhouse-keeper by its native protocol.
- `--session-timeout=TIMEOUT` — Set session timeout in seconds. Default value: 10s.
- `--operation-timeout=TIMEOUT` — Set operation timeout in seconds. Default value: 10s.
- `--history-file=FILE_PATH` — Set path of history file. Default value: `~/.keeper-client-history`.
- `--log-level=LEVEL` — Set log level. Default value: `information`.
- `--no-confirmation` — If set, does not require confirmation for several commands. Default value: `false` in interactive mode, `true` in query mode.
- `--help` — Shows the help message.
## Example {#clickhouse-keeper-client-example}
@ -44,6 +46,7 @@ keeper foo bar
- `ls [path]` -- Lists the nodes for the given path (default: cwd)
- `cd [path]` -- Change the working path (default `.`)
- `exists <path>` -- Returns `1` if node exists, `0` otherwise
- `set <path> <value> [version]` -- Updates the node's value. Only update if version matches (default: -1)
- `create <path> <value> [mode]` -- Creates new node with the set value
- `touch <path>` -- Creates new node with an empty string as value. Doesn't throw an exception if the node already exists
@ -56,3 +59,5 @@ keeper foo bar
- `find_super_nodes <threshold> [path]` -- Finds nodes with number of children larger than some threshold for the given path (default `.`)
- `delete_stale_backups` -- Deletes ClickHouse nodes used for backups that are now inactive
- `find_big_family [path] [n]` -- Returns the top n nodes with the biggest family in the subtree (default path = `.` and n = 10)
- `sync <path>` -- Synchronizes node between processes and leader
- `reconfig <add|remove|set> "<arg>" [version]` -- Reconfigure Keeper cluster. See https://clickhouse.com/docs/en/guides/sre/keeper/clickhouse-keeper#reconfiguration

View File

@ -99,7 +99,8 @@ ClickHouse does not delete data from the table automatically
- `ProfileEvents` ([Map(String, UInt64)](../../sql-reference/data-types/array.md)) — Counters that measure various metrics. Their description can be found in the [system.events](#system_tables-events) table.
- `Settings` ([Map(String, String)](../../sql-reference/data-types/array.md)) — Names of settings that were changed when the client ran the query. To enable logging changes to settings, set the `log_query_settings` parameter to 1.
- `log_comment` ([String](../../sql-reference/data-types/string.md)) — Log comment. An arbitrary string no longer than [max_query_size](../../operations/settings/settings.md#settings-max_query_size). An empty string if no comment was set.
- `thread_ids` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — IDs of the threads participating in query processing.
- `thread_ids` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — IDs of the threads participating in query processing; these threads did not necessarily run simultaneously.
- `peak_threads_usage` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Maximum number of simultaneously running threads that took part in processing the query.
- `used_aggregate_functions` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of the `aggregate functions` used during query execution.
- `used_aggregate_function_combinators` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of the `aggregate function combinators` used during query execution.
- `used_database_engines` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of the `database engines` used during query execution.

View File

@ -9,11 +9,11 @@ namespace DB
bool LSCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return true;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));
return true;
}
@ -42,11 +42,11 @@ void LSCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) con
bool CDCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return true;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));
return true;
}
@ -64,11 +64,12 @@ void CDCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) con
bool SetCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));
String arg;
if (!parseKeeperArg(pos, expected, arg))
return false;
node->args.push_back(std::move(arg));
@ -93,11 +94,12 @@ void SetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) co
bool CreateCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));
String arg;
if (!parseKeeperArg(pos, expected, arg))
return false;
node->args.push_back(std::move(arg));
@ -143,10 +145,10 @@ void TouchCommand::execute(const ASTKeeperQuery * query, KeeperClient * client)
bool GetCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));
return true;
}
@ -156,13 +158,28 @@ void GetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) co
std::cout << client->zookeeper->get(client->getAbsolutePath(query->args[0].safeGet<String>())) << "\n";
}
bool ExistsCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, DB::Expected & expected) const
{
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(path));
return true;
}
void ExistsCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient * client) const
{
std::cout << client->zookeeper->exists(client->getAbsolutePath(query->args[0].safeGet<String>())) << "\n";
}
bool GetStatCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return true;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));
return true;
}
@ -325,10 +342,10 @@ void FindBigFamily::execute(const ASTKeeperQuery * query, KeeperClient * client)
bool RMCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));
return true;
}
@ -340,10 +357,10 @@ void RMCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) con
bool RMRCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));
return true;
}
@ -355,6 +372,70 @@ void RMRCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) co
[client, path]{ client->zookeeper->removeRecursive(path); });
}
bool ReconfigCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, DB::Expected & expected) const
{
ReconfigCommand::Operation operation;
if (ParserKeyword{"ADD"}.ignore(pos, expected))
operation = ReconfigCommand::Operation::ADD;
else if (ParserKeyword{"REMOVE"}.ignore(pos, expected))
operation = ReconfigCommand::Operation::REMOVE;
else if (ParserKeyword{"SET"}.ignore(pos, expected))
operation = ReconfigCommand::Operation::SET;
else
return false;
node->args.push_back(operation);
ParserToken{TokenType::Whitespace}.ignore(pos);
String arg;
if (!parseKeeperArg(pos, expected, arg))
return false;
node->args.push_back(std::move(arg));
return true;
}
void ReconfigCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient * client) const
{
String joining;
String leaving;
String new_members;
auto operation = query->args[0].get<ReconfigCommand::Operation>();
switch (operation)
{
case static_cast<UInt8>(ReconfigCommand::Operation::ADD):
joining = query->args[1].safeGet<DB::String>();
break;
case static_cast<UInt8>(ReconfigCommand::Operation::REMOVE):
leaving = query->args[1].safeGet<DB::String>();
break;
case static_cast<UInt8>(ReconfigCommand::Operation::SET):
new_members = query->args[1].safeGet<DB::String>();
break;
default:
UNREACHABLE();
}
auto response = client->zookeeper->reconfig(joining, leaving, new_members);
std::cout << response.value << '\n';
}
bool SyncCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, DB::Expected & expected) const
{
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(path));
return true;
}
void SyncCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient * client) const
{
std::cout << client->zookeeper->sync(client->getAbsolutePath(query->args[0].safeGet<String>())) << "\n";
}
bool HelpCommand::parse(IParser::Pos & /* pos */, std::shared_ptr<ASTKeeperQuery> & /* node */, Expected & /* expected */) const
{
return true;

View File

@ -101,6 +101,17 @@ class GetCommand : public IKeeperClientCommand
String getHelpMessage() const override { return "{} <path> -- Returns the node's value"; }
};
class ExistsCommand : public IKeeperClientCommand
{
String getName() const override { return "exists"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <path> -- Returns `1` if node exists, `0` otherwise"; }
};
class GetStatCommand : public IKeeperClientCommand
{
String getName() const override { return "get_stat"; }
@ -177,6 +188,35 @@ class RMRCommand : public IKeeperClientCommand
String getHelpMessage() const override { return "{} <path> -- Recursively deletes path. Confirmation required"; }
};
class ReconfigCommand : public IKeeperClientCommand
{
enum class Operation : UInt8
{
ADD = 0,
REMOVE = 1,
SET = 2,
};
String getName() const override { return "reconfig"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <add|remove|set> \"<arg>\" [version] -- Reconfigure Keeper cluster. See https://clickhouse.com/docs/en/guides/sre/keeper/clickhouse-keeper#reconfiguration"; }
};
class SyncCommand: public IKeeperClientCommand
{
String getName() const override { return "sync"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <path> -- Synchronizes node between processes and leader"; }
};
class HelpCommand : public IKeeperClientCommand
{
String getName() const override { return "help"; }

View File

@ -84,8 +84,11 @@ std::vector<String> KeeperClient::getCompletions(const String & prefix) const
void KeeperClient::askConfirmation(const String & prompt, std::function<void()> && callback)
{
if (!ask_confirmation)
return callback();
std::cout << prompt << " Continue?\n";
need_confirmation = true;
waiting_confirmation = true;
confirmation_callback = callback;
}
@ -170,6 +173,14 @@ void KeeperClient::defineOptions(Poco::Util::OptionSet & options)
Poco::Util::Option("log-level", "", "set log level")
.argument("<level>")
.binding("log-level"));
options.addOption(
Poco::Util::Option("no-confirmation", "", "if set, will not require a confirmation on several commands. default false for interactive and true for query")
.binding("no-confirmation"));
options.addOption(
Poco::Util::Option("tests-mode", "", "run keeper-client in a special mode for tests. all commands output are separated by special symbols. default false")
.binding("tests-mode"));
}
void KeeperClient::initialize(Poco::Util::Application & /* self */)
@ -184,12 +195,15 @@ void KeeperClient::initialize(Poco::Util::Application & /* self */)
std::make_shared<CreateCommand>(),
std::make_shared<TouchCommand>(),
std::make_shared<GetCommand>(),
std::make_shared<ExistsCommand>(),
std::make_shared<GetStatCommand>(),
std::make_shared<FindSuperNodes>(),
std::make_shared<DeleteStaleBackups>(),
std::make_shared<FindBigFamily>(),
std::make_shared<RMCommand>(),
std::make_shared<RMRCommand>(),
std::make_shared<ReconfigCommand>(),
std::make_shared<SyncCommand>(),
std::make_shared<HelpCommand>(),
std::make_shared<FourLetterWordCommand>(),
});
@ -229,18 +243,6 @@ void KeeperClient::initialize(Poco::Util::Application & /* self */)
EventNotifier::init();
}
void KeeperClient::executeQuery(const String & query)
{
std::vector<String> queries;
boost::algorithm::split(queries, query, boost::is_any_of(";"));
for (const auto & query_text : queries)
{
if (!query_text.empty())
processQueryText(query_text);
}
}
bool KeeperClient::processQueryText(const String & text)
{
if (exit_strings.find(text) != exit_strings.end())
@ -248,29 +250,44 @@ bool KeeperClient::processQueryText(const String & text)
try
{
if (need_confirmation)
if (waiting_confirmation)
{
need_confirmation = false;
waiting_confirmation = false;
if (text.size() == 1 && (text == "y" || text == "Y"))
confirmation_callback();
return true;
}
KeeperParser parser;
String message;
const char * begin = text.data();
ASTPtr res = tryParseQuery(parser, begin, begin + text.size(), message, true, "", false, 0, 0, false);
const char * end = begin + text.size();
if (!res)
while (begin < end)
{
std::cerr << message << "\n";
return true;
String message;
ASTPtr res = tryParseQuery(
parser,
begin,
end,
/* out_error_message = */ message,
/* hilite = */ true,
/* description = */ "",
/* allow_multi_statements = */ true,
/* max_query_size = */ 0,
/* max_parser_depth = */ 0,
/* skip_insignificant = */ false);
if (!res)
{
std::cerr << message << "\n";
return true;
}
auto * query = res->as<ASTKeeperQuery>();
auto command = KeeperClient::commands.find(query->command);
command->second->execute(query, this);
}
auto * query = res->as<ASTKeeperQuery>();
auto command = KeeperClient::commands.find(query->command);
command->second->execute(query, this);
}
catch (Coordination::Exception & err)
{
@ -279,7 +296,7 @@ bool KeeperClient::processQueryText(const String & text)
return true;
}
void KeeperClient::runInteractive()
void KeeperClient::runInteractiveReplxx()
{
LineReader::Patterns query_extenders = {"\\"};
@ -299,7 +316,7 @@ void KeeperClient::runInteractive()
while (true)
{
String prompt;
if (need_confirmation)
if (waiting_confirmation)
prompt = "[y/n] ";
else
prompt = cwd.string() + " :) ";
@ -313,6 +330,26 @@ void KeeperClient::runInteractive()
}
}
void KeeperClient::runInteractiveInputStream()
{
for (String input; std::getline(std::cin, input);)
{
if (!processQueryText(input))
break;
std::cout << "\a\a\a\a" << std::endl;
std::cerr << std::flush;
}
}
void KeeperClient::runInteractive()
{
if (config().hasOption("tests-mode"))
runInteractiveInputStream();
else
runInteractiveReplxx();
}
int KeeperClient::main(const std::vector<String> & /* args */)
{
if (config().hasOption("help"))
@ -362,8 +399,13 @@ int KeeperClient::main(const std::vector<String> & /* args */)
zk_args.operation_timeout_ms = config().getInt("operation-timeout", 10) * 1000;
zookeeper = std::make_unique<zkutil::ZooKeeper>(zk_args);
if (config().has("no-confirmation") || config().has("query"))
ask_confirmation = false;
if (config().has("query"))
executeQuery(config().getString("query"));
{
processQueryText(config().getString("query"));
}
else
runInteractive();

View File

@ -49,8 +49,10 @@ public:
protected:
void runInteractive();
void runInteractiveReplxx();
void runInteractiveInputStream();
bool processQueryText(const String & text);
void executeQuery(const String & query);
void loadCommands(std::vector<Command> && new_commands);
@ -61,7 +63,8 @@ protected:
zkutil::ZooKeeperArgs zk_args;
bool need_confirmation = false;
bool ask_confirmation = true;
bool waiting_confirmation = false;
std::vector<String> registered_commands_and_four_letter_words;
};

View File

@ -7,43 +7,34 @@ namespace DB
bool parseKeeperArg(IParser::Pos & pos, Expected & expected, String & result)
{
expected.add(pos, getTokenName(TokenType::BareWord));
if (pos->type == TokenType::BareWord)
if (pos->type == TokenType::QuotedIdentifier || pos->type == TokenType::StringLiteral)
{
result = String(pos->begin, pos->end);
++pos;
if (!parseIdentifierOrStringLiteral(pos, expected, result))
return false;
ParserToken{TokenType::Whitespace}.ignore(pos);
return true;
}
bool status = parseIdentifierOrStringLiteral(pos, expected, result);
ParserToken{TokenType::Whitespace}.ignore(pos);
return status;
}
bool parseKeeperPath(IParser::Pos & pos, Expected & expected, String & path)
{
expected.add(pos, "path");
if (pos->type == TokenType::QuotedIdentifier || pos->type == TokenType::StringLiteral)
return parseIdentifierOrStringLiteral(pos, expected, path);
String result;
while (pos->type != TokenType::Whitespace && pos->type != TokenType::EndOfStream)
while (pos->type != TokenType::Whitespace && pos->type != TokenType::EndOfStream && pos->type != TokenType::Semicolon)
{
result.append(pos->begin, pos->end);
++pos;
}
ParserToken{TokenType::Whitespace}.ignore(pos);
if (result.empty())
return false;
path = result;
return true;
}
bool parseKeeperPath(IParser::Pos & pos, Expected & expected, String & path)
{
expected.add(pos, "path");
return parseKeeperArg(pos, expected, path);
}
bool KeeperParser::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto query = std::make_shared<ASTKeeperQuery>();

View File

@ -881,6 +881,8 @@ void LocalServer::processOptions(const OptionsDescription &, const CommandLineOp
config().setBool("no-system-tables", true);
if (options.count("only-system-tables"))
config().setBool("only-system-tables", true);
if (options.count("database"))
config().setString("default_database", options["database"].as<std::string>());
if (options.count("input-format"))
config().setString("table-data-format", options["input-format"].as<std::string>());

View File

@ -6341,9 +6341,9 @@ void QueryAnalyzer::resolveTableFunction(QueryTreeNodePtr & table_function_node,
{
/// For input function we should check if input format supports reading subset of columns.
if (table_function_ptr->getName() == "input")
use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(scope.context->getInsertFormat());
use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(scope.context->getInsertFormat(), scope.context);
else
use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns();
use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns(scope.context);
}
if (use_columns_from_insert_query)

View File

@ -1,195 +0,0 @@
#include "UniqToCountPass.h"
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/QueryNode.h>
namespace DB
{
namespace
{
bool matchFnUniq(String func_name)
{
auto name = Poco::toLower(func_name);
return name == "uniq" || name == "uniqHLL12" || name == "uniqExact" || name == "uniqTheta" || name == "uniqCombined"
|| name == "uniqCombined64";
}
/// Extract the corresponding projection columns for group by node list.
/// For example:
/// SELECT a as aa, any(b) FROM table group by a; -> aa(ColumnNode)
NamesAndTypes extractProjectionColumnsForGroupBy(const QueryNode * query_node)
{
if (!query_node->hasGroupBy())
return {};
NamesAndTypes result;
for (const auto & group_by_ele : query_node->getGroupByNode()->getChildren())
{
const auto & projection_columns = query_node->getProjectionColumns();
const auto & projection_nodes = query_node->getProjection().getNodes();
assert(projection_columns.size() == projection_nodes.size());
for (size_t i = 0; i < projection_columns.size(); i++)
{
if (projection_nodes[i]->isEqual(*group_by_ele))
result.push_back(projection_columns[i]);
}
}
return result;
}
/// Whether query_columns equals subquery_columns.
/// query_columns: query columns from query
/// subquery_columns: projection columns from subquery
bool nodeListEquals(const QueryTreeNodes & query_columns, const NamesAndTypes & subquery_columns)
{
if (query_columns.size() != subquery_columns.size())
return false;
for (const auto & query_column : query_columns)
{
auto find = std::find_if(
subquery_columns.begin(),
subquery_columns.end(),
[&](const auto & subquery_column) -> bool
{
if (auto * column_node = query_column->as<ColumnNode>())
{
return subquery_column == column_node->getColumn();
}
return false;
});
if (find == subquery_columns.end())
return false;
}
return true;
}
/// Whether subquery_columns contains all columns in subquery_columns.
/// query_columns: query columns from query
/// subquery_columns: projection columns from subquery
bool nodeListContainsAll(const QueryTreeNodes & query_columns, const NamesAndTypes & subquery_columns)
{
if (query_columns.size() > subquery_columns.size())
return false;
for (const auto & query_column : query_columns)
{
auto find = std::find_if(
subquery_columns.begin(),
subquery_columns.end(),
[&](const auto & subquery_column) -> bool
{
if (auto * column_node = query_column->as<ColumnNode>())
{
return subquery_column == column_node->getColumn();
}
return false;
});
if (find == subquery_columns.end())
return false;
}
return true;
}
}
class UniqToCountVisitor : public InDepthQueryTreeVisitorWithContext<UniqToCountVisitor>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<UniqToCountVisitor>;
using Base::Base;
void enterImpl(QueryTreeNodePtr & node)
{
if (!getSettings().optimize_uniq_to_count)
return;
auto * query_node = node->as<QueryNode>();
if (!query_node)
return;
/// Check that query has only single table expression which is subquery
auto * subquery_node = query_node->getJoinTree()->as<QueryNode>();
if (!subquery_node)
return;
/// Check that query has only single node in projection
auto & projection_nodes = query_node->getProjection().getNodes();
if (projection_nodes.size() != 1)
return;
/// Check that projection_node is a function
auto & projection_node = projection_nodes[0];
auto * function_node = projection_node->as<FunctionNode>();
if (!function_node)
return;
/// Check that query single projection node is `uniq` or its variants
if (!matchFnUniq(function_node->getFunctionName()))
return;
auto & uniq_arguments_nodes = function_node->getArguments().getNodes();
/// Whether query matches 'SELECT uniq(x ...) FROM (SELECT DISTINCT x ...)'
auto match_subquery_with_distinct = [&]() -> bool
{
if (!subquery_node->isDistinct())
return false;
/// uniq expression list == subquery projection columns
if (!nodeListEquals(uniq_arguments_nodes, subquery_node->getProjectionColumns()))
return false;
return true;
};
/// Whether query matches 'SELECT uniq(x ...) FROM (SELECT x ... GROUP BY x ...)'
auto match_subquery_with_group_by = [&]() -> bool
{
if (!subquery_node->hasGroupBy())
return false;
/// uniq argument node list == subquery group by node list
auto group_by_columns = extractProjectionColumnsForGroupBy(subquery_node);
if (!nodeListEquals(uniq_arguments_nodes, group_by_columns))
return false;
/// subquery projection columns must contain all columns in uniq argument node list
if (!nodeListContainsAll(uniq_arguments_nodes, subquery_node->getProjectionColumns()))
return false;
return true;
};
/// Replace uniq of initial query to count
if (match_subquery_with_distinct() || match_subquery_with_group_by())
{
AggregateFunctionProperties properties;
auto aggregate_function = AggregateFunctionFactory::instance().get("count", {}, {}, properties);
function_node->getArguments().getNodes().clear();
function_node->resolveAsAggregateFunction(std::move(aggregate_function));
}
}
};
void UniqToCountPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
{
UniqToCountVisitor visitor(context);
visitor.visit(query_tree_node);
}
}

View File

@ -1,30 +0,0 @@
#pragma once
#include <Analyzer/IQueryTreePass.h>
namespace DB
{
/** Optimize `uniq` and its variants(except uniqUpTo) into `count` over subquery.
* Example: 'SELECT uniq(x ...) FROM (SELECT DISTINCT x ...)' to
* Result: 'SELECT count() FROM (SELECT DISTINCT x ...)'
*
* Example: 'SELECT uniq(x ...) FROM (SELECT x ... GROUP BY x ...)' to
* Result: 'SELECT count() FROM (SELECT x ... GROUP BY x ...)'
*
* Note that we can rewrite all uniq variants except uniqUpTo.
*/
class UniqToCountPass final : public IQueryTreePass
{
public:
String getName() override { return "UniqToCount"; }
String getDescription() override
{
return "Rewrite uniq and its variants(except uniqUpTo) to count if subquery has distinct or group by clause.";
}
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
};
}
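For reference, the rewrite performed by the pass removed above, shown on a hypothetical table `t` with a column `x`:
``` sql
-- Query as written:
SELECT uniq(x) FROM (SELECT DISTINCT x FROM t);
-- Form the pass produced when the optimization applied:
SELECT count() FROM (SELECT DISTINCT x FROM t);
```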

View File

@ -18,7 +18,6 @@
#include <Analyzer/Utils.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/Passes/CountDistinctPass.h>
#include <Analyzer/Passes/UniqToCountPass.h>
#include <Analyzer/Passes/FunctionToSubcolumnsPass.h>
#include <Analyzer/Passes/RewriteAggregateFunctionWithIfPass.h>
#include <Analyzer/Passes/SumIfToCountIfPass.h>
@ -248,7 +247,6 @@ void addQueryTreePasses(QueryTreePassManager & manager)
manager.addPass(std::make_unique<ConvertLogicalExpressionToCNFPass>());
manager.addPass(std::make_unique<CountDistinctPass>());
manager.addPass(std::make_unique<UniqToCountPass>());
manager.addPass(std::make_unique<RewriteAggregateFunctionWithIfPass>());
manager.addPass(std::make_unique<SumIfToCountIfPass>());
manager.addPass(std::make_unique<RewriteArrayExistsToHasPass>());

View File

@ -1071,7 +1071,9 @@ void ClientBase::receiveResult(ASTPtr parsed_query, Int32 signals_before_stop, b
}
catch (const LocalFormatError &)
{
local_format_error = std::current_exception();
/// Remember the first exception.
if (!local_format_error)
local_format_error = std::current_exception();
connection->sendCancel();
}
}

View File

@ -73,7 +73,7 @@ ColumnAggregateFunction::ColumnAggregateFunction(const AggregateFunctionPtr & fu
}
void ColumnAggregateFunction::set(const AggregateFunctionPtr & func_, size_t version_)
void ColumnAggregateFunction::set(const AggregateFunctionPtr & func_, std::optional<size_t> version_)
{
func = func_;
version = version_;

View File

@ -103,7 +103,7 @@ private:
public:
~ColumnAggregateFunction() override;
void set(const AggregateFunctionPtr & func_, size_t version_);
void set(const AggregateFunctionPtr & func_, std::optional<size_t> version_ = std::nullopt);
AggregateFunctionPtr getAggregateFunction() { return func; }
AggregateFunctionPtr getAggregateFunction() const { return func; }

View File

@ -57,7 +57,7 @@ ConcurrencyControl::Allocation::Allocation(ConcurrencyControl & parent_, SlotCou
*waiter = this;
}
// Grant single slot to allocation, returns true iff more slot(s) are required
// Grant single slot to allocation; returns true iff more slot(s) are required
bool ConcurrencyControl::Allocation::grant()
{
std::unique_lock lock{mutex};

View File

@ -107,15 +107,25 @@ public:
static ThreadGroupPtr createForBackgroundProcess(ContextPtr storage_context);
std::vector<UInt64> getInvolvedThreadIds() const;
void linkThread(UInt64 thread_it);
size_t getPeakThreadsUsage() const;
void linkThread(UInt64 thread_id);
void unlinkThread();
private:
mutable std::mutex mutex;
/// Set up at creation, no race when reading
SharedData shared_data;
SharedData shared_data TSA_GUARDED_BY(mutex);
/// Set of all thread ids which have been attached to the group
std::unordered_set<UInt64> thread_ids;
std::unordered_set<UInt64> thread_ids TSA_GUARDED_BY(mutex);
/// Count of simultaneously working threads
size_t active_thread_count TSA_GUARDED_BY(mutex) = 0;
/// Peak threads count in the group
size_t peak_threads_usage TSA_GUARDED_BY(mutex) = 0;
};
/**

View File

@ -877,6 +877,24 @@ void ZooKeeper::handleEphemeralNodeExistence(const std::string & path, const std
}
}
Coordination::ReconfigResponse ZooKeeper::reconfig(
const std::string & joining,
const std::string & leaving,
const std::string & new_members,
int32_t version)
{
auto future_result = asyncReconfig(joining, leaving, new_members, version);
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {}", Coordination::OpNum::Reconfig));
throw KeeperException(Coordination::Error::ZOPERATIONTIMEOUT);
}
return future_result.get();
}
ZooKeeperPtr ZooKeeper::startNewSession() const
{
return std::make_shared<ZooKeeper>(args, zk_log);
@ -1226,6 +1244,27 @@ std::future<Coordination::SyncResponse> ZooKeeper::asyncSync(const std::string &
return future;
}
std::future<Coordination::ReconfigResponse> ZooKeeper::asyncReconfig(
const std::string & joining,
const std::string & leaving,
const std::string & new_members,
int32_t version)
{
auto promise = std::make_shared<std::promise<Coordination::ReconfigResponse>>();
auto future = promise->get_future();
auto callback = [promise](const Coordination::ReconfigResponse & response) mutable
{
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(KeeperException(response.error)));
else
promise->set_value(response);
};
impl->reconfig(joining, leaving, new_members, version, std::move(callback));
return future;
}
void ZooKeeper::finalize(const String & reason)
{
impl->finalize(reason);

View File

@ -449,6 +449,12 @@ public:
/// disappear automatically after 3x session_timeout.
void handleEphemeralNodeExistence(const std::string & path, const std::string & fast_delete_if_equal_value);
Coordination::ReconfigResponse reconfig(
const std::string & joining,
const std::string & leaving,
const std::string & new_members,
int32_t version = -1);
/// Async interface (a small subset of operations is implemented).
///
/// Usage:
@ -529,6 +535,13 @@ public:
const std::string & path,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
using FutureReconfig = std::future<Coordination::ReconfigResponse>;
FutureReconfig asyncReconfig(
const std::string & joining,
const std::string & leaving,
const std::string & new_members,
int32_t version = -1);
void finalize(const String & reason);
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);

View File

@ -778,7 +778,6 @@ class IColumn;
M(Bool, function_json_value_return_type_allow_nullable, false, "Allow function JSON_VALUE to return nullable type.", 0) \
M(Bool, function_json_value_return_type_allow_complex, false, "Allow function JSON_VALUE to return complex type, such as: struct, array, map.", 0) \
M(Bool, use_with_fill_by_sorting_prefix, true, "Columns preceding WITH FILL columns in ORDER BY clause form sorting prefix. Rows with different values in sorting prefix are filled independently", 0) \
M(Bool, optimize_uniq_to_count, true, "Rewrite uniq and its variants(except uniqUpTo) to count if subquery has distinct or group by clause.", 0) \
\
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \

View File

@ -117,6 +117,33 @@ Field DataTypeAggregateFunction::getDefault() const
return field;
}
bool DataTypeAggregateFunction::strictEquals(const DataTypePtr & lhs_state_type, const DataTypePtr & rhs_state_type)
{
const auto * lhs_state = typeid_cast<const DataTypeAggregateFunction *>(lhs_state_type.get());
const auto * rhs_state = typeid_cast<const DataTypeAggregateFunction *>(rhs_state_type.get());
if (!lhs_state || !rhs_state)
return false;
if (lhs_state->function->getName() != rhs_state->function->getName())
return false;
if (lhs_state->parameters.size() != rhs_state->parameters.size())
return false;
for (size_t i = 0; i < lhs_state->parameters.size(); ++i)
if (lhs_state->parameters[i] != rhs_state->parameters[i])
return false;
if (lhs_state->argument_types.size() != rhs_state->argument_types.size())
return false;
for (size_t i = 0; i < lhs_state->argument_types.size(); ++i)
if (!lhs_state->argument_types[i]->equals(*rhs_state->argument_types[i]))
return false;
return true;
}
bool DataTypeAggregateFunction::equals(const IDataType & rhs) const
{
@ -126,34 +153,7 @@ bool DataTypeAggregateFunction::equals(const IDataType & rhs) const
auto lhs_state_type = function->getNormalizedStateType();
auto rhs_state_type = typeid_cast<const DataTypeAggregateFunction &>(rhs).function->getNormalizedStateType();
if (typeid(lhs_state_type.get()) != typeid(rhs_state_type.get()))
return false;
if (const auto * lhs_state = typeid_cast<const DataTypeAggregateFunction *>(lhs_state_type.get()))
{
const auto & rhs_state = typeid_cast<const DataTypeAggregateFunction &>(*rhs_state_type);
if (lhs_state->function->getName() != rhs_state.function->getName())
return false;
if (lhs_state->parameters.size() != rhs_state.parameters.size())
return false;
for (size_t i = 0; i < lhs_state->parameters.size(); ++i)
if (lhs_state->parameters[i] != rhs_state.parameters[i])
return false;
if (lhs_state->argument_types.size() != rhs_state.argument_types.size())
return false;
for (size_t i = 0; i < lhs_state->argument_types.size(); ++i)
if (!lhs_state->argument_types[i]->equals(*rhs_state.argument_types[i]))
return false;
return true;
}
return lhs_state_type->equals(*rhs_state_type);
return strictEquals(lhs_state_type, rhs_state_type);
}

View File

@ -60,6 +60,7 @@ public:
Field getDefault() const override;
static bool strictEquals(const DataTypePtr & lhs_state_type, const DataTypePtr & rhs_state_type);
bool equals(const IDataType & rhs) const override;
bool isParametric() const override { return true; }

View File

@ -136,8 +136,17 @@ DataTypePtr FieldToDataType<on_error>::operator() (const Array & x) const
DataTypes element_types;
element_types.reserve(x.size());
bool has_signed_int = false;
bool uint64_convert_possible = true;
for (const Field & elem : x)
element_types.emplace_back(applyVisitor(*this, elem));
{
DataTypePtr type = applyVisitor(*this, elem);
element_types.emplace_back(type);
checkUInt64ToIn64Conversion(has_signed_int, uint64_convert_possible, type, elem);
}
if (has_signed_int && uint64_convert_possible)
convertUInt64ToInt64IfPossible(element_types);
return std::make_shared<DataTypeArray>(getLeastSupertype<on_error>(element_types));
}
@ -165,14 +174,28 @@ DataTypePtr FieldToDataType<on_error>::operator() (const Map & map) const
key_types.reserve(map.size());
value_types.reserve(map.size());
bool k_has_signed_int = false;
bool k_uint64_convert_possible = true;
bool v_has_signed_int = false;
bool v_uint64_convert_possible = true;
for (const auto & elem : map)
{
const auto & tuple = elem.safeGet<const Tuple &>();
assert(tuple.size() == 2);
key_types.push_back(applyVisitor(*this, tuple[0]));
value_types.push_back(applyVisitor(*this, tuple[1]));
DataTypePtr k_type = applyVisitor(*this, tuple[0]);
key_types.push_back(k_type);
checkUInt64ToIn64Conversion(k_has_signed_int, k_uint64_convert_possible, k_type, tuple[0]);
DataTypePtr v_type = applyVisitor(*this, tuple[1]);
value_types.push_back(v_type);
checkUInt64ToIn64Conversion(v_has_signed_int, v_uint64_convert_possible, v_type, tuple[1]);
}
if (k_has_signed_int && k_uint64_convert_possible)
convertUInt64ToInt64IfPossible(key_types);
if (v_has_signed_int && v_uint64_convert_possible)
convertUInt64ToInt64IfPossible(value_types);
return std::make_shared<DataTypeMap>(
getLeastSupertype<on_error>(key_types),
getLeastSupertype<on_error>(value_types));
@ -204,6 +227,28 @@ DataTypePtr FieldToDataType<on_error>::operator()(const bool &) const
return DataTypeFactory::instance().get("Bool");
}
template <LeastSupertypeOnError on_error>
void FieldToDataType<on_error>::checkUInt64ToIn64Conversion(bool & has_signed_int, bool & uint64_convert_possible, const DataTypePtr & type, const Field & elem) const
{
if (uint64_convert_possible)
{
bool is_native_int = WhichDataType(type).isNativeInt();
if (is_native_int)
has_signed_int |= is_native_int;
else if (type->getTypeId() == TypeIndex::UInt64)
uint64_convert_possible &= (elem.template get<UInt64>() <= std::numeric_limits<Int64>::max());
}
}
template <LeastSupertypeOnError on_error>
void FieldToDataType<on_error>::convertUInt64ToInt64IfPossible(DataTypes & data_types) const
{
for (auto& type : data_types)
if (type->getTypeId() == TypeIndex::UInt64)
type = std::make_shared<DataTypeInt64>();
}
template class FieldToDataType<LeastSupertypeOnError::Throw>;
template class FieldToDataType<LeastSupertypeOnError::String>;
template class FieldToDataType<LeastSupertypeOnError::Null>;

View File

@ -45,6 +45,16 @@ public:
DataTypePtr operator() (const UInt256 & x) const;
DataTypePtr operator() (const Int256 & x) const;
DataTypePtr operator() (const bool & x) const;
private:
// The conditions for converting UInt64 to Int64 are:
// 1. At least one element has a signed integer type.
// 2. Every UInt64 element's value is <= Int64::max, so it can be represented as Int64.
void checkUInt64ToIn64Conversion(bool& has_signed_int, bool& uint64_convert_possible, const DataTypePtr & type, const Field & elem) const;
// Convert UInt64 types to Int64 so that they combine with the other signed integer types
// and the least supertype of all integer elements can be obtained.
void convertUInt64ToInt64IfPossible(DataTypes & data_types) const;
};
}
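A sketch of the intended effect on literal type inference; the large literal equals Int64::max, so it satisfies condition 2 above (the expected output is an assumption based on the logic, not a verified test result):
``` sql
SELECT toTypeName([-1, 9223372036854775807]);   -- expected: Array(Int64)
```
The same adjustment is applied to the key and value types of map literals.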

View File

@ -493,7 +493,10 @@ void SerializationArray::deserializeText(IColumn & column, ReadBuffer & istr, co
deserializeTextImpl(column, istr,
[&](IColumn & nested_column)
{
nested->deserializeTextQuoted(nested_column, istr, settings);
if (settings.null_as_default)
SerializationNullable::deserializeTextQuotedImpl(nested_column, istr, settings, nested);
else
nested->deserializeTextQuoted(nested_column, istr, settings);
}, false);
if (whole && !istr.eof())
@ -604,7 +607,10 @@ void SerializationArray::deserializeTextCSV(IColumn & column, ReadBuffer & istr,
deserializeTextImpl(column, rb,
[&](IColumn & nested_column)
{
nested->deserializeTextCSV(nested_column, rb, settings);
if (settings.null_as_default)
SerializationNullable::deserializeTextCSVImpl(nested_column, rb, settings, nested);
else
nested->deserializeTextCSV(nested_column, rb, settings);
}, true);
}
else
@ -612,7 +618,10 @@ void SerializationArray::deserializeTextCSV(IColumn & column, ReadBuffer & istr,
deserializeTextImpl(column, rb,
[&](IColumn & nested_column)
{
nested->deserializeTextQuoted(nested_column, rb, settings);
if (settings.null_as_default)
SerializationNullable::deserializeTextQuotedImpl(nested_column, rb, settings, nested);
else
nested->deserializeTextQuoted(nested_column, rb, settings);
}, true);
}
}

View File

@ -192,7 +192,10 @@ void SerializationMap::deserializeText(IColumn & column, ReadBuffer & istr, cons
deserializeTextImpl(column, istr,
[&settings](ReadBuffer & buf, const SerializationPtr & subcolumn_serialization, IColumn & subcolumn)
{
subcolumn_serialization->deserializeTextQuoted(subcolumn, buf, settings);
if (settings.null_as_default)
SerializationNullable::deserializeTextQuotedImpl(subcolumn, buf, settings, subcolumn_serialization);
else
subcolumn_serialization->deserializeTextQuoted(subcolumn, buf, settings);
});
if (whole && !istr.eof())

View File

@ -135,7 +135,10 @@ void SerializationTuple::deserializeText(IColumn & column, ReadBuffer & istr, co
assertChar(',', istr);
skipWhitespaceIfAny(istr);
}
elems[i]->deserializeTextQuoted(extractElementColumn(column, i), istr, settings);
if (settings.null_as_default)
SerializationNullable::deserializeTextQuotedImpl(extractElementColumn(column, i), istr, settings, elems[i]);
else
elems[i]->deserializeTextQuoted(extractElementColumn(column, i), istr, settings);
}
// Special format for one element tuple (1,)
@ -366,7 +369,10 @@ void SerializationTuple::deserializeTextCSV(IColumn & column, ReadBuffer & istr,
assertChar(settings.csv.tuple_delimiter, istr);
skipWhitespaceIfAny(istr);
}
elems[i]->deserializeTextCSV(extractElementColumn(column, i), istr, settings);
if (settings.null_as_default)
SerializationNullable::deserializeTextCSVImpl(extractElementColumn(column, i), istr, settings, elems[i]);
else
elems[i]->deserializeTextCSV(extractElementColumn(column, i), istr, settings);
}
});
}
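A sketch of the behavior these serialization changes enable for CSV input; the table is illustrative, and the user-facing switch is assumed to be `input_format_null_as_default`, which maps to `settings.null_as_default` above:
``` sql
CREATE TABLE default.arr_demo (a Array(UInt32)) ENGINE = Memory;

SET input_format_null_as_default = 1;
-- A NULL inside the non-Nullable nested type is replaced by the default value (0) instead of raising an error:
INSERT INTO default.arr_demo FORMAT CSV "[1, NULL, 3]"
SELECT * FROM default.arr_demo;   -- expected: [1,0,3]
```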

View File

@ -684,10 +684,18 @@ void FormatFactory::markOutputFormatSupportsParallelFormatting(const String & na
void FormatFactory::markFormatSupportsSubsetOfColumns(const String & name)
{
auto & target = dict[name].supports_subset_of_columns;
auto & target = dict[name].subset_of_columns_support_checker;
if (target)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Format {} is already marked as supporting subset of columns", name);
target = true;
target = [](const FormatSettings &){ return true; };
}
void FormatFactory::registerSubsetOfColumnsSupportChecker(const String & name, SubsetOfColumnsSupportChecker subset_of_columns_support_checker)
{
auto & target = dict[name].subset_of_columns_support_checker;
if (target)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Format {} is already marked as supporting subset of columns", name);
target = std::move(subset_of_columns_support_checker);
}
void FormatFactory::markOutputFormatPrefersLargeBlocks(const String & name)
@ -698,10 +706,11 @@ void FormatFactory::markOutputFormatPrefersLargeBlocks(const String & name)
target = true;
}
bool FormatFactory::checkIfFormatSupportsSubsetOfColumns(const String & name) const
bool FormatFactory::checkIfFormatSupportsSubsetOfColumns(const DB::String & name, const ContextPtr & context, const std::optional<FormatSettings> & format_settings_) const
{
const auto & target = getCreators(name);
return target.supports_subset_of_columns;
auto format_settings = format_settings_ ? *format_settings_ : getFormatSettings(context);
return target.subset_of_columns_support_checker && target.subset_of_columns_support_checker(format_settings);
}
void FormatFactory::registerAdditionalInfoForSchemaCacheGetter(

View File

@ -123,6 +123,10 @@ private:
/// and the name of the message.
using AdditionalInfoForSchemaCacheGetter = std::function<String(const FormatSettings & settings)>;
/// Some formats can support reading subset of columns depending on settings.
/// The checker should return true if the format supports reading a subset of columns with the given settings.
using SubsetOfColumnsSupportChecker = std::function<bool(const FormatSettings & settings)>;
struct Creators
{
InputCreator input_creator;
@ -132,12 +136,11 @@ private:
SchemaReaderCreator schema_reader_creator;
ExternalSchemaReaderCreator external_schema_reader_creator;
bool supports_parallel_formatting{false};
bool supports_subcolumns{false};
bool supports_subset_of_columns{false};
bool prefers_large_blocks{false};
NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
AppendSupportChecker append_support_checker;
AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter;
SubsetOfColumnsSupportChecker subset_of_columns_support_checker;
};
using FormatsDictionary = std::unordered_map<String, Creators>;
@ -225,9 +228,10 @@ public:
void markOutputFormatSupportsParallelFormatting(const String & name);
void markOutputFormatPrefersLargeBlocks(const String & name);
void markFormatSupportsSubsetOfColumns(const String & name);
bool checkIfFormatSupportsSubsetOfColumns(const String & name) const;
void markFormatSupportsSubsetOfColumns(const String & name);
void registerSubsetOfColumnsSupportChecker(const String & name, SubsetOfColumnsSupportChecker subset_of_columns_support_checker);
bool checkIfFormatSupportsSubsetOfColumns(const String & name, const ContextPtr & context, const std::optional<FormatSettings> & format_settings_ = std::nullopt) const;
bool checkIfFormatHasSchemaReader(const String & name) const;
bool checkIfFormatHasExternalSchemaReader(const String & name) const;

View File

@ -1,16 +1,96 @@
#include <Formats/insertNullAsDefaultIfNeeded.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Functions/FunctionHelpers.h>
namespace DB
{
void insertNullAsDefaultIfNeeded(ColumnWithTypeAndName & input_column, const ColumnWithTypeAndName & header_column, size_t column_i, BlockMissingValues * block_missing_values)
bool insertNullAsDefaultIfNeeded(ColumnWithTypeAndName & input_column, const ColumnWithTypeAndName & header_column, size_t column_i, BlockMissingValues * block_missing_values)
{
if (isArray(input_column.type) && isArray(header_column.type))
{
ColumnWithTypeAndName nested_input_column;
const auto * array_input_column = checkAndGetColumn<ColumnArray>(input_column.column.get());
nested_input_column.column = array_input_column->getDataPtr();
nested_input_column.type = checkAndGetDataType<DataTypeArray>(input_column.type.get())->getNestedType();
ColumnWithTypeAndName nested_header_column;
nested_header_column.column = checkAndGetColumn<ColumnArray>(header_column.column.get())->getDataPtr();
nested_header_column.type = checkAndGetDataType<DataTypeArray>(header_column.type.get())->getNestedType();
if (!insertNullAsDefaultIfNeeded(nested_input_column, nested_header_column, 0, nullptr))
return false;
input_column.column = ColumnArray::create(nested_input_column.column, array_input_column->getOffsetsPtr());
input_column.type = std::make_shared<DataTypeArray>(std::move(nested_input_column.type));
return true;
}
if (isTuple(input_column.type) && isTuple(header_column.type))
{
const auto * tuple_input_column = checkAndGetColumn<ColumnTuple>(input_column.column.get());
const auto * tuple_input_type = checkAndGetDataType<DataTypeTuple>(input_column.type.get());
const auto * tuple_header_column = checkAndGetColumn<ColumnTuple>(header_column.column.get());
const auto * tuple_header_type = checkAndGetDataType<DataTypeTuple>(header_column.type.get());
if (tuple_input_type->getElements().size() != tuple_header_type->getElements().size())
return false;
Columns nested_input_columns;
nested_input_columns.reserve(tuple_input_type->getElements().size());
DataTypes nested_input_types;
nested_input_types.reserve(tuple_input_type->getElements().size());
bool changed = false;
for (size_t i = 0; i != tuple_input_type->getElements().size(); ++i)
{
ColumnWithTypeAndName nested_input_column;
nested_input_column.column = tuple_input_column->getColumnPtr(i);
nested_input_column.type = tuple_input_type->getElement(i);
ColumnWithTypeAndName nested_header_column;
nested_header_column.column = tuple_header_column->getColumnPtr(i);
nested_header_column.type = tuple_header_type->getElement(i);
changed |= insertNullAsDefaultIfNeeded(nested_input_column, nested_header_column, 0, nullptr);
nested_input_columns.push_back(std::move(nested_input_column.column));
nested_input_types.push_back(std::move(nested_input_column.type));
}
if (!changed)
return false;
input_column.column = ColumnTuple::create(std::move(nested_input_columns));
input_column.type = std::make_shared<DataTypeTuple>(std::move(nested_input_types));
return true;
}
if (isMap(input_column.type) && isMap(header_column.type))
{
ColumnWithTypeAndName nested_input_column;
nested_input_column.column = checkAndGetColumn<ColumnMap>(input_column.column.get())->getNestedColumnPtr();
nested_input_column.type = checkAndGetDataType<DataTypeMap>(input_column.type.get())->getNestedType();
ColumnWithTypeAndName nested_header_column;
nested_header_column.column = checkAndGetColumn<ColumnMap>(header_column.column.get())->getNestedColumnPtr();
nested_header_column.type = checkAndGetDataType<DataTypeMap>(header_column.type.get())->getNestedType();
if (!insertNullAsDefaultIfNeeded(nested_input_column, nested_header_column, 0, nullptr))
return false;
input_column.column = ColumnMap::create(std::move(nested_input_column.column));
input_column.type = std::make_shared<DataTypeMap>(std::move(nested_input_column.type));
return true;
}
if (!isNullableOrLowCardinalityNullable(input_column.type) || isNullableOrLowCardinalityNullable(header_column.type))
return;
return false;
if (block_missing_values)
{
@ -32,6 +112,8 @@ void insertNullAsDefaultIfNeeded(ColumnWithTypeAndName & input_column, const Col
const auto * lc_type = assert_cast<const DataTypeLowCardinality *>(input_column.type.get());
input_column.type = std::make_shared<DataTypeLowCardinality>(removeNullable(lc_type->getDictionaryType()));
}
return true;
}
}

View File

@ -5,6 +5,6 @@
namespace DB
{
void insertNullAsDefaultIfNeeded(ColumnWithTypeAndName & input_column, const ColumnWithTypeAndName & header_column, size_t column_i, BlockMissingValues * block_missing_values);
bool insertNullAsDefaultIfNeeded(ColumnWithTypeAndName & input_column, const ColumnWithTypeAndName & header_column, size_t column_i, BlockMissingValues * block_missing_values);
}

View File

@ -12,8 +12,9 @@ void registerWithNamesAndTypes(const std::string & base_format_name, RegisterWit
void markFormatWithNamesAndTypesSupportsSamplingColumns(const std::string & base_format_name, FormatFactory & factory)
{
factory.markFormatSupportsSubsetOfColumns(base_format_name + "WithNames");
factory.markFormatSupportsSubsetOfColumns(base_format_name + "WithNamesAndTypes");
auto setting_checker = [](const FormatSettings & settings){ return settings.with_names_use_header; };
factory.registerSubsetOfColumnsSupportChecker(base_format_name + "WithNames", setting_checker);
factory.registerSubsetOfColumnsSupportChecker(base_format_name + "WithNamesAndTypes", setting_checker);
}
}
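The checker registered above ties subset-of-columns support for the `*WithNames` and `*WithNamesAndTypes` formats to the `with_names_use_header` format setting (exposed to users as `input_format_with_names_use_header`). A hedged sketch of the user-visible effect, with a hypothetical table and file:
``` sql
CREATE TABLE default.events (id UInt64, name String, ts DateTime) ENGINE = MergeTree ORDER BY id;

SET input_format_with_names_use_header = 1;
-- With the header honoured, the file may name only a subset of the table's columns;
-- the missing columns are filled with their defaults.
INSERT INTO default.events FROM INFILE '/tmp/events.csv' FORMAT CSVWithNames;
```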

View File

@ -33,6 +33,7 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnTuple.h>
@ -3275,14 +3276,40 @@ private:
{
return &ConvertImplGenericFromString<ColumnString>::execute;
}
else
else if (const auto * agg_type = checkAndGetDataType<DataTypeAggregateFunction>(from_type_untyped.get()))
{
if (cast_type == CastType::accurateOrNull)
return createToNullableColumnWrapper();
else
throw Exception(ErrorCodes::CANNOT_CONVERT_TYPE, "Conversion from {} to {} is not supported",
from_type_untyped->getName(), to_type->getName());
if (agg_type->getFunction()->haveSameStateRepresentation(*to_type->getFunction()))
{
return [function = to_type->getFunction()](
ColumnsWithTypeAndName & arguments,
const DataTypePtr & /* result_type */,
const ColumnNullable * /* nullable_source */,
size_t /*input_rows_count*/) -> ColumnPtr
{
const auto & argument_column = arguments.front();
const auto * col_agg = checkAndGetColumn<ColumnAggregateFunction>(argument_column.column.get());
if (col_agg)
{
auto new_col_agg = ColumnAggregateFunction::create(*col_agg);
new_col_agg->set(function);
return new_col_agg;
}
else
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Illegal column {} for function CAST AS AggregateFunction",
argument_column.column->getName());
}
};
}
}
if (cast_type == CastType::accurateOrNull)
return createToNullableColumnWrapper();
else
throw Exception(ErrorCodes::CANNOT_CONVERT_TYPE, "Conversion from {} to {} is not supported",
from_type_untyped->getName(), to_type->getName());
}
WrapperType createArrayWrapper(const DataTypePtr & from_type_untyped, const DataTypeArray & to_type) const
@ -4063,7 +4090,16 @@ private:
safe_convert_custom_types = to_type->getCustomName() && from_type_custom_name->getName() == to_type->getCustomName()->getName();
if (from_type->equals(*to_type) && safe_convert_custom_types)
return createIdentityWrapper(from_type);
{
/// We can only use identity conversion for DataTypeAggregateFunction when they are strictly equivalent.
if (typeid_cast<const DataTypeAggregateFunction *>(from_type.get()))
{
if (DataTypeAggregateFunction::strictEquals(from_type, to_type))
return createIdentityWrapper(from_type);
}
else
return createIdentityWrapper(from_type);
}
else if (WhichDataType(from_type).isNothing())
return createNothingWrapper(to_type.get());
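
The conversion changes above introduce two distinct paths for AggregateFunction types: an identity wrapper only when the source and destination types are strictly equal, and a "rebind" wrapper that keeps the existing states but attaches the target function object when the two aggregate functions merely share the same state representation (e.g. quantile(0.3) and quantile(0.5)); everything else remains unsupported. A compact sketch of that three-way decision, with the two booleans standing in for DataTypeAggregateFunction::strictEquals and haveSameStateRepresentation:

enum class AggregateCastKind
{
    Identity,       /// types are strictly equal: reuse the column as-is
    RebindFunction, /// same state representation: keep the states, attach the target function
    Unsupported     /// anything else: throw (or return NULLs for accurateOrNull casts)
};

AggregateCastKind chooseAggregateFunctionCast(bool strictly_equal, bool same_state_representation)
{
    if (strictly_equal)
        return AggregateCastKind::Identity;
    if (same_state_representation)
        return AggregateCastKind::RebindFunction;
    return AggregateCastKind::Unsupported;
}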

View File

@ -140,18 +140,30 @@ struct NgramDistanceImpl
{
case 1:
res = 0;
memcpy(&res, pos, 1);
if constexpr (std::endian::native == std::endian::little)
memcpy(&res, pos, 1);
else
reverseMemcpy(reinterpret_cast<char*>(&res) + sizeof(CodePoint) - 1, pos, 1);
break;
case 2:
res = 0;
memcpy(&res, pos, 2);
if constexpr (std::endian::native == std::endian::little)
memcpy(&res, pos, 2);
else
reverseMemcpy(reinterpret_cast<char*>(&res) + sizeof(CodePoint) - 2, pos, 2);
break;
case 3:
res = 0;
memcpy(&res, pos, 3);
if constexpr (std::endian::native == std::endian::little)
memcpy(&res, pos, 3);
else
reverseMemcpy(reinterpret_cast<char*>(&res) + sizeof(CodePoint) - 3, pos, 3);
break;
default:
memcpy(&res, pos, 4);
if constexpr (std::endian::native == std::endian::little)
memcpy(&res, pos, 4);
else
reverseMemcpy(reinterpret_cast<char*>(&res) + sizeof(CodePoint) - 4, pos, 4);
}
/// This is not truly case-insensitive UTF-8 handling: we simply zero the 5-th bit of every byte.
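
The endianness branches added above exist because memcpy-ing the first N bytes of the input into the low address of an integer yields the intended numeric value only on little-endian hosts; on big-endian hosts the same bytes must be placed at the high end of the integer. A self-contained sketch under those assumptions (reverse_memcpy is a hypothetical helper mirroring reverseMemcpy, and CodePoint is assumed to be a 32-bit unsigned integer):

#include <algorithm>
#include <bit>
#include <cstddef>
#include <cstdint>
#include <cstring>

/// Hypothetical helper mirroring reverseMemcpy: copy n bytes in reverse order.
static void reverse_memcpy(char * dst, const char * src, std::size_t n)
{
    std::reverse_copy(src, src + n, dst);
}

/// Read the first `bytes` (1..4) bytes at `pos` as an unsigned value, independent of host endianness.
uint32_t readCodePoint(const char * pos, std::size_t bytes)
{
    uint32_t res = 0;
    if constexpr (std::endian::native == std::endian::little)
        std::memcpy(&res, pos, bytes); /// bytes land in the low part of res
    else
        reverse_memcpy(reinterpret_cast<char *>(&res) + sizeof(res) - bytes, pos, bytes); /// same numeric value on big-endian
    return res;
}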

View File

@ -266,7 +266,6 @@ void executeQuery(
void executeQueryWithParallelReplicas(
QueryPlan & query_plan,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
SelectStreamFactory & stream_factory,
const ASTPtr & query_ast,
ContextPtr context,
@ -328,7 +327,6 @@ void executeQueryWithParallelReplicas(
stream_factory.header,
stream_factory.processed_stage,
main_table,
table_func_ptr,
new_context,
getThrottler(new_context),
std::move(scalars),

View File

@ -65,7 +65,6 @@ void executeQuery(
void executeQueryWithParallelReplicas(
QueryPlan & query_plan,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
SelectStreamFactory & stream_factory,
const ASTPtr & query_ast,
ContextPtr context,

View File

@ -1684,9 +1684,9 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression, const
{
/// For input function we should check if input format supports reading subset of columns.
if (table_function_ptr->getName() == "input")
use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(getInsertFormat());
use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(getInsertFormat(), shared_from_this());
else
use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns();
use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns(shared_from_this());
}
if (use_columns_from_insert_query)

View File

@ -39,7 +39,6 @@
#include <Interpreters/QueryLog.h>
#include <Interpreters/replaceAliasColumnsInQuery.h>
#include <Interpreters/RewriteCountDistinctVisitor.h>
#include <Interpreters/RewriteUniqToCountVisitor.h>
#include <Interpreters/getCustomKeyFilterForParallelReplicas.h>
#include <QueryPipeline/Pipe.h>
@ -422,12 +421,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
RewriteCountDistinctFunctionVisitor(data_rewrite_countdistinct).visit(query_ptr);
}
if (settings.optimize_uniq_to_count)
{
RewriteUniqToCountMatcher::Data data_rewrite_uniq_count;
RewriteUniqToCountVisitor(data_rewrite_uniq_count).visit(query_ptr);
}
JoinedTables joined_tables(getSubqueryContext(context), getSelectQuery(), options.with_all_cols, options_.is_create_parameterized_view);
bool got_storage_from_query = false;

View File

@ -590,8 +590,10 @@ QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_even
res.peak_memory_usage = thread_group->memory_tracker.getPeak();
if (get_thread_list)
{
res.thread_ids = thread_group->getInvolvedThreadIds();
res.peak_threads_usage = thread_group->getPeakThreadsUsage();
}
if (get_profile_events)
res.profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_group->performance_counters.getPartiallyAtomicSnapshot());
}

View File

@ -67,6 +67,7 @@ struct QueryStatusInfo
/// Optional fields, filled by query
std::vector<UInt64> thread_ids;
size_t peak_threads_usage;
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters;
std::shared_ptr<Settings> query_settings;
std::string current_database;

View File

@ -118,6 +118,7 @@ NamesAndTypesList QueryLogElement::getNamesAndTypes()
{"log_comment", std::make_shared<DataTypeString>()},
{"thread_ids", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>())},
{"peak_threads_usage", std::make_shared<DataTypeUInt64>()},
{"ProfileEvents", std::make_shared<DataTypeMap>(low_cardinality_string, std::make_shared<DataTypeUInt64>())},
{"Settings", std::make_shared<DataTypeMap>(low_cardinality_string, low_cardinality_string)},
@ -230,6 +231,8 @@ void QueryLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(threads_array);
}
columns[i++]->insert(peak_threads_usage);
if (profile_counters)
{
auto * column = columns[i++].get();

View File

@ -91,6 +91,7 @@ struct QueryLogElement
String log_comment;
std::vector<UInt64> thread_ids;
UInt64 peak_threads_usage = 0;
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters;
std::shared_ptr<AsyncReadCounters> async_read_counters;
std::shared_ptr<Settings> query_settings;

View File

@ -1,163 +0,0 @@
#include <Interpreters/RewriteUniqToCountVisitor.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/parseQuery.h>
namespace DB
{
using Aliases = std::unordered_map<String, ASTPtr>;
namespace
{
bool matchFnUniq(String func_name)
{
auto name = Poco::toLower(func_name);
return name == "uniq" || name == "uniqHLL12" || name == "uniqExact" || name == "uniqTheta" || name == "uniqCombined"
|| name == "uniqCombined64";
}
bool expressionEquals(const ASTPtr & lhs, const ASTPtr & rhs, const Aliases & alias)
{
if (lhs->getTreeHash() == rhs->getTreeHash())
{
return true;
}
else
{
auto * lhs_idf = lhs->as<ASTIdentifier>();
auto * rhs_idf = rhs->as<ASTIdentifier>();
if (lhs_idf && rhs_idf)
{
/// compound identifiers, such as: <t.name, name>
if (lhs_idf->shortName() == rhs_idf->shortName())
return true;
/// translate alias
if (alias.find(lhs_idf->shortName()) != alias.end())
lhs_idf = alias.find(lhs_idf->shortName())->second->as<ASTIdentifier>();
if (alias.find(rhs_idf->shortName()) != alias.end())
rhs_idf = alias.find(rhs_idf->shortName())->second->as<ASTIdentifier>();
if (lhs_idf->shortName() == rhs_idf->shortName())
return true;
}
}
return false;
}
bool expressionListEquals(ASTExpressionList * lhs, ASTExpressionList * rhs, const Aliases & alias)
{
if (!lhs || !rhs)
return false;
if (lhs->children.size() != rhs->children.size())
return false;
for (size_t i = 0; i < lhs->children.size(); i++)
{
if (!expressionEquals(lhs->children[i], rhs->children[i], alias))
return false;
}
return true;
}
/// Test whether lhs contains all expressions in rhs.
bool expressionListContainsAll(ASTExpressionList * lhs, ASTExpressionList * rhs, const Aliases & alias)
{
if (!lhs || !rhs)
return false;
if (lhs->children.size() < rhs->children.size())
return false;
for (const auto & re : rhs->children)
{
auto predicate = [&re, &alias](ASTPtr & le) { return expressionEquals(le, re, alias); };
if (std::find_if(lhs->children.begin(), lhs->children.end(), predicate) == lhs->children.end())
return false;
}
return true;
}
}
void RewriteUniqToCountMatcher::visit(ASTPtr & ast, Data & /*data*/)
{
auto * selectq = ast->as<ASTSelectQuery>();
if (!selectq || !selectq->tables() || selectq->tables()->children.size() != 1)
return;
auto expr_list = selectq->select();
if (!expr_list || expr_list->children.size() != 1)
return;
auto * func = expr_list->children[0]->as<ASTFunction>();
if (!func || !matchFnUniq(func->name))
return;
if (selectq->tables()->as<ASTTablesInSelectQuery>()->children[0]->as<ASTTablesInSelectQueryElement>()->children.size() != 1)
return;
auto * table_expr = selectq->tables()
->as<ASTTablesInSelectQuery>()
->children[0]
->as<ASTTablesInSelectQueryElement>()
->children[0]
->as<ASTTableExpression>();
if (!table_expr || table_expr->children.size() != 1 || !table_expr->subquery)
return;
auto * subquery = table_expr->subquery->as<ASTSubquery>();
if (!subquery)
return;
auto * sub_selectq = subquery->children[0]
->as<ASTSelectWithUnionQuery>()->children[0]
->as<ASTExpressionList>()->children[0]
->as<ASTSelectQuery>();
if (!sub_selectq)
return;
auto sub_expr_list = sub_selectq->select();
if (!sub_expr_list)
return;
/// collect subquery select expressions alias
Aliases alias;
for (const auto & expr : sub_expr_list->children)
{
if (!expr->tryGetAlias().empty())
alias.insert({expr->tryGetAlias(), expr});
}
/// Whether query matches 'SELECT uniq(x ...) FROM (SELECT DISTINCT x ...)'
auto match_subquery_with_distinct = [&]() -> bool
{
if (!sub_selectq->distinct)
return false;
/// uniq expression list == subquery group by expression list
if (!expressionListEquals(func->children[0]->as<ASTExpressionList>(), sub_expr_list->as<ASTExpressionList>(), alias))
return false;
return true;
};
/// Whether query matches 'SELECT uniq(x ...) FROM (SELECT x ... GROUP BY x ...)'
auto match_subquery_with_group_by = [&]() -> bool
{
auto group_by = sub_selectq->groupBy();
if (!group_by)
return false;
/// uniq expression list == subquery group by expression list
if (!expressionListEquals(func->children[0]->as<ASTExpressionList>(), group_by->as<ASTExpressionList>(), alias))
return false;
/// subquery select expression list must contain all columns in uniq expression list
if (!expressionListContainsAll(sub_expr_list->as<ASTExpressionList>(), func->children[0]->as<ASTExpressionList>(), alias))
return false;
return true;
};
if (match_subquery_with_distinct() || match_subquery_with_group_by())
expr_list->children[0] = makeASTFunction("count");
}
}

View File

@ -1,30 +0,0 @@
#pragma once
#include <Parsers/IAST.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include "Interpreters/TreeRewriter.h"
namespace DB
{
class ASTFunction;
/** Optimize `uniq` into `count` over subquery.
* Example: 'SELECT uniq(x ...) FROM (SELECT DISTINCT x ...)' to
* Result: 'SELECT count() FROM (SELECT DISTINCT x ...)'
*
* Example: 'SELECT uniq(x ...) FROM (SELECT x ... GROUP BY x ...)' to
* Result: 'SELECT count() FROM (SELECT x ... GROUP BY x ...)'
*
* Note that we can rewrite all uniq variants except uniqUpTo.
*/
class RewriteUniqToCountMatcher
{
public:
struct Data {};
static void visit(ASTPtr & ast, Data &);
static bool needChildVisit(const ASTPtr &, const ASTPtr &) { return true; }
};
using RewriteUniqToCountVisitor = InDepthNodeVisitor<RewriteUniqToCountMatcher, true>;
}

View File

@ -61,10 +61,27 @@ std::vector<UInt64> ThreadGroup::getInvolvedThreadIds() const
return res;
}
void ThreadGroup::linkThread(UInt64 thread_it)
size_t ThreadGroup::getPeakThreadsUsage() const
{
std::lock_guard lock(mutex);
thread_ids.insert(thread_it);
return peak_threads_usage;
}
void ThreadGroup::linkThread(UInt64 thread_id)
{
std::lock_guard lock(mutex);
thread_ids.insert(thread_id);
++active_thread_count;
peak_threads_usage = std::max(peak_threads_usage, active_thread_count);
}
void ThreadGroup::unlinkThread()
{
std::lock_guard lock(mutex);
chassert(active_thread_count > 0);
--active_thread_count;
}
ThreadGroupPtr ThreadGroup::createForQuery(ContextPtr query_context_, std::function<void()> fatal_error_callback_)
@ -243,6 +260,8 @@ void ThreadStatus::detachFromGroup()
/// Extract MemoryTracker out from query and user context
memory_tracker.setParent(&total_memory_tracker);
thread_group->unlinkThread();
thread_group.reset();
query_id_from_query_context.clear();
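
The ThreadGroup changes above track, under the group's mutex, how many threads are currently linked and the maximum that were ever linked at the same time; detachFromGroup now unlinks the thread before dropping the group reference. The bookkeeping pattern in isolation (PeakThreadCounter is a made-up name; the real ThreadGroup also records thread ids and more):

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <mutex>

class PeakThreadCounter
{
public:
    void link()
    {
        std::lock_guard lock(mutex);
        ++active_count;
        peak_count = std::max(peak_count, active_count); /// remember the high-water mark
    }

    void unlink()
    {
        std::lock_guard lock(mutex);
        assert(active_count > 0);
        --active_count;
    }

    std::size_t getPeak() const
    {
        std::lock_guard lock(mutex);
        return peak_count;
    }

private:
    mutable std::mutex mutex;
    std::size_t active_count = 0;
    std::size_t peak_count = 0;
};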

View File

@ -241,6 +241,7 @@ addStatusInfoToQueryLogElement(QueryLogElement & element, const QueryStatusInfo
element.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;
element.thread_ids = info.thread_ids;
element.peak_threads_usage = info.peak_threads_usage;
element.profile_counters = info.profile_counters;
/// We need to refresh the access info since dependent views might have added extra information, either during

View File

@ -850,14 +850,6 @@ static ColumnWithTypeAndName readColumnFromORCColumn(
if (skipped)
return {};
if (value_type_hint && !value_type_hint->equals(*value_column.type))
{
/// Cast value column to target type, because it can happen
/// that parsed type cannot be ClickHouse Map value type.
value_column.column = castColumn(value_column, value_type_hint);
value_column.type = value_type_hint;
}
auto offsets_column = readOffsetsFromORCListColumn(orc_map_column);
auto map_column = ColumnMap::create(key_column.column, value_column.column, offsets_column);
auto map_type = std::make_shared<DataTypeMap>(key_column.type, value_column.type);

View File

@ -143,12 +143,12 @@ std::optional<AggregateFunctionMatches> matchAggregateFunctions(
argument_types.clear();
const auto & candidate = info.aggregates[idx];
/// Note: this check is a bit strict.
/// We check that aggregate function names, argument types and parameters are equal.
/// In some cases it's possible only to check that states are equal,
/// e.g. for quantile(0.3)(...) and quantile(0.5)(...).
/// But also functions sum(...) and sumIf(...) will have equal states,
/// and we can't replace one to another from projection.
///
/// Note we already checked that aggregate function names are equal,
/// so that functions sum(...) and sumIf(...) with equal states will
/// not match.
if (!candidate.function->getStateType()->equals(*aggregate.function->getStateType()))
{
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Cannot match agg func {} vs {} by state {} vs {}",
@ -249,12 +249,24 @@ static void appendAggregateFunctions(
auto & input = inputs[match.description];
if (!input)
input = &proj_dag.addInput(match.description->column_name, std::move(type));
input = &proj_dag.addInput(match.description->column_name, type);
const auto * node = input;
if (node->result_name != aggregate.column_name)
node = &proj_dag.addAlias(*node, aggregate.column_name);
{
if (DataTypeAggregateFunction::strictEquals(type, node->result_type))
{
node = &proj_dag.addAlias(*node, aggregate.column_name);
}
else
{
/// Cast to aggregate types specified in query if it's not
/// strictly the same as the one specified in projection. This
/// is required to generate correct results during finalization.
node = &proj_dag.addCast(*node, type, aggregate.column_name);
}
}
proj_dag_outputs.push_back(node);
}
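
In the projection-matching change above, the projection's column is reused through a plain alias only when its stored aggregate state type is strictly the type the query expects; otherwise an explicit cast node is inserted so that finalization runs on the type the query asked for. A deliberately tiny sketch of that decision (NodeInfo and BindAction are illustrative; the real code operates on ActionsDAG nodes and DataTypeAggregateFunction::strictEquals):

#include <string>

/// Simplified stand-in for an ActionsDAG node: just its name and a rendered type.
struct NodeInfo
{
    std::string result_name;
    std::string result_type;
};

enum class BindAction { AddAlias, AddCast };

/// Alias only when the stored state type is strictly the requested one; otherwise cast.
BindAction bindProjectionColumn(const NodeInfo & projection_column, const std::string & requested_type)
{
    return projection_column.result_type == requested_type ? BindAction::AddAlias : BindAction::AddCast;
}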

View File

@ -305,7 +305,6 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
Block header_,
QueryProcessingStage::Enum stage_,
StorageID main_table_,
ASTPtr table_func_ptr_,
ContextMutablePtr context_,
ThrottlerPtr throttler_,
Scalars scalars_,
@ -318,7 +317,6 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
, coordinator(std::move(coordinator_))
, stage(std::move(stage_))
, main_table(std::move(main_table_))
, table_func_ptr(table_func_ptr_)
, context(context_)
, throttler(throttler_)
, scalars(scalars_)

View File

@ -74,7 +74,6 @@ public:
Block header_,
QueryProcessingStage::Enum stage_,
StorageID main_table_,
ASTPtr table_func_ptr_,
ContextMutablePtr context_,
ThrottlerPtr throttler_,
Scalars scalars_,
@ -98,7 +97,6 @@ private:
ParallelReplicasReadingCoordinatorPtr coordinator;
QueryProcessingStage::Enum stage;
StorageID main_table;
ASTPtr table_func_ptr;
ContextMutablePtr context;
ThrottlerPtr throttler;
Scalars scalars;

View File

@ -838,9 +838,9 @@ private:
};
bool StorageHDFS::supportsSubsetOfColumns() const
bool StorageHDFS::supportsSubsetOfColumns(const ContextPtr & context_) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name, context_);
}
Pipe StorageHDFS::read(
@ -878,7 +878,7 @@ Pipe StorageHDFS::read(
});
}
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context_), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& context_->getSettingsRef().optimize_count_from_files;

View File

@ -76,7 +76,7 @@ public:
/// It is useful because column-oriented formats can effectively skip unknown columns,
/// so we can create a header of only the required columns in the read method and ask
/// the format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context_) const;
bool supportsSubcolumns() const override { return true; }
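
The supportsSubsetOfColumns comment above is the rationale behind the ContextPtr plumbing throughout this commit: when the input format can skip unknown columns, the storage builds a header containing only the requested columns and hands it to the format. In isolation, the header reduction looks roughly like this (NameAndType is a simplified stand-in for a Block header entry):

#include <string>
#include <utility>
#include <vector>

using NameAndType = std::pair<std::string, std::string>; /// (column name, type name)

/// Build a header that contains only the requested columns, in the requested order.
/// A column-oriented format given this header can skip every other column entirely.
std::vector<NameAndType> makeReducedHeader(
    const std::vector<NameAndType> & full_header, const std::vector<std::string> & requested)
{
    std::vector<NameAndType> reduced;
    for (const auto & name : requested)
        for (const auto & column : full_header)
            if (column.first == name)
            {
                reduced.push_back(column);
                break;
            }
    return reduced;
}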

View File

@ -65,7 +65,7 @@ public:
NamesAndTypesList getVirtuals() const override;
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns() const;
std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr context_) const override;

View File

@ -620,8 +620,6 @@ public:
/// NOTE: write-once also does not support INSERTs/merges/... for MergeTree
virtual bool isStaticStorage() const;
virtual bool supportsSubsetOfColumns() const { return false; }
/// If it is possible to quickly determine exact number of rows in the table at this moment of time, then return it.
/// Used for:
/// - Simple count() optimization

View File

@ -40,7 +40,6 @@ bool MergePlainMergeTreeTask::executeStep()
if (merge_list_entry)
{
switcher.emplace((*merge_list_entry)->thread_group);
}
switch (state)

View File

@ -4245,7 +4245,7 @@ void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeT
}
void MergeTreeData::tryRemovePartImmediately(DataPartPtr && part)
bool MergeTreeData::tryRemovePartImmediately(DataPartPtr && part)
{
DataPartPtr part_to_delete;
{
@ -4271,7 +4271,7 @@ void MergeTreeData::tryRemovePartImmediately(DataPartPtr && part)
if (!it->unique())
LOG_WARNING(log, "Cannot immediately remove part {} because someone using it right now "
"usage counter {}", part_name_with_state, it->use_count());
return;
return false;
}
modifyPartState(it, DataPartState::Deleting);
@ -4296,6 +4296,7 @@ void MergeTreeData::tryRemovePartImmediately(DataPartPtr && part)
removePartsFinally({part_to_delete});
LOG_TRACE(log, "Removed part {}", part_to_delete->name);
return true;
}

View File

@ -671,7 +671,7 @@ public:
void outdateUnexpectedPartAndCloneToDetached(const DataPartPtr & part);
/// If the part is Obsolete and not used by anybody else, immediately delete it from filesystem and remove from memory.
void tryRemovePartImmediately(DataPartPtr && part);
bool tryRemovePartImmediately(DataPartPtr && part);
/// Returns old inactive parts that can be deleted. At the same time removes them from the list of parts but not from the disk.
/// If 'force' - don't wait for old_parts_lifetime.

View File

@ -61,6 +61,12 @@ MergeTreeReadTask::Readers MergeTreePrefetchedReadPool::PrefetechedReaders::get(
{
SCOPE_EXIT({ is_valid = false; });
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::WaitPrefetchTaskMicroseconds);
/// First wait for completion of all futures.
for (auto & prefetch_future : prefetch_futures)
prefetch_future.wait();
/// Then rethrow first exception if any.
for (auto & prefetch_future : prefetch_futures)
prefetch_future.get();
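
The two loops added above are a wait-all-then-get pattern: calling get() on the first failed future would rethrow while later prefetch tasks may still be running, so every future is first wait()-ed and only then get() is used to surface the first stored exception. A standalone sketch with plain std::future:

#include <future>
#include <vector>

/// Drain a set of prefetch-style tasks: wait for all of them first, and only then
/// propagate the first stored exception, so no task is left running behind our back.
void drainAll(std::vector<std::future<void>> & futures)
{
    for (auto & f : futures)
        f.wait();   /// make sure every task has finished
    for (auto & f : futures)
        f.get();    /// rethrow the first stored exception, if any
}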

View File

@ -3,6 +3,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
#include <Storages/MergeTree/InsertBlockInfo.h>
#include <Interpreters/PartLog.h>
#include "Common/Exception.h"
#include <Common/FailPoint.h>
#include <Common/ProfileEventsScope.h>
#include <Common/SipHash.h>
@ -44,6 +45,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int TABLE_IS_READ_ONLY;
extern const int QUERY_WAS_CANCELLED;
extern const int CHECKSUM_DOESNT_MATCH;
}
template<bool async_insert>
@ -801,8 +803,48 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
"Conflict block ids and block number lock should not "
"be empty at the same time for async inserts");
/// Information about the part.
storage.getCommitPartOps(ops, part, block_id_path);
if constexpr (!async_insert)
{
if (!existing_part_name.empty())
{
LOG_DEBUG(log, "Will check part {} checksums", existing_part_name);
try
{
NameSet unused;
/// if we found the part in the deduplication hashes, it must exist on some replica
storage.checkPartChecksumsAndAddCommitOps(zookeeper, part, ops, existing_part_name, unused);
}
catch (const zkutil::KeeperException &)
{
throw;
}
catch (const Exception & ex)
{
if (ex.code() == ErrorCodes::CHECKSUM_DOESNT_MATCH)
{
LOG_INFO(
log,
"Block with ID {} has the same deduplication hash as other part {} on other replica, but checksums (which "
"include metadata files like columns.txt) doesn't match, will not write it locally",
block_id,
existing_part_name);
return;
}
throw;
}
}
else
{
/// Information about the part.
storage.getCommitPartOps(ops, part, block_id_path);
}
}
else
{
chassert(existing_part_name.empty());
storage.getCommitPartOps(ops, part, block_id_path);
}
/// It's important to create it outside of lock scope because
/// otherwise it can lock parts in destructor and deadlock is possible.
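
The comment above refers to the classic deadlock where an object's destructor needs the same non-recursive mutex that the surrounding scope still holds, so the object has to be constructed, and allowed to die, outside the locked region. A minimal illustration (Guard and parts_mutex are hypothetical, not the actual ReplicatedMergeTree types):

#include <mutex>

std::mutex parts_mutex; /// hypothetical: also needed by Guard's destructor

struct Guard
{
    ~Guard()
    {
        std::lock_guard lock(parts_mutex); /// the destructor takes the same (non-recursive) mutex
        /// ... release parts ...
    }
};

void commit()
{
    /// Created *before* the locked scope: if the guard lived inside it, its destructor
    /// would try to re-take parts_mutex while we still hold it, and we would deadlock.
    Guard guard;
    {
        std::lock_guard lock(parts_mutex);
        /// ... mutate the parts set under the lock ...
    }
    /// guard dies here, after the lock has already been released
}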

View File

@ -162,9 +162,9 @@ bool StorageS3Queue::supportsSubcolumns() const
return true;
}
bool StorageS3Queue::supportsSubsetOfColumns() const
bool StorageS3Queue::supportsSubsetOfColumns(const ContextPtr & context_) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context_, format_settings);
}
Pipe StorageS3Queue::read(
@ -187,7 +187,7 @@ Pipe StorageS3Queue::read(
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(local_context, query_info.query);
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
@ -363,7 +363,7 @@ void StorageS3Queue::streamToViews()
// Create a stream for each consumer and join them in a union stream
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(s3queue_context, nullptr);
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(getContext()), getVirtuals());
const size_t max_download_threads = s3queue_context->getSettingsRef().max_download_threads;
auto pipe = Pipe(std::make_shared<StorageS3QueueSource>(

View File

@ -125,7 +125,7 @@ private:
};
std::shared_ptr<TaskContext> task;
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context_) const;
const UInt32 zk_create_table_retries = 1000;
bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot);

View File

@ -687,7 +687,7 @@ Pipe StorageAzureBlob::read(
query_info.query, virtual_columns, local_context, nullptr, local_context->getFileProgressCallback());
}
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;
@ -792,9 +792,9 @@ bool StorageAzureBlob::supportsPartitionBy() const
return true;
}
bool StorageAzureBlob::supportsSubsetOfColumns() const
bool StorageAzureBlob::supportsSubsetOfColumns(const ContextPtr & context) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context, format_settings);
}
bool StorageAzureBlob::prefersLargeBlocks() const

View File

@ -99,7 +99,7 @@ public:
bool supportsSubcolumns() const override { return true; }
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context) const;
bool supportsTrivialCountOptimization() const override { return true; }

View File

@ -803,9 +803,9 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
return columns;
}
bool StorageFile::supportsSubsetOfColumns() const
bool StorageFile::supportsSubsetOfColumns(const ContextPtr & context) const
{
return format_name != "Distributed" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
return format_name != "Distributed" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name, context, format_settings);
}
bool StorageFile::prefersLargeBlocks() const
@ -1433,7 +1433,7 @@ Pipe StorageFile::read(
if (progress_callback && !archive_info)
progress_callback(FileProgress(0, total_bytes_to_read));
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& context->getSettingsRef().optimize_count_from_files;

View File

@ -74,7 +74,7 @@ public:
/// It is useful because such formats can effectively skip unknown columns,
/// so we can create a header of only the required columns in the read method and ask
/// the format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context) const;
bool supportsSubcolumns() const override { return true; }

View File

@ -237,9 +237,13 @@ void StorageMergeTree::read(
processed_stage);
ClusterProxy::executeQueryWithParallelReplicas(
query_plan, getStorageID(), /*remove_table_function_ptr*/ nullptr,
select_stream_factory, modified_query_ast,
local_context, query_info.storage_limits, cluster);
query_plan,
getStorageID(),
select_stream_factory,
modified_query_ast,
local_context,
query_info.storage_limits,
cluster);
}
else
{

View File

@ -981,6 +981,16 @@ void StorageReplicatedMergeTree::drop()
{
/// Session could expire, get it again
zookeeper = getZooKeeperIfTableShutDown();
auto lost_part_count_path = fs::path(zookeeper_path) / "lost_part_count";
Coordination::Stat lost_part_count_stat;
String lost_part_count_str;
if (zookeeper->tryGet(lost_part_count_path, lost_part_count_str, &lost_part_count_stat))
{
UInt64 lost_part_count = lost_part_count_str.empty() ? 0 : parse<UInt64>(lost_part_count_str);
if (lost_part_count > 0)
LOG_INFO(log, "Dropping table with non-zero lost_part_count equal to {}", lost_part_count);
}
dropReplica(zookeeper, zookeeper_path, replica_name, log, getSettings(), &has_metadata_in_zookeeper);
}
}
@ -1476,8 +1486,12 @@ void StorageReplicatedMergeTree::syncPinnedPartUUIDs()
}
}
void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil::ZooKeeperPtr & zookeeper,
const DataPartPtr & part, Coordination::Requests & ops, String part_name, NameSet * absent_replicas_paths)
bool StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(
const ZooKeeperWithFaultInjectionPtr & zookeeper,
const DataPartPtr & part,
Coordination::Requests & ops,
String part_name,
NameSet & absent_replicas_paths)
{
if (part_name.empty())
part_name = part->name;
@ -1487,20 +1501,24 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
Strings replicas = zookeeper->getChildren(fs::path(zookeeper_path) / "replicas");
std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
bool has_been_already_added = false;
bool part_found = false;
bool part_exists_on_our_replica = false;
for (const String & replica : replicas)
{
String current_part_path = fs::path(zookeeper_path) / "replicas" / replica / "parts" / part_name;
String part_zk_str;
if (!zookeeper->tryGet(current_part_path, part_zk_str))
{
if (absent_replicas_paths)
absent_replicas_paths->emplace(current_part_path);
absent_replicas_paths.emplace(current_part_path);
continue;
}
else
{
part_found = true;
if (replica == replica_name)
part_exists_on_our_replica = true;
}
ReplicatedMergeTreePartHeader replica_part_header;
if (part_zk_str.empty())
@ -1540,20 +1558,13 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
}
replica_part_header.getChecksums().checkEqual(local_part_header.getChecksums(), true);
if (replica == replica_name)
has_been_already_added = true;
/// If we verify checksums in "sequential manner" (i.e. recheck absence of checksums on other replicas when commit)
/// then it is enough to verify checksums on at least one replica since checksums on other replicas must be the same.
if (absent_replicas_paths)
{
absent_replicas_paths->clear();
break;
}
break;
}
if (!has_been_already_added)
if (part_found)
absent_replicas_paths.clear();
if (!part_exists_on_our_replica)
{
const auto storage_settings_ptr = getSettings();
String part_path = fs::path(replica_path) / "parts" / part_name;
@ -1578,6 +1589,7 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
LOG_WARNING(log, "checkPartAndAddToZooKeeper: node {} already exists. Will not commit any nodes.",
(fs::path(replica_path) / "parts" / part_name).string());
}
return part_found;
}
MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAndCommit(Transaction & transaction,
@ -1596,14 +1608,14 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
size_t zero_copy_lock_ops_size = ops.size();
/// Checksums are checked here and `ops` is filled. In fact, the part is added to ZK just below, when executing `multi`.
checkPartChecksumsAndAddCommitOps(zookeeper, part, ops, part->name, &absent_part_paths_on_replicas);
bool part_found = checkPartChecksumsAndAddCommitOps(std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), part, ops, part->name, absent_part_paths_on_replicas);
/// Do not commit if the part is obsolete, we have just briefly checked its checksums
if (transaction.isEmpty())
return {};
/// Will check that the part did not suddenly appear on skipped replicas
if (!absent_part_paths_on_replicas.empty())
if (!part_found)
{
Coordination::Requests new_ops;
for (const String & part_path : absent_part_paths_on_replicas)
@ -1617,6 +1629,10 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
new_ops.insert(new_ops.end(), ops.begin(), ops.end());
ops = std::move(new_ops);
}
else
{
chassert(absent_part_paths_on_replicas.empty());
}
Coordination::Responses responses;
Coordination::Error e = zookeeper->tryMulti(ops, responses);
@ -5188,10 +5204,13 @@ void StorageReplicatedMergeTree::readParallelReplicasImpl(
processed_stage);
ClusterProxy::executeQueryWithParallelReplicas(
query_plan, getStorageID(),
/* table_func_ptr= */ nullptr,
select_stream_factory, modified_query_ast,
local_context, query_info.storage_limits, parallel_replicas_cluster);
query_plan,
getStorageID(),
select_stream_factory,
modified_query_ast,
local_context,
query_info.storage_limits,
parallel_replicas_cluster);
}
void StorageReplicatedMergeTree::readLocalImpl(

View File

@ -631,8 +631,12 @@ private:
* Adds actions to `ops` that add data about the part into ZooKeeper.
* Call under lockForShare.
*/
void checkPartChecksumsAndAddCommitOps(const zkutil::ZooKeeperPtr & zookeeper, const DataPartPtr & part,
Coordination::Requests & ops, String part_name = "", NameSet * absent_replicas_paths = nullptr);
bool checkPartChecksumsAndAddCommitOps(
const ZooKeeperWithFaultInjectionPtr & zookeeper,
const DataPartPtr & part,
Coordination::Requests & ops,
String part_name,
NameSet & absent_replicas_paths);
String getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const;

View File

@ -983,9 +983,9 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
}
}
bool StorageS3::supportsSubsetOfColumns() const
bool StorageS3::supportsSubsetOfColumns(const ContextPtr & context) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context, format_settings);
}
bool StorageS3::prefersLargeBlocks() const
@ -1017,7 +1017,7 @@ Pipe StorageS3::read(
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
query_configuration, distributed_processing, local_context, query_info.query, virtual_columns, nullptr, local_context->getFileProgressCallback());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;

View File

@ -388,7 +388,7 @@ private:
bool supportsSubcolumns() const override { return true; }
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context) const;
bool prefersLargeBlocks() const override;

View File

@ -817,9 +817,9 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
return columns;
}
bool IStorageURLBase::supportsSubsetOfColumns() const
bool IStorageURLBase::supportsSubsetOfColumns(const ContextPtr & context) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name, context, format_settings);
}
bool IStorageURLBase::prefersLargeBlocks() const
@ -846,7 +846,7 @@ Pipe IStorageURLBase::read(
std::shared_ptr<StorageURLSource::IteratorWrapper> iterator_wrapper{nullptr};
bool is_url_with_globs = urlWithGlobs(uri);
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
if (distributed_processing)
{
@ -951,7 +951,7 @@ Pipe StorageURLWithFailover::read(
return uri_options;
});
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
const size_t max_threads = local_context->getSettingsRef().max_threads;
const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams);

View File

@ -114,7 +114,7 @@ protected:
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const;
bool supportsSubsetOfColumns() const override;
virtual bool supportsSubsetOfColumns(const ContextPtr & context) const;
bool prefersLargeBlocks() const override;

View File

@ -146,7 +146,7 @@ SinkToStoragePtr StorageXDBC::write(const ASTPtr & /* query */, const StorageMet
compression_method);
}
bool StorageXDBC::supportsSubsetOfColumns() const
bool StorageXDBC::supportsSubsetOfColumns(const ContextPtr &) const
{
return true;
}

View File

@ -68,7 +68,7 @@ private:
Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const override;
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr &) const override;
};
}

View File

@ -76,7 +76,7 @@ public:
/// because we cannot determine which column from the table corresponds to this virtual column.
virtual std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const { return {}; }
virtual bool supportsReadingSubsetOfColumns() { return true; }
virtual bool supportsReadingSubsetOfColumns(const ContextPtr &) { return true; }
/// Create storage according to the query.
StoragePtr

View File

@ -32,9 +32,9 @@ String ITableFunctionFileLike::getFormatFromFirstArgument()
return FormatFactory::instance().getFormatFromFileName(filename, true);
}
bool ITableFunctionFileLike::supportsReadingSubsetOfColumns()
bool ITableFunctionFileLike::supportsReadingSubsetOfColumns(const ContextPtr & context)
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format, context);
}
void ITableFunctionFileLike::parseArguments(const ASTPtr & ast_function, ContextPtr context)

View File

@ -27,7 +27,7 @@ public:
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
bool supportsReadingSubsetOfColumns() override;
bool supportsReadingSubsetOfColumns(const ContextPtr & context) override;
static size_t getMaxNumberOfArguments() { return 4; }

View File

@ -269,9 +269,9 @@ ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(Contex
return parseColumnsListFromString(configuration.structure, context);
}
bool TableFunctionAzureBlobStorage::supportsReadingSubsetOfColumns()
bool TableFunctionAzureBlobStorage::supportsReadingSubsetOfColumns(const ContextPtr & context)
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context);
}
StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const

View File

@ -49,7 +49,7 @@ public:
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
bool supportsReadingSubsetOfColumns() override;
bool supportsReadingSubsetOfColumns(const ContextPtr & context) override;
std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const override
{

View File

@ -331,9 +331,9 @@ ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context,
return parseColumnsListFromString(configuration.structure, context);
}
bool TableFunctionS3::supportsReadingSubsetOfColumns()
bool TableFunctionS3::supportsReadingSubsetOfColumns(const ContextPtr & context)
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context);
}
StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const

View File

@ -47,7 +47,7 @@ public:
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
bool supportsReadingSubsetOfColumns() override;
bool supportsReadingSubsetOfColumns(const ContextPtr & context) override;
std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const override
{

View File

@ -4,6 +4,7 @@ import csv
import os
import time
from typing import Dict, List, Optional, Union
from collections import defaultdict
import logging
from github import Github
@ -11,6 +12,7 @@ from github.GithubObject import _NotSetType, NotSet as NotSet
from github.Commit import Commit
from github.CommitStatus import CommitStatus
from github.IssueComment import IssueComment
from github.PullRequest import PullRequest
from github.Repository import Repository
from ci_config import CI_CONFIG, REQUIRED_CHECKS, CHECK_DESCRIPTIONS, CheckDescription
@ -128,6 +130,27 @@ def post_commit_status(
logging.error("Failed to update the status comment, continue anyway")
STATUS_ICON_MAP = defaultdict(
str,
{
ERROR: "",
FAILURE: "",
PENDING: "",
SUCCESS: "",
},
)
def update_pr_status_label(pr: PullRequest, status: str) -> None:
new_label = "pr-status-" + STATUS_ICON_MAP[status]
for label in pr.get_labels():
if label.name == new_label:
return
if label.name.startswith("pr-status-"):
pr.remove_from_labels(label.name)
pr.add_to_labels(new_label)
def set_status_comment(commit: Commit, pr_info: PRInfo) -> None:
"""It adds or updates the comment status to all Pull Requests but for release
one, so the method does nothing for simple pushes and pull requests with
@ -167,6 +190,8 @@ def set_status_comment(commit: Commit, pr_info: PRInfo) -> None:
comment = ic
break
update_pr_status_label(pr, get_worst_state(statuses))
if comment is None:
pr.create_issue_comment(comment_body)
return
@ -180,33 +205,16 @@ def set_status_comment(commit: Commit, pr_info: PRInfo) -> None:
def generate_status_comment(pr_info: PRInfo, statuses: CommitStatuses) -> str:
"""The method generates the comment body, as well it updates the CI report"""
def beauty_state(state: str) -> str:
if state == SUCCESS:
return f"🟢 {state}"
if state == PENDING:
return f"🟡 {state}"
if state in [ERROR, FAILURE]:
return f"🔴 {state}"
return state
report_url = create_ci_report(pr_info, statuses)
worst_state = get_worst_state(statuses)
if not worst_state:
# Theoretically possible, although
# the function should not be used on empty statuses
worst_state = "The commit doesn't have the statuses yet"
else:
worst_state = f"The overall status of the commit is {beauty_state(worst_state)}"
comment_body = (
f"<!-- automatic status comment for PR #{pr_info.number} "
f"from {pr_info.head_name}:{pr_info.head_ref} -->\n"
f"This is an automated comment for commit {pr_info.sha} with "
f"description of existing statuses. It's updated for the latest CI running\n"
f"The full report is available [here]({report_url})\n"
f"{worst_state}\n\n<table>"
"<thead><tr><th>Check name</th><th>Description</th><th>Status</th></tr></thead>\n"
"<tbody>"
f"*This is an automated comment for commit {pr_info.sha} with "
f"description of existing statuses. It's updated for the latest CI running*\n\n"
f"[{STATUS_ICON_MAP[worst_state]} Click here]({report_url}) to open a full report in a separate page\n"
f"\n"
)
# group checks by the name to get the worst one per each
grouped_statuses = {} # type: Dict[CheckDescription, CommitStatuses]
@ -230,17 +238,46 @@ def generate_status_comment(pr_info: PRInfo, statuses: CommitStatuses) -> str:
else:
grouped_statuses[cd] = [status]
table_rows = [] # type: List[str]
table_header = (
"<table>\n"
"<thead><tr><th>Check name</th><th>Description</th><th>Status</th></tr></thead>\n"
"<tbody>\n"
)
table_footer = "</tbody>\n</table>\n"
details_header = "<details><summary>Successful checks</summary>\n"
details_footer = "</details>\n"
visible_table_rows = [] # type: List[str]
hidden_table_rows = [] # type: List[str]
for desc, gs in grouped_statuses.items():
table_rows.append(
state = get_worst_state(gs)
table_row = (
f"<tr><td>{desc.name}</td><td>{desc.description}</td>"
f"<td>{beauty_state(get_worst_state(gs))}</td></tr>\n"
f"<td>{STATUS_ICON_MAP[state]} {state}</td></tr>\n"
)
if state == SUCCESS:
hidden_table_rows.append(table_row)
else:
visible_table_rows.append(table_row)
table_rows.sort()
result = [comment_body]
comment_footer = "</table>"
return "".join([comment_body, *table_rows, comment_footer])
if hidden_table_rows:
hidden_table_rows.sort()
result.append(details_header)
result.append(table_header)
result.extend(hidden_table_rows)
result.append(table_footer)
result.append(details_footer)
if visible_table_rows:
visible_table_rows.sort()
result.append(table_header)
result.extend(visible_table_rows)
result.append(table_footer)
return "".join(result)
def get_worst_state(statuses: CommitStatuses) -> str:
@ -252,10 +289,15 @@ def create_ci_report(pr_info: PRInfo, statuses: CommitStatuses) -> str:
to S3 tests bucket. Then it returns the URL"""
test_results = [] # type: TestResults
for status in statuses:
log_urls = None
log_urls = []
if status.target_url is not None:
log_urls = [status.target_url]
test_results.append(TestResult(status.context, status.state, log_urls=log_urls))
log_urls.append(status.target_url)
raw_logs = status.description or None
test_results.append(
TestResult(
status.context, status.state, log_urls=log_urls, raw_logs=raw_logs
)
)
return upload_results(
S3Helper(), pr_info.number, pr_info.sha, test_results, [], CI_STATUS_NAME
)

View File

@ -110,8 +110,8 @@ p.links a {{ padding: 5px; margin: 3px; background: var(--menu-background); line
p.links a:hover {{ background: var(--menu-hover-background); color: var(--menu-hover-color); }}
th {{ cursor: pointer; }}
tr:hover {{ filter: var(--tr-hover-filter); }}
.failed {{ cursor: pointer; }}
.failed-content {{ display: none; }}
.expandable {{ cursor: pointer; }}
.expandable-content {{ display: none; }}
#fish {{ display: none; float: right; position: relative; top: -20em; right: 2vw; margin-bottom: -20em; width: 30vw; filter: brightness(7%); z-index: -1; }}
.themes {{
@ -148,7 +148,7 @@ FOOTER_HTML_TEMPLATE = """<img id="fish" src="https://presentations.clickhouse.c
const getCellValue = (tr, idx) => {{
var classes = tr.classList;
var elem = tr;
if (classes.contains("failed-content") || classes.contains("failed-content.open"))
if (classes.contains("expandable-content") || classes.contains("expandable-content.open"))
elem = tr.previousElementSibling;
return elem.children[idx].innerText || elem.children[idx].textContent;
}}
@ -164,9 +164,9 @@ FOOTER_HTML_TEMPLATE = """<img id="fish" src="https://presentations.clickhouse.c
.forEach(tr => table.appendChild(tr) );
}})));
Array.from(document.getElementsByClassName("failed")).forEach(tr => tr.addEventListener('click', function() {{
Array.from(document.getElementsByClassName("expandable")).forEach(tr => tr.addEventListener('click', function() {{
var content = this.nextElementSibling;
content.classList.toggle("failed-content");
content.classList.toggle("expandable-content");
}}));
let theme = 'dark';
@ -546,9 +546,8 @@ def create_test_html_report(
has_log_urls = True
row = []
has_error = test_result.status in ("FAIL", "NOT_FAILED")
if has_error and test_result.raw_logs is not None:
row.append('<tr class="failed">')
if test_result.raw_logs is not None:
row.append('<tr class="expandable">')
else:
row.append("<tr>")
row.append(f"<td>{test_result.name}</td>")
@ -557,6 +556,7 @@ def create_test_html_report(
# Allow to quickly scroll to the first failure.
fail_id = ""
has_error = test_result.status in ("FAIL", "NOT_FAILED")
if has_error:
num_fails = num_fails + 1
fail_id = f'id="fail{num_fails}" '
@ -578,11 +578,11 @@ def create_test_html_report(
colspan += 1
row.append("</tr>")
rows_part.append("".join(row))
rows_part.append("\n".join(row))
if test_result.raw_logs is not None:
raw_logs = escape(test_result.raw_logs)
row_raw_logs = (
'<tr class="failed-content">'
'<tr class="expandable-content">'
f'<td colspan="{colspan}"><pre>{raw_logs}</pre></td>'
"</tr>"
)

View File

@ -1,6 +1,185 @@
import io
import subprocess
import socket
import time
import typing as tp
import contextlib
import select
from kazoo.client import KazooClient
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.client import CommandRequest
def execute_keeper_client_query(
cluster: ClickHouseCluster, node: ClickHouseInstance, query: str
) -> str:
request = CommandRequest(
[
cluster.server_bin_path,
"keeper-client",
"--host",
str(cluster.get_instance_ip(node.name)),
"--port",
str(cluster.zookeeper_port),
"-q",
query,
],
stdin="",
)
return request.get_answer()
class KeeperException(Exception):
pass
class KeeperClient(object):
SEPARATOR = b"\a\a\a\a\n"
def __init__(self, bin_path: str, host: str, port: int):
self.bin_path = bin_path
self.host = host
self.port = port
self.proc = subprocess.Popen(
[
bin_path,
"keeper-client",
"--host",
host,
"--port",
str(port),
"--log-level",
"error",
"--tests-mode",
"--no-confirmation",
],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
self.poller = select.epoll()
self.poller.register(self.proc.stdout)
self.poller.register(self.proc.stderr)
self._fd_nums = {
self.proc.stdout.fileno(): self.proc.stdout,
self.proc.stderr.fileno(): self.proc.stderr,
}
self.stopped = False
def execute_query(self, query: str, timeout: float = 60.0) -> str:
output = io.BytesIO()
self.proc.stdin.write(query.encode() + b"\n")
self.proc.stdin.flush()
events = self.poller.poll(timeout)
if not events:
raise TimeoutError("Keeper client returned no output")
for fd_num, event in events:
if event & (select.EPOLLIN | select.EPOLLPRI):
file = self._fd_nums[fd_num]
if file == self.proc.stdout:
while True:
chunk = file.readline()
if chunk.endswith(self.SEPARATOR):
break
output.write(chunk)
elif file == self.proc.stderr:
assert self.proc.stdout.readline() == self.SEPARATOR
raise KeeperException(self.proc.stderr.readline().strip().decode())
else:
raise ValueError(f"Failed to read from pipe. Flag {event}")
data = output.getvalue().strip().decode()
return data
def cd(self, path: str, timeout: float = 60.0):
self.execute_query(f"cd {path}", timeout)
def ls(self, path: str, timeout: float = 60.0) -> list[str]:
return self.execute_query(f"ls {path}", timeout).split(" ")
def create(self, path: str, value: str, timeout: float = 60.0):
self.execute_query(f"create {path} {value}", timeout)
def get(self, path: str, timeout: float = 60.0) -> str:
return self.execute_query(f"get {path}", timeout)
def exists(self, path: str, timeout: float = 60.0) -> bool:
return bool(int(self.execute_query(f"exists {path}", timeout)))
def stop(self):
if not self.stopped:
self.stopped = True
self.proc.communicate(b"exit\n", timeout=10.0)
def sync(self, path: str, timeout: float = 60.0):
self.execute_query(f"sync {path}", timeout)
def touch(self, path: str, timeout: float = 60.0):
self.execute_query(f"touch {path}", timeout)
def find_big_family(self, path: str, n: int = 10, timeout: float = 60.0) -> str:
return self.execute_query(f"find_big_family {path} {n}", timeout)
def find_super_nodes(self, threshold: int, timeout: float = 60.0) -> str:
return self.execute_query(f"find_super_nodes {threshold}", timeout)
def delete_stale_backups(self, timeout: float = 60.0) -> str:
return self.execute_query("delete_stale_backups", timeout)
def reconfig(
self,
joining: tp.Optional[str],
leaving: tp.Optional[str],
new_members: tp.Optional[str],
timeout: float = 60.0,
) -> str:
if bool(joining) + bool(leaving) + bool(new_members) != 1:
raise ValueError(
"Exactly one of joining, leaving or new_members must be specified"
)
if joining is not None:
operation = "add"
elif leaving is not None:
operation = "remove"
elif new_members is not None:
operation = "set"
else:
raise ValueError(
"At least one of joining, leaving or new_members must be specified"
)
return self.execute_query(
f"reconfig {operation} {joining or leaving or new_members}", timeout
)
@classmethod
@contextlib.contextmanager
def from_cluster(
cls, cluster: ClickHouseCluster, keeper_node: str, port: tp.Optional[int] = None
) -> "KeeperClient":
client = cls(
cluster.server_bin_path,
cluster.get_instance_ip(keeper_node),
port or cluster.zookeeper_port,
)
try:
yield client
finally:
client.stop()
def get_keeper_socket(cluster, node, port=9181):
@ -70,14 +249,14 @@ def get_fake_zk(cluster, node, timeout: float = 30.0) -> KazooClient:
return _fake
def get_config_str(zk: KazooClient) -> str:
def get_config_str(zk: KeeperClient) -> str:
"""
Return decoded contents of /keeper/config node
"""
return zk.get("/keeper/config")[0].decode("utf-8")
return zk.get("/keeper/config")
def wait_configs_equal(left_config: str, right_zk: KazooClient, timeout: float = 30.0):
def wait_configs_equal(left_config: str, right_zk: KeeperClient, timeout: float = 30.0):
"""
Check whether get /keeper/config result in left_config is equal
to get /keeper/config on right_zk ZK connection.

View File

@ -1,7 +1,7 @@
import pytest
from helpers.client import CommandRequest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from helpers.keeper_utils import KeeperClient
cluster = ClickHouseCluster(__file__)
@ -24,39 +24,28 @@ def started_cluster():
cluster.shutdown()
def keeper_query(query: str):
return CommandRequest(
[
cluster.server_bin_path,
"keeper-client",
"--host",
str(cluster.get_instance_ip("zoo1")),
"--port",
str(cluster.zookeeper_port),
"-q",
query,
],
stdin="",
)
@pytest.fixture(scope="function")
def client(started_cluster):
with KeeperClient.from_cluster(cluster, "zoo1") as keeper_client:
yield keeper_client
def test_big_family():
command = keeper_query(
"touch test_big_family;"
"touch test_big_family/1;"
"touch test_big_family/1/1;"
"touch test_big_family/1/2;"
"touch test_big_family/1/3;"
"touch test_big_family/1/4;"
"touch test_big_family/1/5;"
"touch test_big_family/2;"
"touch test_big_family/2/1;"
"touch test_big_family/2/2;"
"touch test_big_family/2/3;"
"find_big_family test_big_family;"
)
def test_big_family(client: KeeperClient):
client.touch("/test_big_family")
client.touch("/test_big_family/1")
client.touch("/test_big_family/1/1")
client.touch("/test_big_family/1/2")
client.touch("/test_big_family/1/3")
client.touch("/test_big_family/1/4")
client.touch("/test_big_family/1/5")
client.touch("/test_big_family/2")
client.touch("/test_big_family/2/1")
client.touch("/test_big_family/2/2")
client.touch("/test_big_family/2/3")
assert command.get_answer() == TSV(
response = client.find_big_family("/test_big_family")
assert response == TSV(
[
["/test_big_family/1", "5"],
["/test_big_family/2", "3"],
@ -71,34 +60,33 @@ def test_big_family():
]
)
command = keeper_query("find_big_family test_big_family 1;")
response = client.find_big_family("/test_big_family", 1)
assert command.get_answer() == TSV(
assert response == TSV(
[
["/test_big_family/1", "5"],
]
)
def test_find_super_nodes():
command = keeper_query(
"touch test_find_super_nodes;"
"touch test_find_super_nodes/1;"
"touch test_find_super_nodes/1/1;"
"touch test_find_super_nodes/1/2;"
"touch test_find_super_nodes/1/3;"
"touch test_find_super_nodes/1/4;"
"touch test_find_super_nodes/1/5;"
"touch test_find_super_nodes/2;"
"touch test_find_super_nodes/2/1;"
"touch test_find_super_nodes/2/2;"
"touch test_find_super_nodes/2/3;"
"touch test_find_super_nodes/2/4;"
"cd test_find_super_nodes;"
"find_super_nodes 4;"
)
def test_find_super_nodes(client: KeeperClient):
client.touch("/test_find_super_nodes")
client.touch("/test_find_super_nodes/1")
client.touch("/test_find_super_nodes/1/1")
client.touch("/test_find_super_nodes/1/2")
client.touch("/test_find_super_nodes/1/3")
client.touch("/test_find_super_nodes/1/4")
client.touch("/test_find_super_nodes/1/5")
client.touch("/test_find_super_nodes/2")
client.touch("/test_find_super_nodes/2/1")
client.touch("/test_find_super_nodes/2/2")
client.touch("/test_find_super_nodes/2/3")
client.touch("/test_find_super_nodes/2/4")
assert command.get_answer() == TSV(
client.cd("/test_find_super_nodes")
response = client.find_super_nodes(4)
assert response == TSV(
[
["/test_find_super_nodes/1", "5"],
["/test_find_super_nodes/2", "4"],
@ -106,41 +94,38 @@ def test_find_super_nodes():
)
def test_delete_stale_backups():
command = keeper_query(
"touch /clickhouse;"
"touch /clickhouse/backups;"
"touch /clickhouse/backups/1;"
"touch /clickhouse/backups/1/stage;"
"touch /clickhouse/backups/1/stage/alive123;"
"touch /clickhouse/backups/2;"
"touch /clickhouse/backups/2/stage;"
"touch /clickhouse/backups/2/stage/dead123;"
"delete_stale_backups;"
"y;"
"ls clickhouse/backups;"
)
def test_delete_stale_backups(client: KeeperClient):
client.touch("/clickhouse")
client.touch("/clickhouse/backups")
client.touch("/clickhouse/backups/1")
client.touch("/clickhouse/backups/1/stage")
client.touch("/clickhouse/backups/1/stage/alive123")
client.touch("/clickhouse/backups/2")
client.touch("/clickhouse/backups/2/stage")
client.touch("/clickhouse/backups/2/stage/dead123")
assert command.get_answer() == (
"You are going to delete all inactive backups in /clickhouse/backups. Continue?\n"
response = client.delete_stale_backups()
assert response == (
'Found backup "/clickhouse/backups/1", checking if it\'s active\n'
'Backup "/clickhouse/backups/1" is active, not going to delete\n'
'Found backup "/clickhouse/backups/2", checking if it\'s active\n'
'Backup "/clickhouse/backups/2" is not active, deleting it\n'
"1\n"
'Backup "/clickhouse/backups/2" is not active, deleting it'
)
def test_base_commands():
command = keeper_query(
"create test_create_zk_node1 testvalue1;"
"create test_create_zk_node_2 testvalue2;"
"get test_create_zk_node1;"
)
assert command.get_answer() == "testvalue1\n"
assert client.ls("/clickhouse/backups") == ["1"]
def test_four_letter_word_commands():
command = keeper_query("ruok")
assert command.get_answer() == "imok\n"
def test_base_commands(client: KeeperClient):
client.create("/test_create_zk_node1", "testvalue1")
client.create("/test_create_zk_node_2", "testvalue2")
assert client.get("/test_create_zk_node1") == "testvalue1"
client.create("/123", "1=2")
client.create("/123/321", "'foo;bar'")
assert client.get("/123") == "1=2"
assert client.get("/123/321") == "foo;bar"
def test_four_letter_word_commands(client: KeeperClient):
assert client.execute_query("ruok") == "imok"
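For context, the tests in this file now go through a per-test `client` fixture built from `KeeperClient.from_cluster` instead of spawning `clickhouse keeper-client` via `CommandRequest` for every query. A minimal sketch of how an additional test could use that fixture, restricted to helper calls that actually appear in this diff (`touch`, `create`, `get`, `ls`); the paths and values are made up for illustration:

``` python
# Hypothetical extra test against the `client` fixture defined above.
# Only KeeperClient methods shown in this diff are used; paths and values are illustrative.
def test_example_tree(client: KeeperClient):
    client.touch("/example")                   # create an empty node
    client.create("/example/a", "payload-a")   # create a node with a value
    client.create("/example/b", "payload-b")

    assert client.get("/example/a") == "payload-a"
    # ls returns the child names of a node (cf. the /clickhouse/backups check above)
    assert sorted(client.ls("/example")) == ["a", "b"]
```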

View File

@ -1,11 +1,10 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
import helpers.keeper_utils as ku
import os
from kazoo.client import KazooClient
from kazoo.exceptions import BadArgumentsException
import typing as tp
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
@ -19,11 +18,7 @@ part_of_cluster = "now this node is the part of cluster"
zk1, zk2, zk3 = None, None, None
def get_fake_zk(node):
return ku.get_fake_zk(cluster, node)
@pytest.fixture(scope="module")
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
@ -43,21 +38,28 @@ def started_cluster():
yield cluster
finally:
conn: tp.Optional[ku.KeeperClient]
for conn in [zk1, zk2, zk3]:
if conn:
if conn is not None:
conn.stop()
conn.close()
cluster.shutdown()
def test_reconfig_add(started_cluster):
def create_client(node: ClickHouseInstance):
return ku.KeeperClient(
cluster.server_bin_path, cluster.get_instance_ip(node.name), 9181
)
def test_reconfig_add():
"""
Add a second node to a single-node cluster, then add a third node to the resulting two-node cluster.
"""
global zk1, zk2, zk3
zk1 = create_client(node1)
zk1 = get_fake_zk(node1)
config = ku.get_config_str(zk1)
config = zk1.get("/keeper/config")
print("Initial config", config)
assert len(config.split("\n")) == 1
@ -65,24 +67,20 @@ def test_reconfig_add(started_cluster):
assert "node2" not in config
assert "node3" not in config
with pytest.raises(BadArgumentsException):
with pytest.raises(ku.KeeperException):
# duplicate id with different endpoint
zk1.reconfig(joining="server.1=localhost:1337", leaving=None, new_members=None)
with pytest.raises(BadArgumentsException):
with pytest.raises(ku.KeeperException):
# duplicate endpoint
zk1.reconfig(joining="server.8=node1:9234", leaving=None, new_members=None)
for i in range(100):
zk1.create(f"/test_three_{i}", b"somedata")
zk1.create(f"/test_three_{i}", "somedata")
node2.start_clickhouse()
config, _ = zk1.reconfig(
joining="server.2=node2:9234", leaving=None, new_members=None
)
config = zk1.reconfig(joining="server.2=node2:9234", leaving=None, new_members=None)
ku.wait_until_connected(cluster, node2)
config = config.decode("utf-8")
print("After adding 2", config)
assert len(config.split("\n")) == 2
@ -90,12 +88,12 @@ def test_reconfig_add(started_cluster):
assert "node2" in config
assert "node3" not in config
zk2 = get_fake_zk(node2)
zk2 = create_client(node2)
ku.wait_configs_equal(config, zk2)
for i in range(100):
assert zk2.exists(f"/test_three_{i}") is not None
zk2.create(f"/test_three_{100 + i}", b"somedata")
assert zk2.exists(f"/test_three_{i}")
zk2.create(f"/test_three_{100 + i}", "somedata")
# Why not both?
# One node will process add_srv request, other will pull out updated config, apply
@ -107,23 +105,19 @@ def test_reconfig_add(started_cluster):
assert node2.contains_in_log(part_of_cluster)
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1 = create_client(node1)
zk1.sync("/test_three_0")
for i in range(200):
assert zk1.exists(f"/test_three_{i}") is not None
assert zk1.exists(f"/test_three_{i}")
for i in range(100):
zk2.create(f"/test_four_{i}", b"somedata")
zk2.create(f"/test_four_{i}", "somedata")
node3.start_clickhouse()
config, _ = zk2.reconfig(
joining="server.3=node3:9234", leaving=None, new_members=None
)
config = zk2.reconfig(joining="server.3=node3:9234", leaving=None, new_members=None)
ku.wait_until_connected(cluster, node3)
config = config.decode("utf-8")
print("After adding 3", config)
assert len(config.split("\n")) == 3
@ -131,25 +125,23 @@ def test_reconfig_add(started_cluster):
assert "node2" in config
assert "node3" in config
zk3 = get_fake_zk(node3)
zk3 = create_client(node3)
ku.wait_configs_equal(config, zk3)
for i in range(100):
assert zk3.exists(f"/test_four_{i}") is not None
zk3.create(f"/test_four_{100 + i}", b"somedata")
assert zk3.exists(f"/test_four_{i}")
zk3.create(f"/test_four_{100 + i}", "somedata")
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1 = create_client(node1)
zk1.sync("/test_four_0")
zk2.stop()
zk2.close()
zk2 = get_fake_zk(node2)
zk2 = create_client(node2)
zk2.sync("/test_four_0")
for i in range(200):
assert zk1.exists(f"/test_four_{i}") is not None
assert zk2.exists(f"/test_four_{i}") is not None
assert zk1.exists(f"/test_four_{i}")
assert zk2.exists(f"/test_four_{i}")
assert node3.contains_in_log(part_of_cluster)
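The reconfig tests now construct their clients directly from the server binary and an instance IP via `ku.KeeperClient(...)` (see `create_client` above) rather than through kazoo. Below is a condensed sketch of the add-server flow exercised by `test_reconfig_add`, assuming only the helper signatures visible in this diff; it is not a drop-in replacement for the test:

``` python
import helpers.keeper_utils as ku

# Condensed add-server flow mirroring test_reconfig_add above.
def add_node(cluster, existing_node, new_node, server_id: int):
    zk = ku.KeeperClient(
        cluster.server_bin_path, cluster.get_instance_ip(existing_node.name), 9181
    )
    new_node.start_clickhouse()
    # Unlike kazoo, reconfig returns the resulting config as a plain string,
    # so there is no (value, stat) tuple and no .decode("utf-8") step.
    config = zk.reconfig(
        joining=f"server.{server_id}={new_node.name}:9234",
        leaving=None,
        new_members=None,
    )
    ku.wait_until_connected(cluster, new_node)
    new_zk = ku.KeeperClient(
        cluster.server_bin_path, cluster.get_instance_ip(new_node.name), 9181
    )
    ku.wait_configs_equal(config, new_zk)
    return config, new_zk
```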

View File

@ -1,11 +1,11 @@
#!/usr/bin/env python3
import subprocess
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
import helpers.keeper_utils as ku
import os
from kazoo.client import KazooClient
from kazoo.exceptions import BadVersionException, BadArgumentsException
import typing as tp
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
@ -23,16 +23,18 @@ def started_cluster():
cluster.start()
yield cluster
finally:
conn: tp.Optional[ku.KeeperClient]
for conn in [zk1, zk2, zk3]:
if conn:
conn.stop()
conn.close()
cluster.shutdown()
def get_fake_zk(node):
return ku.get_fake_zk(cluster, node)
def create_client(node: ClickHouseInstance):
return ku.KeeperClient(
cluster.server_bin_path, cluster.get_instance_ip(node.name), 9181
)
def test_reconfig_remove_followers_from_3(started_cluster):
@ -42,9 +44,9 @@ def test_reconfig_remove_followers_from_3(started_cluster):
Check that the remaining node is in standalone mode.
"""
zk1 = get_fake_zk(node1)
config, _ = zk1.get("/keeper/config")
config = config.decode("utf-8")
global zk1, zk2, zk3
zk1 = create_client(node1)
config = zk1.get("/keeper/config")
print("Initial config", config)
assert len(config.split("\n")) == 3
@ -52,36 +54,33 @@ def test_reconfig_remove_followers_from_3(started_cluster):
assert "node2" in config
assert "node3" in config
with pytest.raises(BadVersionException):
zk1.reconfig(joining=None, leaving="1", new_members=None, from_config=20)
with pytest.raises(BadArgumentsException):
with pytest.raises(ValueError):
zk1.reconfig(joining=None, leaving=None, new_members=None)
with pytest.raises(BadArgumentsException):
with pytest.raises(ku.KeeperException):
# bulk reconfiguration is not supported
zk1.reconfig(joining=None, leaving=None, new_members="3")
with pytest.raises(BadArgumentsException):
with pytest.raises(ValueError):
zk1.reconfig(joining="1", leaving="1", new_members="3")
with pytest.raises(BadArgumentsException):
with pytest.raises(ku.KeeperException):
# at least one node must be left
zk1.reconfig(joining=None, leaving="1,2,3", new_members=None)
for i in range(100):
zk1.create(f"/test_two_{i}", b"somedata")
zk1.create(f"/test_two_{i}", "somedata")
zk2 = get_fake_zk(node2)
zk2 = create_client(node2)
zk2.sync("/test_two_0")
ku.wait_configs_equal(config, zk2)
zk3 = get_fake_zk(node3)
zk3 = create_client(node3)
zk3.sync("/test_two_0")
ku.wait_configs_equal(config, zk3)
for i in range(100):
assert zk2.exists(f"test_two_{i}") is not None
assert zk3.exists(f"test_two_{i}") is not None
assert zk2.exists(f"test_two_{i}")
assert zk3.exists(f"test_two_{i}")
config, _ = zk1.reconfig(joining=None, leaving="3", new_members=None)
config = config.decode("utf-8")
config = zk1.reconfig(joining=None, leaving="3", new_members=None)
print("After removing 3", config)
assert len(config.split("\n")) == 2
@ -90,35 +89,26 @@ def test_reconfig_remove_followers_from_3(started_cluster):
assert "node3" not in config
zk2.stop()
zk2.close()
zk2 = get_fake_zk(node2)
zk2 = create_client(node2)
ku.wait_configs_equal(config, zk2)
for i in range(100):
assert zk2.exists(f"test_two_{i}") is not None
zk2.create(f"/test_two_{100 + i}", b"otherdata")
assert zk2.exists(f"test_two_{i}")
zk2.create(f"/test_two_{100 + i}", "otherdata")
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1 = create_client(node1)
zk1.sync("/test_two_0")
for i in range(200):
assert zk1.exists(f"test_two_{i}") is not None
with pytest.raises(Exception):
zk3.stop()
zk3.close()
zk3 = get_fake_zk(node3)
zk3.sync("/test_two_0")
assert zk1.exists(f"test_two_{i}")
assert node3.contains_in_log(log_msg_removed)
for i in range(100):
zk2.create(f"/test_two_{200 + i}", b"otherdata")
zk2.create(f"/test_two_{200 + i}", "otherdata")
config, _ = zk1.reconfig(joining=None, leaving="2", new_members=None)
config = config.decode("utf-8")
config = zk1.reconfig(joining=None, leaving="2", new_members=None)
print("After removing 2", config)
assert len(config.split("\n")) == 1
@ -127,19 +117,12 @@ def test_reconfig_remove_followers_from_3(started_cluster):
assert "node3" not in config
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1 = create_client(node1)
zk1.sync("/test_two_0")
for i in range(300):
assert zk1.exists(f"test_two_{i}") is not None
with pytest.raises(Exception):
zk2.stop()
zk2.close()
zk2 = get_fake_zk(node2)
zk2.sync("/test_two_0")
assert zk1.exists(f"test_two_{i}")
assert not node1.contains_in_log(log_msg_removed)
assert node2.contains_in_log(log_msg_removed)
assert "Mode: standalone" in zk1.command(b"stat")
assert "Mode: standalone" in zk1.execute_query("stat")
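Note the error-handling change in this file: argument combinations that kazoo reported as `BadVersionException`/`BadArgumentsException` now surface either as `ValueError` (validated in the helper before anything is sent) or as `ku.KeeperException` (rejected by the server). A minimal sketch of that split, using only the calls asserted above; the exact mapping is what this test checks, not a general guarantee:

``` python
import pytest
import helpers.keeper_utils as ku

def check_invalid_reconfig(zk):
    # No arguments at all: rejected client-side.
    with pytest.raises(ValueError):
        zk.reconfig(joining=None, leaving=None, new_members=None)
    # Bulk reconfiguration: sent to the server, which refuses it.
    with pytest.raises(ku.KeeperException):
        zk.reconfig(joining=None, leaving=None, new_members="3")
    # Removing every node: also refused by the server.
    with pytest.raises(ku.KeeperException):
        zk.reconfig(joining=None, leaving="1,2,3", new_members=None)
```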

View File

@ -1,11 +1,11 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
import helpers.keeper_utils as ku
import os
from kazoo.client import KazooClient, KazooState
from kazoo.exceptions import BadVersionException, BadArgumentsException
import typing as tp
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
@ -26,49 +26,51 @@ def started_cluster():
cluster.start()
yield cluster
finally:
conn: tp.Optional[ku.KeeperClient]
for conn in [zk1, zk2, zk3, zk4, zk5]:
if conn:
conn.stop()
conn.close()
cluster.shutdown()
def get_fake_zk(node):
return ku.get_fake_zk(cluster, node)
def create_client(node: ClickHouseInstance):
return ku.KeeperClient(
cluster.server_bin_path, cluster.get_instance_ip(node.name), 9181
)
def test_reconfig_remove_2_and_leader(started_cluster):
"""
Remove 2 followers from a cluster of 5, then remove the leader from the remaining 3 nodes.
"""
global zk1, zk2, zk3, zk4, zk5
zk1 = get_fake_zk(node1)
zk1 = create_client(node1)
config = ku.get_config_str(zk1)
print("Initial config", config)
assert len(config.split("\n")) == 5
for i in range(100):
zk1.create(f"/test_two_{i}", b"somedata")
zk1.create(f"/test_two_{i}", "somedata")
zk4 = get_fake_zk(node4)
zk4 = create_client(node4)
zk4.sync("/test_two_0")
ku.wait_configs_equal(config, zk4)
zk5 = get_fake_zk(node5)
zk5 = create_client(node5)
zk5.sync("/test_two_0")
ku.wait_configs_equal(config, zk5)
for i in range(100):
assert zk4.exists(f"test_two_{i}") is not None
assert zk5.exists(f"test_two_{i}") is not None
assert zk4.exists(f"test_two_{i}")
assert zk5.exists(f"test_two_{i}")
zk4.create(f"/test_two_{100 + i}", b"otherdata")
zk4.create(f"/test_two_{100 + i}", "otherdata")
zk2 = get_fake_zk(node2)
config, _ = zk2.reconfig(joining=None, leaving="4,5", new_members=None)
config = config.decode("utf-8")
zk2 = create_client(node2)
config = zk2.reconfig(joining=None, leaving="4,5", new_members=None)
print("After removing 4,5", config)
assert len(config.split("\n")) == 3
@ -79,27 +81,14 @@ def test_reconfig_remove_2_and_leader(started_cluster):
assert "node5" not in config
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1 = create_client(node1)
zk1.sync("/test_two_0")
ku.wait_configs_equal(config, zk1)
for i in range(200):
assert zk1.exists(f"test_two_{i}") is not None
assert zk2.exists(f"test_two_{i}") is not None
with pytest.raises(Exception):
zk4.stop()
zk4.close()
zk4 = get_fake_zk(node4)
zk4.sync("/test_two_0")
with pytest.raises(Exception):
zk5.stop()
zk5.close()
zk5 = get_fake_zk(node5)
zk5.sync("/test_two_0")
assert zk1.exists(f"test_two_{i}")
assert zk2.exists(f"test_two_{i}")
assert not node1.contains_in_log(log_msg_removed)
assert not node2.contains_in_log(log_msg_removed)
@ -110,11 +99,10 @@ def test_reconfig_remove_2_and_leader(started_cluster):
assert ku.is_leader(cluster, node1)
for i in range(100):
zk1.create(f"/test_leader_{i}", b"somedata")
zk1.create(f"/test_leader_{i}", "somedata")
# when a leader gets a remove request, it must yield leadership
config, _ = zk1.reconfig(joining=None, leaving="1", new_members=None)
config = config.decode("utf-8")
config = zk1.reconfig(joining=None, leaving="1", new_members=None)
print("After removing 1 (leader)", config)
assert len(config.split("\n")) == 2
@ -125,24 +113,17 @@ def test_reconfig_remove_2_and_leader(started_cluster):
assert "node5" not in config
zk2.stop()
zk2.close()
zk2 = get_fake_zk(node2)
zk2 = create_client(node2)
zk2.sync("/test_leader_0")
ku.wait_configs_equal(config, zk2)
zk3 = get_fake_zk(node3)
zk3 = create_client(node3)
zk3.sync("/test_leader_0")
ku.wait_configs_equal(config, zk3)
for i in range(100):
assert zk2.exists(f"test_leader_{i}") is not None
assert zk3.exists(f"test_leader_{i}") is not None
with pytest.raises(Exception):
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1.sync("/test_leader_0")
assert zk2.exists(f"test_leader_{i}")
assert zk3.exists(f"test_leader_{i}")
assert node1.contains_in_log(log_msg_removed)
assert not node2.contains_in_log(log_msg_removed)
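The leader-removal step above combines `ku.is_leader` with a log check to confirm that the removed leader actually left the quorum. A short sketch of that pattern, assuming the helpers used in this diff; `log_msg_removed` is the message defined near the top of the test module (not shown in this hunk):

``` python
import helpers.keeper_utils as ku

def remove_leader(cluster, zk_on_leader, leader_node, server_id: str, log_msg_removed: str):
    # A leader that receives a remove request for itself must yield leadership first.
    assert ku.is_leader(cluster, leader_node)
    config = zk_on_leader.reconfig(joining=None, leaving=server_id, new_members=None)
    # The removed node eventually logs that it is no longer part of the cluster.
    assert leader_node.contains_in_log(log_msg_removed)
    return config
```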

View File

@ -1,11 +1,10 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from os.path import join, dirname, realpath
import time
import helpers.keeper_utils as ku
from kazoo.client import KazooClient, KazooState
import typing as tp
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = join(dirname(realpath(__file__)), "configs")
@ -31,24 +30,26 @@ def started_cluster():
yield cluster
finally:
conn: tp.Optional[ku.KeeperClient]
for conn in [zk1, zk2, zk3, zk4]:
if conn:
conn.stop()
conn.close()
cluster.shutdown()
def get_fake_zk(node):
return ku.get_fake_zk(cluster, node)
def create_client(node: ClickHouseInstance):
return ku.KeeperClient(
cluster.server_bin_path, cluster.get_instance_ip(node.name), 9181
)
def test_reconfig_replace_leader(started_cluster):
"""
Remove the leader from a cluster of 3 and add a new node, using two separate reconfig commands.
"""
zk1 = get_fake_zk(node1)
global zk1, zk2, zk3, zk4
zk1 = create_client(node1)
config = ku.get_config_str(zk1)
assert len(config.split("\n")) == 3
@ -58,23 +59,22 @@ def test_reconfig_replace_leader(started_cluster):
assert "node4" not in config
for i in range(100):
zk1.create(f"/test_four_{i}", b"somedata")
zk1.create(f"/test_four_{i}", "somedata")
zk2 = get_fake_zk(node2)
zk2 = create_client(node2)
zk2.sync("/test_four_0")
ku.wait_configs_equal(config, zk2)
zk3 = get_fake_zk(node3)
zk3 = create_client(node3)
zk3.sync("/test_four_0")
ku.wait_configs_equal(config, zk3)
for i in range(100):
assert zk2.exists(f"/test_four_{i}") is not None
assert zk3.exists(f"/test_four_{i}") is not None
assert zk2.exists(f"/test_four_{i}")
assert zk3.exists(f"/test_four_{i}")
assert ku.is_leader(cluster, node1)
config, _ = zk2.reconfig(joining=None, leaving="1", new_members=None)
config = config.decode("utf-8")
config = zk2.reconfig(joining=None, leaving="1", new_members=None)
print("After removing 1 (leader)", config)
assert len(config.split("\n")) == 2
@ -85,17 +85,8 @@ def test_reconfig_replace_leader(started_cluster):
ku.wait_configs_equal(config, zk2)
with pytest.raises(Exception):
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1.sync("/test_four_0")
node4.start_clickhouse()
config, _ = zk2.reconfig(
joining="server.4=node4:9234", leaving=None, new_members=None
)
config = config.decode("utf-8")
config = zk2.reconfig(joining="server.4=node4:9234", leaving=None, new_members=None)
ku.wait_until_connected(cluster, node4)
print("After adding 4", config)
@ -105,22 +96,20 @@ def test_reconfig_replace_leader(started_cluster):
assert "node3" in config
assert "node4" in config
zk4 = get_fake_zk(node4)
zk4 = create_client(node4)
ku.wait_configs_equal(config, zk4)
for i in range(100):
assert zk4.exists(f"test_four_{i}") is not None
zk4.create(f"/test_four_{100 + i}", b"somedata")
assert zk4.exists(f"test_four_{i}")
zk4.create(f"/test_four_{100 + i}", "somedata")
zk2.stop()
zk2.close()
zk2 = get_fake_zk(node2)
zk2 = create_client(node2)
zk2.sync("/test_four_0")
ku.wait_configs_equal(config, zk2)
zk3.stop()
zk3.close()
zk3 = get_fake_zk(node3)
zk3 = create_client(node3)
zk3.sync("/test_four_0")
ku.wait_configs_equal(config, zk3)
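After each reconfiguration, the tests above reconnect and re-verify in the same way: stop and close the old client, build a fresh one, `sync` on a known path, then wait until it observes the new config. A compact sketch of that reconnect pattern, using only helper calls shown in this diff:

``` python
import helpers.keeper_utils as ku

def reconnect_and_verify(cluster, node, old_client, config: str, sync_path: str):
    old_client.stop()
    old_client.close()
    client = ku.KeeperClient(
        cluster.server_bin_path, cluster.get_instance_ip(node.name), 9181
    )
    client.sync(sync_path)
    ku.wait_configs_equal(config, client)
    return client
```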

View File

@ -1,35 +0,0 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -1,35 +0,0 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -1,35 +0,0 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -1,21 +0,0 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>4</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server> <id>2</id> <hostname>node2</hostname> <port>9234</port> </server>
<server> <id>3</id> <hostname>node3</hostname> <port>9234</port> </server>
<server> <id>4</id> <hostname>node4</hostname> <port>9234</port> </server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -1,43 +0,0 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from os.path import join, dirname, realpath
import time
import helpers.keeper_utils as ku
from kazoo.client import KazooClient, KazooState
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = join(dirname(realpath(__file__)), "configs")
node1 = cluster.add_instance("node1", main_configs=["configs/keeper1.xml"])
node2 = cluster.add_instance("node2", main_configs=["configs/keeper2.xml"])
node3 = cluster.add_instance("node3", main_configs=["configs/keeper3.xml"])
node4 = cluster.add_instance("node4", stay_alive=True)
zk1, zk2, zk3, zk4 = None, None, None, None
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node4.stop_clickhouse()
node4.copy_file_to_container(
join(CONFIG_DIR, "keeper4.xml"),
"/etc/clickhouse-server/config.d/keeper.xml",
)
yield cluster
finally:
for conn in [zk1, zk2, zk3, zk4]:
if conn:
conn.stop()
conn.close()
cluster.shutdown()
def get_fake_zk(node):
return ku.get_fake_zk(cluster, node)

View File

@ -1,8 +0,0 @@
<test>
<query>select uniq(number) from (select DISTINCT number from numbers(1000000))</query>
<query>select uniq(number) from (select number from numbers(1000000) group by number)</query>
<!--For new analyzer-->
<query>select uniq(number) from (select DISTINCT number from numbers(1000000)) SETTINGS allow_experimental_analyzer=1</query>
<query>select uniq(number) from (select number from numbers(1000000) group by number) SETTINGS allow_experimental_analyzer=1</query>
</test>

View File

@ -1,6 +1,7 @@
DROP TABLE IF EXISTS arraytest;
set allow_deprecated_syntax_for_merge_tree=1;
set input_format_null_as_default=0;
CREATE TABLE arraytest ( created_date Date DEFAULT toDate(created_at), created_at DateTime DEFAULT now(), strings Array(String) DEFAULT emptyArrayString()) ENGINE = MergeTree(created_date, cityHash64(created_at), (created_date, cityHash64(created_at)), 8192);
INSERT INTO arraytest (created_at, strings) VALUES (now(), ['aaaaa', 'bbbbb', 'ccccc']);

Some files were not shown because too many files have changed in this diff.