Merge branch 'master' into patch-2

This commit is contained in:
Alexey Milovidov 2023-03-15 15:04:18 +03:00 committed by GitHub
commit 0c98cae517
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
196 changed files with 1930 additions and 1120 deletions

View File

@ -23,9 +23,12 @@ Checks: '*,
-bugprone-implicit-widening-of-multiplication-result,
-bugprone-narrowing-conversions,
-bugprone-not-null-terminated-result,
-bugprone-reserved-identifier,
-bugprone-unchecked-optional-access,
-cert-dcl16-c,
-cert-dcl37-c,
-cert-dcl51-cpp,
-cert-err58-cpp,
-cert-msc32-c,
-cert-msc51-cpp,
@ -129,6 +132,7 @@ Checks: '*,
-readability-function-cognitive-complexity,
-readability-function-size,
-readability-identifier-length,
-readability-identifier-naming,
-readability-implicit-bool-conversion,
-readability-isolate-declaration,
-readability-magic-numbers,

View File

@ -301,7 +301,7 @@ if (ENABLE_BUILD_PROFILING)
endif ()
endif ()
set (CMAKE_CXX_STANDARD 20)
set (CMAKE_CXX_STANDARD 23)
set (CMAKE_CXX_EXTENSIONS ON) # Same as gnu++2a (ON) vs c++2a (OFF): https://cmake.org/cmake/help/latest/prop_tgt/CXX_EXTENSIONS.html
set (CMAKE_CXX_STANDARD_REQUIRED ON)

View File

@ -2,6 +2,10 @@ if (USE_CLANG_TIDY)
set (CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_PATH}")
endif ()
# TODO: Remove this. We like to compile with C++23 (set by top-level CMakeLists) but Clang crashes with our libcxx
# when instantiated from JSON.cpp. Try again when libcxx(abi) and Clang are upgraded to 16.
set (CMAKE_CXX_STANDARD 20)
set (SRCS
argsToConfig.cpp
coverage.cpp

View File

@ -159,22 +159,22 @@ inline const char * find_first_symbols_sse42(const char * const begin, const cha
#endif
for (; pos < end; ++pos)
if ( (num_chars >= 1 && maybe_negate<positive>(*pos == c01))
|| (num_chars >= 2 && maybe_negate<positive>(*pos == c02))
|| (num_chars >= 3 && maybe_negate<positive>(*pos == c03))
|| (num_chars >= 4 && maybe_negate<positive>(*pos == c04))
|| (num_chars >= 5 && maybe_negate<positive>(*pos == c05))
|| (num_chars >= 6 && maybe_negate<positive>(*pos == c06))
|| (num_chars >= 7 && maybe_negate<positive>(*pos == c07))
|| (num_chars >= 8 && maybe_negate<positive>(*pos == c08))
|| (num_chars >= 9 && maybe_negate<positive>(*pos == c09))
|| (num_chars >= 10 && maybe_negate<positive>(*pos == c10))
|| (num_chars >= 11 && maybe_negate<positive>(*pos == c11))
|| (num_chars >= 12 && maybe_negate<positive>(*pos == c12))
|| (num_chars >= 13 && maybe_negate<positive>(*pos == c13))
|| (num_chars >= 14 && maybe_negate<positive>(*pos == c14))
|| (num_chars >= 15 && maybe_negate<positive>(*pos == c15))
|| (num_chars >= 16 && maybe_negate<positive>(*pos == c16)))
if ( (num_chars == 1 && maybe_negate<positive>(is_in<c01>(*pos)))
|| (num_chars == 2 && maybe_negate<positive>(is_in<c01, c02>(*pos)))
|| (num_chars == 3 && maybe_negate<positive>(is_in<c01, c02, c03>(*pos)))
|| (num_chars == 4 && maybe_negate<positive>(is_in<c01, c02, c03, c04>(*pos)))
|| (num_chars == 5 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05>(*pos)))
|| (num_chars == 6 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06>(*pos)))
|| (num_chars == 7 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07>(*pos)))
|| (num_chars == 8 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08>(*pos)))
|| (num_chars == 9 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09>(*pos)))
|| (num_chars == 10 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10>(*pos)))
|| (num_chars == 11 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11>(*pos)))
|| (num_chars == 12 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12>(*pos)))
|| (num_chars == 13 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12, c13>(*pos)))
|| (num_chars == 14 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12, c13, c14>(*pos)))
|| (num_chars == 15 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12, c13, c14, c15>(*pos)))
|| (num_chars == 16 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12, c13, c14, c15, c16>(*pos))))
return pos;
return return_mode == ReturnMode::End ? end : nullptr;
}

View File

@ -466,7 +466,7 @@ namespace Data
bool extractManualImpl(std::size_t pos, T & val, SQLSMALLINT cType)
{
SQLRETURN rc = 0;
T value = (T)0;
T value;
resizeLengths(pos);

View File

@ -105,6 +105,8 @@ public:
const std::string & getText() const;
/// Returns the text of the message.
void appendText(const std::string & text);
void setPriority(Priority prio);
/// Sets the priority of the message.

View File

@ -27,8 +27,7 @@ Message::Message():
_tid(0),
_file(0),
_line(0),
_pMap(0),
_fmt_str(0)
_pMap(0)
{
init();
}
@ -157,6 +156,12 @@ void Message::setText(const std::string& text)
}
void Message::appendText(const std::string & text)
{
_text.append(text);
}
void Message::setPriority(Priority prio)
{
_prio = prio;

View File

@ -48,6 +48,9 @@ set(gRPC_ABSL_PROVIDER "clickhouse" CACHE STRING "" FORCE)
# We don't want to build C# extensions.
set(gRPC_BUILD_CSHARP_EXT OFF)
# TODO: Remove this. We generally like to compile with C++23 but grpc isn't ready yet.
set (CMAKE_CXX_STANDARD 20)
set(_gRPC_CARES_LIBRARIES ch_contrib::c-ares)
set(gRPC_CARES_PROVIDER "clickhouse" CACHE STRING "" FORCE)
add_subdirectory("${_gRPC_SOURCE_DIR}" "${_gRPC_BINARY_DIR}")

2
contrib/krb5 vendored

@ -1 +1 @@
Subproject commit f8262a1b548eb29d97e059260042036255d07f8d
Subproject commit 9453aec0d50e5aff9b189051611b321b40935d02

View File

@ -160,6 +160,8 @@ set(ALL_SRCS
# "${KRB5_SOURCE_DIR}/lib/gssapi/spnego/negoex_trace.c"
"${KRB5_SOURCE_DIR}/lib/crypto/builtin/kdf.c"
"${KRB5_SOURCE_DIR}/lib/crypto/builtin/cmac.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/prng.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/enc_dk_cmac.c"
# "${KRB5_SOURCE_DIR}/lib/crypto/krb/crc32.c"
@ -183,7 +185,6 @@ set(ALL_SRCS
"${KRB5_SOURCE_DIR}/lib/crypto/krb/block_size.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/string_to_key.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/verify_checksum.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/crypto_libinit.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/derive.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/random_to_key.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/verify_checksum_iov.c"
@ -217,9 +218,7 @@ set(ALL_SRCS
"${KRB5_SOURCE_DIR}/lib/crypto/krb/s2k_rc4.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/valid_cksumtype.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/nfold.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/prng_fortuna.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/encrypt_length.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/cmac.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/keyblocks.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/prf_rc4.c"
"${KRB5_SOURCE_DIR}/lib/crypto/krb/s2k_pbkdf2.c"
@ -228,11 +227,11 @@ set(ALL_SRCS
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/enc_provider/rc4.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/enc_provider/des3.c"
#"${KRB5_SOURCE_DIR}/lib/crypto/openssl/enc_provider/camellia.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/cmac.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/sha256.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/hmac.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/kdf.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/pbkdf2.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/init.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/stubs.c"
# "${KRB5_SOURCE_DIR}/lib/crypto/openssl/hash_provider/hash_crc32.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/hash_provider/hash_evp.c"
"${KRB5_SOURCE_DIR}/lib/crypto/openssl/des/des_keys.c"
@ -312,7 +311,6 @@ set(ALL_SRCS
"${KRB5_SOURCE_DIR}/lib/krb5/krb/allow_weak.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/mk_rep.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/mk_priv.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/s4u_authdata.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/preauth_otp.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/init_keyblock.c"
"${KRB5_SOURCE_DIR}/lib/krb5/krb/ser_addr.c"
@ -688,6 +686,7 @@ target_include_directories(_krb5 PRIVATE
target_compile_definitions(_krb5 PRIVATE
KRB5_PRIVATE
CRYPTO_OPENSSL
_GSS_STATIC_LINK=1
KRB5_DEPRECATED=1
LOCALEDIR="/usr/local/share/locale"

View File

@ -765,7 +765,7 @@ Default value: `0`.
## concurrent_threads_soft_limit_ratio_to_cores {#concurrent_threads_soft_limit_ratio_to_cores}
The maximum number of query processing threads as multiple of number of logical cores.
More details: [concurrent_threads_soft_limit_num](#concurrent-threads-soft-limit-num).
More details: [concurrent_threads_soft_limit_num](#concurrent_threads_soft_limit_num).
Possible values:

View File

@ -1548,7 +1548,7 @@ Enables or disables asynchronous inserts. This makes sense only for insertion ov
If enabled, the data is combined into batches before the insertion into tables, so it is possible to do small and frequent insertions into ClickHouse (up to 15000 queries per second) without buffer tables.
The data is inserted either after the [async_insert_max_data_size](#async-insert-max-data-size) is exceeded or after [async_insert_busy_timeout_ms](#async-insert-busy-timeout-ms) milliseconds since the first `INSERT` query. If the [async_insert_stale_timeout_ms](#async-insert-stale-timeout-ms) is set to a non-zero value, the data is inserted after `async_insert_stale_timeout_ms` milliseconds since the last query.
The data is inserted either after the [async_insert_max_data_size](#async-insert-max-data-size) is exceeded or after [async_insert_busy_timeout_ms](#async-insert-busy-timeout-ms) milliseconds since the first `INSERT` query. If the [async_insert_stale_timeout_ms](#async-insert-stale-timeout-ms) is set to a non-zero value, the data is inserted after `async_insert_stale_timeout_ms` milliseconds since the last query. Also the buffer will be flushed to disk if at least [async_insert_max_query_number](#async-insert-max-query-number) async insert queries per block were received. This last setting takes effect only if [async_insert_deduplicate](#async-insert-deduplicate) is enabled.
If [wait_for_async_insert](#wait-for-async-insert) is enabled, every client will wait for the data to be processed and flushed to the table. Otherwise, the query would be processed almost instantly, even if the data is not inserted.

View File

@ -80,7 +80,7 @@ Required parameters:
- `type``encrypted`. Otherwise the encrypted disk is not created.
- `disk` — Type of disk for data storage.
- `key` — The key for encryption and decryption. Type: [Uint64](/docs/en/sql-reference/data-types/int-uint.md). You can use `key_hex` parameter to encrypt in hexadecimal form.
- `key` — The key for encryption and decryption. Type: [Uint64](/docs/en/sql-reference/data-types/int-uint.md). You can use `key_hex` parameter to encode the key in hexadecimal form.
You can specify multiple keys using the `id` attribute (see example above).
Optional parameters:

View File

@ -15,6 +15,13 @@ Columns:
- `operation_name` ([String](../../sql-reference/data-types/string.md)) — The name of the operation.
- `kind` ([Enum8](../../sql-reference/data-types/enum.md)) — The [SpanKind](https://opentelemetry.io/docs/reference/specification/trace/api/#spankind) of the span.
- `INTERNAL` — Indicates that the span represents an internal operation within an application.
- `SERVER` — Indicates that the span covers server-side handling of a synchronous RPC or other remote request.
- `CLIENT` — Indicates that the span describes a request to some remote service.
- `PRODUCER` — Indicates that the span describes the initiators of an asynchronous request. This parent span will often end before the corresponding child CONSUMER span, possibly even before the child span starts.
- `CONSUMER` - Indicates that the span describes a child of an asynchronous PRODUCER request.
- `start_time_us` ([UInt64](../../sql-reference/data-types/int-uint.md)) — The start time of the `trace span` (in microseconds).
- `finish_time_us` ([UInt64](../../sql-reference/data-types/int-uint.md)) — The finish time of the `trace span` (in microseconds).
@ -42,6 +49,7 @@ trace_id: cdab0847-0d62-61d5-4d38-dd65b19a1914
span_id: 701487461015578150
parent_span_id: 2991972114672045096
operation_name: DB::Block DB::InterpreterSelectQuery::getSampleBlockImpl()
kind: INTERNAL
start_time_us: 1612374594529090
finish_time_us: 1612374594529108
finish_date: 2021-02-03

View File

@ -6,25 +6,26 @@ sidebar_label: clickhouse-local
# clickhouse-local
The `clickhouse-local` program enables you to perform fast processing on local files, without having to deploy and configure the ClickHouse server.
The `clickhouse-local` program enables you to perform fast processing on local files, without having to deploy and configure the ClickHouse server. It accepts data that represent tables and queries them using [ClickHouse SQL dialect](../../sql-reference/). `clickhouse-local` uses the same core as ClickHouse server, so it supports most of the features and the same set of formats and table engines.
Accepts data that represent tables and queries them using [ClickHouse SQL dialect](../../sql-reference/).
`clickhouse-local` uses the same core as ClickHouse server, so it supports most of the features and the same set of formats and table engines.
By default `clickhouse-local` does not have access to data on the same host, but it supports loading server configuration using `--config-file` argument.
For temporary data, a unique temporary data directory is created by default.
By default `clickhouse-local` has access to data on the same host, and it does not depend on the server's configuration. It also supports loading server configuration using `--config-file` argument. For temporary data, a unique temporary data directory is created by default.
## Usage {#usage}
Basic usage:
Basic usage (Linux):
``` bash
$ clickhouse-local --structure "table_structure" --input-format "format_of_incoming_data" \
--query "query"
$ clickhouse-local --structure "table_structure" --input-format "format_of_incoming_data" --query "query"
```
Basic usage (Mac):
``` bash
$ ./clickhouse local --structure "table_structure" --input-format "format_of_incoming_data" --query "query"
```
Also supported on Windows through WSL2.
Arguments:
- `-S`, `--structure` — table structure for input data.

View File

@ -1757,8 +1757,7 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
LOG_INFO(log, "All helping tables dropped partition {}", partition_name);
}
String ClusterCopier::getRemoteCreateTable(
const DatabaseAndTableName & table, Connection & connection, const Settings & settings)
String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings & settings)
{
auto remote_context = Context::createCopy(context);
remote_context->setSettings(settings);
@ -1777,8 +1776,10 @@ ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & time
{
/// Fetch and parse (possibly) new definition
auto connection_entry = task_shard.info.pool->get(timeouts, &task_cluster->settings_pull, true);
String create_query_pull_str
= getRemoteCreateTable(task_shard.task_table.table_pull, *connection_entry, task_cluster->settings_pull);
String create_query_pull_str = getRemoteCreateTable(
task_shard.task_table.table_pull,
*connection_entry,
task_cluster->settings_pull);
ParserCreateQuery parser_create_query;
const auto & settings = getContext()->getSettingsRef();
@ -2025,8 +2026,8 @@ UInt64 ClusterCopier::executeQueryOnCluster(
/// For unknown reason global context is passed to IStorage::read() method
/// So, task_identifier is passed as constructor argument. It is more obvious.
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
*connections.back(), query, header, getContext(),
/*throttler=*/nullptr, Scalars(), Tables(), QueryProcessingStage::Complete);
*connections.back(), query, header, getContext(),
/*throttler=*/nullptr, Scalars(), Tables(), QueryProcessingStage::Complete);
try
{

View File

@ -30,7 +30,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_TABLE;
extern const int BAD_ARGUMENTS;
}
@ -180,8 +180,19 @@ void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServ
columns.emplace_back(column_name, std::move(column_type));
}
/// Usually this should not happen, since in case of table does not
/// exists, the call should be succeeded.
/// However it is possible sometimes because internally there are two
/// queries in ClickHouse ODBC bridge:
/// - system.tables
/// - system.columns
/// And if between this two queries the table will be removed, them
/// there will be no columns
///
/// Also sometimes system.columns can return empty result because of
/// the cached value of total tables to scan.
if (columns.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns definition was not returned");
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Columns definition was not returned");
WriteBufferFromHTTPServerResponse out(
response,

View File

@ -67,7 +67,6 @@
#include <TableFunctions/registerTableFunctions.h>
#include <Formats/registerFormats.h>
#include <Storages/registerStorages.h>
#include <QueryPipeline/ConnectionCollector.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <IO/Resource/registerSchedulerNodes.h>
@ -816,8 +815,6 @@ try
}
);
ConnectionCollector::init(global_context, server_settings.max_threads_for_connection_collector);
bool has_zookeeper = config().has("zookeeper");
zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); });

View File

@ -348,10 +348,6 @@
<background_distributed_schedule_pool_size>16</background_distributed_schedule_pool_size>
-->
<!-- Number of workers to recycle connections in background (see also drain_timeout).
If the pool is full, connection will be drained synchronously. -->
<!-- <max_threads_for_connection_collector>10</max_threads_for_connection_collector> -->
<!-- On memory constrained environments you may have to set this to value larger than 1.
-->
<max_server_memory_usage_to_ram_ratio>0.9</max_server_memory_usage_to_ram_ratio>

View File

@ -72,12 +72,11 @@ namespace
return std::make_shared<BackupEntryFromMemory>(buf.str());
}
static AccessEntitiesInBackup fromBackupEntry(const IBackupEntry & backup_entry, const String & file_path)
static AccessEntitiesInBackup fromBackupEntry(std::unique_ptr<ReadBuffer> buf, const String & file_path)
{
try
{
AccessEntitiesInBackup res;
std::unique_ptr<ReadBuffer> buf = backup_entry.getReadBuffer();
bool dependencies_found = false;
@ -343,8 +342,8 @@ void AccessRestorerFromBackup::addDataPath(const String & data_path)
for (const String & filename : filenames)
{
String filepath_in_backup = data_path_in_backup_fs / filename;
auto backup_entry = backup->readFile(filepath_in_backup);
auto ab = AccessEntitiesInBackup::fromBackupEntry(*backup_entry, filepath_in_backup);
auto read_buffer_from_backup = backup->readFile(filepath_in_backup);
auto ab = AccessEntitiesInBackup::fromBackupEntry(std::move(read_buffer_from_backup), filepath_in_backup);
boost::range::copy(ab.entities, std::back_inserter(entities));
boost::range::copy(ab.dependencies, std::inserter(dependencies, dependencies.end()));

View File

@ -1,9 +1,10 @@
#include <Backups/BackupIO.h>
#include <IO/copyData.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/SeekableReadBuffer.h>
namespace DB
{
@ -12,6 +13,15 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
void IBackupReader::copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings)
{
auto read_buffer = readFile(file_name);
auto write_buffer = destination_disk->writeFile(destination_path, std::min<size_t>(size, DBMS_DEFAULT_BUFFER_SIZE), write_mode, write_settings);
copyData(*read_buffer, *write_buffer, size);
write_buffer->finalize();
}
void IBackupWriter::copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name)
{
auto read_buffer = create_read_buffer();

View File

@ -17,6 +17,8 @@ public:
virtual bool fileExists(const String & file_name) = 0;
virtual UInt64 getFileSize(const String & file_name) = 0;
virtual std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) = 0;
virtual void copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings);
virtual DataSourceDescription getDataSourceDescription() const = 0;
};

View File

@ -1,4 +1,5 @@
#include <Backups/BackupIO_Disk.h>
#include <Common/logger_useful.h>
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
@ -12,7 +13,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
BackupReaderDisk::BackupReaderDisk(const DiskPtr & disk_, const String & path_) : disk(disk_), path(path_)
BackupReaderDisk::BackupReaderDisk(const DiskPtr & disk_, const String & path_)
: disk(disk_), path(path_), log(&Poco::Logger::get("BackupReaderDisk"))
{
}
@ -33,6 +35,21 @@ std::unique_ptr<SeekableReadBuffer> BackupReaderDisk::readFile(const String & fi
return disk->readFile(path / file_name);
}
void BackupReaderDisk::copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings)
{
if (write_mode == WriteMode::Rewrite)
{
LOG_TRACE(log, "Copying {}/{} from disk {} to {} by the disk", path, file_name, disk->getName(), destination_disk->getName());
disk->copyFile(path / file_name, *destination_disk, destination_path, write_settings);
return;
}
LOG_TRACE(log, "Copying {}/{} from disk {} to {} through buffers", path, file_name, disk->getName(), destination_disk->getName());
IBackupReader::copyFileToDisk(file_name, size, destination_disk, destination_path, write_mode, write_settings);
}
BackupWriterDisk::BackupWriterDisk(const DiskPtr & disk_, const String & path_) : disk(disk_), path(path_)
{
}

View File

@ -17,11 +17,14 @@ public:
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
void copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) override;
DataSourceDescription getDataSourceDescription() const override;
private:
DiskPtr disk;
std::filesystem::path path;
Poco::Logger * log;
};
class BackupWriterDisk : public IBackupWriter

View File

@ -1,15 +1,18 @@
#include <Backups/BackupIO_File.h>
#include <Disks/IDisk.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
#include <Common/filesystemHelpers.h>
#include <Common/logger_useful.h>
namespace fs = std::filesystem;
namespace DB
{
BackupReaderFile::BackupReaderFile(const String & path_) : path(path_)
BackupReaderFile::BackupReaderFile(const String & path_) : path(path_), log(&Poco::Logger::get("BackupReaderFile"))
{
}
@ -30,6 +33,22 @@ std::unique_ptr<SeekableReadBuffer> BackupReaderFile::readFile(const String & fi
return createReadBufferFromFileBase(path / file_name, {});
}
void BackupReaderFile::copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings)
{
if (destination_disk->getDataSourceDescription() == getDataSourceDescription())
{
/// Use more optimal way.
LOG_TRACE(log, "Copying {}/{} to disk {} locally", path, file_name, destination_disk->getName());
fs::copy(path / file_name, fullPath(destination_disk, destination_path), fs::copy_options::overwrite_existing);
return;
}
LOG_TRACE(log, "Copying {}/{} to disk {} through buffers", path, file_name, destination_disk->getName());
IBackupReader::copyFileToDisk(path / file_name, size, destination_disk, destination_path, write_mode, write_settings);
}
BackupWriterFile::BackupWriterFile(const String & path_) : path(path_)
{
}

View File

@ -15,10 +15,13 @@ public:
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
void copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) override;
DataSourceDescription getDataSourceDescription() const override;
private:
std::filesystem::path path;
Poco::Logger * log;
};
class BackupWriterFile : public IBackupWriter

View File

@ -2,6 +2,7 @@
#if USE_AWS_S3
#include <Common/quoteString.h>
#include <Disks/ObjectStorages/S3/copyS3FileToDisk.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Interpreters/Context.h>
#include <IO/BackupsIOThreadPool.h>
@ -96,6 +97,7 @@ BackupReaderS3::BackupReaderS3(
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
, read_settings(context_->getReadSettings())
, request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings)
, log(&Poco::Logger::get("BackupReaderS3"))
{
request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint
}
@ -127,6 +129,27 @@ std::unique_ptr<SeekableReadBuffer> BackupReaderS3::readFile(const String & file
client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, request_settings, read_settings);
}
void BackupReaderS3::copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings)
{
LOG_TRACE(log, "Copying {} to disk {}", file_name, destination_disk->getName());
copyS3FileToDisk(
client,
s3_uri.bucket,
fs::path(s3_uri.key) / file_name,
s3_uri.version_id,
0,
size,
destination_disk,
destination_path,
write_mode,
read_settings,
write_settings,
request_settings,
threadPoolCallbackRunner<void>(BackupsIOThreadPool::get(), "BackupReaderS3"));
}
BackupWriterS3::BackupWriterS3(
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, const ContextPtr & context_)

View File

@ -22,6 +22,8 @@ public:
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
void copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) override;
DataSourceDescription getDataSourceDescription() const override;
private:
@ -29,6 +31,7 @@ private:
std::shared_ptr<S3::Client> client;
ReadSettings read_settings;
S3Settings::RequestSettings request_settings;
Poco::Logger * log;
};

