Merge branch 'master' into no-export-dynamic

This commit is contained in:
Alexey Milovidov 2023-07-24 02:20:33 +02:00
commit 10aa12d3b3
15 changed files with 143 additions and 89 deletions

View File

@ -744,11 +744,12 @@ try
[&]() -> std::vector<ProtocolServerMetrics>
{
std::vector<ProtocolServerMetrics> metrics;
metrics.reserve(servers_to_start_before_tables.size());
std::lock_guard lock(servers_lock);
metrics.reserve(servers_to_start_before_tables.size() + servers.size());
for (const auto & server : servers_to_start_before_tables)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});
std::lock_guard lock(servers_lock);
for (const auto & server : servers)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});
return metrics;
@ -1311,7 +1312,7 @@ try
global_context->reloadAuxiliaryZooKeepersConfigIfChanged(config);
std::lock_guard lock(servers_lock);
updateServers(*config, server_pool, async_metrics, servers);
updateServers(*config, server_pool, async_metrics, servers, servers_to_start_before_tables);
}
global_context->updateStorageConfiguration(*config);
@ -1413,10 +1414,27 @@ try
}
for (auto & server : servers_to_start_before_tables)
{
server.start();
LOG_INFO(log, "Listening for {}", server.getDescription());
std::lock_guard lock(servers_lock);
/// We should start interserver communications before (and more imporant shutdown after) tables.
/// Because server can wait for a long-running queries (for example in tcp_handler) after interserver handler was already shut down.
/// In this case we will have replicated tables which are unable to send any parts to other replicas, but still can
/// communicate with zookeeper, execute merges, etc.
createInterserverServers(
config(),
interserver_listen_hosts,
listen_try,
server_pool,
async_metrics,
servers_to_start_before_tables,
/* start_servers= */ false);
for (auto & server : servers_to_start_before_tables)
{
server.start();
LOG_INFO(log, "Listening for {}", server.getDescription());
}
}
/// Initialize access storages.
@ -1536,10 +1554,13 @@ try
{
LOG_DEBUG(log, "Waiting for current connections to servers for tables to finish.");
size_t current_connections = 0;
for (auto & server : servers_to_start_before_tables)
{
server.stop();
current_connections += server.currentConnections();
std::lock_guard lock(servers_lock);
for (auto & server : servers_to_start_before_tables)
{
server.stop();
current_connections += server.currentConnections();
}
}
if (current_connections)
@ -1712,7 +1733,7 @@ try
{
std::lock_guard lock(servers_lock);
createServers(config(), listen_hosts, interserver_listen_hosts, listen_try, server_pool, async_metrics, servers);
createServers(config(), listen_hosts, listen_try, server_pool, async_metrics, servers);
if (servers.empty())
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG,
"No servers started (add valid listen_host and 'tcp_port' or 'http_port' "
@ -1970,7 +1991,6 @@ HTTPContextPtr Server::httpContext() const
void Server::createServers(
Poco::Util::AbstractConfiguration & config,
const Strings & listen_hosts,
const Strings & interserver_listen_hosts,
bool listen_try,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
@ -2192,6 +2212,23 @@ void Server::createServers(
httpContext(), createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
});
}
}
void Server::createInterserverServers(
Poco::Util::AbstractConfiguration & config,
const Strings & interserver_listen_hosts,
bool listen_try,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
std::vector<ProtocolServerAdapter> & servers,
bool start_servers)
{
const Settings & settings = global_context->getSettingsRef();
Poco::Timespan keep_alive_timeout(config.getUInt("keep_alive_timeout", 10), 0);
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
http_params->setTimeout(settings.http_receive_timeout);
http_params->setKeepAliveTimeout(keep_alive_timeout);
/// Now iterate over interserver_listen_hosts
for (const auto & interserver_listen_host : interserver_listen_hosts)
@ -2240,14 +2277,14 @@ void Server::createServers(
#endif
});
}
}
void Server::updateServers(
Poco::Util::AbstractConfiguration & config,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
std::vector<ProtocolServerAdapter> & servers)
std::vector<ProtocolServerAdapter> & servers,
std::vector<ProtocolServerAdapter> & servers_to_start_before_tables)
{
Poco::Logger * log = &logger();
@ -2273,11 +2310,19 @@ void Server::updateServers(
Poco::Util::AbstractConfiguration & previous_config = latest_config ? *latest_config : this->config();
std::vector<ProtocolServerAdapter *> all_servers;
all_servers.reserve(servers.size() + servers_to_start_before_tables.size());
for (auto & server : servers)
all_servers.push_back(&server);
for (auto & server : servers_to_start_before_tables)
all_servers.push_back(&server);
for (auto * server : all_servers)
{
if (!server.isStopping())
if (!server->isStopping())
{
std::string port_name = server.getPortName();
std::string port_name = server->getPortName();
bool has_host = false;
bool is_http = false;
if (port_name.starts_with("protocols."))
@ -2315,27 +2360,29 @@ void Server::updateServers(
/// NOTE: better to compare using getPortName() over using
/// dynamic_cast<> since HTTPServer is also used for prometheus and
/// internal replication communications.
is_http = server.getPortName() == "http_port" || server.getPortName() == "https_port";
is_http = server->getPortName() == "http_port" || server->getPortName() == "https_port";
}
if (!has_host)
has_host = std::find(listen_hosts.begin(), listen_hosts.end(), server.getListenHost()) != listen_hosts.end();
has_host = std::find(listen_hosts.begin(), listen_hosts.end(), server->getListenHost()) != listen_hosts.end();
bool has_port = !config.getString(port_name, "").empty();
bool force_restart = is_http && !isSameConfiguration(previous_config, config, "http_handlers");
if (force_restart)
LOG_TRACE(log, "<http_handlers> had been changed, will reload {}", server.getDescription());
LOG_TRACE(log, "<http_handlers> had been changed, will reload {}", server->getDescription());
if (!has_host || !has_port || config.getInt(server.getPortName()) != server.portNumber() || force_restart)
if (!has_host || !has_port || config.getInt(server->getPortName()) != server->portNumber() || force_restart)
{
server.stop();
LOG_INFO(log, "Stopped listening for {}", server.getDescription());
server->stop();
LOG_INFO(log, "Stopped listening for {}", server->getDescription());
}
}
}
createServers(config, listen_hosts, interserver_listen_hosts, listen_try, server_pool, async_metrics, servers, /* start_servers= */ true);
createServers(config, listen_hosts, listen_try, server_pool, async_metrics, servers, /* start_servers= */ true);
createInterserverServers(config, interserver_listen_hosts, listen_try, server_pool, async_metrics, servers_to_start_before_tables, /* start_servers= */ true);
std::erase_if(servers, std::bind_front(check_server, ""));
std::erase_if(servers_to_start_before_tables, std::bind_front(check_server, ""));
}
}

