Merge remote-tracking branch 'origin/master' into query-plan-update-sort-description

This commit is contained in:
Igor Nikonov 2023-03-16 14:53:52 +00:00
commit f5c8250f58
251 changed files with 2496 additions and 1354 deletions

View File

@ -23,9 +23,12 @@ Checks: '*,
-bugprone-implicit-widening-of-multiplication-result,
-bugprone-narrowing-conversions,
-bugprone-not-null-terminated-result,
-bugprone-reserved-identifier,
-bugprone-unchecked-optional-access,
-cert-dcl16-c,
-cert-dcl37-c,
-cert-dcl51-cpp,
-cert-err58-cpp,
-cert-msc32-c,
-cert-msc51-cpp,
@ -129,6 +132,7 @@ Checks: '*,
-readability-function-cognitive-complexity,
-readability-function-size,
-readability-identifier-length,
-readability-identifier-naming,
-readability-implicit-bool-conversion,
-readability-isolate-declaration,
-readability-magic-numbers,

View File

@ -301,7 +301,7 @@ if (ENABLE_BUILD_PROFILING)
endif ()
endif ()
set (CMAKE_CXX_STANDARD 20)
set (CMAKE_CXX_STANDARD 23)
set (CMAKE_CXX_EXTENSIONS ON) # Same as gnu++2a (ON) vs c++2a (OFF): https://cmake.org/cmake/help/latest/prop_tgt/CXX_EXTENSIONS.html
set (CMAKE_CXX_STANDARD_REQUIRED ON)

View File

@ -2,6 +2,10 @@ if (USE_CLANG_TIDY)
set (CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_PATH}")
endif ()
# TODO: Remove this. We like to compile with C++23 (set by top-level CMakeLists) but Clang crashes with our libcxx
# when instantiated from JSON.cpp. Try again when libcxx(abi) and Clang are upgraded to 16.
set (CMAKE_CXX_STANDARD 20)
set (SRCS
argsToConfig.cpp
coverage.cpp

View File

@ -159,22 +159,22 @@ inline const char * find_first_symbols_sse42(const char * const begin, const cha
#endif
for (; pos < end; ++pos)
if ( (num_chars >= 1 && maybe_negate<positive>(*pos == c01))
|| (num_chars >= 2 && maybe_negate<positive>(*pos == c02))
|| (num_chars >= 3 && maybe_negate<positive>(*pos == c03))
|| (num_chars >= 4 && maybe_negate<positive>(*pos == c04))
|| (num_chars >= 5 && maybe_negate<positive>(*pos == c05))
|| (num_chars >= 6 && maybe_negate<positive>(*pos == c06))
|| (num_chars >= 7 && maybe_negate<positive>(*pos == c07))
|| (num_chars >= 8 && maybe_negate<positive>(*pos == c08))
|| (num_chars >= 9 && maybe_negate<positive>(*pos == c09))
|| (num_chars >= 10 && maybe_negate<positive>(*pos == c10))
|| (num_chars >= 11 && maybe_negate<positive>(*pos == c11))
|| (num_chars >= 12 && maybe_negate<positive>(*pos == c12))
|| (num_chars >= 13 && maybe_negate<positive>(*pos == c13))
|| (num_chars >= 14 && maybe_negate<positive>(*pos == c14))
|| (num_chars >= 15 && maybe_negate<positive>(*pos == c15))
|| (num_chars >= 16 && maybe_negate<positive>(*pos == c16)))
if ( (num_chars == 1 && maybe_negate<positive>(is_in<c01>(*pos)))
|| (num_chars == 2 && maybe_negate<positive>(is_in<c01, c02>(*pos)))
|| (num_chars == 3 && maybe_negate<positive>(is_in<c01, c02, c03>(*pos)))
|| (num_chars == 4 && maybe_negate<positive>(is_in<c01, c02, c03, c04>(*pos)))
|| (num_chars == 5 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05>(*pos)))
|| (num_chars == 6 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06>(*pos)))
|| (num_chars == 7 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07>(*pos)))
|| (num_chars == 8 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08>(*pos)))
|| (num_chars == 9 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09>(*pos)))
|| (num_chars == 10 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10>(*pos)))
|| (num_chars == 11 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11>(*pos)))
|| (num_chars == 12 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12>(*pos)))
|| (num_chars == 13 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12, c13>(*pos)))
|| (num_chars == 14 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12, c13, c14>(*pos)))
|| (num_chars == 15 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12, c13, c14, c15>(*pos)))
|| (num_chars == 16 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12, c13, c14, c15, c16>(*pos))))
return pos;
return return_mode == ReturnMode::End ? end : nullptr;
}

View File

