Merge branch 'master' into fast-count-from-files

This commit is contained in:
Kruglov Pavel 2023-08-23 12:15:34 +02:00 committed by GitHub
commit e193aec583
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 483 additions and 259 deletions

2
contrib/openldap vendored

@ -1 +1 @@
Subproject commit 8688afe6bc95ebcd20edf4578c536362218cb70a Subproject commit 5671b80e369df2caf5f34e02924316205a43c895

View File

@ -96,71 +96,82 @@ target_compile_definitions(_lber
) )
set(_ldap_srcs set(_ldap_srcs
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/bind.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/open.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/result.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/error.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/compare.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/search.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/controls.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/messages.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/references.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/extended.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/cyrus.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/modify.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/add.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/modrdn.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/delete.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/abandon.c" "${OPENLDAP_SOURCE_DIR}/libraries/libldap/abandon.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/sasl.c" "${OPENLDAP_SOURCE_DIR}/libraries/libldap/add.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/sbind.c" "${OPENLDAP_SOURCE_DIR}/libraries/libldap/addentry.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/unbind.c" "${OPENLDAP_SOURCE_DIR}/libraries/libldap/assertion.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/avl.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/bind.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/cancel.c" "${OPENLDAP_SOURCE_DIR}/libraries/libldap/cancel.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/charray.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/compare.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/controls.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/cyrus.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/dds.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/delete.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/deref.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/dnssrv.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/error.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/extended.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/fetch.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/filter.c" "${OPENLDAP_SOURCE_DIR}/libraries/libldap/filter.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/free.c" "${OPENLDAP_SOURCE_DIR}/libraries/libldap/free.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/sort.c" "${OPENLDAP_SOURCE_DIR}/libraries/libldap/getattr.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/passwd.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/whoami.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/vc.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/getdn.c" "${OPENLDAP_SOURCE_DIR}/libraries/libldap/getdn.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/getentry.c" "${OPENLDAP_SOURCE_DIR}/libraries/libldap/getentry.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/getattr.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/getvalues.c" "${OPENLDAP_SOURCE_DIR}/libraries/libldap/getvalues.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/addentry.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/request.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/os-ip.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/url.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/pagectrl.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/sortctrl.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/vlvctrl.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/init.c" "${OPENLDAP_SOURCE_DIR}/libraries/libldap/init.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/options.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/print.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/string.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/util-int.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/schema.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/charray.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/os-local.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/dnssrv.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/utf-8.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/utf-8-conv.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/tls2.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/tls_o.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/tls_g.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/turn.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/ppolicy.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/dds.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/txn.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/ldap_sync.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/stctrl.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/assertion.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/deref.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/ldifutil.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/ldif.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/fetch.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/lbase64.c" "${OPENLDAP_SOURCE_DIR}/libraries/libldap/lbase64.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/ldap_sync.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/ldif.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/ldifutil.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/messages.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/modify.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/modrdn.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/msctrl.c" "${OPENLDAP_SOURCE_DIR}/libraries/libldap/msctrl.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/open.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/options.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/os-ip.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/os-local.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/pagectrl.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/passwd.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/ppolicy.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/print.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/psearchctrl.c" "${OPENLDAP_SOURCE_DIR}/libraries/libldap/psearchctrl.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/rdwr.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/references.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/request.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/result.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/rq.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/sasl.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/sbind.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/schema.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/search.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/sort.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/sortctrl.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/stctrl.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/string.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/tavl.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/thr_debug.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/thr_nt.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/thr_posix.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/thr_pth.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/thr_thr.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/threads.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/tls2.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/tls_g.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/tls_o.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/tpool.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/turn.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/txn.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/unbind.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/url.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/utf-8-conv.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/utf-8.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/util-int.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/vc.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/vlvctrl.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap/whoami.c"
) )
mkversion(ldap) mkversion(ldap)
@ -185,43 +196,5 @@ target_compile_definitions(_ldap
PRIVATE LDAP_LIBRARY PRIVATE LDAP_LIBRARY
) )
set(_ldap_r_specific_srcs add_library(ch_contrib::ldap ALIAS _ldap)
"${OPENLDAP_SOURCE_DIR}/libraries/libldap_r/threads.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap_r/rdwr.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap_r/tpool.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap_r/rq.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap_r/thr_posix.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap_r/thr_thr.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap_r/thr_nt.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap_r/thr_pth.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap_r/thr_stub.c"
"${OPENLDAP_SOURCE_DIR}/libraries/libldap_r/thr_debug.c"
)
mkversion(ldap_r)
add_library(_ldap_r
${_ldap_r_specific_srcs}
${_ldap_srcs}
"${CMAKE_CURRENT_BINARY_DIR}/ldap_r-version.c"
)
target_link_libraries(_ldap_r
PRIVATE _lber
PRIVATE OpenSSL::Crypto OpenSSL::SSL
)
target_include_directories(_ldap_r SYSTEM
PUBLIC ${_extra_build_dir}/include
PUBLIC "${OPENLDAP_SOURCE_DIR}/include"
PRIVATE "${OPENLDAP_SOURCE_DIR}/libraries/libldap_r"
PRIVATE "${OPENLDAP_SOURCE_DIR}/libraries/libldap"
)
target_compile_definitions(_ldap_r
PRIVATE LDAP_R_COMPILE
PRIVATE LDAP_LIBRARY
)
add_library(ch_contrib::ldap ALIAS _ldap_r)
add_library(ch_contrib::lber ALIAS _lber) add_library(ch_contrib::lber ALIAS _lber)

View File