View File

@ -79,66 +79,6 @@ namespace
}
class BackupImpl::BackupEntryFromBackupImpl : public IBackupEntry
{
public:
BackupEntryFromBackupImpl(
const std::shared_ptr<const BackupImpl> & backup_,
const String & archive_suffix_,
const String & data_file_name_,
UInt64 size_,
const UInt128 checksum_,
BackupEntryPtr base_backup_entry_ = {})
: backup(backup_), archive_suffix(archive_suffix_), data_file_name(data_file_name_), size(size_), checksum(checksum_),
base_backup_entry(std::move(base_backup_entry_))
{
}
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override
{
std::unique_ptr<SeekableReadBuffer> read_buffer;
if (backup->use_archives)
read_buffer = backup->getArchiveReader(archive_suffix)->readFile(data_file_name);
else
read_buffer = backup->reader->readFile(data_file_name);
if (base_backup_entry)
{
size_t base_size = base_backup_entry->getSize();
read_buffer = std::make_unique<ConcatSeekableReadBuffer>(
base_backup_entry->getReadBuffer(), base_size, std::move(read_buffer), size - base_size);
}
return read_buffer;
}
UInt64 getSize() const override { return size; }
std::optional<UInt128> getChecksum() const override { return checksum; }
String getFilePath() const override
{
return data_file_name;
}
DiskPtr tryGetDiskIfExists() const override
{
return nullptr;
}
DataSourceDescription getDataSourceDescription() const override
{
return backup->reader->getDataSourceDescription();
}
private:
const std::shared_ptr<const BackupImpl> backup;
const String archive_suffix;
const String data_file_name;
const UInt64 size;
const UInt128 checksum;
BackupEntryPtr base_backup_entry;
};
BackupImpl::BackupImpl(
const String & backup_name_for_logging_,
const ArchiveParams & archive_params_,
@ -645,24 +585,22 @@ SizeAndChecksum BackupImpl::getFileSizeAndChecksum(const String & file_name) con
return {info->size, info->checksum};
}
BackupEntryPtr BackupImpl::readFile(const String & file_name) const
std::unique_ptr<SeekableReadBuffer> BackupImpl::readFile(const String & file_name) const
{
return readFile(getFileSizeAndChecksum(file_name));
}
BackupEntryPtr BackupImpl::readFile(const SizeAndChecksum & size_and_checksum) const
std::unique_ptr<SeekableReadBuffer> BackupImpl::readFile(const SizeAndChecksum & size_and_checksum) const
{
std::lock_guard lock{mutex};
if (open_mode != OpenMode::READ)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Backup is not opened for reading");
++num_read_files;
num_read_bytes += size_and_checksum.first;
if (!size_and_checksum.first)
if (size_and_checksum.first == 0)
{
/// Entry's data is empty.
return std::make_unique<BackupEntryFromMemory>(nullptr, 0, UInt128{0, 0});
std::lock_guard lock{mutex};
++num_read_files;
return std::make_unique<ReadBufferFromMemory>(static_cast<char *>(nullptr), 0);
}
auto info_opt = coordination->getFileInfo(size_and_checksum);
@ -677,45 +615,149 @@ BackupEntryPtr BackupImpl::readFile(const SizeAndChecksum & size_and_checksum) c
const auto & info = *info_opt;
std::unique_ptr<SeekableReadBuffer> read_buffer;
std::unique_ptr<SeekableReadBuffer> base_read_buffer;
if (info.size > info.base_size)
{
/// Make `read_buffer` if there is data for this backup entry in this backup.
if (use_archives)
{
std::shared_ptr<IArchiveReader> archive_reader;
{
std::lock_guard lock{mutex};
archive_reader = getArchiveReader(info.archive_suffix);
}
read_buffer = archive_reader->readFile(info.data_file_name);
}
else
{
read_buffer = reader->readFile(info.data_file_name);
}
}
if (info.base_size)
{
/// Make `base_read_buffer` if there is data for this backup entry in the base backup.
if (!base_backup)
{
throw Exception(
ErrorCodes::NO_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but there is no base backup specified",
backup_name_for_logging, formatSizeAndChecksum(size_and_checksum));
}
if (!base_backup->fileExists(std::pair(info.base_size, info.base_checksum)))
{
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but doesn't exist there",
backup_name_for_logging, formatSizeAndChecksum(size_and_checksum));
}
base_read_buffer = base_backup->readFile(std::pair{info.base_size, info.base_checksum});
}
{
/// Update number of read files.
std::lock_guard lock{mutex};
++num_read_files;
num_read_bytes += info.size;
}
if (!info.base_size)
{
/// Data goes completely from this backup, the base backup isn't used.
return std::make_unique<BackupEntryFromBackupImpl>(
std::static_pointer_cast<const BackupImpl>(shared_from_this()), info.archive_suffix, info.data_file_name, info.size, info.checksum);
/// Data comes completely from this backup, the base backup isn't used.
return read_buffer;
}
if (!base_backup)
else if (info.size == info.base_size)
{
throw Exception(
ErrorCodes::NO_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but there is no base backup specified",
backup_name_for_logging, formatSizeAndChecksum(size_and_checksum));
/// Data comes completely from the base backup (nothing comes from this backup).
return base_read_buffer;
}
if (!base_backup->fileExists(std::pair(info.base_size, info.base_checksum)))
else
{
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but doesn't exist there",
backup_name_for_logging, formatSizeAndChecksum(size_and_checksum));
}
auto base_entry = base_backup->readFile(std::pair{info.base_size, info.base_checksum});
if (info.size == info.base_size)
{
/// Data goes completely from the base backup (nothing goes from this backup).
return base_entry;
}
{
/// The beginning of the data goes from the base backup,
/// and the ending goes from this backup.
return std::make_unique<BackupEntryFromBackupImpl>(
static_pointer_cast<const BackupImpl>(shared_from_this()), info.archive_suffix, info.data_file_name, info.size, info.checksum, std::move(base_entry));
/// The beginning of the data comes from the base backup,
/// and the ending comes from this backup.
return std::make_unique<ConcatSeekableReadBuffer>(
std::move(base_read_buffer), info.base_size, std::move(read_buffer), info.size - info.base_size);
}
}
size_t BackupImpl::copyFileToDisk(const String & file_name, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) const
{
return copyFileToDisk(getFileSizeAndChecksum(file_name), destination_disk, destination_path, write_mode, write_settings);
}
size_t BackupImpl::copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) const
{
if (open_mode != OpenMode::READ)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Backup is not opened for reading");
if (size_and_checksum.first == 0)
{
/// Entry's data is empty.
if (write_mode == WriteMode::Rewrite)
{
/// Just create an empty file.
destination_disk->createFile(destination_path);
}
std::lock_guard lock{mutex};
++num_read_files;
return 0;
}
auto info_opt = coordination->getFileInfo(size_and_checksum);
if (!info_opt)
{
throw Exception(
ErrorCodes::BACKUP_ENTRY_NOT_FOUND,
"Backup {}: Entry {} not found in the backup",
backup_name_for_logging,
formatSizeAndChecksum(size_and_checksum));
}
const auto & info = *info_opt;
bool file_copied = false;
if (info.size && !info.base_size && !use_archives)
{
/// Data comes completely from this backup.
reader->copyFileToDisk(info.data_file_name, info.size, destination_disk, destination_path, write_mode, write_settings);
file_copied = true;
}
else if (info.size && (info.size == info.base_size))
{
/// Data comes completely from the base backup (nothing comes from this backup).
base_backup->copyFileToDisk(std::pair{info.base_size, info.base_checksum}, destination_disk, destination_path, write_mode, write_settings);
file_copied = true;
}
if (file_copied)
{
/// The file is already copied, but `num_read_files` is not updated yet.
std::lock_guard lock{mutex};
++num_read_files;
num_read_bytes += info.size;
}
else
{
/// Use the generic way to copy data. `readFile()` will update `num_read_files`.
auto read_buffer = readFile(size_and_checksum);
auto write_buffer = destination_disk->writeFile(destination_path, std::min<size_t>(info.size, DBMS_DEFAULT_BUFFER_SIZE),
write_mode, write_settings);
copyData(*read_buffer, *write_buffer, info.size);
write_buffer->finalize();
}
return info.size;
}
namespace
{

View File

@ -73,8 +73,12 @@ public:
UInt64 getFileSize(const String & file_name) const override;
UInt128 getFileChecksum(const String & file_name) const override;
SizeAndChecksum getFileSizeAndChecksum(const String & file_name) const override;
BackupEntryPtr readFile(const String & file_name) const override;
BackupEntryPtr readFile(const SizeAndChecksum & size_and_checksum) const override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) const override;
std::unique_ptr<SeekableReadBuffer> readFile(const SizeAndChecksum & size_and_checksum) const override;
size_t copyFileToDisk(const String & file_name, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) const override;
size_t copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) const override;
void writeFile(const String & file_name, BackupEntryPtr entry) override;
void finalizeWriting() override;
bool supportsWritingInMultipleThreads() const override { return !use_archives; }