@ -466,7 +466,7 @@ namespace Data
bool extractManualImpl(std::size_t pos, T & val, SQLSMALLINT cType)
{
SQLRETURN rc = 0;
T value = (T)0;
T value;
resizeLengths(pos);

View File

@ -105,6 +105,8 @@ public:
const std::string & getText() const;
/// Returns the text of the message.
void appendText(const std::string & text);
void setPriority(Priority prio);
/// Sets the priority of the message.

View File

@ -27,8 +27,7 @@ Message::Message():
_tid(0),
_file(0),
_line(0),
_pMap(0),
_fmt_str(0)
_pMap(0)
{
init();
}
@ -157,6 +156,12 @@ void Message::setText(const std::string& text)
}
void Message::appendText(const std::string & text)
{
    // Extend the stored message text in place; the existing content is kept as the prefix.
    _text += text;
}
void Message::setPriority(Priority prio)
{
_prio = prio;

View File

@ -48,6 +48,9 @@ set(gRPC_ABSL_PROVIDER "clickhouse" CACHE STRING "" FORCE)
# We don't want to build C# extensions.
set(gRPC_BUILD_CSHARP_EXT OFF)
# TODO: Remove this. We generally like to compile with C++23 but grpc isn't ready yet.
set (CMAKE_CXX_STANDARD 20)
set(_gRPC_CARES_LIBRARIES ch_contrib::c-ares)
set(gRPC_CARES_PROVIDER "clickhouse" CACHE STRING "" FORCE)
add_subdirectory("${_gRPC_SOURCE_DIR}" "${_gRPC_BINARY_DIR}")

2
contrib/krb5 vendored

@ -1 +1 @@
Subproject commit f8262a1b548eb29d97e059260042036255d07f8d
Subproject commit 9453aec0d50e5aff9b189051611b321b40935d02

View File

@ -160,6 +160,8 @@ set(ALL_SRCS
# "${KRB5_SOURCE_DIR}/lib/gssapi/spnego/negoex_trace.c"
"${KRB5_SOURCE_DIR}/lib/crypto/builtin/kdf.c"
"${KRB5_SOURCE_DIR}/lib/crypto/builtin/cmac.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/prng.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/enc_dk_cmac.c"
# "${KRB5_SOURCE_DIR}/lib/crypto/krb/crc32.c"
@ -183,7 +185,6 @@ set(ALL_SRCS
"${KRB5_SOURCE_DIR}/lib/crypto/krb/block_size.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/string_to_key.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/verify_checksum.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/crypto_libinit.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/derive.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/random_to_key.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/verify_checksum_iov.c"
@ -217,9 +218,7 @@ set(ALL_SRCS
"${KRB5_SOURCE_DIR}/lib/crypto/krb/s2k_rc4.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/valid_cksumtype.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/nfold.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/prng_fortuna.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/encrypt_length.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/cmac.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/keyblocks.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/prf_rc4.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/s2k_pbkdf2.c"
@ -228,11 +227,11 @@ set(ALL_SRCS
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/enc_provider/rc4.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/enc_provider/des3.c"
#"${KRB5_SOURCE_DIR}/lib/crypto/openssl/enc_provider/camellia.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/cmac.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/sha256.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/hmac.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/kdf.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/pbkdf2.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/init.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/stubs.c"
# "${KRB5_SOURCE_DIR}/lib/crypto/openssl/hash_provider/hash_crc32.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/hash_provider/hash_evp.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/des/des_keys.c"
@ -312,7 +311,6 @@ set(ALL_SRCS
"${KRB5_SOURCE_DIR}/lib/krb5/krb/allow_weak.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/mk_rep.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/mk_priv.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/s4u_authdata.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/preauth_otp.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/init_keyblock.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/ser_addr.c"
@ -688,6 +686,7 @@ target_include_directories(_krb5 PRIVATE
target_compile_definitions(_krb5 PRIVATE
KRB5_PRIVATE
CRYPTO_OPENSSL
_GSS_STATIC_LINK=1
KRB5_DEPRECATED=1
LOCALEDIR="/usr/local/share/locale"

View File

@ -44,6 +44,8 @@ if [ "$is_tsan_build" -eq "0" ]; then
fi
export ZOOKEEPER_FAULT_INJECTION=1
# Initial run without S3 to create system.*_log on local file system to make it
# available for dump via clickhouse-local
configure
azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --debug /azurite_log &

View File

@ -49,17 +49,19 @@ echo -e "Successfully cloned previous release tests$OK" >> /test_output/test_res
echo -e "Successfully downloaded previous release packages$OK" >> /test_output/test_results.tsv
# Make upgrade check more funny by forcing Ordinary engine for system database
mkdir /var/lib/clickhouse/metadata
mkdir -p /var/lib/clickhouse/metadata
echo "ATTACH DATABASE system ENGINE=Ordinary" > /var/lib/clickhouse/metadata/system.sql
# Install previous release packages
install_packages previous_release_package_folder
# Start server from previous release
# Let's enable S3 storage by default
export USE_S3_STORAGE_FOR_MERGE_TREE=1
# Previous version may not be ready for fault injections
export ZOOKEEPER_FAULT_INJECTION=0
# Initial run without S3 to create system.*_log on local file system to make it
# available for dump via clickhouse-local
configure
start
stop
mv /var/log/clickhouse-server/clickhouse-server.log /var/log/clickhouse-server/clickhouse-server.initial.log
# force_sync=false doesn't work correctly on some older versions
sudo cat /etc/clickhouse-server/config.d/keeper_port.xml \
@ -67,8 +69,6 @@ sudo cat /etc/clickhouse-server/config.d/keeper_port.xml \
> /etc/clickhouse-server/config.d/keeper_port.xml.tmp
sudo mv /etc/clickhouse-server/config.d/keeper_port.xml.tmp /etc/clickhouse-server/config.d/keeper_port.xml
configure
# But we still need default disk because some tables loaded only into it
sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml \
| sed "s|<main><disk>s3</disk></main>|<main><disk>s3</disk></main><default><disk>default</disk></default>|" \
@ -76,6 +76,13 @@ sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml \
sudo chown clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
# Start server from previous release
# Let's enable S3 storage by default
export USE_S3_STORAGE_FOR_MERGE_TREE=1
# Previous version may not be ready for fault injections
export ZOOKEEPER_FAULT_INJECTION=0
configure
start
clickhouse-client --query="SELECT 'Server version: ', version()"
@ -185,8 +192,6 @@ tar -chf /test_output/coordination.tar /var/lib/clickhouse/coordination ||:
collect_query_and_trace_logs
check_oom_in_dmesg
mv /var/log/clickhouse-server/stderr.log /test_output/
# Write check result into check_status.tsv

View File

@ -309,6 +309,7 @@ The HTTP interface allows passing external data (external temporary tables) for
## Response Buffering {#response-buffering}
You can enable response buffering on the server-side. The `buffer_size` and `wait_end_of_query` URL parameters are provided for this purpose.
The `http_response_buffer_size` and `http_wait_end_of_query` settings can also be used.
`buffer_size` determines the number of bytes in the result to buffer in the server memory. If a result body is larger than this threshold, the buffer is written to the HTTP channel, and the remaining data is sent directly to the HTTP channel.

View File

@ -765,7 +765,7 @@ Default value: `0`.
## concurrent_threads_soft_limit_ratio_to_cores {#concurrent_threads_soft_limit_ratio_to_cores}
The maximum number of query processing threads as a multiple of the number of logical cores.
More details: [concurrent_threads_soft_limit_num](#concurrent-threads-soft-limit-num).
More details: [concurrent_threads_soft_limit_num](#concurrent_threads_soft_limit_num).
Possible values:

View File

@ -966,10 +966,10 @@ This is an expert-level setting, and you shouldn't change it if you're just gett
## max_query_size {#settings-max_query_size}
The maximum part of a query that can be taken to RAM for parsing with the SQL parser.
The INSERT query also contains data for INSERT that is processed by a separate stream parser (that consumes O(1) RAM), which is not included in this restriction.
The maximum number of bytes of a query string parsed by the SQL parser.
Data in the VALUES clause of INSERT queries is processed by a separate stream parser (that consumes O(1) RAM) and is not affected by this restriction.
Default value: 256 KiB.
Default value: 262144 (= 256 KiB).
## max_parser_depth {#max_parser_depth}

View File

@ -80,7 +80,7 @@ Required parameters:
- `type` — `encrypted`. Otherwise the encrypted disk is not created.
- `disk` — Type of disk for data storage.
- `key` — The key for encryption and decryption. Type: [Uint64](/docs/en/sql-reference/data-types/int-uint.md). You can use `key_hex` parameter to encrypt in hexadecimal form.
- `key` — The key for encryption and decryption. Type: [UInt64](/docs/en/sql-reference/data-types/int-uint.md). You can use `key_hex` parameter to encode the key in hexadecimal form.
You can specify multiple keys using the `id` attribute (see example above).
Optional parameters:

View File

@ -6,25 +6,26 @@ sidebar_label: clickhouse-local
# clickhouse-local
The `clickhouse-local` program enables you to perform fast processing on local files, without having to deploy and configure the ClickHouse server.
The `clickhouse-local` program enables you to perform fast processing on local files, without having to deploy and configure the ClickHouse server. It accepts data that represent tables and queries them using [ClickHouse SQL dialect](../../sql-reference/). `clickhouse-local` uses the same core as ClickHouse server, so it supports most of the features and the same set of formats and table engines.
Accepts data that represent tables and queries them using [ClickHouse SQL dialect](../../sql-reference/).
`clickhouse-local` uses the same core as ClickHouse server, so it supports most of the features and the same set of formats and table engines.
By default `clickhouse-local` does not have access to data on the same host, but it supports loading server configuration using `--config-file` argument.
For temporary data, a unique temporary data directory is created by default.
By default `clickhouse-local` has access to data on the same host, and it does not depend on the server's configuration. It also supports loading server configuration using `--config-file` argument. For temporary data, a unique temporary data directory is created by default.
## Usage {#usage}
Basic usage:
Basic usage (Linux):
``` bash
$ clickhouse-local --structure "table_structure" --input-format "format_of_incoming_data" \
--query "query"
$ clickhouse-local --structure "table_structure" --input-format "format_of_incoming_data" --query "query"
```
Basic usage (Mac):
``` bash
$ ./clickhouse local --structure "table_structure" --input-format "format_of_incoming_data" --query "query"
```
Also supported on Windows through WSL2.
Arguments:
- `-S`, `--structure` — table structure for input data.

View File

@ -393,15 +393,15 @@ These codecs are designed to make compression more effective by using specific f
#### DoubleDelta
`DoubleDelta` — Calculates delta of deltas and writes it in compact binary form. Optimal compression rates are achieved for monotonic sequences with a constant stride, such as time series data. Can be used with any fixed-width type. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Uses 1 extra bit for 32-byte deltas: 5-bit prefixes instead of 4-bit prefixes. For additional information, see Compressing Time Stamps in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
`DoubleDelta(bytes_size)` — Calculates delta of deltas and writes it in compact binary form. Possible `bytes_size` values: 1, 2, 4, 8, the default value is `sizeof(type)` if equal to 1, 2, 4, or 8. In all other cases, it's 1. Optimal compression rates are achieved for monotonic sequences with a constant stride, such as time series data. Can be used with any fixed-width type. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Uses 1 extra bit for 32-bit deltas: 5-bit prefixes instead of 4-bit prefixes. For additional information, see Compressing Time Stamps in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
#### Gorilla
`Gorilla` — Calculates XOR between current and previous floating point value and writes it in compact binary form. The smaller the difference between consecutive values is, i.e. the slower the values of the series changes, the better the compression rate. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. For additional information, see section 4.1 in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](https://doi.org/10.14778/2824032.2824078).
`Gorilla(bytes_size)` — Calculates XOR between current and previous floating point value and writes it in compact binary form. The smaller the difference between consecutive values is, i.e. the slower the values of the series change, the better the compression rate. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Possible `bytes_size` values: 1, 2, 4, 8, the default value is `sizeof(type)` if equal to 1, 2, 4, or 8. In all other cases, it's 1. For additional information, see section 4.1 in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](https://doi.org/10.14778/2824032.2824078).
#### FPC
`FPC` - Repeatedly predicts the next floating point value in the sequence using the better of two predictors, then XORs the actual with the predicted value, and leading-zero compresses the result. Similar to Gorilla, this is efficient when storing a series of floating point values that change slowly. For 64-bit values (double), FPC is faster than Gorilla, for 32-bit values your mileage may vary. For a detailed description of the algorithm see [High Throughput Compression of Double-Precision Floating-Point Data](https://userweb.cs.txstate.edu/~burtscher/papers/dcc07a.pdf).
`FPC(level, float_size)` - Repeatedly predicts the next floating point value in the sequence using the better of two predictors, then XORs the actual with the predicted value, and leading-zero compresses the result. Similar to Gorilla, this is efficient when storing a series of floating point values that change slowly. For 64-bit values (double), FPC is faster than Gorilla, for 32-bit values your mileage may vary. Possible `level` values: 1-28, the default value is 12. Possible `float_size` values: 4, 8, the default value is `sizeof(type)` if type is Float. In all other cases, it's 4. For a detailed description of the algorithm see [High Throughput Compression of Double-Precision Floating-Point Data](https://userweb.cs.txstate.edu/~burtscher/papers/dcc07a.pdf).
#### T64

View File

@ -91,6 +91,13 @@ INSERT INTO t FORMAT TabSeparated
You can insert data separately from the query by using the command-line client or the HTTP interface. For more information, see the section “[Interfaces](../../interfaces)”.
:::note
If you want to specify `SETTINGS` for an `INSERT` query, then you have to do it _before_ the `FORMAT` clause, since everything after `FORMAT format_name` is treated as data. For example:
```sql
INSERT INTO table SETTINGS ... FORMAT format_name data_set
```
:::
## Constraints
If table has [constraints](../../sql-reference/statements/create/table.md#constraints), their expressions will be checked for each row of inserted data. If any of those constraints is not satisfied — server will raise an exception containing constraint name and expression, the query will be stopped.

View File

@ -66,6 +66,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
using namespace DB;
namespace po = boost::program_options;
bool print_stacktrace = false;
try
{
po::options_description desc = createOptionsDescription("Allowed options", getTerminalWidth());
@ -84,6 +85,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
("level", po::value<int>(), "compression level for codecs specified via flags")
("none", "use no compression instead of LZ4")
("stat", "print block statistics of compressed data")
("stacktrace", "print stacktrace of exception")
;
po::positional_options_description positional_desc;
@ -107,6 +109,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
bool use_deflate_qpl = options.count("deflate_qpl");
bool stat_mode = options.count("stat");
bool use_none = options.count("none");
print_stacktrace = options.count("stacktrace");
unsigned block_size = options["block-size"].as<unsigned>();
std::vector<std::string> codecs;
if (options.count("codec"))
@ -188,11 +191,12 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
/// Compression
CompressedWriteBuffer to(*wb, codec, block_size);
copyData(*rb, to);
to.finalize();
}
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(true) << '\n';
std::cerr << getCurrentExceptionMessage(print_stacktrace) << '\n';
return getCurrentExceptionCode();
}

View File

@ -1757,8 +1757,7 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
LOG_INFO(log, "All helping tables dropped partition {}", partition_name);
}
String ClusterCopier::getRemoteCreateTable(
const DatabaseAndTableName & table, Connection & connection, const Settings & settings)
String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings & settings)
{
auto remote_context = Context::createCopy(context);
remote_context->setSettings(settings);
@ -1777,8 +1776,10 @@ ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & time
{
/// Fetch and parse (possibly) new definition
auto connection_entry = task_shard.info.pool->get(timeouts, &task_cluster->settings_pull, true);
String create_query_pull_str
= getRemoteCreateTable(task_shard.task_table.table_pull, *connection_entry, task_cluster->settings_pull);
String create_query_pull_str = getRemoteCreateTable(
task_shard.task_table.table_pull,
*connection_entry,
task_cluster->settings_pull);
ParserCreateQuery parser_create_query;
const auto & settings = getContext()->getSettingsRef();
@ -2025,8 +2026,8 @@ UInt64 ClusterCopier::executeQueryOnCluster(
/// For unknown reason global context is passed to IStorage::read() method
/// So, task_identifier is passed as constructor argument. It is more obvious.
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
*connections.back(), query, header, getContext(),
/*throttler=*/nullptr, Scalars(), Tables(), QueryProcessingStage::Complete);
*connections.back(), query, header, getContext(),
/*throttler=*/nullptr, Scalars(), Tables(), QueryProcessingStage::Complete);
try
{

View File

@ -30,7 +30,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_TABLE;
extern const int BAD_ARGUMENTS;
}
@ -180,8 +180,19 @@ void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServ
columns.emplace_back(column_name, std::move(column_type));
}
/// Usually this should not happen, since in case the table does not
/// exist, the call should succeed.
/// However, it is sometimes possible because internally there are two
/// queries in ClickHouse ODBC bridge:
/// - system.tables
/// - system.columns
/// And if the table is removed between these two queries, then
/// there will be no columns.
///
/// Also, sometimes system.columns can return an empty result because of
/// the cached value of total tables to scan.
if (columns.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns definition was not returned");
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Columns definition was not returned");
WriteBufferFromHTTPServerResponse out(
response,

View File

@ -67,7 +67,6 @@
#include <TableFunctions/registerTableFunctions.h>
#include <Formats/registerFormats.h>
#include <Storages/registerStorages.h>
#include <QueryPipeline/ConnectionCollector.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <IO/Resource/registerSchedulerNodes.h>
@ -816,8 +815,6 @@ try
}
);
ConnectionCollector::init(global_context, server_settings.max_threads_for_connection_collector);
bool has_zookeeper = config().has("zookeeper");
zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); });

View File

@ -348,10 +348,6 @@
<background_distributed_schedule_pool_size>16</background_distributed_schedule_pool_size>
-->
<!-- Number of workers to recycle connections in background (see also drain_timeout).
If the pool is full, connection will be drained synchronously. -->
<!-- <max_threads_for_connection_collector>10</max_threads_for_connection_collector> -->
<!-- On memory constrained environments you may have to set this to value larger than 1.
-->
<max_server_memory_usage_to_ram_ratio>0.9</max_server_memory_usage_to_ram_ratio>

View File

@ -72,12 +72,11 @@ namespace
return std::make_shared<BackupEntryFromMemory>(buf.str());
}
static AccessEntitiesInBackup fromBackupEntry(const IBackupEntry & backup_entry, const String & file_path)
static AccessEntitiesInBackup fromBackupEntry(std::unique_ptr<ReadBuffer> buf, const String & file_path)
{
try
{
AccessEntitiesInBackup res;
std::unique_ptr<ReadBuffer> buf = backup_entry.getReadBuffer();
bool dependencies_found = false;
@ -343,8 +342,8 @@ void AccessRestorerFromBackup::addDataPath(const String & data_path)
for (const String & filename : filenames)
{
String filepath_in_backup = data_path_in_backup_fs / filename;
auto backup_entry = backup->readFile(filepath_in_backup);
auto ab = AccessEntitiesInBackup::fromBackupEntry(*backup_entry, filepath_in_backup);
auto read_buffer_from_backup = backup->readFile(filepath_in_backup);
auto ab = AccessEntitiesInBackup::fromBackupEntry(std::move(read_buffer_from_backup), filepath_in_backup);
boost::range::copy(ab.entities, std::back_inserter(entities));
boost::range::copy(ab.dependencies, std::inserter(dependencies, dependencies.end()));

View File

@ -32,17 +32,17 @@ enum class GroupByKind
GROUPING_SETS
};
class GroupingFunctionResolveVisitor : public InDepthQueryTreeVisitor<GroupingFunctionResolveVisitor>
class GroupingFunctionResolveVisitor : public InDepthQueryTreeVisitorWithContext<GroupingFunctionResolveVisitor>
{
public:
GroupingFunctionResolveVisitor(GroupByKind group_by_kind_,
QueryTreeNodePtrWithHashMap<size_t> aggregation_key_to_index_,
ColumnNumbersList grouping_sets_keys_indices_,
ContextPtr context_)
: group_by_kind(group_by_kind_)
: InDepthQueryTreeVisitorWithContext(std::move(context_))
, group_by_kind(group_by_kind_)
, aggregation_key_to_index(std::move(aggregation_key_to_index_))
, grouping_sets_keys_indexes(std::move(grouping_sets_keys_indices_))
, context(std::move(context_))
{
}
@ -71,7 +71,7 @@ public:
FunctionOverloadResolverPtr grouping_function_resolver;
bool add_grouping_set_column = false;
bool force_grouping_standard_compatibility = context->getSettingsRef().force_grouping_standard_compatibility;
bool force_grouping_standard_compatibility = getSettings().force_grouping_standard_compatibility;
size_t aggregation_keys_size = aggregation_key_to_index.size();
switch (group_by_kind)
@ -132,7 +132,6 @@ private:
GroupByKind group_by_kind;
QueryTreeNodePtrWithHashMap<size_t> aggregation_key_to_index;
ColumnNumbersList grouping_sets_keys_indexes;
ContextPtr context;
};
void resolveGroupingFunctions(QueryTreeNodePtr & query_node, ContextPtr context)
@ -164,12 +163,17 @@ void resolveGroupingFunctions(QueryTreeNodePtr & query_node, ContextPtr context)
grouping_sets_used_aggregation_keys_list.emplace_back();
auto & grouping_sets_used_aggregation_keys = grouping_sets_used_aggregation_keys_list.back();
QueryTreeNodePtrWithHashSet used_keys_in_set;
for (auto & grouping_set_key_node : grouping_set_keys_list_node_typed.getNodes())
{
if (used_keys_in_set.contains(grouping_set_key_node))
continue;
used_keys_in_set.insert(grouping_set_key_node);
grouping_sets_used_aggregation_keys.push_back(grouping_set_key_node);
if (aggregation_key_to_index.contains(grouping_set_key_node))
continue;
grouping_sets_used_aggregation_keys.push_back(grouping_set_key_node);
aggregation_key_to_index.emplace(grouping_set_key_node, aggregation_node_index);
++aggregation_node_index;
}

View File

@ -5727,8 +5727,27 @@ void QueryAnalyzer::resolveInterpolateColumnsNodeList(QueryTreeNodePtr & interpo
{
auto & interpolate_node_typed = interpolate_node->as<InterpolateNode &>();
auto * column_to_interpolate = interpolate_node_typed.getExpression()->as<IdentifierNode>();
if (!column_to_interpolate)
throw Exception(ErrorCodes::LOGICAL_ERROR, "INTERPOLATE can work only for indentifiers, but {} is found",
interpolate_node_typed.getExpression()->formatASTForErrorMessage());
auto column_to_interpolate_name = column_to_interpolate->getIdentifier().getFullName();
resolveExpressionNode(interpolate_node_typed.getExpression(), scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
resolveExpressionNode(interpolate_node_typed.getInterpolateExpression(), scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
bool is_column_constant = interpolate_node_typed.getExpression()->getNodeType() == QueryTreeNodeType::CONSTANT;
auto & interpolation_to_resolve = interpolate_node_typed.getInterpolateExpression();
IdentifierResolveScope interpolate_scope(interpolation_to_resolve, &scope /*parent_scope*/);
auto fake_column_node = std::make_shared<ColumnNode>(NameAndTypePair(column_to_interpolate_name, interpolate_node_typed.getExpression()->getResultType()), interpolate_node_typed.getExpression());
if (is_column_constant)
interpolate_scope.expression_argument_name_to_node.emplace(column_to_interpolate_name, fake_column_node);
resolveExpressionNode(interpolation_to_resolve, interpolate_scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
if (is_column_constant)
interpolation_to_resolve = interpolation_to_resolve->cloneAndReplace(fake_column_node, interpolate_node_typed.getExpression());
}
}

View File

@ -56,7 +56,7 @@ public:
}
if (!found_argument_in_group_by_keys)
throw Exception(ErrorCodes::NOT_AN_AGGREGATE,
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"GROUPING function argument {} is not in GROUP BY keys. In query {}",
grouping_function_arguments_node->formatASTForErrorMessage(),
query_node->formatASTForErrorMessage());

View File

@ -1,9 +1,10 @@
#include <Backups/BackupIO.h>
#include <IO/copyData.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/SeekableReadBuffer.h>
namespace DB
{
@ -12,6 +13,15 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
/// Generic implementation: reads `file_name` via readFile() and writes it to `destination_path`
/// on `destination_disk` through a write buffer, passing `size` to copyData() as the amount to copy.
void IBackupReader::copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
                                   WriteMode write_mode, const WriteSettings & write_settings)
{
    auto source = readFile(file_name);

    /// The write buffer is capped at the default buffer size and never exceeds the file size.
    const size_t buffer_size = std::min<size_t>(size, DBMS_DEFAULT_BUFFER_SIZE);
    auto destination = destination_disk->writeFile(destination_path, buffer_size, write_mode, write_settings);

    copyData(*source, *destination, size);

    /// Finalize explicitly once the copy has completed.
    destination->finalize();
}
void IBackupWriter::copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name)
{
auto read_buffer = create_read_buffer();

View File

@ -17,6 +17,8 @@ public:
virtual bool fileExists(const String & file_name) = 0;
virtual UInt64 getFileSize(const String & file_name) = 0;
virtual std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) = 0;
virtual void copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings);
virtual DataSourceDescription getDataSourceDescription() const = 0;
};

View File

@ -1,4 +1,5 @@
#include <Backups/BackupIO_Disk.h>
#include <Common/logger_useful.h>
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
@ -12,7 +13,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
BackupReaderDisk::BackupReaderDisk(const DiskPtr & disk_, const String & path_) : disk(disk_), path(path_)
BackupReaderDisk::BackupReaderDisk(const DiskPtr & disk_, const String & path_)
: disk(disk_), path(path_), log(&Poco::Logger::get("BackupReaderDisk"))
{
}
@ -33,6 +35,21 @@ std::unique_ptr<SeekableReadBuffer> BackupReaderDisk::readFile(const String & fi
return disk->readFile(path / file_name);
}
/// Copies a file of the backup to `destination_disk`.
/// When the destination is to be rewritten, the copying is delegated to the source disk (IDisk::copyFile);
/// for any other write mode the generic implementation working through buffers is used.
void BackupReaderDisk::copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
                                      WriteMode write_mode, const WriteSettings & write_settings)
{
    if (write_mode != WriteMode::Rewrite)
    {
        /// The disk-to-disk copy is used only for WriteMode::Rewrite, everything else goes the generic way.
        LOG_TRACE(log, "Copying {}/{} from disk {} to {} through buffers", path, file_name, disk->getName(), destination_disk->getName());
        IBackupReader::copyFileToDisk(file_name, size, destination_disk, destination_path, write_mode, write_settings);
        return;
    }

    LOG_TRACE(log, "Copying {}/{} from disk {} to {} by the disk", path, file_name, disk->getName(), destination_disk->getName());
    disk->copyFile(path / file_name, *destination_disk, destination_path, write_settings);
}
/// Remembers the disk and the path passed by the caller; performs no other work.
BackupWriterDisk::BackupWriterDisk(const DiskPtr & disk_, const String & path_)
    : disk(disk_)
    , path(path_)
{
}

View File

@ -17,11 +17,14 @@ public:
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
void copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) override;
DataSourceDescription getDataSourceDescription() const override;
private:
DiskPtr disk;
std::filesystem::path path;
Poco::Logger * log;
};
class BackupWriterDisk : public IBackupWriter

View File

@ -1,15 +1,18 @@
#include <Backups/BackupIO_File.h>
#include <Disks/IDisk.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
#include <Common/filesystemHelpers.h>
#include <Common/logger_useful.h>
namespace fs = std::filesystem;
namespace DB
{
BackupReaderFile::BackupReaderFile(const String & path_) : path(path_)
BackupReaderFile::BackupReaderFile(const String & path_) : path(path_), log(&Poco::Logger::get("BackupReaderFile"))
{
}
@ -30,6 +33,22 @@ std::unique_ptr<SeekableReadBuffer> BackupReaderFile::readFile(const String & fi
return createReadBufferFromFileBase(path / file_name, {});
}
/// Copies a file of the backup to `destination_disk`.
///
/// If the destination disk has the same data source description as this reader and the destination
/// is to be rewritten, the file is copied directly with std::filesystem::copy().
/// Otherwise the generic implementation working through buffers is used.
///
/// `file_name` is the name of the file relative to the backup's root (`path`).
void BackupReaderFile::copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
                                      WriteMode write_mode, const WriteSettings & write_settings)
{
    /// std::filesystem::copy() can only replace the destination as a whole (it cannot append),
    /// so the shortcut is valid only for WriteMode::Rewrite - same rule as in BackupReaderDisk::copyFileToDisk().
    if ((write_mode == WriteMode::Rewrite) && (destination_disk->getDataSourceDescription() == getDataSourceDescription()))
    {
        /// Use more optimal way.
        LOG_TRACE(log, "Copying {}/{} to disk {} locally", path, file_name, destination_disk->getName());
        fs::copy(path / file_name, fullPath(destination_disk, destination_path), fs::copy_options::overwrite_existing);
        return;
    }

    LOG_TRACE(log, "Copying {}/{} to disk {} through buffers", path, file_name, destination_disk->getName());

    /// The generic implementation calls readFile(), which prepends `path` by itself,
    /// so the file name must be passed as is (passing `path / file_name` would apply a relative `path` twice).
    IBackupReader::copyFileToDisk(file_name, size, destination_disk, destination_path, write_mode, write_settings);
}
/// Remembers the path passed by the caller; performs no other work.
BackupWriterFile::BackupWriterFile(const String & path_)
    : path(path_)
{
}

View File

@ -15,10 +15,13 @@ public:
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
void copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) override;
DataSourceDescription getDataSourceDescription() const override;
private:
std::filesystem::path path;
Poco::Logger * log;
};
class BackupWriterFile : public IBackupWriter

View File

@ -2,6 +2,7 @@
#if USE_AWS_S3
#include <Common/quoteString.h>
#include <Disks/ObjectStorages/S3/copyS3FileToDisk.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Interpreters/Context.h>
#include <IO/BackupsIOThreadPool.h>
@ -96,6 +97,7 @@ BackupReaderS3::BackupReaderS3(
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
, read_settings(context_->getReadSettings())
, request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings)
, log(&Poco::Logger::get("BackupReaderS3"))
{
request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint
}
@ -127,6 +129,27 @@ std::unique_ptr<SeekableReadBuffer> BackupReaderS3::readFile(const String & file
client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, request_settings, read_settings);
}
void BackupReaderS3::copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings)
{
LOG_TRACE(log, "Copying {} to disk {}", file_name, destination_disk->getName());
copyS3FileToDisk(
client,
s3_uri.bucket,
fs::path(s3_uri.key) / file_name,
s3_uri.version_id,
0,
size,
destination_disk,
destination_path,
write_mode,
read_settings,
write_settings,
request_settings,
threadPoolCallbackRunner<void>(BackupsIOThreadPool::get(), "BackupReaderS3"));
}
BackupWriterS3::BackupWriterS3(
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, const ContextPtr & context_)

