Merge remote-tracking branch 'u/master' into build/llvm-16

Conflicts:
	docker/test/codebrowser/Dockerfile
This commit is contained in:
Azat Khuzhin 2023-05-09 20:40:43 +02:00
commit be39e8e566
80 changed files with 644 additions and 301 deletions

View File

@ -177,7 +177,19 @@ endif()
add_contrib (sqlite-cmake sqlite-amalgamation)
add_contrib (s2geometry-cmake s2geometry)
add_contrib (c-ares-cmake c-ares)
add_contrib (qpl-cmake qpl)
if (OS_LINUX AND ARCH_AMD64 AND (ENABLE_AVX2 OR ENABLE_AVX512))
option (ENABLE_QPL "Enable Intel® Query Processing Library" ${ENABLE_LIBRARIES})
elseif(ENABLE_QPL)
message (${RECONFIGURE_MESSAGE_LEVEL} "QPL library is only supported on x86_64 arch with avx2/avx512 support")
endif()
if (ENABLE_QPL)
add_contrib (idxd-config-cmake idxd-config)
add_contrib (qpl-cmake qpl) # requires: idxd-config
else()
message(STATUS "Not using QPL")
endif ()
add_contrib (morton-nd-cmake morton-nd)
if (ARCH_S390X)
add_contrib(crc32-s390x-cmake crc32-s390x)

View File

@ -0,0 +1,23 @@
## accel_config is the utility library required by QPL-Deflate codec for controlling and configuring Intel® In-Memory Analytics Accelerator (Intel® IAA).
set (LIBACCEL_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/idxd-config")
set (UUID_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl-cmake")
set (LIBACCEL_HEADER_DIR "${ClickHouse_SOURCE_DIR}/contrib/idxd-config-cmake/include")
set (SRCS
"${LIBACCEL_SOURCE_DIR}/accfg/lib/libaccfg.c"
"${LIBACCEL_SOURCE_DIR}/util/log.c"
"${LIBACCEL_SOURCE_DIR}/util/sysfs.c"
)
add_library(_accel-config ${SRCS})
target_compile_options(_accel-config PRIVATE "-D_GNU_SOURCE")
target_include_directories(_accel-config BEFORE
PRIVATE ${UUID_DIR}
PRIVATE ${LIBACCEL_HEADER_DIR}
PRIVATE ${LIBACCEL_SOURCE_DIR})
target_include_directories(_accel-config SYSTEM BEFORE
PUBLIC ${LIBACCEL_SOURCE_DIR}/accfg)
add_library(ch_contrib::accel-config ALIAS _accel-config)

View File

@ -1,36 +1,5 @@
## The Intel® QPL provides high performance implementations of data processing functions for existing hardware accelerator, and/or software path in case if hardware accelerator is not available.
if (OS_LINUX AND ARCH_AMD64 AND (ENABLE_AVX2 OR ENABLE_AVX512))
option (ENABLE_QPL "Enable Intel® Query Processing Library" ${ENABLE_LIBRARIES})
elseif(ENABLE_QPL)
message (${RECONFIGURE_MESSAGE_LEVEL} "QPL library is only supported on x86_64 arch with avx2/avx512 support")
endif()
if (NOT ENABLE_QPL)
message(STATUS "Not using QPL")
return()
endif()
## QPL has build dependency on libaccel-config. Here is to build libaccel-config which is required by QPL.
## libaccel-config is the utility library for controlling and configuring Intel® In-Memory Analytics Accelerator (Intel® IAA).
set (LIBACCEL_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/idxd-config")
set (UUID_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl-cmake")
set (LIBACCEL_HEADER_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl-cmake/idxd-header")
set (SRCS
"${LIBACCEL_SOURCE_DIR}/accfg/lib/libaccfg.c"
"${LIBACCEL_SOURCE_DIR}/util/log.c"
"${LIBACCEL_SOURCE_DIR}/util/sysfs.c"
)
add_library(accel-config ${SRCS})
target_compile_options(accel-config PRIVATE "-D_GNU_SOURCE")
target_include_directories(accel-config BEFORE
PRIVATE ${UUID_DIR}
PRIVATE ${LIBACCEL_HEADER_DIR}
PRIVATE ${LIBACCEL_SOURCE_DIR})
## QPL build start here.
set (QPL_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl")
set (QPL_SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl/sources")
set (QPL_BINARY_DIR "${ClickHouse_BINARY_DIR}/build/contrib/qpl")
@ -342,12 +311,12 @@ target_compile_definitions(_qpl
PUBLIC -DENABLE_QPL_COMPRESSION)
target_link_libraries(_qpl
PRIVATE accel-config
PRIVATE ch_contrib::accel-config
PRIVATE ch_contrib::isal
PRIVATE ${CMAKE_DL_LIBS})
add_library (ch_contrib::qpl ALIAS _qpl)
target_include_directories(_qpl SYSTEM BEFORE
PUBLIC "${QPL_PROJECT_DIR}/include"
PUBLIC "${LIBACCEL_SOURCE_DIR}/accfg"
PUBLIC ${UUID_DIR})
add_library (ch_contrib::qpl ALIAS _qpl)

View File

@ -21,12 +21,10 @@ RUN arch=${TARGETARCH:-amd64} \
# repo versions doesn't work correctly with C++17
# also we push reports to s3, so we add index.html to subfolder urls
# https://github.com/ClickHouse-Extras/woboq_codebrowser/commit/37e15eaf377b920acb0b48dbe82471be9203f76b
RUN git clone https://github.com/ClickHouse/woboq_codebrowser \
&& cd woboq_codebrowser \
RUN git clone --depth=1 https://github.com/ClickHouse/woboq_codebrowser /woboq_codebrowser \
&& cd /woboq_codebrowser \
&& cmake . -G Ninja -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_COMPILER=clang\+\+-${LLVM_VERSION} -DCMAKE_C_COMPILER=clang-${LLVM_VERSION} -DCLANG_BUILTIN_HEADERS_DIR=/usr/lib/llvm-${LLVM_VERSION}/lib/clang/${LLVM_VERSION}/include \
&& ninja \
&& cd .. \
&& rm -rf woboq_codebrowser
&& ninja
ENV CODEGEN=/woboq_codebrowser/generator/codebrowser_generator
ENV CODEINDEX=/woboq_codebrowser/indexgenerator/codebrowser_indexgenerator

View File

@ -30,7 +30,7 @@ description: In order to effectively mitigate possible human errors, you should
```
:::note ALL
`ALL` is only applicable to the `RESTORE` command.
`ALL` is only applicable to the `RESTORE` command prior to version 23.4 of Clickhouse.
:::
## Background

View File

@ -1045,7 +1045,7 @@ Default value: `0`.
## background_pool_size {#background_pool_size}
Sets the number of threads performing background merges and mutations for tables with MergeTree engines. This setting is also could be applied at server startup from the `default` profile configuration for backward compatibility at the ClickHouse server start. You can only increase the number of threads at runtime. To lower the number of threads you have to restart the server. By adjusting this setting, you manage CPU and disk load. Smaller pool size utilizes less CPU and disk resources, but background processes advance slower which might eventually impact query performance.
Sets the number of threads performing background merges and mutations for tables with MergeTree engines. This setting is also could be applied at server startup from the `default` profile configuration for backward compatibility at the ClickHouse server start. You can only increase the number of threads at runtime. To lower the number of threads you have to restart the server. By adjusting this setting, you manage CPU and disk load. Smaller pool size utilizes less CPU and disk resources, but background processes advance slower which might eventually impact query performance.
Before changing it, please also take a look at related MergeTree settings, such as [number_of_free_entries_in_pool_to_lower_max_size_of_merge](../../operations/settings/merge-tree-settings.md#number-of-free-entries-in-pool-to-lower-max-size-of-merge) and [number_of_free_entries_in_pool_to_execute_mutation](../../operations/settings/merge-tree-settings.md#number-of-free-entries-in-pool-to-execute-mutation).
@ -1063,8 +1063,8 @@ Default value: 16.
## background_merges_mutations_concurrency_ratio {#background_merges_mutations_concurrency_ratio}
Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example if the ratio equals to 2 and
`background_pool_size` is set to 16 then ClickHouse can execute 32 background merges concurrently. This is possible, because background operation could be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server.
Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example, if the ratio equals to 2 and
`background_pool_size` is set to 16 then ClickHouse can execute 32 background merges concurrently. This is possible, because background operations could be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server.
The same as for `background_pool_size` setting `background_merges_mutations_concurrency_ratio` could be applied from the `default` profile for backward compatibility.
Possible values:
@ -1079,6 +1079,33 @@ Default value: 2.
<background_merges_mutations_concurrency_ratio>3</background_merges_mutations_concurrency_ratio>
```
## merges_mutations_memory_usage_soft_limit {#merges_mutations_memory_usage_soft_limit}
Sets the limit on how much RAM is allowed to use for performing merge and mutation operations.
Zero means unlimited.
If ClickHouse reaches this limit, it won't schedule any new background merge or mutation operations but will continue to execute already scheduled tasks.
Possible values:
- Any positive integer.
**Example**
```xml
<merges_mutations_memory_usage_soft_limit>0</merges_mutations_memory_usage_soft_limit>
```
## merges_mutations_memory_usage_to_ram_ratio {#merges_mutations_memory_usage_to_ram_ratio}
The default `merges_mutations_memory_usage_soft_limit` value is calculated as `memory_amount * merges_mutations_memory_usage_to_ram_ratio`.
Default value: `0.5`.
**See also**
- [max_memory_usage](../../operations/settings/query-complexity.md#settings_max_memory_usage)
- [merges_mutations_memory_usage_soft_limit](#merges_mutations_memory_usage_soft_limit)
## background_merges_mutations_scheduling_policy {#background_merges_mutations_scheduling_policy}
Algorithm used to select next merge or mutation to be executed by background thread pool. Policy may be changed at runtime without server restart.

View File

@ -8,10 +8,6 @@ sidebar_label: Interval
The family of data types representing time and date intervals. The resulting types of the [INTERVAL](../../../sql-reference/operators/index.md#operator-interval) operator.
:::note
`Interval` data type values cant be stored in tables.
:::
Structure:
- Time interval as an unsigned integer value.
@ -19,6 +15,9 @@ Structure:
Supported interval types:
- `NANOSECOND`
- `MICROSECOND`
- `MILLISECOND`
- `SECOND`
- `MINUTE`
- `HOUR`

View File

@ -114,7 +114,7 @@ if (BUILD_STANDALONE_KEEPER)
clickhouse_add_executable(clickhouse-keeper ${CLICKHOUSE_KEEPER_STANDALONE_SOURCES})
# Remove some redundant dependencies
target_compile_definitions (clickhouse-keeper PRIVATE -DKEEPER_STANDALONE_BUILD)
target_compile_definitions (clickhouse-keeper PRIVATE -DCLICKHOUSE_PROGRAM_STANDALONE_BUILD)
target_compile_definitions (clickhouse-keeper PUBLIC -DWITHOUT_TEXT_LOG)
target_include_directories(clickhouse-keeper PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../../src") # uses includes from src directory

View File

@ -57,7 +57,7 @@ int mainEntryClickHouseKeeper(int argc, char ** argv)
}
}
#ifdef KEEPER_STANDALONE_BUILD
#ifdef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
// Weak symbols don't work correctly on Darwin
// so we have a stub implementation to avoid linker errors

View File

@ -130,6 +130,7 @@ namespace CurrentMetrics
extern const Metric Revision;
extern const Metric VersionInteger;
extern const Metric MemoryTracking;
extern const Metric MergesMutationsMemoryTracking;
extern const Metric MaxDDLEntryID;
extern const Metric MaxPushedDDLEntryID;
}
@ -1225,6 +1226,25 @@ try
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
size_t merges_mutations_memory_usage_soft_limit = server_settings_.merges_mutations_memory_usage_soft_limit;
size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(memory_amount * server_settings_.merges_mutations_memory_usage_to_ram_ratio);
if (merges_mutations_memory_usage_soft_limit == 0 || merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage)
{
merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage;
LOG_WARNING(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}"
" ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit),
formatReadableSizeWithBinarySuffix(memory_amount),
server_settings_.merges_mutations_memory_usage_to_ram_ratio);
}
LOG_INFO(log, "Merges and mutations memory limit is set to {}",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit));
background_memory_tracker.setSoftLimit(merges_mutations_memory_usage_soft_limit);
background_memory_tracker.setDescription("(background)");
background_memory_tracker.setMetric(CurrentMetrics::MergesMutationsMemoryTracking);
total_memory_tracker.setAllowUseJemallocMemory(server_settings_.allow_use_jemalloc_memory);
auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker();

View File

@ -544,6 +544,10 @@ if (TARGET ch_contrib::qpl)
dbms_target_link_libraries(PUBLIC ch_contrib::qpl)
endif ()
if (TARGET ch_contrib::accel-config)
dbms_target_link_libraries(PUBLIC ch_contrib::accel-config)
endif ()
target_link_libraries(clickhouse_common_io PUBLIC boost::context)
dbms_target_link_libraries(PUBLIC boost::context)

View File

@ -53,6 +53,7 @@
M(QueryThread, "Number of query processing threads") \
M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \
M(MemoryTracking, "Total amount of memory (bytes) allocated by the server.") \
M(MergesMutationsMemoryTracking, "Total amount of memory (bytes) allocated by background tasks (merges and mutations).") \
M(EphemeralNode, "Number of ephemeral nodes hold in ZooKeeper.") \
M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \
M(ZooKeeperWatch, "Number of watches (event subscriptions) in ZooKeeper.") \

View File

@ -96,12 +96,17 @@ using namespace std::chrono_literals;
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User, false);
std::atomic<Int64> MemoryTracker::free_memory_in_allocator_arenas;
MemoryTracker::MemoryTracker(VariableContext level_) : parent(&total_memory_tracker), level(level_) {}
MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_) : parent(parent_), level(level_) {}
MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_)
: parent(parent_)
, log_peak_memory_usage_in_destructor(log_peak_memory_usage_in_destructor_)
, level(level_)
{}
MemoryTracker::~MemoryTracker()
{
@ -528,3 +533,10 @@ void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
while ((value == 0 || old_value < value) && !profiler_limit.compare_exchange_weak(old_value, value))
;
}
bool canEnqueueBackgroundTask()
{
auto limit = background_memory_tracker.getSoftLimit();
auto amount = background_memory_tracker.get();
return limit == 0 || amount < limit;
}

View File

@ -98,6 +98,7 @@ public:
explicit MemoryTracker(VariableContext level_ = VariableContext::Thread);
explicit MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread);
MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_);
~MemoryTracker();
@ -110,6 +111,22 @@ public:
return amount.load(std::memory_order_relaxed);
}
// Merges and mutations may pass memory ownership to other threads thus in the end of execution
// MemoryTracker for background task may have a non-zero counter.
// This method is intended to fix the counter inside of background_memory_tracker.
// NOTE: We can't use alloc/free methods to do it, because they also will change the value inside
// of total_memory_tracker.
void adjustOnBackgroundTaskEnd(const MemoryTracker * child)
{
auto background_memory_consumption = child->amount.load(std::memory_order_relaxed);
amount.fetch_sub(background_memory_consumption, std::memory_order_relaxed);
// Also fix CurrentMetrics::MergesMutationsMemoryTracking
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end())
CurrentMetrics::sub(metric_loaded, background_memory_consumption);
}
Int64 getPeak() const
{
return peak.load(std::memory_order_relaxed);
@ -220,3 +237,6 @@ public:
};
extern MemoryTracker total_memory_tracker;
extern MemoryTracker background_memory_tracker;
bool canEnqueueBackgroundTask();