@ -644,7 +644,7 @@ function report
rm -r report ||: rm -r report ||:
mkdir report report/tmp ||: mkdir report report/tmp ||:
rm ./*.{rep,svg} test-times.tsv test-dump.tsv unstable.tsv unstable-query-ids.tsv unstable-query-metrics.tsv changed-perf.tsv unstable-tests.tsv unstable-queries.tsv bad-tests.tsv slow-on-client.tsv all-queries.tsv run-errors.tsv ||: rm ./*.{rep,svg} test-times.tsv test-dump.tsv unstable.tsv unstable-query-ids.tsv unstable-query-metrics.tsv changed-perf.tsv unstable-tests.tsv unstable-queries.tsv bad-tests.tsv all-queries.tsv run-errors.tsv ||:
cat analyze/errors.log >> report/errors.log ||: cat analyze/errors.log >> report/errors.log ||:
cat profile-errors.log >> report/errors.log ||: cat profile-errors.log >> report/errors.log ||:
@ -810,12 +810,6 @@ create view total_client_time_per_query as select *
from file('analyze/client-times.tsv', TSV, from file('analyze/client-times.tsv', TSV,
'test text, query_index int, client float, server float'); 'test text, query_index int, client float, server float');
create table slow_on_client_report engine File(TSV, 'report/slow-on-client.tsv')
as select client, server, round(client/server, 3) p,
test, query_display_name
from total_client_time_per_query left join query_display_names using (test, query_index)
where p > round(1.02, 3) order by p desc;
create table wall_clock_time_per_test engine Memory as select * create table wall_clock_time_per_test engine Memory as select *
from file('wall-clock-times.tsv', TSV, 'test text, real float, user float, system float'); from file('wall-clock-times.tsv', TSV, 'test text, real float, user float, system float');

View File

@ -364,20 +364,6 @@ if args.report == "main":
] ]
) )
slow_on_client_rows = tsvRows("report/slow-on-client.tsv")
error_tests += len(slow_on_client_rows)
addSimpleTable(
"Slow on Client",
["Client time, s", "Server time, s", "Ratio", "Test", "Query"],
slow_on_client_rows,
)
if slow_on_client_rows:
errors_explained.append(
[
f'<a href="#{currentTableAnchor()}">Some queries are taking noticeable time client-side (missing `FORMAT Null`?)</a>'
]
)
def add_backward_incompatible(): def add_backward_incompatible():
rows = tsvRows("report/partial-queries-report.tsv") rows = tsvRows("report/partial-queries-report.tsv")
if not rows: if not rows:

View File

@ -0,0 +1,14 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v23.3.10.5-lts (d8737007f9e) FIXME as compared to v23.3.9.55-lts (b9c5c8622d3)
#### Bug Fix (user-visible misbehavior in an official stable release)
* Not-ready Set [#53162](https://github.com/ClickHouse/ClickHouse/pull/53162) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Correctly handle totals and extremes with `DelayedSource` [#53644](https://github.com/ClickHouse/ClickHouse/pull/53644) ([Antonio Andelic](https://github.com/antonio2368)).

View File

@ -114,7 +114,11 @@ Example of disk configuration:
## Using local cache {#using-local-cache} ## Using local cache {#using-local-cache}
It is possible to configure local cache over disks in storage configuration starting from version 22.3. For versions 22.3 - 22.7 cache is supported only for `s3` disk type. For versions >= 22.8 cache is supported for any disk type: S3, Azure, Local, Encrypted, etc. Cache uses `LRU` cache policy. It is possible to configure local cache over disks in storage configuration starting from version 22.3.
For versions 22.3 - 22.7 cache is supported only for `s3` disk type. For versions >= 22.8 cache is supported for any disk type: S3, Azure, Local, Encrypted, etc.
For versions >= 23.5 cache is supported only for remote disk types: S3, Azure, HDFS.
Cache uses `LRU` cache policy.
Example of configuration for versions later or equal to 22.8: Example of configuration for versions later or equal to 22.8:

View File

@ -815,16 +815,16 @@ Aliases: `dateDiff`, `DATE_DIFF`, `timestampDiff`, `timestamp_diff`, `TIMESTAMP_
- `unit` — The type of interval for result. [String](../../sql-reference/data-types/string.md). - `unit` — The type of interval for result. [String](../../sql-reference/data-types/string.md).
Possible values: Possible values:
- `microsecond` (possible abbreviations: `us`, `u`) - `microsecond` (possible abbreviations: `microseconds`, `us`, `u`)
- `millisecond` (possible abbreviations: `ms`) - `millisecond` (possible abbreviations: `milliseconds`, `ms`)
- `second` (possible abbreviations: `ss`, `s`) - `second` (possible abbreviations: `seconds`, `ss`, `s`)
- `minute` (possible abbreviations: `mi`, `n`) - `minute` (possible abbreviations: `minutes`, `mi`, `n`)
- `hour` (possible abbreviations: `hh`, `h`) - `hour` (possible abbreviations: `hours`, `hh`, `h`)
- `day` (possible abbreviations: `dd`, `d`) - `day` (possible abbreviations: `days`, `dd`, `d`)
- `week` (possible abbreviations: `wk`, `ww`) - `week` (possible abbreviations: `weeks`, `wk`, `ww`)
- `month` (possible abbreviations: `mm`, `m`) - `month` (possible abbreviations: `months`, `mm`, `m`)
- `quarter` (possible abbreviations: `qq`, `q`) - `quarter` (possible abbreviations: `quarters`, `qq`, `q`)
- `year` (possible abbreviations: `yyyy`, `yy`) - `year` (possible abbreviations: `years`, `yyyy`, `yy`)
- `startdate` — The first time value to subtract (the subtrahend). [Date](../../sql-reference/data-types/date.md), [Date32](../../sql-reference/data-types/date32.md), [DateTime](../../sql-reference/data-types/datetime.md) or [DateTime64](../../sql-reference/data-types/datetime64.md). - `startdate` — The first time value to subtract (the subtrahend). [Date](../../sql-reference/data-types/date.md), [Date32](../../sql-reference/data-types/date32.md), [DateTime](../../sql-reference/data-types/datetime.md) or [DateTime64](../../sql-reference/data-types/datetime64.md).

View File

@ -547,7 +547,7 @@ public:
/// For serialization we use signed Int32 (for historical reasons), -1 means "no value" /// For serialization we use signed Int32 (for historical reasons), -1 means "no value"
Int32 size_to_write = size ? size : -1; Int32 size_to_write = size ? size : -1;
writeBinary(size_to_write, buf); writeBinaryLittleEndian(size_to_write, buf);
if (has()) if (has())
buf.write(getData(), size); buf.write(getData(), size);
} }
@ -573,7 +573,7 @@ public:
{ {
/// For serialization we use signed Int32 (for historical reasons), -1 means "no value" /// For serialization we use signed Int32 (for historical reasons), -1 means "no value"
Int32 rhs_size_signed; Int32 rhs_size_signed;
readBinary(rhs_size_signed, buf); readBinaryLittleEndian(rhs_size_signed, buf);
if (rhs_size_signed < 0) if (rhs_size_signed < 0)
{ {

View File

@ -258,12 +258,12 @@ struct AggregateFunctionSumData
void write(WriteBuffer & buf) const void write(WriteBuffer & buf) const
{ {
writeBinary(sum, buf); writeBinaryLittleEndian(sum, buf);
} }
void read(ReadBuffer & buf) void read(ReadBuffer & buf)
{ {
readBinary(sum, buf); readBinaryLittleEndian(sum, buf);
} }
T get() const T get() const

View File

@ -1918,6 +1918,9 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
subquery_settings.max_result_rows = 1; subquery_settings.max_result_rows = 1;
subquery_settings.extremes = false; subquery_settings.extremes = false;
subquery_context->setSettings(subquery_settings); subquery_context->setSettings(subquery_settings);
/// When execute `INSERT INTO t WITH ... SELECT ...`, it may lead to `Unknown columns`
/// exception with this settings enabled(https://github.com/ClickHouse/ClickHouse/issues/52494).
subquery_context->setSetting("use_structure_from_insertion_table_in_table_functions", false);
auto options = SelectQueryOptions(QueryProcessingStage::Complete, scope.subquery_depth, true /*is_subquery*/); auto options = SelectQueryOptions(QueryProcessingStage::Complete, scope.subquery_depth, true /*is_subquery*/);
auto interpreter = std::make_unique<InterpreterSelectQueryAnalyzer>(node->toAST(), subquery_context, options); auto interpreter = std::make_unique<InterpreterSelectQueryAnalyzer>(node->toAST(), subquery_context, options);

View File

@ -101,6 +101,12 @@ bool ShellCommand::tryWaitProcessWithTimeout(size_t timeout_in_seconds)
out.close(); out.close();
err.close(); err.close();
for (auto & [_, fd] : write_fds)
fd.close();
for (auto & [_, fd] : read_fds)
fd.close();
return waitForPid(pid, timeout_in_seconds); return waitForPid(pid, timeout_in_seconds);
} }
@ -287,6 +293,12 @@ int ShellCommand::tryWait()
out.close(); out.close();
err.close(); err.close();
for (auto & [_, fd] : write_fds)
fd.close();
for (auto & [_, fd] : read_fds)
fd.close();
LOG_TRACE(getLogger(), "Will wait for shell command pid {}", pid); LOG_TRACE(getLogger(), "Will wait for shell command pid {}", pid);
int status = 0; int status = 0;

View File

@ -18,7 +18,6 @@
#include <Common/DNSResolver.h> #include <Common/DNSResolver.h>
#include <Common/StringUtils/StringUtils.h> #include <Common/StringUtils/StringUtils.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <Poco/Net/NetException.h> #include <Poco/Net/NetException.h>
#include <Poco/Net/DNS.h> #include <Poco/Net/DNS.h>

View File