View File

@ -22,6 +22,8 @@ public:
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
void copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) override;
DataSourceDescription getDataSourceDescription() const override;
private:
@ -29,6 +31,7 @@ private:
std::shared_ptr<S3::Client> client;
ReadSettings read_settings;
S3Settings::RequestSettings request_settings;
Poco::Logger * log;
};

View File

@ -79,66 +79,6 @@ namespace
}
/// A backup entry whose data is read on demand from a `BackupImpl`.
/// If `base_backup_entry` is set, the entry's data is the data of that base entry
/// followed by the data stored in this backup.
class BackupImpl::BackupEntryFromBackupImpl : public IBackupEntry
{
public:
/// Only stores the passed values; nothing is read at construction time.
/// `size_` and `checksum_` are returned as is by getSize() and getChecksum().
BackupEntryFromBackupImpl(
const std::shared_ptr<const BackupImpl> & backup_,
const String & archive_suffix_,
const String & data_file_name_,
UInt64 size_,
const UInt128 checksum_,
BackupEntryPtr base_backup_entry_ = {})
: backup(backup_), archive_suffix(archive_suffix_), data_file_name(data_file_name_), size(size_), checksum(checksum_),
base_backup_entry(std::move(base_backup_entry_))
{
}
/// Opens the entry's data for reading.
/// The part stored in this backup is read either from an archive (when the backup uses archives)
/// or directly through the backup's reader.
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override
{
std::unique_ptr<SeekableReadBuffer> read_buffer;
if (backup->use_archives)
read_buffer = backup->getArchiveReader(archive_suffix)->readFile(data_file_name);
else
read_buffer = backup->reader->readFile(data_file_name);
if (base_backup_entry)
{
/// The first `base_size` bytes come from the base entry, the remaining `size - base_size` bytes from this backup.
/// NOTE(review): this presumes `size >= base_size` - nothing here checks it.
size_t base_size = base_backup_entry->getSize();
read_buffer = std::make_unique<ConcatSeekableReadBuffer>(
base_backup_entry->getReadBuffer(), base_size, std::move(read_buffer), size - base_size);
}
return read_buffer;
}
/// Total size passed to the constructor.
UInt64 getSize() const override { return size; }
/// Checksum passed to the constructor.
std::optional<UInt128> getChecksum() const override { return checksum; }
/// Name of the data file inside the backup.
String getFilePath() const override
{
return data_file_name;
}
/// Always returns nullptr.
DiskPtr tryGetDiskIfExists() const override
{
return nullptr;
}
/// Delegates to the reader of the backup.
DataSourceDescription getDataSourceDescription() const override
{
return backup->reader->getDataSourceDescription();
}
private:
/// The shared pointer keeps the backup alive while the entry exists.
const std::shared_ptr<const BackupImpl> backup;
const String archive_suffix;
const String data_file_name;
const UInt64 size;
const UInt128 checksum;
/// Empty if the entry's data comes from this backup only.
BackupEntryPtr base_backup_entry;
};
BackupImpl::BackupImpl(
const String & backup_name_for_logging_,
const ArchiveParams & archive_params_,
@ -645,24 +585,22 @@ SizeAndChecksum BackupImpl::getFileSizeAndChecksum(const String & file_name) con
return {info->size, info->checksum};
}
BackupEntryPtr BackupImpl::readFile(const String & file_name) const
std::unique_ptr<SeekableReadBuffer> BackupImpl::readFile(const String & file_name) const
{
return readFile(getFileSizeAndChecksum(file_name));
}
BackupEntryPtr BackupImpl::readFile(const SizeAndChecksum & size_and_checksum) const
std::unique_ptr<SeekableReadBuffer> BackupImpl::readFile(const SizeAndChecksum & size_and_checksum) const
{
std::lock_guard lock{mutex};
if (open_mode != OpenMode::READ)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Backup is not opened for reading");
++num_read_files;
num_read_bytes += size_and_checksum.first;
if (!size_and_checksum.first)
if (size_and_checksum.first == 0)
{
/// Entry's data is empty.
return std::make_unique<BackupEntryFromMemory>(nullptr, 0, UInt128{0, 0});
std::lock_guard lock{mutex};
++num_read_files;
return std::make_unique<ReadBufferFromMemory>(static_cast<char *>(nullptr), 0);
}
auto info_opt = coordination->getFileInfo(size_and_checksum);
@ -677,45 +615,149 @@ BackupEntryPtr BackupImpl::readFile(const SizeAndChecksum & size_and_checksum) c
const auto & info = *info_opt;
std::unique_ptr<SeekableReadBuffer> read_buffer;
std::unique_ptr<SeekableReadBuffer> base_read_buffer;
if (info.size > info.base_size)
{
/// Make `read_buffer` if there is data for this backup entry in this backup.
if (use_archives)
{
std::shared_ptr<IArchiveReader> archive_reader;
{
std::lock_guard lock{mutex};
archive_reader = getArchiveReader(info.archive_suffix);
}
read_buffer = archive_reader->readFile(info.data_file_name);
}
else
{
read_buffer = reader->readFile(info.data_file_name);
}
}
if (info.base_size)
{
/// Make `base_read_buffer` if there is data for this backup entry in the base backup.
if (!base_backup)
{
throw Exception(
ErrorCodes::NO_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but there is no base backup specified",
backup_name_for_logging, formatSizeAndChecksum(size_and_checksum));
}
if (!base_backup->fileExists(std::pair(info.base_size, info.base_checksum)))
{
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but doesn't exist there",
backup_name_for_logging, formatSizeAndChecksum(size_and_checksum));
}
base_read_buffer = base_backup->readFile(std::pair{info.base_size, info.base_checksum});
}
{
/// Update number of read files.
std::lock_guard lock{mutex};
++num_read_files;
num_read_bytes += info.size;
}
if (!info.base_size)
{
/// Data goes completely from this backup, the base backup isn't used.
return std::make_unique<BackupEntryFromBackupImpl>(
std::static_pointer_cast<const BackupImpl>(shared_from_this()), info.archive_suffix, info.data_file_name, info.size, info.checksum);
/// Data comes completely from this backup, the base backup isn't used.
return read_buffer;
}
if (!base_backup)
else if (info.size == info.base_size)
{
throw Exception(
ErrorCodes::NO_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but there is no base backup specified",
backup_name_for_logging, formatSizeAndChecksum(size_and_checksum));
/// Data comes completely from the base backup (nothing comes from this backup).
return base_read_buffer;
}
if (!base_backup->fileExists(std::pair(info.base_size, info.base_checksum)))
else
{
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but doesn't exist there",
backup_name_for_logging, formatSizeAndChecksum(size_and_checksum));
}
auto base_entry = base_backup->readFile(std::pair{info.base_size, info.base_checksum});
if (info.size == info.base_size)
{
/// Data goes completely from the base backup (nothing goes from this backup).
return base_entry;
}
{
/// The beginning of the data goes from the base backup,
/// and the ending goes from this backup.
return std::make_unique<BackupEntryFromBackupImpl>(
static_pointer_cast<const BackupImpl>(shared_from_this()), info.archive_suffix, info.data_file_name, info.size, info.checksum, std::move(base_entry));
/// The beginning of the data comes from the base backup,
/// and the ending comes from this backup.
return std::make_unique<ConcatSeekableReadBuffer>(
std::move(base_read_buffer), info.base_size, std::move(read_buffer), info.size - info.base_size);
}
}
size_t BackupImpl::copyFileToDisk(const String & file_name, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) const
{
return copyFileToDisk(getFileSizeAndChecksum(file_name), destination_disk, destination_path, write_mode, write_settings);
}
/// Copies an entry identified by its (size, checksum) pair from the backup to `destination_disk`.
/// Returns the size of the entry.
///
/// Strategy:
///  - an empty entry needs no data: an empty file is created if the destination is to be rewritten;
///  - an entry stored completely in this backup (and not in an archive) is copied by the backup reader;
///  - an entry stored completely in the base backup is copied by the base backup;
///  - everything else goes the generic way: readFile() + copyData().
///
/// Throws LOGICAL_ERROR if the backup is not opened for reading, BACKUP_ENTRY_NOT_FOUND if there is no such entry,
/// NO_BASE_BACKUP if the entry refers to a base backup but no base backup is specified.
size_t BackupImpl::copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path,
                                  WriteMode write_mode, const WriteSettings & write_settings) const
{
    if (open_mode != OpenMode::READ)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Backup is not opened for reading");

    if (size_and_checksum.first == 0)
    {
        /// Entry's data is empty.
        if (write_mode == WriteMode::Rewrite)
        {
            /// Just create an empty file.
            destination_disk->createFile(destination_path);
        }
        std::lock_guard lock{mutex};
        ++num_read_files;
        return 0;
    }

    auto info_opt = coordination->getFileInfo(size_and_checksum);
    if (!info_opt)
    {
        throw Exception(
            ErrorCodes::BACKUP_ENTRY_NOT_FOUND,
            "Backup {}: Entry {} not found in the backup",
            backup_name_for_logging,
            formatSizeAndChecksum(size_and_checksum));
    }

    const auto & info = *info_opt;

    bool file_copied = false;

    if (info.size && !info.base_size && !use_archives)
    {
        /// Data comes completely from this backup.
        reader->copyFileToDisk(info.data_file_name, info.size, destination_disk, destination_path, write_mode, write_settings);
        file_copied = true;
    }
    else if (info.size && (info.size == info.base_size))
    {
        /// Data comes completely from the base backup (nothing comes from this backup).
        /// The base backup may be absent: report it the same way readFile() does instead of dereferencing a null pointer.
        if (!base_backup)
        {
            throw Exception(
                ErrorCodes::NO_BASE_BACKUP,
                "Backup {}: Entry {} is marked to be read from a base backup, but there is no base backup specified",
                backup_name_for_logging, formatSizeAndChecksum(size_and_checksum));
        }

        base_backup->copyFileToDisk(std::pair{info.base_size, info.base_checksum}, destination_disk, destination_path, write_mode, write_settings);
        file_copied = true;
    }

    if (file_copied)
    {
        /// The file is already copied, but `num_read_files` is not updated yet.
        std::lock_guard lock{mutex};
        ++num_read_files;
        num_read_bytes += info.size;
    }
    else
    {
        /// Use the generic way to copy data. `readFile()` will update `num_read_files`.
        auto read_buffer = readFile(size_and_checksum);
        auto write_buffer = destination_disk->writeFile(destination_path, std::min<size_t>(info.size, DBMS_DEFAULT_BUFFER_SIZE),
                                                        write_mode, write_settings);
        copyData(*read_buffer, *write_buffer, info.size);
        write_buffer->finalize();
    }

    return info.size;
}
namespace
{

View File

@ -73,8 +73,12 @@ public:
UInt64 getFileSize(const String & file_name) const override;
UInt128 getFileChecksum(const String & file_name) const override;
SizeAndChecksum getFileSizeAndChecksum(const String & file_name) const override;
BackupEntryPtr readFile(const String & file_name) const override;
BackupEntryPtr readFile(const SizeAndChecksum & size_and_checksum) const override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) const override;
std::unique_ptr<SeekableReadBuffer> readFile(const SizeAndChecksum & size_and_checksum) const override;
size_t copyFileToDisk(const String & file_name, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) const override;
size_t copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) const override;
void writeFile(const String & file_name, BackupEntryPtr entry) override;
void finalizeWriting() override;
bool supportsWritingInMultipleThreads() const override { return !use_archives; }

View File

@ -1,6 +1,8 @@
#pragma once
#include <Core/Types.h>
#include <Disks/WriteMode.h>
#include <IO/WriteSettings.h>
#include <memory>
#include <optional>
@ -8,7 +10,10 @@
namespace DB
{
class IBackupEntry;
class IDisk;
using BackupEntryPtr = std::shared_ptr<const IBackupEntry>;
using DiskPtr = std::shared_ptr<IDisk>;
class SeekableReadBuffer;
/// Represents a backup, i.e. a storage of BackupEntries which can be accessed by their names.
/// A backup can be either incremental or non-incremental. An incremental backup doesn't store
@ -95,8 +100,15 @@ public:
virtual SizeAndChecksum getFileSizeAndChecksum(const String & file_name) const = 0;
/// Reads an entry from the backup.
virtual BackupEntryPtr readFile(const String & file_name) const = 0;
virtual BackupEntryPtr readFile(const SizeAndChecksum & size_and_checksum) const = 0;
virtual std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) const = 0;
virtual std::unique_ptr<SeekableReadBuffer> readFile(const SizeAndChecksum & size_and_checksum) const = 0;
/// Copies a file from the backup to a specified destination disk. Returns the number of bytes written.
virtual size_t copyFileToDisk(const String & file_name, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode = WriteMode::Rewrite, const WriteSettings & write_settings = {}) const = 0;
virtual size_t copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode = WriteMode::Rewrite, const WriteSettings & write_settings = {}) const = 0;
/// Puts a new entry to the backup.
virtual void writeFile(const String & file_name, BackupEntryPtr entry) = 0;

View File

@ -316,7 +316,7 @@ void RestorerFromBackup::findTableInBackup(const QualifiedTableName & table_name
= *root_path_in_use / "data" / escapeForFileName(table_name_in_backup.database) / escapeForFileName(table_name_in_backup.table);
}
auto read_buffer = backup->readFile(*metadata_path)->getReadBuffer();
auto read_buffer = backup->readFile(*metadata_path);
String create_query_str;
readStringUntilEOF(create_query_str, *read_buffer);
read_buffer.reset();
@ -410,7 +410,7 @@ void RestorerFromBackup::findDatabaseInBackup(const String & database_name_in_ba
if (metadata_path)
{
auto read_buffer = backup->readFile(*metadata_path)->getReadBuffer();
auto read_buffer = backup->readFile(*metadata_path);
String create_query_str;
readStringUntilEOF(create_query_str, *read_buffer);
read_buffer.reset();

View File

@ -216,6 +216,7 @@ void Connection::disconnect()
socket->close();
socket = nullptr;
connected = false;
nonce.reset();
}
@ -324,6 +325,14 @@ void Connection::receiveHello()
password_complexity_rules.push_back({std::move(original_pattern), std::move(exception_message)});
}
}
if (server_revision >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2)
{
chassert(!nonce.has_value());
UInt64 read_nonce;
readIntBinary(read_nonce, *in);
nonce.emplace(read_nonce);
}
}
else if (packet_type == Protocol::Server::Exception)
receiveException()->rethrow();
@ -584,6 +593,9 @@ void Connection::sendQuery(
{
#if USE_SSL
std::string data(salt);
// For backward compatibility
if (nonce.has_value())
data += std::to_string(nonce.value());
data += cluster_secret;
data += query;
data += query_id;
@ -593,8 +605,8 @@ void Connection::sendQuery(
std::string hash = encodeSHA256(data);
writeStringBinary(hash, *out);
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Inter-server secret support is disabled, because ClickHouse was built without SSL library");
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Inter-server secret support is disabled, because ClickHouse was built without SSL library");
#endif
}
else

View File

@ -167,7 +167,10 @@ private:
/// For inter-server authorization
String cluster;
String cluster_secret;
/// For DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET
String salt;
/// For DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2
std::optional<UInt64> nonce;
/// Address is resolved during the first connection (or the following reconnects)
/// Use it only for logging purposes

View File

@ -31,8 +31,6 @@ HedgedConnections::HedgedConnections(
: hedged_connections_factory(pool_, &context_->getSettingsRef(), timeouts_, table_to_check_)
, context(std::move(context_))
, settings(context->getSettingsRef())
, drain_timeout(settings.drain_timeout)
, allow_changing_replica_until_first_data_packet(settings.allow_changing_replica_until_first_data_packet)
, throttler(throttler_)
{
std::vector<Connection *> connections = hedged_connections_factory.getManyConnections(pool_mode);
@ -263,7 +261,7 @@ Packet HedgedConnections::drain()
while (!epoll.empty())
{
ReplicaLocation location = getReadyReplicaLocation(DrainCallback{drain_timeout});
ReplicaLocation location = getReadyReplicaLocation();
Packet packet = receivePacketFromReplica(location);
switch (packet.type)
{
@ -290,10 +288,10 @@ Packet HedgedConnections::drain()
Packet HedgedConnections::receivePacket()
{
std::lock_guard lock(cancel_mutex);
return receivePacketUnlocked({}, false /* is_draining */);
return receivePacketUnlocked({});
}
Packet HedgedConnections::receivePacketUnlocked(AsyncCallback async_callback, bool /* is_draining */)
Packet HedgedConnections::receivePacketUnlocked(AsyncCallback async_callback)
{
if (!sent_query)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot receive packets: no query sent.");
@ -413,7 +411,7 @@ Packet HedgedConnections::receivePacketFromReplica(const ReplicaLocation & repli
{
/// If we are allowed to change replica until the first data packet,
/// just restart timeout (if it hasn't expired yet). Otherwise disable changing replica with this offset.
if (allow_changing_replica_until_first_data_packet && !replica.is_change_replica_timeout_expired)
if (settings.allow_changing_replica_until_first_data_packet && !replica.is_change_replica_timeout_expired)
replica.change_replica_timeout.setRelative(hedged_connections_factory.getConnectionTimeouts().receive_data_timeout);
else
disableChangingReplica(replica_location);

View File

@ -101,7 +101,7 @@ public:
Packet receivePacket() override;
Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) override;
Packet receivePacketUnlocked(AsyncCallback async_callback) override;
void disconnect() override;
@ -196,12 +196,6 @@ private:
Epoll epoll;
ContextPtr context;
const Settings & settings;
/// The following two fields are from settings but can be referenced outside the lifetime of
/// settings when connection is drained asynchronously.
Poco::Timespan drain_timeout;
bool allow_changing_replica_until_first_data_packet;
ThrottlerPtr throttler;
bool sent_query = false;
bool cancelled = false;

View File

@ -1,36 +0,0 @@
#include <Client/IConnections.h>
#include <Poco/Net/SocketImpl.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SOCKET_TIMEOUT;
}
/// This wrapper struct allows us to use Poco's socket polling code with a raw fd.
/// The only difference from Poco::Net::SocketImpl is that we don't close the fd in the destructor.
/// Non-owning adapter that lets Poco's socket polling code work on a raw file descriptor.
/// It differs from Poco::Net::SocketImpl only in that the descriptor is not closed on destruction.
struct PocoSocketWrapper : public Poco::Net::SocketImpl
{
/// Attaches the wrapper to `fd` via reset(); the descriptor stays owned by the caller.
explicit PocoSocketWrapper(int fd)
{
reset(fd);
}
// Do not close fd: it is detached by reset(-1) here, before the base class destructor runs.
~PocoSocketWrapper() override { reset(-1); }
};
/// Polls `fd` for readability for up to `drain_timeout`.
/// Throws SOCKET_TIMEOUT if poll() reports that the descriptor did not become readable.
/// The unnamed Poco::Timespan parameter is ignored: `drain_timeout` is used instead.
void IConnections::DrainCallback::operator()(int fd, Poco::Timespan, const std::string & fd_description) const
{
    const bool readable = PocoSocketWrapper(fd).poll(drain_timeout, Poco::Net::Socket::SELECT_READ);
    if (readable)
        return;

    throw Exception(ErrorCodes::SOCKET_TIMEOUT,
        "Read timeout ({} ms) while draining from {}",
        drain_timeout.totalMilliseconds(),
        fd_description);
}
}

View File

@ -13,12 +13,6 @@ namespace DB
class IConnections : boost::noncopyable
{
public:
struct DrainCallback
{
Poco::Timespan drain_timeout;
void operator()(int fd, Poco::Timespan, const std::string & fd_description = "") const;
};
/// Send all scalars to replicas.
virtual void sendScalarsData(Scalars & data) = 0;
/// Send all content of external tables to replicas.
@ -40,7 +34,7 @@ public:
virtual Packet receivePacket() = 0;
/// Version of `receivePacket` function without locking.
virtual Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) = 0;
virtual Packet receivePacketUnlocked(AsyncCallback async_callback) = 0;
/// Break all active connections.
virtual void disconnect() = 0;

View File

@ -20,7 +20,7 @@ namespace ErrorCodes
MultiplexedConnections::MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_), drain_timeout(settings.drain_timeout), receive_timeout(settings.receive_timeout)
: settings(settings_)
{
connection.setThrottler(throttler);
@ -33,7 +33,7 @@ MultiplexedConnections::MultiplexedConnections(Connection & connection, const Se
MultiplexedConnections::MultiplexedConnections(std::shared_ptr<Connection> connection_ptr_, const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_), drain_timeout(settings.drain_timeout), receive_timeout(settings.receive_timeout)
: settings(settings_)
, connection_ptr(connection_ptr_)
{
connection_ptr->setThrottler(throttler);
@ -46,8 +46,9 @@ MultiplexedConnections::MultiplexedConnections(std::shared_ptr<Connection> conne
}
MultiplexedConnections::MultiplexedConnections(
std::vector<IConnectionPool::Entry> && connections, const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_), drain_timeout(settings.drain_timeout), receive_timeout(settings.receive_timeout)
std::vector<IConnectionPool::Entry> && connections,
const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_)
{
/// If we didn't get any connections from pool and getMany() did not throw exceptions, this means that
/// `skip_unavailable_shards` was set. Then just return.
@ -206,7 +207,7 @@ void MultiplexedConnections::sendMergeTreeReadTaskResponse(const ParallelReadRes
Packet MultiplexedConnections::receivePacket()
{
std::lock_guard lock(cancel_mutex);
Packet packet = receivePacketUnlocked({}, false /* is_draining */);
Packet packet = receivePacketUnlocked({});
return packet;
}
@ -254,7 +255,7 @@ Packet MultiplexedConnections::drain()
while (hasActiveConnections())
{
Packet packet = receivePacketUnlocked(DrainCallback{drain_timeout}, true /* is_draining */);
Packet packet = receivePacketUnlocked({});
switch (packet.type)
{
@ -304,14 +305,14 @@ std::string MultiplexedConnections::dumpAddressesUnlocked() const
return buf.str();
}
Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callback, bool is_draining)
Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callback)
{
if (!sent_query)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot receive packets: no query sent.");
if (!hasActiveConnections())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No more packets are available.");
ReplicaState & state = getReplicaForReading(is_draining);
ReplicaState & state = getReplicaForReading();
current_connection = state.connection;
if (current_connection == nullptr)
throw Exception(ErrorCodes::NO_AVAILABLE_REPLICA, "Logical error: no available replica");
@ -366,10 +367,9 @@ Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callbac
return packet;
}
MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForReading(bool is_draining)
MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForReading()
{
/// Fast path when we only focus on one replica and are not draining the connection.
if (replica_states.size() == 1 && !is_draining)
if (replica_states.size() == 1)
return replica_states[0];
Poco::Net::Socket::SocketList read_list;
@ -390,7 +390,7 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
auto timeout = is_draining ? drain_timeout : receive_timeout;
auto timeout = settings.receive_timeout;
int n = 0;
/// EINTR loop
@ -417,9 +417,7 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
break;
}
/// We treat any error as timeout for simplicity.
/// And we also check if read_list is still empty just in case.
if (n <= 0 || read_list.empty())
if (n == 0)
{
const auto & addresses = dumpAddressesUnlocked();
for (ReplicaState & state : replica_states)
@ -438,7 +436,9 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
}
}
/// TODO Motivation of rand is unclear.
/// TODO Absolutely wrong code: read_list could be empty; motivation of rand is unclear.
/// This code path is disabled by default.
auto & socket = read_list[thread_local_rng() % read_list.size()];
if (fd_to_replica_state_idx.empty())
{

View File

@ -65,7 +65,7 @@ public:
void setReplicaInfo(ReplicaInfo value) override { replica_info = value; }
private:
Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) override;
Packet receivePacketUnlocked(AsyncCallback async_callback) override;
/// Internal version of `dumpAddresses` function without locking.
std::string dumpAddressesUnlocked() const;
@ -78,18 +78,13 @@ private:
};
/// Get a replica where you can read the data.
ReplicaState & getReplicaForReading(bool is_draining);
ReplicaState & getReplicaForReading();
/// Mark the replica as invalid.
void invalidateReplica(ReplicaState & replica_state);
const Settings & settings;
/// The following two fields are from settings but can be referenced outside the lifetime of
/// settings when connection is drained asynchronously.
Poco::Timespan drain_timeout;
Poco::Timespan receive_timeout;
/// The current number of valid connections to the replicas of this shard.
size_t active_connection_count = 0;

View File

@ -84,10 +84,6 @@
M(MMappedFileBytes, "Sum size of mmapped file regions.") \
M(MMappedAllocs, "Total number of mmapped allocations") \
M(MMappedAllocBytes, "Sum bytes of mmapped allocations") \
M(AsyncDrainedConnections, "Number of connections drained asynchronously.") \
M(ActiveAsyncDrainedConnections, "Number of active connections drained asynchronously.") \
M(SyncDrainedConnections, "Number of connections drained synchronously.") \
M(ActiveSyncDrainedConnections, "Number of active connections drained synchronously.") \
M(AsynchronousReadWait, "Number of threads waiting for asynchronous read.") \
M(PendingAsyncInsert, "Number of asynchronous inserts that are waiting for flush.") \
M(KafkaConsumers, "Number of active Kafka consumers") \

View File

@ -31,7 +31,7 @@ public:
* max_elements_size == 0 means no elements size restrictions.
*/
explicit LRUCachePolicy(size_t max_size_, size_t max_elements_size_ = 0, OnWeightLossFunction on_weight_loss_function_ = {})
: max_size(std::max(static_cast<size_t>(1), max_size_)), max_elements_size(max_elements_size_)
: max_size(std::max(1uz, max_size_)), max_elements_size(max_elements_size_)
{
Base::on_weight_loss_function = on_weight_loss_function_;
}

View File

@ -1,7 +1,76 @@
#include <Common/LoggingFormatStringHelpers.h>
#include <Common/SipHash.h>
#include <Common/thread_local_rng.h>
/// Never returns: always throws std::runtime_error carrying `error` as its message.
/// NOTE(review): judging by the name, it is meant to be called from consteval code to break compilation - confirm in the header.
[[noreturn]] void functionThatFailsCompilationOfConstevalFunctions(const char * error)
{
    throw std::runtime_error{error};
}
/// Per-message state keyed by a 64-bit hash built in log():
/// the value is (time of the last logged message, number of messages skipped since then).
std::unordered_map<UInt64, std::pair<time_t, size_t>> LogFrequencyLimiterIml::logged_messages;
/// Time of the last cleanup(); zero until the first cleanup happens.
time_t LogFrequencyLimiterIml::last_cleanup = 0;
/// Locked by log() and cleanup() while they access `logged_messages` and `last_cleanup`.
std::mutex LogFrequencyLimiterIml::mutex;
/// Passes `message` to the logger's channel unless a message with the same
/// (logger name, format string) was already logged less than `min_interval_s` seconds ago.
/// Suppressed messages are counted, and the count is appended to the text of the next message that gets through.
void LogFrequencyLimiterIml::log(Poco::Message & message)
{
    const std::string_view format_string = message.getFormatString();
    if (format_string.empty())
    {
        /// Do not filter messages without a format string
        auto * channel = logger->getChannel();
        if (channel)
            channel->log(message);
        return;
    }

    SipHash key_hash;
    key_hash.update(logger->name());
    /// Format strings are compile-time constants, so they are uniquely identified by pointer and size
    key_hash.update(format_string.data());
    key_hash.update(format_string.size());
    const UInt64 key = key_hash.get64();

    const time_t now = time(nullptr);
    size_t skipped_count = 0;
    bool cleanup_is_due = false;
    bool should_log = false;

    {
        std::lock_guard lock(mutex);

        /// A cleanup becomes due 300 seconds after the previous one.
        cleanup_is_due = last_cleanup + 300 <= now;

        auto & [last_logged_at, skipped] = logged_messages[key];
        should_log = last_logged_at + min_interval_s <= now;
        if (should_log)
        {
            /// Report what has been suppressed so far and start a new interval.
            skipped_count = skipped;
            last_logged_at = now;
            skipped = 0;
        }
        else
        {
            ++skipped;
        }
    }

    /// We don't need all threads to do cleanup, just randomize
    if (cleanup_is_due && thread_local_rng() % 100 == 0)
        cleanup();

    /// The message is too frequent, skip it for now
    /// NOTE It's not optimal because we format the message first and only then check if we need to actually write it, see LOG_IMPL macro
    if (!should_log)
        return;

    if (skipped_count)
        message.appendText(fmt::format(" (skipped {} similar messages)", skipped_count));

    if (auto * channel = logger->getChannel())
        channel->log(message);
}
/// Drops every entry whose last logged time is more than too_old_threshold_s seconds in the past
/// and records the current time as the time of the last cleanup.
void LogFrequencyLimiterIml::cleanup(time_t too_old_threshold_s)
{
    const time_t current_time = time(nullptr);
    const time_t oldest_allowed = current_time - too_old_threshold_s;

    /// An entry is stale when its last_logged_time_s (first element of the mapped pair) precedes the threshold
    const auto is_stale = [oldest_allowed](const auto & entry) { return entry.second.first < oldest_allowed; };

    std::lock_guard lock(mutex);
    std::erase_if(logged_messages, is_stale);
    last_cleanup = current_time;
}

View File

@ -1,6 +1,11 @@
#pragma once
#include <base/defines.h>
#include <base/types.h>
#include <fmt/format.h>
#include <mutex>
#include <unordered_map>
#include <Poco/Logger.h>
#include <Poco/Message.h>
struct PreformattedMessage;
consteval void formatStringCheckArgsNumImpl(std::string_view str, size_t nargs);
@ -156,3 +161,59 @@ struct CheckArgsNumHelperImpl
template <typename... Args> using CheckArgsNumHelper = CheckArgsNumHelperImpl<std::type_identity_t<Args>...>;
template <typename... Args> void formatStringCheckArgsNum(CheckArgsNumHelper<Args...>, Args &&...) {}
/// Wrapper around a logger that suppresses too frequent and noisy log messages.
/// For every pair (logger_name, format_string) it keeps the time when such a message was last logged;
/// a message is dropped if fewer than min_interval_s seconds have passed since the previously logged one.
/// NOTE(review): is(), getChannel(), name(), log() and operator-> presumably mirror the logger interface
/// expected by the logging macros - confirm against LOG_IMPL.
class LogFrequencyLimiterIml
{
private:
    /// Hash(logger_name, format_string) -> (last_logged_time_s, skipped_messages_count)
    static std::unordered_map<UInt64, std::pair<time_t, size_t>> logged_messages;
    /// Time of the last cleanup() call
    static time_t last_cleanup;
    /// Locked by log() and cleanup() around accesses to the two static members above
    static std::mutex mutex;

    Poco::Logger * logger;
    time_t min_interval_s;

public:
    LogFrequencyLimiterIml(Poco::Logger * logger_, time_t min_interval_s_)
        : logger(logger_)
        , min_interval_s(min_interval_s_)
    {
    }

    /// Returns the wrapper itself
    LogFrequencyLimiterIml & operator -> () { return *this; }

    /// Priority check is delegated to the wrapped logger
    bool is(Poco::Message::Priority priority)
    {
        return logger->is(priority);
    }

    /// The wrapper acts as its own channel, so that messages go through log() below
    LogFrequencyLimiterIml * getChannel()
    {
        return this;
    }

    const String & name() const
    {
        return logger->name();
    }

    /// Writes the message to the channel of the wrapped logger, or skips it if it is too frequent
    void log(Poco::Message & message);

    /// Clears messages that were logged last time more than too_old_threshold_s seconds ago
    static void cleanup(time_t too_old_threshold_s = 600);

    Poco::Logger * getLogger()
    {
        return logger;
    }
};
/// Wrapper that stores the formatted message text into a String before handing the message on,
/// either to a nested LogFrequencyLimiterIml (if one was supplied) or directly to the logger's channel.
class LogToStrImpl
{
private:
    String & out_str;
    Poco::Logger * logger;
    /// Optional frequency limiter; when set, messages are forwarded through it instead of the logger's channel
    std::unique_ptr<LogFrequencyLimiterIml> maybe_nested;
    /// Becomes false as soon as is() is asked about a priority that the underlying logger does not accept
    bool propagate_to_actual_log = true;

public:
    LogToStrImpl(String & out_str_, Poco::Logger * logger_)
        : out_str(out_str_)
        , logger(logger_)
    {
    }

    /// `logger` is declared before `maybe_nested`, so it is read from maybe_nested_ before the pointer is moved from
    LogToStrImpl(String & out_str_, std::unique_ptr<LogFrequencyLimiterIml> && maybe_nested_)
        : out_str(out_str_)
        , logger(maybe_nested_->getLogger())
        , maybe_nested(std::move(maybe_nested_))
    {
    }

    LogToStrImpl & operator -> () { return *this; }

    /// Always returns true, so the message text is captured regardless of priority;
    /// the answer of the real logger only decides whether the message is propagated further.
    bool is(Poco::Message::Priority priority)
    {
        propagate_to_actual_log &= logger->is(priority);
        return true;
    }

    LogToStrImpl * getChannel()
    {
        return this;
    }

    const String & name() const
    {
        return logger->name();
    }

    void log(Poco::Message & message)
    {
        out_str = message.getText();

        if (propagate_to_actual_log)
        {
            if (maybe_nested)
            {
                maybe_nested->log(message);
            }
            else if (auto * channel = logger->getChannel())
            {
                channel->log(message);
            }
        }
    }
};

View File

@ -166,6 +166,8 @@
\
M(WaitMarksLoadMicroseconds, "Time spent loading marks") \
M(BackgroundLoadingMarksTasks, "Number of background tasks for loading marks") \
M(LoadedMarksCount, "Number of marks loaded (total across columns).") \
M(LoadedMarksMemoryBytes, "Size of in-memory representations of loaded marks.") \
\
M(Merge, "Number of launched background merges.") \
M(MergedRows, "Rows read for background merges. This is the number of rows before merge.") \

View File

@ -97,7 +97,7 @@ private:
* Note: "SM" in the commentaries below stands for STATE MODIFICATION
*/
RWLockImpl::LockHolder
RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & lock_timeout_ms)
RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & lock_timeout_ms, bool throw_in_fast_path)
{
const auto lock_deadline_tp =
(lock_timeout_ms == std::chrono::milliseconds(0))
@ -130,11 +130,19 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c
if (owner_query_it != owner_queries.end())
{
if (wrlock_owner != writers_queue.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): RWLock is already locked in exclusive mode");
{
if (throw_in_fast_path)
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): RWLock is already locked in exclusive mode");
return nullptr;
}
/// Lock upgrading is not supported
if (type == Write)
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked");
{
if (throw_in_fast_path)
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked");
return nullptr;
}
/// N.B. Type is Read here, query_id is not empty and it_query is a valid iterator
++owner_query_it->second; /// SM1: nothrow

View File

@ -56,7 +56,7 @@ public:
/// Empty query_id means the lock is acquired from outside of query context (e.g. in a background thread).
LockHolder getLock(Type type, const String & query_id,
const std::chrono::milliseconds & lock_timeout_ms = std::chrono::milliseconds(0));
const std::chrono::milliseconds & lock_timeout_ms = std::chrono::milliseconds(0), bool throw_in_fast_path = true);
/// Use as query_id to acquire a lock outside the query context.
inline static const String NO_QUERY = String();

View File

@ -10,35 +10,16 @@
namespace Poco { class Logger; }
/// Wrapper that stores the formatted message text into a String before sending the message to a logger
class LogToStrImpl
{
private:
    String & out_str;
    Poco::Logger * logger;
    /// Becomes false as soon as is() is asked about a priority that the underlying logger does not accept
    bool propagate_to_actual_log = true;

public:
    LogToStrImpl(String & out_str_, Poco::Logger * logger_)
        : out_str(out_str_)
        , logger(logger_)
    {
    }

    LogToStrImpl & operator -> () { return *this; }

    /// Always returns true, so the message text is captured regardless of priority;
    /// the answer of the real logger only decides whether the message is propagated further.
    bool is(Poco::Message::Priority priority)
    {
        propagate_to_actual_log &= logger->is(priority);
        return true;
    }

    LogToStrImpl * getChannel()
    {
        return this;
    }

    const String & name() const
    {
        return logger->name();
    }

    void log(const Poco::Message & message)
    {
        out_str = message.getText();

        if (propagate_to_actual_log)
        {
            if (auto * channel = logger->getChannel())
                channel->log(message);
        }
    }
};
/// Creates a heap-allocated LogToStrImpl from the arguments (x, y), owned by a std::unique_ptr.
#define LogToStr(x, y) std::make_unique<LogToStrImpl>(x, y)
/// Creates a heap-allocated LogFrequencyLimiterIml from the arguments (x, y), owned by a std::unique_ptr.
#define LogFrequencyLimiter(x, y) std::make_unique<LogFrequencyLimiterIml>(x, y)
namespace
{
    /// Overload set that returns its argument as something usable as a logger:
    /// a plain logger pointer, the pointer loaded from an atomic, or one of the wrapper objects passed through as is.
    /// NOTE(review): presumably used by the LOG_* macros to accept any of these argument kinds - confirm against LOG_IMPL.
    [[maybe_unused]] const ::Poco::Logger * getLogger(const ::Poco::Logger * logger)
    {
        return logger;
    }

    [[maybe_unused]] const ::Poco::Logger * getLogger(const std::atomic<::Poco::Logger *> & logger)
    {
        return logger.load();
    }

    [[maybe_unused]] std::unique_ptr<LogToStrImpl> getLogger(std::unique_ptr<LogToStrImpl> && logger)
    {
        return logger;
    }

    [[maybe_unused]] std::unique_ptr<LogFrequencyLimiterIml> getLogger(std::unique_ptr<LogFrequencyLimiterIml> && logger)
    {
        return logger;
    }
}
#define LOG_IMPL_FIRST_ARG(X, ...) X

View File

@ -4,6 +4,15 @@
#include <gtest/gtest.h>
// Asserts that find_first_not_symbols<symbols...> over the whole haystack
// returns the position `expected_pos` bytes from its beginning.
template <char ... symbols>
void test_find_first_not(const std::string & haystack, std::size_t expected_pos)
{
    const char * const first = haystack.data();
    const char * const last = first + haystack.size();
    const char * const expected = first + expected_pos;

    ASSERT_EQ(expected, find_first_not_symbols<symbols...>(first, last));
}
TEST(FindSymbols, SimpleTest)
{
std::string s = "Hello, world! Goodbye...";
@ -36,3 +45,58 @@ TEST(FindSymbols, SimpleTest)
ASSERT_EQ(vals, (std::vector<std::string>{"s", "String"}));
}
}
TEST(FindNotSymbols, AllSymbolsPresent)
{
    const std::string seventeen_bytes = "hello world hello";
    const std::string sixteen_bytes = seventeen_bytes.substr(0, seventeen_bytes.size() - 1);
    const std::string fifteen_bytes = sixteen_bytes.substr(0, sixteen_bytes.size() - 1);

    /*
     * The three lengths are meant to select different implementation strategies:
     *  - 15 bytes: loop method only, because there are not enough bytes for SSE 4.2
     *  - 16 bytes: SSE 4.2 only, since the string contains exactly 16 bytes
     *  - 17 bytes: SSE 4.2 for the first 16 bytes, then the loop method for the remaining byte
     *
     * Every character of each string is one of the needles, so every call must return the end of the input string.
     * This was not true prior to the fix mentioned in PR #47304.
     */
    for (const auto & input : {fifteen_bytes, sixteen_bytes, seventeen_bytes})
        test_find_first_not<'h', 'e', 'l', 'o', 'w', 'r', 'd', ' '>(input, input.size());
}
TEST(FindNotSymbols, NoSymbolsMatch)
{
    // The first character of the haystack is not one of the needles,
    // so the beginning of the string (offset 0) must be returned.
    const std::string haystack = "abcdefg";
    test_find_first_not<'h', 'i', 'j'>(haystack, 0u);
}
TEST(FindNotSymbols, ExtraSymbols)
{
    // '_' at offset 5 is the first character that is not among the needles.
    const std::string haystack = "hello_world_hello";
    test_find_first_not<'h', 'e', 'l', 'o', ' '>(haystack, 5u);
}
TEST(FindNotSymbols, EmptyString)
{
    // For an empty haystack the expected position is its size, i.e. begin == end.
    const std::string empty_haystack;
    test_find_first_not<'h', 'e', 'l', 'o', 'w', 'r', 'd', ' '>(empty_haystack, empty_haystack.size());
}
TEST(FindNotSymbols, SingleChar)
{
    // The only character is a needle, so the end of the string is expected.
    const std::string haystack = "a";
    test_find_first_not<'a'>(haystack, haystack.size());
}
TEST(FindNotSymbols, NullCharacter)
{
    // Special test to ensure that only the passed template arguments are used as needles:
    // the current find_first_symbols implementation takes in 16 characters and defaults to \0,
    // so the embedded '\0' at offset 7 must still be reported as "not a needle".
    const std::string haystack("abcdefg\0x", 9u);
    test_find_first_not<'a', 'b', 'c', 'd', 'e', 'f', 'g'>(haystack, 7u);
}

View File

@ -193,7 +193,8 @@ void registerCodecDelta(CompressionCodecFactory & factory)
UInt8 method_code = static_cast<UInt8>(CompressionMethodByte::Delta);
auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
UInt8 delta_bytes_size = 0;
/// Default bytes size is 1.
UInt8 delta_bytes_size = 1;
if (arguments && !arguments->children.empty())
{
@ -202,8 +203,8 @@ void registerCodecDelta(CompressionCodecFactory & factory)
const auto children = arguments->children;
const auto * literal = children[0]->as<ASTLiteral>();
if (!literal)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Delta codec argument must be integer");
if (!literal || literal->value.getType() != Field::Types::Which::UInt64)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Delta codec argument must be unsigned integer");
size_t user_bytes_size = literal->value.safeGet<UInt64>();
if (user_bytes_size != 1 && user_bytes_size != 2 && user_bytes_size != 4 && user_bytes_size != 8)

View File

@ -7,7 +7,7 @@
#include <Compression/CompressionFactory.h>
#include <base/unaligned.h>
#include <Parsers/IAST_fwd.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/BitHelpers.h>
@ -31,7 +31,7 @@ namespace DB
/** DoubleDelta column codec implementation.
*
* Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf, which was extended
* to support 64bit types. The drawback is 1 extra bit for 32-byte wide deltas: 5-bit prefix
* to support 64bit types. The drawback is 1 extra bit for 32-bit wide deltas: 5-bit prefix
* instead of 4-bit prefix.
*
* This codec is best used against monotonic integer sequences with constant (or almost constant)
@ -145,6 +145,8 @@ namespace ErrorCodes
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
}
namespace
@ -549,10 +551,28 @@ void registerCodecDoubleDelta(CompressionCodecFactory & factory)
factory.registerCompressionCodecWithType("DoubleDelta", method_code,
[&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
if (arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Codec DoubleDelta does not accept any arguments");
/// Default bytes size is 1.
UInt8 data_bytes_size = 1;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() > 1)
throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "DoubleDelta codec must have 1 parameter, given {}", arguments->children.size());
const auto children = arguments->children;
const auto * literal = children[0]->as<ASTLiteral>();
if (!literal || literal->value.getType() != Field::Types::Which::UInt64)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "DoubleDelta codec argument must be unsigned integer");
size_t user_bytes_size = literal->value.safeGet<UInt64>();
if (user_bytes_size != 1 && user_bytes_size != 2 && user_bytes_size != 4 && user_bytes_size != 8)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Argument value for DoubleDelta codec can be 1, 2, 4 or 8, given {}", user_bytes_size);
data_bytes_size = static_cast<UInt8>(user_bytes_size);
}
else if (column_type)
{
data_bytes_size = getDataBytesSize(column_type);
}
UInt8 data_bytes_size = column_type ? getDataBytesSize(column_type) : 0;
return std::make_shared<CompressionCodecDoubleDelta>(data_bytes_size);
});
}

View File

@ -109,28 +109,42 @@ void registerCodecFPC(CompressionCodecFactory & factory)
auto method_code = static_cast<UInt8>(CompressionMethodByte::FPC);
auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
UInt8 float_width = 0;
/// Set default float width to 4.
UInt8 float_width = 4;
if (column_type != nullptr)
float_width = getFloatBytesSize(*column_type);
UInt8 level = CompressionCodecFPC::DEFAULT_COMPRESSION_LEVEL;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() > 1)
if (arguments->children.size() > 2)
{
throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE,
"FPC codec must have 1 parameter, given {}", arguments->children.size());
"FPC codec must have from 0 to 2 parameters, given {}", arguments->children.size());
}
const auto * literal = arguments->children.front()->as<ASTLiteral>();
if (!literal)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "FPC codec argument must be integer");
if (!literal || literal->value.getType() != Field::Types::Which::UInt64)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "FPC codec argument must be unsigned integer");
level = literal->value.safeGet<UInt8>();
if (level < 1 || level > CompressionCodecFPC::MAX_COMPRESSION_LEVEL)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "FPC codec level must be between {} and {}",
1, static_cast<int>(CompressionCodecFPC::MAX_COMPRESSION_LEVEL));
if (arguments->children.size() == 2)
{
literal = arguments->children[1]->as<ASTLiteral>();
if (!literal || !isInt64OrUInt64FieldType(literal->value.getType()))
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "FPC codec argument must be unsigned integer");
size_t user_float_width = literal->value.safeGet<UInt64>();
if (user_float_width != 4 && user_float_width != 8)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Float size for FPC codec can be 4 or 8, given {}", user_float_width);
float_width = static_cast<UInt8>(user_float_width);
}
}
return std::make_shared<CompressionCodecFPC>(float_width, level);
};
factory.registerCompressionCodecWithType("FPC", method_code, codec_builder);

View File

@ -7,6 +7,7 @@
#include <Compression/CompressionFactory.h>
#include <base/unaligned.h>
#include <Parsers/IAST_fwd.h>
#include <Parsers/ASTLiteral.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/BitHelpers.h>
@ -134,6 +135,8 @@ namespace ErrorCodes
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
}
namespace
@ -445,10 +448,28 @@ void registerCodecGorilla(CompressionCodecFactory & factory)
UInt8 method_code = static_cast<UInt8>(CompressionMethodByte::Gorilla);
auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
if (arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Codec Gorilla does not accept any arguments");
/// Default bytes size is 1
UInt8 data_bytes_size = 1;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() > 1)
throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "Gorilla codec must have 1 parameter, given {}", arguments->children.size());
const auto children = arguments->children;
const auto * literal = children[0]->as<ASTLiteral>();
if (!literal || literal->value.getType() != Field::Types::Which::UInt64)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Gorilla codec argument must be unsigned integer");
size_t user_bytes_size = literal->value.safeGet<UInt64>();
if (user_bytes_size != 1 && user_bytes_size != 2 && user_bytes_size != 4 && user_bytes_size != 8)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Argument value for Gorilla codec can be 1, 2, 4 or 8, given {}", user_bytes_size);
data_bytes_size = static_cast<UInt8>(user_bytes_size);
}
else if (column_type)
{
data_bytes_size = getDataBytesSize(column_type);
}
UInt8 data_bytes_size = column_type ? getDataBytesSize(column_type) : 0;
return std::make_shared<CompressionCodecGorilla>(data_bytes_size);
};
factory.registerCompressionCodecWithType("Gorilla", method_code, codec_builder);

View File

@ -33,7 +33,8 @@ public:
Bit
};
CompressionCodecT64(TypeIndex type_idx_, Variant variant_);
// type_idx_ is required for compression, but not for decompression.
CompressionCodecT64(std::optional<TypeIndex> type_idx_, Variant variant_);
uint8_t getMethodByte() const override;
@ -53,7 +54,7 @@ protected:
bool isGenericCompression() const override { return false; }
private:
TypeIndex type_idx;
std::optional<TypeIndex> type_idx;
Variant variant;
};
@ -91,9 +92,12 @@ enum class MagicNumber : uint8_t
IPv4 = 21,
};
MagicNumber serializeTypeId(TypeIndex type_id)
MagicNumber serializeTypeId(std::optional<TypeIndex> type_id)
{
switch (type_id)
if (!type_id)
throw Exception(ErrorCodes::CANNOT_COMPRESS, "T64 codec doesn't support compression without information about column type");
switch (*type_id)
{
case TypeIndex::UInt8: return MagicNumber::UInt8;
case TypeIndex::UInt16: return MagicNumber::UInt16;
@ -115,7 +119,7 @@ MagicNumber serializeTypeId(TypeIndex type_id)
break;
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Type is not supported by T64 codec: {}", static_cast<UInt32>(type_id));
throw Exception(ErrorCodes::LOGICAL_ERROR, "Type is not supported by T64 codec: {}", static_cast<UInt32>(*type_id));
}
TypeIndex deserializeTypeId(uint8_t serialized_type_id)
@ -632,7 +636,7 @@ UInt32 CompressionCodecT64::doCompressData(const char * src, UInt32 src_size, ch
memcpy(dst, &cookie, 1);
dst += 1;
switch (baseType(type_idx))
switch (baseType(*type_idx))
{
case TypeIndex::Int8:
return 1 + compressData<Int8>(src, src_size, dst, variant);
@ -699,7 +703,7 @@ uint8_t CompressionCodecT64::getMethodByte() const
return codecId();
}
CompressionCodecT64::CompressionCodecT64(TypeIndex type_idx_, Variant variant_)
CompressionCodecT64::CompressionCodecT64(std::optional<TypeIndex> type_idx_, Variant variant_)
: type_idx(type_idx_)
, variant(variant_)
{
@ -712,7 +716,7 @@ CompressionCodecT64::CompressionCodecT64(TypeIndex type_idx_, Variant variant_)
void CompressionCodecT64::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
hash.update(type_idx);
hash.update(type_idx.value_or(TypeIndex::Nothing));
hash.update(variant);
}
@ -742,9 +746,14 @@ void registerCodecT64(CompressionCodecFactory & factory)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Wrong modification for T64: {}", name);
}
auto type_idx = typeIdx(type);
if (type && type_idx == TypeIndex::Nothing)
throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "T64 codec is not supported for specified type {}", type->getName());
std::optional<TypeIndex> type_idx;
if (type)
{
type_idx = typeIdx(type);
if (type_idx == TypeIndex::Nothing)
throw Exception(
ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "T64 codec is not supported for specified type {}", type->getName());
}
return std::make_shared<CompressionCodecT64>(type_idx, variant);
};

View File

@ -35,7 +35,6 @@
#define DBMS_MERGE_TREE_PART_INFO_VERSION 1
/// Minimum revision supporting interserver secret.
#define DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET 54441
#define DBMS_MIN_REVISION_WITH_X_FORWARDED_FOR_IN_CLIENT_INFO 54443
@ -54,7 +53,7 @@
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
#define DBMS_TCP_PROTOCOL_VERSION 54461
#define DBMS_TCP_PROTOCOL_VERSION 54462
#define DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME 54449
@ -72,3 +71,5 @@
#define DBMS_MIN_PROTOCOL_VERSION_WITH_SERVER_QUERY_TIME_IN_PROGRESS 54460
#define DBMS_MIN_PROTOCOL_VERSION_WITH_PASSWORD_COMPLEXITY_RULES 54461
#define DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2 54462

View File

@ -50,14 +50,13 @@ class IColumn;
M(UInt64, max_download_buffer_size, 10*1024*1024, "The maximal size of buffer for parallel downloading (e.g. for URL engine) per each thread.", 0) \
M(UInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.", 0) \
M(UInt64, max_distributed_connections, 1024, "The maximum number of connections for distributed processing of one query (should be greater than max_threads).", 0) \
M(UInt64, max_query_size, DBMS_DEFAULT_MAX_QUERY_SIZE, "Which part of the query can be read into RAM for parsing (the remaining data for INSERT, if any, is read later)", 0) \
M(UInt64, max_query_size, DBMS_DEFAULT_MAX_QUERY_SIZE, "The maximum number of bytes of a query string parsed by the SQL parser. Data in the VALUES clause of INSERT queries is processed by a separate stream parser (that consumes O(1) RAM) and not affected by this restriction.", 0) \
M(UInt64, interactive_delay, 100000, "The interval in microseconds to check if the request is cancelled, and to send progress info.", 0) \
M(Seconds, connect_timeout, DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, "Connection timeout if there are no replicas.", 0) \
M(Milliseconds, connect_timeout_with_failover_ms, 50, "Connection timeout for selecting first healthy replica.", 0) \
M(Milliseconds, connect_timeout_with_failover_secure_ms, 100, "Connection timeout for selecting first healthy replica (for secure connections).", 0) \
M(Seconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "Timeout for receiving data from network, in seconds. If no bytes were received in this interval, exception is thrown. If you set this setting on client, the 'send_timeout' for the socket will be also set on the corresponding connection end on the server.", 0) \
M(Seconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "Timeout for sending data to network, in seconds. If client needs to sent some data, but it did not able to send any bytes in this interval, exception is thrown. If you set this setting on client, the 'receive_timeout' for the socket will be also set on the corresponding connection end on the server.", 0) \
M(Seconds, drain_timeout, 3, "Timeout for draining remote connections, -1 means synchronous drain without ignoring errors", 0) \
M(Seconds, tcp_keep_alive_timeout, 290 /* less than DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC */, "The time in seconds the connection needs to remain idle before TCP starts sending keepalive probes", 0) \
M(Milliseconds, hedged_connection_timeout_ms, 100, "Connection timeout for establishing connection with replica for Hedged requests", 0) \
M(Milliseconds, receive_data_timeout_ms, 2000, "Connection timeout for receiving first packet of data or packet with positive progress from replica", 0) \
@ -254,6 +253,8 @@ class IColumn;
M(Bool, send_progress_in_http_headers, false, "Send progress notifications using X-ClickHouse-Progress headers. Some clients do not support high amount of HTTP headers (Python requests in particular), so it is disabled by default.", 0) \
\
M(UInt64, http_headers_progress_interval_ms, 100, "Do not send HTTP headers X-ClickHouse-Progress more frequently than at each specified interval.", 0) \
M(Bool, http_wait_end_of_query, false, "Enable HTTP response buffering on the server-side.", 0) \
M(UInt64, http_response_buffer_size, 0, "The number of bytes to buffer in the server memory before sending a HTTP response to the client or flushing to disk (when http_wait_end_of_query is enabled).", 0) \
\
M(Bool, fsync_metadata, true, "Do fsync after changing metadata for tables and databases (.sql files). Could be disabled in case of poor latency on server with high load of DDL queries and high load of disk subsystem.", 0) \
\
@ -284,8 +285,6 @@ class IColumn;
M(Milliseconds, sleep_in_send_data_ms, 0, "Time to sleep in sending data in TCPHandler", 0) \
M(Milliseconds, sleep_after_receiving_query_ms, 0, "Time to sleep after receiving query in TCPHandler", 0) \
M(UInt64, unknown_packet_in_send_data, 0, "Send unknown packet instead of data Nth data packet", 0) \
/** Settings for testing connection collector */ \
M(Milliseconds, sleep_in_receive_cancel_ms, 0, "Time to sleep in receiving cancel in TCPHandler", 0) \
\
M(Bool, insert_allow_materialized_columns, false, "If setting is enabled, Allow materialized columns in INSERT.", 0) \
M(Seconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.", 0) \
@ -719,6 +718,7 @@ class IColumn;
M(Float, insert_keeper_fault_injection_probability, 0.0f, "Approximate probability of failure for a keeper request during insert. Valid value is in interval [0.0f, 1.0f]", 0) \
M(UInt64, insert_keeper_fault_injection_seed, 0, "0 - random seed, otherwise the setting value", 0) \
M(Bool, force_aggregation_in_order, false, "Force use of aggregation in order on remote nodes during distributed aggregation. PLEASE, NEVER CHANGE THIS SETTING VALUE MANUALLY!", IMPORTANT) \
M(UInt64, http_max_request_param_data_size, 10_MiB, "Limit on size of request data used as a query parameter in predefined HTTP requests.", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS and move obsolete settings to OBSOLETE_SETTINGS.
@ -759,7 +759,7 @@ class IColumn;
MAKE_OBSOLETE(M, Seconds, temporary_live_view_timeout, 1) \
MAKE_OBSOLETE(M, Milliseconds, async_insert_cleanup_timeout_ms, 1000) \
MAKE_OBSOLETE(M, Bool, optimize_fuse_sum_count_avg, 0) \
MAKE_OBSOLETE(M, Seconds, drain_timeout, 3) \
/** The section above is for obsolete settings. Do not add anything there. */
@ -803,6 +803,7 @@ class IColumn;
M(Bool, input_format_tsv_detect_header, true, "Automatically detect header with names and types in TSV format", 0) \
M(Bool, input_format_custom_detect_header, true, "Automatically detect header with names and types in CustomSeparated format", 0) \
M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \
M(UInt64, input_format_parquet_max_block_size, 8192, "Max block size for parquet reader.", 0) \
M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \
M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \
M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format ORC", 0) \

View File

@ -68,6 +68,15 @@ public:
return disk.writeFile(path, buf_size, mode, settings);
}
/// Forwards the call unchanged to the `disk` member's writeFileUsingCustomWriteObject();
/// the callback is moved into that call.
void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function) override
{
disk.writeFileUsingCustomWriteObject(path, mode, std::move(custom_write_object_function));
}
void removeFile(const std::string & path) override
{
disk.removeFile(path);

View File

@ -38,6 +38,15 @@ void IDisk::copyFile(const String & from_file_path, IDisk & to_disk, const Strin
out->finalize();
}
/// Base-class implementation: ignores all arguments and always throws NOT_IMPLEMENTED,
/// naming the type reported by getDataSourceDescription() in the message.
void IDisk::writeFileUsingCustomWriteObject(
const String &, WriteMode, std::function<size_t(const StoredObject &, WriteMode, const std::optional<ObjectAttributes> &)>)
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method `writeFileUsingCustomWriteObject()` is not implemented for disk: {}",
getDataSourceDescription().type);
}
DiskTransactionPtr IDisk::createTransaction()
{

View File

@ -209,6 +209,15 @@ public:
WriteMode mode = WriteMode::Rewrite,
const WriteSettings & settings = {}) = 0;
/// Write a file using a custom function to write an object to the disk's object storage.
/// This method is an alternative to writeFile(): writeFile() calls IObjectStorage::writeObject()
/// to write an object to the object storage, while this method lets the caller specify a callback for that.
/// The callback receives the object to write, a write mode and optional object attributes, and returns a size_t
/// (NOTE(review): presumably the number of bytes written - confirm against the implementations).
virtual void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function);
/// Remove file. Throws exception if file doesn't exists or it's a directory.
/// Return whether file was finally removed. (For remote disks it is not always removed).
virtual void removeFile(const String & path) = 0;

View File

@ -68,6 +68,13 @@ public:
const WriteSettings & settings = {},
bool autocommit = true) = 0;
/// Write a file using a custom function to write an object to the disk's object storage.
/// The callback receives the object to write, a write mode and optional object attributes, and returns a size_t
/// (NOTE(review): presumably the size of the written object - confirm against the implementations).
virtual void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function) = 0;
/// Remove file. Throws exception if file doesn't exists or it's a directory.
virtual void removeFile(const std::string & path) = 0;