View File

@ -378,6 +378,13 @@ void transpose(const T * src, char * dst, UInt32 num_bits, UInt32 tail = 64)
/// UInt64[N] transposed matrix -> UIntX[64]
template <typename T, bool full = false>
#if defined(__s390x__)
/* Compiler Bug for S390x :- https://github.com/llvm/llvm-project/issues/62572
* Please remove this after the fix is backported
*/
__attribute__((noinline))
#endif
void reverseTranspose(const char * src, T * buf, UInt32 num_bits, UInt32 tail = 64)
{
UInt64 matrix[64] = {};

View File

@ -172,7 +172,7 @@ void registerCodecDeflateQpl(CompressionCodecFactory & factory);
/// Keeper use only general-purpose codecs, so we don't need these special codecs
/// in standalone build
#ifndef KEEPER_STANDALONE_BUILD
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
void registerCodecDelta(CompressionCodecFactory & factory);
void registerCodecT64(CompressionCodecFactory & factory);
void registerCodecDoubleDelta(CompressionCodecFactory & factory);
@ -188,7 +188,7 @@ CompressionCodecFactory::CompressionCodecFactory()
registerCodecZSTD(*this);
registerCodecLZ4HC(*this);
registerCodecMultiple(*this);
#ifndef KEEPER_STANDALONE_BUILD
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
registerCodecDelta(*this);
registerCodecT64(*this);
registerCodecDoubleDelta(*this);

View File

@ -42,6 +42,8 @@ namespace DB
M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \
M(UInt64, max_server_memory_usage, 0, "Limit on total memory usage. Zero means Unlimited.", 0) \
M(Double, max_server_memory_usage_to_ram_ratio, 0.9, "Same as max_server_memory_usage but in to ram ratio. Allows to lower max memory on low-memory systems.", 0) \
M(UInt64, merges_mutations_memory_usage_soft_limit, 0, "Limit on total memory usage for merges and mutations. Zero means Unlimited.", 0) \
M(Double, merges_mutations_memory_usage_to_ram_ratio, 0.5, "Same as merges_mutations_memory_usage_soft_limit but in to ram ratio. Allows to lower memory limit on low-memory systems.", 0) \
M(Bool, allow_use_jemalloc_memory, true, "Allows to use jemalloc memory.", 0) \
\
M(UInt64, max_concurrent_queries, 0, "Limit on total number of concurrently executed queries. Zero means Unlimited.", 0) \

View File

@ -338,7 +338,7 @@ void SettingFieldString::readBinary(ReadBuffer & in)
/// that. The linker does not complain only because clickhouse-keeper does not call any of below
/// functions. A cleaner alternative would be more modular libraries, e.g. one for data types, which
/// could then be linked by the server and the linker.
#ifndef KEEPER_STANDALONE_BUILD
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
SettingFieldMap::SettingFieldMap(const Field & f) : value(fieldToMap(f)) {}

View File

@ -18,7 +18,7 @@
#include "config.h"
#include "config_version.h"
#if USE_SENTRY && !defined(KEEPER_STANDALONE_BUILD)
#if USE_SENTRY && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD)
# include <sentry.h>
# include <cstdio>

View File

@ -11,9 +11,6 @@ namespace DB
*
* Mostly the same as Int64.
* But also tagged with interval kind.
*
* Intended usage is for temporary elements in expressions,
* not for storing values in tables.
*/
class DataTypeInterval final : public DataTypeNumberBase<Int64>
{
@ -34,7 +31,6 @@ public:
bool equals(const IDataType & rhs) const override;
bool isParametric() const override { return true; }
bool cannotBeStoredInTables() const override { return true; }
bool isCategorial() const override { return false; }
bool canBeInsideNullable() const override { return true; }
};

View File

@ -726,7 +726,7 @@ static UUID getTableUUIDIfReplicated(const String & metadata, ContextPtr context
return create.uuid;
}
void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 max_log_ptr)
void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 & max_log_ptr)
{
is_recovering = true;
SCOPE_EXIT({ is_recovering = false; });

View File

@ -102,7 +102,7 @@ private:
void checkQueryValid(const ASTPtr & query, ContextPtr query_context) const;
void recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 max_log_ptr);
void recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 & max_log_ptr);
std::map<String, String> tryGetConsistentMetadataSnapshot(const ZooKeeperPtr & zookeeper, UInt32 & max_log_ptr);
ASTPtr parseQueryFromMetadataInZooKeeper(const String & node_name, const String & query);