View File

@ -1,6 +1,8 @@
#pragma once
#include <Core/Types.h>
#include <Disks/WriteMode.h>
#include <IO/WriteSettings.h>
#include <memory>
#include <optional>
@ -8,7 +10,10 @@
namespace DB
{
class IBackupEntry;
class IDisk;
using BackupEntryPtr = std::shared_ptr<const IBackupEntry>;
using DiskPtr = std::shared_ptr<IDisk>;
class SeekableReadBuffer;
/// Represents a backup, i.e. a storage of BackupEntries which can be accessed by their names.
/// A backup can be either incremental or non-incremental. An incremental backup doesn't store
@ -95,8 +100,15 @@ public:
virtual SizeAndChecksum getFileSizeAndChecksum(const String & file_name) const = 0;
/// Reads an entry from the backup.
virtual BackupEntryPtr readFile(const String & file_name) const = 0;
virtual BackupEntryPtr readFile(const SizeAndChecksum & size_and_checksum) const = 0;
virtual std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) const = 0;
virtual std::unique_ptr<SeekableReadBuffer> readFile(const SizeAndChecksum & size_and_checksum) const = 0;
/// Copies a file from the backup to a specified destination disk. Returns the number of bytes written.
virtual size_t copyFileToDisk(const String & file_name, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode = WriteMode::Rewrite, const WriteSettings & write_settings = {}) const = 0;
virtual size_t copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode = WriteMode::Rewrite, const WriteSettings & write_settings = {}) const = 0;
/// Puts a new entry to the backup.
virtual void writeFile(const String & file_name, BackupEntryPtr entry) = 0;

View File

@ -316,7 +316,7 @@ void RestorerFromBackup::findTableInBackup(const QualifiedTableName & table_name
= *root_path_in_use / "data" / escapeForFileName(table_name_in_backup.database) / escapeForFileName(table_name_in_backup.table);
}
auto read_buffer = backup->readFile(*metadata_path)->getReadBuffer();
auto read_buffer = backup->readFile(*metadata_path);
String create_query_str;
readStringUntilEOF(create_query_str, *read_buffer);
read_buffer.reset();
@ -410,7 +410,7 @@ void RestorerFromBackup::findDatabaseInBackup(const String & database_name_in_ba
if (metadata_path)
{
auto read_buffer = backup->readFile(*metadata_path)->getReadBuffer();
auto read_buffer = backup->readFile(*metadata_path);
String create_query_str;
readStringUntilEOF(create_query_str, *read_buffer);
read_buffer.reset();

View File

@ -506,7 +506,7 @@ void Connection::sendQuery(
bool with_pending_data,
std::function<void(const Progress &)>)
{
OpenTelemetry::SpanHolder span("Connection::sendQuery()");
OpenTelemetry::SpanHolder span("Connection::sendQuery()", OpenTelemetry::CLIENT);
span.addAttribute("clickhouse.query_id", query_id_);
span.addAttribute("clickhouse.query", query);
span.addAttribute("target", [this] () { return this->getHost() + ":" + std::to_string(this->getPort()); });

View File

@ -31,8 +31,6 @@ HedgedConnections::HedgedConnections(
: hedged_connections_factory(pool_, &context_->getSettingsRef(), timeouts_, table_to_check_)
, context(std::move(context_))
, settings(context->getSettingsRef())
, drain_timeout(settings.drain_timeout)
, allow_changing_replica_until_first_data_packet(settings.allow_changing_replica_until_first_data_packet)
, throttler(throttler_)
{
std::vector<Connection *> connections = hedged_connections_factory.getManyConnections(pool_mode);
@ -263,7 +261,7 @@ Packet HedgedConnections::drain()
while (!epoll.empty())
{
ReplicaLocation location = getReadyReplicaLocation(DrainCallback{drain_timeout});
ReplicaLocation location = getReadyReplicaLocation();
Packet packet = receivePacketFromReplica(location);
switch (packet.type)
{
@ -290,10 +288,10 @@ Packet HedgedConnections::drain()
Packet HedgedConnections::receivePacket()
{
std::lock_guard lock(cancel_mutex);
return receivePacketUnlocked({}, false /* is_draining */);
return receivePacketUnlocked({});
}
Packet HedgedConnections::receivePacketUnlocked(AsyncCallback async_callback, bool /* is_draining */)
Packet HedgedConnections::receivePacketUnlocked(AsyncCallback async_callback)
{
if (!sent_query)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot receive packets: no query sent.");
@ -413,7 +411,7 @@ Packet HedgedConnections::receivePacketFromReplica(const ReplicaLocation & repli
{
/// If we are allowed to change replica until the first data packet,
/// just restart timeout (if it hasn't expired yet). Otherwise disable changing replica with this offset.
if (allow_changing_replica_until_first_data_packet && !replica.is_change_replica_timeout_expired)
if (settings.allow_changing_replica_until_first_data_packet && !replica.is_change_replica_timeout_expired)
replica.change_replica_timeout.setRelative(hedged_connections_factory.getConnectionTimeouts().receive_data_timeout);
else
disableChangingReplica(replica_location);

View File

@ -101,7 +101,7 @@ public:
Packet receivePacket() override;
Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) override;
Packet receivePacketUnlocked(AsyncCallback async_callback) override;
void disconnect() override;
@ -196,12 +196,6 @@ private:
Epoll epoll;
ContextPtr context;
const Settings & settings;
/// The following two fields are from settings but can be referenced outside the lifetime of
/// settings when connection is drained asynchronously.
Poco::Timespan drain_timeout;
bool allow_changing_replica_until_first_data_packet;
ThrottlerPtr throttler;
bool sent_query = false;
bool cancelled = false;

View File

@ -1,36 +0,0 @@
#include <Client/IConnections.h>
#include <Poco/Net/SocketImpl.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SOCKET_TIMEOUT;
}
/// This wrapper struct allows us to use Poco's socket polling code with a raw fd.
/// The only difference from Poco::Net::SocketImpl is that we don't close the fd in the destructor.
struct PocoSocketWrapper : public Poco::Net::SocketImpl
{
explicit PocoSocketWrapper(int fd)
{
reset(fd);
}
// Do not close fd.
~PocoSocketWrapper() override { reset(-1); }
};
void IConnections::DrainCallback::operator()(int fd, Poco::Timespan, const std::string & fd_description) const
{
if (!PocoSocketWrapper(fd).poll(drain_timeout, Poco::Net::Socket::SELECT_READ))
{
throw Exception(ErrorCodes::SOCKET_TIMEOUT,
"Read timeout ({} ms) while draining from {}",
drain_timeout.totalMilliseconds(),
fd_description);
}
}
}

View File

@ -13,12 +13,6 @@ namespace DB
class IConnections : boost::noncopyable
{
public:
struct DrainCallback
{
Poco::Timespan drain_timeout;
void operator()(int fd, Poco::Timespan, const std::string & fd_description = "") const;
};
/// Send all scalars to replicas.
virtual void sendScalarsData(Scalars & data) = 0;
/// Send all content of external tables to replicas.
@ -40,7 +34,7 @@ public:
virtual Packet receivePacket() = 0;
/// Version of `receivePacket` function without locking.
virtual Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) = 0;
virtual Packet receivePacketUnlocked(AsyncCallback async_callback) = 0;
/// Break all active connections.
virtual void disconnect() = 0;

View File

@ -20,7 +20,7 @@ namespace ErrorCodes
MultiplexedConnections::MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_), drain_timeout(settings.drain_timeout), receive_timeout(settings.receive_timeout)
: settings(settings_)
{
connection.setThrottler(throttler);
@ -33,7 +33,7 @@ MultiplexedConnections::MultiplexedConnections(Connection & connection, const Se
MultiplexedConnections::MultiplexedConnections(std::shared_ptr<Connection> connection_ptr_, const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_), drain_timeout(settings.drain_timeout), receive_timeout(settings.receive_timeout)
: settings(settings_)
, connection_ptr(connection_ptr_)
{
connection_ptr->setThrottler(throttler);
@ -46,8 +46,9 @@ MultiplexedConnections::MultiplexedConnections(std::shared_ptr<Connection> conne
}
MultiplexedConnections::MultiplexedConnections(
std::vector<IConnectionPool::Entry> && connections, const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_), drain_timeout(settings.drain_timeout), receive_timeout(settings.receive_timeout)
std::vector<IConnectionPool::Entry> && connections,
const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_)
{
/// If we didn't get any connections from pool and getMany() did not throw exceptions, this means that
/// `skip_unavailable_shards` was set. Then just return.
@ -206,7 +207,7 @@ void MultiplexedConnections::sendMergeTreeReadTaskResponse(const ParallelReadRes
Packet MultiplexedConnections::receivePacket()
{
std::lock_guard lock(cancel_mutex);
Packet packet = receivePacketUnlocked({}, false /* is_draining */);
Packet packet = receivePacketUnlocked({});
return packet;
}
@ -254,7 +255,7 @@ Packet MultiplexedConnections::drain()
while (hasActiveConnections())
{
Packet packet = receivePacketUnlocked(DrainCallback{drain_timeout}, true /* is_draining */);
Packet packet = receivePacketUnlocked({});
switch (packet.type)
{
@ -304,14 +305,14 @@ std::string MultiplexedConnections::dumpAddressesUnlocked() const
return buf.str();
}
Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callback, bool is_draining)
Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callback)
{
if (!sent_query)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot receive packets: no query sent.");
if (!hasActiveConnections())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No more packets are available.");
ReplicaState & state = getReplicaForReading(is_draining);
ReplicaState & state = getReplicaForReading();
current_connection = state.connection;
if (current_connection == nullptr)
throw Exception(ErrorCodes::NO_AVAILABLE_REPLICA, "Logical error: no available replica");
@ -366,10 +367,9 @@ Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callbac
return packet;
}
MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForReading(bool is_draining)
MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForReading()
{
/// Fast path when we only focus on one replica and are not draining the connection.
if (replica_states.size() == 1 && !is_draining)
if (replica_states.size() == 1)
return replica_states[0];
Poco::Net::Socket::SocketList read_list;
@ -390,7 +390,7 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
auto timeout = is_draining ? drain_timeout : receive_timeout;
auto timeout = settings.receive_timeout;
int n = 0;
/// EINTR loop
@ -417,9 +417,7 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
break;
}
/// We treat any error as timeout for simplicity.
/// And we also check if read_list is still empty just in case.
if (n <= 0 || read_list.empty())
if (n == 0)
{
const auto & addresses = dumpAddressesUnlocked();
for (ReplicaState & state : replica_states)
@ -438,7 +436,9 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
}
}
/// TODO Motivation of rand is unclear.
/// TODO Absolutely wrong code: read_list could be empty; motivation of rand is unclear.
/// This code path is disabled by default.
auto & socket = read_list[thread_local_rng() % read_list.size()];
if (fd_to_replica_state_idx.empty())
{

View File

@ -65,7 +65,7 @@ public:
void setReplicaInfo(ReplicaInfo value) override { replica_info = value; }
private:
Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) override;
Packet receivePacketUnlocked(AsyncCallback async_callback) override;
/// Internal version of `dumpAddresses` function without locking.
std::string dumpAddressesUnlocked() const;
@ -78,18 +78,13 @@ private:
};
/// Get a replica where you can read the data.
ReplicaState & getReplicaForReading(bool is_draining);
ReplicaState & getReplicaForReading();
/// Mark the replica as invalid.
void invalidateReplica(ReplicaState & replica_state);
const Settings & settings;
/// The following two fields are from settings but can be referenced outside the lifetime of
/// settings when connection is drained asynchronously.
Poco::Timespan drain_timeout;
Poco::Timespan receive_timeout;
/// The current number of valid connections to the replicas of this shard.
size_t active_connection_count = 0;

View File

@ -84,10 +84,6 @@
M(MMappedFileBytes, "Sum size of mmapped file regions.") \
M(MMappedAllocs, "Total number of mmapped allocations") \
M(MMappedAllocBytes, "Sum bytes of mmapped allocations") \
M(AsyncDrainedConnections, "Number of connections drained asynchronously.") \
M(ActiveAsyncDrainedConnections, "Number of active connections drained asynchronously.") \
M(SyncDrainedConnections, "Number of connections drained synchronously.") \
M(ActiveSyncDrainedConnections, "Number of active connections drained synchronously.") \
M(AsynchronousReadWait, "Number of threads waiting for asynchronous read.") \
M(PendingAsyncInsert, "Number of asynchronous inserts that are waiting for flush.") \
M(KafkaConsumers, "Number of active Kafka consumers") \

View File

@ -31,7 +31,7 @@ public:
* max_elements_size == 0 means no elements size restrictions.
*/
explicit LRUCachePolicy(size_t max_size_, size_t max_elements_size_ = 0, OnWeightLossFunction on_weight_loss_function_ = {})
: max_size(std::max(static_cast<size_t>(1), max_size_)), max_elements_size(max_elements_size_)
: max_size(std::max(1uz, max_size_)), max_elements_size(max_elements_size_)
{
Base::on_weight_loss_function = on_weight_loss_function_;
}

View File

@ -1,7 +1,76 @@
#include <Common/LoggingFormatStringHelpers.h>
#include <Common/SipHash.h>
#include <Common/thread_local_rng.h>
[[noreturn]] void functionThatFailsCompilationOfConstevalFunctions(const char * error)
{
throw std::runtime_error(error);
}
std::unordered_map<UInt64, std::pair<time_t, size_t>> LogFrequencyLimiterIml::logged_messages;
time_t LogFrequencyLimiterIml::last_cleanup = 0;
std::mutex LogFrequencyLimiterIml::mutex;
void LogFrequencyLimiterIml::log(Poco::Message & message)
{
std::string_view pattern = message.getFormatString();
if (pattern.empty())
{
/// Do not filter messages without a format string
if (auto * channel = logger->getChannel())
channel->log(message);
return;
}
SipHash hash;
hash.update(logger->name());
/// Format strings are compile-time constants, so they are uniquely identified by pointer and size
hash.update(pattern.data());
hash.update(pattern.size());
time_t now = time(nullptr);
size_t skipped_similar_messages = 0;
bool need_cleanup;
bool need_log;
{
std::lock_guard lock(mutex);
need_cleanup = last_cleanup + 300 <= now;
auto & info = logged_messages[hash.get64()];
need_log = info.first + min_interval_s <= now;
if (need_log)
{
skipped_similar_messages = info.second;
info.first = now;
info.second = 0;
}
else
{
++info.second;
}
}
/// We don't need all threads to do cleanup, just randomize
if (need_cleanup && thread_local_rng() % 100 == 0)
cleanup();
/// The message it too frequent, skip it for now
/// NOTE It's not optimal because we format the message first and only then check if we need to actually write it, see LOG_IMPL macro
if (!need_log)
return;
if (skipped_similar_messages)
message.appendText(fmt::format(" (skipped {} similar messages)", skipped_similar_messages));
if (auto * channel = logger->getChannel())
channel->log(message);
}
void LogFrequencyLimiterIml::cleanup(time_t too_old_threshold_s)
{
time_t now = time(nullptr);
time_t old = now - too_old_threshold_s;
std::lock_guard lock(mutex);
std::erase_if(logged_messages, [old](const auto & elem) { return elem.second.first < old; });
last_cleanup = now;
}

View File

@ -1,6 +1,11 @@
#pragma once
#include <base/defines.h>
#include <base/types.h>
#include <fmt/format.h>
#include <mutex>
#include <unordered_map>
#include <Poco/Logger.h>
#include <Poco/Message.h>
struct PreformattedMessage;
consteval void formatStringCheckArgsNumImpl(std::string_view str, size_t nargs);
@ -156,3 +161,59 @@ struct CheckArgsNumHelperImpl
template <typename... Args> using CheckArgsNumHelper = CheckArgsNumHelperImpl<std::type_identity_t<Args>...>;
template <typename... Args> void formatStringCheckArgsNum(CheckArgsNumHelper<Args...>, Args &&...) {}
/// This wrapper helps to avoid too frequent and noisy log messages.
/// For each pair (logger_name, format_string) it remembers when such a message was logged the last time.
/// The message will not be logged again if less than min_interval_s seconds passed since the previously logged message.
class LogFrequencyLimiterIml
{
/// Hash(logger_name, format_string) -> (last_logged_time_s, skipped_messages_count)
static std::unordered_map<UInt64, std::pair<time_t, size_t>> logged_messages;
static time_t last_cleanup;
static std::mutex mutex;
Poco::Logger * logger;
time_t min_interval_s;
public:
LogFrequencyLimiterIml(Poco::Logger * logger_, time_t min_interval_s_) : logger(logger_), min_interval_s(min_interval_s_) {}
LogFrequencyLimiterIml & operator -> () { return *this; }
bool is(Poco::Message::Priority priority) { return logger->is(priority); }
LogFrequencyLimiterIml * getChannel() {return this; }
const String & name() const { return logger->name(); }
void log(Poco::Message & message);
/// Clears messages that were logged last time more than too_old_threshold_s seconds ago
static void cleanup(time_t too_old_threshold_s = 600);
Poco::Logger * getLogger() { return logger; }
};
/// This wrapper is useful to save formatted message into a String before sending it to a logger
class LogToStrImpl
{
String & out_str;
Poco::Logger * logger;
std::unique_ptr<LogFrequencyLimiterIml> maybe_nested;
bool propagate_to_actual_log = true;
public:
LogToStrImpl(String & out_str_, Poco::Logger * logger_) : out_str(out_str_), logger(logger_) {}
LogToStrImpl(String & out_str_, std::unique_ptr<LogFrequencyLimiterIml> && maybe_nested_)
: out_str(out_str_), logger(maybe_nested_->getLogger()), maybe_nested(std::move(maybe_nested_)) {}
LogToStrImpl & operator -> () { return *this; }
bool is(Poco::Message::Priority priority) { propagate_to_actual_log &= logger->is(priority); return true; }
LogToStrImpl * getChannel() {return this; }
const String & name() const { return logger->name(); }
void log(Poco::Message & message)
{
out_str = message.getText();
if (!propagate_to_actual_log)
return;
if (maybe_nested)
maybe_nested->log(message);
else if (auto * channel = logger->getChannel())
channel->log(message);
}
};

View File

@ -92,7 +92,7 @@ bool Span::addAttributeImpl(std::string_view name, std::string_view value) noexc
return true;
}
SpanHolder::SpanHolder(std::string_view _operation_name)
SpanHolder::SpanHolder(std::string_view _operation_name, SpanKind _kind)
{
if (!current_thread_trace_context.isTraceEnabled())
{
@ -106,6 +106,7 @@ SpanHolder::SpanHolder(std::string_view _operation_name)
this->parent_span_id = current_thread_trace_context.span_id;
this->span_id = thread_local_rng(); // create a new id for this span
this->operation_name = _operation_name;
this->kind = _kind;
this->start_time_us
= std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();

View File

@ -13,6 +13,29 @@ class ReadBuffer;
namespace OpenTelemetry
{
/// See https://opentelemetry.io/docs/reference/specification/trace/api/#spankind
enum SpanKind
{
/// Default value. Indicates that the span represents an internal operation within an application,
/// as opposed to an operations with remote parents or children.
INTERNAL = 0,
/// Indicates that the span covers server-side handling of a synchronous RPC or other remote request.
/// This span is often the child of a remote CLIENT span that was expected to wait for a response.
SERVER = 1,
/// Indicates that the span describes a request to some remote service.
/// This span is usually the parent of a remote SERVER span and does not end until the response is received.
CLIENT = 2,
/// Indicates that the span describes the initiators of an asynchronous request. This parent span will often end before the corresponding child CONSUMER span, possibly even before the child span starts.
/// In messaging scenarios with batching, tracing individual messages requires a new PRODUCER span per message to be created.
PRODUCER = 3,
/// Indicates that the span describes a child of an asynchronous PRODUCER request
CONSUMER = 4
};
struct Span
{
UUID trace_id{};
@ -21,6 +44,7 @@ struct Span
String operation_name;
UInt64 start_time_us = 0;
UInt64 finish_time_us = 0;
SpanKind kind = INTERNAL;
Map attributes;
/// Following methods are declared as noexcept to make sure they're exception safe.
@ -155,7 +179,7 @@ using TracingContextHolderPtr = std::unique_ptr<TracingContextHolder>;
/// Once it's created or destructed, it automatically maitains the tracing context on the thread that it lives.
struct SpanHolder : public Span
{
SpanHolder(std::string_view);
SpanHolder(std::string_view, SpanKind _kind = INTERNAL);
~SpanHolder();
/// Finish a span explicitly if needed.

View File

@ -166,6 +166,8 @@
\
M(WaitMarksLoadMicroseconds, "Time spent loading marks") \
M(BackgroundLoadingMarksTasks, "Number of background tasks for loading marks") \
M(LoadedMarksCount, "Number of marks loaded (total across columns).") \
M(LoadedMarksMemoryBytes, "Size of in-memory representations of loaded marks.") \
\
M(Merge, "Number of launched background merges.") \
M(MergedRows, "Rows read for background merges. This is the number of rows before merge.") \

View File

@ -97,7 +97,7 @@ private:
* Note: "SM" in the commentaries below stands for STATE MODIFICATION
*/
RWLockImpl::LockHolder
RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & lock_timeout_ms)
RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & lock_timeout_ms, bool throw_in_fast_path)
{
const auto lock_deadline_tp =
(lock_timeout_ms == std::chrono::milliseconds(0))
@ -130,11 +130,19 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c
if (owner_query_it != owner_queries.end())
{
if (wrlock_owner != writers_queue.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): RWLock is already locked in exclusive mode");
{
if (throw_in_fast_path)
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): RWLock is already locked in exclusive mode");
return nullptr;
}
/// Lock upgrading is not supported
if (type == Write)
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked");
{
if (throw_in_fast_path)
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked");
return nullptr;
}
/// N.B. Type is Read here, query_id is not empty and it_query is a valid iterator
++owner_query_it->second; /// SM1: nothrow

View File

@ -56,7 +56,7 @@ public:
/// Empty query_id means the lock is acquired from outside of query context (e.g. in a background thread).
LockHolder getLock(Type type, const String & query_id,
const std::chrono::milliseconds & lock_timeout_ms = std::chrono::milliseconds(0));
const std::chrono::milliseconds & lock_timeout_ms = std::chrono::milliseconds(0), bool throw_in_fast_path = true);
/// Use as query_id to acquire a lock outside the query context.
inline static const String NO_QUERY = String();

View File

@ -10,35 +10,16 @@
namespace Poco { class Logger; }
/// This wrapper is useful to save formatted message into a String before sending it to a logger
class LogToStrImpl
{
String & out_str;
Poco::Logger * logger;
bool propagate_to_actual_log = true;
public:
LogToStrImpl(String & out_str_, Poco::Logger * logger_) : out_str(out_str_) , logger(logger_) {}
LogToStrImpl & operator -> () { return *this; }
bool is(Poco::Message::Priority priority) { propagate_to_actual_log &= logger->is(priority); return true; }
LogToStrImpl * getChannel() {return this; }
const String & name() const { return logger->name(); }
void log(const Poco::Message & message)
{
out_str = message.getText();
if (!propagate_to_actual_log)
return;
if (auto * channel = logger->getChannel())
channel->log(message);
}
};
#define LogToStr(x, y) std::make_unique<LogToStrImpl>(x, y)
#define LogFrequencyLimiter(x, y) std::make_unique<LogFrequencyLimiterIml>(x, y)
namespace
{
[[maybe_unused]] const ::Poco::Logger * getLogger(const ::Poco::Logger * logger) { return logger; };
[[maybe_unused]] const ::Poco::Logger * getLogger(const std::atomic<::Poco::Logger *> & logger) { return logger.load(); };
[[maybe_unused]] std::unique_ptr<LogToStrImpl> getLogger(std::unique_ptr<LogToStrImpl> && logger) { return logger; };
[[maybe_unused]] std::unique_ptr<LogFrequencyLimiterIml> getLogger(std::unique_ptr<LogFrequencyLimiterIml> && logger) { return logger; };
}
#define LOG_IMPL_FIRST_ARG(X, ...) X

View File

@ -4,6 +4,15 @@
#include <gtest/gtest.h>
template <char ... symbols>
void test_find_first_not(const std::string & haystack, std::size_t expected_pos)
{
const char * begin = haystack.data();
const char * end = haystack.data() + haystack.size();
ASSERT_EQ(begin + expected_pos, find_first_not_symbols<symbols...>(begin, end));
}
TEST(FindSymbols, SimpleTest)
{
std::string s = "Hello, world! Goodbye...";
@ -36,3 +45,58 @@ TEST(FindSymbols, SimpleTest)
ASSERT_EQ(vals, (std::vector<std::string>{"s", "String"}));
}
}
TEST(FindNotSymbols, AllSymbolsPresent)
{
std::string str_with_17_bytes = "hello world hello";
std::string str_with_16_bytes = {str_with_17_bytes.begin(), str_with_17_bytes.end() - 1u};
std::string str_with_15_bytes = {str_with_16_bytes.begin(), str_with_16_bytes.end() - 1u};
/*
* The below variations will choose different implementation strategies:
* 1. Loop method only because it does not contain enough bytes for SSE 4.2
* 2. SSE4.2 only since string contains exactly 16 bytes
* 3. SSE4.2 + Loop method will take place because only first 16 bytes are treated by SSE 4.2 and remaining bytes is treated by loop
*
* Below code asserts that all calls return the ::end of the input string. This was not true prior to this fix as mentioned in PR #47304
* */
test_find_first_not<'h', 'e', 'l', 'o', 'w', 'r', 'd', ' '>(str_with_15_bytes, str_with_15_bytes.size());
test_find_first_not<'h', 'e', 'l', 'o', 'w', 'r', 'd', ' '>(str_with_16_bytes, str_with_16_bytes.size());
test_find_first_not<'h', 'e', 'l', 'o', 'w', 'r', 'd', ' '>(str_with_17_bytes, str_with_17_bytes.size());
}
TEST(FindNotSymbols, NoSymbolsMatch)
{
std::string s = "abcdefg";
// begin should be returned since the first character of the string does not match any of the below symbols
test_find_first_not<'h', 'i', 'j'>(s, 0u);
}
TEST(FindNotSymbols, ExtraSymbols)
{
std::string s = "hello_world_hello";
test_find_first_not<'h', 'e', 'l', 'o', ' '>(s, 5u);
}
TEST(FindNotSymbols, EmptyString)
{
std::string s;
test_find_first_not<'h', 'e', 'l', 'o', 'w', 'r', 'd', ' '>(s, s.size());
}
TEST(FindNotSymbols, SingleChar)
{
std::string s = "a";
test_find_first_not<'a'>(s, s.size());
}
TEST(FindNotSymbols, NullCharacter)
{
// special test to ensure only the passed template arguments are used as needles
// since current find_first_symbols implementation takes in 16 characters and defaults
// to \0.
std::string s("abcdefg\0x", 9u);
test_find_first_not<'a', 'b', 'c', 'd', 'e', 'f', 'g'>(s, 7u);
}

View File

@ -57,7 +57,6 @@ class IColumn;
M(Milliseconds, connect_timeout_with_failover_secure_ms, 100, "Connection timeout for selecting first healthy replica (for secure connections).", 0) \
M(Seconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "Timeout for receiving data from network, in seconds. If no bytes were received in this interval, exception is thrown. If you set this setting on client, the 'send_timeout' for the socket will be also set on the corresponding connection end on the server.", 0) \
M(Seconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "Timeout for sending data to network, in seconds. If client needs to sent some data, but it did not able to send any bytes in this interval, exception is thrown. If you set this setting on client, the 'receive_timeout' for the socket will be also set on the corresponding connection end on the server.", 0) \
M(Seconds, drain_timeout, 3, "Timeout for draining remote connections, -1 means synchronous drain without ignoring errors", 0) \
M(Seconds, tcp_keep_alive_timeout, 290 /* less than DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC */, "The time in seconds the connection needs to remain idle before TCP starts sending keepalive probes", 0) \
M(Milliseconds, hedged_connection_timeout_ms, 100, "Connection timeout for establishing connection with replica for Hedged requests", 0) \
M(Milliseconds, receive_data_timeout_ms, 2000, "Connection timeout for receiving first packet of data or packet with positive progress from replica", 0) \
@ -284,8 +283,6 @@ class IColumn;
M(Milliseconds, sleep_in_send_data_ms, 0, "Time to sleep in sending data in TCPHandler", 0) \
M(Milliseconds, sleep_after_receiving_query_ms, 0, "Time to sleep after receiving query in TCPHandler", 0) \
M(UInt64, unknown_packet_in_send_data, 0, "Send unknown packet instead of data Nth data packet", 0) \
/** Settings for testing connection collector */ \
M(Milliseconds, sleep_in_receive_cancel_ms, 0, "Time to sleep in receiving cancel in TCPHandler", 0) \
\
M(Bool, insert_allow_materialized_columns, false, "If setting is enabled, Allow materialized columns in INSERT.", 0) \
M(Seconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.", 0) \
@ -759,7 +756,7 @@ class IColumn;
MAKE_OBSOLETE(M, Seconds, temporary_live_view_timeout, 1) \
MAKE_OBSOLETE(M, Milliseconds, async_insert_cleanup_timeout_ms, 1000) \
MAKE_OBSOLETE(M, Bool, optimize_fuse_sum_count_avg, 0) \
MAKE_OBSOLETE(M, Seconds, drain_timeout, 3) \
/** The section above is for obsolete settings. Do not add anything there. */
@ -803,6 +800,7 @@ class IColumn;
M(Bool, input_format_tsv_detect_header, true, "Automatically detect header with names and types in TSV format", 0) \
M(Bool, input_format_custom_detect_header, true, "Automatically detect header with names and types in CustomSeparated format", 0) \
M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \
M(UInt64, input_format_parquet_max_block_size, 8192, "Max block size for parquet reader.", 0) \
M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \
M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \
M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format ORC", 0) \

View File

@ -68,6 +68,15 @@ public:
return disk.writeFile(path, buf_size, mode, settings);
}
void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function) override
{
disk.writeFileUsingCustomWriteObject(path, mode, std::move(custom_write_object_function));
}
void removeFile(const std::string & path) override
{
disk.removeFile(path);

View File

@ -38,6 +38,15 @@ void IDisk::copyFile(const String & from_file_path, IDisk & to_disk, const Strin
out->finalize();
}
void IDisk::writeFileUsingCustomWriteObject(
const String &, WriteMode, std::function<size_t(const StoredObject &, WriteMode, const std::optional<ObjectAttributes> &)>)
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method `writeFileUsingCustomWriteObject()` is not implemented for disk: {}",
getDataSourceDescription().type);
}
DiskTransactionPtr IDisk::createTransaction()
{

View File

@ -209,6 +209,15 @@ public:
WriteMode mode = WriteMode::Rewrite,
const WriteSettings & settings = {}) = 0;
/// Write a file using a custom function to write an object to the disk's object storage.
/// This method is alternative to writeFile(), the difference is that writeFile() calls IObjectStorage::writeObject()
/// to write an object to the object storage while this method allows to specify a callback for that.
virtual void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function);
/// Remove file. Throws exception if file doesn't exists or it's a directory.
/// Return whether file was finally removed. (For remote disks it is not always removed).
virtual void removeFile(const String & path) = 0;

View File

@ -68,6 +68,13 @@ public:
const WriteSettings & settings = {},
bool autocommit = true) = 0;
/// Write a file using a custom function to write an object to the disk's object storage.
virtual void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function) = 0;
/// Remove file. Throws exception if file doesn't exists or it's a directory.
virtual void removeFile(const std::string & path) = 0;

View File

@ -577,6 +577,17 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
return result;
}
void DiskObjectStorage::writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function)
{
LOG_TEST(log, "Write file: {}", path);
auto transaction = createObjectStorageTransaction();
return transaction->writeFileUsingCustomWriteObject(path, mode, std::move(custom_write_object_function));
}
void DiskObjectStorage::applyNewSettings(
const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &)
{

View File

@ -152,6 +152,12 @@ public:
WriteMode mode,
const WriteSettings & settings) override;
void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function) override;
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &) override;