@ -7,6 +7,7 @@
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <string> #include <string>
#include <Common/logger_useful.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
@ -668,7 +669,13 @@ public:
return; return;
try try
{ {
zookeeper.tryRemove(path); if (!zookeeper.expired())
zookeeper.tryRemove(path);
else
{
ProfileEvents::increment(ProfileEvents::CannotRemoveEphemeralNode);
LOG_DEBUG(&Poco::Logger::get("EphemeralNodeHolder"), "Cannot remove {} since session has been expired", path);
}
} }
catch (...) catch (...)
{ {

View File

@ -41,7 +41,7 @@ TEST(Common, makeRegexpPatternFromGlobs)
EXPECT_EQ(makeRegexpPatternFromGlobs("f{1..2}{1..2}"), "f(1|2)(1|2)"); EXPECT_EQ(makeRegexpPatternFromGlobs("f{1..2}{1..2}"), "f(1|2)(1|2)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{1..1}{1..1}"), "f(1)(1)"); EXPECT_EQ(makeRegexpPatternFromGlobs("f{1..1}{1..1}"), "f(1)(1)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{0..0}{0..0}"), "f(0)(0)"); EXPECT_EQ(makeRegexpPatternFromGlobs("f{0..0}{0..0}"), "f(0)(0)");
EXPECT_EQ(makeRegexpPatternFromGlobs("file{1..5}"),"file(1|2|3|4|5)"); EXPECT_EQ(makeRegexpPatternFromGlobs("file{1..5}"), "file(1|2|3|4|5)");
EXPECT_EQ(makeRegexpPatternFromGlobs("file{1,2,3}"),"file(1|2|3)"); EXPECT_EQ(makeRegexpPatternFromGlobs("file{1,2,3}"), "file(1|2|3)");
EXPECT_EQ(makeRegexpPatternFromGlobs("{1,2,3}blabla{a.x,b.x,c.x}smth[]_else{aa,bb}?*"), "(1|2|3)blabla(a\\.x|b\\.x|c\\.x)smth\\[\\]_else(aa|bb)[^/][^/]*"); EXPECT_EQ(makeRegexpPatternFromGlobs("{1,2,3}blabla{a.x,b.x,c.x}smth[]_else{aa,bb}?*"), "(1|2|3)blabla(a\\.x|b\\.x|c\\.x)smth\\[\\]_else(aa|bb)[^/][^/]*");
} }

View File