View File

@ -47,14 +47,14 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
IAsynchronousReader & reader_,
const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
size_t min_bytes_for_seek_)
std::shared_ptr<AsyncReadCounters> async_read_counters_,
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log_)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0)
, read_settings(settings_)
, reader(reader_)
, base_priority(settings_.priority)
, impl(impl_)
, prefetch_buffer(settings_.prefetch_buffer_size)
, min_bytes_for_seek(min_bytes_for_seek_)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr
? CurrentThread::getQueryId() : "")
, current_reader_id(getRandomASCIIString(8))
@ -63,6 +63,8 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
#else
, log(&Poco::Logger::get("AsyncBuffer(" + impl->getFileName() + ")"))
#endif
, async_read_counters(async_read_counters_)
, prefetches_log(prefetches_log_)
{
ProfileEvents::increment(ProfileEvents::RemoteFSBuffers);
}
@ -111,7 +113,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::hasPendingDataToRead()
std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemoteFS::asyncReadInto(char * data, size_t size, int64_t priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl);
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, async_read_counters);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
@ -186,8 +188,8 @@ void AsynchronousReadIndirectBufferFromRemoteFS::appendToPrefetchLog(FilesystemP
.reader_id = current_reader_id,
};
if (auto prefetch_log = Context::getGlobalContextInstance()->getFilesystemReadPrefetchesLog())
prefetch_log->add(elem);
if (prefetches_log)
prefetches_log->add(elem);
}
@ -335,7 +337,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
if (impl->initialized()
&& read_until_position && new_pos < *read_until_position
&& new_pos > file_offset_of_buffer_end
&& new_pos < file_offset_of_buffer_end + min_bytes_for_seek)
&& new_pos < file_offset_of_buffer_end + read_settings.remote_read_min_bytes_for_seek)
{
ProfileEvents::increment(ProfileEvents::RemoteFSLazySeeks);
bytes_to_ignore = new_pos - file_offset_of_buffer_end;

View File

@ -12,6 +12,7 @@ namespace Poco { class Logger; }
namespace DB
{
struct AsyncReadCounters;
class ReadBufferFromRemoteFSGather;
/**
@ -34,7 +35,8 @@ public:
explicit AsynchronousReadIndirectBufferFromRemoteFS(
IAsynchronousReader & reader_, const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
size_t min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE);
std::shared_ptr<AsyncReadCounters> async_read_counters_,
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log_);
~AsynchronousReadIndirectBufferFromRemoteFS() override;
@ -83,8 +85,6 @@ private:
Memory<> prefetch_buffer;
size_t min_bytes_for_seek;
std::string query_id;
std::string current_reader_id;
@ -95,6 +95,9 @@ private:
Poco::Logger * log;
std::shared_ptr<AsyncReadCounters> async_read_counters;
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log;
struct LastPrefetchInfo
{
UInt64 submit_time = 0;

View File

@ -48,7 +48,8 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile(
size_t file_size_,
bool allow_seeks_after_first_read_,
bool use_external_buffer_,
std::optional<size_t> read_until_position_)
std::optional<size_t> read_until_position_,
std::shared_ptr<FilesystemCacheLog> cache_log_)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0, file_size_)
#ifndef NDEBUG
, log(&Poco::Logger::get("CachedOnDiskReadBufferFromFile(" + source_file_path_ + ")"))
@ -62,12 +63,12 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile(
, read_until_position(read_until_position_ ? *read_until_position_ : file_size_)
, implementation_buffer_creator(implementation_buffer_creator_)
, query_id(query_id_)
, enable_logging(!query_id.empty() && settings_.enable_filesystem_cache_log)
, current_buffer_id(getRandomASCIIString(8))
, allow_seeks_after_first_read(allow_seeks_after_first_read_)
, use_external_buffer(use_external_buffer_)
, query_context_holder(cache_->getQueryContextHolder(query_id, settings_))
, is_persistent(settings_.is_file_cache_persistent)
, cache_log(cache_log_)
{
}
@ -103,7 +104,7 @@ void CachedOnDiskReadBufferFromFile::appendFilesystemCacheLog(
break;
}
if (auto cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog())
if (cache_log)
cache_log->add(elem);
}
@ -487,7 +488,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
auto * current_file_segment = &file_segments->front();
auto completed_range = current_file_segment->range();
if (enable_logging)
if (cache_log)
appendFilesystemCacheLog(completed_range, read_type);
chassert(file_offset_of_buffer_end > completed_range.right);
@ -512,7 +513,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
CachedOnDiskReadBufferFromFile::~CachedOnDiskReadBufferFromFile()
{
if (enable_logging && file_segments && !file_segments->empty())
if (cache_log && file_segments && !file_segments->empty())
{
appendFilesystemCacheLog(file_segments->front().range(), read_type);
}

View File

@ -32,7 +32,8 @@ public:
size_t file_size_,
bool allow_seeks_after_first_read_,
bool use_external_buffer_,
std::optional<size_t> read_until_position_ = std::nullopt);
std::optional<size_t> read_until_position_,
std::shared_ptr<FilesystemCacheLog> cache_log_);
~CachedOnDiskReadBufferFromFile() override;
@ -137,7 +138,6 @@ private:
String last_caller_id;
String query_id;
bool enable_logging = false;
String current_buffer_id;
bool allow_seeks_after_first_read;
@ -148,6 +148,8 @@ private:
FileCache::QueryContextHolderPtr query_context_holder;
bool is_persistent;
std::shared_ptr<FilesystemCacheLog> cache_log;
};
}

View File

@ -153,27 +153,27 @@ FileSegment & FileSegmentRangeWriter::allocateFileSegment(size_t offset, FileSeg
void FileSegmentRangeWriter::appendFilesystemCacheLog(const FileSegment & file_segment)
{
if (cache_log)
if (!cache_log)
return;
auto file_segment_range = file_segment.range();
size_t file_segment_right_bound = file_segment_range.left + file_segment.getDownloadedSize(false) - 1;
FilesystemCacheLogElement elem
{
auto file_segment_range = file_segment.range();
size_t file_segment_right_bound = file_segment_range.left + file_segment.getDownloadedSize(false) - 1;
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.source_file_path = source_path,
.file_segment_range = { file_segment_range.left, file_segment_right_bound },
.requested_range = {},
.cache_type = FilesystemCacheLogElement::CacheType::WRITE_THROUGH_CACHE,
.file_segment_size = file_segment_range.size(),
.read_from_cache_attempted = false,
.read_buffer_id = {},
.profile_counters = nullptr,
};
FilesystemCacheLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.source_file_path = source_path,
.file_segment_range = { file_segment_range.left, file_segment_right_bound },
.requested_range = {},
.cache_type = FilesystemCacheLogElement::CacheType::WRITE_THROUGH_CACHE,
.file_segment_size = file_segment_range.size(),
.read_from_cache_attempted = false,
.read_buffer_id = {},
.profile_counters = nullptr,
};
cache_log->add(elem);
}
cache_log->add(elem);
}
void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment)

View File

@ -8,7 +8,6 @@
#include <iostream>
#include <base/hex.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/Context.h>
namespace DB
@ -17,15 +16,18 @@ namespace DB
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_)
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_)
: ReadBuffer(nullptr, 0)
, read_buffer_creator(std::move(read_buffer_creator_))
, blobs_to_read(blobs_to_read_)
, settings(settings_)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
, log(&Poco::Logger::get("ReadBufferFromRemoteFSGather"))
, enable_cache_log(!query_id.empty() && settings.enable_filesystem_cache_log)
{
if (cache_log_ && settings.enable_filesystem_cache_log)
cache_log = cache_log_;
if (!blobs_to_read.empty())
current_object = blobs_to_read.front();
@ -36,7 +38,7 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
{
if (current_buf != nullptr && !with_cache && enable_cache_log)
if (current_buf != nullptr && !with_cache)
{
appendFilesystemCacheLog();
}
@ -61,7 +63,8 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
object.bytes_size,
/* allow_seeks */false,
/* use_external_buffer */true,
read_until_position ? std::optional<size_t>(read_until_position) : std::nullopt);
read_until_position ? std::optional<size_t>(read_until_position) : std::nullopt,
cache_log);
}
return current_read_buffer_creator();
@ -69,7 +72,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
{
if (current_object.remote_path.empty())
if (!cache_log || current_object.remote_path.empty())
return;
FilesystemCacheLogElement elem
@ -82,9 +85,7 @@ void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
.file_segment_size = total_bytes_read_from_current_file,
.read_from_cache_attempted = false,
};
if (auto cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog())
cache_log->add(elem);
cache_log->add(elem);
}
IAsynchronousReader::Result ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore)
@ -267,10 +268,8 @@ size_t ReadBufferFromRemoteFSGather::getImplementationBufferOffset() const
ReadBufferFromRemoteFSGather::~ReadBufferFromRemoteFSGather()
{
if (!with_cache && enable_cache_log)
{
if (!with_cache)
appendFilesystemCacheLog();
}
}
}