View File

@ -102,6 +102,14 @@ private:
void createServers(
Poco::Util::AbstractConfiguration & config,
const Strings & listen_hosts,
bool listen_try,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
std::vector<ProtocolServerAdapter> & servers,
bool start_servers = false);
void createInterserverServers(
Poco::Util::AbstractConfiguration & config,
const Strings & interserver_listen_hosts,
bool listen_try,
Poco::ThreadPool & server_pool,
@ -113,7 +121,8 @@ private:
Poco::Util::AbstractConfiguration & config,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
std::vector<ProtocolServerAdapter> & servers);
std::vector<ProtocolServerAdapter> & servers,
std::vector<ProtocolServerAdapter> & servers_to_start_before_tables);
};
}

View File

@ -548,15 +548,17 @@ void ExpressionAnalyzer::getRootActionsForWindowFunctions(const ASTPtr & ast, bo
void ExpressionAnalyzer::makeAggregateDescriptions(ActionsDAGPtr & actions, AggregateDescriptions & descriptions)
{
for (const ASTFunction * node : aggregates())
for (const ASTPtr & ast : aggregates())
{
const ASTFunction & node = typeid_cast<const ASTFunction &>(*ast);
AggregateDescription aggregate;
if (node->arguments)
getRootActionsNoMakeSet(node->arguments, actions);
if (node.arguments)
getRootActionsNoMakeSet(node.arguments, actions);
aggregate.column_name = node->getColumnName();
aggregate.column_name = node.getColumnName();
const ASTs & arguments = node->arguments ? node->arguments->children : ASTs();
const ASTs & arguments = node.arguments ? node.arguments->children : ASTs();
aggregate.argument_names.resize(arguments.size());
DataTypes types(arguments.size());
@ -568,7 +570,7 @@ void ExpressionAnalyzer::makeAggregateDescriptions(ActionsDAGPtr & actions, Aggr
{
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
"Unknown identifier '{}' in aggregate function '{}'",
name, node->formatForErrorMessage());
name, node.formatForErrorMessage());
}
types[i] = dag_node->result_type;
@ -576,8 +578,8 @@ void ExpressionAnalyzer::makeAggregateDescriptions(ActionsDAGPtr & actions, Aggr
}
AggregateFunctionProperties properties;
aggregate.parameters = (node->parameters) ? getAggregateFunctionParametersArray(node->parameters, "", getContext()) : Array();
aggregate.function = AggregateFunctionFactory::instance().get(node->name, types, aggregate.parameters, properties);
aggregate.parameters = (node.parameters) ? getAggregateFunctionParametersArray(node.parameters, "", getContext()) : Array();
aggregate.function = AggregateFunctionFactory::instance().get(node.name, types, aggregate.parameters, properties);
descriptions.push_back(aggregate);
}
@ -744,12 +746,13 @@ void ExpressionAnalyzer::makeWindowDescriptions(ActionsDAGPtr actions)
}
// Window functions
for (const ASTFunction * function_node : syntax->window_function_asts)
for (const ASTPtr & ast : syntax->window_function_asts)
{
assert(function_node->is_window_function);
const ASTFunction & function_node = typeid_cast<const ASTFunction &>(*ast);
assert(function_node.is_window_function);
WindowFunctionDescription window_function;
window_function.function_node = function_node;
window_function.function_node = &function_node;
window_function.column_name
= window_function.function_node->getColumnName();
window_function.function_parameters
@ -760,7 +763,7 @@ void ExpressionAnalyzer::makeWindowDescriptions(ActionsDAGPtr actions)
// Requiring a constant reference to a shared pointer to non-const AST
// doesn't really look sane, but the visitor does indeed require it.
// Hence we clone the node (not very sane either, I know).
// Hence, we clone the node (not very sane either, I know).
getRootActionsNoMakeSet(window_function.function_node->clone(), actions);
const ASTs & arguments
@ -793,22 +796,22 @@ void ExpressionAnalyzer::makeWindowDescriptions(ActionsDAGPtr actions)
// Find the window corresponding to this function. It may be either
// referenced by name and previously defined in WINDOW clause, or it
// may be defined inline.
if (!function_node->window_name.empty())
if (!function_node.window_name.empty())
{
auto it = window_descriptions.find(function_node->window_name);
auto it = window_descriptions.find(function_node.window_name);
if (it == std::end(window_descriptions))
{
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
"Window '{}' is not defined (referenced by '{}')",
function_node->window_name,
function_node->formatForErrorMessage());
function_node.window_name,
function_node.formatForErrorMessage());
}
it->second.window_functions.push_back(window_function);
}
else
{
const auto & definition = function_node->window_definition->as<
const auto & definition = function_node.window_definition->as<
const ASTWindowDefinition &>();
WindowDescription desc;
desc.window_name = definition.getDefaultWindowName();
@ -1323,10 +1326,13 @@ void SelectQueryExpressionAnalyzer::appendAggregateFunctionsArguments(Expression
GetAggregatesVisitor(data).visit(select_query->orderBy());
/// TODO: data.aggregates -> aggregates()
for (const ASTFunction * node : data.aggregates)
if (node->arguments)
for (auto & argument : node->arguments->children)
for (const ASTPtr & ast : data.aggregates)
{
const ASTFunction & node = typeid_cast<const ASTFunction &>(*ast);
if (node.arguments)
for (auto & argument : node.arguments->children)
getRootActions(argument, only_types, step.actions());
}
}
void SelectQueryExpressionAnalyzer::appendWindowFunctionsArguments(

View File

@ -168,7 +168,7 @@ protected:
const ConstStoragePtr & storage() const { return syntax->storage; } /// The main table in FROM clause, if exists.
const TableJoin & analyzedJoin() const { return *syntax->analyzed_join; }
const NamesAndTypesList & sourceColumns() const { return syntax->required_source_columns; }
const std::vector<const ASTFunction *> & aggregates() const { return syntax->aggregates; }
const ASTs & aggregates() const { return syntax->aggregates; }
/// Find global subqueries in the GLOBAL IN/JOIN sections. Fills in external_tables.
void initGlobalSubqueriesAndExternalTables(bool do_global, bool is_explain);

View File

@ -26,8 +26,8 @@ public:
// Explicit empty initializers are needed to make designated initializers
// work on GCC 10.
std::unordered_set<String> uniq_names {};
std::vector<const ASTFunction *> aggregates {};
std::vector<const ASTFunction *> window_functions {};
ASTs aggregates;
ASTs window_functions;
};
static bool needChildVisit(const ASTPtr & node, const ASTPtr & child)
@ -61,7 +61,7 @@ public:
}
private:
static void visit(const ASTFunction & node, const ASTPtr &, Data & data)
static void visit(const ASTFunction & node, const ASTPtr & ast, Data & data)
{
if (isAggregateFunction(node))
{
@ -74,7 +74,7 @@ private:
return;
data.uniq_names.insert(column_name);
data.aggregates.push_back(&node);
data.aggregates.push_back(ast);
}
else if (node.is_window_function)
{
@ -87,7 +87,7 @@ private:
return;
data.uniq_names.insert(column_name);
data.window_functions.push_back(&node);
data.window_functions.push_back(ast);
}
}