@ -114,6 +114,26 @@ void AsynchronousBoundedReadBuffer::setReadUntilPosition(size_t position)
{ {
if (!read_until_position || position != *read_until_position) if (!read_until_position || position != *read_until_position)
{ {
if (position < file_offset_of_buffer_end)
{
/// file has been read beyond new read until position already
if (working_buffer.size() >= file_offset_of_buffer_end - position)
{
/// new read until position is inside working buffer
file_offset_of_buffer_end = position;
}
else
{
/// new read until position is before working buffer begin
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to set read until position before already read data ({} > {}, info: {})",
position,
getPosition(),
impl->getInfoForLog());
}
}
read_until_position = position; read_until_position = position;
/// We must wait on future and reset the prefetch here, because otherwise there might be /// We must wait on future and reset the prefetch here, because otherwise there might be
@ -248,7 +268,6 @@ off_t AsynchronousBoundedReadBuffer::seek(off_t offset, int whence)
{ {
/// Position is still inside the buffer. /// Position is still inside the buffer.
/// Probably it is at the end of the buffer - then we will load data on the following 'next' call. /// Probably it is at the end of the buffer - then we will load data on the following 'next' call.
pos = working_buffer.end() - file_offset_of_buffer_end + new_pos; pos = working_buffer.end() - file_offset_of_buffer_end + new_pos;
assert(pos >= working_buffer.begin()); assert(pos >= working_buffer.begin());
assert(pos <= working_buffer.end()); assert(pos <= working_buffer.end());

View File

@ -46,6 +46,8 @@ public:
void setReadUntilEnd() override { return setReadUntilPosition(getFileSize()); } void setReadUntilEnd() override { return setReadUntilPosition(getFileSize()); }
size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }
off_t getPosition() override { return file_offset_of_buffer_end - available() + bytes_to_ignore; } off_t getPosition() override { return file_offset_of_buffer_end - available() + bytes_to_ignore; }
private: private:

View File

@ -58,6 +58,10 @@ bool ParserJSONPathMemberAccess::parseImpl(Pos & pos, ASTPtr & node, Expected &
member_name = std::make_shared<ASTIdentifier>(String(last_begin, pos->end)); member_name = std::make_shared<ASTIdentifier>(String(last_begin, pos->end));
++pos; ++pos;
} }
else if (!pos.isValid() && pos->type == TokenType::EndOfStream)
{
member_name = std::make_shared<ASTIdentifier>(String(last_begin, last_end));
}
else else
{ {
return false; return false;

View File

@ -8,18 +8,12 @@ namespace DB
template <typename... Ts, typename T, typename F> template <typename... Ts, typename T, typename F>
static bool castTypeToEither(const T * type, F && f) static bool castTypeToEither(const T * type, F && f)
{ {
/// XXX can't use && here because gcc-7 complains about parentheses around && within || return ((typeid_cast<const Ts *>(type) && f(*typeid_cast<const Ts *>(type))) || ...);
return ((typeid_cast<const Ts *>(type) ? f(*typeid_cast<const Ts *>(type)) : false) || ...);
} }
template <class ...Args> template <class ...Args>
constexpr bool castTypeToEither(TypeList<Args...>, const auto * type, auto && f) constexpr bool castTypeToEither(TypeList<Args...>, const auto * type, auto && f)
{ {
return ( return ((typeid_cast<const Args *>(type) != nullptr && std::forward<decltype(f)>(f)(*typeid_cast<const Args *>(type))) || ...);
(typeid_cast<const Args *>(type) != nullptr
? std::forward<decltype(f)>(f)(
*typeid_cast<const Args *>(type))
: false)
|| ...);
} }
} }

View File

@ -381,25 +381,25 @@ public:
const auto & timezone_x = extractTimeZoneFromFunctionArguments(arguments, 3, 1); const auto & timezone_x = extractTimeZoneFromFunctionArguments(arguments, 3, 1);
const auto & timezone_y = extractTimeZoneFromFunctionArguments(arguments, 3, 2); const auto & timezone_y = extractTimeZoneFromFunctionArguments(arguments, 3, 2);
if (unit == "year" || unit == "yy" || unit == "yyyy") if (unit == "year" || unit == "years" || unit == "yy" || unit == "yyyy")
impl.template dispatchForColumns<ToRelativeYearNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData()); impl.template dispatchForColumns<ToRelativeYearNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "quarter" || unit == "qq" || unit == "q") else if (unit == "quarter" || unit == "quarters" || unit == "qq" || unit == "q")
impl.template dispatchForColumns<ToRelativeQuarterNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData()); impl.template dispatchForColumns<ToRelativeQuarterNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "month" || unit == "mm" || unit == "m") else if (unit == "month" || unit == "months" || unit == "mm" || unit == "m")
impl.template dispatchForColumns<ToRelativeMonthNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData()); impl.template dispatchForColumns<ToRelativeMonthNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "week" || unit == "wk" || unit == "ww") else if (unit == "week" || unit == "weeks" || unit == "wk" || unit == "ww")
impl.template dispatchForColumns<ToRelativeWeekNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData()); impl.template dispatchForColumns<ToRelativeWeekNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "day" || unit == "dd" || unit == "d") else if (unit == "day" || unit == "days" || unit == "dd" || unit == "d")
impl.template dispatchForColumns<ToRelativeDayNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData()); impl.template dispatchForColumns<ToRelativeDayNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "hour" || unit == "hh" || unit == "h") else if (unit == "hour" || unit == "hours" || unit == "hh" || unit == "h")
impl.template dispatchForColumns<ToRelativeHourNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData()); impl.template dispatchForColumns<ToRelativeHourNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "minute" || unit == "mi" || unit == "n") else if (unit == "minute" || unit == "minutes" || unit == "mi" || unit == "n")
impl.template dispatchForColumns<ToRelativeMinuteNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData()); impl.template dispatchForColumns<ToRelativeMinuteNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "second" || unit == "ss" || unit == "s") else if (unit == "second" || unit == "seconds" || unit == "ss" || unit == "s")
impl.template dispatchForColumns<ToRelativeSecondNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData()); impl.template dispatchForColumns<ToRelativeSecondNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "millisecond" || unit == "ms") else if (unit == "millisecond" || unit == "milliseconds" || unit == "ms")
impl.template dispatchForColumns<ToRelativeSubsecondNumImpl<millisecond_multiplier>>(x, y, timezone_x, timezone_y, res->getData()); impl.template dispatchForColumns<ToRelativeSubsecondNumImpl<millisecond_multiplier>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "microsecond" || unit == "us" || unit == "u") else if (unit == "microsecond" || unit == "microseconds" || unit == "us" || unit == "u")
impl.template dispatchForColumns<ToRelativeSubsecondNumImpl<microsecond_multiplier>>(x, y, timezone_x, timezone_y, res->getData()); impl.template dispatchForColumns<ToRelativeSubsecondNumImpl<microsecond_multiplier>>(x, y, timezone_x, timezone_y, res->getData());
else else
throw Exception(ErrorCodes::BAD_ARGUMENTS, throw Exception(ErrorCodes::BAD_ARGUMENTS,

View File

@ -40,9 +40,15 @@ DB::PooledHTTPSessionPtr getSession(Aws::S3::Model::GetObjectResult & read_resul
{ {
if (auto * session_aware_stream = dynamic_cast<DB::S3::SessionAwareIOStream<DB::PooledHTTPSessionPtr> *>(&read_result.GetBody())) if (auto * session_aware_stream = dynamic_cast<DB::S3::SessionAwareIOStream<DB::PooledHTTPSessionPtr> *>(&read_result.GetBody()))
return static_cast<DB::PooledHTTPSessionPtr &>(session_aware_stream->getSession()); return static_cast<DB::PooledHTTPSessionPtr &>(session_aware_stream->getSession());
else if (!dynamic_cast<DB::S3::SessionAwareIOStream<DB::HTTPSessionPtr> *>(&read_result.GetBody()))
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session of unexpected type encountered"); if (dynamic_cast<DB::S3::SessionAwareIOStream<DB::HTTPSessionPtr> *>(&read_result.GetBody()))
return {}; return {};
/// accept result from S# mock in gtest_writebuffer_s3.cpp
if (dynamic_cast<Aws::Utils::Stream::DefaultUnderlyingStream *>(&read_result.GetBody()))
return {};
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session of unexpected type encountered");
} }
void resetSession(Aws::S3::Model::GetObjectResult & read_result) void resetSession(Aws::S3::Model::GetObjectResult & read_result)
@ -260,6 +266,7 @@ bool ReadBufferFromS3::processException(Poco::Exception & e, size_t read_offset,
"Attempt: {}, Message: {}", "Attempt: {}, Message: {}",
bucket, key, version_id.empty() ? "Latest" : version_id, read_offset, attempt, e.message()); bucket, key, version_id.empty() ? "Latest" : version_id, read_offset, attempt, e.message());
if (auto * s3_exception = dynamic_cast<S3Exception *>(&e)) if (auto * s3_exception = dynamic_cast<S3Exception *>(&e))
{ {
/// It doesn't make sense to retry Access Denied or No Such Key /// It doesn't make sense to retry Access Denied or No Such Key

View File

@ -16,11 +16,6 @@
#include <aws/s3/model/GetObjectResult.h> #include <aws/s3/model/GetObjectResult.h>
namespace Aws::S3
{
class Client;
}
namespace DB namespace DB
{ {
/** /**

View File

@ -23,11 +23,21 @@
#include <IO/WriteBufferFromS3.h> #include <IO/WriteBufferFromS3.h>
#include <IO/S3Common.h> #include <IO/S3Common.h>
#include <IO/FileEncryptionCommon.h>
#include <IO/WriteBufferFromEncryptedFile.h>
#include <IO/ReadBufferFromEncryptedFile.h>
#include <IO/AsyncReadCounters.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/S3/Client.h>
#include <Disks/IO/ThreadPoolReader.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Common/filesystemHelpers.h> #include <Common/filesystemHelpers.h>
#include <IO/S3/Client.h>
#include <Core/Settings.h> #include <Core/Settings.h>
namespace DB namespace DB
{ {
@ -258,10 +268,22 @@ struct Client : DB::S3::Client
++counters.getObject; ++counters.getObject;
auto & bStore = store->GetBucketStore(request.GetBucket()); auto & bStore = store->GetBucketStore(request.GetBucket());
const String data = bStore.objects[request.GetKey()];
size_t begin = 0;
size_t end = data.size() - 1;
const String & range = request.GetRange();
const String prefix = "bytes=";
if (range.starts_with(prefix))
{
int ret = sscanf(range.c_str(), "bytes=%zu-%zu", &begin, &end); /// NOLINT
chassert(ret == 2);
}
auto factory = request.GetResponseStreamFactory(); auto factory = request.GetResponseStreamFactory();
Aws::Utils::Stream::ResponseStream responseStream(factory); Aws::Utils::Stream::ResponseStream responseStream(factory);
responseStream.GetUnderlyingStream() << std::stringstream(bStore.objects[request.GetKey()]).rdbuf(); responseStream.GetUnderlyingStream() << std::stringstream(data.substr(begin, end - begin + 1)).rdbuf();
Aws::AmazonWebServiceResult<Aws::Utils::Stream::ResponseStream> awsStream(std::move(responseStream), Aws::Http::HeaderValueCollection()); Aws::AmazonWebServiceResult<Aws::Utils::Stream::ResponseStream> awsStream(std::move(responseStream), Aws::Http::HeaderValueCollection());
Aws::S3::Model::GetObjectResult getObjectResult(std::move(awsStream)); Aws::S3::Model::GetObjectResult getObjectResult(std::move(awsStream));
@ -1148,4 +1170,108 @@ TEST_P(SyncAsync, StrictUploadPartSize) {
} }
} }
String fillStringWithPattern(String pattern, int n)
{
String data;
for (int i = 0; i < n; ++i)
{
data += pattern;
}
return data;
}
TEST_F(WBS3Test, ReadBeyondLastOffset) {
const String remote_file = "ReadBeyondLastOffset";
const String key = "1234567812345678";
const String data = fillStringWithPattern("0123456789", 10);
ReadSettings disk_read_settings;
disk_read_settings.enable_filesystem_cache = false;
disk_read_settings.local_fs_buffer_size = 70;
disk_read_settings.remote_fs_buffer_size = FileEncryption::Header::kSize + 60;
{
/// write encrypted file
FileEncryption::Header header;
header.algorithm = FileEncryption::Algorithm::AES_128_CTR;
header.key_fingerprint = FileEncryption::calculateKeyFingerprint(key);
header.init_vector = FileEncryption::InitVector::random();
auto wbs3 = getWriteBuffer(remote_file);
getAsyncPolicy().setAutoExecute(true);
WriteBufferFromEncryptedFile wb(10, std::move(wbs3), key, header);
wb.write(data.data(), data.size());
wb.finalize();
}
std::unique_ptr<ReadBufferFromEncryptedFile> encrypted_read_buffer;
{
/// create encrypted file reader
auto cache_log = std::shared_ptr<FilesystemCacheLog>();
const StoredObjects objects = { StoredObject(remote_file, data.size() + FileEncryption::Header::kSize) };
auto reader = std::make_unique<ThreadPoolReader>(1, 1);
auto async_read_counters = std::make_shared<AsyncReadCounters>();
auto prefetch_log = std::shared_ptr<FilesystemReadPrefetchesLog>();
auto rb_creator = [this, disk_read_settings] (const std::string & path, size_t read_until_position) -> std::unique_ptr<ReadBufferFromFileBase>
{
S3Settings::RequestSettings request_settings;
return std::make_unique<ReadBufferFromS3>(
client,
bucket,
path,
"Latest",
request_settings,
disk_read_settings,
/* use_external_buffer */true,
/* offset */0,
read_until_position,
/* restricted_seek */true);
};
auto rb_remote_fs = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(rb_creator),
objects,
disk_read_settings,
cache_log,
true);
auto rb_async = std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(rb_remote_fs), *reader, disk_read_settings, async_read_counters, prefetch_log);
/// read the header from the buffer
/// as a result AsynchronousBoundedReadBuffer consists some data from the file inside working buffer
FileEncryption::Header header;
header.read(*rb_async);
ASSERT_EQ(rb_async->available(), disk_read_settings.remote_fs_buffer_size - FileEncryption::Header::kSize);
ASSERT_EQ(rb_async->getPosition(), FileEncryption::Header::kSize);
ASSERT_EQ(rb_async->getFileOffsetOfBufferEnd(), disk_read_settings.remote_fs_buffer_size);
/// ReadBufferFromEncryptedFile is constructed over an ReadBuffer which was already in use.
/// The 'FileEncryption::Header' has been read from `rb_async`.
/// 'rb_async' will read the data from `rb_async` working buffer
encrypted_read_buffer = std::make_unique<ReadBufferFromEncryptedFile>(
disk_read_settings.local_fs_buffer_size, std::move(rb_async), key, header);
}
/// When header is read, file is read into working buffer till some position. Tn the test the file is read until remote_fs_buffer_size (124) position.
/// Set the right border before that position and make sure that encrypted_read_buffer does not have access to it
ASSERT_GT(disk_read_settings.remote_fs_buffer_size, 50);
encrypted_read_buffer->setReadUntilPosition(50);
/// encrypted_read_buffer reads the data with buffer size `local_fs_buffer_size`
/// If the impl file has read the data beyond the ReadUntilPosition, encrypted_read_buffer does not read it
/// getFileOffsetOfBufferEnd should read data till `ReadUntilPosition`
String res;
readStringUntilEOF(res, *encrypted_read_buffer);
ASSERT_EQ(res, data.substr(0, 50));
ASSERT_TRUE(encrypted_read_buffer->eof());
}
#endif #endif

View File

@ -77,6 +77,10 @@ static auto getQueryInterpreter(const ASTSubquery & subquery, ExecuteScalarSubqu
subquery_settings.max_result_rows = 1; subquery_settings.max_result_rows = 1;
subquery_settings.extremes = false; subquery_settings.extremes = false;
subquery_context->setSettings(subquery_settings); subquery_context->setSettings(subquery_settings);
/// When execute `INSERT INTO t WITH ... SELECT ...`, it may lead to `Unknown columns`
/// exception with this settings enabled(https://github.com/ClickHouse/ClickHouse/issues/52494).
subquery_context->getQueryContext()->setSetting("use_structure_from_insertion_table_in_table_functions", false);
if (!data.only_analyze && subquery_context->hasQueryContext()) if (!data.only_analyze && subquery_context->hasQueryContext())
{ {
/// Save current cached scalars in the context before analyzing the query /// Save current cached scalars in the context before analyzing the query

View File

@ -439,11 +439,7 @@ namespace
} }
if (!executor->pull(chunk)) if (!executor->pull(chunk))
{
if (check_exit_code)
command->wait();
return {}; return {};
}
current_read_rows += chunk.getNumRows(); current_read_rows += chunk.getNumRows();
} }
@ -466,6 +462,21 @@ namespace
if (thread.joinable()) if (thread.joinable())
thread.join(); thread.join();
if (check_exit_code)
{
if (process_pool)
{
bool valid_command
= configuration.read_fixed_number_of_rows && current_read_rows >= configuration.number_of_rows_to_read;
// We can only wait for pooled commands when they are invalid.
if (!valid_command)
command->wait();
}
else
command->wait();
}
rethrowExceptionDuringSendDataIfNeeded(); rethrowExceptionDuringSendDataIfNeeded();
} }

View File

@ -205,6 +205,8 @@ void DistinctSortedChunkTransform::transform(Chunk & chunk)
if (unlikely(0 == chunk_rows)) if (unlikely(0 == chunk_rows))
return; return;
convertToFullIfSparse(chunk);
Columns input_columns = chunk.detachColumns(); Columns input_columns = chunk.detachColumns();
/// split input columns into sorted and other("non-sorted") columns /// split input columns into sorted and other("non-sorted") columns
initChunkProcessing(input_columns); initChunkProcessing(input_columns);

View File

@ -1217,10 +1217,10 @@ MergeTreeData::LoadPartResult MergeTreeData::loadDataPart(
auto part_size_str = res.size_of_part ? formatReadableSizeWithBinarySuffix(*res.size_of_part) : "failed to calculate size"; auto part_size_str = res.size_of_part ? formatReadableSizeWithBinarySuffix(*res.size_of_part) : "failed to calculate size";
LOG_ERROR(log, LOG_ERROR(log,
"Detaching broken part {}{} (size: {}). " "Detaching broken part {} (size: {}). "
"If it happened after update, it is likely because of backward incompatibility. " "If it happened after update, it is likely because of backward incompatibility. "
"You need to resolve this manually", "You need to resolve this manually",
getFullPathOnDisk(part_disk_ptr), part_name, part_size_str); fs::path(getFullPathOnDisk(part_disk_ptr)) / part_name, part_size_str);
}; };
try try

View File

@ -527,7 +527,7 @@ StorageS3Source::StorageS3Source(
const String & bucket_, const String & bucket_,
const String & version_id_, const String & version_id_,
std::shared_ptr<IIterator> file_iterator_, std::shared_ptr<IIterator> file_iterator_,
const size_t download_thread_num_, const size_t max_parsing_threads_,
bool need_only_count_, bool need_only_count_,
std::optional<SelectQueryInfo> query_info_) std::optional<SelectQueryInfo> query_info_)
: ISource(info.source_header, false) : ISource(info.source_header, false)
@ -547,7 +547,7 @@ StorageS3Source::StorageS3Source(
, query_info(std::move(query_info_)) , query_info(std::move(query_info_))
, requested_virtual_columns(info.requested_virtual_columns) , requested_virtual_columns(info.requested_virtual_columns)
, file_iterator(file_iterator_) , file_iterator(file_iterator_)
, download_thread_num(download_thread_num_) , max_parsing_threads(max_parsing_threads_)
, need_only_count(need_only_count_) , need_only_count(need_only_count_)
, create_reader_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, 1) , create_reader_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, 1)
, create_reader_scheduler(threadPoolCallbackRunner<ReaderHolder>(create_reader_pool, "CreateS3Reader")) , create_reader_scheduler(threadPoolCallbackRunner<ReaderHolder>(create_reader_pool, "CreateS3Reader"))
@ -574,14 +574,21 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
auto compression_method = chooseCompressionMethod(key_with_info.key, compression_hint); auto compression_method = chooseCompressionMethod(key_with_info.key, compression_hint);
auto read_buf = createS3ReadBuffer(key_with_info.key, object_size); auto read_buf = createS3ReadBuffer(key_with_info.key, object_size);
std::optional<size_t> max_parsing_threads;
if (need_only_count) if (need_only_count)
max_parsing_threads = 1; max_parsing_threads = 1;
auto input_format = FormatFactory::instance().getInput( auto input_format = FormatFactory::instance().getInput(
format, *read_buf, sample_block, getContext(), max_block_size, format,
format_settings, max_parsing_threads, std::nullopt, *read_buf,
/* is_remote_fs */ true, compression_method); sample_block,
getContext(),
max_block_size,
format_settings,
max_parsing_threads,
/* max_download_threads= */ std::nullopt,
/* is_remote_fs */ true,
compression_method);
if (query_info.has_value()) if (query_info.has_value())
input_format->setQueryInfo(query_info.value(), getContext()); input_format->setQueryInfo(query_info.value(), getContext());
@ -1046,7 +1053,9 @@ Pipe StorageS3::read(
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty()) bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files; && local_context->getSettingsRef().optimize_count_from_files;
const size_t max_download_threads = local_context->getSettingsRef().max_download_threads; const size_t max_threads = local_context->getSettingsRef().max_threads;
const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams);
for (size_t i = 0; i < num_streams; ++i) for (size_t i = 0; i < num_streams; ++i)
{ {
pipes.emplace_back(std::make_shared<StorageS3Source>( pipes.emplace_back(std::make_shared<StorageS3Source>(
@ -1062,7 +1071,7 @@ Pipe StorageS3::read(
query_configuration.url.bucket, query_configuration.url.bucket,
query_configuration.url.version_id, query_configuration.url.version_id,
iterator_wrapper, iterator_wrapper,
max_download_threads, max_parsing_threads,
need_only_count, need_only_count,
query_info)); query_info));
} }

View File

@ -130,7 +130,7 @@ public:
const String & bucket, const String & bucket,
const String & version_id, const String & version_id,
std::shared_ptr<IIterator> file_iterator_, std::shared_ptr<IIterator> file_iterator_,
size_t download_thread_num, size_t max_parsing_threads,
bool need_only_count_, bool need_only_count_,
std::optional<SelectQueryInfo> query_info); std::optional<SelectQueryInfo> query_info);
@ -219,7 +219,7 @@ private:
NamesAndTypesList requested_virtual_columns; NamesAndTypesList requested_virtual_columns;
std::shared_ptr<IIterator> file_iterator; std::shared_ptr<IIterator> file_iterator;
size_t download_thread_num = 1; size_t max_parsing_threads = 1;
bool need_only_count; bool need_only_count;
Poco::Logger * log = &Poco::Logger::get("StorageS3Source"); Poco::Logger * log = &Poco::Logger::get("StorageS3Source");

View File

@ -222,7 +222,7 @@ StorageURLSource::StorageURLSource(
UInt64 max_block_size, UInt64 max_block_size,
const ConnectionTimeouts & timeouts, const ConnectionTimeouts & timeouts,
CompressionMethod compression_method, CompressionMethod compression_method,
size_t download_threads, size_t max_parsing_threads,
const SelectQueryInfo & query_info, const SelectQueryInfo & query_info,
const HTTPHeaderEntries & headers_, const HTTPHeaderEntries & headers_,
const URIParams & params, const URIParams & params,
@ -277,7 +277,6 @@ StorageURLSource::StorageURLSource(
file_progress_callback(FileProgress(0, file_size)); file_progress_callback(FileProgress(0, file_size));
} }
// TODO: Pass max_parsing_threads and max_download_threads adjusted for num_streams.
input_format = FormatFactory::instance().getInput( input_format = FormatFactory::instance().getInput(
format, format,
*read_buf, *read_buf,
@ -285,7 +284,7 @@ StorageURLSource::StorageURLSource(
context, context,
max_block_size, max_block_size,
format_settings, format_settings,
need_only_count ? 1 : download_threads, need_only_count ? 1 : max_parsing_threads,
/*max_download_threads*/ std::nullopt, /*max_download_threads*/ std::nullopt,
/* is_remote_fs */ true, /* is_remote_fs */ true,
compression_method); compression_method);
@ -711,8 +710,6 @@ Pipe IStorageURLBase::read(
{ {
auto params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size); auto params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size);
size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
std::shared_ptr<StorageURLSource::IteratorWrapper> iterator_wrapper{nullptr}; std::shared_ptr<StorageURLSource::IteratorWrapper> iterator_wrapper{nullptr};
bool is_url_with_globs = urlWithGlobs(uri); bool is_url_with_globs = urlWithGlobs(uri);
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements; size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
@ -761,7 +758,9 @@ Pipe IStorageURLBase::read(
Pipes pipes; Pipes pipes;
pipes.reserve(num_streams); pipes.reserve(num_streams);
size_t download_threads = num_streams >= max_download_threads ? 1 : (max_download_threads / num_streams); const size_t max_threads = local_context->getSettingsRef().max_threads;
const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams);
for (size_t i = 0; i < num_streams; ++i) for (size_t i = 0; i < num_streams; ++i)
{ {
pipes.emplace_back(std::make_shared<StorageURLSource>( pipes.emplace_back(std::make_shared<StorageURLSource>(
@ -782,7 +781,7 @@ Pipe IStorageURLBase::read(
max_block_size, max_block_size,
getHTTPTimeouts(local_context), getHTTPTimeouts(local_context),
compression_method, compression_method,
download_threads, max_parsing_threads,
query_info, query_info,
headers, headers,
params, params,
@ -801,7 +800,7 @@ Pipe StorageURLWithFailover::read(
ContextPtr local_context, ContextPtr local_context,
QueryProcessingStage::Enum processed_stage, QueryProcessingStage::Enum processed_stage,
size_t max_block_size, size_t max_block_size,
size_t /*num_streams*/) size_t num_streams)
{ {
auto params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size); auto params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size);
@ -815,6 +814,9 @@ Pipe StorageURLWithFailover::read(
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals()); auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
const size_t max_threads = local_context->getSettingsRef().max_threads;
const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams);
auto pipe = Pipe(std::make_shared<StorageURLSource>( auto pipe = Pipe(std::make_shared<StorageURLSource>(
read_from_format_info, read_from_format_info,
iterator_wrapper, iterator_wrapper,
@ -827,7 +829,7 @@ Pipe StorageURLWithFailover::read(
max_block_size, max_block_size,
getHTTPTimeouts(local_context), getHTTPTimeouts(local_context),
compression_method, compression_method,
local_context->getSettingsRef().max_download_threads, max_parsing_threads,
query_info, query_info,
headers, headers,
params)); params));

View File

@ -170,7 +170,7 @@ public:
UInt64 max_block_size, UInt64 max_block_size,
const ConnectionTimeouts & timeouts, const ConnectionTimeouts & timeouts,
CompressionMethod compression_method, CompressionMethod compression_method,
size_t download_threads, size_t max_parsing_threads,
const SelectQueryInfo & query_info, const SelectQueryInfo & query_info,
const HTTPHeaderEntries & headers_ = {}, const HTTPHeaderEntries & headers_ = {},
const URIParams & params = {}, const URIParams & params = {},

View File

@ -1467,7 +1467,7 @@ class TestSuite:
else: else:
raise Exception(f"Unknown file_extension: {filename}") raise Exception(f"Unknown file_extension: {filename}")
def parse_tags_from_line(line, comment_sign) -> set[str]: def parse_tags_from_line(line, comment_sign) -> Set[str]:
if not line.startswith(comment_sign): if not line.startswith(comment_sign):
return set() return set()
tags_str = line[len(comment_sign) :].lstrip() # noqa: ignore E203 tags_str = line[len(comment_sign) :].lstrip() # noqa: ignore E203

View File

@ -144,6 +144,7 @@ class Client:
user=None, user=None,
password=None, password=None,
database=None, database=None,
query_id=None,
): ):
return self.get_query_request( return self.get_query_request(
sql, sql,
@ -153,6 +154,7 @@ class Client:
user=user, user=user,
password=password, password=password,
database=database, database=database,
query_id=query_id,
).get_answer_and_error() ).get_answer_and_error()

View File

@ -3476,6 +3476,7 @@ class ClickHouseInstance:
user=None, user=None,
password=None, password=None,
database=None, database=None,
query_id=None,
): ):
logging.debug(f"Executing query {sql} on {self.name}") logging.debug(f"Executing query {sql} on {self.name}")
return self.client.query_and_get_answer_with_error( return self.client.query_and_get_answer_with_error(
@ -3486,6 +3487,7 @@ class ClickHouseInstance:
user=user, user=user,
password=password, password=password,
database=database, database=database,
query_id=query_id,
) )
# Connects to the instance via HTTP interface, sends a query and returns the answer # Connects to the instance via HTTP interface, sends a query and returns the answer

View File

@ -2,7 +2,9 @@ import pytest
import threading import threading
import time import time
import uuid
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
node_wait_queries = cluster.add_instance( node_wait_queries = cluster.add_instance(
@ -30,32 +32,55 @@ def start_cluster():
cluster.shutdown() cluster.shutdown()
def do_long_query(node): def do_long_query(node, query_id):
global result global result
result = node.query_and_get_answer_with_error( result = node.query_and_get_answer_with_error(
"SELECT sleepEachRow(1) FROM system.numbers LIMIT 10", "SELECT sleepEachRow(1) FROM system.numbers LIMIT 10",
settings={"send_logs_level": "trace"}, settings={"send_logs_level": "trace"},
query_id=query_id,
) )
def test_shutdown_wait_unfinished_queries(start_cluster): def test_shutdown_wait_unfinished_queries(start_cluster):
global result global result
long_query = threading.Thread(target=do_long_query, args=(node_wait_queries,)) query_id = uuid.uuid4().hex
long_query = threading.Thread(
target=do_long_query,
args=(
node_wait_queries,
query_id,
),
)
long_query.start() long_query.start()
time.sleep(1) assert_eq_with_retry(
node_wait_queries,
f"SELECT query_id FROM system.processes WHERE query_id = '{query_id}'",
query_id,
)
node_wait_queries.stop_clickhouse(kill=False) node_wait_queries.stop_clickhouse(kill=False)
long_query.join() long_query.join()
assert result[0].count("0") == 10 assert result[0].count("0") == 10
long_query = threading.Thread(target=do_long_query, args=(node_kill_queries,)) query_id = uuid.uuid4().hex
long_query = threading.Thread(
target=do_long_query,
args=(
node_kill_queries,
query_id,
),
)
long_query.start() long_query.start()
time.sleep(1) assert_eq_with_retry(
node_kill_queries,
f"SELECT query_id FROM system.processes WHERE query_id = '{query_id}'",
query_id,
)
node_kill_queries.stop_clickhouse(kill=False) node_kill_queries.stop_clickhouse(kill=False)
long_query.join() long_query.join()

View File

@ -1,55 +0,0 @@
<test>
<!-- "Strict" mode (key and iv length checks), empty plaintext.
Ciphers that produce non-empty ciphertext on empty plaintext, and hence can't be optimized.-->
<substitutions>
<substitution>
<name>func</name>
<values>
<!-- materialize(plaitext) is to avoid all-args-are-const optimization, resulting in executing function exactly once. -->
<value>encrypt('aes-128-cbc', materialize(plaintext), key16, iv16)</value>
<value>encrypt('aes-128-ecb', materialize(plaintext), key16)</value>
<value>encrypt('aes-128-gcm', materialize(plaintext), key16, iv12, 'aadaadaadaad')</value>
<value>encrypt('aes-192-cbc', materialize(plaintext), key24, iv16)</value>
<value>encrypt('aes-192-ecb', materialize(plaintext), key24)</value>
<value>encrypt('aes-192-gcm', materialize(plaintext), key24, iv12, 'aadaadaadaad')</value>
<value>encrypt('aes-256-cbc', materialize(plaintext), key32, iv16)</value>
<value>encrypt('aes-256-ecb', materialize(plaintext), key32)</value>
<value>encrypt('aes-256-gcm', materialize(plaintext), key32, iv12, 'aadaadaadaad')</value>
<!-- decrypt + encrypt since it is really hard to compose decrypt-only case -->
<value>decrypt('aes-128-cbc', encrypt('aes-128-cbc', materialize(plaintext), key16, iv16), key16, iv16)</value>
<value>decrypt('aes-128-ecb', encrypt('aes-128-ecb', materialize(plaintext), key16), key16)</value>
<value>decrypt('aes-128-gcm', encrypt('aes-128-gcm', materialize(plaintext), key16, iv12, 'aadaadaadaad'), key16, iv12, 'aadaadaadaad')</value>
<value>decrypt('aes-192-cbc', encrypt('aes-192-cbc', materialize(plaintext), key24, iv16), key24, iv16)</value>
<value>decrypt('aes-192-ecb', encrypt('aes-192-ecb', materialize(plaintext), key24), key24)</value>
<value>decrypt('aes-192-gcm', encrypt('aes-192-gcm', materialize(plaintext), key24, iv12, 'aadaadaadaad'), key24, iv12, 'aadaadaadaad')</value>
<value>decrypt('aes-256-cbc', encrypt('aes-256-cbc', materialize(plaintext), key32, iv16), key32, iv16)</value>
<value>decrypt('aes-256-ecb', encrypt('aes-256-ecb', materialize(plaintext), key32), key32)</value>
<value>decrypt('aes-256-gcm', encrypt('aes-256-gcm', materialize(plaintext), key32, iv12, 'aadaadaadaad'), key32, iv12, 'aadaadaadaad')</value>
</values>
</substitution>
<substitution>
<name>table</name>
<values>
<value>numbers(2000000)</value>
</values>
</substitution>
<substitution>
<name>plaintext</name>
<values>
<value>''</value>
</values>
</substitution>
</substitutions>
<!-- allow OpenSSL-related code load ciphers and warm-up -->
<fill_query>WITH {plaintext} as plaintext, repeat('k', 32) as key32, substring(key32, 1, 24) as key24, substring(key32, 1, 16) as key16, repeat('iv', 8) as iv16, substring(iv16, 1, 12) as iv12 SELECT count() FROM {table} WHERE NOT ignore({func}) LIMIT 1</fill_query>
<query>WITH {plaintext} as plaintext, repeat('k', 32) as key32, substring(key32, 1, 24) as key24, substring(key32, 1, 16) as key16, repeat('iv', 8) as iv16, substring(iv16, 1, 12) as iv12 SELECT count() FROM {table} WHERE NOT ignore({func})</query>
</test>

View File

@ -6,10 +6,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
echo -ne '1,Hello\n2,World\n' | ${CLICKHOUSE_CURL} -sSF 'file=@-' "${CLICKHOUSE_URL}&query=SELECT+*+FROM+file&file_format=CSV&file_types=UInt8,String"; echo -ne '1,Hello\n2,World\n' | ${CLICKHOUSE_CURL} -sSF 'file=@-' "${CLICKHOUSE_URL}&query=SELECT+*+FROM+file&file_format=CSV&file_types=UInt8,String";
echo -ne '1@Hello\n2@World\n' | ${CLICKHOUSE_CURL} -sSF 'file=@-' "${CLICKHOUSE_URL}&query=SELECT+*+FROM+file&file_format=CSV&file_types=UInt8,String&format_csv_delimiter=@"; echo -ne '1@Hello\n2@World\n' | ${CLICKHOUSE_CURL} -sSF 'file=@-' "${CLICKHOUSE_URL}&query=SELECT+*+FROM+file&file_format=CSV&file_types=UInt8,String&format_csv_delimiter=@";
echo -ne '\x01\x00\x00\x00\x02\x00\x00\x00' | ${CLICKHOUSE_CURL} -sSF "tmp=@-" "${CLICKHOUSE_URL}&query=SELECT+*+FROM+tmp&tmp_structure=TaskID+UInt32&tmp_format=RowBinary";
# use big-endian version of binary data for s390x
if [[ $(uname -a | grep s390x) ]]; then
echo -ne '\x00\x00\x00\x01\x00\x00\x00\x02' | ${CLICKHOUSE_CURL} -sSF "tmp=@-" "${CLICKHOUSE_URL}&query=SELECT+*+FROM+tmp&tmp_structure=TaskID+UInt32&tmp_format=RowBinary";
else
echo -ne '\x01\x00\x00\x00\x02\x00\x00\x00' | ${CLICKHOUSE_CURL} -sSF "tmp=@-" "${CLICKHOUSE_URL}&query=SELECT+*+FROM+tmp&tmp_structure=TaskID+UInt32&tmp_format=RowBinary";
fi

View File

@ -69,3 +69,13 @@ Additional test
1 1
1 1
1 1
-1
-7
-23
-104
-730
-17520
-1051200
-63072000
-63072000000
-63072000000000

View File

@ -74,3 +74,14 @@ SELECT dateDiff('second', toDateTime('2014-10-26 00:00:00', 'UTC'), toDateTime('
SELECT 'Additional test'; SELECT 'Additional test';
SELECT number = dateDiff('month', now() - INTERVAL number MONTH, now()) FROM system.numbers LIMIT 10; SELECT number = dateDiff('month', now() - INTERVAL number MONTH, now()) FROM system.numbers LIMIT 10;
SELECT dateDiff('years', toDate('2017-12-31'), toDate('2016-01-01'));
SELECT dateDiff('quarters', toDate('2017-12-31'), toDate('2016-01-01'));
SELECT dateDiff('months', toDateTime('2017-12-31'), toDateTime('2016-01-01'));
SELECT dateDiff('weeks', toDateTime('2017-12-31'), toDateTime('2016-01-01'));
SELECT dateDiff('days', toDateTime('2017-12-31'), toDateTime('2016-01-01'));
SELECT dateDiff('hours', toDateTime('2017-12-31'), toDateTime('2016-01-01'), 'UTC');
SELECT dateDiff('minutes', toDateTime('2017-12-31'), toDateTime('2016-01-01'), 'UTC');
SELECT dateDiff('seconds', toDateTime('2017-12-31'), toDateTime('2016-01-01'), 'UTC');
SELECT dateDiff('milliseconds', toDateTime('2017-12-31'), toDateTime('2016-01-01'), 'UTC');
SELECT dateDiff('microseconds', toDateTime('2017-12-31'), toDateTime('2016-01-01'), 'UTC');

View File

@ -77,6 +77,10 @@ SELECT JSON_QUERY('{"array":[[0, 1, 2, 3, 4, 5], [0, -1, -2, -3, -4, -5]]}', '$.
[0, 1, 4, 0, -1, -4] [0, 1, 4, 0, -1, -4]
SELECT JSON_QUERY('{"1key":1}', '$.1key'); SELECT JSON_QUERY('{"1key":1}', '$.1key');
[1] [1]
SELECT JSON_QUERY('{"123":1}', '$.123');
[1]
SELECT JSON_QUERY('{"123":1}', '$[123]');
SELECT JSON_QUERY('{"hello":1}', '$[hello]'); SELECT JSON_QUERY('{"hello":1}', '$[hello]');
[1] [1]
SELECT JSON_QUERY('{"hello":1}', '$["hello"]'); SELECT JSON_QUERY('{"hello":1}', '$["hello"]');

View File

@ -43,6 +43,8 @@ SELECT JSON_QUERY( '{hello:{"world":"!"}}}', '$.hello'); -- invalid json => defa
SELECT JSON_QUERY('', '$.hello'); SELECT JSON_QUERY('', '$.hello');
SELECT JSON_QUERY('{"array":[[0, 1, 2, 3, 4, 5], [0, -1, -2, -3, -4, -5]]}', '$.array[*][0 to 2, 4]'); SELECT JSON_QUERY('{"array":[[0, 1, 2, 3, 4, 5], [0, -1, -2, -3, -4, -5]]}', '$.array[*][0 to 2, 4]');
SELECT JSON_QUERY('{"1key":1}', '$.1key'); SELECT JSON_QUERY('{"1key":1}', '$.1key');
SELECT JSON_QUERY('{"123":1}', '$.123');
SELECT JSON_QUERY('{"123":1}', '$[123]');
SELECT JSON_QUERY('{"hello":1}', '$[hello]'); SELECT JSON_QUERY('{"hello":1}', '$[hello]');
SELECT JSON_QUERY('{"hello":1}', '$["hello"]'); SELECT JSON_QUERY('{"hello":1}', '$["hello"]');
SELECT JSON_QUERY('{"hello":1}', '$[\'hello\']'); SELECT JSON_QUERY('{"hello":1}', '$[\'hello\']');

View File

@ -16,13 +16,13 @@ set max_bytes_before_external_group_by = '2G',
max_block_size = 65505; max_block_size = 65505;
-- whole aggregation state of local aggregation uncompressed is 5.8G -- whole aggregation state of local aggregation uncompressed is 5.8G
-- it is hard to provide an accurate estimation for memory usage, so 4G is just the actual value taken from the logs + delta -- it is hard to provide an accurate estimation for memory usage, so 5G is just the actual value taken from the logs + delta
-- also avoid using localhost, so the queries will go over separate connections -- also avoid using localhost, so the queries will go over separate connections
-- (otherwise the memory usage for merge will be counted together with the localhost query) -- (otherwise the memory usage for merge will be counted together with the localhost query)
select a, b, c, sum(a) as s select a, b, c, sum(a) as s
from remote('127.0.0.{2,3}', currentDatabase(), t_2354_dist_with_external_aggr) from remote('127.0.0.{2,3}', currentDatabase(), t_2354_dist_with_external_aggr)
group by a, b, c group by a, b, c
format Null format Null
settings max_memory_usage = '4Gi'; settings max_memory_usage = '5Gi';
DROP TABLE t_2354_dist_with_external_aggr; DROP TABLE t_2354_dist_with_external_aggr;

View File

@ -0,0 +1,3 @@
user127 1
user405 1
user902 1

View File

@ -0,0 +1,16 @@
#!/usr/bin/env bash
# Tags: no-parallel, no-fasttest
set -e
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
DATA_DIR=$CUR_DIR/data_tsv
$CLICKHOUSE_LOCAL --multiquery \
"CREATE VIEW users AS SELECT * FROM file('$DATA_DIR/mock_data.tsv', TSVWithNamesAndTypes);
CREATE TABLE users_output (name String, tag UInt64)ENGINE = Memory;
INSERT INTO users_output WITH (SELECT groupUniqArrayArray(mapKeys(Tags)) FROM users) AS unique_tags SELECT UserName AS name, length(unique_tags) AS tag FROM users;
SELECT * FROM users_output;"

View File

@ -0,0 +1,12 @@
-- { echoOn }
SELECT name, column, serialization_kind
FROM system.parts_columns
WHERE table = 't_sparse_distinct' AND database = currentDatabase() AND column = 'v'
ORDER BY name;
all_1_1_0 v Default
all_2_2_0 v Sparse
set optimize_distinct_in_order=1;
set max_threads=1;
select trimLeft(explain) from (explain pipeline SELECT DISTINCT id, v FROM t_sparse_distinct) where explain ilike '%DistinctSortedChunkTransform%';
DistinctSortedChunkTransform
SELECT DISTINCT id, v FROM t_sparse_distinct format Null;

View File

@ -0,0 +1,25 @@
DROP TABLE IF EXISTS t_sparse_distinct;
CREATE TABLE t_sparse_distinct (id UInt32, v String)
ENGINE = MergeTree
ORDER BY id
SETTINGS ratio_of_defaults_for_sparse_serialization = 0.9;
SYSTEM STOP MERGES t_sparse_distinct;
INSERT INTO t_sparse_distinct SELECT number % 10, toString(number % 100 = 0) FROM numbers(100);
INSERT INTO t_sparse_distinct(id) SELECT number % 10 FROM numbers(100);
-- { echoOn }
SELECT name, column, serialization_kind
FROM system.parts_columns
WHERE table = 't_sparse_distinct' AND database = currentDatabase() AND column = 'v'
ORDER BY name;
set optimize_distinct_in_order=1;
set max_threads=1;
select trimLeft(explain) from (explain pipeline SELECT DISTINCT id, v FROM t_sparse_distinct) where explain ilike '%DistinctSortedChunkTransform%';
SELECT DISTINCT id, v FROM t_sparse_distinct format Null;
DROP TABLE t_sparse_distinct;

View File

@ -0,0 +1,5 @@
UserName Age Tags
String Int8 Map(String, UInt64)
user127 20 {'test': 123}
user405 43 {'test': 123}
user902 43 {'test': 123}
1 UserName Age Tags
2 String Int8 Map(String, UInt64)
3 user127 20 {'test': 123}
4 user405 43 {'test': 123}
5 user902 43 {'test': 123}

View File

@ -14,6 +14,7 @@ v23.4.4.16-stable 2023-06-17
v23.4.3.48-stable 2023-06-12 v23.4.3.48-stable 2023-06-12
v23.4.2.11-stable 2023-05-02 v23.4.2.11-stable 2023-05-02
v23.4.1.1943-stable 2023-04-27 v23.4.1.1943-stable 2023-04-27
v23.3.10.5-lts 2023-08-23
v23.3.9.55-lts 2023-08-21 v23.3.9.55-lts 2023-08-21
v23.3.8.21-lts 2023-07-13 v23.3.8.21-lts 2023-07-13
v23.3.7.5-lts 2023-06-29 v23.3.7.5-lts 2023-06-29

1 v23.7.4.5-stable 2023-08-08
14 v23.4.3.48-stable 2023-06-12
15 v23.4.2.11-stable 2023-05-02
16 v23.4.1.1943-stable 2023-04-27
17 v23.3.10.5-lts 2023-08-23
18 v23.3.9.55-lts 2023-08-21
19 v23.3.8.21-lts 2023-07-13
20 v23.3.7.5-lts 2023-06-29