View File

@ -25,7 +25,8 @@ public:
ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_);
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_);
~ReadBufferFromRemoteFSGather() override;
@ -93,7 +94,7 @@ private:
size_t total_bytes_read_from_current_file = 0;
bool enable_cache_log = false;
std::shared_ptr<FilesystemCacheLog> cache_log;
};
}

View File

@ -11,7 +11,6 @@
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/AsyncReadCounters.h>
#include <Interpreters/Context.h>
#include <base/getThreadId.h>
#include <future>
@ -75,17 +74,11 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
return scheduleFromThreadPool<Result>([request]() -> Result
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::RemoteRead};
std::optional<AsyncReadIncrement> increment;
if (CurrentThread::isInitialized())
{
auto query_context = CurrentThread::get().getQueryContext();
if (query_context)
increment.emplace(query_context->getAsyncReadCounters());
}
auto * remote_fs_fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());
auto async_read_counters = remote_fs_fd->getReadCounters();
std::optional<AsyncReadIncrement> increment = async_read_counters ? std::optional<AsyncReadIncrement>(async_read_counters) : std::nullopt;
auto watch = std::make_unique<Stopwatch>(CLOCK_MONOTONIC);
Result result = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore);
watch->stop();

View File

@ -8,6 +8,8 @@
namespace DB
{
struct AsyncReadCounters;
class ThreadPoolRemoteFSReader : public IAsynchronousReader
{
public:
@ -24,12 +26,19 @@ private:
class RemoteFSFileDescriptor : public IAsynchronousReader::IFileDescriptor
{
public:
explicit RemoteFSFileDescriptor(ReadBuffer & reader_) : reader(reader_) { }
explicit RemoteFSFileDescriptor(
ReadBuffer & reader_,
std::shared_ptr<AsyncReadCounters> async_read_counters_)
: reader(reader_)
, async_read_counters(async_read_counters_) {}
IAsynchronousReader::Result readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
std::shared_ptr<AsyncReadCounters> getReadCounters() const { return async_read_counters; }
private:
ReadBuffer & reader;
std::shared_ptr<AsyncReadCounters> async_read_counters;
};
}

View File

@ -5,7 +5,9 @@
#include <IO/AsynchronousReadBufferFromFile.h>
#include <Disks/IO/IOUringReader.h>
#include <Disks/IO/ThreadPoolReader.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <IO/SynchronousReader.h>
#include <IO/AsynchronousReader.h>
#include <Common/ProfileEvents.h>
#include "config.h"
@ -27,7 +29,6 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD;
}
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
const std::string & filename,
const ReadSettings & settings,
@ -119,11 +120,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
}
else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async)
{
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER);
auto & reader = getThreadPoolReader(FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER);
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader,
settings.priority,
@ -137,11 +134,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
}
else if (settings.local_fs_method == LocalFSReadMethod::pread_threadpool)
{
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER);
auto & reader = getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER);
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader,
settings.priority,

View File

@ -0,0 +1,76 @@
#include <Common/ErrorCodes.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <IO/AsynchronousReader.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <IO/SynchronousReader.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Disks/IO/ThreadPoolReader.h>
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
#include <Interpreters/Context.h>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type)
{
#ifdef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
const auto & config = Poco::Util::Application::instance().config();
switch (type)
{
case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
static auto asynchronous_remote_fs_reader = createThreadPoolReader(type, config);
return *asynchronous_remote_fs_reader;
}
case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
static auto asynchronous_local_fs_reader = createThreadPoolReader(type, config);
return *asynchronous_local_fs_reader;
}
case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
static auto synchronous_local_fs_reader = createThreadPoolReader(type, config);
return *synchronous_local_fs_reader;
}
}
#else
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
return context->getThreadPoolReader(type);
#endif
}
std::unique_ptr<IAsynchronousReader> createThreadPoolReader(
FilesystemReaderType type, const Poco::Util::AbstractConfiguration & config)
{
switch (type)
{
case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
auto pool_size = config.getUInt(".threadpool_remote_fs_reader_pool_size", 250);
auto queue_size = config.getUInt(".threadpool_remote_fs_reader_queue_size", 1000000);
return std::make_unique<ThreadPoolRemoteFSReader>(pool_size, queue_size);
}
case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
auto pool_size = config.getUInt(".threadpool_local_fs_reader_pool_size", 100);
auto queue_size = config.getUInt(".threadpool_local_fs_reader_queue_size", 1000000);
return std::make_unique<ThreadPoolReader>(pool_size, queue_size);
}
case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
return std::make_unique<SynchronousReader>();
}
}
}
}

View File

@ -0,0 +1,23 @@
#pragma once
namespace Poco::Util { class AbstractConfiguration; }
namespace DB
{
class IAsynchronousReader;
enum class FilesystemReaderType
{
SYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_REMOTE_FS_READER,
};
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type);
std::unique_ptr<IAsynchronousReader> createThreadPoolReader(
FilesystemReaderType type,
const Poco::Util::AbstractConfiguration & config);
}

View File

@ -10,6 +10,7 @@
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>
@ -86,6 +87,7 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
{
ReadSettings disk_read_settings = patchSettings(read_settings);
auto settings_ptr = settings.get();
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
@ -104,12 +106,16 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
auto reader_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
disk_read_settings);
disk_read_settings,
global_context->getFilesystemCacheLog());
if (disk_read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = getThreadPoolReader();
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, disk_read_settings, std::move(reader_impl));
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, disk_read_settings, std::move(reader_impl),
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
else
{

View File

@ -4,6 +4,7 @@
#include <IO/BoundedReadBuffer.h>
#include <Disks/IO/CachedOnDiskWriteBufferFromFile.h>
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
#include <Interpreters/Context.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Common/CurrentThread.h>

View File

@ -74,7 +74,7 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLI
hdfs_uri, hdfs_path, config, disk_read_settings, /* read_until_position */0, /* use_external_buffer */true);
};
auto hdfs_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), objects, disk_read_settings);
auto hdfs_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), objects, disk_read_settings, nullptr);
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(hdfs_impl), read_settings);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
}

View File

@ -26,15 +26,6 @@ void IObjectStorage::getDirectoryContents(const std::string &,
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getDirectoryContents() is not supported");
}
IAsynchronousReader & IObjectStorage::getThreadPoolReader()
{
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
return context->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
}
ThreadPool & IObjectStorage::getThreadPoolWriter()
{
auto context = Context::getGlobalContextInstance();

View File

@ -157,8 +157,6 @@ public:
virtual const std::string & getCacheName() const;
static IAsynchronousReader & getThreadPoolReader();
static ThreadPool & getThreadPoolWriter();
virtual void shutdown() = 0;

View File

@ -51,6 +51,7 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
std::optional<size_t> file_size) const
{
auto modified_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator =
[=] (const std::string & file_path, size_t /* read_until_position */)
-> std::unique_ptr<ReadBufferFromFileBase>
@ -59,14 +60,18 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
};
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), objects, modified_settings);
std::move(read_buffer_creator), objects, modified_settings,
global_context->getFilesystemCacheLog());
/// We use `remove_fs_method` (not `local_fs_method`) because we are about to use
/// AsynchronousReadIndirectBufferFromRemoteFS which works by the remote_fs_* settings.
if (modified_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = getThreadPoolReader();
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, modified_settings, std::move(impl));
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, modified_settings, std::move(impl),
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
else
{

View File

@ -98,6 +98,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
std::optional<size_t>) const
{
ReadSettings disk_read_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto settings_ptr = s3_settings.get();
@ -121,13 +122,16 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
auto s3_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
disk_read_settings);
disk_read_settings,
global_context->getFilesystemCacheLog());
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = getThreadPoolReader();
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, disk_read_settings, std::move(s3_impl), disk_read_settings.remote_read_min_bytes_for_seek);
reader, disk_read_settings, std::move(s3_impl),
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
else
{

View File

@ -13,6 +13,7 @@
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/ReadBufferFromWebServer.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <Storages/MergeTree/MergeTreeData.h>
@ -179,12 +180,20 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
read_until_position);
};
auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), StoredObjects{object}, read_settings);
auto global_context = Context::getGlobalContextInstance();
auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{object},
read_settings,
global_context->getFilesystemCacheLog());
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = IObjectStorage::getThreadPoolReader();
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings, std::move(web_impl), min_bytes_for_seek);
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, read_settings, std::move(web_impl),
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
else
{

View File

@ -61,7 +61,7 @@ struct CreateFileSegmentSettings
: kind(kind_), unbounded(unbounded_) {}
};
class FileSegment : private boost::noncopyable, public std::enable_shared_from_this<FileSegment>
class FileSegment : private boost::noncopyable
{
friend struct LockedKey;
friend class FileCache; /// Because of reserved_size in tryReserve().

View File

@ -1,8 +1,6 @@
#pragma once
#include <mutex>
#include <Interpreters/Cache/FileCache_fwd.h>
#include <boost/noncopyable.hpp>
#include <map>
namespace DB
{
@ -63,6 +61,8 @@ namespace DB
*/
struct CacheGuard : private boost::noncopyable
{
/// struct is used (not keyword `using`) to make CacheGuard::Lock non-interchangable with other guards locks
/// so, we wouldn't be able to pass CacheGuard::Lock to a function which accepts KeyGuard::Lock, for example
struct Lock : public std::unique_lock<std::mutex>
{
explicit Lock(std::mutex & mutex_) : std::unique_lock<std::mutex>(mutex_) {}

View File

@ -208,10 +208,9 @@ LockedKeyPtr CacheMetadata::lockKeyMetadata(
chassert(key_not_found_policy == KeyNotFoundPolicy::CREATE_EMPTY);
}
/// Not we are at a case:
/// key_state == KeyMetadata::KeyState::REMOVED
/// and KeyNotFoundPolicy == CREATE_EMPTY
/// Retry.
/// Now we are at the case when the key was removed (key_state == KeyMetadata::KeyState::REMOVED)
/// but we need to return empty key (key_not_found_policy == KeyNotFoundPolicy::CREATE_EMPTY)
/// Retry
return lockKeyMetadata(key, key_not_found_policy);
}
@ -241,13 +240,6 @@ void CacheMetadata::doCleanup()
{
auto lock = guard.lock();
/// Let's mention this case.
/// This metadata cleanup is delayed so what is we marked key as deleted and
/// put it to deletion queue, but then the same key was added to cache before
/// we actually performed this delayed removal?
/// In this case it will work fine because on each attempt to add any key to cache
/// we perform this delayed removal.
FileCacheKey cleanup_key;
while (cleanup_queue->tryPop(cleanup_key))
{

View File

@ -4169,35 +4169,8 @@ OrdinaryBackgroundExecutorPtr Context::getCommonExecutor() const
return shared->common_executor;
}
static size_t getThreadPoolReaderSizeFromConfig(Context::FilesystemReaderType type, const Poco::Util::AbstractConfiguration & config)
{
switch (type)
{
case Context::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
return config.getUInt(".threadpool_remote_fs_reader_pool_size", 250);
}
case Context::FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
return config.getUInt(".threadpool_local_fs_reader_pool_size", 100);
}
case Context::FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
return std::numeric_limits<std::size_t>::max();
}
}
}
size_t Context::getThreadPoolReaderSize(FilesystemReaderType type) const
{
const auto & config = getConfigRef();
return getThreadPoolReaderSizeFromConfig(type, config);
}
IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) const
{
const auto & config = getConfigRef();
auto lock = getLock();
switch (type)
@ -4205,31 +4178,20 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
if (!shared->asynchronous_remote_fs_reader)
{
auto pool_size = getThreadPoolReaderSizeFromConfig(type, config);
auto queue_size = config.getUInt(".threadpool_remote_fs_reader_queue_size", 1000000);
shared->asynchronous_remote_fs_reader = std::make_unique<ThreadPoolRemoteFSReader>(pool_size, queue_size);
}
shared->asynchronous_remote_fs_reader = createThreadPoolReader(type, getConfigRef());
return *shared->asynchronous_remote_fs_reader;
}
case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
if (!shared->asynchronous_local_fs_reader)
{
auto pool_size = getThreadPoolReaderSizeFromConfig(type, config);
auto queue_size = config.getUInt(".threadpool_local_fs_reader_queue_size", 1000000);
shared->asynchronous_local_fs_reader = std::make_unique<ThreadPoolReader>(pool_size, queue_size);
}
shared->asynchronous_local_fs_reader = createThreadPoolReader(type, getConfigRef());
return *shared->asynchronous_local_fs_reader;
}
case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
if (!shared->synchronous_local_fs_reader)
{
shared->synchronous_local_fs_reader = std::make_unique<SynchronousReader>();
}
shared->synchronous_local_fs_reader = createThreadPoolReader(type, getConfigRef());
return *shared->synchronous_local_fs_reader;
}

View File

@ -11,6 +11,7 @@
#include <Core/Settings.h>
#include <Core/UUID.h>
#include <IO/AsyncReadCounters.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <Interpreters/ClientInfo.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/DatabaseCatalog.h>
@ -1096,17 +1097,8 @@ public:
OrdinaryBackgroundExecutorPtr getFetchesExecutor() const;
OrdinaryBackgroundExecutorPtr getCommonExecutor() const;
enum class FilesystemReaderType
{
SYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_REMOTE_FS_READER,
};
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) const;
size_t getThreadPoolReaderSize(FilesystemReaderType type) const;
std::shared_ptr<AsyncReadCounters> getAsyncReadCounters() const;
ThreadPool & getThreadPoolWriter() const;

View File

@ -241,13 +241,17 @@ Chain InterpreterInsertQuery::buildChain(
running_group = std::make_shared<ThreadGroup>(getContext());
auto sample = getSampleBlock(columns, table, metadata_snapshot);
return buildChainImpl(table, metadata_snapshot, sample, thread_status_holder, running_group, elapsed_counter_ms);
Chain sink = buildSink(table, metadata_snapshot, thread_status_holder, running_group, elapsed_counter_ms);
Chain chain = buildPreSinkChain(sink.getInputHeader(), table, metadata_snapshot, sample, thread_status_holder);
chain.appendChain(std::move(sink));
return chain;
}
Chain InterpreterInsertQuery::buildChainImpl(
Chain InterpreterInsertQuery::buildSink(
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder,
ThreadGroupPtr running_group,
std::atomic_uint64_t * elapsed_counter_ms)
@ -258,14 +262,7 @@ Chain InterpreterInsertQuery::buildChainImpl(
thread_status = nullptr;
auto context_ptr = getContext();
const ASTInsertQuery * query = nullptr;
if (query_ptr)
query = query_ptr->as<ASTInsertQuery>();
const Settings & settings = context_ptr->getSettingsRef();
bool null_as_default = query && query->select && context_ptr->getSettingsRef().insert_null_as_default;
/// We create a pipeline of several streams, into which we will write data.
Chain out;
/// Keep a reference to the context to make sure it stays alive until the chain is executed and destroyed
@ -286,16 +283,48 @@ Chain InterpreterInsertQuery::buildChainImpl(
thread_status_holder, running_group, elapsed_counter_ms);
}
return out;
}
Chain InterpreterInsertQuery::buildPreSinkChain(
const Block & subsequent_header,
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder)
{
ThreadStatus * thread_status = current_thread;
if (!thread_status_holder)
thread_status = nullptr;
auto context_ptr = getContext();
const ASTInsertQuery * query = nullptr;
if (query_ptr)
query = query_ptr->as<ASTInsertQuery>();
const Settings & settings = context_ptr->getSettingsRef();
bool null_as_default = query && query->select && context_ptr->getSettingsRef().insert_null_as_default;
/// We create a pipeline of several streams, into which we will write data.
Chain out;
auto input_header = [&]() -> const Block &
{
return out.empty() ? subsequent_header : out.getInputHeader();
};
/// Note that we wrap transforms one on top of another, so we write them in reverse of data processing order.
/// Checking constraints. It must be done after calculation of all defaults, so we can check them on calculated columns.
if (const auto & constraints = metadata_snapshot->getConstraints(); !constraints.empty())
out.addSource(std::make_shared<CheckConstraintsTransform>(
table->getStorageID(), out.getInputHeader(), metadata_snapshot->getConstraints(), context_ptr));
table->getStorageID(), input_header(), metadata_snapshot->getConstraints(), context_ptr));
auto adding_missing_defaults_dag = addMissingDefaults(
query_sample_block,
out.getInputHeader().getNamesAndTypesList(),
input_header().getNamesAndTypesList(),
metadata_snapshot->getColumns(),
context_ptr,
null_as_default);
@ -316,12 +345,12 @@ Chain InterpreterInsertQuery::buildChainImpl(
bool table_prefers_large_blocks = table->prefersLargeBlocks();
out.addSource(std::make_shared<SquashingChunksTransform>(
out.getInputHeader(),
input_header(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL));
}
auto counting = std::make_shared<CountingTransform>(out.getInputHeader(), thread_status, getContext()->getQuota());
auto counting = std::make_shared<CountingTransform>(input_header(), thread_status, getContext()->getQuota());
counting->setProcessListElement(context_ptr->getProcessListElement());
counting->setProgressCallback(context_ptr->getProgressCallback());
out.addSource(std::move(counting));
@ -362,10 +391,20 @@ BlockIO InterpreterInsertQuery::execute()
// Distributed INSERT SELECT
distributed_pipeline = table->distributedWrite(query, getContext());
std::vector<Chain> out_chains;
std::vector<Chain> presink_chains;
std::vector<Chain> sink_chains;
if (!distributed_pipeline || query.watch)
{
size_t out_streams_size = 1;
/// Number of streams works like this:
/// * For the SELECT, use `max_threads`, or `max_insert_threads`, or whatever
/// InterpreterSelectQuery ends up with.
/// * Use `max_insert_threads` streams for various insert-preparation steps, e.g.
/// materializing and squashing (too slow to do in one thread). That's `presink_chains`.
/// * If the table supports parallel inserts, use the same streams for writing to IStorage.
/// Otherwise ResizeProcessor them down to 1 stream.
/// * If it's not an INSERT SELECT, forget all that and use one stream.
size_t pre_streams_size = 1;
size_t sink_streams_size = 1;
if (query.select)
{
@ -441,10 +480,14 @@ BlockIO InterpreterInsertQuery::execute()
pipeline.dropTotalsAndExtremes();
if (table->supportsParallelInsert() && settings.max_insert_threads > 1)
out_streams_size = std::min(static_cast<size_t>(settings.max_insert_threads), pipeline.getNumStreams());
if (settings.max_insert_threads > 1)
{
pre_streams_size = std::min(static_cast<size_t>(settings.max_insert_threads), pipeline.getNumStreams());
if (table->supportsParallelInsert())
sink_streams_size = pre_streams_size;
}
pipeline.resize(out_streams_size);
pipeline.resize(pre_streams_size);
/// Allow to insert Nullable into non-Nullable columns, NULL values will be added as defaults values.
if (getContext()->getSettingsRef().insert_null_as_default)
@ -476,13 +519,17 @@ BlockIO InterpreterInsertQuery::execute()
running_group = current_thread->getThreadGroup();
if (!running_group)
running_group = std::make_shared<ThreadGroup>(getContext());
for (size_t i = 0; i < out_streams_size; ++i)
for (size_t i = 0; i < sink_streams_size; ++i)
{
auto out = buildChainImpl(table, metadata_snapshot, query_sample_block,
/* thread_status_holder= */ nullptr,
running_group,
/* elapsed_counter_ms= */ nullptr);
out_chains.emplace_back(std::move(out));
auto out = buildSink(table, metadata_snapshot, /* thread_status_holder= */ nullptr,
running_group, /* elapsed_counter_ms= */ nullptr);
sink_chains.emplace_back(std::move(out));
}
for (size_t i = 0; i < pre_streams_size; ++i)
{
auto out = buildPreSinkChain(sink_chains[0].getInputHeader(), table, metadata_snapshot,
query_sample_block, /* thread_status_holder= */ nullptr);
presink_chains.emplace_back(std::move(out));
}
}
@ -495,7 +542,7 @@ BlockIO InterpreterInsertQuery::execute()
}
else if (query.select || query.watch)
{
const auto & header = out_chains.at(0).getInputHeader();
const auto & header = presink_chains.at(0).getInputHeader();
auto actions_dag = ActionsDAG::makeConvertingActions(
pipeline.getHeader().getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
@ -516,10 +563,14 @@ BlockIO InterpreterInsertQuery::execute()
size_t num_select_threads = pipeline.getNumThreads();
for (auto & chain : out_chains)
for (auto & chain : presink_chains)
resources = chain.detachResources();
for (auto & chain : sink_chains)
resources = chain.detachResources();
pipeline.addChains(std::move(out_chains));
pipeline.addChains(std::move(presink_chains));
pipeline.resize(sink_chains.size());
pipeline.addChains(std::move(sink_chains));
if (!settings.parallel_view_processing)
{
@ -552,7 +603,8 @@ BlockIO InterpreterInsertQuery::execute()
}
else
{
res.pipeline = QueryPipeline(std::move(out_chains.at(0)));
presink_chains.at(0).appendChain(std::move(sink_chains.at(0)));
res.pipeline = QueryPipeline(std::move(presink_chains[0]));
res.pipeline.setNumThreads(std::min<size_t>(res.pipeline.getNumThreads(), settings.max_threads));
if (query.hasInlinedData() && !async_insert)

View File

@ -66,13 +66,19 @@ private:
std::vector<std::unique_ptr<ReadBuffer>> owned_buffers;
Chain buildChainImpl(
Chain buildSink(
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder,
ThreadGroupPtr running_group,
std::atomic_uint64_t * elapsed_counter_ms);
Chain buildPreSinkChain(
const Block & subsequent_header,
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder);
};

View File

@ -1,19 +1,14 @@
#include "OpenTelemetrySpanLog.h"
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeEnum.h>
#include <Interpreters/Context.h>
#include <base/hex.h>
#include <Common/CurrentThread.h>
#include <Core/Field.h>
namespace DB
@ -32,11 +27,13 @@ NamesAndTypesList OpenTelemetrySpanLogElement::getNamesAndTypes()
}
);
auto low_cardinality_string = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
return {
{"trace_id", std::make_shared<DataTypeUUID>()},
{"span_id", std::make_shared<DataTypeUInt64>()},
{"parent_span_id", std::make_shared<DataTypeUInt64>()},
{"operation_name", std::make_shared<DataTypeString>()},
{"operation_name", low_cardinality_string},
{"kind", std::move(span_kind_type)},
// DateTime64 is really unwieldy -- there is no "normal" way to convert
// it to an UInt64 count of microseconds, except:
@ -51,15 +48,17 @@ NamesAndTypesList OpenTelemetrySpanLogElement::getNamesAndTypes()
{"start_time_us", std::make_shared<DataTypeUInt64>()},
{"finish_time_us", std::make_shared<DataTypeUInt64>()},
{"finish_date", std::make_shared<DataTypeDate>()},
{"attribute", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>())},
{"attribute", std::make_shared<DataTypeMap>(low_cardinality_string, std::make_shared<DataTypeString>())},
};
}
NamesAndAliases OpenTelemetrySpanLogElement::getNamesAndAliases()
{
auto low_cardinality_string = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
return
{
{"attribute.names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "mapKeys(attribute)"},
{"attribute.names", std::make_shared<DataTypeArray>(low_cardinality_string), "mapKeys(attribute)"},
{"attribute.values", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "mapValues(attribute)"}
};
}
@ -83,4 +82,3 @@ void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const
}
}

View File

@ -84,6 +84,7 @@ ThreadGroupPtr ThreadGroup::createForBackgroundProcess(ContextPtr storage_contex
group->memory_tracker.setProfilerStep(settings.memory_profiler_step);
group->memory_tracker.setSampleProbability(settings.memory_profiler_sample_probability);
group->memory_tracker.setSoftLimit(settings.memory_overcommit_ratio_denominator);
group->memory_tracker.setParent(&background_memory_tracker);
if (settings.memory_tracker_fault_probability > 0.0)
group->memory_tracker.setFaultProbability(settings.memory_tracker_fault_probability);

View File

@ -168,7 +168,6 @@ void FinishAggregatingInOrderAlgorithm::addToAggregation()
accumulated_bytes += static_cast<size_t>(static_cast<double>(states[i].total_bytes) * current_rows / states[i].num_rows);
accumulated_rows += current_rows;
if (!states[i].isValid())
inputs_to_update.push_back(i);
}

View File

@ -99,6 +99,14 @@ void Chain::addSink(ProcessorPtr processor)
processors.emplace_back(std::move(processor));
}
void Chain::appendChain(Chain chain)
{
connect(getOutputPort(), chain.getInputPort());
processors.splice(processors.end(), std::move(chain.processors));
attachResources(chain.detachResources());
num_threads += chain.num_threads;
}
IProcessor & Chain::getSource()
{
checkInitialized(processors);

View File

@ -7,6 +7,10 @@
namespace DB
{
/// Has one unconnected input port and one unconnected output port.
/// There may be other ports on the processors, but they must all be connected.
/// The unconnected input must be on the first processor, output - on the last.
/// The processors don't necessarily form an actual chain.
class Chain
{
public:
@ -27,6 +31,7 @@ public:
void addSource(ProcessorPtr processor);
void addSink(ProcessorPtr processor);
void appendChain(Chain chain);
IProcessor & getSource();
IProcessor & getSink();
@ -44,7 +49,11 @@ public:
void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); }
void addInterpreterContext(ContextPtr context) { holder.interpreter_context.emplace_back(std::move(context)); }
void attachResources(QueryPlanResourceHolder holder_) { holder = std::move(holder_); }
void attachResources(QueryPlanResourceHolder holder_)
{
/// This operator "=" actually merges holder_ into holder, doesn't replace.
holder = std::move(holder_);
}
QueryPlanResourceHolder detachResources() { return std::move(holder); }
void reset();

View File

@ -1,7 +1,7 @@
#include <Server/ProtocolServerAdapter.h>
#include <Server/TCPServer.h>
#if USE_GRPC && !defined(KEEPER_STANDALONE_BUILD)
#if USE_GRPC && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD)
#include <Server/GRPCServer.h>
#endif
@ -37,7 +37,7 @@ ProtocolServerAdapter::ProtocolServerAdapter(
{
}
#if USE_GRPC && !defined(KEEPER_STANDALONE_BUILD)
#if USE_GRPC && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD)
class ProtocolServerAdapter::GRPCServerAdapterImpl : public Impl
{
public:

View File

@ -23,7 +23,7 @@ public:
ProtocolServerAdapter & operator =(ProtocolServerAdapter && src) = default;
ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr<TCPServer> tcp_server_);
#if USE_GRPC && !defined(KEEPER_STANDALONE_BUILD)
#if USE_GRPC && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD)
ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr<GRPCServer> grpc_server_);
#endif

View File

@ -67,7 +67,7 @@ bool AsynchronousReadBufferFromHDFS::hasPendingDataToRead()
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromHDFS::asyncReadInto(char * data, size_t size, int64_t priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl);
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, nullptr);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;

View File

@ -21,6 +21,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <IO/ReadBufferFromString.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <Storages/Cache/ExternalDataSourceCache.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTCreateQuery.h>
@ -232,7 +233,7 @@ public:
if (thread_pool_read)
{
return std::make_unique<AsynchronousReadBufferFromHDFS>(
IObjectStorage::getThreadPoolReader(), read_settings, std::move(buf));
getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER), read_settings, std::move(buf));
}
else
{

View File

@ -146,6 +146,8 @@ public:
virtual bool supportsReplication() const { return false; }
/// Returns true if the storage supports parallel insert.
/// If false, each INSERT query will call write() only once.
/// Different INSERT queries may write in parallel regardless of this value.
virtual bool supportsParallelInsert() const { return false; }
/// Returns true if the storage supports deduplication of inserted data blocks.

View File

@ -80,4 +80,9 @@ MergeInfo MergeListElement::getInfo() const
return res;
}
MergeListElement::~MergeListElement()
{
background_memory_tracker.adjustOnBackgroundTaskEnd(&getMemoryTracker());
}
}

View File

@ -115,6 +115,8 @@ struct MergeListElement : boost::noncopyable
MergeListElement * ptr() { return this; }
MergeListElement & ref() { return *this; }
~MergeListElement();
};
/** Maintains a list of currently running merges.

View File

@ -166,8 +166,8 @@ struct Settings;
M(String, remote_fs_zero_copy_zookeeper_path, "/clickhouse/zero_copy", "ZooKeeper path for Zero-copy table-independet info.", 0) \
M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
/** Compress marks and primary key. */ \
M(Bool, compress_marks, false, "Marks support compression, reduce mark file size and speed up network transmission.", 0) \
M(Bool, compress_primary_key, false, "Primary key support compression, reduce primary key file size and speed up network transmission.", 0) \
M(Bool, compress_marks, true, "Marks support compression, reduce mark file size and speed up network transmission.", 0) \
M(Bool, compress_primary_key, true, "Primary key support compression, reduce primary key file size and speed up network transmission.", 0) \
M(String, marks_compression_codec, "ZSTD(3)", "Compression encoding used by marks, marks are small enough and cached, so the default compression is ZSTD(3).", 0) \
M(String, primary_key_compression_codec, "ZSTD(3)", "Compression encoding used by primary, primary key is small enough and cached, so the default compression is ZSTD(3).", 0) \
M(UInt64, marks_compress_block_size, 65536, "Mark compress block size, the actual size of the block to compress.", 0) \

View File

@ -535,7 +535,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (!args.storage_def->order_by)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"You must provide an ORDER BY or PRIMARY KEY expression in the table definition. "
"If you don't want this table to be sorted, use ORDER BY/PRIMARY KEY tuple()");
"If you don't want this table to be sorted, use ORDER BY/PRIMARY KEY ()");
/// Get sorting key from engine arguments.
///

View File

@ -8,6 +8,7 @@
#include <base/sort.h>
#include <Backups/BackupEntriesCollector.h>
#include <Databases/IDatabase.h>
#include <Common/MemoryTracker.h>
#include <Common/escapeForFileName.h>
#include <Common/ProfileEventsScope.h>
#include <Common/typeid_cast.h>
@ -42,6 +43,7 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <fmt/core.h>
namespace DB
{
@ -918,7 +920,14 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT;
if (partition_id.empty())
if (!canEnqueueBackgroundTask())
{
if (out_disable_reason)
*out_disable_reason = fmt::format("Current background tasks memory usage ({}) is more than the limit ({})",
formatReadableSizeWithBinarySuffix(background_memory_tracker.get()),
formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()));
}
else if (partition_id.empty())
{
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
bool merge_with_ttl_allowed = getTotalMergesWithTTLInMergeList() < data_settings->max_number_of_merges_with_ttl_in_pool;

View File

@ -5,6 +5,7 @@
#include <base/hex.h>
#include <Common/Macros.h>
#include <Common/MemoryTracker.h>
#include <Common/ProfileEventsScope.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ZooKeeper/KeeperException.h>
@ -3250,7 +3251,14 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
auto merges_and_mutations_queued = queue.countMergesAndPartMutations();
size_t merges_and_mutations_sum = merges_and_mutations_queued.merges + merges_and_mutations_queued.mutations;
if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue)
if (!canEnqueueBackgroundTask())
{
LOG_TRACE(log, "Reached memory limit for the background tasks ({}), so won't select new parts to merge or mutate."
"Current background tasks memory usage: {}.",
formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()),
formatReadableSizeWithBinarySuffix(background_memory_tracker.get()));
}
else if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue)
{
LOG_TRACE(log, "Number of queued merges ({}) and part mutations ({})"
" is greater than max_replicated_merges_in_queue ({}), so won't select new parts to merge or mutate.",

View File

@ -646,6 +646,7 @@ StorageS3Source::ReadBufferOrFactory StorageS3Source::createS3ReadBuffer(const S
std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
const String & key, const ReadSettings & read_settings, size_t object_size)
{
auto context = getContext();
auto read_buffer_creator =
[this, read_settings, object_size]
(const std::string & path, size_t read_until_position) -> std::unique_ptr<ReadBufferFromFileBase>
@ -667,10 +668,17 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
auto s3_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{StoredObject{key, object_size}},
read_settings);
read_settings,
/* cache_log */nullptr);
auto & pool_reader = getContext()->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
auto async_reader = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(pool_reader, read_settings, std::move(s3_impl));
auto modified_settings{read_settings};
/// FIXME: Changing this setting to default value breaks something around parquet reading
modified_settings.remote_read_min_bytes_for_seek = modified_settings.remote_fs_buffer_size;
auto & pool_reader = context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
auto async_reader = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
pool_reader, modified_settings, std::move(s3_impl),
context->getAsyncReadCounters(), context->getFilesystemReadPrefetchesLog());
async_reader->setReadUntilEnd();
if (read_settings.remote_fs_prefetch)

View File

@ -6,6 +6,7 @@
#include <IO/copyData.h>
#include <IO/WriteBufferFromString.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <Interpreters/Context.h>
#include <Common/Config/ConfigProcessor.h>
#include <Storages/HDFS/AsynchronousReadBufferFromHDFS.h>
@ -25,7 +26,7 @@ int main()
String path = "/path/to/hdfs/file";
ReadSettings settings = {};
auto in = std::make_unique<ReadBufferFromHDFS>(hdfs_namenode_url, path, *config, settings);
auto & reader = IObjectStorage::getThreadPoolReader();
auto & reader = getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
AsynchronousReadBufferFromHDFS buf(reader, {}, std::move(in));
String output;

View File

@ -0,0 +1,6 @@
<clickhouse>
<merge_tree>
<compress_marks>0</compress_marks>
<compress_primary_key>0</compress_primary_key>
</merge_tree>
</clickhouse>

View File

@ -1,7 +1,5 @@
<clickhouse>
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
<compress_marks>0</compress_marks>
<compress_primary_key>0</compress_primary_key>
</merge_tree>
</merge_tree>
</clickhouse>

View File

@ -12,7 +12,9 @@ node1 = cluster.add_instance(
with_installed_binary=True,
)
node2 = cluster.add_instance(
"node2", main_configs=["configs/wide_parts_only.xml"], with_zookeeper=True
"node2",
main_configs=["configs/wide_parts_only.xml", "configs/no_compress_marks.xml"],
with_zookeeper=True,
)

View File

@ -14,6 +14,7 @@ node_old = cluster.add_instance(
)
node_new = cluster.add_instance(
"node2",
main_configs=["configs/no_compress_marks.xml"],
with_zookeeper=True,
stay_alive=True,
)
@ -29,7 +30,7 @@ def start_cluster():
cluster.shutdown()
def test_vertical_merges_from_comapact_parts(start_cluster):
def test_vertical_merges_from_compact_parts(start_cluster):
for i, node in enumerate([node_old, node_new]):
node.query(
"""
@ -41,7 +42,7 @@ def test_vertical_merges_from_comapact_parts(start_cluster):
vertical_merge_algorithm_min_rows_to_activate = 1,
vertical_merge_algorithm_min_columns_to_activate = 1,
min_bytes_for_wide_part = 0,
min_rows_for_wide_part = 100;
min_rows_for_wide_part = 100
""".format(
i
)
@ -104,8 +105,16 @@ def test_vertical_merges_from_comapact_parts(start_cluster):
node_old.query("SYSTEM FLUSH LOGS")
assert not (
node_old.contains_in_log("CHECKSUM_DOESNT_MATCH")
or node_new.contains_in_log("CHECKSUM_DOESNT_MATCH")
# Now the old node is restarted as a new, and its config allows compressed indices, and it merged the data into compressed indices,
# that's why the error about different number of compressed files is expected and ok.
(
node_old.contains_in_log("CHECKSUM_DOESNT_MATCH")
and not node_old.contains_in_log("Different number of files")
)
or (
node_new.contains_in_log("CHECKSUM_DOESNT_MATCH")
and not node_new.contains_in_log("Different number of files")
)
)
assert node_new.query(check_query.format("all_0_3_3")) == "Vertical\tWide\n"

View File

@ -227,7 +227,7 @@ def test_merge_tree_load_parts_filesystem_error(started_cluster):
# It can be a filesystem exception triggered at initialization of part storage but it hard
# to trigger it because it should be an exception on stat/listDirectory.
# The most easy way to trigger such exception is to use chmod but clickhouse server
# is run with root user in integration test and this won't work. So let's do some
# is run with root user in integration test and this won't work. So let's do
# some stupid things: create a table without adaptive granularity and change mark
# extensions of data files in part to make clickhouse think that it's a compact part which
# cannot be created in such table. This will trigger a LOGICAL_ERROR on part creation.
@ -240,7 +240,8 @@ def test_merge_tree_load_parts_filesystem_error(started_cluster):
).strip()
node3.exec_in_container(
["bash", "-c", f"mv {part_path}id.mrk {part_path}id.mrk3"], privileged=True
["bash", "-c", f"mv {part_path}id.cmrk {part_path}id.cmrk3"],
privileged=True,
)
corrupt_part("mt_load_parts", "all_1_1_0")

View File

@ -0,0 +1,39 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node")
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_memory_limit_success():
node.query(
"CREATE TABLE test_merge_oom ENGINE=AggregatingMergeTree ORDER BY id EMPTY AS SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(20000)"
)
node.query("SYSTEM STOP MERGES test_merge_oom")
node.query(
"INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)"
)
node.query(
"INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)"
)
node.query(
"INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)"
)
_, error = node.query_and_get_answer_with_error(
"SYSTEM START MERGES test_merge_oom;SET optimize_throw_if_noop=1;OPTIMIZE TABLE test_merge_oom FINAL"
)
assert not error
node.query("DROP TABLE test_merge_oom")

View File

@ -164,19 +164,24 @@ def get_ssl_context(cert_name):
def execute_query_https(
query, user, enable_ssl_auth=True, cert_name=None, password=None
):
url = (
f"https://{instance.ip_address}:{HTTPS_PORT}/?query={urllib.parse.quote(query)}"
)
request = urllib.request.Request(url)
request.add_header("X-ClickHouse-User", user)
if enable_ssl_auth:
request.add_header("X-ClickHouse-SSL-Certificate-Auth", "on")
if password:
request.add_header("X-ClickHouse-Key", password)
response = urllib.request.urlopen(
request, context=get_ssl_context(cert_name)
).read()
return response.decode("utf-8")
retries = 10
while True:
try:
url = f"https://{instance.ip_address}:{HTTPS_PORT}/?query={urllib.parse.quote(query)}"
request = urllib.request.Request(url)
request.add_header("X-ClickHouse-User", user)
if enable_ssl_auth:
request.add_header("X-ClickHouse-SSL-Certificate-Auth", "on")
if password:
request.add_header("X-ClickHouse-Key", password)
response = urllib.request.urlopen(
request, context=get_ssl_context(cert_name)
).read()
return response.decode("utf-8")
except BrokenPipeError:
retries -= 1
if retries == 0:
raise
def test_https():

View File

@ -3,6 +3,7 @@ CREATE TABLE numbers_squashed (number UInt8) ENGINE = StripeLog;
SET min_insert_block_size_rows = 100;
SET min_insert_block_size_bytes = 0;
SET max_insert_threads = 1;
SET max_threads = 1;
INSERT INTO numbers_squashed

View File

@ -0,0 +1,6 @@
2023-01-01 00:00:01.000000001 2023-01-01 02:00:00.000000001 2023-01-01 00:00:00.000000004 1 2 0
2023-01-01 00:00:02.000000001 2023-01-01 03:00:00.000000001 2023-01-01 00:00:00.000000005 2 3 0
2023-01-01 00:00:01.000000001 2023-01-01 02:00:00.000000001 2023-01-01 00:00:00.000000004 1 2 0
2023-01-01 00:00:02.000000001 2023-01-01 03:00:00.000000001 2023-01-01 00:00:00.000000005 2 3 0
0
1

View File

@ -0,0 +1,15 @@
DROP TABLE IF EXISTS saved_intervals_tmp;
create table saved_intervals_tmp Engine=Memory as SELECT number as EventID, toIntervalSecond(number+1) as v1, toIntervalHour(number+2) as v2, toIntervalNanosecond(number+3) as v3 from numbers(2);
with toDateTime64('2023-01-01 00:00:00.000000001', 9, 'US/Eastern') as c select c+v1 as c_v1, c+v2 as c_v2, c+v3 as c_v3, date_diff(second, c, c_v1), date_diff(hour, c, c_v2), date_diff(second, c, c_v3) from saved_intervals_tmp;
DROP TABLE IF EXISTS saved_intervals_tmp;
DROP TABLE IF EXISTS saved_intervals_mgt;
create table saved_intervals_mgt Engine=MergeTree() ORDER BY EventID as SELECT number as EventID, toIntervalSecond(number+1) as v1, toIntervalHour(number+2) as v2, toIntervalNanosecond(number+3) as v3 from numbers(2);
with toDateTime64('2023-01-01 00:00:00.000000001', 9, 'US/Eastern') as c select c+v1 as c_v1, c+v2 as c_v2, c+v3 as c_v3, date_diff(second, c, c_v1), date_diff(hour, c, c_v2), date_diff(second, c, c_v3) from saved_intervals_mgt;
DROP TABLE IF EXISTS saved_intervals_mgt;
DROP TABLE IF EXISTS t1;
CREATE table t1 (v1 IntervalMinute) ENGINE = Memory;
INSERT INTO t1 with toDateTime64('2023-01-01 00:00:00.000000001', 9, 'US/Eastern') as c SELECT EXTRACT(MINUTE FROM c+toIntervalSecond(number * 60)) from numbers(2);
select * from t1;
DROP TABLE IF EXISTS t1;