mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-12-18 04:12:19 +00:00)

commit 8290ffa88d
Merge branch 'master' into mvcc_prototype
@@ -164,6 +164,8 @@ Checks: '-*,
     clang-analyzer-unix.cstring.NullArg,

     boost-use-to-string,

+    alpha.security.cert.env.InvalidPtr,
+
 '
 WarningsAsErrors: '*'

@@ -226,7 +226,6 @@ quit
         --receive_data_timeout_ms=10000 \
         --stacktrace \
         --query-fuzzer-runs=1000 \
-        --testmode \
         --queries-file $(ls -1 ch/tests/queries/0_stateless/*.sql | sort -R) \
         $NEW_TESTS_OPT \
         > >(tail -n 100000 > fuzzer.log) \
@@ -1,8 +1,10 @@
 # docker build -t clickhouse/mysql-js-client .
 # MySQL JavaScript client docker container

-FROM node:8
+FROM node:16.14.2

+WORKDIR /usr/app
+
 RUN npm install mysql

-COPY ./test.js test.js
+COPY ./test.js ./test.js
@@ -348,13 +348,13 @@ then
     rm -f /test_output/tmp

     # OOM
-    zgrep -Fa " <Fatal> Application: Child process was terminated by signal 9" /var/log/clickhouse-server/clickhouse-server.log > /dev/null \
+    zgrep -Fa " <Fatal> Application: Child process was terminated by signal 9" /var/log/clickhouse-server/clickhouse-server.log* > /dev/null \
         && echo -e 'Backward compatibility check: OOM killer (or signal 9) in clickhouse-server.log\tFAIL' >> /test_output/test_results.tsv \
         || echo -e 'Backward compatibility check: No OOM messages in clickhouse-server.log\tOK' >> /test_output/test_results.tsv

     # Logical errors
     echo "Check for Logical errors in server log:"
-    zgrep -Fa -A20 "Code: 49, e.displayText() = DB::Exception:" /var/log/clickhouse-server/clickhouse-server.log > /test_output/bc_check_logical_errors.txt \
+    zgrep -Fa -A20 "Code: 49, e.displayText() = DB::Exception:" /var/log/clickhouse-server/clickhouse-server.log* > /test_output/bc_check_logical_errors.txt \
        && echo -e 'Backward compatibility check: Logical error thrown (see clickhouse-server.log or bc_check_logical_errors.txt)\tFAIL' >> /test_output/test_results.tsv \
        || echo -e 'Backward compatibility check: No logical errors\tOK' >> /test_output/test_results.tsv

@@ -362,13 +362,13 @@ then
    [ -s /test_output/bc_check_logical_errors.txt ] || rm /test_output/bc_check_logical_errors.txt

    # Crash
-    zgrep -Fa "########################################" /var/log/clickhouse-server/clickhouse-server.log > /dev/null \
+    zgrep -Fa "########################################" /var/log/clickhouse-server/clickhouse-server.log* > /dev/null \
        && echo -e 'Backward compatibility check: Killed by signal (in clickhouse-server.log)\tFAIL' >> /test_output/test_results.tsv \
        || echo -e 'Backward compatibility check: Not crashed\tOK' >> /test_output/test_results.tsv

    # It also checks for crash without stacktrace (printed by watchdog)
    echo "Check for Fatal message in server log:"
-    zgrep -Fa " <Fatal> " /var/log/clickhouse-server/clickhouse-server.log > /test_output/bc_check_fatal_messages.txt \
+    zgrep -Fa " <Fatal> " /var/log/clickhouse-server/clickhouse-server.log* > /test_output/bc_check_fatal_messages.txt \
        && echo -e 'Backward compatibility check: Fatal message in clickhouse-server.log (see bc_check_fatal_messages.txt)\tFAIL' >> /test_output/test_results.tsv \
        || echo -e 'Backward compatibility check: No fatal messages in clickhouse-server.log\tOK' >> /test_output/test_results.tsv

@@ -43,7 +43,7 @@ toc_title: Adopters
 | <a href="https://city-mobil.ru" class="favicon">Citymobil</a> | Taxi | Analytics | — | — | [Blog Post in Russian, March 2020](https://habr.com/en/company/citymobil/blog/490660/) |
 | <a href="https://cloudflare.com" class="favicon">Cloudflare</a> | CDN | Traffic analysis | 36 servers | — | [Blog post, May 2017](https://blog.cloudflare.com/how-cloudflare-analyzes-1m-dns-queries-per-second/), [Blog post, March 2018](https://blog.cloudflare.com/http-analytics-for-6m-requests-per-second-using-clickhouse/) |
 | <a href="https://corporate.comcast.com/" class="favicon">Comcast</a> | Media | CDN Traffic Analysis | — | — | [ApacheCon 2019 Talk](https://www.youtube.com/watch?v=e9TZ6gFDjNg) |
-| <a href="https://contentsquare.com" class="favicon">ContentSquare</a> | Web analytics | Main product | — | — | [Blog post in French, November 2018](http://souslecapot.net/2018/11/21/patrick-chatain-vp-engineering-chez-contentsquare-penser-davantage-amelioration-continue-que-revolution-constante/) |
+| <a href="https://contentsquare.com" class="favicon">Contentsquare</a> | Web analytics | Main product | — | — | [Blog post in French, November 2018](http://souslecapot.net/2018/11/21/patrick-chatain-vp-engineering-chez-contentsquare-penser-davantage-amelioration-continue-que-revolution-constante/) |
 | <a href="https://coru.net/" class="favicon">Corunet</a> | Analytics | Main product | — | — | [Slides in English, April 2019](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup21/predictive_models.pdf) |
 | <a href="https://www.creditx.com" class="favicon">CraiditX 氪信</a> | Finance AI | Analysis | — | — | [Slides in English, November 2019](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup33/udf.pptx) |
 | <a href="https://crazypanda.ru/en/" class="favicon">Crazypanda</a> | Games | | — | — | Live session on ClickHouse meetup |
@@ -27,7 +27,7 @@ toc_title: "\u30A2\u30C0\u30D7\u30BF\u30FC"
 | <a href="http://cisco.com/" class="favicon">Cisco</a> | ネットワーク | トラフィック分析 | — | — | [ライトニングトーク2019](https://youtu.be/-hI1vDR2oPY?t=5057) |
 | <a href="https://www.citadelsecurities.com/" class="favicon">Citadel Securities</a> | 金融 | — | — | — | [2019年の貢献](https://github.com/ClickHouse/ClickHouse/pull/4774) |
 | <a href="https://city-mobil.ru" class="favicon">シティモービル</a> | タクシー | 分析 | — | — | [ロシア語でのブログ投稿,月2020](https://habr.com/en/company/citymobil/blog/490660/) |
-| <a href="https://contentsquare.com" class="favicon">ContentSquare</a> | ウェブ分析 | 主な製品 | — | — | [フランス語でのブログ投稿,November2018](http://souslecapot.net/2018/11/21/patrick-chatain-vp-engineering-chez-contentsquare-penser-davantage-amelioration-continue-que-revolution-constante/) |
+| <a href="https://contentsquare.com" class="favicon">Contentsquare</a> | ウェブ分析 | 主な製品 | — | — | [フランス語でのブログ投稿,November2018](http://souslecapot.net/2018/11/21/patrick-chatain-vp-engineering-chez-contentsquare-penser-davantage-amelioration-continue-que-revolution-constante/) |
 | <a href="https://cloudflare.com" class="favicon">Cloudflare</a> | CDN | トラフィック分析 | 36台のサーバー | — | [ブログ投稿,月2017](https://blog.cloudflare.com/how-cloudflare-analyzes-1m-dns-queries-per-second/), [ブログ投稿,月2018](https://blog.cloudflare.com/http-analytics-for-6m-requests-per-second-using-clickhouse/) |
 | <a href="https://coru.net/" class="favicon">コルネット</a> | 分析 | 主な製品 | — | — | [2019年英語スライド](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup21/predictive_models.pdf) |
 | <a href="https://www.creditx.com" class="favicon">CraiditX 氪信</a> | ファイナンスAI | 分析 | — | — | [2019年のスライド](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup33/udf.pptx) |
@@ -163,10 +163,24 @@ void Client::initialize(Poco::Util::Application & self)

     configReadClient(config(), home_path);

+    /** getenv is thread-safe in Linux glibc and in all sane libc implementations.
+      * But the standard does not guarantee that subsequent calls will not rewrite the value by returned pointer.
+      *
+      * man getenv:
+      *
+      * As typically implemented, getenv() returns a pointer to a string within the environment list.
+      * The caller must take care not to modify this string, since that would change the environment of
+      * the process.
+      *
+      * The implementation of getenv() is not required to be reentrant. The string pointed to by the return value of getenv()
+      * may be statically allocated, and can be modified by a subsequent call to getenv(), putenv(3), setenv(3), or unsetenv(3).
+      */
+
     const char * env_user = getenv("CLICKHOUSE_USER");
-    const char * env_password = getenv("CLICKHOUSE_PASSWORD");
     if (env_user)
         config().setString("user", env_user);
+
+    const char * env_password = getenv("CLICKHOUSE_PASSWORD");
     if (env_password)
         config().setString("password", env_password);

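Note: the comment added above boils down to one rule — copy the value returned by getenv() before anything else can touch the environment. A minimal stand-alone sketch of that pattern (not part of the diff; the helper name is illustrative):

#include <cstdlib>
#include <optional>
#include <string>

// Copy the environment value immediately; a later getenv()/setenv()/putenv()
// call is allowed to rewrite or invalidate the pointer we were handed.
std::optional<std::string> getEnvCopy(const char * name)
{
    if (const char * value = std::getenv(name))
        return std::string(value);
    return std::nullopt;
}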
@@ -1494,24 +1494,19 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(

 bool ClientBase::executeMultiQuery(const String & all_queries_text)
 {
-    // It makes sense not to base any control flow on this, so that it is
-    // the same in tests and in normal usage. The only difference is that in
-    // normal mode we ignore the test hints.
-    const bool test_mode = config().has("testmode");
-    if (test_mode)
-    {
-        /// disable logs if expects errors
-        TestHint test_hint(test_mode, all_queries_text);
-        if (test_hint.clientError() || test_hint.serverError())
-            processTextAsSingleQuery("SET send_logs_level = 'fatal'");
-    }
-
     bool echo_query = echo_queries;

     /// Test tags are started with "--" so they are interpreted as comments anyway.
     /// But if the echo is enabled we have to remove the test tags from `all_queries_text`
     /// because we don't want test tags to be echoed.
-    size_t test_tags_length = test_mode ? getTestTagsLength(all_queries_text) : 0;
+    {
+        /// disable logs if expects errors
+        TestHint test_hint(all_queries_text);
+        if (test_hint.clientError() || test_hint.serverError())
+            processTextAsSingleQuery("SET send_logs_level = 'fatal'");
+    }
+
+    size_t test_tags_length = getTestTagsLength(all_queries_text);

     /// Several queries separated by ';'.
     /// INSERT data is ended by the end of line, not ';'.
@@ -1548,7 +1543,7 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
             // Try to find test hint for syntax error. We don't know where
             // the query ends because we failed to parse it, so we consume
             // the entire line.
-            TestHint hint(test_mode, String(this_query_begin, this_query_end - this_query_begin));
+            TestHint hint(String(this_query_begin, this_query_end - this_query_begin));
             if (hint.serverError())
             {
                 // Syntax errors are considered as client errors
@@ -1586,7 +1581,7 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
         // Look for the hint in the text of query + insert data + trailing
         // comments, e.g. insert into t format CSV 'a' -- { serverError 123 }.
         // Use the updated query boundaries we just calculated.
-        TestHint test_hint(test_mode, full_query);
+        TestHint test_hint(full_query);

         // Echo all queries if asked; makes for a more readable reference file.
         echo_query = test_hint.echoQueries().value_or(echo_query);
@@ -2187,8 +2182,6 @@ void ClientBase::init(int argc, char ** argv)
         ("suggestion_limit", po::value<int>()->default_value(10000),
             "Suggestion limit for how many databases, tables and columns to fetch.")

-        ("testmode,T", "enable test hints in comments")
-
         ("format,f", po::value<std::string>(), "default output format")
         ("vertical,E", "vertical output format, same as --format=Vertical or FORMAT Vertical or \\G at end of command")
         ("highlight", po::value<bool>()->default_value(true), "enable or disable basic syntax highlight in interactive command line")
@@ -2294,8 +2287,6 @@ void ClientBase::init(int argc, char ** argv)
         config().setBool("interactive", true);
     if (options.count("pager"))
         config().setString("pager", options["pager"].as<std::string>());
-    if (options.count("testmode"))
-        config().setBool("testmode", true);

     if (options.count("log-level"))
         Poco::Logger::root().setLevel(options["log-level"].as<std::string>());
@@ -32,12 +32,9 @@ int parseErrorCode(DB::ReadBufferFromString & in)
 namespace DB
 {

-TestHint::TestHint(bool enabled_, const String & query_)
+TestHint::TestHint(const String & query_)
     : query(query_)
 {
-    if (!enabled_)
-        return;
-
     // Don't parse error hints in leading comments, because it feels weird.
     // Leading 'echo' hint is OK.
     bool is_leading_hint = true;
@@ -7,7 +7,7 @@
 namespace DB
 {

-/// Checks expected server and client error codes in --testmode.
+/// Checks expected server and client error codes.
 ///
 /// The following comment hints are supported:
 ///
@@ -25,12 +25,12 @@ namespace DB
 ///
 /// Examples:
 ///
-/// - echo 'select / -- { clientError 62 }' | clickhouse-client --testmode -nm
+/// - echo 'select / -- { clientError 62 }' | clickhouse-client -nm
 ///
 // Here the client parses the query but it is incorrect, so it expects
 /// SYNTAX_ERROR (62).
 ///
-/// - echo 'select foo -- { serverError 47 }' | clickhouse-client --testmode -nm
+/// - echo 'select foo -- { serverError 47 }' | clickhouse-client -nm
 ///
 /// But here the query is correct, but there is no such column "foo", so it
 /// is UNKNOWN_IDENTIFIER server error.
@@ -43,7 +43,7 @@ namespace DB
 class TestHint
 {
 public:
-    TestHint(bool enabled_, const String & query_);
+    TestHint(const String & query_);

     int serverError() const { return server_error; }
     int clientError() const { return client_error; }
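Note: the hints documented above are plain trailing comments on a query. A rough, self-contained illustration of how a "-- { serverError N }" hint can be pulled out of a query string (the helper name and regex are illustrative assumptions, not the actual TestHint parser):

#include <optional>
#include <regex>
#include <string>

// Illustrative only: extract the expected error code from a trailing
// "-- { serverError N }" hint, mirroring the syntax documented above.
std::optional<int> expectedServerError(const std::string & query)
{
    static const std::regex hint(R"(--\s*\{\s*serverError\s+(\d+)\s*\})");
    std::smatch match;
    if (std::regex_search(query, match, hint))
        return std::stoi(match[1].str());
    return std::nullopt;
}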
@@ -81,6 +81,14 @@
     M(ActiveSyncDrainedConnections, "Number of active connections drained synchronously.") \
     M(AsynchronousReadWait, "Number of threads waiting for asynchronous read.") \
     M(PendingAsyncInsert, "Number of asynchronous inserts that are waiting for flush.") \
+    M(KafkaConsumers, "Number of active Kafka consumers") \
+    M(KafkaConsumersWithAssignment, "Number of active Kafka consumers which have some partitions assigned.") \
+    M(KafkaProducers, "Number of active Kafka producer created") \
+    M(KafkaLibrdkafkaThreads, "Number of active librdkafka threads") \
+    M(KafkaBackgroundReads, "Number of background reads currently working (populating materialized views from Kafka)") \
+    M(KafkaConsumersInUse, "Number of consumers which are currently used by direct or background reads") \
+    M(KafkaWrites, "Number of currently running inserts to Kafka") \
+    M(KafkaAssignedPartitions, "Number of partitions Kafka tables currently assigned to") \

 namespace CurrentMetrics
 {
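Note: these new entries are gauge-style metrics — they go up while a consumer, producer or write is active and come back down when it finishes. A minimal stand-in for the RAII pattern the rest of the diff uses for them (names are illustrative, not ClickHouse's CurrentMetrics API):

#include <atomic>
#include <cstdint>

// Gauge raised for the lifetime of an operation and lowered on exit,
// mirroring how KafkaConsumers / KafkaWrites are maintained later in the diff.
struct MetricIncrement
{
    explicit MetricIncrement(std::atomic<int64_t> & metric_) : metric(metric_) { metric.fetch_add(1); }
    ~MetricIncrement() { metric.fetch_sub(1); }
    std::atomic<int64_t> & metric;
};

std::atomic<int64_t> kafka_writes_in_flight{0};

void writeBatch()
{
    MetricIncrement guard{kafka_writes_in_flight};  // +1 for the scope of the write
    // ... produce messages ...
}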
@@ -112,6 +112,8 @@
     M(CompileExpressionsMicroseconds, "Total time spent for compilation of expressions to LLVM code.") \
     M(CompileExpressionsBytes, "Number of bytes used for expressions compilation.") \
     \
+    M(ExecuteShellCommand, "Number of shell command executions.") \
+    \
     M(ExternalSortWritePart, "") \
     M(ExternalSortMerge, "") \
     M(ExternalAggregationWritePart, "") \
@@ -295,6 +297,25 @@
     M(MergeTreeMetadataCacheHit, "Number of times the read of meta file was done from MergeTree metadata cache") \
     M(MergeTreeMetadataCacheMiss, "Number of times the read of meta file was not done from MergeTree metadata cache") \
     \
+    M(KafkaRebalanceRevocations, "Number of partition revocations (the first stage of consumer group rebalance)") \
+    M(KafkaRebalanceAssignments, "Number of partition assignments (the final stage of consumer group rebalance)") \
+    M(KafkaRebalanceErrors, "Number of failed consumer group rebalances") \
+    M(KafkaMessagesPolled, "Number of Kafka messages polled from librdkafka to ClickHouse") \
+    M(KafkaMessagesRead, "Number of Kafka messages already processed by ClickHouse") \
+    M(KafkaMessagesFailed, "Number of Kafka messages ClickHouse failed to parse") \
+    M(KafkaRowsRead, "Number of rows parsed from Kafka messages") \
+    M(KafkaRowsRejected, "Number of parsed rows which were later rejected (due to rebalances / errors or similar reasons). Those rows will be consumed again after the rebalance.") \
+    M(KafkaDirectReads, "Number of direct selects from Kafka tables since server start") \
+    M(KafkaBackgroundReads, "Number of background reads populating materialized views from Kafka since server start") \
+    M(KafkaCommits, "Number of successful commits of consumed offsets to Kafka (normally should be the same as KafkaBackgroundReads)") \
+    M(KafkaCommitFailures, "Number of failed commits of consumed offsets to Kafka (usually is a sign of some data duplication)") \
+    M(KafkaConsumerErrors, "Number of errors reported by librdkafka during polls") \
+    M(KafkaWrites, "Number of writes (inserts) to Kafka tables ") \
+    M(KafkaRowsWritten, "Number of rows inserted into Kafka tables") \
+    M(KafkaProducerFlushes, "Number of explicit flushes to Kafka producer") \
+    M(KafkaMessagesProduced, "Number of messages produced to Kafka") \
+    M(KafkaProducerErrors, "Number of errors during producing the messages to Kafka") \
+    \
     M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \
     M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \
     M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely")
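Note: unlike the gauges above, these ProfileEvents entries are monotonic counters — they only ever grow and are read as running totals (for example, KafkaMessagesPolled is bumped by the size of every poll). A minimal stand-in for that pattern (illustrative, not the ProfileEvents API):

#include <atomic>
#include <cstddef>
#include <cstdint>

// Monotonic event counter shared by many threads; readers only ever see a
// running total, which is all the Kafka counters above need.
std::atomic<uint64_t> kafka_messages_polled{0};

void onPoll(std::size_t batch_size)
{
    kafka_messages_polled.fetch_add(batch_size, std::memory_order_relaxed);
}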
@@ -29,6 +29,11 @@ namespace
     };
 }

+namespace ProfileEvents
+{
+    extern const Event ExecuteShellCommand;
+}
+
 namespace DB
 {

@@ -158,6 +163,7 @@ std::unique_ptr<ShellCommand> ShellCommand::executeImpl(
     const Config & config)
 {
     logCommand(filename, argv);
+    ProfileEvents::increment(ProfileEvents::ExecuteShellCommand);

 #if !defined(USE_MUSL)
     /** Here it is written that with a normal call `vfork`, there is a chance of deadlock in multithreaded programs,
@@ -36,7 +36,7 @@ DataTypePtr recursiveRemoveLowCardinality(const DataTypePtr & type)
             element = recursiveRemoveLowCardinality(element);

         if (tuple_type->haveExplicitNames())
-            return std::make_shared<DataTypeTuple>(elements, tuple_type->getElementNames(), tuple_type->serializeNames());
+            return std::make_shared<DataTypeTuple>(elements, tuple_type->getElementNames());
         else
             return std::make_shared<DataTypeTuple>(elements);
     }
@@ -64,8 +64,8 @@ static std::optional<Exception> checkTupleNames(const Strings & names)
     return {};
 }

-DataTypeTuple::DataTypeTuple(const DataTypes & elems_, const Strings & names_, bool serialize_names_)
-    : elems(elems_), names(names_), have_explicit_names(true), serialize_names(serialize_names_)
+DataTypeTuple::DataTypeTuple(const DataTypes & elems_, const Strings & names_)
+    : elems(elems_), names(names_), have_explicit_names(true)
 {
     size_t size = elems.size();
     if (names.size() != size)
@@ -75,11 +75,6 @@ DataTypeTuple::DataTypeTuple(const DataTypes & elems_, const Strings & names_, b
         throw std::move(*exception);
 }

-bool DataTypeTuple::canBeCreatedWithNames(const Strings & names)
-{
-    return checkTupleNames(names) == std::nullopt;
-}
-
 std::string DataTypeTuple::doGetName() const
 {
     size_t size = elems.size();
@@ -91,7 +86,7 @@ std::string DataTypeTuple::doGetName() const
         if (i != 0)
             s << ", ";

-        if (have_explicit_names && serialize_names)
+        if (have_explicit_names)
             s << backQuoteIfNeed(names[i]) << ' ';

         s << elems[i]->getName();
@@ -206,7 +201,7 @@ bool DataTypeTuple::equals(const IDataType & rhs) const
         return false;

     for (size_t i = 0; i < size; ++i)
-        if (!elems[i]->equals(*rhs_tuple.elems[i]))
+        if (!elems[i]->equals(*rhs_tuple.elems[i]) || names[i] != rhs_tuple.names[i])
             return false;

     return true;
@@ -265,31 +260,29 @@ size_t DataTypeTuple::getSizeOfValueInMemory() const
 SerializationPtr DataTypeTuple::doGetDefaultSerialization() const
 {
     SerializationTuple::ElementSerializations serializations(elems.size());
-    bool use_explicit_names = have_explicit_names && serialize_names;
     for (size_t i = 0; i < elems.size(); ++i)
     {
-        String elem_name = use_explicit_names ? names[i] : toString(i + 1);
+        String elem_name = have_explicit_names ? names[i] : toString(i + 1);
         auto serialization = elems[i]->getDefaultSerialization();
         serializations[i] = std::make_shared<SerializationNamed>(serialization, elem_name);
     }

-    return std::make_shared<SerializationTuple>(std::move(serializations), use_explicit_names);
+    return std::make_shared<SerializationTuple>(std::move(serializations), have_explicit_names);
 }

 SerializationPtr DataTypeTuple::getSerialization(const SerializationInfo & info) const
 {
     SerializationTuple::ElementSerializations serializations(elems.size());
     const auto & info_tuple = assert_cast<const SerializationInfoTuple &>(info);
-    bool use_explicit_names = have_explicit_names && serialize_names;

     for (size_t i = 0; i < elems.size(); ++i)
     {
-        String elem_name = use_explicit_names ? names[i] : toString(i + 1);
+        String elem_name = have_explicit_names ? names[i] : toString(i + 1);
         auto serialization = elems[i]->getSerialization(*info_tuple.getElementInfo(i));
         serializations[i] = std::make_shared<SerializationNamed>(serialization, elem_name);
     }

-    return std::make_shared<SerializationTuple>(std::move(serializations), use_explicit_names);
+    return std::make_shared<SerializationTuple>(std::move(serializations), have_explicit_names);
 }

 MutableSerializationInfoPtr DataTypeTuple::createSerializationInfo(const SerializationInfo::Settings & settings) const
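Note: with serialize_names gone, element naming depends only on have_explicit_names — explicit names when present, 1-based positions otherwise. A small illustration of that rule (hypothetical helper, not the DataTypeTuple code):

#include <string>
#include <vector>

// Pick per-element names the way the serialization above does: use the
// explicit name when one exists, otherwise fall back to "1", "2", ...
std::vector<std::string> elementNames(const std::vector<std::string> & names, size_t count)
{
    std::vector<std::string> result;
    for (size_t i = 0; i < count; ++i)
        result.push_back(i < names.size() ? names[i] : std::to_string(i + 1));
    return result;
}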
@@ -22,14 +22,11 @@ private:
     DataTypes elems;
     Strings names;
     bool have_explicit_names;
-    bool serialize_names = true;
 public:
     static constexpr bool is_parametric = true;

     explicit DataTypeTuple(const DataTypes & elems);
-    DataTypeTuple(const DataTypes & elems, const Strings & names, bool serialize_names_ = true);
-
-    static bool canBeCreatedWithNames(const Strings & names);
+    DataTypeTuple(const DataTypes & elems, const Strings & names);

     TypeIndex getTypeId() const override { return TypeIndex::Tuple; }
     std::string doGetName() const override;
@@ -66,7 +63,6 @@ public:
     String getNameByPosition(size_t i) const;

     bool haveExplicitNames() const { return have_explicit_names; }
-    bool serializeNames() const { return serialize_names; }
 };

 }
@@ -2958,8 +2958,7 @@ private:
            /// For named tuples allow conversions for tuples with
            /// different sets of elements. If element exists in @to_type
            /// and doesn't exist in @to_type it will be filled by default values.
-            if (from_type->haveExplicitNames() && from_type->serializeNames()
-                && to_type->haveExplicitNames() && to_type->serializeNames())
+            if (from_type->haveExplicitNames() && to_type->haveExplicitNames())
             {
                 const auto & from_names = from_type->getElementNames();
                 std::unordered_map<String, size_t> from_positions;
@@ -54,29 +54,12 @@ public:
     bool useDefaultImplementationForNulls() const override { return false; }
     bool useDefaultImplementationForConstants() const override { return true; }

-    DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
+    DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
     {
         if (arguments.empty())
             throw Exception("Function " + getName() + " requires at least one argument.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

-        DataTypes types;
-        Strings names;
-
-        for (const auto & argument : arguments)
-        {
-            types.emplace_back(argument.type);
-            names.emplace_back(argument.name);
-        }
-
-        /// Create named tuple if possible. We don't print tuple element names
-        /// because they are bad anyway -- aliases are not used, e.g. tuple(1 a)
-        /// will have element name '1' and not 'a'. If we ever change this, and
-        /// add the ability to access tuple elements by name, like tuple(1 a).a,
-        /// we should probably enable printing for better discoverability.
-        if (DataTypeTuple::canBeCreatedWithNames(names))
-            return std::make_shared<DataTypeTuple>(types, names, false /*print names*/);
-
-        return std::make_shared<DataTypeTuple>(types);
+        return std::make_shared<DataTypeTuple>(arguments);
     }

     ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
@@ -852,6 +852,8 @@ inline ReturnType readDateTimeTextImpl(time_t & datetime, ReadBuffer & buf, cons

     /// YYYY-MM-DD hh:mm:ss
     static constexpr auto DateTimeStringInputSize = 19;
+    ///YYYY-MM-DD
+    static constexpr auto DateStringInputSize = 10;
     bool optimistic_path_for_date_time_input = s + DateTimeStringInputSize <= buf.buffer().end();

     if (optimistic_path_for_date_time_input)
@@ -862,16 +864,27 @@ inline ReturnType readDateTimeTextImpl(time_t & datetime, ReadBuffer & buf, cons
         UInt8 month = (s[5] - '0') * 10 + (s[6] - '0');
         UInt8 day = (s[8] - '0') * 10 + (s[9] - '0');

-        UInt8 hour = (s[11] - '0') * 10 + (s[12] - '0');
-        UInt8 minute = (s[14] - '0') * 10 + (s[15] - '0');
-        UInt8 second = (s[17] - '0') * 10 + (s[18] - '0');
+        UInt8 hour = 0;
+        UInt8 minute = 0;
+        UInt8 second = 0;
+        ///simply determine whether it is YYYY-MM-DD hh:mm:ss or YYYY-MM-DD by the content of the tenth character in an optimistic scenario
+        bool dt_long = (s[10] == ' ' || s[10] == 'T');
+        if (dt_long)
+        {
+            hour = (s[11] - '0') * 10 + (s[12] - '0');
+            minute = (s[14] - '0') * 10 + (s[15] - '0');
+            second = (s[17] - '0') * 10 + (s[18] - '0');
+        }

         if (unlikely(year == 0))
             datetime = 0;
         else
             datetime = date_lut.makeDateTime(year, month, day, hour, minute, second);

+        if (dt_long)
             buf.position() += DateTimeStringInputSize;
+        else
+            buf.position() += DateStringInputSize;
         return ReturnType(true);
     }
     else
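Note: the change above keys the date-only fast path on the character after "YYYY-MM-DD" — a space or 'T' means a time part follows. A tiny stand-alone check of the same rule (illustrative, not the ClickHouse parser):

#include <string>

// True when the 10th character indicates "YYYY-MM-DD hh:mm:ss" rather than a
// bare "YYYY-MM-DD" date, matching the dt_long test introduced above.
bool hasTimePart(const std::string & s)
{
    return s.size() > 10 && (s[10] == ' ' || s[10] == 'T');
}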
@@ -57,6 +57,10 @@ public:

     ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
     {
+        /// Do not start user defined script during query analysis. Because user script startup could be heavy.
+        if (input_rows_count == 0)
+            return result_type->createColumn();
+
         auto coordinator = executable_function->getCoordinator();
         const auto & coordinator_configuration = coordinator->getConfiguration();
         const auto & configuration = executable_function->getConfiguration();
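Note: the guard added above exists because functions are also invoked with zero rows during query analysis; returning an empty result column avoids launching a heavy user script just to infer types. A generic sketch of the same short-circuit (illustrative names, not the ClickHouse interface):

#include <cstddef>
#include <vector>

// Stand-in for the early return above: skip the expensive path entirely when
// the caller only wants the result shape, i.e. zero input rows.
std::vector<double> runHeavyScript(const std::vector<double> & input, std::size_t input_rows_count)
{
    if (input_rows_count == 0)
        return {};  // empty result, no external process is spawned

    std::vector<double> result;
    result.reserve(input_rows_count);
    for (double v : input)
        result.push_back(v * 2.0);  // placeholder for the external script's work
    return result;
}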
@@ -94,6 +94,8 @@ bool RemoteReadBuffer::nextImpl()
         return status;
     }

+    //file_buffer::pos should increase correspondingly when RemoteReadBuffer is consumed, otherwise start_offset will be incorrect.
+    local_file_holder->file_buffer->position() = local_file_holder->file_buffer->buffer().begin() + BufferBase::offset();
     auto start_offset = local_file_holder->file_buffer->getPosition();
     auto end_offset = start_offset + local_file_holder->file_buffer->internalBuffer().size();
     local_file_holder->file_cache_controller->value().waitMoreData(start_offset, end_offset);
@@ -723,6 +723,7 @@ bool StorageFileLog::streamToViews()
     size_t rows = 0;
     {
         block_io.pipeline.complete(std::move(input));
+        block_io.pipeline.setNumThreads(max_streams_number);
         block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });
         CompletedPipelineExecutor executor(block_io.pipeline);
         executor.execute();
@@ -6,6 +6,16 @@
 #include <base/logger_useful.h>
 #include <Interpreters/Context.h>

+#include <Common/ProfileEvents.h>
+
+namespace ProfileEvents
+{
+    extern const Event KafkaMessagesRead;
+    extern const Event KafkaMessagesFailed;
+    extern const Event KafkaRowsRead;
+    extern const Event KafkaRowsRejected;
+}
+
 namespace DB
 {
 namespace ErrorCodes
@@ -85,6 +95,8 @@ Chunk KafkaSource::generateImpl()

     auto on_error = [&](const MutableColumns & result_columns, Exception & e)
     {
+        ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
+
         if (put_error_to_stream)
         {
             exception_message = e.message();
@@ -117,7 +129,11 @@ Chunk KafkaSource::generateImpl()
         size_t new_rows = 0;
         exception_message.reset();
         if (buffer->poll())
+        {
+            // poll provide one message at a time to the input_format
+            ProfileEvents::increment(ProfileEvents::KafkaMessagesRead);
             new_rows = executor.execute();
+        }

         if (new_rows)
         {
@@ -128,6 +144,8 @@ Chunk KafkaSource::generateImpl()
             if (buffer->isStalled())
                 throw Exception("Polled messages became unusable", ErrorCodes::LOGICAL_ERROR);

+            ProfileEvents::increment(ProfileEvents::KafkaRowsRead, new_rows);
+
             buffer->storeLastReadMessageOffset();

             auto topic = buffer->currentTopic();
@@ -212,8 +230,18 @@ Chunk KafkaSource::generateImpl()
         }
     }

-    if (buffer->polledDataUnusable() || total_rows == 0)
+    if (total_rows == 0)
+    {
         return {};
+    }
+    else if (buffer->polledDataUnusable())
+    {
+        // the rows were counted already before by KafkaRowsRead,
+        // so let's count the rows we ignore separately
+        // (they will be retried after the rebalance)
+        ProfileEvents::increment(ProfileEvents::KafkaRowsRejected, total_rows);
+        return {};
+    }

     /// MATERIALIZED columns can be added here, but I think
     // they are not needed here:
@@ -10,6 +10,26 @@
 #include <boost/algorithm/string/join.hpp>
 #include <algorithm>

+#include <Common/CurrentMetrics.h>
+#include <Common/ProfileEvents.h>
+
+namespace CurrentMetrics
+{
+    extern const Metric KafkaAssignedPartitions;
+    extern const Metric KafkaConsumersWithAssignment;
+}
+
+namespace ProfileEvents
+{
+    extern const Event KafkaRebalanceRevocations;
+    extern const Event KafkaRebalanceAssignments;
+    extern const Event KafkaRebalanceErrors;
+    extern const Event KafkaMessagesPolled;
+    extern const Event KafkaCommitFailures;
+    extern const Event KafkaCommits;
+    extern const Event KafkaConsumerErrors;
+}
+
 namespace DB
 {

@@ -45,6 +65,9 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
     // called (synchronously, during poll) when we enter the consumer group
     consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
     {
+        CurrentMetrics::add(CurrentMetrics::KafkaAssignedPartitions, topic_partitions.size());
+        ProfileEvents::increment(ProfileEvents::KafkaRebalanceAssignments);
+
         if (topic_partitions.empty())
         {
             LOG_INFO(log, "Got empty assignment: Not enough partitions in the topic for all consumers?");
@@ -52,6 +75,7 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
         else
         {
             LOG_TRACE(log, "Topics/partitions assigned: {}", topic_partitions);
+            CurrentMetrics::add(CurrentMetrics::KafkaConsumersWithAssignment, 1);
         }

         assignment = topic_partitions;
@@ -60,10 +84,18 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
     // called (synchronously, during poll) when we leave the consumer group
     consumer->set_revocation_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
     {
+        CurrentMetrics::sub(CurrentMetrics::KafkaAssignedPartitions, topic_partitions.size());
+        ProfileEvents::increment(ProfileEvents::KafkaRebalanceRevocations);
+
         // Rebalance is happening now, and now we have a chance to finish the work
         // with topics/partitions we were working with before rebalance
         LOG_TRACE(log, "Rebalance initiated. Revoking partitions: {}", topic_partitions);

+        if (!topic_partitions.empty())
+        {
+            CurrentMetrics::sub(CurrentMetrics::KafkaConsumersWithAssignment, 1);
+        }
+
         // we can not flush data to target from that point (it is pulled, not pushed)
         // so the best we can now it to
         // 1) repeat last commit in sync mode (async could be still in queue, we need to be sure is is properly committed before rebalance)
@@ -91,6 +123,7 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
     consumer->set_rebalance_error_callback([this](cppkafka::Error err)
     {
         LOG_ERROR(log, "Rebalance error: {}", err);
+        ProfileEvents::increment(ProfileEvents::KafkaRebalanceErrors);
     });
 }

@@ -229,8 +262,14 @@ void ReadBufferFromKafkaConsumer::commit()
         if (!committed)
         {
             // TODO: insert atomicity / transactions is needed here (possibility to rollback, on 2 phase commits)
+            ProfileEvents::increment(ProfileEvents::KafkaCommitFailures);
             throw Exception("All commit attempts failed. Last block was already written to target table(s), but was not committed to Kafka.", ErrorCodes::CANNOT_COMMIT_OFFSET);
         }
+        else
+        {
+            ProfileEvents::increment(ProfileEvents::KafkaCommits);
+        }
+
     }
     else
     {
@@ -423,6 +462,8 @@ bool ReadBufferFromKafkaConsumer::poll()
         return false;
     }

+    ProfileEvents::increment(ProfileEvents::KafkaMessagesPolled, messages.size());
+
     stalled_status = NOT_STALLED;
     allowed = true;
     return true;
@@ -436,6 +477,7 @@ size_t ReadBufferFromKafkaConsumer::filterMessageErrors()
     {
         if (auto error = message.get_error())
         {
+            ProfileEvents::increment(ProfileEvents::KafkaConsumerErrors);
             LOG_ERROR(log, "Consumer error: {}", error);
             return true;
         }
@@ -5,6 +5,12 @@
 #include <IO/ReadBuffer.h>

 #include <cppkafka/cppkafka.h>
+#include <Common/CurrentMetrics.h>
+
+namespace CurrentMetrics
+{
+    extern const Metric KafkaConsumers;
+}

 namespace Poco
 {
@@ -67,6 +73,7 @@ public:

 private:
     using Messages = std::vector<cppkafka::Message>;
+    CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaConsumers};

     enum StalledStatus
     {
@@ -41,6 +41,26 @@
 #include <Common/typeid_cast.h>


+#include <Common/CurrentMetrics.h>
+#include <Common/ProfileEvents.h>
+
+
+namespace CurrentMetrics
+{
+    extern const Metric KafkaLibrdkafkaThreads;
+    extern const Metric KafkaBackgroundReads;
+    extern const Metric KafkaConsumersInUse;
+    extern const Metric KafkaWrites;
+}
+
+namespace ProfileEvents
+{
+    extern const Event KafkaDirectReads;
+    extern const Event KafkaBackgroundReads;
+    extern const Event KafkaWrites;
+}
+
+
 namespace DB
 {

@@ -58,6 +78,7 @@ struct StorageKafkaInterceptors
     static rd_kafka_resp_err_t rdKafkaOnThreadStart(rd_kafka_t *, rd_kafka_thread_type_t thread_type, const char *, void * ctx)
     {
         StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
+        CurrentMetrics::add(CurrentMetrics::KafkaLibrdkafkaThreads, 1);

         const auto & storage_id = self->getStorageID();
         const auto & table = storage_id.getTableName();
@@ -89,6 +110,7 @@ struct StorageKafkaInterceptors
     static rd_kafka_resp_err_t rdKafkaOnThreadExit(rd_kafka_t *, rd_kafka_thread_type_t, const char *, void * ctx)
     {
         StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
+        CurrentMetrics::sub(CurrentMetrics::KafkaLibrdkafkaThreads, 1);

         std::lock_guard lock(self->thread_statuses_mutex);
         const auto it = std::find_if(self->thread_statuses.begin(), self->thread_statuses.end(), [](const auto & thread_status_ptr)
@@ -279,6 +301,8 @@ Pipe StorageKafka::read(
     if (mv_attached)
         throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageKafka with attached materialized views");

+    ProfileEvents::increment(ProfileEvents::KafkaDirectReads);
+
     /// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
     Pipes pipes;
     pipes.reserve(num_created_consumers);
@@ -304,6 +328,9 @@ SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr &
     auto modified_context = Context::createCopy(local_context);
     modified_context->applySettingsChanges(settings_adjustments);

+    CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaWrites};
+    ProfileEvents::increment(ProfileEvents::KafkaWrites);
+
     if (topics.size() > 1)
         throw Exception("Can't write to Kafka table with multiple topics!", ErrorCodes::NOT_IMPLEMENTED);
     return std::make_shared<KafkaSink>(*this, metadata_snapshot, modified_context);
@@ -358,6 +385,7 @@ void StorageKafka::pushReadBuffer(ConsumerBufferPtr buffer)
     std::lock_guard lock(mutex);
     buffers.push_back(buffer);
     semaphore.set();
+    CurrentMetrics::sub(CurrentMetrics::KafkaConsumersInUse, 1);
 }
 
 
@@ -382,6 +410,7 @@ ConsumerBufferPtr StorageKafka::popReadBuffer(std::chrono::milliseconds timeout)
     std::lock_guard lock(mutex);
     auto buffer = buffers.back();
     buffers.pop_back();
+    CurrentMetrics::add(CurrentMetrics::KafkaConsumersInUse, 1);
     return buffer;
 }
 
@@ -615,6 +644,9 @@ bool StorageKafka::streamToViews()
     if (!table)
         throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
 
+    CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaBackgroundReads};
+    ProfileEvents::increment(ProfileEvents::KafkaBackgroundReads);
+
     auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr(), getContext());
 
     // Create an INSERT query for streaming data
@@ -3,6 +3,16 @@
 #include "Columns/ColumnString.h"
 #include "Columns/ColumnsNumber.h"
 
+#include <Common/ProfileEvents.h>
+
+namespace ProfileEvents
+{
+    extern const Event KafkaRowsWritten;
+    extern const Event KafkaProducerFlushes;
+    extern const Event KafkaMessagesProduced;
+    extern const Event KafkaProducerErrors;
+}
+
 namespace DB
 {
 WriteBufferToKafkaProducer::WriteBufferToKafkaProducer(
@@ -53,6 +63,8 @@ WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer()
 
 void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t current_row)
 {
+    ProfileEvents::increment(ProfileEvents::KafkaRowsWritten);
+
     if (++rows % max_rows == 0)
     {
         const std::string & last_chunk = chunks.back();
@@ -103,8 +115,10 @@ void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t curren
                 producer->poll(timeout);
                 continue;
             }
+            ProfileEvents::increment(ProfileEvents::KafkaProducerErrors);
             throw;
         }
+        ProfileEvents::increment(ProfileEvents::KafkaMessagesProduced);
 
         break;
     }
@@ -126,9 +140,12 @@ void WriteBufferToKafkaProducer::flush()
         {
             if (e.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT)
                 continue;
+
+            ProfileEvents::increment(ProfileEvents::KafkaProducerErrors);
             throw;
         }
 
+        ProfileEvents::increment(ProfileEvents::KafkaProducerFlushes);
         break;
     }
 }
@@ -7,6 +7,14 @@
 
 #include <list>
 
+#include <Common/CurrentMetrics.h>
+
+namespace CurrentMetrics
+{
+    extern const Metric KafkaProducers;
+}
+
+
 namespace DB
 {
 class Block;
@@ -32,6 +40,7 @@ private:
     void nextImpl() override;
     void addChunk();
     void reinitializeChunks();
+    CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaProducers};
 
     ProducerPtr producer;
     const std::string topic;
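The counters registered above are visible through the server's standard introspection tables. A minimal sanity check, assuming a server where the Kafka engine is actually in use (the LIKE pattern is only a convenience for this sketch, not part of the change):

SELECT event, value FROM system.events WHERE event LIKE 'Kafka%';
SELECT metric, value FROM system.metrics WHERE metric LIKE 'Kafka%';

Profile events such as KafkaWrites and KafkaDirectReads are cumulative counters in system.events, while current metrics such as KafkaProducers and KafkaConsumersInUse show momentary values in system.metrics.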
@@ -4928,6 +4928,7 @@ static void selectBestProjection(
 Block MergeTreeData::getMinMaxCountProjectionBlock(
     const StorageMetadataPtr & metadata_snapshot,
     const Names & required_columns,
+    bool has_filter,
     const SelectQueryInfo & query_info,
     const DataPartsVector & parts,
     DataPartsVector & normal_parts,
|
|||||||
auto block = metadata_snapshot->minmax_count_projection->sample_block.cloneEmpty();
|
auto block = metadata_snapshot->minmax_count_projection->sample_block.cloneEmpty();
|
||||||
bool need_primary_key_max_column = false;
|
bool need_primary_key_max_column = false;
|
||||||
const auto & primary_key_max_column_name = metadata_snapshot->minmax_count_projection->primary_key_max_column_name;
|
const auto & primary_key_max_column_name = metadata_snapshot->minmax_count_projection->primary_key_max_column_name;
|
||||||
|
NameSet required_columns_set(required_columns.begin(), required_columns.end());
|
||||||
if (!primary_key_max_column_name.empty())
|
if (!primary_key_max_column_name.empty())
|
||||||
{
|
need_primary_key_max_column = required_columns_set.contains(primary_key_max_column_name);
|
||||||
need_primary_key_max_column = std::any_of(
|
|
||||||
required_columns.begin(), required_columns.end(), [&](const auto & name) { return primary_key_max_column_name == name; });
|
|
||||||
}
|
|
||||||
|
|
||||||
auto partition_minmax_count_columns = block.mutateColumns();
|
auto partition_minmax_count_columns = block.mutateColumns();
|
||||||
|
auto partition_minmax_count_column_names = block.getNames();
|
||||||
auto insert = [](ColumnAggregateFunction & column, const Field & value)
|
auto insert = [](ColumnAggregateFunction & column, const Field & value)
|
||||||
{
|
{
|
||||||
auto func = column.getAggregateFunction();
|
auto func = column.getAggregateFunction();
|
||||||
@@ -4957,20 +4957,34 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
         size_t align_of_state = func->alignOfData();
         auto * place = arena.alignedAlloc(size_of_state, align_of_state);
         func->create(place);
-        auto value_column = func->getReturnType()->createColumnConst(1, value)->convertToFullColumnIfConst();
-        const auto * value_column_ptr = value_column.get();
-        func->add(place, &value_column_ptr, 0, &arena);
+        if (const AggregateFunctionCount * agg_count = typeid_cast<const AggregateFunctionCount *>(func.get()))
+            agg_count->set(place, value.get<UInt64>());
+        else
+        {
+            auto value_column = func->getReturnType()->createColumnConst(1, value)->convertToFullColumnIfConst();
+            const auto * value_column_ptr = value_column.get();
+            func->add(place, &value_column_ptr, 0, &arena);
+        }
         column.insertFrom(place);
     };
 
-    ASTPtr expression_ast;
-    Block virtual_columns_block = getBlockWithVirtualPartColumns(parts, false /* one_part */, true /* ignore_empty */);
-    if (virtual_columns_block.rows() == 0)
-        return {};
+    Block virtual_columns_block;
+    auto virtual_block = getSampleBlockWithVirtualColumns();
+    bool has_virtual_column = std::any_of(required_columns.begin(), required_columns.end(), [&](const auto & name) { return virtual_block.has(name); });
+    if (has_virtual_column || has_filter)
+    {
+        virtual_columns_block = getBlockWithVirtualPartColumns(parts, false /* one_part */, true /* ignore_empty */);
+        if (virtual_columns_block.rows() == 0)
+            return {};
+    }
 
+    size_t rows = parts.size();
+    ColumnPtr part_name_column;
     std::optional<PartitionPruner> partition_pruner;
     std::optional<KeyCondition> minmax_idx_condition;
     DataTypes minmax_columns_types;
+    if (has_filter)
+    {
     if (metadata_snapshot->hasPartitionKey())
     {
         const auto & partition_key = metadata_snapshot->getPartitionKey();
@@ -4986,22 +5000,33 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
         }
 
         // Generate valid expressions for filtering
+        ASTPtr expression_ast;
         VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, query_context, virtual_columns_block, expression_ast);
         if (expression_ast)
             VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, query_context, expression_ast);
 
-    size_t rows = virtual_columns_block.rows();
-    const ColumnString & part_name_column = typeid_cast<const ColumnString &>(*virtual_columns_block.getByName("_part").column);
-    size_t part_idx = 0;
+        rows = virtual_columns_block.rows();
+        part_name_column = virtual_columns_block.getByName("_part").column;
+    }
 
     auto filter_column = ColumnUInt8::create();
     auto & filter_column_data = filter_column->getData();
-    for (size_t row = 0; row < rows; ++row)
+
+    DataPartsVector real_parts;
+    real_parts.reserve(rows);
+    for (size_t row = 0, part_idx = 0; row < rows; ++row, ++part_idx)
     {
-        while (parts[part_idx]->name != part_name_column.getDataAt(row))
-            ++part_idx;
+        if (part_name_column)
+        {
+            while (parts[part_idx]->name != part_name_column->getDataAt(row))
+                ++part_idx;
+        }
+
         const auto & part = parts[part_idx];
+
+        if (part->isEmpty())
+            continue;
 
         if (!part->minmax_idx->initialized)
             throw Exception("Found a non-empty part with uninitialized minmax_idx. It's a bug", ErrorCodes::LOGICAL_ERROR);
 
@@ -5030,48 +5055,12 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
                 continue;
         }
 
+        real_parts.push_back(part);
         filter_column_data.back() = 1;
-        size_t pos = 0;
-        for (size_t i : metadata_snapshot->minmax_count_projection->partition_value_indices)
-        {
-            if (i >= part->partition.value.size())
-                throw Exception("Partition value index is out of boundary. It's a bug", ErrorCodes::LOGICAL_ERROR);
-            partition_minmax_count_columns[pos++]->insert(part->partition.value[i]);
-        }
-
-        size_t minmax_idx_size = part->minmax_idx->hyperrectangle.size();
-        for (size_t i = 0; i < minmax_idx_size; ++i)
-        {
-            auto & min_column = assert_cast<ColumnAggregateFunction &>(*partition_minmax_count_columns[pos++]);
-            auto & max_column = assert_cast<ColumnAggregateFunction &>(*partition_minmax_count_columns[pos++]);
-            const auto & range = part->minmax_idx->hyperrectangle[i];
-            insert(min_column, range.left);
-            insert(max_column, range.right);
-        }
-
-        if (!primary_key_max_column_name.empty())
-        {
-            const auto & primary_key_column = *part->index[0];
-            auto & min_column = assert_cast<ColumnAggregateFunction &>(*partition_minmax_count_columns[pos++]);
-            auto & max_column = assert_cast<ColumnAggregateFunction &>(*partition_minmax_count_columns[pos++]);
-            insert(min_column, primary_key_column[0]);
-            insert(max_column, primary_key_column[primary_key_column.size() - 1]);
-        }
-
-        {
-            auto & column = assert_cast<ColumnAggregateFunction &>(*partition_minmax_count_columns.back());
-            auto func = column.getAggregateFunction();
-            Arena & arena = column.createOrGetArena();
-            size_t size_of_state = func->sizeOfData();
-            size_t align_of_state = func->alignOfData();
-            auto * place = arena.alignedAlloc(size_of_state, align_of_state);
-            func->create(place);
-            const AggregateFunctionCount & agg_count = assert_cast<const AggregateFunctionCount &>(*func);
-            agg_count.set(place, part->rows_count);
-            column.insertFrom(place);
-        }
     }
-    block.setColumns(std::move(partition_minmax_count_columns));
+
+    if (real_parts.empty())
+        return {};
 
     FilterDescription filter(*filter_column);
     for (size_t i = 0; i < virtual_columns_block.columns(); ++i)
@@ -5080,8 +5069,78 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
         column = column->filter(*filter.data, -1);
     }
 
-    if (block.rows() == 0)
-        return {};
+    size_t pos = 0;
+    for (size_t i : metadata_snapshot->minmax_count_projection->partition_value_indices)
+    {
+        if (required_columns_set.contains(partition_minmax_count_column_names[pos]))
+            for (const auto & part : real_parts)
+                partition_minmax_count_columns[pos]->insert(part->partition.value[i]);
+        ++pos;
+    }
+
+    size_t minmax_idx_size = real_parts.front()->minmax_idx->hyperrectangle.size();
+    for (size_t i = 0; i < minmax_idx_size; ++i)
+    {
+        if (required_columns_set.contains(partition_minmax_count_column_names[pos]))
+        {
+            for (const auto & part : real_parts)
+            {
+                const auto & range = part->minmax_idx->hyperrectangle[i];
+                auto & min_column = assert_cast<ColumnAggregateFunction &>(*partition_minmax_count_columns[pos]);
+                insert(min_column, range.left);
+            }
+        }
+        ++pos;
+
+        if (required_columns_set.contains(partition_minmax_count_column_names[pos]))
+        {
+            for (const auto & part : real_parts)
+            {
+                const auto & range = part->minmax_idx->hyperrectangle[i];
+                auto & max_column = assert_cast<ColumnAggregateFunction &>(*partition_minmax_count_columns[pos]);
+                insert(max_column, range.right);
+            }
+        }
+        ++pos;
+    }
+
+    if (!primary_key_max_column_name.empty())
+    {
+        if (required_columns_set.contains(partition_minmax_count_column_names[pos]))
+        {
+            for (const auto & part : real_parts)
+            {
+                const auto & primary_key_column = *part->index[0];
+                auto & min_column = assert_cast<ColumnAggregateFunction &>(*partition_minmax_count_columns[pos]);
+                insert(min_column, primary_key_column[0]);
+            }
+        }
+        ++pos;
+
+        if (required_columns_set.contains(partition_minmax_count_column_names[pos]))
+        {
+            for (const auto & part : real_parts)
+            {
+                const auto & primary_key_column = *part->index[0];
+                auto & max_column = assert_cast<ColumnAggregateFunction &>(*partition_minmax_count_columns[pos]);
+                insert(max_column, primary_key_column[primary_key_column.size() - 1]);
+            }
+        }
+        ++pos;
+    }
+
+    bool has_count
+        = std::any_of(required_columns.begin(), required_columns.end(), [&](const auto & name) { return startsWith(name, "count"); });
+    if (has_count)
+    {
+        for (const auto & part : real_parts)
+        {
+            auto & column = assert_cast<ColumnAggregateFunction &>(*partition_minmax_count_columns.back());
+            insert(column, part->rows_count);
+        }
+    }
+
+    block.setColumns(std::move(partition_minmax_count_columns));
 
     Block res;
     for (const auto & name : required_columns)
|
|||||||
res.insert(virtual_columns_block.getByName(name));
|
res.insert(virtual_columns_block.getByName(name));
|
||||||
else if (block.has(name))
|
else if (block.has(name))
|
||||||
res.insert(block.getByName(name));
|
res.insert(block.getByName(name));
|
||||||
|
else if (startsWith(name, "count")) // special case to match count(...) variants
|
||||||
|
{
|
||||||
|
const auto & column = block.getByName("count()");
|
||||||
|
res.insert({column.column, column.type, name});
|
||||||
|
}
|
||||||
else
|
else
|
||||||
throw Exception(
|
throw Exception(
|
||||||
ErrorCodes::LOGICAL_ERROR,
|
ErrorCodes::LOGICAL_ERROR,
|
||||||
@ -5261,7 +5325,7 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
|
|||||||
};
|
};
|
||||||
|
|
||||||
auto virtual_block = getSampleBlockWithVirtualColumns();
|
auto virtual_block = getSampleBlockWithVirtualColumns();
|
||||||
auto add_projection_candidate = [&](const ProjectionDescription & projection)
|
auto add_projection_candidate = [&](const ProjectionDescription & projection, bool minmax_count_projection = false)
|
||||||
{
|
{
|
||||||
ProjectionCandidate candidate{};
|
ProjectionCandidate candidate{};
|
||||||
candidate.desc = &projection;
|
candidate.desc = &projection;
|
||||||
@ -5288,22 +5352,30 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
|
|||||||
|
|
||||||
if (projection.type == ProjectionDescription::Type::Aggregate && analysis_result.need_aggregate && can_use_aggregate_projection)
|
if (projection.type == ProjectionDescription::Type::Aggregate && analysis_result.need_aggregate && can_use_aggregate_projection)
|
||||||
{
|
{
|
||||||
bool match = true;
|
|
||||||
Block aggregates;
|
Block aggregates;
|
||||||
// Let's first check if all aggregates are provided by current projection
|
// Let's first check if all aggregates are provided by current projection
|
||||||
for (const auto & aggregate : select.getQueryAnalyzer()->aggregates())
|
for (const auto & aggregate : select.getQueryAnalyzer()->aggregates())
|
||||||
{
|
{
|
||||||
const auto * column = sample_block.findByName(aggregate.column_name);
|
if (const auto * column = sample_block.findByName(aggregate.column_name))
|
||||||
if (column)
|
|
||||||
aggregates.insert(*column);
|
|
||||||
else
|
|
||||||
{
|
{
|
||||||
match = false;
|
aggregates.insert(*column);
|
||||||
break;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We can treat every count_not_null_column as count() when selecting minmax_count_projection
|
||||||
|
if (minmax_count_projection && dynamic_cast<const AggregateFunctionCount *>(aggregate.function.get()))
|
||||||
|
{
|
||||||
|
const auto * count_column = sample_block.findByName("count()");
|
||||||
|
if (!count_column)
|
||||||
|
throw Exception(
|
||||||
|
ErrorCodes::LOGICAL_ERROR, "`count()` column is missing when minmax_count_projection == true. It is a bug");
|
||||||
|
aggregates.insert({count_column->column, count_column->type, aggregate.column_name});
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
if (!match)
|
|
||||||
|
// No match
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Check if all aggregation keys can be either provided by some action, or by current
|
// Check if all aggregation keys can be either provided by some action, or by current
|
||||||
// projection directly. Reshape the `before_aggregation` action DAG so that it only
|
// projection directly. Reshape the `before_aggregation` action DAG so that it only
|
||||||
@ -5323,9 +5395,20 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
|
|||||||
candidate.before_aggregation->reorderAggregationKeysForProjection(key_name_pos_map);
|
candidate.before_aggregation->reorderAggregationKeysForProjection(key_name_pos_map);
|
||||||
candidate.before_aggregation->addAggregatesViaProjection(aggregates);
|
candidate.before_aggregation->addAggregatesViaProjection(aggregates);
|
||||||
|
|
||||||
|
// minmax_count_projections only have aggregation actions
|
||||||
|
if (minmax_count_projection)
|
||||||
|
candidate.required_columns = {required_columns.begin(), required_columns.end()};
|
||||||
|
|
||||||
if (rewrite_before_where(candidate, projection, required_columns, sample_block_for_keys, aggregates))
|
if (rewrite_before_where(candidate, projection, required_columns, sample_block_for_keys, aggregates))
|
||||||
{
|
{
|
||||||
|
if (minmax_count_projection)
|
||||||
|
{
|
||||||
|
candidate.before_where = nullptr;
|
||||||
|
candidate.prewhere_info = nullptr;
|
||||||
|
}
|
||||||
|
else
|
||||||
candidate.required_columns = {required_columns.begin(), required_columns.end()};
|
candidate.required_columns = {required_columns.begin(), required_columns.end()};
|
||||||
|
|
||||||
for (const auto & aggregate : aggregates)
|
for (const auto & aggregate : aggregates)
|
||||||
candidate.required_columns.push_back(aggregate.name);
|
candidate.required_columns.push_back(aggregate.name);
|
||||||
candidates.push_back(std::move(candidate));
|
candidates.push_back(std::move(candidate));
|
||||||
@ -5356,11 +5439,11 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
|
|||||||
ProjectionCandidate * selected_candidate = nullptr;
|
ProjectionCandidate * selected_candidate = nullptr;
|
||||||
size_t min_sum_marks = std::numeric_limits<size_t>::max();
|
size_t min_sum_marks = std::numeric_limits<size_t>::max();
|
||||||
if (metadata_snapshot->minmax_count_projection)
|
if (metadata_snapshot->minmax_count_projection)
|
||||||
add_projection_candidate(*metadata_snapshot->minmax_count_projection);
|
add_projection_candidate(*metadata_snapshot->minmax_count_projection, true);
|
||||||
std::optional<ProjectionCandidate> minmax_conut_projection_candidate;
|
std::optional<ProjectionCandidate> minmax_count_projection_candidate;
|
||||||
if (!candidates.empty())
|
if (!candidates.empty())
|
||||||
{
|
{
|
||||||
minmax_conut_projection_candidate.emplace(std::move(candidates.front()));
|
minmax_count_projection_candidate.emplace(std::move(candidates.front()));
|
||||||
candidates.clear();
|
candidates.clear();
|
||||||
}
|
}
|
||||||
MergeTreeDataSelectExecutor reader(*this);
|
MergeTreeDataSelectExecutor reader(*this);
|
||||||
@@ -5374,21 +5457,22 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
     auto parts = getVisibleDataPartsVector(query_context);
 
     // If minmax_count_projection is a valid candidate, check its completeness.
-    if (minmax_conut_projection_candidate)
+    if (minmax_count_projection_candidate)
     {
         DataPartsVector normal_parts;
         query_info.minmax_count_projection_block = getMinMaxCountProjectionBlock(
             metadata_snapshot,
-            minmax_conut_projection_candidate->required_columns,
+            minmax_count_projection_candidate->required_columns,
+            analysis_result.prewhere_info || analysis_result.before_where,
             query_info,
             parts,
             normal_parts,
             max_added_blocks.get(),
             query_context);
 
-        if (query_info.minmax_count_projection_block && minmax_conut_projection_candidate->prewhere_info)
+        if (query_info.minmax_count_projection_block && minmax_count_projection_candidate->prewhere_info)
         {
-            const auto & prewhere_info = minmax_conut_projection_candidate->prewhere_info;
+            const auto & prewhere_info = minmax_count_projection_candidate->prewhere_info;
             if (prewhere_info->alias_actions)
                 ExpressionActions(prewhere_info->alias_actions, actions_settings).execute(query_info.minmax_count_projection_block);
 
|
|||||||
|
|
||||||
if (normal_parts.empty())
|
if (normal_parts.empty())
|
||||||
{
|
{
|
||||||
selected_candidate = &*minmax_conut_projection_candidate;
|
selected_candidate = &*minmax_count_projection_candidate;
|
||||||
selected_candidate->complete = true;
|
selected_candidate->complete = true;
|
||||||
min_sum_marks = query_info.minmax_count_projection_block.rows();
|
min_sum_marks = query_info.minmax_count_projection_block.rows();
|
||||||
}
|
}
|
||||||
@ -5431,7 +5515,7 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
|
|||||||
|
|
||||||
if (!normal_result_ptr->error())
|
if (!normal_result_ptr->error())
|
||||||
{
|
{
|
||||||
selected_candidate = &*minmax_conut_projection_candidate;
|
selected_candidate = &*minmax_count_projection_candidate;
|
||||||
selected_candidate->merge_tree_normal_select_result_ptr = normal_result_ptr;
|
selected_candidate->merge_tree_normal_select_result_ptr = normal_result_ptr;
|
||||||
min_sum_marks = query_info.minmax_count_projection_block.rows() + normal_result_ptr->marks();
|
min_sum_marks = query_info.minmax_count_projection_block.rows() + normal_result_ptr->marks();
|
||||||
}
|
}
|
||||||
|
@@ -385,6 +385,8 @@ public:
     /// Build a block of minmax and count values of a MergeTree table. These values are extracted
     /// from minmax_indices, the first expression of primary key, and part rows.
     ///
+    /// has_filter - if query has no filter, bypass partition pruning completely
+    ///
     /// query_info - used to filter unneeded parts
     ///
     /// parts - part set to filter
@@ -395,6 +397,7 @@ public:
     Block getMinMaxCountProjectionBlock(
         const StorageMetadataPtr & metadata_snapshot,
        const Names & required_columns,
+        bool has_filter,
        const SelectQueryInfo & query_info,
        const DataPartsVector & parts,
        DataPartsVector & normal_parts,
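For orientation, the block built here answers bare min/max/count style aggregations straight from part metadata. A hedged example of the kind of query that takes this path (table and columns are illustrative and mirror the stateless test updated later in this diff):

select min(dt), max(dt), count() from d where toDate(dt) >= '2021-10-25';

With has_filter set to false, the partition pruning machinery above is bypassed entirely and every non-empty part contributes to the result.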
@@ -224,16 +224,14 @@ namespace
             }
 
             Chunk chunk;
+            std::lock_guard lock(reader_mutex);
             if (reader->pull(chunk))
                 return chunk;
 
-            {
-                std::lock_guard lock(reader_mutex);
                 pipeline->reset();
                 reader.reset();
-            }
         }
     }
 
     static std::unique_ptr<ReadBuffer> getFirstAvailableURLReadBuffer(
         std::vector<String>::const_iterator & option,
@@ -731,7 +731,7 @@ class TestCase:
         pattern = '{test} > {stdout} 2> {stderr}'
 
         if self.ext == '.sql':
-            pattern = "{client} --send_logs_level={logs_level} --testmode --multiquery {options} < " + pattern
+            pattern = "{client} --send_logs_level={logs_level} --multiquery {options} < " + pattern
 
         command = pattern.format(**params)
 
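With --testmode gone, the error-hint comments embedded in the .sql tests are handled without the extra flag. A sketch of the two hint forms exercised elsewhere in this diff (error code 47 and the deliberately unknown name FOOBAR both come from the tests themselves):

SELECT n; -- { serverError 47 }
select 1 -- { clientError FOOBAR }

The first expects the server to fail with the given code; the second uses an unknown error name on purpose so the client reports "No error code with name".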
@@ -55,7 +55,7 @@ class Client:
        command = self.command[:]

        if stdin is None:
-            command += ["--multiquery", "--testmode"]
+            command += ["--multiquery"]
            stdin = sql
        else:
            command += ["--query", sql]
@@ -48,6 +48,8 @@ import docker
 from .client import Client
 from .hdfs_api import HDFSApi
 
+from .config_cluster import *
+
 HELPERS_DIR = p.dirname(__file__)
 CLICKHOUSE_ROOT_DIR = p.join(p.dirname(__file__), "../../..")
 LOCAL_DOCKER_COMPOSE_DIR = p.join(
|
|||||||
while time.time() - start < timeout:
|
while time.time() - start < timeout:
|
||||||
try:
|
try:
|
||||||
conn = pymysql.connect(
|
conn = pymysql.connect(
|
||||||
user="root",
|
user=mysql_user,
|
||||||
password="clickhouse",
|
password=mysql_pass,
|
||||||
host=self.mysql_ip,
|
host=self.mysql_ip,
|
||||||
port=self.mysql_port,
|
port=self.mysql_port,
|
||||||
)
|
)
|
||||||
@ -1686,8 +1688,8 @@ class ClickHouseCluster:
|
|||||||
while time.time() - start < timeout:
|
while time.time() - start < timeout:
|
||||||
try:
|
try:
|
||||||
conn = pymysql.connect(
|
conn = pymysql.connect(
|
||||||
user="root",
|
user=mysql8_user,
|
||||||
password="clickhouse",
|
password=mysql8_pass,
|
||||||
host=self.mysql8_ip,
|
host=self.mysql8_ip,
|
||||||
port=self.mysql8_port,
|
port=self.mysql8_port,
|
||||||
)
|
)
|
||||||
@ -1711,8 +1713,8 @@ class ClickHouseCluster:
|
|||||||
try:
|
try:
|
||||||
for ip in [self.mysql2_ip, self.mysql3_ip, self.mysql4_ip]:
|
for ip in [self.mysql2_ip, self.mysql3_ip, self.mysql4_ip]:
|
||||||
conn = pymysql.connect(
|
conn = pymysql.connect(
|
||||||
user="root",
|
user=mysql_user,
|
||||||
password="clickhouse",
|
password=mysql_pass,
|
||||||
host=ip,
|
host=ip,
|
||||||
port=self.mysql_port,
|
port=self.mysql_port,
|
||||||
)
|
)
|
||||||
@ -1735,9 +1737,9 @@ class ClickHouseCluster:
|
|||||||
self.postgres_conn = psycopg2.connect(
|
self.postgres_conn = psycopg2.connect(
|
||||||
host=self.postgres_ip,
|
host=self.postgres_ip,
|
||||||
port=self.postgres_port,
|
port=self.postgres_port,
|
||||||
database="postgres",
|
database=pg_db,
|
||||||
user="postgres",
|
user=pg_user,
|
||||||
password="mysecretpassword",
|
password=pg_pass,
|
||||||
)
|
)
|
||||||
self.postgres_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
|
self.postgres_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
|
||||||
self.postgres_conn.autocommit = True
|
self.postgres_conn.autocommit = True
|
||||||
@ -1759,9 +1761,9 @@ class ClickHouseCluster:
|
|||||||
self.postgres2_conn = psycopg2.connect(
|
self.postgres2_conn = psycopg2.connect(
|
||||||
host=self.postgres2_ip,
|
host=self.postgres2_ip,
|
||||||
port=self.postgres_port,
|
port=self.postgres_port,
|
||||||
database="postgres",
|
database=pg_db,
|
||||||
user="postgres",
|
user=pg_user,
|
||||||
password="mysecretpassword",
|
password=pg_pass,
|
||||||
)
|
)
|
||||||
self.postgres2_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
|
self.postgres2_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
|
||||||
self.postgres2_conn.autocommit = True
|
self.postgres2_conn.autocommit = True
|
||||||
@ -1775,9 +1777,9 @@ class ClickHouseCluster:
|
|||||||
self.postgres3_conn = psycopg2.connect(
|
self.postgres3_conn = psycopg2.connect(
|
||||||
host=self.postgres3_ip,
|
host=self.postgres3_ip,
|
||||||
port=self.postgres_port,
|
port=self.postgres_port,
|
||||||
database="postgres",
|
database=pg_db,
|
||||||
user="postgres",
|
user=pg_user,
|
||||||
password="mysecretpassword",
|
password=pg_pass,
|
||||||
)
|
)
|
||||||
self.postgres3_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
|
self.postgres3_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
|
||||||
self.postgres3_conn.autocommit = True
|
self.postgres3_conn.autocommit = True
|
||||||
@ -1791,9 +1793,9 @@ class ClickHouseCluster:
|
|||||||
self.postgres4_conn = psycopg2.connect(
|
self.postgres4_conn = psycopg2.connect(
|
||||||
host=self.postgres4_ip,
|
host=self.postgres4_ip,
|
||||||
port=self.postgres_port,
|
port=self.postgres_port,
|
||||||
database="postgres",
|
database=pg_db,
|
||||||
user="postgres",
|
user=pg_user,
|
||||||
password="mysecretpassword",
|
password=pg_pass,
|
||||||
)
|
)
|
||||||
self.postgres4_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
|
self.postgres4_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
|
||||||
self.postgres4_conn.autocommit = True
|
self.postgres4_conn.autocommit = True
|
||||||
@ -1945,7 +1947,7 @@ class ClickHouseCluster:
|
|||||||
|
|
||||||
def wait_mongo_to_start(self, timeout=30, secure=False):
|
def wait_mongo_to_start(self, timeout=30, secure=False):
|
||||||
connection_str = "mongodb://{user}:{password}@{host}:{port}".format(
|
connection_str = "mongodb://{user}:{password}@{host}:{port}".format(
|
||||||
host="localhost", port=self.mongo_port, user="root", password="clickhouse"
|
host="localhost", port=self.mongo_port, user=mongo_user, password=mongo_pass
|
||||||
)
|
)
|
||||||
if secure:
|
if secure:
|
||||||
connection_str += "/?tls=true&tlsAllowInvalidCertificates=true"
|
connection_str += "/?tls=true&tlsAllowInvalidCertificates=true"
|
||||||
@@ -1969,8 +1971,8 @@ class ClickHouseCluster:
        )
        minio_client = Minio(
            f"{self.minio_ip}:{self.minio_port}",
-            access_key="minio",
-            secret_key="minio123",
+            access_key=minio_access_key,
+            secret_key=minio_secret_key,
            secure=secure,
            http_client=urllib3.PoolManager(cert_reqs="CERT_NONE"),
        )  # disable SSL check as we test ClickHouse and not Python library
@@ -3489,16 +3491,16 @@ class ClickHouseInstance:
            "MySQL": {
                "DSN": "mysql_odbc",
                "Driver": "/usr/lib/x86_64-linux-gnu/odbc/libmyodbc.so",
-                "Database": "clickhouse",
-                "Uid": "root",
-                "Pwd": "clickhouse",
+                "Database": odbc_mysql_db,
+                "Uid": odbc_mysql_uid,
+                "Pwd": odbc_mysql_pass,
                "Server": self.cluster.mysql_host,
            },
            "PostgreSQL": {
                "DSN": "postgresql_odbc",
-                "Database": "postgres",
-                "UserName": "postgres",
-                "Password": "mysecretpassword",
+                "Database": odbc_psql_db,
+                "UserName": odbc_psql_user,
+                "Password": odbc_psql_pass,
                "Port": str(self.cluster.postgres_port),
                "Servername": self.cluster.postgres_host,
                "Protocol": "9.3",
tests/integration/helpers/config_cluster.py (new file, 31 lines)
@@ -0,0 +1,31 @@
+# MYSQL CREDENTIALS
+mysql_user = "root"
+mysql_pass = "clickhouse"
+
+
+# MYSQL8 CREDENTIALS
+mysql8_user = "root"
+mysql8_pass = "clickhouse"
+
+# POSTGRES CREDENTIALS
+pg_user = "postgres"
+pg_pass = "mysecretpassword"
+pg_db = "postgres"
+
+
+# MINIO CREDENTIALS
+minio_access_key = "minio"
+minio_secret_key = "minio123"
+
+# MONGODB CREDENTIALS
+mongo_user = "root"
+mongo_pass = "clickhouse"
+
+# ODBC CREDENTIALS
+odbc_mysql_uid = "root"
+odbc_mysql_pass = "clickhouse"
+odbc_mysql_db = "clickhouse"
+
+odbc_psql_db = "postgres"
+odbc_psql_user = "postgres"
+odbc_psql_pass = "mysecretpassword"
|
|||||||
rm "$BINARY_FILE_PATH"
|
rm "$BINARY_FILE_PATH"
|
||||||
|
|
||||||
# The ProtobufSingle format can't be used to write multiple rows because this format doesn't have any row delimiter.
|
# The ProtobufSingle format can't be used to write multiple rows because this format doesn't have any row delimiter.
|
||||||
$CLICKHOUSE_CLIENT --multiquery --testmode > /dev/null <<EOF
|
$CLICKHOUSE_CLIENT --multiquery > /dev/null <<EOF
|
||||||
SELECT * FROM no_length_delimiter_protobuf_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$SCHEMADIR/00825_protobuf_format_no_length_delimiter:Message'; -- { clientError 546 }
|
SELECT * FROM no_length_delimiter_protobuf_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$SCHEMADIR/00825_protobuf_format_no_length_delimiter:Message'; -- { clientError 546 }
|
||||||
EOF
|
EOF
|
||||||
|
|
||||||
|
@@ -13,5 +13,5 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# ${CURDIR}/00921_datetime64_compatibility.python

python3 "${CURDIR}"/00921_datetime64_compatibility_long.python \
-    | ${CLICKHOUSE_CLIENT} --ignore-error -T -nm --calculate_text_stack_trace 0 --log-level 'error' 2>&1 \
+    | ${CLICKHOUSE_CLIENT} --ignore-error -nm --calculate_text_stack_trace 0 --log-level 'error' 2>&1 \
    | grep -v -e 'Received exception .*$' -e '^(query: ' | sed 's/^\(Code: [0-9]\+\).*$/\1/g'
@@ -88,9 +88,9 @@ from numbers(100000); -- { serverError 241; }" > /dev/null 2>&1

# fails
echo "Should throw 1"
-execute_insert --testmode
+execute_insert
echo "Should throw 2"
-execute_insert --testmode --min_insert_block_size_rows=1 --min_insert_block_size_rows_for_materialized_views=$((1<<20))
+execute_insert --min_insert_block_size_rows=1 --min_insert_block_size_rows_for_materialized_views=$((1<<20))

# passes
echo "Should pass 1"
@@ -41,7 +41,7 @@ $CLICKHOUSE_CLIENT -n --query="
    LIFETIME(MIN 1000 MAX 2000)
    LAYOUT(COMPLEX_KEY_SSD_CACHE(FILE_SIZE 8192 PATH '$USER_FILES_PATH/0d'));"

-$CLICKHOUSE_CLIENT --testmode -nq "SELECT dictHas('01280_db.ssd_dict', 'a', tuple('1')); -- { serverError 43 }"
+$CLICKHOUSE_CLIENT -nq "SELECT dictHas('01280_db.ssd_dict', 'a', tuple('1')); -- { serverError 43 }"

$CLICKHOUSE_CLIENT -n --query="
    SELECT 'TEST_SMALL';
@@ -65,7 +65,7 @@ $CLICKHOUSE_CLIENT -n --query="
    SELECT dictGetInt32('01280_db.ssd_dict', 'b', tuple('10', toInt32(-20)));
    SELECT dictGetString('01280_db.ssd_dict', 'c', tuple('10', toInt32(-20)));"

-$CLICKHOUSE_CLIENT --testmode -nq "SELECT dictGetUInt64('01280_db.ssd_dict', 'a', tuple(toInt32(3))); -- { serverError 53 }"
+$CLICKHOUSE_CLIENT -nq "SELECT dictGetUInt64('01280_db.ssd_dict', 'a', tuple(toInt32(3))); -- { serverError 53 }"

$CLICKHOUSE_CLIENT -n --query="DROP DICTIONARY 01280_db.ssd_dict;
    DROP TABLE IF EXISTS 01280_db.keys_table;
File diff suppressed because one or more lines are too long
@@ -9,6 +9,7 @@
 1 9999
 3
 2021-10-25 10:00:00 2021-10-27 10:00:00 3
+2021-10-25 10:00:00 2021-10-27 10:00:00 3
 1
 1
 1
@@ -17,3 +18,5 @@
 0
 2021-10-24 10:00:00
 0
+1000
+1000
@@ -50,6 +50,8 @@ drop table if exists d;
 create table d (dt DateTime, j int) engine MergeTree partition by (toDate(dt), ceiling(j), toDate(dt), CEILING(j)) order by tuple();
 insert into d values ('2021-10-24 10:00:00', 10), ('2021-10-25 10:00:00', 10), ('2021-10-26 10:00:00', 10), ('2021-10-27 10:00:00', 10);
 select min(dt), max(dt), count() from d where toDate(dt) >= '2021-10-25';
+-- fuzz crash
+select min(dt), max(dt), count(toDate(dt) >= '2021-10-25') from d where toDate(dt) >= '2021-10-25';
 select count() from d group by toDate(dt);
 
 -- fuzz crash
@@ -59,3 +61,15 @@ SELECT min(dt) FROM d PREWHERE ((0.9998999834060669 AND 1023) AND 255) <= ceil(j
 SELECT count('') AND NULL FROM d PREWHERE ceil(j) <= NULL;
 
 drop table d;
+
+-- count variant optimization
+
+drop table if exists test;
+create table test (id Int64, d Int64, projection dummy(select * order by id)) engine MergeTree order by id;
+insert into test select number, number from numbers(1e3);
+
+select count(if(d=4, d, 1)) from test settings force_optimize_projection = 1;
+select count(d/3) from test settings force_optimize_projection = 1;
+select count(if(d=4, Null, 1)) from test settings force_optimize_projection = 1; -- { serverError 584 }
+
+drop table test;
tests/queries/0_stateless/01825_type_json_9.reference (new file, 1 line)
@@ -0,0 +1 @@
+Tuple(foo Int8, k1 Int8, k2 Int8)

tests/queries/0_stateless/01825_type_json_9.sql (new file, 16 lines)
@@ -0,0 +1,16 @@
+-- Tags: no-fasttest
+
+DROP TABLE IF EXISTS t_json;
+
+SET allow_experimental_object_type = 1;
+
+CREATE TABLE t_json(id UInt64, obj JSON) ENGINE = MergeTree ORDER BY id;
+
+INSERT INTO t_json format JSONEachRow {"id": 1, "obj": {"foo": 1, "k1": 2}};
+INSERT INTO t_json format JSONEachRow {"id": 2, "obj": {"foo": 1, "k2": 2}};
+
+OPTIMIZE TABLE t_json FINAL;
+
+SELECT any(toTypeName(obj)) from t_json;
+
+DROP TABLE IF EXISTS t_json;
@@ -1,4 +1,4 @@
--- Tags: long
+-- Tags: long, no-backward-compatibility-check:22.3.2.1
 DROP TABLE IF EXISTS t_json_parallel;
 
 SET allow_experimental_object_type = 1, max_insert_threads = 20, max_threads = 20;
@@ -5,4 +5,4 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

-$CLICKHOUSE_CLIENT --testmode -n -q 'select 1 -- { clientError FOOBAR }' |& grep -o 'No error code with name:.*'
+$CLICKHOUSE_CLIENT -n -q 'select 1 -- { clientError FOOBAR }' |& grep -o 'No error code with name:.*'
@@ -4,7 +4,7 @@ DROP TABLE IF EXISTS test02008;
 CREATE TABLE test02008 (
     col Tuple(
         a Tuple(key1 int, key2 int),
-        b Tuple(key1 int, key3 int)
+        b Tuple(key1 int, key2 int)
     )
 ) ENGINE=Memory();
 INSERT INTO test02008 VALUES (tuple(tuple(1, 2), tuple(3, 4)));
@@ -1,8 +1,8 @@
-1
-1
-10
-10
-100
-100
-10000
-10000
+0 00000
+0 00000
+9 99999
+9 99999
+99 9999999999
+99 9999999999
+9999 99999999999999999999
+9999 99999999999999999999
@@ -11,8 +11,8 @@ settings
 as select number, repeat(toString(number), 5) from numbers({{ rows_in_table }});
 
 -- avoid any optimizations with ignore(*)
-select count(ignore(*)) from data_02052_{{ rows_in_table }}_wide{{ wide }} settings max_read_buffer_size=1, max_threads=1;
-select count(ignore(*)) from data_02052_{{ rows_in_table }}_wide{{ wide }} settings max_read_buffer_size=0, max_threads=1; -- { serverError CANNOT_READ_ALL_DATA }
+select * apply max from data_02052_{{ rows_in_table }}_wide{{ wide }} settings max_read_buffer_size=1, max_threads=1;
+select * apply max from data_02052_{{ rows_in_table }}_wide{{ wide }} settings max_read_buffer_size=0, max_threads=1; -- { serverError CANNOT_READ_ALL_DATA }
 
 drop table data_02052_{{ rows_in_table }}_wide{{ wide }};
 {% endfor %}
@@ -4,4 +4,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh


-${CLICKHOUSE_CLIENT} --allow_experimental_parallel_reading_from_replicas=1 -nmT < "$CURDIR"/01099_parallel_distributed_insert_select.sql > /dev/null
+${CLICKHOUSE_CLIENT} --allow_experimental_parallel_reading_from_replicas=1 -nm < "$CURDIR"/01099_parallel_distributed_insert_select.sql > /dev/null
@@ -6,5 +6,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)


$CLICKHOUSE_LOCAL --query="SELECT n" 2>&1 | grep -q "Code: 47. DB::Exception: Missing columns:" && echo 'OK' || echo 'FAIL' ||:
-$CLICKHOUSE_LOCAL --testmode --query="SELECT n -- { serverError 47 }"
-
+$CLICKHOUSE_LOCAL --query="SELECT n -- { serverError 47 }"
@@ -1,3 +1,4 @@
+-- Tags: no-backward-compatibility-check:22.3.2.1
 SET optimize_functions_to_subcolumns = 1;
 SELECT count(*) FROM numbers(2) AS n1, numbers(3) AS n2, numbers(4) AS n3
 WHERE (n1.number = n2.number) AND (n2.number = n3.number);
File diff suppressed because one or more lines are too long
@@ -0,0 +1,2 @@
+usa
+
@@ -0,0 +1,20 @@
+drop table if exists with_nullable;
+drop table if exists without_nullable;
+
+CREATE TABLE with_nullable
+(    timestamp UInt32,
+    country LowCardinality(Nullable(String)) ) ENGINE = Memory;
+
+CREATE TABLE without_nullable
+(    timestamp UInt32,
+    country LowCardinality(String)) ENGINE = Memory;
+
+insert into with_nullable values(0,'f'),(0,'usa');
+insert into without_nullable values(0,'usa'),(0,'us2a');
+
+select if(t0.country is null ,t2.country,t0.country) "country"
+from without_nullable t0 right outer join with_nullable t2 on t0.country=t2.country;
+
+drop table with_nullable;
+drop table without_nullable;
+
@@ -1,3 +1,4 @@
+-- Tags: no-backward-compatibility-check:22.3.2.2
 select toString(toNullable(true));
 select toString(CAST(NULL, 'Nullable(Bool)'));
 select toString(toNullable(toIPv4('0.0.0.0')));
@@ -0,0 +1,5 @@
+2022-03-31T00:00:00Z 1
+2022-04-01T09:10:24Z 2
+2022-03-31T10:18:56Z 3
+2022-03-31T10:18:56Z 4
+2022-04-01T09:10:24Z 5

tests/queries/0_stateless/02249_parse_date_time_basic.sql (new file, 10 lines)
@@ -0,0 +1,10 @@
+SET date_time_output_format='iso';
+drop table if exists t;
+CREATE TABLE t (a DateTime('UTC'), b String, c String, d String, e Int32) ENGINE = Memory;
+INSERT INTO t(a, b, c, d ,e) VALUES ('2022-03-31','','','',1);
+INSERT INTO t(a, b, c, d ,e) VALUES (1648804224,'','','',2);
+INSERT INTO t(a, b, c, d ,e) VALUES ('2022-03-31 10:18:56','','','',3);
+INSERT INTO t(a, b, c, d ,e) VALUES ('2022-03-31T10:18:56','','','',4);
+INSERT INTO t(a, b, c, d ,e) VALUES ('1648804224','','','',5);
+select a, e from t order by e;
+drop table if exists t;
@@ -0,0 +1 @@
+0
@@ -0,0 +1,10 @@
+SELECT number FROM numbers(10) WHERE number > 15 and test_function(number, number) == 4;
+
+SYSTEM FLUSH LOGS;
+
+SELECT ProfileEvents['ExecuteShellCommand'] FROM system.query_log WHERE
+    current_database = currentDatabase()
+    AND type = 'QueryFinish'
+    AND query == 'SELECT number FROM numbers(10) WHERE number > 15 and test_function(number, number) == 4;'
+    AND event_date >= yesterday() AND event_time > now() - interval 10 minute
+    LIMIT 1;
@@ -0,0 +1,4 @@
+0
+1
+0 1 2
+1

tests/queries/0_stateless/02252_jit_profile_events.sql (new file, 31 lines)
@@ -0,0 +1,31 @@
+-- Tags: no-fasttest, no-ubsan, no-cpu-aarch64
+
+SET compile_expressions = 1;
+SET min_count_to_compile_expression = 0;
+
+SYSTEM DROP COMPILED EXPRESSION CACHE;
+
+SELECT number + number + number FROM numbers(1);
+
+SYSTEM FLUSH LOGS;
+
+SELECT ProfileEvents['CompileFunction'] FROM system.query_log WHERE
+    current_database = currentDatabase()
+    AND type = 'QueryFinish'
+    AND query == 'SELECT number + number + number FROM numbers(1);'
+    AND event_date >= yesterday() AND event_time > now() - interval 10 minute
+    LIMIT 1;
+
+SET compile_aggregate_expressions = 1;
+SET min_count_to_compile_aggregate_expression = 0;
+
+SELECT sum(number), sum(number + 1), sum(number + 2) FROM numbers(1) GROUP BY number;
+
+SYSTEM FLUSH LOGS;
+
+SELECT ProfileEvents['CompileFunction'] FROM system.query_log WHERE
+    current_database = currentDatabase()
+    AND type = 'QueryFinish'
+    AND query == 'SELECT sum(number), sum(number + 1), sum(number + 2) FROM numbers(1) GROUP BY number;'
+    AND event_date >= yesterday() AND event_time > now() - interval 10 minute
+    LIMIT 1;

@@ -68,8 +68,8 @@ do
    TESTNAME_RESULT="/tmp/result_$TESTNAME"
    NEW_TESTNAME_RESULT="/tmp/result_dist_$TESTNAME"

-    $CLICKHOUSE_CLIENT $SETTINGS -nm --testmode < $TESTPATH > $TESTNAME_RESULT
-    $CLICKHOUSE_CLIENT $SETTINGS -nm --testmode < $NEW_TESTNAME > $NEW_TESTNAME_RESULT
+    $CLICKHOUSE_CLIENT $SETTINGS -nm < $TESTPATH > $TESTNAME_RESULT
+    $CLICKHOUSE_CLIENT $SETTINGS -nm < $NEW_TESTNAME > $NEW_TESTNAME_RESULT

    expected=$(cat $TESTNAME_RESULT | md5sum)
    actual=$(cat $NEW_TESTNAME_RESULT | md5sum)

@@ -7,7 +7,7 @@ tags: ['meetup', 'Paris', 'France', 'events']

Agenda of Paris ClickHouse Meetup was full of use cases, mostly from France-based companies which are actively using ClickHouse. Slides for all talks are [available on the GitHub](https://github.com/clickhouse/clickhouse-presentations/tree/master/meetup18).

-Christophe Kalenzaga and Vianney Foucault, engineers from ContentSquare, company that provided the meetup venue:
+Christophe Kalenzaga and Vianney Foucault, engineers from Contentsquare, company that provided the meetup venue:
![Christophe Kalenzaga and Vianney Foucault](https://blog-images.clickhouse.com/en/2018/clickhouse-community-meetup-in-paris-on-october-2-2018/1.jpg)

Matthieu Jacquet from Storetail (Criteo):