View File

@ -731,7 +731,7 @@ void expandGroupByAll(ASTSelectQuery * select_query)
select_query->setExpression(ASTSelectQuery::Expression::GROUP_BY, group_expression_list);
}
std::vector<const ASTFunction *> getAggregates(ASTPtr & query, const ASTSelectQuery & select_query)
ASTs getAggregates(ASTPtr & query, const ASTSelectQuery & select_query)
{
/// There can not be aggregate functions inside the WHERE and PREWHERE.
if (select_query.where())
@ -743,11 +743,12 @@ std::vector<const ASTFunction *> getAggregates(ASTPtr & query, const ASTSelectQu
GetAggregatesVisitor(data).visit(query);
/// There can not be other aggregate functions within the aggregate functions.
for (const ASTFunction * node : data.aggregates)
for (const ASTPtr & ast : data.aggregates)
{
if (node->arguments)
const ASTFunction & node = typeid_cast<const ASTFunction &>(*ast);
if (node.arguments)
{
for (auto & arg : node->arguments->children)
for (auto & arg : node.arguments->children)
{
assertNoAggregates(arg, "inside another aggregate function");
// We also can't have window functions inside aggregate functions,
@ -759,7 +760,7 @@ std::vector<const ASTFunction *> getAggregates(ASTPtr & query, const ASTSelectQu
return data.aggregates;
}
std::vector<const ASTFunction *> getWindowFunctions(ASTPtr & query, const ASTSelectQuery & select_query)
ASTs getWindowFunctions(ASTPtr & query, const ASTSelectQuery & select_query)
{
/// There can not be window functions inside the WHERE, PREWHERE and HAVING
if (select_query.having())
@ -777,20 +778,16 @@ std::vector<const ASTFunction *> getWindowFunctions(ASTPtr & query, const ASTSel
/// Window functions cannot be inside aggregates or other window functions.
/// Aggregate functions can be inside window functions because they are
/// calculated earlier.
for (const ASTFunction * node : data.window_functions)
for (const ASTPtr & ast : data.window_functions)
{
if (node->arguments)
{
for (auto & arg : node->arguments->children)
{
assertNoWindows(arg, "inside another window function");
}
}
const ASTFunction & node = typeid_cast<const ASTFunction &>(*ast);
if (node->window_definition)
{
assertNoWindows(node->window_definition, "inside window definition");
}
if (node.arguments)
for (auto & arg : node.arguments->children)
assertNoWindows(arg, "inside another window function");
if (node.window_definition)
assertNoWindows(node.window_definition, "inside window definition");
}
return data.window_functions;
@ -1357,8 +1354,8 @@ TreeRewriterResultPtr TreeRewriter::analyze(
GetAggregatesVisitor(data).visit(query);
/// There can not be other aggregate functions within the aggregate functions.
for (const ASTFunction * node : data.aggregates)
for (auto & arg : node->arguments->children)
for (const ASTPtr & node : data.aggregates)
for (auto & arg : typeid_cast<const ASTFunction &>(*node).arguments->children)
assertNoAggregates(arg, "inside another aggregate function");
result.aggregates = data.aggregates;
}

View File

@ -41,8 +41,8 @@ struct TreeRewriterResult
Aliases aliases;
std::vector<const ASTFunction *> aggregates;
std::vector<const ASTFunction *> window_function_asts;
ASTs aggregates;
ASTs window_function_asts;
ASTs expressions_with_window_function;
/// Which column is needed to be ARRAY-JOIN'ed to get the specified.

View File

@ -190,7 +190,7 @@ def clear_ip_tables_and_restart_daemons():
try:
logging.info("Killing all alive docker containers")
subprocess.check_output(
"timeout -s 9 10m docker ps --quiet | xargs --no-run-if-empty docker kill",
"timeout --signal=KILL 10m docker ps --quiet | xargs --no-run-if-empty docker kill",
shell=True,
)
except subprocess.CalledProcessError as err:
@ -199,7 +199,7 @@ def clear_ip_tables_and_restart_daemons():
try:
logging.info("Removing all docker containers")
subprocess.check_output(
"timeout -s 9 10m docker ps --all --quiet | xargs --no-run-if-empty docker rm --force",
"timeout --signal=KILL 10m docker ps --all --quiet | xargs --no-run-if-empty docker rm --force",
shell=True,
)
except subprocess.CalledProcessError as err:
@ -321,7 +321,7 @@ class ClickhouseIntegrationTestsRunner:
cmd = (
"cd {repo_path}/tests/integration && "
"timeout -s 9 1h ./runner {runner_opts} {image_cmd} --pre-pull --command '{command}' ".format(
"timeout --signal=KILL 1h ./runner {runner_opts} {image_cmd} --pre-pull --command '{command}' ".format(
repo_path=repo_path,
runner_opts=self._get_runner_opts(),
image_cmd=image_cmd,
@ -433,9 +433,9 @@ class ClickhouseIntegrationTestsRunner:
out_file_full = os.path.join(self.result_path, "runner_get_all_tests.log")
cmd = (
"cd {repo_path}/tests/integration && "
"timeout -s 9 1h ./runner {runner_opts} {image_cmd} -- --setup-plan "
"| tee {out_file_full} | grep '::' | sed 's/ (fixtures used:.*//g' | sed 's/^ *//g' | sed 's/ *$//g' "
"| grep -v 'SKIPPED' | sort -u > {out_file}".format(
"timeout --signal=KILL 1h ./runner {runner_opts} {image_cmd} -- --setup-plan "
"| tee '{out_file_full}' | grep -F '::' | sed -r 's/ \(fixtures used:.*//g; s/^ *//g; s/ *$//g' "
"| grep -v -F 'SKIPPED' | sort --unique > {out_file}".format(
repo_path=repo_path,
runner_opts=self._get_runner_opts(),
image_cmd=image_cmd,
@ -677,7 +677,7 @@ class ClickhouseIntegrationTestsRunner:
# -E -- (E)rror
# -p -- (p)assed
# -s -- (s)kipped
cmd = "cd {}/tests/integration && timeout -s 9 1h ./runner {} {} -t {} {} -- -rfEps --run-id={} --color=no --durations=0 {} | tee {}".format(
cmd = "cd {}/tests/integration && timeout --signal=KILL 1h ./runner {} {} -t {} {} -- -rfEps --run-id={} --color=no --durations=0 {} | tee {}".format(
repo_path,
self._get_runner_opts(),
image_cmd,

View File

@ -1,11 +1,10 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=none
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=none/g')
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS check;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE check (x UInt64, y UInt64 DEFAULT throwIf(x > 1500000)) ENGINE = Memory;"

View File

@ -1,11 +1,10 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=none
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=none/g')
$CLICKHOUSE_CLIENT --query="""
SELECT * FROM (SELECT number % 5 AS a, count() AS b, c FROM numbers(10)
ARRAY JOIN [1,2] AS c GROUP BY a,c) AS table

View File

@ -4,11 +4,10 @@
set -e
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=none
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=none/g')
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS t"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE t (x Int8) ENGINE = MergeTree ORDER BY tuple()"

View File

@ -2,9 +2,8 @@
# Tags: race
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=debug
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=debug/g')
${CLICKHOUSE_CLIENT} --query="SELECT logTrace('logTrace Function Test');" 2>&1 | grep -q "logTrace Function Test" && echo "OK" || echo "FAIL"

View File

@ -1,11 +1,10 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=none
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=none/g')
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS check;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE check (x UInt64) ENGINE = Memory;"

View File

@ -1,11 +1,11 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=trace
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
[ ! -z "$CLICKHOUSE_CLIENT_REDEFINED" ] && CLICKHOUSE_CLIENT=$CLICKHOUSE_CLIENT_REDEFINED
CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=trace/g')
regexp="executeQuery|InterpreterSelectQuery"
$CLICKHOUSE_CLIENT --send_logs_source_regexp "$regexp" -q "SELECT 1;" 2> >(grep -v -E "$regexp" 1>&2)

View File

@ -12,6 +12,6 @@ opts=(
${CLICKHOUSE_CLIENT} "${opts[@]}" --multiquery --multiline --query """
DROP TABLE IF EXISTS mt ON CLUSTER test_shard_localhost;
DROP TABLE IF EXISTS wv ON CLUSTER test_shard_localhost;
CREATE TABLE mt ON CLUSTER test_shard_localhost (a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE TABLE mt ON CLUSTER test_shard_localhost (a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW wv ON CLUSTER test_shard_localhost TO input_deduplicated INNER ENGINE Memory WATERMARK=INTERVAL '1' SECOND AS SELECT count(a), hopStart(wid) AS w_start, hopEnd(wid) AS w_end FROM mt GROUP BY hop(timestamp, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS wid;
""" 2>&1 | grep -q -e "Code: 344" -e "Code: 60" && echo 'ok' || echo 'fail' ||: