Merge branch 'master' into new-nav

Rich Raposa 2023-03-15 22:23:44 -06:00 committed by GitHub
commit 81cd962773
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
100 changed files with 1065 additions and 250 deletions

View File

@ -23,9 +23,12 @@ Checks: '*,
-bugprone-implicit-widening-of-multiplication-result,
-bugprone-narrowing-conversions,
-bugprone-not-null-terminated-result,
-bugprone-reserved-identifier,
-bugprone-unchecked-optional-access,
-cert-dcl16-c,
-cert-dcl37-c,
-cert-dcl51-cpp,
-cert-err58-cpp,
-cert-msc32-c,
-cert-msc51-cpp,
@ -129,6 +132,7 @@ Checks: '*,
-readability-function-cognitive-complexity,
-readability-function-size,
-readability-identifier-length,
-readability-identifier-naming,
-readability-implicit-bool-conversion,
-readability-isolate-declaration,
-readability-magic-numbers,

View File

@ -301,7 +301,7 @@ if (ENABLE_BUILD_PROFILING)
endif ()
endif ()
set (CMAKE_CXX_STANDARD 20)
set (CMAKE_CXX_STANDARD 23)
set (CMAKE_CXX_EXTENSIONS ON) # Same as gnu++2a (ON) vs c++2a (OFF): https://cmake.org/cmake/help/latest/prop_tgt/CXX_EXTENSIONS.html
set (CMAKE_CXX_STANDARD_REQUIRED ON)

View File

@ -2,6 +2,10 @@ if (USE_CLANG_TIDY)
set (CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_PATH}")
endif ()
# TODO: Remove this. We like to compile with C++23 (set by top-level CMakeLists) but Clang crashes with our libcxx
# when instantiated from JSON.cpp. Try again when libcxx(abi) and Clang are upgraded to 16.
set (CMAKE_CXX_STANDARD 20)
set (SRCS
argsToConfig.cpp
coverage.cpp

View File

@ -466,7 +466,7 @@ namespace Data
bool extractManualImpl(std::size_t pos, T & val, SQLSMALLINT cType)
{
SQLRETURN rc = 0;
T value = (T)0;
T value;
resizeLengths(pos);

View File

@ -27,8 +27,7 @@ Message::Message():
_tid(0),
_file(0),
_line(0),
_pMap(0),
_fmt_str(0)
_pMap(0)
{
init();
}

View File

@ -48,6 +48,9 @@ set(gRPC_ABSL_PROVIDER "clickhouse" CACHE STRING "" FORCE)
# We don't want to build C# extensions.
set(gRPC_BUILD_CSHARP_EXT OFF)
# TODO: Remove this. We generally like to compile with C++23 but grpc isn't ready yet.
set (CMAKE_CXX_STANDARD 20)
set(_gRPC_CARES_LIBRARIES ch_contrib::c-ares)
set(gRPC_CARES_PROVIDER "clickhouse" CACHE STRING "" FORCE)
add_subdirectory("${_gRPC_SOURCE_DIR}" "${_gRPC_BINARY_DIR}")

contrib/krb5 vendored

@ -1 +1 @@
Subproject commit f8262a1b548eb29d97e059260042036255d07f8d
Subproject commit 9453aec0d50e5aff9b189051611b321b40935d02

View File

@ -160,6 +160,8 @@ set(ALL_SRCS
# "${KRB5_SOURCE_DIR}/lib/gssapi/spnego/negoex_trace.c"
"${KRB5_SOURCE_DIR}/lib/crypto/builtin/kdf.c"
"${KRB5_SOURCE_DIR}/lib/crypto/builtin/cmac.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/prng.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/enc_dk_cmac.c"
# "${KRB5_SOURCE_DIR}/lib/crypto/krb/crc32.c"
@ -183,7 +185,6 @@ set(ALL_SRCS
"${KRB5_SOURCE_DIR}/lib/crypto/krb/block_size.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/string_to_key.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/verify_checksum.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/crypto_libinit.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/derive.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/random_to_key.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/verify_checksum_iov.c"
@ -217,9 +218,7 @@ set(ALL_SRCS
"${KRB5_SOURCE_DIR}/lib/crypto/krb/s2k_rc4.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/valid_cksumtype.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/nfold.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/prng_fortuna.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/encrypt_length.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/cmac.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/keyblocks.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/prf_rc4.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/s2k_pbkdf2.c"
@ -228,11 +227,11 @@ set(ALL_SRCS
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/enc_provider/rc4.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/enc_provider/des3.c"
#"${KRB5_SOURCE_DIR}/lib/crypto/openssl/enc_provider/camellia.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/cmac.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/sha256.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/hmac.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/kdf.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/pbkdf2.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/init.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/stubs.c"
# "${KRB5_SOURCE_DIR}/lib/crypto/openssl/hash_provider/hash_crc32.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/hash_provider/hash_evp.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/des/des_keys.c"
@ -312,7 +311,6 @@ set(ALL_SRCS
"${KRB5_SOURCE_DIR}/lib/krb5/krb/allow_weak.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/mk_rep.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/mk_priv.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/s4u_authdata.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/preauth_otp.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/init_keyblock.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/ser_addr.c"
@ -688,6 +686,7 @@ target_include_directories(_krb5 PRIVATE
target_compile_definitions(_krb5 PRIVATE
KRB5_PRIVATE
CRYPTO_OPENSSL
_GSS_STATIC_LINK=1
KRB5_DEPRECATED=1
LOCALEDIR="/usr/local/share/locale"

View File

@ -44,6 +44,8 @@ if [ "$is_tsan_build" -eq "0" ]; then
fi
export ZOOKEEPER_FAULT_INJECTION=1
# Initial run without S3 to create system.*_log on local file system to make it
# available for dump via clickhouse-local
configure
azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --debug /azurite_log &

View File

@ -49,17 +49,19 @@ echo -e "Successfully cloned previous release tests$OK" >> /test_output/test_res
echo -e "Successfully downloaded previous release packages$OK" >> /test_output/test_results.tsv
# Make upgrade check funnier by forcing Ordinary engine for system database
mkdir /var/lib/clickhouse/metadata
mkdir -p /var/lib/clickhouse/metadata
echo "ATTACH DATABASE system ENGINE=Ordinary" > /var/lib/clickhouse/metadata/system.sql
# Install previous release packages
install_packages previous_release_package_folder
# Start server from previous release
# Let's enable S3 storage by default
export USE_S3_STORAGE_FOR_MERGE_TREE=1
# Previous version may not be ready for fault injections
export ZOOKEEPER_FAULT_INJECTION=0
# Initial run without S3 to create system.*_log on local file system to make it
# available for dump via clickhouse-local
configure
start
stop
mv /var/log/clickhouse-server/clickhouse-server.log /var/log/clickhouse-server/clickhouse-server.initial.log
# force_sync=false doesn't work correctly on some older versions
sudo cat /etc/clickhouse-server/config.d/keeper_port.xml \
@ -67,8 +69,6 @@ sudo cat /etc/clickhouse-server/config.d/keeper_port.xml \
> /etc/clickhouse-server/config.d/keeper_port.xml.tmp
sudo mv /etc/clickhouse-server/config.d/keeper_port.xml.tmp /etc/clickhouse-server/config.d/keeper_port.xml
configure
# But we still need the default disk because some tables are loaded only into it
sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml \
| sed "s|<main><disk>s3</disk></main>|<main><disk>s3</disk></main><default><disk>default</disk></default>|" \
@ -76,6 +76,13 @@ sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml \
sudo chown clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
# Start server from previous release
# Let's enable S3 storage by default
export USE_S3_STORAGE_FOR_MERGE_TREE=1
# Previous version may not be ready for fault injections
export ZOOKEEPER_FAULT_INJECTION=0
configure
start
clickhouse-client --query="SELECT 'Server version: ', version()"
@ -185,8 +192,6 @@ tar -chf /test_output/coordination.tar /var/lib/clickhouse/coordination ||:
collect_query_and_trace_logs
check_oom_in_dmesg
mv /var/log/clickhouse-server/stderr.log /test_output/
# Write check result into check_status.tsv

View File

@ -309,6 +309,7 @@ The HTTP interface allows passing external data (external temporary tables) for
## Response Buffering {#response-buffering}
You can enable response buffering on the server side. The `buffer_size` and `wait_end_of_query` URL parameters are provided for this purpose.
The settings `http_response_buffer_size` and `http_wait_end_of_query` can also be used.
`buffer_size` determines the number of bytes in the result to buffer in the server memory. If a result body is larger than this threshold, the buffer is written to the HTTP channel, and the remaining data is sent directly to the HTTP channel.
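For example, a minimal request using both parameters might look like this (a sketch that assumes a server listening on `localhost:8123` with the default user):
```bash
# Buffer up to 1 MiB of the result in server memory and delay sending the
# response until the query finishes, so an error surfaces as an HTTP error
# instead of appearing in the middle of an already-started response body.
curl -sS 'http://localhost:8123/?buffer_size=1048576&wait_end_of_query=1' \
    --data-binary 'SELECT number FROM system.numbers LIMIT 10'
```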

View File

@ -766,7 +766,7 @@ Default value: `0`.
## concurrent_threads_soft_limit_ratio_to_cores {#concurrent_threads_soft_limit_ratio_to_cores}
The maximum number of query processing threads as a multiple of the number of logical cores.
More details: [concurrent_threads_soft_limit_num](#concurrent-threads-soft-limit-num).
More details: [concurrent_threads_soft_limit_num](#concurrent_threads_soft_limit_num).
Possible values:

View File

@ -966,10 +966,10 @@ This is an expert-level setting, and you shouldn't change it if you're just gett
## max_query_size {#settings-max_query_size}
The maximum part of a query that can be taken to RAM for parsing with the SQL parser.
The INSERT query also contains data for INSERT that is processed by a separate stream parser (that consumes O(1) RAM), which is not included in this restriction.
The maximum number of bytes of a query string parsed by the SQL parser.
Data in the VALUES clause of INSERT queries is processed by a separate stream parser (that consumes O(1) RAM) and is not affected by this restriction.
Default value: 256 KiB.
Default value: 262144 (= 256 KiB).
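As an illustration, a minimal sketch of raising the limit for a single invocation of `clickhouse-client` (the value is in bytes; the trivial query merely stands in for one with very long text):
```bash
# Allow query text of up to 1 MiB to be parsed. The VALUES payload of an
# INSERT is handled by a separate stream parser and does not count
# against this limit, so only the query text itself needs to fit.
clickhouse-client --max_query_size=1048576 --query "SELECT 1"
```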
## max_parser_depth {#max_parser_depth}

View File

@ -91,6 +91,13 @@ INSERT INTO t FORMAT TabSeparated
You can insert data separately from the query by using the [command-line client](/docs/en/integrations/sql-clients/clickhouse-client-local) or the [HTTP interface](/docs/en/interfaces/http/).
:::note
If you want to specify `SETTINGS` for an `INSERT` query, you have to do it _before_ the `FORMAT` clause, since everything after `FORMAT format_name` is treated as data. For example:
```sql
INSERT INTO table SETTINGS ... FORMAT format_name data_set
```
:::
## Constraints
If a table has [constraints](../../sql-reference/statements/create/table.md#constraints), their expressions are checked for each row of inserted data. If any of those constraints is not satisfied, the server raises an exception containing the constraint name and expression, and the query is stopped.

View File

@ -30,7 +30,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_TABLE;
extern const int BAD_ARGUMENTS;
}
@ -180,8 +180,19 @@ void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServ
columns.emplace_back(column_name, std::move(column_type));
}
/// Usually this should not happen, because if the table does not
/// exist, the call should still succeed.
/// However, it is sometimes possible because the ClickHouse ODBC bridge
/// internally runs two queries:
/// - system.tables
/// - system.columns
/// If the table is removed between these two queries,
/// there will be no columns.
///
/// Also, system.columns can sometimes return an empty result because of
/// the cached value of total tables to scan.
if (columns.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns definition was not returned");
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Columns definition was not returned");
WriteBufferFromHTTPServerResponse out(
response,

View File

@ -72,12 +72,11 @@ namespace
return std::make_shared<BackupEntryFromMemory>(buf.str());
}
static AccessEntitiesInBackup fromBackupEntry(const IBackupEntry & backup_entry, const String & file_path)
static AccessEntitiesInBackup fromBackupEntry(std::unique_ptr<ReadBuffer> buf, const String & file_path)
{
try
{
AccessEntitiesInBackup res;
std::unique_ptr<ReadBuffer> buf = backup_entry.getReadBuffer();
bool dependencies_found = false;
@ -343,8 +342,8 @@ void AccessRestorerFromBackup::addDataPath(const String & data_path)
for (const String & filename : filenames)
{
String filepath_in_backup = data_path_in_backup_fs / filename;
auto backup_entry = backup->readFile(filepath_in_backup);
auto ab = AccessEntitiesInBackup::fromBackupEntry(*backup_entry, filepath_in_backup);
auto read_buffer_from_backup = backup->readFile(filepath_in_backup);
auto ab = AccessEntitiesInBackup::fromBackupEntry(std::move(read_buffer_from_backup), filepath_in_backup);
boost::range::copy(ab.entities, std::back_inserter(entities));
boost::range::copy(ab.dependencies, std::inserter(dependencies, dependencies.end()));

View File

@ -32,17 +32,17 @@ enum class GroupByKind
GROUPING_SETS
};
class GroupingFunctionResolveVisitor : public InDepthQueryTreeVisitor<GroupingFunctionResolveVisitor>
class GroupingFunctionResolveVisitor : public InDepthQueryTreeVisitorWithContext<GroupingFunctionResolveVisitor>
{
public:
GroupingFunctionResolveVisitor(GroupByKind group_by_kind_,
QueryTreeNodePtrWithHashMap<size_t> aggregation_key_to_index_,
ColumnNumbersList grouping_sets_keys_indices_,
ContextPtr context_)
: group_by_kind(group_by_kind_)
: InDepthQueryTreeVisitorWithContext(std::move(context_))
, group_by_kind(group_by_kind_)
, aggregation_key_to_index(std::move(aggregation_key_to_index_))
, grouping_sets_keys_indexes(std::move(grouping_sets_keys_indices_))
, context(std::move(context_))
{
}
@ -71,7 +71,7 @@ public:
FunctionOverloadResolverPtr grouping_function_resolver;
bool add_grouping_set_column = false;
bool force_grouping_standard_compatibility = context->getSettingsRef().force_grouping_standard_compatibility;
bool force_grouping_standard_compatibility = getSettings().force_grouping_standard_compatibility;
size_t aggregation_keys_size = aggregation_key_to_index.size();
switch (group_by_kind)
@ -132,7 +132,6 @@ private:
GroupByKind group_by_kind;
QueryTreeNodePtrWithHashMap<size_t> aggregation_key_to_index;
ColumnNumbersList grouping_sets_keys_indexes;
ContextPtr context;
};
void resolveGroupingFunctions(QueryTreeNodePtr & query_node, ContextPtr context)
@ -164,12 +163,17 @@ void resolveGroupingFunctions(QueryTreeNodePtr & query_node, ContextPtr context)
grouping_sets_used_aggregation_keys_list.emplace_back();
auto & grouping_sets_used_aggregation_keys = grouping_sets_used_aggregation_keys_list.back();
QueryTreeNodePtrWithHashSet used_keys_in_set;
for (auto & grouping_set_key_node : grouping_set_keys_list_node_typed.getNodes())
{
if (used_keys_in_set.contains(grouping_set_key_node))
continue;
used_keys_in_set.insert(grouping_set_key_node);
grouping_sets_used_aggregation_keys.push_back(grouping_set_key_node);
if (aggregation_key_to_index.contains(grouping_set_key_node))
continue;
grouping_sets_used_aggregation_keys.push_back(grouping_set_key_node);
aggregation_key_to_index.emplace(grouping_set_key_node, aggregation_node_index);
++aggregation_node_index;
}

View File

@ -5727,8 +5727,27 @@ void QueryAnalyzer::resolveInterpolateColumnsNodeList(QueryTreeNodePtr & interpo
{
auto & interpolate_node_typed = interpolate_node->as<InterpolateNode &>();
auto * column_to_interpolate = interpolate_node_typed.getExpression()->as<IdentifierNode>();
if (!column_to_interpolate)
throw Exception(ErrorCodes::LOGICAL_ERROR, "INTERPOLATE can work only for identifiers, but {} is found",
interpolate_node_typed.getExpression()->formatASTForErrorMessage());
auto column_to_interpolate_name = column_to_interpolate->getIdentifier().getFullName();
resolveExpressionNode(interpolate_node_typed.getExpression(), scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
resolveExpressionNode(interpolate_node_typed.getInterpolateExpression(), scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
bool is_column_constant = interpolate_node_typed.getExpression()->getNodeType() == QueryTreeNodeType::CONSTANT;
auto & interpolation_to_resolve = interpolate_node_typed.getInterpolateExpression();
IdentifierResolveScope interpolate_scope(interpolation_to_resolve, &scope /*parent_scope*/);
auto fake_column_node = std::make_shared<ColumnNode>(NameAndTypePair(column_to_interpolate_name, interpolate_node_typed.getExpression()->getResultType()), interpolate_node_typed.getExpression());
if (is_column_constant)
interpolate_scope.expression_argument_name_to_node.emplace(column_to_interpolate_name, fake_column_node);
resolveExpressionNode(interpolation_to_resolve, interpolate_scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
if (is_column_constant)
interpolation_to_resolve = interpolation_to_resolve->cloneAndReplace(fake_column_node, interpolate_node_typed.getExpression());
}
}

View File

@ -56,7 +56,7 @@ public:
}
if (!found_argument_in_group_by_keys)
throw Exception(ErrorCodes::NOT_AN_AGGREGATE,
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"GROUPING function argument {} is not in GROUP BY keys. In query {}",
grouping_function_arguments_node->formatASTForErrorMessage(),
query_node->formatASTForErrorMessage());

View File

@ -1,9 +1,10 @@
#include <Backups/BackupIO.h>
#include <IO/copyData.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/SeekableReadBuffer.h>
namespace DB
{
@ -12,6 +13,15 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
void IBackupReader::copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings)
{
auto read_buffer = readFile(file_name);
auto write_buffer = destination_disk->writeFile(destination_path, std::min<size_t>(size, DBMS_DEFAULT_BUFFER_SIZE), write_mode, write_settings);
copyData(*read_buffer, *write_buffer, size);
write_buffer->finalize();
}
void IBackupWriter::copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name)
{
auto read_buffer = create_read_buffer();

View File

@ -17,6 +17,8 @@ public:
virtual bool fileExists(const String & file_name) = 0;
virtual UInt64 getFileSize(const String & file_name) = 0;
virtual std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) = 0;
virtual void copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings);
virtual DataSourceDescription getDataSourceDescription() const = 0;
};

View File

@ -1,4 +1,5 @@
#include <Backups/BackupIO_Disk.h>
#include <Common/logger_useful.h>
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
@ -12,7 +13,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
BackupReaderDisk::BackupReaderDisk(const DiskPtr & disk_, const String & path_) : disk(disk_), path(path_)
BackupReaderDisk::BackupReaderDisk(const DiskPtr & disk_, const String & path_)
: disk(disk_), path(path_), log(&Poco::Logger::get("BackupReaderDisk"))
{
}
@ -33,6 +35,21 @@ std::unique_ptr<SeekableReadBuffer> BackupReaderDisk::readFile(const String & fi
return disk->readFile(path / file_name);
}
void BackupReaderDisk::copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings)
{
if (write_mode == WriteMode::Rewrite)
{
LOG_TRACE(log, "Copying {}/{} from disk {} to {} by the disk", path, file_name, disk->getName(), destination_disk->getName());
disk->copyFile(path / file_name, *destination_disk, destination_path, write_settings);
return;
}
LOG_TRACE(log, "Copying {}/{} from disk {} to {} through buffers", path, file_name, disk->getName(), destination_disk->getName());
IBackupReader::copyFileToDisk(file_name, size, destination_disk, destination_path, write_mode, write_settings);
}
BackupWriterDisk::BackupWriterDisk(const DiskPtr & disk_, const String & path_) : disk(disk_), path(path_)
{
}

View File

@ -17,11 +17,14 @@ public:
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
void copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) override;
DataSourceDescription getDataSourceDescription() const override;
private:
DiskPtr disk;
std::filesystem::path path;
Poco::Logger * log;
};
class BackupWriterDisk : public IBackupWriter

View File

@ -1,15 +1,18 @@
#include <Backups/BackupIO_File.h>
#include <Disks/IDisk.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
#include <Common/filesystemHelpers.h>
#include <Common/logger_useful.h>
namespace fs = std::filesystem;
namespace DB
{
BackupReaderFile::BackupReaderFile(const String & path_) : path(path_)
BackupReaderFile::BackupReaderFile(const String & path_) : path(path_), log(&Poco::Logger::get("BackupReaderFile"))
{
}
@ -30,6 +33,22 @@ std::unique_ptr<SeekableReadBuffer> BackupReaderFile::readFile(const String & fi
return createReadBufferFromFileBase(path / file_name, {});
}
void BackupReaderFile::copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings)
{
if (destination_disk->getDataSourceDescription() == getDataSourceDescription())
{
/// Use more optimal way.
LOG_TRACE(log, "Copying {}/{} to disk {} locally", path, file_name, destination_disk->getName());
fs::copy(path / file_name, fullPath(destination_disk, destination_path), fs::copy_options::overwrite_existing);
return;
}
LOG_TRACE(log, "Copying {}/{} to disk {} through buffers", path, file_name, destination_disk->getName());
IBackupReader::copyFileToDisk(path / file_name, size, destination_disk, destination_path, write_mode, write_settings);
}
BackupWriterFile::BackupWriterFile(const String & path_) : path(path_)
{
}

View File

@ -15,10 +15,13 @@ public:
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
void copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) override;
DataSourceDescription getDataSourceDescription() const override;
private:
std::filesystem::path path;
Poco::Logger * log;
};
class BackupWriterFile : public IBackupWriter

View File

@ -2,6 +2,7 @@
#if USE_AWS_S3
#include <Common/quoteString.h>
#include <Disks/ObjectStorages/S3/copyS3FileToDisk.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Interpreters/Context.h>
#include <IO/BackupsIOThreadPool.h>
@ -96,6 +97,7 @@ BackupReaderS3::BackupReaderS3(
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
, read_settings(context_->getReadSettings())
, request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings)
, log(&Poco::Logger::get("BackupReaderS3"))
{
request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint
}
@ -127,6 +129,27 @@ std::unique_ptr<SeekableReadBuffer> BackupReaderS3::readFile(const String & file
client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, request_settings, read_settings);
}
void BackupReaderS3::copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings)
{
LOG_TRACE(log, "Copying {} to disk {}", file_name, destination_disk->getName());
copyS3FileToDisk(
client,
s3_uri.bucket,
fs::path(s3_uri.key) / file_name,
s3_uri.version_id,
0,
size,
destination_disk,
destination_path,
write_mode,
read_settings,
write_settings,
request_settings,
threadPoolCallbackRunner<void>(BackupsIOThreadPool::get(), "BackupReaderS3"));
}
BackupWriterS3::BackupWriterS3(
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, const ContextPtr & context_)

View File

@ -22,6 +22,8 @@ public:
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
void copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) override;
DataSourceDescription getDataSourceDescription() const override;
private:
@ -29,6 +31,7 @@ private:
std::shared_ptr<S3::Client> client;
ReadSettings read_settings;
S3Settings::RequestSettings request_settings;
Poco::Logger * log;
};

View File

@ -79,66 +79,6 @@ namespace
}
class BackupImpl::BackupEntryFromBackupImpl : public IBackupEntry
{
public:
BackupEntryFromBackupImpl(
const std::shared_ptr<const BackupImpl> & backup_,
const String & archive_suffix_,
const String & data_file_name_,
UInt64 size_,
const UInt128 checksum_,
BackupEntryPtr base_backup_entry_ = {})
: backup(backup_), archive_suffix(archive_suffix_), data_file_name(data_file_name_), size(size_), checksum(checksum_),
base_backup_entry(std::move(base_backup_entry_))
{
}
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override
{
std::unique_ptr<SeekableReadBuffer> read_buffer;
if (backup->use_archives)
read_buffer = backup->getArchiveReader(archive_suffix)->readFile(data_file_name);
else
read_buffer = backup->reader->readFile(data_file_name);
if (base_backup_entry)
{
size_t base_size = base_backup_entry->getSize();
read_buffer = std::make_unique<ConcatSeekableReadBuffer>(
base_backup_entry->getReadBuffer(), base_size, std::move(read_buffer), size - base_size);
}
return read_buffer;
}
UInt64 getSize() const override { return size; }
std::optional<UInt128> getChecksum() const override { return checksum; }
String getFilePath() const override
{
return data_file_name;
}
DiskPtr tryGetDiskIfExists() const override
{
return nullptr;
}
DataSourceDescription getDataSourceDescription() const override
{
return backup->reader->getDataSourceDescription();
}
private:
const std::shared_ptr<const BackupImpl> backup;
const String archive_suffix;
const String data_file_name;
const UInt64 size;
const UInt128 checksum;
BackupEntryPtr base_backup_entry;
};
BackupImpl::BackupImpl(
const String & backup_name_for_logging_,
const ArchiveParams & archive_params_,
@ -645,24 +585,22 @@ SizeAndChecksum BackupImpl::getFileSizeAndChecksum(const String & file_name) con
return {info->size, info->checksum};
}
BackupEntryPtr BackupImpl::readFile(const String & file_name) const
std::unique_ptr<SeekableReadBuffer> BackupImpl::readFile(const String & file_name) const
{
return readFile(getFileSizeAndChecksum(file_name));
}
BackupEntryPtr BackupImpl::readFile(const SizeAndChecksum & size_and_checksum) const
std::unique_ptr<SeekableReadBuffer> BackupImpl::readFile(const SizeAndChecksum & size_and_checksum) const
{
std::lock_guard lock{mutex};
if (open_mode != OpenMode::READ)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Backup is not opened for reading");
++num_read_files;
num_read_bytes += size_and_checksum.first;
if (!size_and_checksum.first)
if (size_and_checksum.first == 0)
{
/// Entry's data is empty.
return std::make_unique<BackupEntryFromMemory>(nullptr, 0, UInt128{0, 0});
std::lock_guard lock{mutex};
++num_read_files;
return std::make_unique<ReadBufferFromMemory>(static_cast<char *>(nullptr), 0);
}
auto info_opt = coordination->getFileInfo(size_and_checksum);
@ -677,45 +615,149 @@ BackupEntryPtr BackupImpl::readFile(const SizeAndChecksum & size_and_checksum) c
const auto & info = *info_opt;
std::unique_ptr<SeekableReadBuffer> read_buffer;
std::unique_ptr<SeekableReadBuffer> base_read_buffer;
if (info.size > info.base_size)
{
/// Make `read_buffer` if there is data for this backup entry in this backup.
if (use_archives)
{
std::shared_ptr<IArchiveReader> archive_reader;
{
std::lock_guard lock{mutex};
archive_reader = getArchiveReader(info.archive_suffix);
}
read_buffer = archive_reader->readFile(info.data_file_name);
}
else
{
read_buffer = reader->readFile(info.data_file_name);
}
}
if (info.base_size)
{
/// Make `base_read_buffer` if there is data for this backup entry in the base backup.
if (!base_backup)
{
throw Exception(
ErrorCodes::NO_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but there is no base backup specified",
backup_name_for_logging, formatSizeAndChecksum(size_and_checksum));
}
if (!base_backup->fileExists(std::pair(info.base_size, info.base_checksum)))
{
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but doesn't exist there",
backup_name_for_logging, formatSizeAndChecksum(size_and_checksum));
}
base_read_buffer = base_backup->readFile(std::pair{info.base_size, info.base_checksum});
}
{
/// Update number of read files.
std::lock_guard lock{mutex};
++num_read_files;
num_read_bytes += info.size;
}
if (!info.base_size)
{
/// Data goes completely from this backup, the base backup isn't used.
return std::make_unique<BackupEntryFromBackupImpl>(
std::static_pointer_cast<const BackupImpl>(shared_from_this()), info.archive_suffix, info.data_file_name, info.size, info.checksum);
/// Data comes completely from this backup, the base backup isn't used.
return read_buffer;
}
if (!base_backup)
else if (info.size == info.base_size)
{
throw Exception(
ErrorCodes::NO_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but there is no base backup specified",
backup_name_for_logging, formatSizeAndChecksum(size_and_checksum));
/// Data comes completely from the base backup (nothing comes from this backup).
return base_read_buffer;
}
if (!base_backup->fileExists(std::pair(info.base_size, info.base_checksum)))
else
{
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but doesn't exist there",
backup_name_for_logging, formatSizeAndChecksum(size_and_checksum));
}
auto base_entry = base_backup->readFile(std::pair{info.base_size, info.base_checksum});
if (info.size == info.base_size)
{
/// Data goes completely from the base backup (nothing goes from this backup).
return base_entry;
}
{
/// The beginning of the data goes from the base backup,
/// and the ending goes from this backup.
return std::make_unique<BackupEntryFromBackupImpl>(
static_pointer_cast<const BackupImpl>(shared_from_this()), info.archive_suffix, info.data_file_name, info.size, info.checksum, std::move(base_entry));
/// The beginning of the data comes from the base backup,
/// and the ending comes from this backup.
return std::make_unique<ConcatSeekableReadBuffer>(
std::move(base_read_buffer), info.base_size, std::move(read_buffer), info.size - info.base_size);
}
}
size_t BackupImpl::copyFileToDisk(const String & file_name, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) const
{
return copyFileToDisk(getFileSizeAndChecksum(file_name), destination_disk, destination_path, write_mode, write_settings);
}
size_t BackupImpl::copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) const
{
if (open_mode != OpenMode::READ)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Backup is not opened for reading");
if (size_and_checksum.first == 0)
{
/// Entry's data is empty.
if (write_mode == WriteMode::Rewrite)
{
/// Just create an empty file.
destination_disk->createFile(destination_path);
}
std::lock_guard lock{mutex};
++num_read_files;
return 0;
}
auto info_opt = coordination->getFileInfo(size_and_checksum);
if (!info_opt)
{
throw Exception(
ErrorCodes::BACKUP_ENTRY_NOT_FOUND,
"Backup {}: Entry {} not found in the backup",
backup_name_for_logging,
formatSizeAndChecksum(size_and_checksum));
}
const auto & info = *info_opt;
bool file_copied = false;
if (info.size && !info.base_size && !use_archives)
{
/// Data comes completely from this backup.
reader->copyFileToDisk(info.data_file_name, info.size, destination_disk, destination_path, write_mode, write_settings);
file_copied = true;
}
else if (info.size && (info.size == info.base_size))
{
/// Data comes completely from the base backup (nothing comes from this backup).
base_backup->copyFileToDisk(std::pair{info.base_size, info.base_checksum}, destination_disk, destination_path, write_mode, write_settings);
file_copied = true;
}
if (file_copied)
{
/// The file is already copied, but `num_read_files` is not updated yet.
std::lock_guard lock{mutex};
++num_read_files;
num_read_bytes += info.size;
}
else
{
/// Use the generic way to copy data. `readFile()` will update `num_read_files`.
auto read_buffer = readFile(size_and_checksum);
auto write_buffer = destination_disk->writeFile(destination_path, std::min<size_t>(info.size, DBMS_DEFAULT_BUFFER_SIZE),
write_mode, write_settings);
copyData(*read_buffer, *write_buffer, info.size);
write_buffer->finalize();
}
return info.size;
}
namespace
{

View File

@ -73,8 +73,12 @@ public:
UInt64 getFileSize(const String & file_name) const override;
UInt128 getFileChecksum(const String & file_name) const override;
SizeAndChecksum getFileSizeAndChecksum(const String & file_name) const override;
BackupEntryPtr readFile(const String & file_name) const override;
BackupEntryPtr readFile(const SizeAndChecksum & size_and_checksum) const override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) const override;
std::unique_ptr<SeekableReadBuffer> readFile(const SizeAndChecksum & size_and_checksum) const override;
size_t copyFileToDisk(const String & file_name, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) const override;
size_t copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) const override;
void writeFile(const String & file_name, BackupEntryPtr entry) override;
void finalizeWriting() override;
bool supportsWritingInMultipleThreads() const override { return !use_archives; }

View File

@ -1,6 +1,8 @@
#pragma once
#include <Core/Types.h>
#include <Disks/WriteMode.h>
#include <IO/WriteSettings.h>
#include <memory>
#include <optional>
@ -8,7 +10,10 @@
namespace DB
{
class IBackupEntry;
class IDisk;
using BackupEntryPtr = std::shared_ptr<const IBackupEntry>;
using DiskPtr = std::shared_ptr<IDisk>;
class SeekableReadBuffer;
/// Represents a backup, i.e. a storage of BackupEntries which can be accessed by their names.
/// A backup can be either incremental or non-incremental. An incremental backup doesn't store
@ -95,8 +100,15 @@ public:
virtual SizeAndChecksum getFileSizeAndChecksum(const String & file_name) const = 0;
/// Reads an entry from the backup.
virtual BackupEntryPtr readFile(const String & file_name) const = 0;
virtual BackupEntryPtr readFile(const SizeAndChecksum & size_and_checksum) const = 0;
virtual std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) const = 0;
virtual std::unique_ptr<SeekableReadBuffer> readFile(const SizeAndChecksum & size_and_checksum) const = 0;
/// Copies a file from the backup to a specified destination disk. Returns the number of bytes written.
virtual size_t copyFileToDisk(const String & file_name, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode = WriteMode::Rewrite, const WriteSettings & write_settings = {}) const = 0;
virtual size_t copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode = WriteMode::Rewrite, const WriteSettings & write_settings = {}) const = 0;
/// Puts a new entry to the backup.
virtual void writeFile(const String & file_name, BackupEntryPtr entry) = 0;

View File

@ -316,7 +316,7 @@ void RestorerFromBackup::findTableInBackup(const QualifiedTableName & table_name
= *root_path_in_use / "data" / escapeForFileName(table_name_in_backup.database) / escapeForFileName(table_name_in_backup.table);
}
auto read_buffer = backup->readFile(*metadata_path)->getReadBuffer();
auto read_buffer = backup->readFile(*metadata_path);
String create_query_str;
readStringUntilEOF(create_query_str, *read_buffer);
read_buffer.reset();
@ -410,7 +410,7 @@ void RestorerFromBackup::findDatabaseInBackup(const String & database_name_in_ba
if (metadata_path)
{
auto read_buffer = backup->readFile(*metadata_path)->getReadBuffer();
auto read_buffer = backup->readFile(*metadata_path);
String create_query_str;
readStringUntilEOF(create_query_str, *read_buffer);
read_buffer.reset();

View File

@ -216,6 +216,7 @@ void Connection::disconnect()
socket->close();
socket = nullptr;
connected = false;
nonce.reset();
}
@ -324,6 +325,14 @@ void Connection::receiveHello()
password_complexity_rules.push_back({std::move(original_pattern), std::move(exception_message)});
}
}
if (server_revision >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2)
{
chassert(!nonce.has_value());
UInt64 read_nonce;
readIntBinary(read_nonce, *in);
nonce.emplace(read_nonce);
}
}
else if (packet_type == Protocol::Server::Exception)
receiveException()->rethrow();
@ -584,6 +593,9 @@ void Connection::sendQuery(
{
#if USE_SSL
std::string data(salt);
// For backward compatibility
if (nonce.has_value())
data += std::to_string(nonce.value());
data += cluster_secret;
data += query;
data += query_id;
@ -593,8 +605,8 @@ void Connection::sendQuery(
std::string hash = encodeSHA256(data);
writeStringBinary(hash, *out);
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Inter-server secret support is disabled, because ClickHouse was built without SSL library");
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Inter-server secret support is disabled, because ClickHouse was built without SSL library");
#endif
}
else

View File

@ -167,7 +167,10 @@ private:
/// For inter-server authorization
String cluster;
String cluster_secret;
/// For DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET
String salt;
/// For DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2
std::optional<UInt64> nonce;
/// Address is resolved during the first connection (or the following reconnects)
/// Use it only for logging purposes

View File

@ -31,7 +31,7 @@ public:
* max_elements_size == 0 means no elements size restrictions.
*/
explicit LRUCachePolicy(size_t max_size_, size_t max_elements_size_ = 0, OnWeightLossFunction on_weight_loss_function_ = {})
: max_size(std::max(static_cast<size_t>(1), max_size_)), max_elements_size(max_elements_size_)
: max_size(std::max(1uz, max_size_)), max_elements_size(max_elements_size_)
{
Base::on_weight_loss_function = on_weight_loss_function_;
}

View File

@ -97,7 +97,7 @@ private:
* Note: "SM" in the commentaries below stands for STATE MODIFICATION
*/
RWLockImpl::LockHolder
RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & lock_timeout_ms)
RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & lock_timeout_ms, bool throw_in_fast_path)
{
const auto lock_deadline_tp =
(lock_timeout_ms == std::chrono::milliseconds(0))
@ -130,11 +130,19 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c
if (owner_query_it != owner_queries.end())
{
if (wrlock_owner != writers_queue.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): RWLock is already locked in exclusive mode");
{
if (throw_in_fast_path)
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): RWLock is already locked in exclusive mode");
return nullptr;
}
/// Lock upgrading is not supported
if (type == Write)
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked");
{
if (throw_in_fast_path)
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked");
return nullptr;
}
/// N.B. Type is Read here, query_id is not empty and it_query is a valid iterator
++owner_query_it->second; /// SM1: nothrow

View File

@ -56,7 +56,7 @@ public:
/// Empty query_id means the lock is acquired from outside of query context (e.g. in a background thread).
LockHolder getLock(Type type, const String & query_id,
const std::chrono::milliseconds & lock_timeout_ms = std::chrono::milliseconds(0));
const std::chrono::milliseconds & lock_timeout_ms = std::chrono::milliseconds(0), bool throw_in_fast_path = true);
/// Use as query_id to acquire a lock outside the query context.
inline static const String NO_QUERY = String();

View File

@ -35,7 +35,6 @@
#define DBMS_MERGE_TREE_PART_INFO_VERSION 1
/// Minimum revision supporting interserver secret.
#define DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET 54441
#define DBMS_MIN_REVISION_WITH_X_FORWARDED_FOR_IN_CLIENT_INFO 54443
@ -54,7 +53,7 @@
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing in common with VERSION_REVISION;
/// the latter is just a number for the server version (one number instead of a commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
#define DBMS_TCP_PROTOCOL_VERSION 54461
#define DBMS_TCP_PROTOCOL_VERSION 54462
#define DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME 54449
@ -72,3 +71,5 @@
#define DBMS_MIN_PROTOCOL_VERSION_WITH_SERVER_QUERY_TIME_IN_PROGRESS 54460
#define DBMS_MIN_PROTOCOL_VERSION_WITH_PASSWORD_COMPLEXITY_RULES 54461
#define DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2 54462

View File

@ -50,7 +50,7 @@ class IColumn;
M(UInt64, max_download_buffer_size, 10*1024*1024, "The maximal size of buffer for parallel downloading (e.g. for URL engine) per each thread.", 0) \
M(UInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.", 0) \
M(UInt64, max_distributed_connections, 1024, "The maximum number of connections for distributed processing of one query (should be greater than max_threads).", 0) \
M(UInt64, max_query_size, DBMS_DEFAULT_MAX_QUERY_SIZE, "Which part of the query can be read into RAM for parsing (the remaining data for INSERT, if any, is read later)", 0) \
M(UInt64, max_query_size, DBMS_DEFAULT_MAX_QUERY_SIZE, "The maximum number of bytes of a query string parsed by the SQL parser. Data in the VALUES clause of INSERT queries is processed by a separate stream parser (that consumes O(1) RAM) and is not affected by this restriction.", 0) \
M(UInt64, interactive_delay, 100000, "The interval in microseconds to check if the request is cancelled, and to send progress info.", 0) \
M(Seconds, connect_timeout, DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, "Connection timeout if there are no replicas.", 0) \
M(Milliseconds, connect_timeout_with_failover_ms, 50, "Connection timeout for selecting first healthy replica.", 0) \
@ -253,6 +253,8 @@ class IColumn;
M(Bool, send_progress_in_http_headers, false, "Send progress notifications using X-ClickHouse-Progress headers. Some clients do not support high amount of HTTP headers (Python requests in particular), so it is disabled by default.", 0) \
\
M(UInt64, http_headers_progress_interval_ms, 100, "Do not send HTTP headers X-ClickHouse-Progress more frequently than at each specified interval.", 0) \
M(Bool, http_wait_end_of_query, false, "Enable HTTP response buffering on the server-side.", 0) \
M(UInt64, http_response_buffer_size, false, "The number of bytes to buffer in the server memory before sending an HTTP response to the client or flushing to disk (when http_wait_end_of_query is enabled).", 0) \
\
M(Bool, fsync_metadata, true, "Do fsync after changing metadata for tables and databases (.sql files). Could be disabled in case of poor latency on server with high load of DDL queries and high load of disk subsystem.", 0) \
\
@ -716,6 +718,7 @@ class IColumn;
M(Float, insert_keeper_fault_injection_probability, 0.0f, "Approximate probability of failure for a keeper request during insert. Valid value is in interval [0.0f, 1.0f]", 0) \
M(UInt64, insert_keeper_fault_injection_seed, 0, "0 - random seed, otherwise the setting value", 0) \
M(Bool, force_aggregation_in_order, false, "Force use of aggregation in order on remote nodes during distributed aggregation. PLEASE, NEVER CHANGE THIS SETTING VALUE MANUALLY!", IMPORTANT) \
M(UInt64, http_max_request_param_data_size, 10_MiB, "Limit on size of request data used as a query parameter in predefined HTTP requests.", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS and move obsolete settings to OBSOLETE_SETTINGS.

View File

@ -68,6 +68,15 @@ public:
return disk.writeFile(path, buf_size, mode, settings);
}
void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function) override
{
disk.writeFileUsingCustomWriteObject(path, mode, std::move(custom_write_object_function));
}
void removeFile(const std::string & path) override
{
disk.removeFile(path);

View File

@ -38,6 +38,15 @@ void IDisk::copyFile(const String & from_file_path, IDisk & to_disk, const Strin
out->finalize();
}
void IDisk::writeFileUsingCustomWriteObject(
const String &, WriteMode, std::function<size_t(const StoredObject &, WriteMode, const std::optional<ObjectAttributes> &)>)
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method `writeFileUsingCustomWriteObject()` is not implemented for disk: {}",
getDataSourceDescription().type);
}
DiskTransactionPtr IDisk::createTransaction()
{

View File

@ -209,6 +209,15 @@ public:
WriteMode mode = WriteMode::Rewrite,
const WriteSettings & settings = {}) = 0;
/// Write a file using a custom function to write an object to the disk's object storage.
/// This method is an alternative to writeFile(); the difference is that writeFile() calls IObjectStorage::writeObject()
/// to write an object to the object storage, while this method allows specifying a callback for that.
virtual void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function);
/// Remove file. Throws exception if the file doesn't exist or is a directory.
/// Return whether file was finally removed. (For remote disks it is not always removed).
virtual void removeFile(const String & path) = 0;

View File

@ -68,6 +68,13 @@ public:
const WriteSettings & settings = {},
bool autocommit = true) = 0;
/// Write a file using a custom function to write an object to the disk's object storage.
virtual void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function) = 0;
/// Remove file. Throws exception if the file doesn't exist or is a directory.
virtual void removeFile(const std::string & path) = 0;

View File

@ -577,6 +577,17 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
return result;
}
void DiskObjectStorage::writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function)
{
LOG_TEST(log, "Write file: {}", path);
auto transaction = createObjectStorageTransaction();
return transaction->writeFileUsingCustomWriteObject(path, mode, std::move(custom_write_object_function));
}
void DiskObjectStorage::applyNewSettings(
const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &)
{

View File

@ -152,6 +152,12 @@ public:
WriteMode mode,
const WriteSettings & settings) override;
void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function) override;
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &) override;

View File

@ -670,6 +670,44 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
}
void DiskObjectStorageTransaction::writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function)
{
/// This function is a simplified and adapted version of DiskObjectStorageTransaction::writeFile().
auto blob_name = object_storage.generateBlobNameForPath(path);
std::optional<ObjectAttributes> object_attributes;
if (metadata_helper)
{
auto revision = metadata_helper->revision_counter + 1;
metadata_helper->revision_counter++;
object_attributes = {
{"path", path}
};
blob_name = "r" + revisionToString(revision) + "-file-" + blob_name;
}
auto object = StoredObject::create(object_storage, fs::path(metadata_storage.getObjectStorageRootPath()) / blob_name);
auto write_operation = std::make_unique<WriteFileObjectStorageOperation>(object_storage, metadata_storage, object);
operations_to_execute.emplace_back(std::move(write_operation));
/// We always use mode Rewrite because we simulate append using metadata and different files
size_t object_size = std::move(custom_write_object_function)(object, WriteMode::Rewrite, object_attributes);
/// Create metadata (see create_metadata_callback in DiskObjectStorageTransaction::writeFile()).
if (mode == WriteMode::Rewrite)
metadata_transaction->createMetadataFile(path, blob_name, object_size);
else
metadata_transaction->addBlobToMetadata(path, blob_name, object_size);
metadata_transaction->commit();
}
void DiskObjectStorageTransaction::createHardLink(const std::string & src_path, const std::string & dst_path)
{
operations_to_execute.emplace_back(

View File

@ -99,6 +99,13 @@ public:
const WriteSettings & settings = {},
bool autocommit = true) override;
/// Write a file using a custom function to write an object to the disk's object storage.
void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function) override;
void removeFile(const std::string & path) override;
void removeFileIfExists(const std::string & path) override;
void removeDirectory(const std::string & path) override;

View File

@ -1,7 +1,4 @@
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
#include <Common/ProfileEvents.h>
#include <Interpreters/Context.h>
#if USE_AWS_S3
@ -18,10 +15,12 @@
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/S3/copyS3File.h>
#include <Interpreters/Context.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Disks/ObjectStorages/S3/diskSettings.h>
#include <Common/getRandomASCIIString.h>
#include <Common/ProfileEvents.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/logger_useful.h>
#include <Common/MultiVersion.h>

View File

@ -0,0 +1,69 @@
#include <Disks/ObjectStorages/S3/copyS3FileToDisk.h>
#if USE_AWS_S3
#include <IO/S3/getObjectInfo.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/copyData.h>
#include <IO/S3/copyS3File.h>
namespace DB
{
void copyS3FileToDisk(
const std::shared_ptr<const S3::Client> & s3_client,
const String & src_bucket,
const String & src_key,
const std::optional<String> & version_id,
std::optional<size_t> src_offset,
std::optional<size_t> src_size,
DiskPtr destination_disk,
const String & destination_path,
WriteMode write_mode,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const S3Settings::RequestSettings & request_settings,
ThreadPoolCallbackRunner<void> scheduler)
{
if (!src_offset)
src_offset = 0;
if (!src_size)
src_size = S3::getObjectSize(*s3_client, src_bucket, src_key, version_id.value_or(""), request_settings) - *src_offset;
auto destination_data_source_description = destination_disk->getDataSourceDescription();
if (destination_data_source_description != DataSourceDescription{DataSourceType::S3, s3_client->getInitialEndpoint(), false, false})
{
LOG_TRACE(&Poco::Logger::get("copyS3FileToDisk"), "Copying {} to disk {} through buffers", src_key, destination_disk->getName());
ReadBufferFromS3 read_buffer{s3_client, src_bucket, src_key, {}, request_settings, read_settings};
if (*src_offset)
read_buffer.seek(*src_offset, SEEK_SET);
auto write_buffer = destination_disk->writeFile(destination_path, std::min<size_t>(*src_size, DBMS_DEFAULT_BUFFER_SIZE), write_mode, write_settings);
copyData(read_buffer, *write_buffer, *src_size);
write_buffer->finalize();
return;
}
LOG_TRACE(&Poco::Logger::get("copyS3FileToDisk"), "Copying {} to disk {} using native copy", src_key, destination_disk->getName());
String dest_bucket = destination_disk->getObjectStorage()->getObjectsNamespace();
auto custom_write_object = [&](const StoredObject & object_, WriteMode write_mode_, const std::optional<ObjectAttributes> & object_attributes_) -> size_t
{
/// Object storage always uses mode `Rewrite` because it simulates append using metadata and different files.
chassert(write_mode_ == WriteMode::Rewrite);
copyS3File(s3_client, src_bucket, src_key, *src_offset, *src_size, dest_bucket, /* dest_key= */ object_.absolute_path,
request_settings, object_attributes_, scheduler, /* for_disk_s3= */ true);
return *src_size;
};
destination_disk->writeFileUsingCustomWriteObject(destination_path, write_mode, custom_write_object);
}
}
#endif

View File

@ -0,0 +1,36 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <Disks/IDisk.h>
#include <Storages/StorageS3Settings.h>
#include <Interpreters/threadPoolCallbackRunner.h>
namespace DB
{
/// Copies an object from an S3 bucket to a disk of any type.
/// Depending on the disk, the function can either copy through buffers
/// (i.e. download the object by portions and then write those portions to the specified disk),
/// or perform a server-side copy.
void copyS3FileToDisk(
const std::shared_ptr<const S3::Client> & s3_client,
const String & src_bucket,
const String & src_key,
const std::optional<String> & version_id,
std::optional<size_t> src_offset,
std::optional<size_t> src_size,
DiskPtr destination_disk,
const String & destination_path,
WriteMode write_mode = WriteMode::Rewrite,
const ReadSettings & read_settings = {},
const WriteSettings & write_settings = {},
const S3Settings::RequestSettings & request_settings = {},
ThreadPoolCallbackRunner<void> scheduler = {});
}
#endif

View File

@ -285,9 +285,9 @@ struct NgramDistanceImpl
size_t first_size = dispatchSearcher(calculateHaystackStatsAndMetric<false>, data.data(), data_size, common_stats.get(), distance, nullptr);
/// For !symmetric version we should not use first_size.
if constexpr (symmetric)
res = distance * 1.f / std::max(first_size + second_size, static_cast<size_t>(1));
res = distance * 1.f / std::max(first_size + second_size, 1uz);
else
res = 1.f - distance * 1.f / std::max(second_size, static_cast<size_t>(1));
res = 1.f - distance * 1.f / std::max(second_size, 1uz);
}
else
{
@ -353,9 +353,9 @@ struct NgramDistanceImpl
/// For !symmetric version we should not use haystack_stats_size.
if constexpr (symmetric)
res[i] = distance * 1.f / std::max(haystack_stats_size + needle_stats_size, static_cast<size_t>(1));
res[i] = distance * 1.f / std::max(haystack_stats_size + needle_stats_size, 1uz);
else
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, static_cast<size_t>(1));
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, 1uz);
}
else
{
@ -424,7 +424,7 @@ struct NgramDistanceImpl
for (size_t j = 0; j < needle_stats_size; ++j)
--common_stats[needle_ngram_storage[j]];
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, static_cast<size_t>(1));
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, 1uz);
}
else
{
@ -471,9 +471,9 @@ struct NgramDistanceImpl
ngram_storage.get());
/// For !symmetric version we should not use haystack_stats_size.
if constexpr (symmetric)
res[i] = distance * 1.f / std::max(haystack_stats_size + needle_stats_size, static_cast<size_t>(1));
res[i] = distance * 1.f / std::max(haystack_stats_size + needle_stats_size, 1uz);
else
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, static_cast<size_t>(1));
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, 1uz);
}
else
{

View File

@ -71,8 +71,7 @@ restoreUserDefinedSQLObjects(RestorerFromBackup & restorer, const String & data_
String function_name = unescapeForFileName(escaped_function_name);
String filepath = data_path_in_backup_fs / filename;
auto backup_entry = backup->readFile(filepath);
auto in = backup_entry->getReadBuffer();
auto in = backup->readFile(filepath);
String statement_def;
readStringUntilEOF(statement_def, *in);

View File

@ -291,7 +291,7 @@ public:
ssize_t remain_byte = src.getElementSize() - offset_byte;
if (length < 0)
{
length_byte = std::max(remain_byte + (length / word_size), static_cast<ssize_t>(0));
length_byte = std::max(remain_byte + (length / word_size), 0z);
over_bit = word_size + (length % word_size);
if (length_byte == 1 && over_bit <= offset_bit) // begin and end are in same byte AND there are no gaps
length_byte = 0;
@ -330,7 +330,7 @@ public:
size_t size = src.getElementSize();
if (length < 0)
{
length_byte = std::max(static_cast<ssize_t>(offset_byte) + (length / word_size), static_cast<ssize_t>(0));
length_byte = std::max(static_cast<ssize_t>(offset_byte) + (length / word_size), 0z);
over_bit = word_size + (length % word_size);
if (length_byte == 1 && over_bit <= offset_bit) // begin and end are in same byte AND there are no gaps
length_byte = 0;
@ -395,7 +395,7 @@ public:
}
else
{
length_byte = std::max(remain_byte + (static_cast<ssize_t>(length) / word_size), static_cast<ssize_t>(0));
length_byte = std::max(remain_byte + (static_cast<ssize_t>(length) / word_size), 0z);
over_bit = word_size + (length % word_size);
if (length_byte == 1 && over_bit <= offset_bit) // begin and end are in same byte AND there are no gaps
length_byte = 0;

View File

@ -106,7 +106,7 @@ void MemoryWriteBuffer::addChunk()
}
else
{
next_chunk_size = std::max(static_cast<size_t>(1), static_cast<size_t>(chunk_tail->size() * growth_rate));
next_chunk_size = std::max(1uz, static_cast<size_t>(chunk_tail->size() * growth_rate));
next_chunk_size = std::min(next_chunk_size, max_chunk_size);
}
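
The 1uz / 0z spellings in the hunks above use the C++23 integer-literal suffixes for std::size_t and its signed counterpart (the top-level build now targets C++23), which is what lets the static_casts disappear. A self-contained illustration:

// Compile with -std=c++23 (or gnu++23). The suffix gives the literal the same
// type as the other std::max argument, so no cast is required.
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <sys/types.h>   // ssize_t (POSIX); on the LP64 targets used here it matches the type of 0z

int main()
{
    std::size_t second_size = 0;
    std::size_t denom = std::max(second_size, 1uz);     // was: std::max(second_size, static_cast<size_t>(1))

    ssize_t remain_byte = -3;
    ssize_t length_byte = std::max(remain_byte, 0z);    // was: std::max(remain_byte, static_cast<ssize_t>(0))

    std::cout << denom << ' ' << length_byte << '\n';   // prints: 1 0
}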

View File

@ -123,9 +123,8 @@ Client::Client(
{
auto * endpoint_provider = dynamic_cast<Aws::S3::Endpoint::S3DefaultEpProviderBase *>(accessEndpointProvider().get());
endpoint_provider->GetBuiltInParameters().GetParameter("Region").GetString(explicit_region);
std::string endpoint;
endpoint_provider->GetBuiltInParameters().GetParameter("Endpoint").GetString(endpoint);
detect_region = explicit_region == Aws::Region::AWS_GLOBAL && endpoint.find(".amazonaws.com") != std::string::npos;
endpoint_provider->GetBuiltInParameters().GetParameter("Endpoint").GetString(initial_endpoint);
detect_region = explicit_region == Aws::Region::AWS_GLOBAL && initial_endpoint.find(".amazonaws.com") != std::string::npos;
cache = std::make_shared<ClientCache>();
ClientCacheRegistry::instance().registerClient(cache);
@ -133,6 +132,7 @@ Client::Client(
Client::Client(const Client & other)
: Aws::S3::S3Client(other)
, initial_endpoint(other.initial_endpoint)
, explicit_region(other.explicit_region)
, detect_region(other.detect_region)
, max_redirects(other.max_redirects)

View File

@ -109,6 +109,9 @@ public:
}
}
/// Returns the initial endpoint.
const String & getInitialEndpoint() const { return initial_endpoint; }
/// Decorator for RetryStrategy needed for this client to work correctly.
/// We want to manually handle permanent moves (status code 301) because:
/// - the redirect location is written in XML format inside the response body, something that doesn't exist for HEAD
@ -198,6 +201,8 @@ private:
bool checkIfWrongRegionDefined(const std::string & bucket, const Aws::S3::S3Error & error, std::string & region) const;
void insertRegionOverride(const std::string & bucket, const std::string & region) const;
String initial_endpoint;
std::string explicit_region;
mutable bool detect_region = true;

View File

@ -10,6 +10,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int CANNOT_READ_ALL_DATA;
}
namespace
@ -91,6 +92,13 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<v
copyDataImpl(from, to, true, bytes, cancellation_hook, nullptr);
}
void copyDataMaxBytes(ReadBuffer & from, WriteBuffer & to, size_t max_bytes)
{
copyDataImpl(from, to, false, max_bytes, nullptr, nullptr);
if (!from.eof())
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all data, max readable size reached.");
}
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler)
{
copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), &is_cancelled, throttler);

View File

@ -27,6 +27,9 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atom
void copyData(ReadBuffer & from, WriteBuffer & to, std::function<void()> cancellation_hook);
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<void()> cancellation_hook);
/// Copies at most `max_bytes` bytes from ReadBuffer to WriteBuffer. If there are more bytes, then throws an exception.
void copyDataMaxBytes(ReadBuffer & from, WriteBuffer & to, size_t max_bytes);
/// Same as above, but also uses a throttler to limit the maximum speed
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler);
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler);
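
A short usage sketch for copyDataMaxBytes, assuming the ClickHouse string-buffer helpers that appear elsewhere in this commit (ReadBufferFromString, WriteBufferFromOwnString) and the usual include paths from the source tree; the wrapper function is hypothetical. The same pattern is used for the _request_body handling further down in this commit.

// Sketch: read a payload into a string, but refuse anything larger than `limit`.
// copyDataMaxBytes copies at most `limit` bytes and throws CANNOT_READ_ALL_DATA
// if the source buffer still has data left afterwards.
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>   // WriteBufferFromOwnString
#include <IO/copyData.h>
#include <string>

std::string readAtMost(const std::string & payload, size_t limit)
{
    DB::ReadBufferFromString in(payload);
    DB::WriteBufferFromOwnString out;
    DB::copyDataMaxBytes(in, out, limit);   // throws when payload.size() > limit
    return out.str();
}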

View File

@ -83,7 +83,10 @@ void ReplaceQueryParameterVisitor::visitQueryParameter(ASTPtr & ast)
IColumn & temp_column = *temp_column_ptr;
ReadBufferFromString read_buffer{value};
FormatSettings format_settings;
data_type->getDefaultSerialization()->deserializeTextEscaped(temp_column, read_buffer, format_settings);
if (ast_param.name == "_request_body")
data_type->getDefaultSerialization()->deserializeWholeText(temp_column, read_buffer, format_settings);
else
data_type->getDefaultSerialization()->deserializeTextEscaped(temp_column, read_buffer, format_settings);
if (!read_buffer.eof())
throw Exception(ErrorCodes::BAD_QUERY_PARAMETER,
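
The special case above exists because query parameters are normally passed in TSV-escaped form, so deserializeTextEscaped stops at an unescaped tab or newline and the leftover bytes would then trip the eof check shown above; deserializeWholeText consumes the buffer verbatim. A hedged sketch of the new path for a String parameter (include paths and factory calls as they appear in the source tree):

// Sketch: the raw body "a<TAB>b" handled as a {_request_body:String} parameter.
// deserializeTextEscaped would stop at the tab, leaving "\tb" unread and failing
// the eof check; deserializeWholeText takes the body verbatim.
#include <DataTypes/DataTypeFactory.h>
#include <IO/ReadBufferFromString.h>
#include <Formats/FormatSettings.h>

void sketch()
{
    std::string body = "a\tb";
    DB::ReadBufferFromString read_buffer(body);
    auto type = DB::DataTypeFactory::instance().get("String");
    auto column = type->createColumn();
    DB::FormatSettings format_settings;
    type->getDefaultSerialization()->deserializeWholeText(*column, read_buffer, format_settings);
    // column now holds exactly "a\tb" and read_buffer.eof() is true.
}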

View File

@ -395,7 +395,11 @@ void addMergingAggregatedStep(QueryPlan & query_plan,
* but it can work more slowly.
*/
Aggregator::Params params(aggregation_analysis_result.aggregation_keys,
auto keys = aggregation_analysis_result.aggregation_keys;
if (!aggregation_analysis_result.grouping_sets_parameters_list.empty())
keys.insert(keys.begin(), "__grouping_set");
Aggregator::Params params(keys,
aggregation_analysis_result.aggregate_descriptions,
query_analysis_result.aggregate_overflow_row,
settings.max_threads,

View File

@ -45,7 +45,7 @@ void PrettySpaceBlockOutputFormat::writeChunk(const Chunk & chunk, PortKind port
if (col.type->shouldAlignRightInPrettyFormats())
{
for (ssize_t k = 0; k < std::max(static_cast<ssize_t>(0), static_cast<ssize_t>(max_widths[i] - name_widths[i])); ++k)
for (ssize_t k = 0; k < std::max(0z, static_cast<ssize_t>(max_widths[i] - name_widths[i])); ++k)
writeChar(' ', out);
if (format_settings.pretty.color)
@ -62,7 +62,7 @@ void PrettySpaceBlockOutputFormat::writeChunk(const Chunk & chunk, PortKind port
if (format_settings.pretty.color)
writeCString("\033[0m", out);
for (ssize_t k = 0; k < std::max(static_cast<ssize_t>(0), static_cast<ssize_t>(max_widths[i] - name_widths[i])); ++k)
for (ssize_t k = 0; k < std::max(0z, static_cast<ssize_t>(max_widths[i] - name_widths[i])); ++k)
writeChar(' ', out);
}
}

View File

@ -1,5 +1,7 @@
#include <memory>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
@ -7,6 +9,71 @@
namespace DB::QueryPlanOptimizations
{
/// build an actions DAG from the stack of steps
static ActionsDAGPtr buildActionsForPlanPath(std::vector<ActionsDAGPtr> & dag_stack)
{
if (dag_stack.empty())
return nullptr;
ActionsDAGPtr path_actions = dag_stack.back()->clone();
dag_stack.pop_back();
while (!dag_stack.empty())
{
ActionsDAGPtr clone = dag_stack.back()->clone();
dag_stack.pop_back();
path_actions->mergeInplace(std::move(*clone));
}
return path_actions;
}
static const ActionsDAG::Node * getOriginalNodeForOutputAlias(const ActionsDAGPtr & actions, const String & output_name)
{
/// find alias in output
const ActionsDAG::Node * output_alias = nullptr;
for (const auto * node : actions->getOutputs())
{
if (node->result_name == output_name)
{
output_alias = node;
break;
}
}
if (!output_alias)
return nullptr;
/// find the original (non-alias) node it refers to
const ActionsDAG::Node * node = output_alias;
while (node && node->type == ActionsDAG::ActionType::ALIAS)
{
chassert(!node->children.empty());
node = node->children.front();
}
if (node && node->type != ActionsDAG::ActionType::INPUT)
return nullptr;
return node;
}
static std::set<std::string>
getOriginalDistinctColumns(const ColumnsWithTypeAndName & distinct_columns, std::vector<ActionsDAGPtr> & dag_stack)
{
auto actions = buildActionsForPlanPath(dag_stack);
std::set<std::string> original_distinct_columns;
for (const auto & column : distinct_columns)
{
/// const columns don't affect DISTINCT, so skip them
if (isColumnConst(*column.column))
continue;
const auto * input_node = getOriginalNodeForOutputAlias(actions, column.name);
if (!input_node)
break;
original_distinct_columns.insert(input_node->result_name);
}
return original_distinct_columns;
}
size_t tryDistinctReadInOrder(QueryPlan::Node * parent_node)
{
/// check if it is a preliminary distinct node
@ -22,8 +89,10 @@ size_t tryDistinctReadInOrder(QueryPlan::Node * parent_node)
/// walk through the plan
/// (1) check if nodes below preliminary distinct preserve sorting
/// (2) gather transforming steps to update their sorting properties later
/// (3) gather actions DAG to find original names for columns in distinct step later
std::vector<ITransformingStep *> steps_to_update;
QueryPlan::Node * node = parent_node;
std::vector<ActionsDAGPtr> dag_stack;
while (!node->children.empty())
{
auto * step = dynamic_cast<ITransformingStep *>(node->step.get());
@ -36,6 +105,11 @@ size_t tryDistinctReadInOrder(QueryPlan::Node * parent_node)
steps_to_update.push_back(step);
if (const auto * const expr = typeid_cast<const ExpressionStep *>(step); expr)
dag_stack.push_back(expr->getExpression());
else if (const auto * const filter = typeid_cast<const FilterStep *>(step); filter)
dag_stack.push_back(filter->getExpression());
node = node->children.front();
}
@ -50,28 +124,24 @@ size_t tryDistinctReadInOrder(QueryPlan::Node * parent_node)
if (read_from_merge_tree->getOutputStream().sort_description.empty())
return 0;
/// find non-const columns in DISTINCT
/// get original names for DISTINCT columns
const ColumnsWithTypeAndName & distinct_columns = pre_distinct->getOutputStream().header.getColumnsWithTypeAndName();
std::set<std::string_view> non_const_columns;
for (const auto & column : distinct_columns)
{
if (!isColumnConst(*column.column))
non_const_columns.emplace(column.name);
}
auto original_distinct_columns = getOriginalDistinctColumns(distinct_columns, dag_stack);
const Names& sorting_key_columns = read_from_merge_tree->getStorageMetadata()->getSortingKeyColumns();
/// check if DISTINCT has the same columns as sorting key
const Names & sorting_key_columns = read_from_merge_tree->getStorageMetadata()->getSortingKeyColumns();
size_t number_of_sorted_distinct_columns = 0;
for (const auto & column_name : sorting_key_columns)
{
if (non_const_columns.end() == non_const_columns.find(column_name))
if (!original_distinct_columns.contains(column_name))
break;
++number_of_sorted_distinct_columns;
}
/// apply the optimization only when the distinct columns match or form a prefix of the sorting key
/// todo: check if the read-in-order optimization would be beneficial when the sorting key is a prefix of the DISTINCT columns
if (number_of_sorted_distinct_columns != non_const_columns.size())
if (number_of_sorted_distinct_columns != original_distinct_columns.size())
return 0;
/// check if another read in order optimization is already applied

View File

@ -589,11 +589,12 @@ void HTTPHandler::processQuery(
/// At least, we should postpone sending of first buffer_size result bytes
size_t buffer_size_total = std::max(
params.getParsed<size_t>("buffer_size", DBMS_DEFAULT_BUFFER_SIZE), static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE));
params.getParsed<size_t>("buffer_size", context->getSettingsRef().http_response_buffer_size),
static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE));
/// If it is specified, the whole result will be buffered.
/// First ~buffer_size bytes will be buffered in memory, the remaining bytes will be stored in temporary file.
bool buffer_until_eof = params.getParsed<bool>("wait_end_of_query", false);
bool buffer_until_eof = params.getParsed<bool>("wait_end_of_query", context->getSettingsRef().http_wait_end_of_query);
size_t buffer_size_http = DBMS_DEFAULT_BUFFER_SIZE;
size_t buffer_size_memory = (buffer_size_total > buffer_size_http) ? buffer_size_total : 0;
@ -782,7 +783,6 @@ void HTTPHandler::processQuery(
/// they will be applied in ProcessList::insert() from executeQuery() itself.
const auto & query = getQuery(request, params, context);
std::unique_ptr<ReadBuffer> in_param = std::make_unique<ReadBufferFromString>(query);
in = has_external_data ? std::move(in_param) : std::make_unique<ConcatReadBuffer>(*in_param, *in_post_maybe_compressed);
/// HTTP response compression is turned on only if the client signalled that they support it
/// (using Accept-Encoding header) and 'enable_http_compression' setting is turned on.
@ -832,7 +832,8 @@ void HTTPHandler::processQuery(
});
}
customizeContext(request, context);
customizeContext(request, context, *in_post_maybe_compressed);
in = has_external_data ? std::move(in_param) : std::make_unique<ConcatReadBuffer>(*in_param, *in_post_maybe_compressed);
executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response, this] (const QueryResultDetails & details)
@ -1152,7 +1153,7 @@ bool PredefinedQueryHandler::customizeQueryParam(ContextMutablePtr context, cons
return false;
}
void PredefinedQueryHandler::customizeContext(HTTPServerRequest & request, ContextMutablePtr context)
void PredefinedQueryHandler::customizeContext(HTTPServerRequest & request, ContextMutablePtr context, ReadBuffer & body)
{
/// If, in the configuration file, the handler's header is a regex that contains named capture groups,
/// we extract those named capture groups as query parameters
@ -1186,6 +1187,15 @@ void PredefinedQueryHandler::customizeContext(HTTPServerRequest & request, Conte
const auto & header_value = request.get(header_name);
set_query_params(header_value.data(), header_value.data() + header_value.size(), regex);
}
if (unlikely(receive_params.contains("_request_body") && !context->getQueryParameters().contains("_request_body")))
{
WriteBufferFromOwnString value;
const auto & settings = context->getSettingsRef();
copyDataMaxBytes(body, value, settings.http_max_request_param_data_size);
context->setQueryParameter("_request_body", value.str());
}
}
std::string PredefinedQueryHandler::getQuery(HTTPServerRequest & request, HTMLForm & params, ContextMutablePtr context)

View File

@ -36,7 +36,7 @@ public:
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
/// This method is called right before the query execution.
virtual void customizeContext(HTTPServerRequest & /* request */, ContextMutablePtr /* context */) {}
virtual void customizeContext(HTTPServerRequest & /* request */, ContextMutablePtr /* context */, ReadBuffer & /* body */) {}
virtual bool customizeQueryParam(ContextMutablePtr context, const std::string & key, const std::string & value) = 0;
@ -163,7 +163,7 @@ public:
, const CompiledRegexPtr & url_regex_, const std::unordered_map<String, CompiledRegexPtr> & header_name_with_regex_
, const std::optional<std::string> & content_type_override_);
virtual void customizeContext(HTTPServerRequest & request, ContextMutablePtr context) override;
void customizeContext(HTTPServerRequest & request, ContextMutablePtr context, ReadBuffer & body) override;
std::string getQuery(HTTPServerRequest & request, HTMLForm & params, ContextMutablePtr context) override;

View File

@ -41,6 +41,7 @@
#include <Compression/CompressionFactory.h>
#include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <Common/thread_local_rng.h>
#include <fmt/format.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
@ -1279,6 +1280,18 @@ void TCPHandler::sendHello()
writeStringBinary(exception_message, *out);
}
}
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2)
{
chassert(!nonce.has_value());
/// The thread-local RNG is seeded with plenty of entropy (including the current time), so this should be enough for a nonce.
nonce.emplace(thread_local_rng());
writeIntBinary(nonce.value(), *out);
}
else
{
LOG_WARNING(LogFrequencyLimiter(log, 10),
"Using deprecated interserver protocol because the client is too old. Consider upgrading all nodes in cluster.");
}
out->next();
}
@ -1459,20 +1472,30 @@ void TCPHandler::receiveQuery()
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS)
passed_params.read(*in, settings_format);
/// TODO Unify interserver authentication (and make sure that it's secure enough)
if (is_interserver_mode)
{
client_info.interface = ClientInfo::Interface::TCP_INTERSERVER;
#if USE_SSL
String cluster_secret = server.context()->getCluster(cluster)->getSecret();
if (salt.empty() || cluster_secret.empty())
{
auto exception = Exception(ErrorCodes::AUTHENTICATION_FAILED, "Interserver authentication failed");
session->onAuthenticationFailure(/* user_name */ std::nullopt, socket().peerAddress(), exception);
auto exception = Exception(ErrorCodes::AUTHENTICATION_FAILED, "Interserver authentication failed (no salt/cluster secret)");
session->onAuthenticationFailure(/* user_name= */ std::nullopt, socket().peerAddress(), exception);
throw exception; /// NOLINT
}
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2 && !nonce.has_value())
{
auto exception = Exception(ErrorCodes::AUTHENTICATION_FAILED, "Interserver authentication failed (no nonce)");
session->onAuthenticationFailure(/* user_name= */ std::nullopt, socket().peerAddress(), exception);
throw exception; /// NOLINT
}
std::string data(salt);
// For backward compatibility
if (nonce.has_value())
data += std::to_string(nonce.value());
data += cluster_secret;
data += state.query;
data += state.query_id;
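
For reference, a sketch that consolidates the V2 check-string construction mirrored from the code above; both sides must assemble it in exactly this order, and the digest applied to it plus the comparison with the transmitted value lie outside the shown hunk, so they are only indicated in a comment:

// Sketch only. UInt64 is ClickHouse's alias for uint64_t, aliased locally here
// to keep the fragment self-contained.
#include <optional>
#include <string>
#include <cstdint>
using UInt64 = uint64_t;

std::string buildInterserverCheckString(
    const std::string & salt,
    std::optional<UInt64> nonce,            // present only for protocol >= INTERSERVER_SECRET_V2
    const std::string & cluster_secret,
    const std::string & query,
    const std::string & query_id)
{
    std::string data(salt);
    if (nonce.has_value())                  // omitted for old clients (backward compatibility)
        data += std::to_string(nonce.value());
    data += cluster_secret;
    data += query;
    data += query_id;
    return data;                            // the caller hashes this and compares it with the received value
}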

View File

@ -1,5 +1,6 @@
#pragma once
#include <optional>
#include <Poco/Net/TCPServerConnection.h>
#include <base/getFQDNOrHostName.h>
@ -188,7 +189,10 @@ private:
/// For inter-server secret (remote_server.*.secret)
bool is_interserver_mode = false;
/// For DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET
String salt;
/// For DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2
std::optional<UInt64> nonce;
String cluster;
std::mutex task_callback_mutex;

View File

@ -40,7 +40,7 @@ RWLockImpl::LockHolder IStorage::tryLockTimed(
{
const String type_str = type == RWLockImpl::Type::Read ? "READ" : "WRITE";
throw Exception(ErrorCodes::DEADLOCK_AVOIDED,
"{} locking attempt on \"{}\" has timed out! ({}ms) Possible deadlock avoided. Client should retry.",
"{} locking attempt on \"{}\" has timed out! ({}ms) Possible deadlock avoided. Client should retry",
type_str, getStorageID(), acquire_timeout.count());
}
return lock_holder;

View File

@ -5082,12 +5082,8 @@ void MergeTreeData::restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> r
if (filename.ends_with(IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME))
continue;
auto backup_entry = backup->readFile(part_path_in_backup_fs / filename);
auto read_buffer = backup_entry->getReadBuffer();
auto write_buffer = disk->writeFile(temp_part_dir / filename);
copyData(*read_buffer, *write_buffer);
write_buffer->finalize();
reservation->update(reservation->getSize() - backup_entry->getSize());
size_t file_size = backup->copyFileToDisk(part_path_in_backup_fs / filename, disk, temp_part_dir / filename);
reservation->update(reservation->getSize() - file_size);
}
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);

View File

@ -18,6 +18,7 @@
#include <Storages/getStructureOfRemoteTable.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageDummy.h>
#include <Storages/removeGroupingFunctionSpecializations.h>
#include <Columns/ColumnConst.h>
@ -1020,6 +1021,8 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info,
if (!replacement_map.empty())
query_tree_to_modify = query_tree_to_modify->cloneAndReplace(replacement_map);
removeGroupingFunctionSpecializations(query_tree_to_modify);
return query_tree_to_modify;
}

View File

@ -21,21 +21,24 @@
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Poco/String.h> /// toLower
#include <Poco/String.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
extern const int UNSUPPORTED_JOIN_KEYS;
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int INCOMPATIBLE_TYPE_OF_JOIN;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
extern const int DEADLOCK_AVOIDED;
extern const int INCOMPATIBLE_TYPE_OF_JOIN;
extern const int LOGICAL_ERROR;
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int NOT_IMPLEMENTED;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int UNSUPPORTED_JOIN_KEYS;
}
StorageJoin::StorageJoin(
@ -78,6 +81,14 @@ RWLockImpl::LockHolder StorageJoin::tryLockTimedWithContext(const RWLock & lock,
return tryLockTimed(lock, type, query_id, acquire_timeout);
}
RWLockImpl::LockHolder StorageJoin::tryLockForCurrentQueryTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context)
{
const String query_id = context ? context->getInitialQueryId() : RWLockImpl::NO_QUERY;
const std::chrono::milliseconds acquire_timeout
= context ? context->getSettingsRef().lock_acquire_timeout : std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC);
return lock->getLock(type, query_id, acquire_timeout, false);
}
SinkToStoragePtr StorageJoin::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
std::lock_guard mutate_lock(mutate_mutex);
@ -95,7 +106,7 @@ void StorageJoin::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPt
LOG_INFO(&Poco::Logger::get("StorageJoin"), "Path {} is already removed from disk {}", path, disk->getName());
disk->createDirectories(path);
disk->createDirectories(path + "tmp/");
disk->createDirectories(fs::path(path) / "tmp/");
increment = 0;
join = std::make_shared<HashJoin>(table_join, getRightSampleBlock(), overwrite);
@ -238,8 +249,12 @@ void StorageJoin::insertBlock(const Block & block, ContextPtr context)
{
Block block_to_insert = block;
convertRightBlock(block_to_insert);
TableLockHolder holder = tryLockForCurrentQueryTimedWithContext(rwlock, RWLockImpl::Write, context);
/// Protection from `INSERT INTO test_table_join SELECT * FROM test_table_join`
if (!holder)
throw Exception(ErrorCodes::DEADLOCK_AVOIDED, "StorageJoin: cannot insert data because current query tries to read from this storage");
TableLockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Write, context);
join->addJoinedBlock(block_to_insert, true);
}

View File

@ -100,12 +100,15 @@ private:
/// Protect state for concurrent use in insertFromBlock and joinBlock.
/// Lock is stored in HashJoin instance during query and blocks concurrent insertions.
mutable RWLock rwlock = RWLockImpl::create();
mutable std::mutex mutate_mutex;
void insertBlock(const Block & block, ContextPtr context) override;
void finishInsert() override {}
size_t getSize(ContextPtr context) const override;
RWLockImpl::LockHolder tryLockTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const;
/// Same as tryLockTimedWithContext, but returns `nullptr` if the lock is already acquired by the current query.
static RWLockImpl::LockHolder tryLockForCurrentQueryTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context);
void convertRightBlock(Block & block) const;
};

View File

@ -1027,11 +1027,7 @@ void StorageLog::restoreDataImpl(const BackupPtr & backup, const String & data_p
if (!backup->fileExists(file_path_in_backup))
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "File {} in backup is required to restore table", file_path_in_backup);
auto backup_entry = backup->readFile(file_path_in_backup);
auto in = backup_entry->getReadBuffer();
auto out = disk->writeFile(data_file.path, max_compress_block_size, WriteMode::Append);
copyData(*in, *out);
out->finalize();
backup->copyFileToDisk(file_path_in_backup, disk, data_file.path, WriteMode::Append);
}
if (use_marks_file)
@ -1062,8 +1058,7 @@ void StorageLog::restoreDataImpl(const BackupPtr & backup, const String & data_p
old_num_rows[i] = num_marks ? data_files[i].marks[num_marks - 1].rows : 0;
}
auto backup_entry = backup->readFile(file_path_in_backup);
auto marks_rb = backup_entry->getReadBuffer();
auto marks_rb = backup->readFile(file_path_in_backup);
for (size_t i = 0; i != num_extra_marks; ++i)
{

View File

@ -595,8 +595,7 @@ void StorageMemory::restoreDataImpl(const BackupPtr & backup, const String & dat
if (!backup->fileExists(index_file_path))
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "File {} in backup is required to restore table", index_file_path);
auto backup_entry = backup->readFile(index_file_path);
auto in = backup_entry->getReadBuffer();
auto in = backup->readFile(index_file_path);
CompressedReadBuffer compressed_in{*in};
index.read(compressed_in);
}
@ -610,8 +609,7 @@ void StorageMemory::restoreDataImpl(const BackupPtr & backup, const String & dat
if (!backup->fileExists(data_file_path))
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "File {} in backup is required to restore table", data_file_path);
auto backup_entry = backup->readFile(data_file_path);
std::unique_ptr<ReadBuffer> in = backup_entry->getReadBuffer();
auto in = backup->readFile(data_file_path);
std::optional<TemporaryFileOnDisk> temp_data_file;
if (!dynamic_cast<ReadBufferFromFileBase *>(in.get()))
{

View File

@ -448,7 +448,7 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count);
size_t current_streams = std::min(current_need_streams, remaining_streams);
remaining_streams -= current_streams;
current_streams = std::max(static_cast<size_t>(1), current_streams);
current_streams = std::max(1uz, current_streams);
const auto & storage = std::get<1>(table);

View File

@ -622,11 +622,7 @@ void StorageStripeLog::restoreDataImpl(const BackupPtr & backup, const String &
if (!backup->fileExists(file_path_in_backup))
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "File {} in backup is required to restore table", file_path_in_backup);
auto backup_entry = backup->readFile(file_path_in_backup);
auto in = backup_entry->getReadBuffer();
auto out = disk->writeFile(data_file_path, max_compress_block_size, WriteMode::Append);
copyData(*in, *out);
out->finalize();
backup->copyFileToDisk(file_path_in_backup, disk, data_file_path, WriteMode::Append);
}
/// Append the index.
@ -636,8 +632,7 @@ void StorageStripeLog::restoreDataImpl(const BackupPtr & backup, const String &
if (!backup->fileExists(index_path_in_backup))
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "File {} in backup is required to restore table", index_path_in_backup);
auto backup_entry = backup->readFile(index_path_in_backup);
auto index_in = backup_entry->getReadBuffer();
auto index_in = backup->readFile(index_path_in_backup);
CompressedReadBuffer index_compressed_in{*index_in};
extra_indices.read(index_compressed_in);

View File

@ -0,0 +1,65 @@
#include <Storages/removeGroupingFunctionSpecializations.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Common/Exception.h>
#include <Functions/grouping.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class GeneralizeGroupingFunctionForDistributedVisitor : public InDepthQueryTreeVisitor<GeneralizeGroupingFunctionForDistributedVisitor>
{
public:
static void visitImpl(QueryTreeNodePtr & node)
{
auto * function = node->as<FunctionNode>();
if (!function)
return;
const auto & function_name = function->getFunctionName();
bool ordinary_grouping = function_name == "groupingOrdinary";
if (!ordinary_grouping
&& function_name != "groupingForRollup"
&& function_name != "groupingForCube"
&& function_name != "groupingForGroupingSets")
return;
if (!ordinary_grouping)
{
auto & arguments = function->getArguments().getNodes();
if (arguments.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Grouping function specialization must have arguments");
auto * grouping_set_arg = arguments[0]->as<ColumnNode>();
if (!grouping_set_arg || grouping_set_arg->getColumnName() != "__grouping_set")
throw Exception(ErrorCodes::LOGICAL_ERROR,
"The first argument of Grouping function specialization must be '__grouping_set' column but {} found",
arguments[0]->dumpTree());
arguments.erase(arguments.begin());
}
// This node will only be converted to AST, so we don't need
// to pass the correct force_compatibility flag to FunctionGrouping.
auto function_adaptor = std::make_shared<FunctionToOverloadResolverAdaptor>(
std::make_shared<FunctionGrouping>(false)
);
function->resolveAsFunction(function_adaptor);
}
};
void removeGroupingFunctionSpecializations(QueryTreeNodePtr & node)
{
GeneralizeGroupingFunctionForDistributedVisitor visitor;
visitor.visit(node);
}
}

View File

@ -0,0 +1,10 @@
#pragma once
#include <Analyzer/IQueryTreeNode.h>
namespace DB
{
void removeGroupingFunctionSpecializations(QueryTreeNodePtr & node);
}

View File

@ -437,7 +437,7 @@ class FailureReason(enum.Enum):
SERVER_DIED = "server died"
EXIT_CODE = "return code: "
STDERR = "having stderror: "
EXCEPTION = "having having exception in stdout: "
EXCEPTION = "having exception in stdout: "
RESULT_DIFF = "result differs with reference: "
TOO_LONG = "Test runs too long (> 60s). Make it faster."
INTERNAL_QUERY_FAIL = "Internal query (CREATE/DROP DATABASE) failed:"
@ -523,6 +523,8 @@ class SettingsRandomizer:
"merge_tree_coarse_index_granularity": lambda: random.randint(2, 32),
"optimize_distinct_in_order": lambda: random.randint(0, 1),
"optimize_sorting_by_input_stream_properties": lambda: random.randint(0, 1),
"http_response_buffer_size": lambda: random.randint(0, 10 * 1048576),
"http_wait_end_of_query": lambda: random.random() > 0.5,
"enable_memory_bound_merging_of_aggregation_results": lambda: random.randint(
0, 1
),
@ -2246,7 +2248,8 @@ def parse_args():
parser.add_argument(
"-b",
"--binary",
default=find_binary("clickhouse"),
default="clickhouse",
type=find_binary,
help="Path to clickhouse binary or name of binary in PATH",
)
parser.add_argument(

View File

@ -138,7 +138,8 @@ def test_backup_to_s3_native_copy():
f"S3('http://minio1:9001/root/data/backups/{backup_name}', 'minio', 'minio123')"
)
check_backup_and_restore(storage_policy, backup_destination)
assert node.contains_in_log("using native copy")
assert node.contains_in_log("BackupImpl.*using native copy")
assert node.contains_in_log("copyS3FileToDisk.*using native copy")
assert node.contains_in_log(
f"copyS3File: Single operation copy has completed. Bucket: root, Key: data/backups/{backup_name}"
)
@ -151,7 +152,8 @@ def test_backup_to_s3_native_copy_other_bucket():
f"S3('http://minio1:9001/root/data/backups/{backup_name}', 'minio', 'minio123')"
)
check_backup_and_restore(storage_policy, backup_destination)
assert node.contains_in_log("using native copy")
assert node.contains_in_log("BackupImpl.*using native copy")
assert node.contains_in_log("copyS3FileToDisk.*using native copy")
assert node.contains_in_log(
f"copyS3File: Single operation copy has completed. Bucket: root, Key: data/backups/{backup_name}"
)
@ -162,7 +164,8 @@ def test_backup_to_s3_native_copy_multipart():
backup_name = new_backup_name()
backup_destination = f"S3('http://minio1:9001/root/data/backups/multipart/{backup_name}', 'minio', 'minio123')"
check_backup_and_restore(storage_policy, backup_destination, size=1000000)
assert node.contains_in_log("using native copy")
assert node.contains_in_log("BackupImpl.*using native copy")
assert node.contains_in_log("copyS3FileToDisk.*using native copy")
assert node.contains_in_log(
f"copyS3File: Multipart upload has completed. Bucket: root, Key: data/backups/multipart/{backup_name}/"
)

View File

@ -24,5 +24,17 @@
<priority>1</priority>
</node>
</secure>
<secure_backward>
<secret>foo_backward</secret>
<node>
<host>n1</host>
<port>9000</port>
</node>
<node>
<host>backward</host>
<port>9000</port>
</node>
</secure_backward>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,15 @@
<clickhouse>
<remote_servers>
<secure_disagree>
<secret>backward</secret>
<node>
<host>n1</host>
<port>9000</port>
</node>
<node>
<host>backward</host>
<port>9000</port>
</node>
</secure_disagree>
</remote_servers>
</clickhouse>

View File

@ -12,18 +12,28 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
def make_instance(name, cfg):
def make_instance(name, cfg, *args, **kwargs):
return cluster.add_instance(
name,
with_zookeeper=True,
main_configs=["configs/remote_servers.xml", cfg],
user_configs=["configs/users.xml"],
*args,
**kwargs,
)
# _n1/_n2 contains cluster with different <secret> -- should fail
n1 = make_instance("n1", "configs/remote_servers_n1.xml")
n2 = make_instance("n2", "configs/remote_servers_n2.xml")
backward = make_instance(
"backward",
"configs/remote_servers_backward.xml",
image="clickhouse/clickhouse-server",
# version without DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2
tag="23.2.3",
with_installed_binary=True,
)
users = pytest.mark.parametrize(
"user,password",
@ -54,6 +64,12 @@ def bootstrap():
Engine=Distributed(secure, currentDatabase(), data, key)
"""
)
n.query(
"""
CREATE TABLE dist_secure_backward AS data
Engine=Distributed(secure_backward, currentDatabase(), data, key)
"""
)
n.query(
"""
CREATE TABLE dist_secure_from_buffer AS data_from_buffer
@ -409,3 +425,31 @@ def test_per_user_protocol_settings_secure_cluster(user, password):
assert int(get_query_setting_on_shard(n1, id_, "max_memory_usage_for_user")) == int(
1e9
)
@users
def test_user_secure_cluster_with_backward(user, password):
id_ = "with-backward-query-dist_secure-" + user
query_with_id(
n1, id_, "SELECT * FROM dist_secure_backward", user=user, password=password
)
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(backward, id_) == [user, user]
@users
def test_user_secure_cluster_from_backward(user, password):
id_ = "from-backward-query-dist_secure-" + user
query_with_id(
backward,
id_,
"SELECT * FROM dist_secure_backward",
user=user,
password=password,
)
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(backward, id_) == [user, user]
assert n1.contains_in_log(
"Using deprecated interserver protocol because the client is too old. Consider upgrading all nodes in cluster."
)

View File

@ -147,6 +147,18 @@ def test_predefined_query_handler():
assert b"max_final_threads\t1\nmax_threads\t1\n" == res2.content
assert "application/generic+one" == res2.headers["content-type"]
cluster.instance.query(
"CREATE TABLE test_table (id UInt32, data String) Engine=TinyLog"
)
res3 = cluster.instance.http_request(
"test_predefined_handler_post_body?id=100",
method="POST",
data="TEST".encode("utf8"),
)
assert res3.status_code == 200
assert cluster.instance.query("SELECT * FROM test_table") == "100\tTEST\n"
cluster.instance.query("DROP TABLE test_table")
def test_fixed_static_handler():
with contextlib.closing(

View File

@ -21,5 +21,13 @@
<content_type>application/generic+one</content_type>
</handler>
</rule>
<rule>
<methods>POST</methods>
<url>/test_predefined_handler_post_body</url>
<handler>
<type>predefined_query_handler</type>
<query>INSERT INTO test_table(id, data) SELECT {id:UInt32}, {_request_body:String}</query>
</handler>
</rule>
</http_handlers>
</clickhouse>

View File

@ -9,6 +9,7 @@
dns_lookup_kdc = false
ticket_lifetime = 5s
forwardable = true
rdns = false
default_tgs_enctypes = des3-hmac-sha1
default_tkt_enctypes = des3-hmac-sha1
permitted_enctypes = des3-hmac-sha1

View File

@ -10,6 +10,7 @@
ticket_lifetime = 15s
renew_lifetime = 15s
forwardable = true
rdns = false
[realms]
TEST.CLICKHOUSE.TECH = {

View File

@ -28,7 +28,7 @@ exec kill -9 [exp_pid]
close
# Run client one more time and press "up" to see the last recorded query
spawn bash -c "source $basedir/../shell_config.sh ; \$CLICKHOUSE_CLIENT_BINARY \$CLICKHOUSE_CLIENT_OPT --history_file=$history_file"
spawn bash -c "source $basedir/../shell_config.sh ; \$CLICKHOUSE_CLIENT_BINARY \$CLICKHOUSE_CLIENT_OPT --disable_suggestion --history_file=$history_file"
expect ":) "
send -- "\[A"
expect "for the history"

View File

@ -1,3 +1,5 @@
set optimize_group_by_function_keys=0;
SELECT
number,
grouping(number, number % 2, number % 3) AS gr

View File

@ -1,3 +1,5 @@
set optimize_group_by_function_keys=0;
SELECT
number,
grouping(number, number % 2, number % 3) = 6

View File

@ -27,3 +27,17 @@ SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02315 GROUP BY ROLLUP(a,
5 0 0 2
5 1 0 2
10 0 0 0
SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02315 GROUP BY GROUPING SETS ((a, b), (a, a), ()) ORDER BY (amount, a, b) SETTINGS force_grouping_standard_compatibility=0, allow_experimental_analyzer=1;
1 0 0 3
1 0 2 3
1 0 4 3
1 0 6 3
1 0 8 3
1 1 1 3
1 1 3 3
1 1 5 3
1 1 7 3
1 1 9 3
5 0 0 2
5 1 0 2
10 0 0 0

View File

@ -9,5 +9,7 @@ SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02315 GROUP BY GROUPING
SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02315 GROUP BY ROLLUP(a, b) ORDER BY (amount, a, b) SETTINGS force_grouping_standard_compatibility=0;
SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02315 GROUP BY GROUPING SETS ((a, b), (a, a), ()) ORDER BY (amount, a, b) SETTINGS force_grouping_standard_compatibility=0, allow_experimental_analyzer=1;
-- { echoOff }
DROP TABLE test02315;

View File

@ -191,6 +191,6 @@ E51B38608EF25F57
1
1
E28DBDE7FE22E41C
1CE422FEE7BD8DE20000000000000000
1
E28DBDE7FE22E41C
1CE422FEE7BD8DE20000000000000000
1

View File

@ -269,6 +269,6 @@ select sipHash64Keyed(toUInt64(0), '1'); -- { serverError 48 }
select sipHash128Keyed(toUInt64(0), '1'); -- { serverError 48 }
select hex(sipHash64());
select hex(sipHash128());
SELECT hex(sipHash128()) = hex(reverse(unhex('1CE422FEE7BD8DE20000000000000000'))) or hex(sipHash128()) = '1CE422FEE7BD8DE20000000000000000';
select hex(sipHash64Keyed());
select hex(sipHash128Keyed());
SELECT hex(sipHash128Keyed()) = hex(reverse(unhex('1CE422FEE7BD8DE20000000000000000'))) or hex(sipHash128Keyed()) = '1CE422FEE7BD8DE20000000000000000';

View File

@ -126,5 +126,5 @@ E3040C00EB28F15366CA73CBD872E740
1
1
1
1CE422FEE7BD8DE20000000000000000
1CE422FEE7BD8DE20000000000000000
1
1

View File

@ -203,5 +203,5 @@ select sipHash128ReferenceKeyed((toUInt64(0),toUInt64(0)),char(0, 1, 2, 3, 4, 5,
select sipHash128ReferenceKeyed((0, 0), '1'); -- { serverError 48 }
select sipHash128ReferenceKeyed(toUInt64(0), '1'); -- { serverError 48 }
select hex(sipHash128Reference());
select hex(sipHash128ReferenceKeyed());
SELECT hex(sipHash128Reference()) = hex(reverse(unhex('1CE422FEE7BD8DE20000000000000000'))) or hex(sipHash128()) = '1CE422FEE7BD8DE20000000000000000';
SELECT hex(sipHash128ReferenceKeyed()) = hex(reverse(unhex('1CE422FEE7BD8DE20000000000000000'))) or hex(sipHash128Keyed()) = '1CE422FEE7BD8DE20000000000000000';

View File

@ -0,0 +1 @@
MergeTreeInOrder

View File

@ -0,0 +1,8 @@
drop table if exists t;
set allow_experimental_analyzer=1;
create table t (a UInt64, b UInt64) engine=MergeTree() order by (a);
insert into t select number % 2, number from numbers(10);
select splitByChar(' ', trimBoth(explain))[1] from (explain pipeline select distinct a from t) where explain like '%MergeTreeInOrder%';

View File

@ -0,0 +1,4 @@
1 0
--------------
--------------
1 0

View File

@ -0,0 +1,36 @@
SELECT
bitmapHasAny(bitmapBuild([toUInt8(1)]), (
SELECT groupBitmapState(toUInt8(1))
)) has1,
bitmapHasAny(bitmapBuild([toUInt64(1)]), (
SELECT groupBitmapState(toUInt64(2))
)) has2;
SELECT '--------------';
SELECT *
FROM
(
SELECT
bitmapHasAny(bitmapBuild([toUInt8(1)]), (
SELECT groupBitmapState(toUInt8(1))
)) has1,
bitmapHasAny(bitmapBuild([toUInt64(1)]), (
SELECT groupBitmapState(toUInt64(2))
)) has2
); -- { serverError 43 }
SELECT '--------------';
SELECT *
FROM
(
SELECT
bitmapHasAny(bitmapBuild([toUInt8(1)]), (
SELECT groupBitmapState(toUInt8(1))
)) has1,
bitmapHasAny(bitmapBuild([toUInt64(1)]), (
SELECT groupBitmapState(toUInt64(2))
)) has2
) SETTINGS allow_experimental_analyzer = 1;

View File

@ -0,0 +1,16 @@
DROP TABLE IF EXISTS test_table_join;
CREATE TABLE test_table_join
(
id UInt64,
value String
) ENGINE = Join(Any, Left, id);
INSERT INTO test_table_join VALUES (1, 'q');
INSERT INTO test_table_join SELECT * from test_table_join; -- { serverError DEADLOCK_AVOIDED }
INSERT INTO test_table_join SELECT * FROM (SELECT 1 as id) AS t1 ANY LEFT JOIN test_table_join USING (id); -- { serverError DEADLOCK_AVOIDED }
INSERT INTO test_table_join SELECT id, toString(id) FROM (SELECT 1 as id) AS t1 ANY LEFT JOIN (SELECT id FROM test_table_join) AS t2 USING (id); -- { serverError DEADLOCK_AVOIDED }
DROP TABLE IF EXISTS test_table_join;