View File

@ -577,6 +577,17 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
return result;
}
void DiskObjectStorage::writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function)
{
LOG_TEST(log, "Write file: {}", path);
auto transaction = createObjectStorageTransaction();
return transaction->writeFileUsingCustomWriteObject(path, mode, std::move(custom_write_object_function));
}
void DiskObjectStorage::applyNewSettings(
const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &)
{

View File

@ -152,6 +152,12 @@ public:
WriteMode mode,
const WriteSettings & settings) override;
/// Write a file using a caller-supplied callback to write the object.
/// The callback receives the object to write, a write mode and optional object attributes, and returns a size_t.
void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function) override;
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &) override;

View File

@ -670,6 +670,44 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
}
/// Writes `path` using a caller-supplied function that uploads the object, then records the
/// resulting blob in the metadata and commits the metadata transaction.
/// `custom_write_object_function` receives the destination object, the write mode and optional
/// object attributes, and returns the size of the written object in bytes.
void DiskObjectStorageTransaction::writeFileUsingCustomWriteObject(
    const String & path,
    WriteMode mode,
    std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
        custom_write_object_function)
{
    /// This function is a simplified and adapted version of DiskObjectStorageTransaction::writeFile().

    std::optional<ObjectAttributes> attributes;
    auto blob_name = object_storage.generateBlobNameForPath(path);

    if (metadata_helper)
    {
        /// The blob gets the next revision number; the counter is advanced separately, exactly as in writeFile().
        auto revision = metadata_helper->revision_counter + 1;
        metadata_helper->revision_counter++;

        attributes = {
            {"path", path}
        };
        blob_name = "r" + revisionToString(revision) + "-file-" + blob_name;
    }

    auto object = StoredObject::create(object_storage, fs::path(metadata_storage.getObjectStorageRootPath()) / blob_name);

    /// Register the write operation before the upload starts.
    operations_to_execute.emplace_back(
        std::make_unique<WriteFileObjectStorageOperation>(object_storage, metadata_storage, object));

    /// We always use mode Rewrite because we simulate append using metadata and different files
    const size_t written_bytes = std::move(custom_write_object_function)(object, WriteMode::Rewrite, attributes);

    /// Create metadata (see create_metadata_callback in DiskObjectStorageTransaction::writeFile()).
    if (mode != WriteMode::Rewrite)
        metadata_transaction->addBlobToMetadata(path, blob_name, written_bytes);
    else
        metadata_transaction->createMetadataFile(path, blob_name, written_bytes);

    metadata_transaction->commit();
}
void DiskObjectStorageTransaction::createHardLink(const std::string & src_path, const std::string & dst_path)
{
operations_to_execute.emplace_back(

View File

@ -99,6 +99,13 @@ public:
const WriteSettings & settings = {},
bool autocommit = true) override;
/// Write a file using a custom function to write an object to the disk's object storage.
void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function) override;
void removeFile(const std::string & path) override;
void removeFileIfExists(const std::string & path) override;
void removeDirectory(const std::string & path) override;

View File

@ -1,7 +1,4 @@
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
#include <Common/ProfileEvents.h>
#include <Interpreters/Context.h>
#if USE_AWS_S3
@ -18,10 +15,12 @@
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/S3/copyS3File.h>
#include <Interpreters/Context.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Disks/ObjectStorages/S3/diskSettings.h>
#include <Common/getRandomASCIIString.h>
#include <Common/ProfileEvents.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/logger_useful.h>
#include <Common/MultiVersion.h>

View File

@ -0,0 +1,69 @@
#include <Disks/ObjectStorages/S3/copyS3FileToDisk.h>
#if USE_AWS_S3
#include <IO/S3/getObjectInfo.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/copyData.h>
#include <IO/S3/copyS3File.h>
namespace DB
{
/// Copies (a part of) an S3 object to `destination_path` on `destination_disk`.
///
/// - `version_id`: version of the source object; the current version is used if not set.
/// - `src_offset`: first byte to copy; defaults to 0.
/// - `src_size`: number of bytes to copy; defaults to everything from `src_offset` to the end of the object.
///
/// If the destination disk is not an S3 disk with the same endpoint as `s3_client`, the data is
/// downloaded and written through buffers. Otherwise a native S3 copy is performed via
/// `writeFileUsingCustomWriteObject()`.
void copyS3FileToDisk(
    const std::shared_ptr<const S3::Client> & s3_client,
    const String & src_bucket,
    const String & src_key,
    const std::optional<String> & version_id,
    std::optional<size_t> src_offset,
    std::optional<size_t> src_size,
    DiskPtr destination_disk,
    const String & destination_path,
    WriteMode write_mode,
    const ReadSettings & read_settings,
    const WriteSettings & write_settings,
    const S3Settings::RequestSettings & request_settings,
    ThreadPoolCallbackRunner<void> scheduler)
{
    if (!src_offset)
        src_offset = 0;

    /// NOTE(review): if `src_offset` is greater than the object size this subtraction wraps around — confirm callers never do that.
    if (!src_size)
        src_size = S3::getObjectSize(*s3_client, src_bucket, src_key, version_id.value_or(""), request_settings) - *src_offset;

    auto destination_data_source_description = destination_disk->getDataSourceDescription();
    if (destination_data_source_description != DataSourceDescription{DataSourceType::S3, s3_client->getInitialEndpoint(), false, false})
    {
        LOG_TRACE(&Poco::Logger::get("copyS3FileToDisk"), "Copying {} to disk {} through buffers", src_key, destination_disk->getName());

        /// Read the same object version the size was calculated for.
        ReadBufferFromS3 read_buffer{s3_client, src_bucket, src_key, version_id.value_or(""), request_settings, read_settings};
        if (*src_offset)
            read_buffer.seek(*src_offset, SEEK_SET);

        auto write_buffer = destination_disk->writeFile(destination_path, std::min<size_t>(*src_size, DBMS_DEFAULT_BUFFER_SIZE), write_mode, write_settings);
        copyData(read_buffer, *write_buffer, *src_size);
        write_buffer->finalize();
        return;
    }

    LOG_TRACE(&Poco::Logger::get("copyS3FileToDisk"), "Copying {} to disk {} using native copy", src_key, destination_disk->getName());

    String dest_bucket = destination_disk->getObjectStorage()->getObjectsNamespace();

    /// NOTE(review): copyS3File() is called without a version id here, so the native copy
    /// presumably takes the current version of the source object — confirm this is intended.
    auto custom_write_object = [&](const StoredObject & object_, WriteMode write_mode_, const std::optional<ObjectAttributes> & object_attributes_) -> size_t
    {
        /// Object storage always uses mode `Rewrite` because it simulates append using metadata and different files.
        chassert(write_mode_ == WriteMode::Rewrite);

        copyS3File(s3_client, src_bucket, src_key, *src_offset, *src_size, dest_bucket, /* dest_key= */ object_.absolute_path,
                   request_settings, object_attributes_, scheduler, /* for_disk_s3= */ true);

        return *src_size;
    };

    destination_disk->writeFileUsingCustomWriteObject(destination_path, write_mode, custom_write_object);
}
}
#endif

View File

@ -0,0 +1,36 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <Disks/IDisk.h>
#include <Storages/StorageS3Settings.h>
#include <Interpreters/threadPoolCallbackRunner.h>
namespace DB
{
/// Copies an object from S3 bucket to a disk of any type.
/// Depending on the disk the function can either do copying through buffers
/// (i.e. download the object by portions and then write those portions to the specified disk),
/// or perform a server-side copy.
void copyS3FileToDisk(
const std::shared_ptr<const S3::Client> & s3_client,
const String & src_bucket,
const String & src_key,
const std::optional<String> & version_id,
std::optional<size_t> src_offset,
std::optional<size_t> src_size,
DiskPtr destination_disk,
const String & destination_path,
WriteMode write_mode = WriteMode::Rewrite,
const ReadSettings & read_settings = {},
const WriteSettings & write_settings = {},
const S3Settings::RequestSettings & request_settings = {},
ThreadPoolCallbackRunner<void> scheduler = {});
}
#endif

View File

@ -117,6 +117,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference;
format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array;
format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
format_settings.pretty.color = settings.output_format_pretty_color;

View File

@ -211,6 +211,7 @@ struct FormatSettings
std::unordered_set<int> skip_row_groups = {};
bool output_string_as_string = false;
bool output_fixed_string_as_fixed_byte_array = true;
UInt64 max_block_size = 8192;
ParquetVersion output_version;
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
} parquet;

View File

@ -0,0 +1,103 @@
#include <Formats/MarkInCompressedFile.h>
#include <Common/BitHelpers.h>
namespace DB
{
// Stores `value` into a bit-packed array starting at bit position `bit_offset`.
// Requirements on the caller:
//  * the array is overallocated by one element;
//  * the destination bit range is pre-filled with zeros (bits are OR-ed in, not overwritten).
void writeBits(UInt64 * dest, size_t bit_offset, UInt64 value)
{
    const size_t word = bit_offset / 64;
    const size_t shift = bit_offset % 64;

    dest[word] |= value << shift;

    // The high part spills into the next word only for unaligned offsets
    // (a shift by 64 - 0 = 64 bits is skipped this way).
    if (shift != 0)
        dest[word + 1] |= value >> (64 - shift);
}
// Extracts `num_bits` bits starting at bit position `bit_offset` from a bit-packed array.
// The array must be overallocated by one element: for unaligned offsets the next word is
// always read, even if the requested bits do not reach it.
UInt64 readBits(const UInt64 * src, size_t bit_offset, size_t num_bits)
{
    const size_t word = bit_offset / 64;
    const size_t shift = bit_offset % 64;

    UInt64 bits = src[word] >> shift;
    if (shift != 0)
        bits |= src[word + 1] << (64 - shift);

    return bits & maskLowBits<UInt64>(num_bits);
}
// Builds the bit-packed representation from a plain array of marks.
// Marks are grouped into blocks of MARKS_PER_BLOCK; for every block the minimum x/y and the
// number of bits needed for the differences are stored, and the differences are packed into `packed`.
MarksInCompressedFile::MarksInCompressedFile(const PlainArray & marks)
    : num_marks(marks.size()), blocks((marks.size() + MARKS_PER_BLOCK - 1) / MARKS_PER_BLOCK, BlockInfo{})
{
    if (num_marks == 0)
        return;

    // Pass 1: compute the layout of every block and the total number of packed bits.
    size_t total_bits = 0;
    for (size_t block_no = 0; block_no < blocks.size(); ++block_no)
    {
        BlockInfo & info = blocks[block_no];
        info.bit_offset_in_packed_array = total_bits;

        const size_t first_mark = block_no * MARKS_PER_BLOCK;
        // The last block may be shorter than MARKS_PER_BLOCK.
        const size_t marks_here = std::min(MARKS_PER_BLOCK, num_marks - first_mark);

        size_t x_max = 0;
        size_t y_max = 0;
        for (size_t k = 0; k < marks_here; ++k)
        {
            const auto & mark = marks[first_mark + k];
            const auto y_zero_bits = static_cast<UInt8>(getTrailingZeroBits(mark.offset_in_decompressed_block));

            info.min_x = std::min(info.min_x, mark.offset_in_compressed_file);
            x_max = std::max(x_max, mark.offset_in_compressed_file);

            info.min_y = std::min(info.min_y, mark.offset_in_decompressed_block);
            y_max = std::max(y_max, mark.offset_in_decompressed_block);

            info.trailing_zero_bits_in_y = std::min(info.trailing_zero_bits_in_y, y_zero_bits);
        }

        // Bit width = position of the highest set bit of the largest difference within the block.
        info.bits_for_x = sizeof(size_t) * 8 - getLeadingZeroBits(x_max - info.min_x);
        info.bits_for_y = sizeof(size_t) * 8 - getLeadingZeroBits((y_max - info.min_y) >> info.trailing_zero_bits_in_y);

        total_bits += marks_here * (info.bits_for_x + info.bits_for_y);
    }

    // Overallocate by +1 element to let the bit packing/unpacking do less bounds checking.
    const size_t packed_words = (total_bits + 63) / 64 + 1;
    packed.reserve_exact(packed_words);
    packed.resize_fill(packed_words);

    // Pass 2: write the packed differences (x first, then y) for every mark.
    for (size_t mark_no = 0; mark_no < num_marks; ++mark_no)
    {
        const auto & mark = marks[mark_no];
        auto [info, bit_pos] = lookUpMark(mark_no);

        writeBits(packed.data(), bit_pos, mark.offset_in_compressed_file - info->min_x);
        writeBits(
            packed.data(),
            bit_pos + info->bits_for_x,
            (mark.offset_in_decompressed_block - info->min_y) >> info->trailing_zero_bits_in_y);
    }
}
// Decodes the mark with index `idx`: reads the packed differences and adds the block minimums back.
// No bounds check is performed on `idx`.
MarkInCompressedFile MarksInCompressedFile::get(size_t idx) const
{
    auto [info, bit_pos] = lookUpMark(idx);

    const UInt64 packed_x = readBits(packed.data(), bit_pos, info->bits_for_x);
    const UInt64 packed_y = readBits(packed.data(), bit_pos + info->bits_for_x, info->bits_for_y);

    size_t compressed_offset = info->min_x + packed_x;
    // y was stored without its common trailing zero bits; shift them back in.
    size_t decompressed_offset = info->min_y + (packed_y << info->trailing_zero_bits_in_y);

    return MarkInCompressedFile{.offset_in_compressed_file = compressed_offset, .offset_in_decompressed_block = decompressed_offset};
}
std::tuple<const MarksInCompressedFile::BlockInfo *, size_t> MarksInCompressedFile::lookUpMark(size_t idx) const
{
size_t block_idx = idx / MARKS_PER_BLOCK;
const BlockInfo & block = blocks[block_idx];
size_t offset = block.bit_offset_in_packed_array + (idx - block_idx * MARKS_PER_BLOCK) * (block.bits_for_x + block.bits_for_y);
return {&block, offset};
}
size_t MarksInCompressedFile::approximateMemoryUsage() const
{
return sizeof(*this) + blocks.size() * sizeof(blocks[0]) + packed.size() * sizeof(packed[0]);
}
}

View File

@ -2,8 +2,8 @@
#include <tuple>
#include <base/types.h>
#include <IO/WriteHelpers.h>
#include <base/types.h>
#include <Common/PODArray.h>
@ -23,15 +23,9 @@ struct MarkInCompressedFile
return std::tie(offset_in_compressed_file, offset_in_decompressed_block)
== std::tie(rhs.offset_in_compressed_file, rhs.offset_in_decompressed_block);
}
bool operator!=(const MarkInCompressedFile & rhs) const
{
return !(*this == rhs);
}
bool operator!=(const MarkInCompressedFile & rhs) const { return !(*this == rhs); }
auto asTuple() const
{
return std::make_tuple(offset_in_compressed_file, offset_in_decompressed_block);
}
auto asTuple() const { return std::make_tuple(offset_in_compressed_file, offset_in_decompressed_block); }
String toString() const
{
@ -40,20 +34,87 @@ struct MarkInCompressedFile
String toStringWithRows(size_t rows_num) const
{
return "(" + DB::toString(offset_in_compressed_file) + "," + DB::toString(offset_in_decompressed_block) + "," + DB::toString(rows_num) + ")";
return "(" + DB::toString(offset_in_compressed_file) + "," + DB::toString(offset_in_decompressed_block) + ","
+ DB::toString(rows_num) + ")";
}
};
class MarksInCompressedFile : public PODArray<MarkInCompressedFile>
/**
* In-memory representation of an array of marks.
*
* Uses an ad-hoc compression scheme that decreases memory usage while allowing
* random access in O(1) time.
* This is independent from the marks *file* format, which may be uncompressed
* or use a different compression method.
*
* Typical memory usage:
* * ~3 bytes/mark for integer columns
* * ~5 bytes/mark for string columns
* * ~0.3 bytes/mark for trivial marks in auxiliary dict files of LowCardinality columns
*/
class MarksInCompressedFile
{
public:
explicit MarksInCompressedFile(size_t n) : PODArray(n) {}
using PlainArray = PODArray<MarkInCompressedFile>;
void read(ReadBuffer & buffer, size_t from, size_t count)
MarksInCompressedFile(const PlainArray & marks);
MarkInCompressedFile get(size_t idx) const;
size_t approximateMemoryUsage() const;
private:
/** Throughout this class:
* * "x" stands for offset_in_compressed_file,
* * "y" stands for offset_in_decompressed_block.
*/
/** We need to store a sequence of marks, each consisting of two 64-bit integers:
* offset_in_compressed_file and offset_in_decompressed_block. We'll call them x and y for
* convenience, since compression doesn't care what they mean. The compression exploits the
* following regularities:
* * y is usually zero.
* * x usually increases steadily.
* * Differences between x values in nearby marks usually fit in much fewer than 64 bits.
*
* We split the sequence of marks into blocks, each containing MARKS_PER_BLOCK marks.
* (Not to be confused with data blocks.)
* For each mark, we store the difference [value] - [min value in the block], for each of the
* two values in the mark. Each block specifies the number of bits to use for these differences
* for all marks in this block.
* The smaller the blocks the fewer bits are required, but the bigger the relative overhead of
* block headers.
*
* Packed marks and block headers all live in one contiguous array.
*/
struct BlockInfo
{
buffer.readStrict(reinterpret_cast<char *>(data() + from), count * sizeof(MarkInCompressedFile));
}
// Min offset_in_compressed_file and offset_in_decompressed_block, correspondingly.
size_t min_x = UINT64_MAX;
size_t min_y = UINT64_MAX;
// Place in `packed` where this block starts.
size_t bit_offset_in_packed_array;
// How many bits each mark takes. These numbers are bit-packed in the `packed` array.
// Can be zero. (Especially for y, which is typically all zeroes.)
UInt8 bits_for_x;
UInt8 bits_for_y;
// The `y` values should be <<'ed by this amount.
// Useful for integer columns when marks granularity is a power of 2; in this case all
// offset_in_decompressed_block values are divisible by 2^15 or so.
UInt8 trailing_zero_bits_in_y = 63;
};
static constexpr size_t MARKS_PER_BLOCK = 256;
size_t num_marks;
PODArray<BlockInfo> blocks;
PODArray<UInt64> packed;
// Mark idx -> {block info, bit offset in `packed`}.
std::tuple<const BlockInfo *, size_t> lookUpMark(size_t idx) const;
};
}

View File

@ -0,0 +1,52 @@
#include <random>
#include <gtest/gtest.h>
#include <Formats/MarkInCompressedFile.h>
using namespace DB;
/// Checks that MarksInCompressedFile returns exactly the marks it was built from
/// and that its memory usage stays within the expected number of bits per mark.
TEST(Marks, Compression)
{
    std::random_device dev;
    std::mt19937 rng(dev());

    /// Generates `count` marks whose x and y grow by a random increment in [0, max_*_increment].
    /// NOTE(review): std::mt19937 produces 32-bit numbers, so a single increment never exceeds 2^32 - 1
    /// even when a larger bound is requested — confirm whether std::mt19937_64 was intended.
    auto gen = [&](size_t count, size_t max_x_increment, size_t max_y_increment)
    {
        size_t x = 0, y = 0;
        PODArray<MarkInCompressedFile> plain(count);
        /// The index has the same type as `count`, so the loop condition does not compare signed with unsigned.
        for (size_t i = 0; i < count; ++i)
        {
            x += rng() % (max_x_increment + 1);
            y += rng() % (max_y_increment + 1);
            plain[i] = MarkInCompressedFile{.offset_in_compressed_file = x, .offset_in_decompressed_block = y};
        }
        return plain;
    };

    /// Round-trips `plain` through the compressed representation and checks the memory bound.
    auto test = [](const PODArray<MarkInCompressedFile> & plain, size_t max_bits_per_mark)
    {
        PODArray<MarkInCompressedFile> copy;
        copy.assign(plain); // paranoid in case next line mutates it

        MarksInCompressedFile marks(copy);
        for (size_t i = 0; i < plain.size(); ++i)
            ASSERT_EQ(marks.get(i), plain[i]);

        EXPECT_LE((marks.approximateMemoryUsage() - sizeof(MarksInCompressedFile)) * 8, plain.size() * max_bits_per_mark);
    };

    // Typical.
    test(gen(10000, 1'000'000, 0), 30);

    // Completely random 64-bit values.
    test(gen(10000, UINT64_MAX - 1, UINT64_MAX - 1), 130);

    // All zeros.
    test(gen(10000, 0, 0), 2);

    // Short.
    test(gen(10, 1000, 1000), 65);

    // Empty.
    test(gen(0, 0, 0), 0);
}

View File