View File

@ -670,6 +670,44 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
}
void DiskObjectStorageTransaction::writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function)
{
/// This function is a simplified and adapted version of DiskObjectStorageTransaction::writeFile().
auto blob_name = object_storage.generateBlobNameForPath(path);
std::optional<ObjectAttributes> object_attributes;
if (metadata_helper)
{
auto revision = metadata_helper->revision_counter + 1;
metadata_helper->revision_counter++;
object_attributes = {
{"path", path}
};
blob_name = "r" + revisionToString(revision) + "-file-" + blob_name;
}
auto object = StoredObject::create(object_storage, fs::path(metadata_storage.getObjectStorageRootPath()) / blob_name);
auto write_operation = std::make_unique<WriteFileObjectStorageOperation>(object_storage, metadata_storage, object);
operations_to_execute.emplace_back(std::move(write_operation));
/// We always use mode Rewrite because we simulate append using metadata and different files
size_t object_size = std::move(custom_write_object_function)(object, WriteMode::Rewrite, object_attributes);
/// Create metadata (see create_metadata_callback in DiskObjectStorageTransaction::writeFile()).
if (mode == WriteMode::Rewrite)
metadata_transaction->createMetadataFile(path, blob_name, object_size);
else
metadata_transaction->addBlobToMetadata(path, blob_name, object_size);
metadata_transaction->commit();
}
void DiskObjectStorageTransaction::createHardLink(const std::string & src_path, const std::string & dst_path)
{
operations_to_execute.emplace_back(

View File

@ -99,6 +99,13 @@ public:
const WriteSettings & settings = {},
bool autocommit = true) override;
/// Write a file using a custom function to write an object to the disk's object storage.
void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function) override;
void removeFile(const std::string & path) override;
void removeFileIfExists(const std::string & path) override;
void removeDirectory(const std::string & path) override;

View File

@ -1,7 +1,4 @@
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
#include <Common/ProfileEvents.h>
#include <Interpreters/Context.h>
#if USE_AWS_S3
@ -18,10 +15,12 @@
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/S3/copyS3File.h>
#include <Interpreters/Context.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Disks/ObjectStorages/S3/diskSettings.h>
#include <Common/getRandomASCIIString.h>
#include <Common/ProfileEvents.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/logger_useful.h>
#include <Common/MultiVersion.h>

View File

@ -0,0 +1,69 @@
#include <Disks/ObjectStorages/S3/copyS3FileToDisk.h>
#if USE_AWS_S3
#include <IO/S3/getObjectInfo.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/copyData.h>
#include <IO/S3/copyS3File.h>
namespace DB
{
void copyS3FileToDisk(
const std::shared_ptr<const S3::Client> & s3_client,
const String & src_bucket,
const String & src_key,
const std::optional<String> & version_id,
std::optional<size_t> src_offset,
std::optional<size_t> src_size,
DiskPtr destination_disk,
const String & destination_path,
WriteMode write_mode,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const S3Settings::RequestSettings & request_settings,
ThreadPoolCallbackRunner<void> scheduler)
{
if (!src_offset)
src_offset = 0;
if (!src_size)
src_size = S3::getObjectSize(*s3_client, src_bucket, src_key, version_id.value_or(""), request_settings) - *src_offset;
auto destination_data_source_description = destination_disk->getDataSourceDescription();
if (destination_data_source_description != DataSourceDescription{DataSourceType::S3, s3_client->getInitialEndpoint(), false, false})
{
LOG_TRACE(&Poco::Logger::get("copyS3FileToDisk"), "Copying {} to disk {} through buffers", src_key, destination_disk->getName());
ReadBufferFromS3 read_buffer{s3_client, src_bucket, src_key, {}, request_settings, read_settings};
if (*src_offset)
read_buffer.seek(*src_offset, SEEK_SET);
auto write_buffer = destination_disk->writeFile(destination_path, std::min<size_t>(*src_size, DBMS_DEFAULT_BUFFER_SIZE), write_mode, write_settings);
copyData(read_buffer, *write_buffer, *src_size);
write_buffer->finalize();
return;
}
LOG_TRACE(&Poco::Logger::get("copyS3FileToDisk"), "Copying {} to disk {} using native copy", src_key, destination_disk->getName());
String dest_bucket = destination_disk->getObjectStorage()->getObjectsNamespace();
auto custom_write_object = [&](const StoredObject & object_, WriteMode write_mode_, const std::optional<ObjectAttributes> & object_attributes_) -> size_t
{
/// Object storage always uses mode `Rewrite` because it simulates append using metadata and different files.
chassert(write_mode_ == WriteMode::Rewrite);
copyS3File(s3_client, src_bucket, src_key, *src_offset, *src_size, dest_bucket, /* dest_key= */ object_.absolute_path,
request_settings, object_attributes_, scheduler, /* for_disk_s3= */ true);
return *src_size;
};
destination_disk->writeFileUsingCustomWriteObject(destination_path, write_mode, custom_write_object);
}
}
#endif

View File

@ -0,0 +1,36 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <Disks/IDisk.h>
#include <Storages/StorageS3Settings.h>
#include <Interpreters/threadPoolCallbackRunner.h>
namespace DB
{
/// Copies an object from S3 bucket to a disk of any type.
/// Depending on the disk the function can either do copying though buffers
/// (i.e. download the object by portions and then write those portions to the specified disk),
/// or perform a server-side copy.
void copyS3FileToDisk(
const std::shared_ptr<const S3::Client> & s3_client,
const String & src_bucket,
const String & src_key,
const std::optional<String> & version_id,
std::optional<size_t> src_offset,
std::optional<size_t> src_size,
DiskPtr destination_disk,
const String & destination_path,
WriteMode write_mode = WriteMode::Rewrite,
const ReadSettings & read_settings = {},
const WriteSettings & write_settings = {},
const S3Settings::RequestSettings & request_settings = {},
ThreadPoolCallbackRunner<void> scheduler = {});
}
#endif

View File

@ -117,6 +117,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference;
format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array;
format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
format_settings.pretty.color = settings.output_format_pretty_color;

View File

@ -211,6 +211,7 @@ struct FormatSettings
std::unordered_set<int> skip_row_groups = {};
bool output_string_as_string = false;
bool output_fixed_string_as_fixed_byte_array = true;
UInt64 max_block_size = 8192;
ParquetVersion output_version;
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
} parquet;

View File

@ -0,0 +1,103 @@
#include <Formats/MarkInCompressedFile.h>
#include <Common/BitHelpers.h>
namespace DB
{
// Write a range of bits in a bit-packed array.
// The array must be overallocated by one element.
// The bit range must be pre-filled with zeros.
void writeBits(UInt64 * dest, size_t bit_offset, UInt64 value)
{
size_t mod = bit_offset % 64;
dest[bit_offset / 64] |= value << mod;
if (mod)
dest[bit_offset / 64 + 1] |= value >> (64 - mod);
}
// The array must be overallocated by one element.
UInt64 readBits(const UInt64 * src, size_t bit_offset, size_t num_bits)
{
size_t mod = bit_offset % 64;
UInt64 value = src[bit_offset / 64] >> mod;
if (mod)
value |= src[bit_offset / 64 + 1] << (64 - mod);
return value & maskLowBits<UInt64>(num_bits);
}
MarksInCompressedFile::MarksInCompressedFile(const PlainArray & marks)
: num_marks(marks.size()), blocks((marks.size() + MARKS_PER_BLOCK - 1) / MARKS_PER_BLOCK, BlockInfo{})
{
if (num_marks == 0)
{
return;
}
// First pass: calculate layout of all blocks and total memory required.
size_t packed_bits = 0;
for (size_t block_idx = 0; block_idx < blocks.size(); ++block_idx)
{
BlockInfo & block = blocks[block_idx];
block.bit_offset_in_packed_array = packed_bits;
size_t max_x = 0;
size_t max_y = 0;
size_t num_marks_in_this_block = std::min(MARKS_PER_BLOCK, num_marks - block_idx * MARKS_PER_BLOCK);
for (size_t i = 0; i < num_marks_in_this_block; ++i)
{
const auto & mark = marks[block_idx * MARKS_PER_BLOCK + i];
block.min_x = std::min(block.min_x, mark.offset_in_compressed_file);
max_x = std::max(max_x, mark.offset_in_compressed_file);
block.min_y = std::min(block.min_y, mark.offset_in_decompressed_block);
max_y = std::max(max_y, mark.offset_in_decompressed_block);
block.trailing_zero_bits_in_y
= std::min(block.trailing_zero_bits_in_y, static_cast<UInt8>(getTrailingZeroBits(mark.offset_in_decompressed_block)));
}
block.bits_for_x = sizeof(size_t) * 8 - getLeadingZeroBits(max_x - block.min_x);
block.bits_for_y = sizeof(size_t) * 8 - getLeadingZeroBits((max_y - block.min_y) >> block.trailing_zero_bits_in_y);
packed_bits += num_marks_in_this_block * (block.bits_for_x + block.bits_for_y);
}
// Overallocate by +1 element to let the bit packing/unpacking do less bounds checking.
size_t packed_length = (packed_bits + 63) / 64 + 1;
packed.reserve_exact(packed_length);
packed.resize_fill(packed_length);
// Second pass: write out the packed marks.
for (size_t idx = 0; idx < num_marks; ++idx)
{
const auto & mark = marks[idx];
auto [block, offset] = lookUpMark(idx);
writeBits(packed.data(), offset, mark.offset_in_compressed_file - block->min_x);
writeBits(
packed.data(),
offset + block->bits_for_x,
(mark.offset_in_decompressed_block - block->min_y) >> block->trailing_zero_bits_in_y);
}
}
MarkInCompressedFile MarksInCompressedFile::get(size_t idx) const
{
auto [block, offset] = lookUpMark(idx);
size_t x = block->min_x + readBits(packed.data(), offset, block->bits_for_x);
size_t y = block->min_y + (readBits(packed.data(), offset + block->bits_for_x, block->bits_for_y) << block->trailing_zero_bits_in_y);
return MarkInCompressedFile{.offset_in_compressed_file = x, .offset_in_decompressed_block = y};
}
std::tuple<const MarksInCompressedFile::BlockInfo *, size_t> MarksInCompressedFile::lookUpMark(size_t idx) const
{
size_t block_idx = idx / MARKS_PER_BLOCK;
const BlockInfo & block = blocks[block_idx];
size_t offset = block.bit_offset_in_packed_array + (idx - block_idx * MARKS_PER_BLOCK) * (block.bits_for_x + block.bits_for_y);
return {&block, offset};
}
size_t MarksInCompressedFile::approximateMemoryUsage() const
{
return sizeof(*this) + blocks.size() * sizeof(blocks[0]) + packed.size() * sizeof(packed[0]);
}
}

View File

@ -2,8 +2,8 @@
#include <tuple>
#include <base/types.h>
#include <IO/WriteHelpers.h>
#include <base/types.h>
#include <Common/PODArray.h>
@ -23,15 +23,9 @@ struct MarkInCompressedFile
return std::tie(offset_in_compressed_file, offset_in_decompressed_block)
== std::tie(rhs.offset_in_compressed_file, rhs.offset_in_decompressed_block);
}
bool operator!=(const MarkInCompressedFile & rhs) const
{
return !(*this == rhs);
}
bool operator!=(const MarkInCompressedFile & rhs) const { return !(*this == rhs); }
auto asTuple() const
{
return std::make_tuple(offset_in_compressed_file, offset_in_decompressed_block);
}
auto asTuple() const { return std::make_tuple(offset_in_compressed_file, offset_in_decompressed_block); }
String toString() const
{
@ -40,20 +34,87 @@ struct MarkInCompressedFile
String toStringWithRows(size_t rows_num) const
{
return "(" + DB::toString(offset_in_compressed_file) + "," + DB::toString(offset_in_decompressed_block) + "," + DB::toString(rows_num) + ")";
return "(" + DB::toString(offset_in_compressed_file) + "," + DB::toString(offset_in_decompressed_block) + ","
+ DB::toString(rows_num) + ")";
}
};
class MarksInCompressedFile : public PODArray<MarkInCompressedFile>
/**
* In-memory representation of an array of marks.
*
* Uses an ad-hoc compression scheme that decreases memory usage while allowing
* random access in O(1) time.
* This is independent from the marks *file* format, which may be uncompressed
* or use a different compression method.
*
* Typical memory usage:
* * ~3 bytes/mark for integer columns
* * ~5 bytes/mark for string columns
* * ~0.3 bytes/mark for trivial marks in auxiliary dict files of LowCardinality columns
*/
class MarksInCompressedFile
{
public:
explicit MarksInCompressedFile(size_t n) : PODArray(n) {}
using PlainArray = PODArray<MarkInCompressedFile>;
void read(ReadBuffer & buffer, size_t from, size_t count)
MarksInCompressedFile(const PlainArray & marks);
MarkInCompressedFile get(size_t idx) const;
size_t approximateMemoryUsage() const;
private:
/** Throughout this class:
* * "x" stands for offset_in_compressed_file,
* * "y" stands for offset_in_decompressed_block.
*/
/** We need to store a sequence of marks, each consisting of two 64-bit integers:
* offset_in_compressed_file and offset_in_decompressed_block. We'll call them x and y for
* convenience, since compression doesn't care what they mean. The compression exploits the
* following regularities:
* * y is usually zero.
* * x usually increases steadily.
* * Differences between x values in nearby marks usually fit in much fewer than 64 bits.
*
* We split the sequence of marks into blocks, each containing MARKS_PER_BLOCK marks.
* (Not to be confused with data blocks.)
* For each mark, we store the difference [value] - [min value in the block], for each of the
* two values in the mark. Each block specifies the number of bits to use for these differences
* for all marks in this block.
* The smaller the blocks the fewer bits are required, but the bigger the relative overhead of
* block headers.
*
* Packed marks and block headers all live in one contiguous array.
*/
struct BlockInfo
{
buffer.readStrict(reinterpret_cast<char *>(data() + from), count * sizeof(MarkInCompressedFile));
}
// Min offset_in_compressed_file and offset_in_decompressed_block, correspondingly.
size_t min_x = UINT64_MAX;
size_t min_y = UINT64_MAX;
// Place in `packed` where this block start.
size_t bit_offset_in_packed_array;
// How many bits each mark takes. These numbers are bit-packed in the `packed` array.
// Can be zero. (Especially for y, which is typically all zeroes.)
UInt8 bits_for_x;
UInt8 bits_for_y;
// The `y` values should be <<'ed by this amount.
// Useful for integer columns when marks granularity is a power of 2; in this case all
// offset_in_decompressed_block values are divisible by 2^15 or so.
UInt8 trailing_zero_bits_in_y = 63;
};
static constexpr size_t MARKS_PER_BLOCK = 256;
size_t num_marks;
PODArray<BlockInfo> blocks;
PODArray<UInt64> packed;
// Mark idx -> {block info, bit offset in `packed`}.
std::tuple<const BlockInfo *, size_t> lookUpMark(size_t idx) const;
};
}

View File

@ -0,0 +1,52 @@
#include <random>
#include <gtest/gtest.h>
#include <Formats/MarkInCompressedFile.h>
using namespace DB;
TEST(Marks, Compression)
{
std::random_device dev;
std::mt19937 rng(dev());
auto gen = [&](size_t count, size_t max_x_increment, size_t max_y_increment)
{
size_t x = 0, y = 0;
PODArray<MarkInCompressedFile> plain(count);
for (int i = 0; i < count; ++i)
{
x += rng() % (max_x_increment + 1);
y += rng() % (max_y_increment + 1);
plain[i] = MarkInCompressedFile{.offset_in_compressed_file = x, .offset_in_decompressed_block = y};
}
return plain;
};
auto test = [](const PODArray<MarkInCompressedFile> & plain, size_t max_bits_per_mark)
{
PODArray<MarkInCompressedFile> copy;
copy.assign(plain); // paranoid in case next line mutates it
MarksInCompressedFile marks(copy);
for (size_t i = 0; i < plain.size(); ++i)
ASSERT_EQ(marks.get(i), plain[i]);
EXPECT_LE((marks.approximateMemoryUsage() - sizeof(MarksInCompressedFile)) * 8, plain.size() * max_bits_per_mark);
};
// Typical.
test(gen(10000, 1'000'000, 0), 30);
// Completely random 64-bit values.
test(gen(10000, UINT64_MAX - 1, UINT64_MAX - 1), 130);
// All zeros.
test(gen(10000, 0, 0), 2);
// Short.
test(gen(10, 1000, 1000), 65);
// Empty.
test(gen(0, 0, 0), 0);
}