@ -53,6 +53,10 @@ struct ToDateImpl
{
static constexpr auto name = "toDate";
/// DateTime64 overload: converts the whole part of the decimal components to a day number
/// in `time_zone` and casts it to UInt16. The fractional part is not used.
static inline UInt16 execute(const DecimalUtils::DecimalComponents<DateTime64> & t, const DateLUTImpl & time_zone)
{
return static_cast<UInt16>(time_zone.toDayNum(t.whole));
}
static inline UInt16 execute(Int64 t, const DateLUTImpl & time_zone)
{
return UInt16(time_zone.toDayNum(t));
@ -69,6 +73,10 @@ struct ToDateImpl
{
return d;
}
/// DateTime64 overload of the extended-result variant: returns decimal components whose first
/// member is the day number of `t.whole` in `time_zone` and whose second member is 0.
/// The fractional part of the input is not used.
static inline DecimalUtils::DecimalComponents<DateTime64> executeExtendedResult(const DecimalUtils::DecimalComponents<DateTime64> & t, const DateLUTImpl & time_zone)
{
return {time_zone.toDayNum(t.whole), 0};
}
using FactorTransform = ZeroTransform;
};

View File

@ -285,9 +285,9 @@ struct NgramDistanceImpl
size_t first_size = dispatchSearcher(calculateHaystackStatsAndMetric<false>, data.data(), data_size, common_stats.get(), distance, nullptr);
/// For !symmetric version we should not use first_size.
if constexpr (symmetric)
res = distance * 1.f / std::max(first_size + second_size, static_cast<size_t>(1));
res = distance * 1.f / std::max(first_size + second_size, 1uz);
else
res = 1.f - distance * 1.f / std::max(second_size, static_cast<size_t>(1));
res = 1.f - distance * 1.f / std::max(second_size, 1uz);
}
else
{
@ -353,9 +353,9 @@ struct NgramDistanceImpl
/// For !symmetric version we should not use haystack_stats_size.
if constexpr (symmetric)
res[i] = distance * 1.f / std::max(haystack_stats_size + needle_stats_size, static_cast<size_t>(1));
res[i] = distance * 1.f / std::max(haystack_stats_size + needle_stats_size, 1uz);
else
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, static_cast<size_t>(1));
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, 1uz);
}
else
{
@ -424,7 +424,7 @@ struct NgramDistanceImpl
for (size_t j = 0; j < needle_stats_size; ++j)
--common_stats[needle_ngram_storage[j]];
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, static_cast<size_t>(1));
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, 1uz);
}
else
{
@ -471,9 +471,9 @@ struct NgramDistanceImpl
ngram_storage.get());
/// For !symmetric version we should not use haystack_stats_size.
if constexpr (symmetric)
res[i] = distance * 1.f / std::max(haystack_stats_size + needle_stats_size, static_cast<size_t>(1));
res[i] = distance * 1.f / std::max(haystack_stats_size + needle_stats_size, 1uz);
else
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, static_cast<size_t>(1));
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, 1uz);
}
else
{

View File

@ -46,36 +46,57 @@ public:
{
if constexpr (std::is_same_v<typename Transform::FactorTransform, ZeroTransform>)
return { .is_monotonic = true, .is_always_monotonic = true };
const IFunction::Monotonicity is_monotonic = { .is_monotonic = true };
const IFunction::Monotonicity is_not_monotonic;
const DateLUTImpl * date_lut = &DateLUT::instance();
if (const auto * timezone = dynamic_cast<const TimezoneMixin *>(&type))
date_lut = &timezone->getTimeZone();
if (left.isNull() || right.isNull())
return is_not_monotonic;
/// The function is monotonous on the [left, right] segment, if the factor transformation returns the same values for them.
if (checkAndGetDataType<DataTypeDate>(&type))
{
return Transform::FactorTransform::execute(UInt16(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt16(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else if (checkAndGetDataType<DataTypeDate32>(&type))
{
return Transform::FactorTransform::execute(Int32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(Int32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else
{
return Transform::FactorTransform::execute(UInt32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
const IFunction::Monotonicity is_monotonic = { .is_monotonic = true };
const IFunction::Monotonicity is_not_monotonic;
const DateLUTImpl * date_lut = &DateLUT::instance();
if (const auto * timezone = dynamic_cast<const TimezoneMixin *>(&type))
date_lut = &timezone->getTimeZone();
if (left.isNull() || right.isNull())
return is_not_monotonic;
const auto * type_ptr = &type;
if (const auto * nullable_type = checkAndGetDataType<DataTypeNullable>(type_ptr))
type_ptr = nullable_type->getNestedType().get();
/// The function is monotonous on the [left, right] segment, if the factor transformation returns the same values for them.
if (checkAndGetDataType<DataTypeDate>(type_ptr))
{
return Transform::FactorTransform::execute(UInt16(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt16(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else if (checkAndGetDataType<DataTypeDate32>(type_ptr))
{
return Transform::FactorTransform::execute(Int32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(Int32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else if (checkAndGetDataType<DataTypeDateTime>(type_ptr))
{
return Transform::FactorTransform::execute(UInt32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else
{
assert(checkAndGetDataType<DataTypeDateTime64>(type_ptr));
const auto & left_date_time = left.get<DateTime64>();
TransformDateTime64<typename Transform::FactorTransform> transformer_left(left_date_time.getScale());
const auto & right_date_time = right.get<DateTime64>();
TransformDateTime64<typename Transform::FactorTransform> transformer_right(right_date_time.getScale());
return transformer_left.execute(left_date_time.getValue(), *date_lut)
== transformer_right.execute(right_date_time.getValue(), *date_lut)
? is_monotonic : is_not_monotonic;
}
}
}

View File

@ -71,8 +71,7 @@ restoreUserDefinedSQLObjects(RestorerFromBackup & restorer, const String & data_
String function_name = unescapeForFileName(escaped_function_name);
String filepath = data_path_in_backup_fs / filename;
auto backup_entry = backup->readFile(filepath);
auto in = backup_entry->getReadBuffer();
auto in = backup->readFile(filepath);
String statement_def;
readStringUntilEOF(statement_def, *in);

View File

@ -291,7 +291,7 @@ public:
ssize_t remain_byte = src.getElementSize() - offset_byte;
if (length < 0)
{
length_byte = std::max(remain_byte + (length / word_size), static_cast<ssize_t>(0));
length_byte = std::max(remain_byte + (length / word_size), 0z);
over_bit = word_size + (length % word_size);
if (length_byte == 1 && over_bit <= offset_bit) // begin and end are in same byte AND there are no gaps
length_byte = 0;
@ -330,7 +330,7 @@ public:
size_t size = src.getElementSize();
if (length < 0)
{
length_byte = std::max(static_cast<ssize_t>(offset_byte) + (length / word_size), static_cast<ssize_t>(0));
length_byte = std::max(static_cast<ssize_t>(offset_byte) + (length / word_size), 0z);
over_bit = word_size + (length % word_size);
if (length_byte == 1 && over_bit <= offset_bit) // begin and end are in same byte AND there are no gaps
length_byte = 0;
@ -395,7 +395,7 @@ public:
}
else
{
length_byte = std::max(remain_byte + (static_cast<ssize_t>(length) / word_size), static_cast<ssize_t>(0));
length_byte = std::max(remain_byte + (static_cast<ssize_t>(length) / word_size), 0z);
over_bit = word_size + (length % word_size);
if (length_byte == 1 && over_bit <= offset_bit) // begin and end are in same byte AND there are no gaps
length_byte = 0;

View File

@ -20,5 +20,6 @@ using FunctionPositionCaseInsensitive = FunctionsStringSearch<PositionImpl<NameP
/// Registers the positionCaseInsensitive function in the function factory.
REGISTER_FUNCTION(PositionCaseInsensitive)
{
factory.registerFunction<FunctionPositionCaseInsensitive>();
/// `instr` is an alias for the same function; the alias is registered with the CaseInsensitive flag.
factory.registerAlias("instr", NamePositionCaseInsensitive::name, FunctionFactory::CaseInsensitive);
}
}

View File

@ -106,7 +106,7 @@ void MemoryWriteBuffer::addChunk()
}
else
{
next_chunk_size = std::max(static_cast<size_t>(1), static_cast<size_t>(chunk_tail->size() * growth_rate));
next_chunk_size = std::max(1uz, static_cast<size_t>(chunk_tail->size() * growth_rate));
next_chunk_size = std::min(next_chunk_size, max_chunk_size);
}

View File

@ -123,9 +123,8 @@ Client::Client(
{
auto * endpoint_provider = dynamic_cast<Aws::S3::Endpoint::S3DefaultEpProviderBase *>(accessEndpointProvider().get());
endpoint_provider->GetBuiltInParameters().GetParameter("Region").GetString(explicit_region);
std::string endpoint;
endpoint_provider->GetBuiltInParameters().GetParameter("Endpoint").GetString(endpoint);
detect_region = explicit_region == Aws::Region::AWS_GLOBAL && endpoint.find(".amazonaws.com") != std::string::npos;
endpoint_provider->GetBuiltInParameters().GetParameter("Endpoint").GetString(initial_endpoint);
detect_region = explicit_region == Aws::Region::AWS_GLOBAL && initial_endpoint.find(".amazonaws.com") != std::string::npos;
cache = std::make_shared<ClientCache>();
ClientCacheRegistry::instance().registerClient(cache);
@ -133,6 +132,7 @@ Client::Client(
Client::Client(const Client & other)
: Aws::S3::S3Client(other)
, initial_endpoint(other.initial_endpoint)
, explicit_region(other.explicit_region)
, detect_region(other.detect_region)
, max_redirects(other.max_redirects)

View File

@ -109,6 +109,9 @@ public:
}
}
/// Returns the value stored in `initial_endpoint`.
/// NOTE(review): presumably the endpoint the client was configured with at construction time — confirm in Client.cpp.
const String & getInitialEndpoint() const { return initial_endpoint; }
/// Decorator for RetryStrategy needed for this client to work correctly.
/// We want to manually handle permanent moves (status code 301) because:
/// - redirect location is written in XML format inside the response body something that doesn't exist for HEAD
@ -198,6 +201,8 @@ private:
bool checkIfWrongRegionDefined(const std::string & bucket, const Aws::S3::S3Error & error, std::string & region) const;
void insertRegionOverride(const std::string & bucket, const std::string & region) const;
String initial_endpoint;
std::string explicit_region;
mutable bool detect_region = true;

View File

@ -10,6 +10,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int CANNOT_READ_ALL_DATA;
}
namespace
@ -91,6 +92,13 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<v
copyDataImpl(from, to, true, bytes, cancellation_hook, nullptr);
}
/// Copies at most `max_bytes` bytes from `from` to `to`.
/// Throws CANNOT_READ_ALL_DATA if `from` still has data after that.
void copyDataMaxBytes(ReadBuffer & from, WriteBuffer & to, size_t max_bytes)
{
    /// NOTE(review): the `false` argument presumably disables the "exactly N bytes" check, so a shorter source is accepted — confirm in copyDataImpl.
    copyDataImpl(from, to, false, max_bytes, nullptr, nullptr);

    const bool source_exhausted = from.eof();
    if (source_exhausted)
        return;

    throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all data, max readable size reached.");
}
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler)
{
copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), &is_cancelled, throttler);

View File

@ -27,6 +27,9 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atom
void copyData(ReadBuffer & from, WriteBuffer & to, std::function<void()> cancellation_hook);
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<void()> cancellation_hook);
/// Copies at most `max_bytes` bytes from ReadBuffer to WriteBuffer. If there are more bytes, then throws an exception.
void copyDataMaxBytes(ReadBuffer & from, WriteBuffer & to, size_t max_bytes);
/// Same as the copyData() functions, but also use a throttler to limit the maximum speed.
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler);
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler);

View File

@ -1442,7 +1442,8 @@ void Aggregator::prepareAggregateInstructions(
aggregate_columns[i][j] = materialized_columns.back().get();
/// Sparse columns without defaults may be handled incorrectly.
if (aggregate_columns[i][j]->getNumberOfDefaultRows() == 0)
if (aggregate_columns[i][j]->isSparse()
&& aggregate_columns[i][j]->getNumberOfDefaultRows() == 0)
allow_sparse_arguments = false;
auto full_column = allow_sparse_arguments

View File

@ -189,7 +189,7 @@ std::vector<JoinedElement> getTables(const ASTSelectQuery & select)
if (t.hasUsing())
{
if (has_using)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Multuple USING statements are not supported");
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Multiple USING statements are not supported");
has_using = true;
}

View File

@ -848,6 +848,23 @@ std::string ExpressionActions::dumpActions() const
return ss.str();
}
/// Writes a textual description of the actions to `out`, one action per line, followed by a
/// line with the result positions. Every line starts with `prefix`.
void ExpressionActions::describeActions(WriteBuffer & out, std::string_view prefix) const
{
    /// The first action line carries the "Actions: " label, all following ones a filler instead.
    const char * label = "Actions: ";
    for (const auto & action : actions)
    {
        out << prefix << label;
        out << action.toString() << '\n';
        label = " ";
    }

    out << prefix << "Positions:";
    for (const auto & position : result_positions)
        out << ' ' << position;
    out << '\n';
}
JSONBuilder::ItemPtr ExpressionActions::toTree() const
{
auto inputs_array = std::make_unique<JSONBuilder::JSONArray>();

View File

@ -109,6 +109,9 @@ public:
const Block & getSampleBlock() const { return sample_block; }
std::string dumpActions() const;
void describeActions(WriteBuffer & out, std::string_view prefix) const;
JSONBuilder::ItemPtr toTree() const;
static NameAndTypePair getSmallestColumn(const NamesAndTypesList & columns);

View File

@ -562,7 +562,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
{
/// Allow push down and other optimizations for VIEW: replace with subquery and rewrite it.
ASTPtr view_table;
NameToNameMap parameter_values;
NameToNameMap parameter_types;
if (view)
{
@ -575,14 +574,13 @@ InterpreterSelectQuery::InterpreterSelectQuery(
/// and after query is replaced, we use these parameters to substitute in the parameterized view query
if (query_info.is_parameterized_view)
{
parameter_values = analyzeFunctionParamValues(query_ptr);
view->setParameterValues(parameter_values);
parameter_types = view->getParameterValues();
query_info.parameterized_view_values = analyzeFunctionParamValues(query_ptr);
parameter_types = view->getParameterTypes();
}
view->replaceWithSubquery(getSelectQuery(), view_table, metadata_snapshot, view->isParameterizedView());
if (query_info.is_parameterized_view)
{
view->replaceQueryParametersIfParametrizedView(query_ptr);
view->replaceQueryParametersIfParametrizedView(query_ptr, query_info.parameterized_view_values);
}
}
@ -595,7 +593,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
required_result_column_names,
table_join,
query_info.is_parameterized_view,
parameter_values,
query_info.parameterized_view_values,
parameter_types);
@ -747,7 +745,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query_info.filter_asts.push_back(parallel_replicas_custom_filter_ast);
}
source_header = storage_snapshot->getSampleBlockForColumns(required_columns, parameter_values);
source_header = storage_snapshot->getSampleBlockForColumns(required_columns, query_info.parameterized_view_values);
}
/// Calculate structure of the result.

View File

@ -1,6 +1,11 @@
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExplainQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
namespace DB
{
@ -26,4 +31,59 @@ void InterpreterSetQuery::executeForCurrentContext()
getContext()->resetSettingsToDefaultValue(ast.default_settings);
}
static void applySettingsFromSelectWithUnion(const ASTSelectWithUnionQuery & select_with_union, ContextMutablePtr context)
{
const ASTs & children = select_with_union.list_of_selects->children;
if (children.empty())
return;
// We might have an arbitrarily complex UNION tree, so just give
// up if the last first-order child is not a plain SELECT.
// It is flattened later, when we process UNION ALL/DISTINCT.
const auto * last_select = children.back()->as<ASTSelectQuery>();
if (last_select && last_select->settings())
{
InterpreterSetQuery(last_select->settings(), context).executeForCurrentContext();
}
}
void InterpreterSetQuery::applySettingsFromQuery(const ASTPtr & ast, ContextMutablePtr context_)
{
if (!ast)
return;
if (const auto * select_query = ast->as<ASTSelectQuery>())
{
if (auto new_settings = select_query->settings())
InterpreterSetQuery(new_settings, context_).executeForCurrentContext();
}
else if (const auto * select_with_union_query = ast->as<ASTSelectWithUnionQuery>())
{
applySettingsFromSelectWithUnion(*select_with_union_query, context_);
}
else if (const auto * explain_query = ast->as<ASTExplainQuery>())
{
applySettingsFromQuery(explain_query->getExplainedQuery(), context_);
}
else if (const auto * query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
{
if (query_with_output->settings_ast)
InterpreterSetQuery(query_with_output->settings_ast, context_).executeForCurrentContext();
if (const auto * create_query = ast->as<ASTCreateQuery>())
{
if (create_query->select)
{
applySettingsFromSelectWithUnion(create_query->select->as<ASTSelectWithUnionQuery &>(), context_);
}
}
}
else if (auto * insert_query = ast->as<ASTInsertQuery>())
{
context_->setInsertFormat(insert_query->format);
if (insert_query->settings_ast)
InterpreterSetQuery(insert_query->settings_ast, context_).executeForCurrentContext();
}
}
}

View File

@ -27,6 +27,9 @@ public:
bool supportsTransactions() const override { return true; }
/// To apply SETTINGS clauses from query as early as possible
static void applySettingsFromQuery(const ASTPtr & ast, ContextMutablePtr context_);
private:
ASTPtr query_ptr;
};

View File

@ -119,7 +119,9 @@ void LogicalExpressionsOptimizer::collectDisjunctiveEqualityChains()
bool found_chain = false;
auto * function = to_node->as<ASTFunction>();
if (function && function->name == "or" && function->children.size() == 1)
/// This optimization does not respect aliases properly, which can lead to a MULTIPLE_EXPRESSION_FOR_ALIAS error.
/// Disable it if an expression has an alias. A proper implementation is done with the new analyzer.
if (function && function->alias.empty() && function->name == "or" && function->children.size() == 1)
{
const auto * expression_list = function->children[0]->as<ASTExpressionList>();
if (expression_list)
@ -128,14 +130,14 @@ void LogicalExpressionsOptimizer::collectDisjunctiveEqualityChains()
for (const auto & child : expression_list->children)
{
auto * equals = child->as<ASTFunction>();
if (equals && equals->name == "equals" && equals->children.size() == 1)
if (equals && equals->alias.empty() && equals->name == "equals" && equals->children.size() == 1)
{
const auto * equals_expression_list = equals->children[0]->as<ASTExpressionList>();
if (equals_expression_list && equals_expression_list->children.size() == 2)
{
/// Equality expr = xN.
const auto * literal = equals_expression_list->children[1]->as<ASTLiteral>();
if (literal)
if (literal && literal->alias.empty())
{
auto expr_lhs = equals_expression_list->children[0]->getTreeHash();
OrWithExpression or_with_expression{function, expr_lhs, function->tryGetAlias()};
@ -230,6 +232,9 @@ bool LogicalExpressionsOptimizer::mayOptimizeDisjunctiveEqualityChain(const Disj
const auto & equalities = chain.second;
const auto & equality_functions = equalities.functions;
if (settings.optimize_min_equality_disjunction_chain_length == 0)
return false;
/// For LowCardinality column, the dict is usually smaller and the index is relatively large.
/// In most cases, merging OR-chain as IN is better than converting each LowCardinality into full column individually.
/// For non-LowCardinality, we need to eliminate too short chains.

View File

@ -83,7 +83,10 @@ void ReplaceQueryParameterVisitor::visitQueryParameter(ASTPtr & ast)
IColumn & temp_column = *temp_column_ptr;
ReadBufferFromString read_buffer{value};
FormatSettings format_settings;
data_type->getDefaultSerialization()->deserializeTextEscaped(temp_column, read_buffer, format_settings);
if (ast_param.name == "_request_body")
data_type->getDefaultSerialization()->deserializeWholeText(temp_column, read_buffer, format_settings);
else
data_type->getDefaultSerialization()->deserializeTextEscaped(temp_column, read_buffer, format_settings);
if (!read_buffer.eof())
throw Exception(ErrorCodes::BAD_QUERY_PARAMETER,

Some files were not shown because too many files have changed in this diff Show More