View File

@ -53,6 +53,10 @@ struct ToDateImpl
{
static constexpr auto name = "toDate";
static inline UInt16 execute(const DecimalUtils::DecimalComponents<DateTime64> & t, const DateLUTImpl & time_zone)
{
return static_cast<UInt16>(time_zone.toDayNum(t.whole));
}
static inline UInt16 execute(Int64 t, const DateLUTImpl & time_zone)
{
return UInt16(time_zone.toDayNum(t));
@ -69,6 +73,10 @@ struct ToDateImpl
{
return d;
}
static inline DecimalUtils::DecimalComponents<DateTime64> executeExtendedResult(const DecimalUtils::DecimalComponents<DateTime64> & t, const DateLUTImpl & time_zone)
{
return {time_zone.toDayNum(t.whole), 0};
}
using FactorTransform = ZeroTransform;
};

View File

@ -285,9 +285,9 @@ struct NgramDistanceImpl
size_t first_size = dispatchSearcher(calculateHaystackStatsAndMetric<false>, data.data(), data_size, common_stats.get(), distance, nullptr);
/// For !symmetric version we should not use first_size.
if constexpr (symmetric)
res = distance * 1.f / std::max(first_size + second_size, static_cast<size_t>(1));
res = distance * 1.f / std::max(first_size + second_size, 1uz);
else
res = 1.f - distance * 1.f / std::max(second_size, static_cast<size_t>(1));
res = 1.f - distance * 1.f / std::max(second_size, 1uz);
}
else
{
@ -353,9 +353,9 @@ struct NgramDistanceImpl
/// For !symmetric version we should not use haystack_stats_size.
if constexpr (symmetric)
res[i] = distance * 1.f / std::max(haystack_stats_size + needle_stats_size, static_cast<size_t>(1));
res[i] = distance * 1.f / std::max(haystack_stats_size + needle_stats_size, 1uz);
else
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, static_cast<size_t>(1));
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, 1uz);
}
else
{
@ -424,7 +424,7 @@ struct NgramDistanceImpl
for (size_t j = 0; j < needle_stats_size; ++j)
--common_stats[needle_ngram_storage[j]];
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, static_cast<size_t>(1));
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, 1uz);
}
else
{
@ -471,9 +471,9 @@ struct NgramDistanceImpl
ngram_storage.get());
/// For !symmetric version we should not use haystack_stats_size.
if constexpr (symmetric)
res[i] = distance * 1.f / std::max(haystack_stats_size + needle_stats_size, static_cast<size_t>(1));
res[i] = distance * 1.f / std::max(haystack_stats_size + needle_stats_size, 1uz);
else
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, static_cast<size_t>(1));
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, 1uz);
}
else
{

View File

@ -46,36 +46,57 @@ public:
{
if constexpr (std::is_same_v<typename Transform::FactorTransform, ZeroTransform>)
return { .is_monotonic = true, .is_always_monotonic = true };
const IFunction::Monotonicity is_monotonic = { .is_monotonic = true };
const IFunction::Monotonicity is_not_monotonic;
const DateLUTImpl * date_lut = &DateLUT::instance();
if (const auto * timezone = dynamic_cast<const TimezoneMixin *>(&type))
date_lut = &timezone->getTimeZone();
if (left.isNull() || right.isNull())
return is_not_monotonic;
/// The function is monotonous on the [left, right] segment, if the factor transformation returns the same values for them.
if (checkAndGetDataType<DataTypeDate>(&type))
{
return Transform::FactorTransform::execute(UInt16(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt16(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else if (checkAndGetDataType<DataTypeDate32>(&type))
{
return Transform::FactorTransform::execute(Int32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(Int32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else
{
return Transform::FactorTransform::execute(UInt32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
const IFunction::Monotonicity is_monotonic = { .is_monotonic = true };
const IFunction::Monotonicity is_not_monotonic;
const DateLUTImpl * date_lut = &DateLUT::instance();
if (const auto * timezone = dynamic_cast<const TimezoneMixin *>(&type))
date_lut = &timezone->getTimeZone();
if (left.isNull() || right.isNull())
return is_not_monotonic;
const auto * type_ptr = &type;
if (const auto * nullable_type = checkAndGetDataType<DataTypeNullable>(type_ptr))
type_ptr = nullable_type->getNestedType().get();
/// The function is monotonous on the [left, right] segment, if the factor transformation returns the same values for them.
if (checkAndGetDataType<DataTypeDate>(type_ptr))
{
return Transform::FactorTransform::execute(UInt16(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt16(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else if (checkAndGetDataType<DataTypeDate32>(type_ptr))
{
return Transform::FactorTransform::execute(Int32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(Int32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else if (checkAndGetDataType<DataTypeDateTime>(type_ptr))
{
return Transform::FactorTransform::execute(UInt32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else
{
assert(checkAndGetDataType<DataTypeDateTime64>(type_ptr));
const auto & left_date_time = left.get<DateTime64>();
TransformDateTime64<typename Transform::FactorTransform> transformer_left(left_date_time.getScale());
const auto & right_date_time = right.get<DateTime64>();
TransformDateTime64<typename Transform::FactorTransform> transformer_right(right_date_time.getScale());
return transformer_left.execute(left_date_time.getValue(), *date_lut)
== transformer_right.execute(right_date_time.getValue(), *date_lut)
? is_monotonic : is_not_monotonic;
}
}
}

View File

@ -71,8 +71,7 @@ restoreUserDefinedSQLObjects(RestorerFromBackup & restorer, const String & data_
String function_name = unescapeForFileName(escaped_function_name);
String filepath = data_path_in_backup_fs / filename;
auto backup_entry = backup->readFile(filepath);
auto in = backup_entry->getReadBuffer();
auto in = backup->readFile(filepath);
String statement_def;
readStringUntilEOF(statement_def, *in);

View File

@ -291,7 +291,7 @@ public:
ssize_t remain_byte = src.getElementSize() - offset_byte;
if (length < 0)
{
length_byte = std::max(remain_byte + (length / word_size), static_cast<ssize_t>(0));
length_byte = std::max(remain_byte + (length / word_size), 0z);
over_bit = word_size + (length % word_size);
if (length_byte == 1 && over_bit <= offset_bit) // begin and end are in same byte AND there are no gaps
length_byte = 0;
@ -330,7 +330,7 @@ public:
size_t size = src.getElementSize();
if (length < 0)
{
length_byte = std::max(static_cast<ssize_t>(offset_byte) + (length / word_size), static_cast<ssize_t>(0));
length_byte = std::max(static_cast<ssize_t>(offset_byte) + (length / word_size), 0z);
over_bit = word_size + (length % word_size);
if (length_byte == 1 && over_bit <= offset_bit) // begin and end are in same byte AND there are no gaps
length_byte = 0;
@ -395,7 +395,7 @@ public:
}
else
{
length_byte = std::max(remain_byte + (static_cast<ssize_t>(length) / word_size), static_cast<ssize_t>(0));
length_byte = std::max(remain_byte + (static_cast<ssize_t>(length) / word_size), 0z);
over_bit = word_size + (length % word_size);
if (length_byte == 1 && over_bit <= offset_bit) // begin and end are in same byte AND there are no gaps
length_byte = 0;

View File

@ -20,5 +20,6 @@ using FunctionPositionCaseInsensitive = FunctionsStringSearch<PositionImpl<NameP
REGISTER_FUNCTION(PositionCaseInsensitive)
{
factory.registerFunction<FunctionPositionCaseInsensitive>();
factory.registerAlias("instr", NamePositionCaseInsensitive::name, FunctionFactory::CaseInsensitive);
}
}

View File

@ -106,7 +106,7 @@ void MemoryWriteBuffer::addChunk()
}
else
{
next_chunk_size = std::max(static_cast<size_t>(1), static_cast<size_t>(chunk_tail->size() * growth_rate));
next_chunk_size = std::max(1uz, static_cast<size_t>(chunk_tail->size() * growth_rate));
next_chunk_size = std::min(next_chunk_size, max_chunk_size);
}

View File

@ -259,12 +259,12 @@ namespace detail
{
try
{
callWithRedirects<true>(response, Poco::Net::HTTPRequest::HTTP_HEAD);
callWithRedirects<true>(response, Poco::Net::HTTPRequest::HTTP_HEAD, true);
break;
}
catch (const Poco::Exception & e)
{
if (i == settings.http_max_tries - 1)
if (i == settings.http_max_tries - 1 || !isRetriableError(response.getStatus()))
throw;
LOG_ERROR(log, "Failed to make HTTP_HEAD request to {}. Error: {}", uri.toString(), e.displayText());
@ -353,11 +353,12 @@ namespace detail
static bool isRetriableError(const Poco::Net::HTTPResponse::HTTPStatus http_status) noexcept
{
constexpr std::array non_retriable_errors{
static constexpr std::array non_retriable_errors{
Poco::Net::HTTPResponse::HTTPStatus::HTTP_BAD_REQUEST,
Poco::Net::HTTPResponse::HTTPStatus::HTTP_UNAUTHORIZED,
Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND,
Poco::Net::HTTPResponse::HTTPStatus::HTTP_FORBIDDEN,
Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_IMPLEMENTED,
Poco::Net::HTTPResponse::HTTPStatus::HTTP_METHOD_NOT_ALLOWED};
return std::all_of(

View File

@ -123,9 +123,8 @@ Client::Client(
{
auto * endpoint_provider = dynamic_cast<Aws::S3::Endpoint::S3DefaultEpProviderBase *>(accessEndpointProvider().get());
endpoint_provider->GetBuiltInParameters().GetParameter("Region").GetString(explicit_region);
std::string endpoint;
endpoint_provider->GetBuiltInParameters().GetParameter("Endpoint").GetString(endpoint);
detect_region = explicit_region == Aws::Region::AWS_GLOBAL && endpoint.find(".amazonaws.com") != std::string::npos;
endpoint_provider->GetBuiltInParameters().GetParameter("Endpoint").GetString(initial_endpoint);
detect_region = explicit_region == Aws::Region::AWS_GLOBAL && initial_endpoint.find(".amazonaws.com") != std::string::npos;
cache = std::make_shared<ClientCache>();
ClientCacheRegistry::instance().registerClient(cache);
@ -133,6 +132,7 @@ Client::Client(
Client::Client(const Client & other)
: Aws::S3::S3Client(other)
, initial_endpoint(other.initial_endpoint)
, explicit_region(other.explicit_region)
, detect_region(other.detect_region)
, max_redirects(other.max_redirects)

View File

@ -109,6 +109,9 @@ public:
}
}
/// Returns the initial endpoint.
const String & getInitialEndpoint() const { return initial_endpoint; }
/// Decorator for RetryStrategy needed for this client to work correctly.
/// We want to manually handle permanent moves (status code 301) because:
/// - redirect location is written in XML format inside the response body something that doesn't exist for HEAD
@ -198,6 +201,8 @@ private:
bool checkIfWrongRegionDefined(const std::string & bucket, const Aws::S3::S3Error & error, std::string & region) const;
void insertRegionOverride(const std::string & bucket, const std::string & region) const;
String initial_endpoint;
std::string explicit_region;
mutable bool detect_region = true;

View File

@ -1442,7 +1442,8 @@ void Aggregator::prepareAggregateInstructions(
aggregate_columns[i][j] = materialized_columns.back().get();
/// Sparse columns without defaults may be handled incorrectly.
if (aggregate_columns[i][j]->getNumberOfDefaultRows() == 0)
if (aggregate_columns[i][j]->isSparse()
&& aggregate_columns[i][j]->getNumberOfDefaultRows() == 0)
allow_sparse_arguments = false;
auto full_column = allow_sparse_arguments

View File

@ -189,7 +189,7 @@ std::vector<JoinedElement> getTables(const ASTSelectQuery & select)
if (t.hasUsing())
{
if (has_using)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Multuple USING statements are not supported");
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Multiple USING statements are not supported");
has_using = true;
}

View File

@ -542,6 +542,7 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
OpenTelemetry::TracingContextHolder tracing_ctx_holder(__PRETTY_FUNCTION__ ,
task.entry.tracing_context,
this->context->getOpenTelemetrySpanLog());
tracing_ctx_holder.root_span.kind = OpenTelemetry::CONSUMER;
String active_node_path = task.getActiveNodePath();
String finished_node_path = task.getFinishedNodePath();

View File

@ -562,7 +562,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
{
/// Allow push down and other optimizations for VIEW: replace with subquery and rewrite it.
ASTPtr view_table;
NameToNameMap parameter_values;
NameToNameMap parameter_types;
if (view)
{
@ -575,14 +574,13 @@ InterpreterSelectQuery::InterpreterSelectQuery(
/// and after query is replaced, we use these parameters to substitute in the parameterized view query
if (query_info.is_parameterized_view)
{
parameter_values = analyzeFunctionParamValues(query_ptr);
view->setParameterValues(parameter_values);
parameter_types = view->getParameterValues();
query_info.parameterized_view_values = analyzeFunctionParamValues(query_ptr);
parameter_types = view->getParameterTypes();
}
view->replaceWithSubquery(getSelectQuery(), view_table, metadata_snapshot, view->isParameterizedView());
if (query_info.is_parameterized_view)
{
view->replaceQueryParametersIfParametrizedView(query_ptr);
view->replaceQueryParametersIfParametrizedView(query_ptr, query_info.parameterized_view_values);
}
}
@ -595,7 +593,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
required_result_column_names,
table_join,
query_info.is_parameterized_view,
parameter_values,
query_info.parameterized_view_values,
parameter_types);
@ -747,7 +745,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query_info.filter_asts.push_back(parallel_replicas_custom_filter_ast);
}
source_header = storage_snapshot->getSampleBlockForColumns(required_columns, parameter_values);
source_header = storage_snapshot->getSampleBlockForColumns(required_columns, query_info.parameterized_view_values);
}
/// Calculate structure of the result.

View File

@ -1,6 +1,11 @@
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExplainQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
namespace DB
{
@ -26,4 +31,59 @@ void InterpreterSetQuery::executeForCurrentContext()
getContext()->resetSettingsToDefaultValue(ast.default_settings);
}
static void applySettingsFromSelectWithUnion(const ASTSelectWithUnionQuery & select_with_union, ContextMutablePtr context)
{
const ASTs & children = select_with_union.list_of_selects->children;
if (children.empty())
return;
// We might have an arbitrarily complex UNION tree, so just give
// up if the last first-order child is not a plain SELECT.
// It is flattened later, when we process UNION ALL/DISTINCT.
const auto * last_select = children.back()->as<ASTSelectQuery>();
if (last_select && last_select->settings())
{
InterpreterSetQuery(last_select->settings(), context).executeForCurrentContext();
}
}
void InterpreterSetQuery::applySettingsFromQuery(const ASTPtr & ast, ContextMutablePtr context_)
{
if (!ast)
return;
if (const auto * select_query = ast->as<ASTSelectQuery>())
{
if (auto new_settings = select_query->settings())
InterpreterSetQuery(new_settings, context_).executeForCurrentContext();
}
else if (const auto * select_with_union_query = ast->as<ASTSelectWithUnionQuery>())
{
applySettingsFromSelectWithUnion(*select_with_union_query, context_);
}
else if (const auto * explain_query = ast->as<ASTExplainQuery>())
{
applySettingsFromQuery(explain_query->getExplainedQuery(), context_);
}
else if (const auto * query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
{
if (query_with_output->settings_ast)
InterpreterSetQuery(query_with_output->settings_ast, context_).executeForCurrentContext();
if (const auto * create_query = ast->as<ASTCreateQuery>())
{
if (create_query->select)
{
applySettingsFromSelectWithUnion(create_query->select->as<ASTSelectWithUnionQuery &>(), context_);
}
}
}
else if (auto * insert_query = ast->as<ASTInsertQuery>())
{
context_->setInsertFormat(insert_query->format);
if (insert_query->settings_ast)
InterpreterSetQuery(insert_query->settings_ast, context_).executeForCurrentContext();
}
}
}

View File

@ -27,6 +27,9 @@ public:
bool supportsTransactions() const override { return true; }
/// To apply SETTINGS clauses from query as early as possible
static void applySettingsFromQuery(const ASTPtr & ast, ContextMutablePtr context_);
private:
ASTPtr query_ptr;
};

View File

@ -119,7 +119,9 @@ void LogicalExpressionsOptimizer::collectDisjunctiveEqualityChains()
bool found_chain = false;
auto * function = to_node->as<ASTFunction>();
if (function && function->name == "or" && function->children.size() == 1)
/// Optimization does not respect aliases properly, which can lead to MULTIPLE_EXPRESSION_FOR_ALIAS error.
/// Disable it if an expression has an alias. Proper implementation is done with the new analyzer.
if (function && function->alias.empty() && function->name == "or" && function->children.size() == 1)
{
const auto * expression_list = function->children[0]->as<ASTExpressionList>();
if (expression_list)
@ -128,14 +130,14 @@ void LogicalExpressionsOptimizer::collectDisjunctiveEqualityChains()
for (const auto & child : expression_list->children)
{
auto * equals = child->as<ASTFunction>();
if (equals && equals->name == "equals" && equals->children.size() == 1)
if (equals && equals->alias.empty() && equals->name == "equals" && equals->children.size() == 1)
{
const auto * equals_expression_list = equals->children[0]->as<ASTExpressionList>();
if (equals_expression_list && equals_expression_list->children.size() == 2)
{
/// Equality expr = xN.
const auto * literal = equals_expression_list->children[1]->as<ASTLiteral>();
if (literal)
if (literal && literal->alias.empty())
{
auto expr_lhs = equals_expression_list->children[0]->getTreeHash();
OrWithExpression or_with_expression{function, expr_lhs, function->tryGetAlias()};
@ -230,6 +232,9 @@ bool LogicalExpressionsOptimizer::mayOptimizeDisjunctiveEqualityChain(const Disj
const auto & equalities = chain.second;
const auto & equality_functions = equalities.functions;
if (settings.optimize_min_equality_disjunction_chain_length == 0)
return false;
/// For LowCardinality column, the dict is usually smaller and the index is relatively large.
/// In most cases, merging OR-chain as IN is better than converting each LowCardinality into full column individually.
/// For non-LowCardinality, we need to eliminate too short chains.

View File

@ -8,6 +8,7 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeEnum.h>
#include <Interpreters/Context.h>
#include <base/hex.h>
@ -20,11 +21,23 @@ namespace DB
NamesAndTypesList OpenTelemetrySpanLogElement::getNamesAndTypes()
{
auto span_kind_type = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{"INTERNAL", static_cast<Int8>(OpenTelemetry::INTERNAL)},
{"SERVER", static_cast<Int8>(OpenTelemetry::SERVER)},
{"CLIENT", static_cast<Int8>(OpenTelemetry::CLIENT)},
{"PRODUCER", static_cast<Int8>(OpenTelemetry::PRODUCER)},
{"CONSUMER", static_cast<Int8>(OpenTelemetry::CONSUMER)}
}
);
return {
{"trace_id", std::make_shared<DataTypeUUID>()},
{"span_id", std::make_shared<DataTypeUInt64>()},
{"parent_span_id", std::make_shared<DataTypeUInt64>()},
{"operation_name", std::make_shared<DataTypeString>()},
{"kind", std::move(span_kind_type)},
// DateTime64 is really unwieldy -- there is no "normal" way to convert
// it to an UInt64 count of microseconds, except:
// 1) reinterpretAsUInt64(reinterpretAsFixedString(date)), which just
@ -59,6 +72,7 @@ void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(span_id);
columns[i++]->insert(parent_span_id);
columns[i++]->insert(operation_name);
columns[i++]->insert(kind);
columns[i++]->insert(start_time_us);
columns[i++]->insert(finish_time_us);
columns[i++]->insert(DateLUT::instance().toDayNum(finish_time_us / 1000000).toUnderType());

View File

@ -115,7 +115,7 @@ struct TemporaryFileStream::OutputWriter
, out_compressed_buf(*out_buf)
, out_writer(out_compressed_buf, DBMS_TCP_PROTOCOL_VERSION, header_)
{
LOG_TEST(&Poco::Logger::get("TemporaryFileStream"), "Writing to {}", path);
LOG_TEST(&Poco::Logger::get("TemporaryFileStream"), "Writing to temporary file {}", path);
}
OutputWriter(std::unique_ptr<WriteBufferToFileSegment> out_buf_, const Block & header_)
@ -124,7 +124,7 @@ struct TemporaryFileStream::OutputWriter
, out_writer(out_compressed_buf, DBMS_TCP_PROTOCOL_VERSION, header_)
{
LOG_TEST(&Poco::Logger::get("TemporaryFileStream"),
"Writing to {}",
"Writing to temporary file {}",
static_cast<const WriteBufferToFileSegment *>(out_buf.get())->getFileName());
}

View File

@ -55,7 +55,7 @@ bool isSupportedAlterType(int type)
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context, const DDLQueryOnClusterParams & params)
{
OpenTelemetry::SpanHolder span(__FUNCTION__);
OpenTelemetry::SpanHolder span(__FUNCTION__, OpenTelemetry::PRODUCER);
if (context->getCurrentTransaction() && context->getSettingsRef().throw_on_unsupported_query_inside_transaction)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "ON CLUSTER queries inside transactions are not supported");

View File

@ -306,22 +306,6 @@ static void setQuerySpecificSettings(ASTPtr & ast, ContextMutablePtr context)
}
}
static void applySettingsFromSelectWithUnion(const ASTSelectWithUnionQuery & select_with_union, ContextMutablePtr context)
{
const ASTs & children = select_with_union.list_of_selects->children;
if (children.empty())
return;
// We might have an arbitrarily complex UNION tree, so just give
// up if the last first-order child is not a plain SELECT.
// It is flattened later, when we process UNION ALL/DISTINCT.
const auto * last_select = children.back()->as<ASTSelectQuery>();
if (last_select && last_select->settings())
{
InterpreterSetQuery(last_select->settings(), context).executeForCurrentContext();
}
}
static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
const char * begin,
const char * end,
@ -483,35 +467,10 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Interpret SETTINGS clauses as early as possible (before invoking the corresponding interpreter),
/// to allow settings to take effect.
if (const auto * select_query = ast->as<ASTSelectQuery>())
{
if (auto new_settings = select_query->settings())
InterpreterSetQuery(new_settings, context).executeForCurrentContext();
}
else if (const auto * select_with_union_query = ast->as<ASTSelectWithUnionQuery>())
{
applySettingsFromSelectWithUnion(*select_with_union_query, context);
}
else if (const auto * query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
{
if (query_with_output->settings_ast)
InterpreterSetQuery(query_with_output->settings_ast, context).executeForCurrentContext();
InterpreterSetQuery::applySettingsFromQuery(ast, context);
if (const auto * create_query = ast->as<ASTCreateQuery>())
{
if (create_query->select)
{
applySettingsFromSelectWithUnion(create_query->select->as<ASTSelectWithUnionQuery &>(), context);
}
}
}
else if (auto * insert_query = ast->as<ASTInsertQuery>())
{
context->setInsertFormat(insert_query->format);
if (insert_query->settings_ast)
InterpreterSetQuery(insert_query->settings_ast, context).executeForCurrentContext();
if (auto * insert_query = ast->as<ASTInsertQuery>())
insert_query->tail = istr;
}
setQuerySpecificSettings(ast, context);
@ -678,6 +637,10 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
}
bool can_use_query_cache =
settings.allow_experimental_query_cache && settings.use_query_cache
&& !ast->as<ASTExplainQuery>();
if (!async_insert)
{
/// We need to start the (implicit) transaction before getting the interpreter as this will get links to the latest snapshots
@ -757,7 +720,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto query_cache = context->getQueryCache();
bool read_result_from_query_cache = false; /// a query must not read from *and* write to the query cache at the same time
if (query_cache != nullptr
&& (settings.allow_experimental_query_cache && settings.use_query_cache && settings.enable_reads_from_query_cache)
&& (can_use_query_cache && settings.enable_reads_from_query_cache)
&& res.pipeline.pulling())
{
QueryCache::Key key(
@ -778,7 +741,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// then add a processor on top of the pipeline which stores the result in the query cache.
if (!read_result_from_query_cache
&& query_cache != nullptr
&& settings.allow_experimental_query_cache && settings.use_query_cache && settings.enable_writes_to_query_cache
&& can_use_query_cache && settings.enable_writes_to_query_cache
&& res.pipeline.pulling()
&& (!astContainsNonDeterministicFunctions(ast, context) || settings.query_cache_store_results_of_queries_with_nondeterministic_functions))
{
@ -946,8 +909,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto finish_callback = [elem,
context,
ast,
allow_experimental_query_cache = settings.allow_experimental_query_cache,
use_query_cache = settings.use_query_cache,
can_use_query_cache = can_use_query_cache,
enable_writes_to_query_cache = settings.enable_writes_to_query_cache,
query_cache_store_results_of_queries_with_nondeterministic_functions = settings.query_cache_store_results_of_queries_with_nondeterministic_functions,
log_queries,
@ -965,7 +927,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto query_cache = context->getQueryCache();
if (query_cache != nullptr
&& pulling_pipeline
&& allow_experimental_query_cache && use_query_cache && enable_writes_to_query_cache
&& can_use_query_cache && enable_writes_to_query_cache
&& (!astContainsNonDeterministicFunctions(ast, context) || query_cache_store_results_of_queries_with_nondeterministic_functions))
{
query_pipeline.finalizeWriteInQueryCache();

View File

@ -61,11 +61,17 @@ void addDefaultRequiredExpressionsRecursively(
RequiredSourceColumnsVisitor::Data columns_context;
RequiredSourceColumnsVisitor(columns_context).visit(column_default_expr);
NameSet required_columns_names = columns_context.requiredColumns();
auto required_type = std::make_shared<ASTLiteral>(columns.get(required_column_name).type->getName());
auto expr = makeASTFunction("_CAST", column_default_expr, std::make_shared<ASTLiteral>(columns.get(required_column_name).type->getName()));
auto expr = makeASTFunction("_CAST", column_default_expr, required_type);
if (is_column_in_query && convert_null_to_default)
{
expr = makeASTFunction("ifNull", std::make_shared<ASTIdentifier>(required_column_name), std::move(expr));
/// ifNull does not respect LowCardinality.
/// It may be fixed later or re-implemented properly for identical types.
expr = makeASTFunction("_CAST", std::move(expr), required_type);
}
default_expr_list_accum->children.emplace_back(setAlias(expr, required_column_name));
added_columns.emplace(required_column_name);

View File

@ -22,6 +22,8 @@
#include <filesystem>
#include <Common/logger_useful.h>
#define ORDINARY_TO_ATOMIC_PREFIX ".tmp_convert."
namespace fs = std::filesystem;
namespace DB
@ -117,6 +119,37 @@ static void checkUnsupportedVersion(ContextMutablePtr context, const String & da
"If so, you should upgrade through intermediate version.", database_name);
}
static void checkIncompleteOrdinaryToAtomicConversion(ContextPtr context, const std::map<String, String> & databases)
{
if (context->getConfigRef().has("allow_reserved_database_name_tmp_convert"))
return;
auto convert_flag_path = fs::path(context->getFlagsPath()) / "convert_ordinary_to_atomic";
if (!fs::exists(convert_flag_path))
return;
/// Flag exists. Let's check if we had an unsuccessful conversion attempt previously
for (const auto & db : databases)
{
if (!db.first.starts_with(ORDINARY_TO_ATOMIC_PREFIX))
continue;
size_t last_dot = db.first.rfind('.');
if (last_dot <= strlen(ORDINARY_TO_ATOMIC_PREFIX))
continue;
String actual_name = db.first.substr(strlen(ORDINARY_TO_ATOMIC_PREFIX), last_dot - strlen(ORDINARY_TO_ATOMIC_PREFIX));
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Found a database with special name: {}. "
"Most likely it indicates that conversion of database {} from Ordinary to Atomic "
"was interrupted or failed in the middle. You can add <allow_reserved_database_name_tmp_convert> to config.xml "
"or remove convert_ordinary_to_atomic file from flags/ directory, so the server will start forcefully. "
"After starting the server, you can finish conversion manually by moving rest of the tables from {} to {} "
"(using RENAME TABLE) and executing DROP DATABASE {} and RENAME DATABASE {} TO {}",
backQuote(db.first), backQuote(actual_name), backQuote(actual_name), backQuote(db.first),
backQuote(actual_name), backQuote(db.first), backQuote(actual_name));
}
}
void loadMetadata(ContextMutablePtr context, const String & default_database_name)
{
Poco::Logger * log = &Poco::Logger::get("loadMetadata");
@ -168,6 +201,8 @@ void loadMetadata(ContextMutablePtr context, const String & default_database_nam
}
}
checkIncompleteOrdinaryToAtomicConversion(context, databases);
/// clickhouse-local creates DatabaseMemory as default database by itself
/// For clickhouse-server we need create default database
bool create_default_db_if_not_exists = !default_database_name.empty();
@ -324,7 +359,7 @@ static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, cons
database_name, fmt::join(permanently_detached_tables, ", "));
}
String tmp_name = fmt::format(".tmp_convert.{}.{}", database_name, thread_local_rng());
String tmp_name = fmt::format(ORDINARY_TO_ATOMIC_PREFIX"{}.{}", database_name, thread_local_rng());
try
{
@ -415,11 +450,13 @@ void convertDatabasesEnginesIfNeed(ContextMutablePtr context)
LOG_INFO(&Poco::Logger::get("loadMetadata"), "Found convert_ordinary_to_atomic file in flags directory, "
"will try to convert all Ordinary databases to Atomic");
fs::remove(convert_flag_path);
for (const auto & [name, _] : DatabaseCatalog::instance().getDatabases())
if (name != DatabaseCatalog::SYSTEM_DATABASE)
maybeConvertOrdinaryDatabaseToAtomic(context, name, /* tables_started */ true);
LOG_INFO(&Poco::Logger::get("loadMetadata"), "Conversion finished, removing convert_ordinary_to_atomic flag");
fs::remove(convert_flag_path);
}
void startupSystemTables()

View File

@ -93,7 +93,7 @@ static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr<arrow::Ch
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
const auto * raw_data = reinterpret_cast<const NumericType *>(buffer->data());
const auto * raw_data = reinterpret_cast<const NumericType *>(buffer->data()) + chunk->offset();
column_data.insert_assume_reserved(raw_data, raw_data + chunk->length());
}
return {std::move(internal_column), std::move(internal_type), column_name};
@ -159,8 +159,8 @@ static ColumnWithTypeAndName readColumnWithFixedStringData(std::shared_ptr<arrow
for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
{
arrow::FixedSizeBinaryArray & chunk = dynamic_cast<arrow::FixedSizeBinaryArray &>(*(arrow_column->chunk(chunk_i)));
std::shared_ptr<arrow::Buffer> buffer = chunk.values();
column_chars_t.insert_assume_reserved(buffer->data(), buffer->data() + buffer->size());
const uint8_t * raw_data = chunk.raw_values();
column_chars_t.insert_assume_reserved(raw_data, raw_data + fixed_len * chunk.length());
}
return {std::move(internal_column), std::move(internal_type), column_name};
}
@ -178,9 +178,6 @@ static ColumnWithTypeAndName readColumnWithBooleanData(std::shared_ptr<arrow::Ch
if (chunk.length() == 0)
continue;
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk.data()->buffers[1];
for (size_t bool_i = 0; bool_i != static_cast<size_t>(chunk.length()); ++bool_i)
column_data.emplace_back(chunk.Value(bool_i));
}
@ -402,7 +399,7 @@ static ColumnWithTypeAndName readColumnWithIndexesDataImpl(std::shared_ptr<arrow
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
const auto * data = reinterpret_cast<const NumericType *>(buffer->data());
const auto * data = reinterpret_cast<const NumericType *>(buffer->data()) + chunk->offset();
/// Check that indexes are correct (protection against corrupted files)
/// Note that on null values index can be arbitrary value.
@ -554,8 +551,7 @@ static ColumnWithTypeAndName readIPv6ColumnFromBinaryData(std::shared_ptr<arrow:
for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
{
auto & chunk = dynamic_cast<arrow::BinaryArray &>(*(arrow_column->chunk(chunk_i)));
std::shared_ptr<arrow::Buffer> buffer = chunk.value_data();
const auto * raw_data = reinterpret_cast<const IPv6 *>(buffer->data());
const auto * raw_data = reinterpret_cast<const IPv6 *>(chunk.raw_data() + chunk.raw_value_offsets()[0]);
data.insert_assume_reserved(raw_data, raw_data + chunk.length());
}
return {std::move(internal_column), std::move(internal_type), column_name};

View File

@ -45,38 +45,42 @@ Chunk ParquetBlockInputFormat::generate()
block_missing_values.clear();
if (!file_reader)
{
prepareReader();
file_reader->set_batch_size(format_settings.parquet.max_block_size);
std::vector<int> row_group_indices;
for (int i = 0; i < row_group_total; ++i)
{
if (!skip_row_groups.contains(i))
row_group_indices.emplace_back(i);
}
auto read_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, &current_record_batch_reader);
if (!read_status.ok())
throw DB::ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", read_status.ToString());
}
if (is_stopped)
return {};
while (row_group_current < row_group_total && skip_row_groups.contains(row_group_current))
++row_group_current;
if (row_group_current >= row_group_total)
return res;
std::shared_ptr<arrow::Table> table;
std::unique_ptr<::arrow::RecordBatchReader> rbr;
std::vector<int> row_group_indices { row_group_current };
arrow::Status get_batch_reader_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, &rbr);
if (!get_batch_reader_status.ok())
auto batch = current_record_batch_reader->Next();
if (!batch.ok())
{
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}",
get_batch_reader_status.ToString());
batch.status().ToString());
}
if (*batch)
{
auto tmp_table = arrow::Table::FromRecordBatches({*batch});
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr;
arrow_column_to_ch_column->arrowTableToCHChunk(res, *tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
}
else
{
return {};
}
arrow::Status read_status = rbr->ReadAll(&table);
if (!read_status.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", read_status.ToString());
++row_group_current;
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr;
arrow_column_to_ch_column->arrowTableToCHChunk(res, table, table->num_rows(), block_missing_values_ptr);
return res;
}
@ -85,6 +89,7 @@ void ParquetBlockInputFormat::resetParser()
IInputFormat::resetParser();
file_reader.reset();
current_record_batch_reader.reset();
column_indices.clear();
row_group_current = 0;
block_missing_values.clear();

View File

@ -45,7 +45,7 @@ void PrettySpaceBlockOutputFormat::writeChunk(const Chunk & chunk, PortKind port
if (col.type->shouldAlignRightInPrettyFormats())
{
for (ssize_t k = 0; k < std::max(static_cast<ssize_t>(0), static_cast<ssize_t>(max_widths[i] - name_widths[i])); ++k)
for (ssize_t k = 0; k < std::max(0z, static_cast<ssize_t>(max_widths[i] - name_widths[i])); ++k)
writeChar(' ', out);
if (format_settings.pretty.color)
@ -62,7 +62,7 @@ void PrettySpaceBlockOutputFormat::writeChunk(const Chunk & chunk, PortKind port
if (format_settings.pretty.color)
writeCString("\033[0m", out);
for (ssize_t k = 0; k < std::max(static_cast<ssize_t>(0), static_cast<ssize_t>(max_widths[i] - name_widths[i])); ++k)
for (ssize_t k = 0; k < std::max(0z, static_cast<ssize_t>(max_widths[i] - name_widths[i])); ++k)
writeChar(' ', out);
}
}

View File

@ -40,8 +40,10 @@ void FillingStep::transformPipeline(QueryPipelineBuilder & pipeline, const Build
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
bool on_totals = stream_type == QueryPipelineBuilder::StreamType::Totals;
return std::make_shared<FillingTransform>(header, sort_description, std::move(interpolate_description), on_totals);
if (stream_type == QueryPipelineBuilder::StreamType::Totals)
return std::make_shared<FillingNoopTransform>(header, sort_description);
return std::make_shared<FillingTransform>(header, sort_description, std::move(interpolate_description));
});
}

View File

@ -202,7 +202,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool, std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage);
std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage);
auto pipe = createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read);
QueryPipelineBuilder builder;

View File

@ -169,17 +169,13 @@ static bool tryConvertFields(FillColumnDescription & descr, const DataTypePtr &
}
FillingTransform::FillingTransform(
const Block & header_, const SortDescription & sort_description_, InterpolateDescriptionPtr interpolate_description_, bool on_totals_)
const Block & header_, const SortDescription & sort_description_, InterpolateDescriptionPtr interpolate_description_)
: ISimpleTransform(header_, transformHeader(header_, sort_description_), true)
, sort_description(sort_description_)
, interpolate_description(interpolate_description_)
, on_totals(on_totals_)
, filling_row(sort_description_)
, next_row(sort_description_)
{
if (on_totals)
return;
if (interpolate_description)
interpolate_actions = std::make_shared<ExpressionActions>(interpolate_description->actions);
@ -239,7 +235,7 @@ FillingTransform::FillingTransform(
IProcessor::Status FillingTransform::prepare()
{
if (!on_totals && input.isFinished() && !output.isFinished() && !has_input && !generate_suffix)
if (input.isFinished() && !output.isFinished() && !has_input && !generate_suffix)
{
should_insert_first = next_row < filling_row || first;
@ -266,9 +262,6 @@ IProcessor::Status FillingTransform::prepare()
void FillingTransform::transform(Chunk & chunk)
{
if (on_totals)
return;
if (!chunk.hasRows() && !generate_suffix)
return;

View File

@ -16,7 +16,7 @@ namespace DB
class FillingTransform : public ISimpleTransform
{
public:
FillingTransform(const Block & header_, const SortDescription & sort_description_, InterpolateDescriptionPtr interpolate_description_, bool on_totals_);
FillingTransform(const Block & header_, const SortDescription & sort_description_, InterpolateDescriptionPtr interpolate_description_);
String getName() const override { return "FillingTransform"; }
@ -33,7 +33,6 @@ private:
const SortDescription sort_description; /// Contains only columns with WITH FILL.
const InterpolateDescriptionPtr interpolate_description; /// Contains INTERPOLATE columns
const bool on_totals; /// FillingTransform does nothing on totals.
FillingRow filling_row; /// Current row, which is used to fill gaps.
FillingRow next_row; /// Row to which we need to generate filling rows.
@ -53,4 +52,16 @@ private:
bool should_insert_first = false;
};
class FillingNoopTransform : public ISimpleTransform
{
public:
FillingNoopTransform(const Block & header, const SortDescription & sort_description_)
: ISimpleTransform(header, FillingTransform::transformHeader(header, sort_description_), true)
{
}
void transform(Chunk &) override {}
String getName() const override { return "FillingNoopTransform"; }
};
}

View File

@ -1,124 +0,0 @@
#include <QueryPipeline/ConnectionCollector.h>
#include <Core/BackgroundSchedulePool.h>
#include <Interpreters/Context.h>
#include <Common/Exception.h>
#include <Common/NetException.h>
#include "Core/Protocol.h"
#include <Common/logger_useful.h>
namespace CurrentMetrics
{
extern const Metric AsyncDrainedConnections;
extern const Metric ActiveAsyncDrainedConnections;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNEXPECTED_PACKET_FROM_SERVER;
}
std::unique_ptr<ConnectionCollector> ConnectionCollector::connection_collector;
static constexpr UInt64 max_connection_draining_tasks_per_thread = 20;
ConnectionCollector::ConnectionCollector(ContextMutablePtr global_context_, size_t max_threads)
: WithMutableContext(global_context_), pool(max_threads, max_threads, max_threads * max_connection_draining_tasks_per_thread)
{
}
ConnectionCollector & ConnectionCollector::init(ContextMutablePtr global_context_, size_t max_threads)
{
if (connection_collector)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Connection collector is initialized twice. This is a bug");
}
connection_collector.reset(new ConnectionCollector(global_context_, max_threads));
return *connection_collector;
}
struct AsyncDrainTask
{
const ConnectionPoolWithFailoverPtr pool;
std::shared_ptr<IConnections> shared_connections;
void operator()() const
{
ConnectionCollector::drainConnections(*shared_connections, /* throw_error= */ false);
}
// We don't have std::unique_function yet. Wrap it in shared_ptr to make the functor copyable.
std::shared_ptr<CurrentMetrics::Increment> metric_increment
= std::make_shared<CurrentMetrics::Increment>(CurrentMetrics::ActiveAsyncDrainedConnections);
};
std::shared_ptr<IConnections> ConnectionCollector::enqueueConnectionCleanup(
const ConnectionPoolWithFailoverPtr & pool, std::shared_ptr<IConnections> connections) noexcept
{
if (!connections)
return nullptr;
if (connection_collector)
{
if (connection_collector->pool.trySchedule(AsyncDrainTask{pool, connections}))
{
CurrentMetrics::add(CurrentMetrics::AsyncDrainedConnections, 1);
return nullptr;
}
}
return connections;
}
void ConnectionCollector::drainConnections(IConnections & connections, bool throw_error)
{
bool is_drained = false;
try
{
Packet packet = connections.drain();
is_drained = true;
switch (packet.type)
{
case Protocol::Server::EndOfStream:
case Protocol::Server::Log:
case Protocol::Server::ProfileEvents:
break;
case Protocol::Server::Exception:
packet.exception->rethrow();
break;
default:
/// Connection should be closed in case of unexpected packet,
/// since this means that the connection in some bad state.
is_drained = false;
throw NetException(
ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER,
"Unexpected packet {} from one of the following replicas: {}. (expected EndOfStream, Log, ProfileEvents or Exception)",
Protocol::Server::toString(packet.type),
connections.dumpAddresses());
}
}
catch (...)
{
tryLogCurrentException(&Poco::Logger::get("ConnectionCollector"), __PRETTY_FUNCTION__);
if (!is_drained)
{
try
{
connections.disconnect();
}
catch (...)
{
tryLogCurrentException(&Poco::Logger::get("ConnectionCollector"), __PRETTY_FUNCTION__);
}
}
if (throw_error)
throw;
}
}
}

View File

@ -1,30 +0,0 @@
#pragma once
#include <Client/IConnections.h>
#include <Interpreters/Context_fwd.h>
#include <boost/noncopyable.hpp>
#include <Common/ThreadPool.h>
namespace DB
{
class ConnectionPoolWithFailover;
using ConnectionPoolWithFailoverPtr = std::shared_ptr<ConnectionPoolWithFailover>;
class ConnectionCollector : boost::noncopyable, WithMutableContext
{
public:
static ConnectionCollector & init(ContextMutablePtr global_context_, size_t max_threads);
static std::shared_ptr<IConnections>
enqueueConnectionCleanup(const ConnectionPoolWithFailoverPtr & pool, std::shared_ptr<IConnections> connections) noexcept;
static void drainConnections(IConnections & connections, bool throw_error);
private:
explicit ConnectionCollector(ContextMutablePtr global_context_, size_t max_threads);
static constexpr size_t reschedule_time_ms = 1000;
ThreadPool pool;
static std::unique_ptr<ConnectionCollector> connection_collector;
};
}

View File

@ -1,6 +1,4 @@
#include <Common/ConcurrentBoundedQueue.h>
#include <QueryPipeline/ConnectionCollector.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <QueryPipeline/RemoteQueryExecutorReadContext.h>
@ -25,12 +23,6 @@
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
namespace CurrentMetrics
{
extern const Metric SyncDrainedConnections;
extern const Metric ActiveSyncDrainedConnections;
}
namespace ProfileEvents
{
extern const Event ReadTaskRequestsReceived;
@ -67,7 +59,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
{
create_connections = [this, &connection, throttler, extension_]()
{
auto res = std::make_shared<MultiplexedConnections>(connection, context->getSettingsRef(), throttler);
auto res = std::make_unique<MultiplexedConnections>(connection, context->getSettingsRef(), throttler);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
return res;
@ -83,7 +75,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
{
create_connections = [this, connection_ptr, throttler, extension_]()
{
auto res = std::make_shared<MultiplexedConnections>(connection_ptr, context->getSettingsRef(), throttler);
auto res = std::make_unique<MultiplexedConnections>(connection_ptr, context->getSettingsRef(), throttler);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
return res;
@ -91,7 +83,6 @@ RemoteQueryExecutor::RemoteQueryExecutor(
}
RemoteQueryExecutor::RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool_,
std::vector<IConnectionPool::Entry> && connections_,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
@ -100,10 +91,9 @@ RemoteQueryExecutor::RemoteQueryExecutor(
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
, task_iterator(extension_ ? extension_->task_iterator : nullptr)
, parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
, pool(pool_)
{
create_connections = [this, connections_, throttler, extension_]() mutable {
auto res = std::make_shared<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler);
auto res = std::make_unique<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
return res;
@ -111,7 +101,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
}
RemoteQueryExecutor::RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool_,
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
@ -119,9 +109,8 @@ RemoteQueryExecutor::RemoteQueryExecutor(
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
, task_iterator(extension_ ? extension_->task_iterator : nullptr)
, parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
, pool(pool_)
{
create_connections = [this, throttler, extension_]()->std::shared_ptr<IConnections>
create_connections = [this, pool, throttler, extension_]()->std::unique_ptr<IConnections>
{
const Settings & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
@ -133,7 +122,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
if (main_table)
table_to_check = std::make_shared<QualifiedTableName>(main_table.getQualifiedName());
auto res = std::make_shared<HedgedConnections>(pool, context, timeouts, throttler, pool_mode, table_to_check);
auto res = std::make_unique<HedgedConnections>(pool, context, timeouts, throttler, pool_mode, table_to_check);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
return res;
@ -151,7 +140,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
else
connection_entries = pool->getMany(timeouts, &current_settings, pool_mode);
auto res = std::make_shared<MultiplexedConnections>(std::move(connection_entries), current_settings, throttler);
auto res = std::make_unique<MultiplexedConnections>(std::move(connection_entries), current_settings, throttler);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
return res;
@ -535,26 +524,38 @@ void RemoteQueryExecutor::finish(std::unique_ptr<ReadContext> * read_context)
/// Send the request to abort the execution of the request, if not already sent.
tryCancel("Cancelling query because enough data has been read", read_context);
if (context->getSettingsRef().drain_timeout != Poco::Timespan(-1000000))
/// Get the remaining packets so that there is no out of sync in the connections to the replicas.
Packet packet = connections->drain();
switch (packet.type)
{
auto connections_left = ConnectionCollector::enqueueConnectionCleanup(pool, connections);
if (connections_left)
{
/// Drain connections synchronously and suppress errors.
CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections);
ConnectionCollector::drainConnections(*connections_left, /* throw_error= */ false);
CurrentMetrics::add(CurrentMetrics::SyncDrainedConnections, 1);
}
}
else
{
/// Drain connections synchronously without suppressing errors.
CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections);
ConnectionCollector::drainConnections(*connections, /* throw_error= */ true);
CurrentMetrics::add(CurrentMetrics::SyncDrainedConnections, 1);
}
case Protocol::Server::EndOfStream:
finished = true;
break;
finished = true;
case Protocol::Server::Log:
/// Pass logs from remote server to client
if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
log_queue->pushBlock(std::move(packet.block));
break;
case Protocol::Server::Exception:
got_exception_from_replica = true;
packet.exception->rethrow();
break;
case Protocol::Server::ProfileEvents:
/// Pass profile events from remote server to client
if (auto profile_queue = CurrentThread::getInternalProfileEventsQueue())
if (!profile_queue->emplace(std::move(packet.block)))
throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push into profile queue");
break;
default:
got_unknown_packet_from_replica = true;
throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from one of the following replicas: {}",
toString(packet.type),
connections->dumpAddresses());
}
}
void RemoteQueryExecutor::cancel(std::unique_ptr<ReadContext> * read_context)

Some files were not shown because too many files have changed in this diff Show More