Merge branch 'master' into analyzer-01655_plan_optimizations_optimize_read_in_window_order

Igor Nikonov 2023-05-10 12:22:00 +02:00 committed by GitHub
commit 53b751bfcb
116 changed files with 960 additions and 436 deletions

View File

@@ -1341,6 +1341,40 @@ jobs:
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH"
FunctionalStatelessTestReleaseAnalyzer:
needs: [BuilderDebRelease]
runs-on: [self-hosted, func-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/stateless_analyzer
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Stateless tests (release, analyzer)
REPO_COPY=${{runner.temp}}/stateless_analyzer/ClickHouse
KILL_TIMEOUT=10800
EOF
- name: Download json reports
uses: actions/download-artifact@v3
with:
path: ${{ env.REPORTS_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
- name: Functional test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH"
FunctionalStatelessTestAarch64:
needs: [BuilderDebAarch64]
runs-on: [self-hosted, func-tester-aarch64]

View File

@@ -314,7 +314,14 @@ struct integer<Bits, Signed>::_impl
const T alpha = t / static_cast<T>(max_int);
if (alpha <= static_cast<T>(max_int))
/** Here we have to use strict comparison.
* The max_int is 2^64 - 1.
* When cast to a floating point type, it will be rounded to the closest representable number,
* which is 2^64.
* But 2^64 is not representable in uint64_t,
* so the maximum representable number will be strictly less.
*/
if (alpha < static_cast<T>(max_int))
self = static_cast<uint64_t>(alpha);
else // max(double) / 2^64 will surely contain less than 52 precision bits, so speed up computations.
set_multiplier<double>(self, static_cast<double>(alpha));
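A minimal, self-contained illustration of the rounding behaviour the new comment describes (assumes IEEE-754 `double` and C++17 hex-float literals; not part of the commit):

```cpp
#include <cstdint>
#include <iostream>

int main()
{
    const uint64_t max_int = UINT64_MAX;          // 2^64 - 1
    const double rounded = static_cast<double>(max_int);

    // 2^64 - 1 needs 64 significant bits, more than double's 53, so it
    // rounds to the nearest representable value, which is exactly 2^64.
    std::cout << (rounded == 0x1p64) << '\n';     // prints 1

    // A non-strict `alpha <= rounded` would therefore accept alpha == 2^64,
    // and converting that value back to uint64_t is undefined behaviour;
    // the strict `<` keeps alpha inside the representable range.
}
```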

View File

@@ -53,7 +53,7 @@ float logf(float x)
tmp = ix - OFF;
i = (tmp >> (23 - LOGF_TABLE_BITS)) % N;
k = (int32_t)tmp >> 23; /* arithmetic shift */
iz = ix - (tmp & 0x1ff << 23);
iz = ix - (tmp & 0xff800000);
invc = T[i].invc;
logc = T[i].logc;
z = (double_t)asfloat(iz);
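Both masks select the same bits (`&` binds looser than `<<`, and `0x1ff << 23 == 0xff800000`); the point of the rewrite is presumably that shifting the signed constant `0x1ff` left by 23 overflows a 32-bit `int`, which is undefined behaviour in C. A quick check of the equivalence, done without signed overflow:

```cpp
#include <cassert>
#include <cstdint>

int main()
{
    // Same bit pattern as the old `0x1ff << 23`, computed on an unsigned type.
    assert((UINT32_C(0x1ff) << 23) == UINT32_C(0xff800000));
}
```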

View File

@@ -21,7 +21,7 @@ set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (CMAKE_ASM_FLAGS "${CMAKE_ASM_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (CMAKE_EXE_LINKER_FLAGS_INIT "-fuse-ld=bfd")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fuse-ld=bfd")
# Currently, lld does not work; it fails with the error:
# ld.lld: error: section size decrease is too large

View File

@@ -177,7 +177,19 @@ endif()
add_contrib (sqlite-cmake sqlite-amalgamation)
add_contrib (s2geometry-cmake s2geometry)
add_contrib (c-ares-cmake c-ares)
add_contrib (qpl-cmake qpl)
if (OS_LINUX AND ARCH_AMD64 AND (ENABLE_AVX2 OR ENABLE_AVX512))
option (ENABLE_QPL "Enable Intel® Query Processing Library" ${ENABLE_LIBRARIES})
elseif(ENABLE_QPL)
message (${RECONFIGURE_MESSAGE_LEVEL} "QPL library is only supported on x86_64 arch with avx2/avx512 support")
endif()
if (ENABLE_QPL)
add_contrib (idxd-config-cmake idxd-config)
add_contrib (qpl-cmake qpl) # requires: idxd-config
else()
message(STATUS "Not using QPL")
endif ()
add_contrib (morton-nd-cmake morton-nd)
if (ARCH_S390X)
add_contrib(crc32-s390x-cmake crc32-s390x)

View File

@@ -111,6 +111,8 @@ elseif(${CMAKE_SYSTEM_PROCESSOR} STREQUAL "mips")
set(ARCH "generic")
elseif(${CMAKE_SYSTEM_PROCESSOR} STREQUAL "ppc64le")
set(ARCH "ppc64le")
elseif(${CMAKE_SYSTEM_PROCESSOR} STREQUAL "riscv64")
set(ARCH "riscv64")
else()
message(FATAL_ERROR "Unknown processor:" ${CMAKE_SYSTEM_PROCESSOR})
endif()

View File

@@ -0,0 +1,23 @@
## accel_config is the utility library required by the QPL-Deflate codec for controlling and configuring Intel® In-Memory Analytics Accelerator (Intel® IAA).
set (LIBACCEL_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/idxd-config")
set (UUID_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl-cmake")
set (LIBACCEL_HEADER_DIR "${ClickHouse_SOURCE_DIR}/contrib/idxd-config-cmake/include")
set (SRCS
"${LIBACCEL_SOURCE_DIR}/accfg/lib/libaccfg.c"
"${LIBACCEL_SOURCE_DIR}/util/log.c"
"${LIBACCEL_SOURCE_DIR}/util/sysfs.c"
)
add_library(_accel-config ${SRCS})
target_compile_options(_accel-config PRIVATE "-D_GNU_SOURCE")
target_include_directories(_accel-config BEFORE
PRIVATE ${UUID_DIR}
PRIVATE ${LIBACCEL_HEADER_DIR}
PRIVATE ${LIBACCEL_SOURCE_DIR})
target_include_directories(_accel-config SYSTEM BEFORE
PUBLIC ${LIBACCEL_SOURCE_DIR}/accfg)
add_library(ch_contrib::accel-config ALIAS _accel-config)

View File

@@ -1,36 +1,5 @@
## The Intel® QPL provides high performance implementations of data processing functions for existing hardware accelerator, and/or software path in case if hardware accelerator is not available.
if (OS_LINUX AND ARCH_AMD64 AND (ENABLE_AVX2 OR ENABLE_AVX512))
option (ENABLE_QPL "Enable Intel® Query Processing Library" ${ENABLE_LIBRARIES})
elseif(ENABLE_QPL)
message (${RECONFIGURE_MESSAGE_LEVEL} "QPL library is only supported on x86_64 arch with avx2/avx512 support")
endif()
if (NOT ENABLE_QPL)
message(STATUS "Not using QPL")
return()
endif()
## QPL has build dependency on libaccel-config. Here is to build libaccel-config which is required by QPL.
## libaccel-config is the utility library for controlling and configuring Intel® In-Memory Analytics Accelerator (Intel® IAA).
set (LIBACCEL_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/idxd-config")
set (UUID_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl-cmake")
set (LIBACCEL_HEADER_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl-cmake/idxd-header")
set (SRCS
"${LIBACCEL_SOURCE_DIR}/accfg/lib/libaccfg.c"
"${LIBACCEL_SOURCE_DIR}/util/log.c"
"${LIBACCEL_SOURCE_DIR}/util/sysfs.c"
)
add_library(accel-config ${SRCS})
target_compile_options(accel-config PRIVATE "-D_GNU_SOURCE")
target_include_directories(accel-config BEFORE
PRIVATE ${UUID_DIR}
PRIVATE ${LIBACCEL_HEADER_DIR}
PRIVATE ${LIBACCEL_SOURCE_DIR})
## QPL build start here.
set (QPL_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl")
set (QPL_SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl/sources")
set (QPL_BINARY_DIR "${ClickHouse_BINARY_DIR}/build/contrib/qpl")
@@ -342,12 +311,12 @@ target_compile_definitions(_qpl
PUBLIC -DENABLE_QPL_COMPRESSION)
target_link_libraries(_qpl
PRIVATE accel-config
PRIVATE ch_contrib::accel-config
PRIVATE ch_contrib::isal
PRIVATE ${CMAKE_DL_LIBS})
add_library (ch_contrib::qpl ALIAS _qpl)
target_include_directories(_qpl SYSTEM BEFORE
PUBLIC "${QPL_PROJECT_DIR}/include"
PUBLIC "${LIBACCEL_SOURCE_DIR}/accfg"
PUBLIC ${UUID_DIR})
add_library (ch_contrib::qpl ALIAS _qpl)

View File

@@ -36,12 +36,10 @@ RUN arch=${TARGETARCH:-amd64} \
# repo versions don't work correctly with C++17
# also we push reports to s3, so we add index.html to subfolder urls
# https://github.com/ClickHouse-Extras/woboq_codebrowser/commit/37e15eaf377b920acb0b48dbe82471be9203f76b
RUN git clone https://github.com/ClickHouse/woboq_codebrowser \
&& cd woboq_codebrowser \
RUN git clone --depth=1 https://github.com/ClickHouse/woboq_codebrowser /woboq_codebrowser \
&& cd /woboq_codebrowser \
&& cmake . -G Ninja -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_COMPILER=clang\+\+-${LLVM_VERSION} -DCMAKE_C_COMPILER=clang-${LLVM_VERSION} \
&& ninja \
&& cd .. \
&& rm -rf woboq_codebrowser
&& ninja
ENV CODEGEN=/woboq_codebrowser/generator/codebrowser_generator
ENV CODEINDEX=/woboq_codebrowser/indexgenerator/codebrowser_indexgenerator

View File

@@ -90,15 +90,17 @@ SELECT * FROM mySecondReplacingMT FINAL;
### is_deleted
`is_deleted` — Name of the column with the type of row: `1` is a “deleted” row, `0` is a “state” row.
`is_deleted` — Name of a column used during a merge to determine whether the data in this row represents the state or is to be deleted; `1` is a “deleted” row, `0` is a “state” row.
Column data type — `Int8`.
Column data type — `UInt8`.
Can only be enabled when `ver` is used.
The row is deleted when use the `OPTIMIZE ... FINAL CLEANUP`, or `OPTIMIZE ... FINAL` if the engine settings `clean_deleted_rows` has been set to `Always`.
No matter the operation on the data, the version must be increased. If two inserted rows have the same version number, the last inserted one is the one kept.
:::note
`is_deleted` can only be enabled when `ver` is used.
The row is deleted when `OPTIMIZE ... FINAL CLEANUP` or `OPTIMIZE ... FINAL` is used, or if the engine setting `clean_deleted_rows` has been set to `Always`.
No matter the operation on the data, the version must be increased. If two inserted rows have the same version number, the last inserted row is the one kept.
:::
## Query clauses

View File

@@ -30,7 +30,7 @@ description: In order to effectively mitigate possible human errors, you should
```
:::note ALL
`ALL` is only applicable to the `RESTORE` command.
`ALL` is only applicable to the `RESTORE` command prior to version 23.4 of ClickHouse.
:::
## Background

View File

@@ -1063,8 +1063,8 @@ Default value: 16.
## background_merges_mutations_concurrency_ratio {#background_merges_mutations_concurrency_ratio}
Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example if the ratio equals to 2 and
`background_pool_size` is set to 16 then ClickHouse can execute 32 background merges concurrently. This is possible, because background operation could be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server.
Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example, if the ratio equals 2 and
`background_pool_size` is set to 16, then ClickHouse can execute 32 background merges concurrently. This is possible because background operations can be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server.
As with the `background_pool_size` setting, `background_merges_mutations_concurrency_ratio` could be applied from the `default` profile for backward compatibility.
Possible values:
@@ -1079,6 +1079,33 @@ Default value: 2.
<background_merges_mutations_concurrency_ratio>3</background_merges_mutations_concurrency_ratio>
```
## merges_mutations_memory_usage_soft_limit {#merges_mutations_memory_usage_soft_limit}
Sets the limit on how much RAM may be used for performing merge and mutation operations.
Zero means unlimited.
If ClickHouse reaches this limit, it won't schedule any new background merge or mutation operations but will continue to execute already scheduled tasks.
Possible values:
- Any positive integer.
**Example**
```xml
<merges_mutations_memory_usage_soft_limit>0</merges_mutations_memory_usage_soft_limit>
```
## merges_mutations_memory_usage_to_ram_ratio {#merges_mutations_memory_usage_to_ram_ratio}
The default `merges_mutations_memory_usage_soft_limit` value is calculated as `memory_amount * merges_mutations_memory_usage_to_ram_ratio`.
Default value: `0.5`.
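For example, on a server with 64 GiB of RAM and the default ratio, the soft limit defaults to `64 GiB * 0.5 = 32 GiB`.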
**See also**
- [max_memory_usage](../../operations/settings/query-complexity.md#settings_max_memory_usage)
- [merges_mutations_memory_usage_soft_limit](#merges_mutations_memory_usage_soft_limit)
## background_merges_mutations_scheduling_policy {#background_merges_mutations_scheduling_policy}
Algorithm used to select the next merge or mutation to be executed by the background thread pool. The policy may be changed at runtime without a server restart.

View File

@@ -1410,8 +1410,8 @@ and [enable_writes_to_query_cache](#enable-writes-to-query-cache) control in mor
Possible values:
- 0 - Yes
- 1 - No
- 0 - Disabled
- 1 - Enabled
Default value: `0`.

View File

@@ -8,10 +8,6 @@ sidebar_label: Interval
The family of data types representing time and date intervals. The resulting types of the [INTERVAL](../../../sql-reference/operators/index.md#operator-interval) operator.
:::note
`Interval` data type values can't be stored in tables.
:::
Structure:
- Time interval as an unsigned integer value.
@@ -19,6 +15,9 @@ Structure:
Supported interval types:
- `NANOSECOND`
- `MICROSECOND`
- `MILLISECOND`
- `SECOND`
- `MINUTE`
- `HOUR`

View File

@@ -862,7 +862,8 @@ bool Client::processWithFuzzing(const String & full_query)
const auto * tmp_pos = text_2.c_str();
const auto ast_3 = parseQuery(tmp_pos, tmp_pos + text_2.size(),
false /* allow_multi_statements */);
const auto text_3 = ast_3->formatForErrorMessage();
const auto text_3 = ast_3 ? ast_3->formatForErrorMessage() : "";
if (text_3 != text_2)
{
fmt::print(stderr, "Found error: The query formatting is broken.\n");
@@ -877,7 +878,7 @@ bool Client::processWithFuzzing(const String & full_query)
fmt::print(stderr, "Text-1 (AST-1 formatted):\n'{}'\n", query_to_execute);
fmt::print(stderr, "AST-2 (Text-1 parsed):\n'{}'\n", ast_2->dumpTree());
fmt::print(stderr, "Text-2 (AST-2 formatted):\n'{}'\n", text_2);
fmt::print(stderr, "AST-3 (Text-2 parsed):\n'{}'\n", ast_3->dumpTree());
fmt::print(stderr, "AST-3 (Text-2 parsed):\n'{}'\n", ast_3 ? ast_3->dumpTree() : "");
fmt::print(stderr, "Text-3 (AST-3 formatted):\n'{}'\n", text_3);
fmt::print(stderr, "Text-3 must be equal to Text-2, but it is not.\n");

View File

@@ -114,7 +114,7 @@ if (BUILD_STANDALONE_KEEPER)
clickhouse_add_executable(clickhouse-keeper ${CLICKHOUSE_KEEPER_STANDALONE_SOURCES})
# Remove some redundant dependencies
target_compile_definitions (clickhouse-keeper PRIVATE -DKEEPER_STANDALONE_BUILD)
target_compile_definitions (clickhouse-keeper PRIVATE -DCLICKHOUSE_PROGRAM_STANDALONE_BUILD)
target_compile_definitions (clickhouse-keeper PUBLIC -DWITHOUT_TEXT_LOG)
target_include_directories(clickhouse-keeper PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../../src") # uses includes from src directory

View File

@@ -57,7 +57,7 @@ int mainEntryClickHouseKeeper(int argc, char ** argv)
}
}
#ifdef KEEPER_STANDALONE_BUILD
#ifdef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
// Weak symbols don't work correctly on Darwin
// so we have a stub implementation to avoid linker errors

View File

@@ -130,6 +130,7 @@ namespace CurrentMetrics
extern const Metric Revision;
extern const Metric VersionInteger;
extern const Metric MemoryTracking;
extern const Metric MergesMutationsMemoryTracking;
extern const Metric MaxDDLEntryID;
extern const Metric MaxPushedDDLEntryID;
}
@@ -1225,6 +1226,25 @@ try
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
size_t merges_mutations_memory_usage_soft_limit = server_settings_.merges_mutations_memory_usage_soft_limit;
size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(memory_amount * server_settings_.merges_mutations_memory_usage_to_ram_ratio);
if (merges_mutations_memory_usage_soft_limit == 0 || merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage)
{
merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage;
LOG_WARNING(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}"
" ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit),
formatReadableSizeWithBinarySuffix(memory_amount),
server_settings_.merges_mutations_memory_usage_to_ram_ratio);
}
LOG_INFO(log, "Merges and mutations memory limit is set to {}",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit));
background_memory_tracker.setSoftLimit(merges_mutations_memory_usage_soft_limit);
background_memory_tracker.setDescription("(background)");
background_memory_tracker.setMetric(CurrentMetrics::MergesMutationsMemoryTracking);
total_memory_tracker.setAllowUseJemallocMemory(server_settings_.allow_use_jemalloc_memory);
auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker();
@@ -1238,8 +1258,13 @@ try
global_context->setMacros(std::make_unique<Macros>(*config, "macros", log));
global_context->setExternalAuthenticatorsConfig(*config);
if (global_context->isServerCompletelyStarted())
{
/// It does not make sense to reload anything before the server has started.
/// Moreover, it may break initialization order.
global_context->loadOrReloadDictionaries(*config);
global_context->loadOrReloadUserDefinedExecutableFunctions(*config);
}
global_context->setRemoteHostFilter(*config);

View File

@@ -162,14 +162,13 @@ private:
class PushOrVisitor
{
public:
PushOrVisitor(ContextPtr context, size_t max_atoms_, size_t num_atoms_)
PushOrVisitor(ContextPtr context, size_t max_atoms_)
: max_atoms(max_atoms_)
, num_atoms(num_atoms_)
, and_resolver(FunctionFactory::instance().get("and", context))
, or_resolver(FunctionFactory::instance().get("or", context))
{}
bool visit(QueryTreeNodePtr & node)
bool visit(QueryTreeNodePtr & node, size_t num_atoms)
{
if (max_atoms && num_atoms > max_atoms)
return false;
@@ -187,7 +186,10 @@ public:
{
auto & arguments = function_node->getArguments().getNodes();
for (auto & argument : arguments)
visit(argument);
{
if (!visit(argument, num_atoms))
return false;
}
}
if (name == "or")
@@ -217,7 +219,7 @@ public:
auto rhs = createFunctionNode(or_resolver, std::move(other_node), std::move(and_function_arguments[1]));
node = createFunctionNode(and_resolver, std::move(lhs), std::move(rhs));
visit(node);
return visit(node, num_atoms);
}
return true;
@@ -225,7 +227,6 @@ public:
private:
size_t max_atoms;
size_t num_atoms;
const FunctionOverloadResolverPtr and_resolver;
const FunctionOverloadResolverPtr or_resolver;
@@ -516,8 +517,8 @@ std::optional<CNF> CNF::tryBuildCNF(const QueryTreeNodePtr & node, ContextPtr co
visitor.visit(node_cloned, false);
}
if (PushOrVisitor visitor(context, max_atoms, atom_count);
!visitor.visit(node_cloned))
if (PushOrVisitor visitor(context, max_atoms);
!visitor.visit(node_cloned, atom_count))
return std::nullopt;
CollectGroupsVisitor collect_visitor;

View File

@@ -544,6 +544,10 @@ if (TARGET ch_contrib::qpl)
dbms_target_link_libraries(PUBLIC ch_contrib::qpl)
endif ()
if (TARGET ch_contrib::accel-config)
dbms_target_link_libraries(PUBLIC ch_contrib::accel-config)
endif ()
target_link_libraries(clickhouse_common_io PUBLIC boost::context)
dbms_target_link_libraries(PUBLIC boost::context)

View File

@@ -53,6 +53,7 @@
M(QueryThread, "Number of query processing threads") \
M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \
M(MemoryTracking, "Total amount of memory (bytes) allocated by the server.") \
M(MergesMutationsMemoryTracking, "Total amount of memory (bytes) allocated by background tasks (merges and mutations).") \
M(EphemeralNode, "Number of ephemeral nodes hold in ZooKeeper.") \
M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \
M(ZooKeeperWatch, "Number of watches (event subscriptions) in ZooKeeper.") \

View File

@@ -29,21 +29,14 @@
M(13, SIZES_OF_COLUMNS_IN_TUPLE_DOESNT_MATCH) \
M(15, DUPLICATE_COLUMN) \
M(16, NO_SUCH_COLUMN_IN_TABLE) \
M(17, DELIMITER_IN_STRING_LITERAL_DOESNT_MATCH) \
M(18, CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN) \
M(19, SIZE_OF_FIXED_STRING_DOESNT_MATCH) \
M(20, NUMBER_OF_COLUMNS_DOESNT_MATCH) \
M(21, CANNOT_READ_ALL_DATA_FROM_TAB_SEPARATED_INPUT) \
M(22, CANNOT_PARSE_ALL_VALUE_FROM_TAB_SEPARATED_INPUT) \
M(23, CANNOT_READ_FROM_ISTREAM) \
M(24, CANNOT_WRITE_TO_OSTREAM) \
M(25, CANNOT_PARSE_ESCAPE_SEQUENCE) \
M(26, CANNOT_PARSE_QUOTED_STRING) \
M(27, CANNOT_PARSE_INPUT_ASSERTION_FAILED) \
M(28, CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER) \
M(29, CANNOT_PRINT_INTEGER) \
M(30, CANNOT_READ_SIZE_OF_COMPRESSED_CHUNK) \
M(31, CANNOT_READ_COMPRESSED_CHUNK) \
M(32, ATTEMPT_TO_READ_AFTER_EOF) \
M(33, CANNOT_READ_ALL_DATA) \
M(34, TOO_MANY_ARGUMENTS_FOR_FUNCTION) \
@@ -57,7 +50,6 @@
M(42, NUMBER_OF_ARGUMENTS_DOESNT_MATCH) \
M(43, ILLEGAL_TYPE_OF_ARGUMENT) \
M(44, ILLEGAL_COLUMN) \
M(45, ILLEGAL_NUMBER_OF_RESULT_COLUMNS) \
M(46, UNKNOWN_FUNCTION) \
M(47, UNKNOWN_IDENTIFIER) \
M(48, NOT_IMPLEMENTED) \
@@ -66,20 +58,14 @@
M(51, EMPTY_LIST_OF_COLUMNS_QUERIED) \
M(52, COLUMN_QUERIED_MORE_THAN_ONCE) \
M(53, TYPE_MISMATCH) \
M(54, STORAGE_DOESNT_ALLOW_PARAMETERS) \
M(55, STORAGE_REQUIRES_PARAMETER) \
M(56, UNKNOWN_STORAGE) \
M(57, TABLE_ALREADY_EXISTS) \
M(58, TABLE_METADATA_ALREADY_EXISTS) \
M(59, ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER) \
M(60, UNKNOWN_TABLE) \
M(61, ONLY_FILTER_COLUMN_IN_BLOCK) \
M(62, SYNTAX_ERROR) \
M(63, UNKNOWN_AGGREGATE_FUNCTION) \
M(64, CANNOT_READ_AGGREGATE_FUNCTION_FROM_TEXT) \
M(65, CANNOT_WRITE_AGGREGATE_FUNCTION_AS_TEXT) \
M(66, NOT_A_COLUMN) \
M(67, ILLEGAL_KEY_OF_AGGREGATION) \
M(68, CANNOT_GET_SIZE_OF_FIELD) \
M(69, ARGUMENT_OUT_OF_BOUND) \
M(70, CANNOT_CONVERT_TYPE) \
@@ -109,16 +95,11 @@
M(94, CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS) \
M(95, CANNOT_READ_FROM_SOCKET) \
M(96, CANNOT_WRITE_TO_SOCKET) \
M(97, CANNOT_READ_ALL_DATA_FROM_CHUNKED_INPUT) \
M(98, CANNOT_WRITE_TO_EMPTY_BLOCK_OUTPUT_STREAM) \
M(99, UNKNOWN_PACKET_FROM_CLIENT) \
M(100, UNKNOWN_PACKET_FROM_SERVER) \
M(101, UNEXPECTED_PACKET_FROM_CLIENT) \
M(102, UNEXPECTED_PACKET_FROM_SERVER) \
M(103, RECEIVED_DATA_FOR_WRONG_QUERY_ID) \
M(104, TOO_SMALL_BUFFER_SIZE) \
M(105, CANNOT_READ_HISTORY) \
M(106, CANNOT_APPEND_HISTORY) \
M(107, FILE_DOESNT_EXIST) \
M(108, NO_DATA_TO_INSERT) \
M(109, CANNOT_BLOCK_SIGNAL) \
@@ -137,7 +118,6 @@
M(123, UNKNOWN_TYPE_OF_AST_NODE) \
M(124, INCORRECT_ELEMENT_OF_SET) \
M(125, INCORRECT_RESULT_OF_SCALAR_SUBQUERY) \
M(126, CANNOT_GET_RETURN_TYPE) \
M(127, ILLEGAL_INDEX) \
M(128, TOO_LARGE_ARRAY_SIZE) \
M(129, FUNCTION_IS_SPECIAL) \
@@ -149,30 +129,17 @@
M(137, UNKNOWN_ELEMENT_IN_CONFIG) \
M(138, EXCESSIVE_ELEMENT_IN_CONFIG) \
M(139, NO_ELEMENTS_IN_CONFIG) \
M(140, ALL_REQUESTED_COLUMNS_ARE_MISSING) \
M(141, SAMPLING_NOT_SUPPORTED) \
M(142, NOT_FOUND_NODE) \
M(143, FOUND_MORE_THAN_ONE_NODE) \
M(144, FIRST_DATE_IS_BIGGER_THAN_LAST_DATE) \
M(145, UNKNOWN_OVERFLOW_MODE) \
M(146, QUERY_SECTION_DOESNT_MAKE_SENSE) \
M(147, NOT_FOUND_FUNCTION_ELEMENT_FOR_AGGREGATE) \
M(148, NOT_FOUND_RELATION_ELEMENT_FOR_CONDITION) \
M(149, NOT_FOUND_RHS_ELEMENT_FOR_CONDITION) \
M(150, EMPTY_LIST_OF_ATTRIBUTES_PASSED) \
M(151, INDEX_OF_COLUMN_IN_SORT_CLAUSE_IS_OUT_OF_RANGE) \
M(152, UNKNOWN_DIRECTION_OF_SORTING) \
M(153, ILLEGAL_DIVISION) \
M(154, AGGREGATE_FUNCTION_NOT_APPLICABLE) \
M(155, UNKNOWN_RELATION) \
M(156, DICTIONARIES_WAS_NOT_LOADED) \
M(157, ILLEGAL_OVERFLOW_MODE) \
M(158, TOO_MANY_ROWS) \
M(159, TIMEOUT_EXCEEDED) \
M(160, TOO_SLOW) \
M(161, TOO_MANY_COLUMNS) \
M(162, TOO_DEEP_SUBQUERIES) \
M(163, TOO_DEEP_PIPELINE) \
M(164, READONLY) \
M(165, TOO_MANY_TEMPORARY_COLUMNS) \
M(166, TOO_MANY_TEMPORARY_NON_CONST_COLUMNS) \
@@ -183,20 +150,14 @@
M(172, CANNOT_CREATE_DIRECTORY) \
M(173, CANNOT_ALLOCATE_MEMORY) \
M(174, CYCLIC_ALIASES) \
M(176, CHUNK_NOT_FOUND) \
M(177, DUPLICATE_CHUNK_NAME) \
M(178, MULTIPLE_ALIASES_FOR_EXPRESSION) \
M(179, MULTIPLE_EXPRESSIONS_FOR_ALIAS) \
M(180, THERE_IS_NO_PROFILE) \
M(181, ILLEGAL_FINAL) \
M(182, ILLEGAL_PREWHERE) \
M(183, UNEXPECTED_EXPRESSION) \
M(184, ILLEGAL_AGGREGATION) \
M(185, UNSUPPORTED_MYISAM_BLOCK_TYPE) \
M(186, UNSUPPORTED_COLLATION_LOCALE) \
M(187, COLLATION_COMPARISON_FAILED) \
M(188, UNKNOWN_ACTION) \
M(189, TABLE_MUST_NOT_BE_CREATED_MANUALLY) \
M(190, SIZES_OF_ARRAYS_DONT_MATCH) \
M(191, SET_SIZE_LIMIT_EXCEEDED) \
M(192, UNKNOWN_USER) \
@@ -204,15 +165,12 @@
M(194, REQUIRED_PASSWORD) \
M(195, IP_ADDRESS_NOT_ALLOWED) \
M(196, UNKNOWN_ADDRESS_PATTERN_TYPE) \
M(197, SERVER_REVISION_IS_TOO_OLD) \
M(198, DNS_ERROR) \
M(199, UNKNOWN_QUOTA) \
M(200, QUOTA_DOESNT_ALLOW_KEYS) \
M(201, QUOTA_EXCEEDED) \
M(202, TOO_MANY_SIMULTANEOUS_QUERIES) \
M(203, NO_FREE_CONNECTION) \
M(204, CANNOT_FSYNC) \
M(205, NESTED_TYPE_TOO_DEEP) \
M(206, ALIAS_REQUIRED) \
M(207, AMBIGUOUS_IDENTIFIER) \
M(208, EMPTY_NESTED_TABLE) \
@@ -229,7 +187,6 @@
M(219, DATABASE_NOT_EMPTY) \
M(220, DUPLICATE_INTERSERVER_IO_ENDPOINT) \
M(221, NO_SUCH_INTERSERVER_IO_ENDPOINT) \
M(222, ADDING_REPLICA_TO_NON_EMPTY_TABLE) \
M(223, UNEXPECTED_AST_STRUCTURE) \
M(224, REPLICA_IS_ALREADY_ACTIVE) \
M(225, NO_ZOOKEEPER) \
@@ -253,9 +210,7 @@
M(243, NOT_ENOUGH_SPACE) \
M(244, UNEXPECTED_ZOOKEEPER_ERROR) \
M(246, CORRUPTED_DATA) \
M(247, INCORRECT_MARK) \
M(248, INVALID_PARTITION_VALUE) \
M(250, NOT_ENOUGH_BLOCK_NUMBERS) \
M(251, NO_SUCH_REPLICA) \
M(252, TOO_MANY_PARTS) \
M(253, REPLICA_ALREADY_EXISTS) \
@@ -271,8 +226,6 @@
M(264, INCOMPATIBLE_TYPE_OF_JOIN) \
M(265, NO_AVAILABLE_REPLICA) \
M(266, MISMATCH_REPLICAS_DATA_SOURCES) \
M(267, STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS) \
M(268, CPUID_ERROR) \
M(269, INFINITE_LOOP) \
M(270, CANNOT_COMPRESS) \
M(271, CANNOT_DECOMPRESS) \
@@ -295,9 +248,7 @@
M(290, LIMIT_EXCEEDED) \
M(291, DATABASE_ACCESS_DENIED) \
M(293, MONGODB_CANNOT_AUTHENTICATE) \
M(294, INVALID_BLOCK_EXTRA_INFO) \
M(295, RECEIVED_EMPTY_DATA) \
M(296, NO_REMOTE_SHARD_FOUND) \
M(297, SHARD_HAS_NO_CONNECTIONS) \
M(298, CANNOT_PIPE) \
M(299, CANNOT_FORK) \
@@ -311,13 +262,10 @@
M(307, TOO_MANY_BYTES) \
M(308, UNEXPECTED_NODE_IN_ZOOKEEPER) \
M(309, FUNCTION_CANNOT_HAVE_PARAMETERS) \
M(317, INVALID_SHARD_WEIGHT) \
M(318, INVALID_CONFIG_PARAMETER) \
M(319, UNKNOWN_STATUS_OF_INSERT) \
M(321, VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE) \
M(335, BARRIER_TIMEOUT) \
M(336, UNKNOWN_DATABASE_ENGINE) \
M(337, DDL_GUARD_IS_ACTIVE) \
M(341, UNFINISHED) \
M(342, METADATA_MISMATCH) \
M(344, SUPPORT_IS_DISABLED) \
@@ -325,14 +273,10 @@
M(346, CANNOT_CONVERT_CHARSET) \
M(347, CANNOT_LOAD_CONFIG) \
M(349, CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN) \
M(350, INCOMPATIBLE_SOURCE_TABLES) \
M(351, AMBIGUOUS_TABLE_NAME) \
M(352, AMBIGUOUS_COLUMN_NAME) \
M(353, INDEX_OF_POSITIONAL_ARGUMENT_IS_OUT_OF_RANGE) \
M(354, ZLIB_INFLATE_FAILED) \
M(355, ZLIB_DEFLATE_FAILED) \
M(356, BAD_LAMBDA) \
M(357, RESERVED_IDENTIFIER_NAME) \
M(358, INTO_OUTFILE_NOT_ALLOWED) \
M(359, TABLE_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT) \
M(360, CANNOT_CREATE_CHARSET_CONVERTER) \
@@ -341,7 +285,6 @@
M(363, CANNOT_CREATE_IO_BUFFER) \
M(364, RECEIVED_ERROR_TOO_MANY_REQUESTS) \
M(366, SIZES_OF_NESTED_COLUMNS_ARE_INCONSISTENT) \
M(367, TOO_MANY_FETCHES) \
M(369, ALL_REPLICAS_ARE_STALE) \
M(370, DATA_TYPE_CANNOT_BE_USED_IN_TABLES) \
M(371, INCONSISTENT_CLUSTER_DEFINITION) \
@@ -352,7 +295,6 @@
M(376, CANNOT_PARSE_UUID) \
M(377, ILLEGAL_SYNTAX_FOR_DATA_TYPE) \
M(378, DATA_TYPE_CANNOT_HAVE_ARGUMENTS) \
M(379, UNKNOWN_STATUS_OF_DISTRIBUTED_DDL_TASK) \
M(380, CANNOT_KILL) \
M(381, HTTP_LENGTH_REQUIRED) \
M(382, CANNOT_LOAD_CATBOOST_MODEL) \
@@ -378,11 +320,9 @@
M(402, CANNOT_IOSETUP) \
M(403, INVALID_JOIN_ON_EXPRESSION) \
M(404, BAD_ODBC_CONNECTION_STRING) \
M(405, PARTITION_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT) \
M(406, TOP_AND_LIMIT_TOGETHER) \
M(407, DECIMAL_OVERFLOW) \
M(408, BAD_REQUEST_PARAMETER) \
M(409, EXTERNAL_EXECUTABLE_NOT_FOUND) \
M(410, EXTERNAL_SERVER_IS_NOT_RESPONDING) \
M(411, PTHREAD_ERROR) \
M(412, NETLINK_ERROR) \
@@ -399,7 +339,6 @@
M(424, CANNOT_LINK) \
M(425, SYSTEM_ERROR) \
M(427, CANNOT_COMPILE_REGEXP) \
M(428, UNKNOWN_LOG_LEVEL) \
M(429, FAILED_TO_GETPWUID) \
M(430, MISMATCHING_USERS_FOR_PROCESS_AND_DATA) \
M(431, ILLEGAL_SYNTAX_FOR_CODEC_TYPE) \
@@ -433,7 +372,6 @@
M(459, CANNOT_SET_THREAD_PRIORITY) \
M(460, CANNOT_CREATE_TIMER) \
M(461, CANNOT_SET_TIMER_PERIOD) \
M(462, CANNOT_DELETE_TIMER) \
M(463, CANNOT_FCNTL) \
M(464, CANNOT_PARSE_ELF) \
M(465, CANNOT_PARSE_DWARF) \
@@ -456,15 +394,12 @@
M(482, DICTIONARY_ACCESS_DENIED) \
M(483, TOO_MANY_REDIRECTS) \
M(484, INTERNAL_REDIS_ERROR) \
M(485, SCALAR_ALREADY_EXISTS) \
M(487, CANNOT_GET_CREATE_DICTIONARY_QUERY) \
M(488, UNKNOWN_DICTIONARY) \
M(489, INCORRECT_DICTIONARY_DEFINITION) \
M(490, CANNOT_FORMAT_DATETIME) \
M(491, UNACCEPTABLE_URL) \
M(492, ACCESS_ENTITY_NOT_FOUND) \
M(493, ACCESS_ENTITY_ALREADY_EXISTS) \
M(494, ACCESS_ENTITY_FOUND_DUPLICATES) \
M(495, ACCESS_STORAGE_READONLY) \
M(496, QUOTA_REQUIRES_CLIENT_KEY) \
M(497, ACCESS_DENIED) \
@@ -475,8 +410,6 @@
M(502, CANNOT_SIGQUEUE) \
M(503, AGGREGATE_FUNCTION_THROW) \
M(504, FILE_ALREADY_EXISTS) \
M(505, CANNOT_DELETE_DIRECTORY) \
M(506, UNEXPECTED_ERROR_CODE) \
M(507, UNABLE_TO_SKIP_UNUSED_SHARDS) \
M(508, UNKNOWN_ACCESS_TYPE) \
M(509, INVALID_GRANT) \
@@ -501,8 +434,6 @@
M(530, CANNOT_CONNECT_RABBITMQ) \
M(531, CANNOT_FSTAT) \
M(532, LDAP_ERROR) \
M(533, INCONSISTENT_RESERVATIONS) \
M(534, NO_RESERVATIONS_PROVIDED) \
M(535, UNKNOWN_RAID_TYPE) \
M(536, CANNOT_RESTORE_FROM_FIELD_DUMP) \
M(537, ILLEGAL_MYSQL_VARIABLE) \
@@ -518,8 +449,6 @@
M(547, INVALID_RAID_TYPE) \
M(548, UNKNOWN_VOLUME) \
M(549, DATA_TYPE_CANNOT_BE_USED_IN_KEY) \
M(550, CONDITIONAL_TREE_PARENT_NOT_FOUND) \
M(551, ILLEGAL_PROJECTION_MANIPULATOR) \
M(552, UNRECOGNIZED_ARGUMENTS) \
M(553, LZMA_STREAM_ENCODER_FAILED) \
M(554, LZMA_STREAM_DECODER_FAILED) \
@@ -580,8 +509,6 @@
M(609, FUNCTION_ALREADY_EXISTS) \
M(610, CANNOT_DROP_FUNCTION) \
M(611, CANNOT_CREATE_RECURSIVE_FUNCTION) \
M(612, OBJECT_ALREADY_STORED_ON_DISK) \
M(613, OBJECT_WAS_NOT_STORED_ON_DISK) \
M(614, POSTGRESQL_CONNECTION_FAILURE) \
M(615, CANNOT_ADVISE) \
M(616, UNKNOWN_READ_METHOD) \
@@ -612,9 +539,7 @@
M(641, CANNOT_APPEND_TO_FILE) \
M(642, CANNOT_PACK_ARCHIVE) \
M(643, CANNOT_UNPACK_ARCHIVE) \
M(644, REMOTE_FS_OBJECT_CACHE_ERROR) \
M(645, NUMBER_OF_DIMENSIONS_MISMATCHED) \
M(646, CANNOT_BACKUP_DATABASE) \
M(647, CANNOT_BACKUP_TABLE) \
M(648, WRONG_DDL_RENAMING_SETTINGS) \
M(649, INVALID_TRANSACTION) \

View File

@@ -80,6 +80,8 @@ template <
class ClearableHashSet
: public HashTable<Key, ClearableHashTableCell<Key, HashTableCell<Key, Hash, ClearableHashSetState>>, Hash, Grower, Allocator>
{
using Cell = ClearableHashTableCell<Key, HashTableCell<Key, Hash, ClearableHashSetState>>;
public:
using Base = HashTable<Key, ClearableHashTableCell<Key, HashTableCell<Key, Hash, ClearableHashSetState>>, Hash, Grower, Allocator>;
using typename Base::LookupResult;
@@ -88,6 +90,13 @@
{
++this->version;
this->m_size = 0;
if constexpr (Cell::need_zero_value_storage)
{
/// clear ZeroValueStorage
if (this->hasZero())
this->clearHasZero();
}
}
};
@@ -103,11 +112,20 @@ class ClearableHashSetWithSavedHash : public HashTable<
Grower,
Allocator>
{
using Cell = ClearableHashTableCell<Key, HashSetCellWithSavedHash<Key, Hash, ClearableHashSetState>>;
public:
void clear()
{
++this->version;
this->m_size = 0;
if constexpr (Cell::need_zero_value_storage)
{
/// clear ZeroValueStorage
if (this->hasZero())
this->clearHasZero();
}
}
};
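Context for this fix: in these hash tables a key equal to zero cannot live in the regular cell array (a zero cell marks "empty"), so it is kept in a separate ZeroValueStorage slot. Bumping `version` lazily invalidates all regular cells but never touched that slot, so a stored zero key survived `clear()`. A miniature, hypothetical model of the symptom:

```cpp
#include <cassert>
#include <cstddef>

struct MiniClearableSet
{
    bool has_zero = false;  // stand-in for ZeroValueStorage
    size_t version = 1;     // bumping this invalidates regular cells lazily

    void insertZero() { has_zero = true; }
    void clear()
    {
        ++version;          // invalidates all regular cells at once...
        has_zero = false;   // ...but the zero slot needs an explicit reset
    }                       // (the reset is what this commit adds)
    bool containsZero() const { return has_zero; }
};

int main()
{
    MiniClearableSet set;
    set.insertZero();
    set.clear();
    assert(!set.containsZero());  // without the explicit reset, key 0
    return 0;                     // would survive clear()
}
```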

View File

@@ -96,12 +96,17 @@ using namespace std::chrono_literals;
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User, false);
std::atomic<Int64> MemoryTracker::free_memory_in_allocator_arenas;
MemoryTracker::MemoryTracker(VariableContext level_) : parent(&total_memory_tracker), level(level_) {}
MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_) : parent(parent_), level(level_) {}
MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_)
: parent(parent_)
, log_peak_memory_usage_in_destructor(log_peak_memory_usage_in_destructor_)
, level(level_)
{}
MemoryTracker::~MemoryTracker()
{
@@ -528,3 +533,10 @@ void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
while ((value == 0 || old_value < value) && !profiler_limit.compare_exchange_weak(old_value, value))
;
}
bool canEnqueueBackgroundTask()
{
auto limit = background_memory_tracker.getSoftLimit();
auto amount = background_memory_tracker.get();
return limit == 0 || amount < limit;
}

View File

@@ -98,6 +98,7 @@ public:
explicit MemoryTracker(VariableContext level_ = VariableContext::Thread);
explicit MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread);
MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_);
~MemoryTracker();
@@ -110,6 +111,22 @@ public:
return amount.load(std::memory_order_relaxed);
}
// Merges and mutations may pass memory ownership to other threads, thus at the end of execution
// the MemoryTracker for a background task may have a non-zero counter.
// This method is intended to fix the counter inside background_memory_tracker.
// NOTE: We can't use the alloc/free methods to do it, because they would also change the value inside
// total_memory_tracker.
void adjustOnBackgroundTaskEnd(const MemoryTracker * child)
{
auto background_memory_consumption = child->amount.load(std::memory_order_relaxed);
amount.fetch_sub(background_memory_consumption, std::memory_order_relaxed);
// Also fix CurrentMetrics::MergesMutationsMemoryTracking
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end())
CurrentMetrics::sub(metric_loaded, background_memory_consumption);
}
Int64 getPeak() const
{
return peak.load(std::memory_order_relaxed);
@@ -220,3 +237,6 @@ public:
};
extern MemoryTracker total_memory_tracker;
extern MemoryTracker background_memory_tracker;
bool canEnqueueBackgroundTask();
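A sketch of how a caller might use the new helper; this is hypothetical scheduler code with stand-in tracker state, not the commit's actual call site:

```cpp
#include <atomic>
#include <cstdint>

// Minimal stand-ins for the background tracker state (hypothetical):
std::atomic<int64_t> background_soft_limit{0};
std::atomic<int64_t> background_amount{0};

// Mirrors the helper added in the diff: a zero limit means "no limit".
bool canEnqueueBackgroundTask()
{
    auto limit = background_soft_limit.load();
    auto amount = background_amount.load();
    return limit == 0 || amount < limit;
}

void maybeScheduleMerge()
{
    // "Soft" semantics: new merges/mutations are postponed while over the
    // limit, but tasks that are already running are never interrupted.
    if (!canEnqueueBackgroundTask())
        return;  // try again later
    // ... enqueue the merge ...
}

int main() { maybeScheduleMerge(); }
```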

View File

@@ -378,6 +378,13 @@ void transpose(const T * src, char * dst, UInt32 num_bits, UInt32 tail = 64)
/// UInt64[N] transposed matrix -> UIntX[64]
template <typename T, bool full = false>
#if defined(__s390x__)
/* Compiler bug for s390x: https://github.com/llvm/llvm-project/issues/62572
* Please remove this after the fix is backported.
*/
__attribute__((noinline))
#endif
void reverseTranspose(const char * src, T * buf, UInt32 num_bits, UInt32 tail = 64)
{
UInt64 matrix[64] = {};

View File

@@ -172,7 +172,7 @@ void registerCodecDeflateQpl(CompressionCodecFactory & factory);
/// Keeper uses only general-purpose codecs, so we don't need these special codecs
/// in the standalone build
#ifndef KEEPER_STANDALONE_BUILD
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
void registerCodecDelta(CompressionCodecFactory & factory);
void registerCodecT64(CompressionCodecFactory & factory);
void registerCodecDoubleDelta(CompressionCodecFactory & factory);
@@ -188,7 +188,7 @@ CompressionCodecFactory::CompressionCodecFactory()
registerCodecZSTD(*this);
registerCodecLZ4HC(*this);
registerCodecMultiple(*this);
#ifndef KEEPER_STANDALONE_BUILD
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
registerCodecDelta(*this);
registerCodecT64(*this);
registerCodecDoubleDelta(*this);

View File

@@ -42,6 +42,8 @@ namespace DB
M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \
M(UInt64, max_server_memory_usage, 0, "Limit on total memory usage. Zero means Unlimited.", 0) \
M(Double, max_server_memory_usage_to_ram_ratio, 0.9, "Same as max_server_memory_usage but as a ratio of available RAM. Allows lowering the max memory on low-memory systems.", 0) \
M(UInt64, merges_mutations_memory_usage_soft_limit, 0, "Limit on total memory usage for merges and mutations. Zero means Unlimited.", 0) \
M(Double, merges_mutations_memory_usage_to_ram_ratio, 0.5, "Same as merges_mutations_memory_usage_soft_limit but as a ratio of available RAM. Allows lowering the memory limit on low-memory systems.", 0) \
M(Bool, allow_use_jemalloc_memory, true, "Allows to use jemalloc memory.", 0) \
\
M(UInt64, max_concurrent_queries, 0, "Limit on total number of concurrently executed queries. Zero means Unlimited.", 0) \

View File

@@ -338,7 +338,7 @@ void SettingFieldString::readBinary(ReadBuffer & in)
/// that. The linker does not complain only because clickhouse-keeper does not call any of the below
/// functions. A cleaner alternative would be more modular libraries, e.g. one for data types, which
/// could then be linked by the server and the linker.
#ifndef KEEPER_STANDALONE_BUILD
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
SettingFieldMap::SettingFieldMap(const Field & f) : value(fieldToMap(f)) {}

View File

@@ -18,7 +18,7 @@
#include "config.h"
#include "config_version.h"
#if USE_SENTRY && !defined(KEEPER_STANDALONE_BUILD)
#if USE_SENTRY && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD)
# include <sentry.h>
# include <cstdio>

View File

@@ -11,9 +11,6 @@ namespace DB
*
* Mostly the same as Int64.
* But also tagged with interval kind.
*
* Intended usage is for temporary elements in expressions,
* not for storing values in tables.
*/
class DataTypeInterval final : public DataTypeNumberBase<Int64>
{
@@ -34,7 +31,6 @@ public:
bool equals(const IDataType & rhs) const override;
bool isParametric() const override { return true; }
bool cannotBeStoredInTables() const override { return true; }
bool isCategorial() const override { return false; }
bool canBeInsideNullable() const override { return true; }
};

View File

@@ -1,5 +1,6 @@
#include <Databases/DDLDependencyVisitor.h>
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
#include <Databases/removeWhereConditionPlaceholder.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/misc.h>
@@ -12,6 +13,8 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ParserSelectWithUnionQuery.h>
#include <Parsers/parseQuery.h>
#include <Common/KnownObjectNames.h>
#include <Poco/String.h>
@@ -25,6 +28,7 @@ namespace
/// Used to visit ASTCreateQuery and extract the names of all tables explicitly referenced in the create query.
class DDLDependencyVisitorData
{
friend void tryVisitNestedSelect(const String & query, DDLDependencyVisitorData & data);
public:
DDLDependencyVisitorData(const ContextPtr & context_, const QualifiedTableName & table_name_, const ASTPtr & ast_)
: create_query(ast_), table_name(table_name_), current_database(context_->getCurrentDatabase()), context(context_)
@@ -106,10 +110,18 @@ namespace
if (!info || !info->is_local)
return;
if (!info->table_name.table.empty())
{
if (info->table_name.database.empty())
info->table_name.database = current_database;
dependencies.emplace(std::move(info->table_name));
}
else
{
/// We don't have a table name, we have a select query instead
tryVisitNestedSelect(info->query, *this);
}
}
/// ASTTableExpression represents a reference to a table in SELECT query.
/// DDLDependencyVisitor should handle ASTTableExpression because some CREATE queries can contain SELECT queries after AS
@@ -424,6 +436,25 @@ namespace
static bool needChildVisit(const ASTPtr &, const ASTPtr & child, const Data & data) { return data.needChildVisit(child); }
static void visit(const ASTPtr & ast, Data & data) { data.visit(ast); }
};
void tryVisitNestedSelect(const String & query, DDLDependencyVisitorData & data)
{
try
{
ParserSelectWithUnionQuery parser;
String description = fmt::format("Query for ClickHouse dictionary {}", data.table_name);
String fixed_query = removeWhereConditionPlaceholder(query);
ASTPtr select = parseQuery(parser, fixed_query, description,
data.context->getSettingsRef().max_query_size, data.context->getSettingsRef().max_parser_depth);
DDLDependencyVisitor::Visitor visitor{data};
visitor.visit(select);
}
catch (...)
{
tryLogCurrentException("DDLDependencyVisitor");
}
}
}

View File

@@ -103,7 +103,7 @@ void DDLLoadingDependencyVisitor::visit(const ASTFunctionWithKeyValueArguments &
auto config = getDictionaryConfigurationFromAST(data.create_query->as<ASTCreateQuery &>(), data.global_context);
auto info = getInfoIfClickHouseDictionarySource(config, data.global_context);
if (!info || !info->is_local)
if (!info || !info->is_local || info->table_name.table.empty())
return;
if (info->table_name.database.empty())

View File

@@ -137,7 +137,7 @@ namespace
auto config = getDictionaryConfigurationFromAST(data.create_query->as<ASTCreateQuery &>(), data.global_context);
auto info = getInfoIfClickHouseDictionarySource(config, data.global_context);
if (!info || !info->is_local)
if (!info || !info->is_local || info->table_name.table.empty())
return;
auto * source_list = dictionary.source->elements->as<ASTExpressionList>();

View File

@@ -726,7 +726,7 @@ static UUID getTableUUIDIfReplicated(const String & metadata, ContextPtr context
return create.uuid;
}
void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 max_log_ptr)
void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 & max_log_ptr)
{
is_recovering = true;
SCOPE_EXIT({ is_recovering = false; });

View File

@@ -102,7 +102,7 @@ private:
void checkQueryValid(const ASTPtr & query, ContextPtr query_context) const;
void recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 max_log_ptr);
void recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 & max_log_ptr);
std::map<String, String> tryGetConsistentMetadataSnapshot(const ZooKeeperPtr & zookeeper, UInt32 & max_log_ptr);
ASTPtr parseQueryFromMetadataInZooKeeper(const String & node_name, const String & query);

View File

@@ -0,0 +1,20 @@
#include <Databases/removeWhereConditionPlaceholder.h>
namespace DB
{
std::string removeWhereConditionPlaceholder(const std::string & query)
{
static constexpr auto true_condition = "(1 = 1)";
auto condition_position = query.find(CONDITION_PLACEHOLDER_TO_REPLACE_VALUE);
if (condition_position != std::string::npos)
{
auto query_copy = query;
query_copy.replace(condition_position, CONDITION_PLACEHOLDER_TO_REPLACE_VALUE.size(), true_condition);
return query_copy;
}
return query;
}
}

View File

@@ -0,0 +1,15 @@
#pragma once
#include <string>
namespace DB
{
static constexpr std::string_view CONDITION_PLACEHOLDER_TO_REPLACE_VALUE = "{condition}";
/** In case UPDATE_FIELD is specified in {condition} for a dictionary that must load all data,
* replace {condition} with true_condition for the initial dictionary load.
* For subsequent dictionary loads, {condition} will be updated with UPDATE_FIELD.
*/
std::string removeWhereConditionPlaceholder(const std::string & query);
}
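A usage sketch for the new helper, per the implementation above (only the first occurrence of the placeholder is replaced; queries without it pass through unchanged). This assumes the snippet is linked against the translation unit shown earlier:

```cpp
#include <cassert>
#include <string>

namespace DB { std::string removeWhereConditionPlaceholder(const std::string & query); }

int main()
{
    using DB::removeWhereConditionPlaceholder;
    // {condition} is swapped for an always-true predicate on the first load.
    assert(removeWhereConditionPlaceholder("SELECT id FROM t WHERE {condition}")
           == "SELECT id FROM t WHERE (1 = 1)");
    // No placeholder: the query is returned as-is.
    assert(removeWhereConditionPlaceholder("SELECT id FROM t")
           == "SELECT id FROM t");
}
```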

View File

@@ -6,7 +6,7 @@
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Databases/removeWhereConditionPlaceholder.h>
namespace DB
{
@@ -24,7 +24,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
static constexpr std::string_view CONDITION_PLACEHOLDER_TO_REPLACE_VALUE = "{condition}";
ExternalQueryBuilder::ExternalQueryBuilder(
const DictionaryStructure & dict_struct_,
@@ -82,23 +81,8 @@ std::string ExternalQueryBuilder::composeLoadAllQuery() const
writeChar(';', out);
return out.str();
}
else
{
/** In case UPDATE_FIELD is specified in {condition} for dictionary that must load all data.
* Replace {condition} with true_condition for initial dictionary load.
* For next dictionary loads {condition} will be updated with UPDATE_FIELD.
*/
static constexpr auto true_condition = "(1 = 1)";
auto condition_position = query.find(CONDITION_PLACEHOLDER_TO_REPLACE_VALUE);
if (condition_position != std::string::npos)
{
auto query_copy = query;
query_copy.replace(condition_position, CONDITION_PLACEHOLDER_TO_REPLACE_VALUE.size(), true_condition);
return query_copy;
}
return query;
}
return removeWhereConditionPlaceholder(query);
}
void ExternalQueryBuilder::composeLoadAllQuery(WriteBuffer & out) const

View File

@@ -649,10 +649,12 @@ getInfoIfClickHouseDictionarySource(DictionaryConfigurationPtr & config, Context
String database = config->getString("dictionary.source.clickhouse.db", "");
String table = config->getString("dictionary.source.clickhouse.table", "");
if (table.empty())
return {};
info.query = config->getString("dictionary.source.clickhouse.query", "");
if (!table.empty())
info.table_name = {database, table};
else if (info.query.empty())
return {};
try
{

View File

@@ -18,6 +18,7 @@ getDictionaryConfigurationFromAST(const ASTCreateQuery & query, ContextPtr conte
struct ClickHouseDictionarySourceInfo
{
QualifiedTableName table_name;
String query;
bool is_local = false;
};

View File

@@ -47,14 +47,14 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
IAsynchronousReader & reader_,
const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
size_t min_bytes_for_seek_)
std::shared_ptr<AsyncReadCounters> async_read_counters_,
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log_)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0)
, read_settings(settings_)
, reader(reader_)
, base_priority(settings_.priority)
, impl(impl_)
, prefetch_buffer(settings_.prefetch_buffer_size)
, min_bytes_for_seek(min_bytes_for_seek_)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr
? CurrentThread::getQueryId() : "")
, current_reader_id(getRandomASCIIString(8))
@@ -63,6 +63,8 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
#else
, log(&Poco::Logger::get("AsyncBuffer(" + impl->getFileName() + ")"))
#endif
, async_read_counters(async_read_counters_)
, prefetches_log(prefetches_log_)
{
ProfileEvents::increment(ProfileEvents::RemoteFSBuffers);
}
@@ -111,7 +113,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::hasPendingDataToRead()
std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemoteFS::asyncReadInto(char * data, size_t size, int64_t priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl);
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, async_read_counters);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
@@ -186,8 +188,8 @@ void AsynchronousReadIndirectBufferFromRemoteFS::appendToPrefetchLog(FilesystemP
.reader_id = current_reader_id,
};
if (auto prefetch_log = Context::getGlobalContextInstance()->getFilesystemReadPrefetchesLog())
prefetch_log->add(elem);
if (prefetches_log)
prefetches_log->add(elem);
}
@@ -335,7 +337,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
if (impl->initialized()
&& read_until_position && new_pos < *read_until_position
&& new_pos > file_offset_of_buffer_end
&& new_pos < file_offset_of_buffer_end + min_bytes_for_seek)
&& new_pos < file_offset_of_buffer_end + read_settings.remote_read_min_bytes_for_seek)
{
ProfileEvents::increment(ProfileEvents::RemoteFSLazySeeks);
bytes_to_ignore = new_pos - file_offset_of_buffer_end;
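This file, and the cache-buffer changes below, replace global-context lookups (`Context::getGlobalContextInstance()->get...()`) with constructor injection: the owner passes the counters and logs in once, and a null pointer simply means logging is disabled. A schematic of the pattern with simplified, hypothetical types:

```cpp
#include <memory>
#include <utility>

struct FilesystemReadPrefetchesLog { void add(int /*elem*/) {} };  // simplified

class Buffer
{
public:
    explicit Buffer(std::shared_ptr<FilesystemReadPrefetchesLog> log_)
        : log(std::move(log_)) {}

    void appendToPrefetchLog(int elem)
    {
        if (log)            // null log == logging disabled; no global Context
            log->add(elem); // lookup inside the buffer anymore
    }

private:
    std::shared_ptr<FilesystemReadPrefetchesLog> log;
};

int main()
{
    Buffer with_log(std::make_shared<FilesystemReadPrefetchesLog>());
    Buffer without_log(nullptr);  // usable where no log exists at all
    with_log.appendToPrefetchLog(42);
    without_log.appendToPrefetchLog(42);
}
```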

View File

@@ -12,6 +12,7 @@ namespace Poco { class Logger; }
namespace DB
{
struct AsyncReadCounters;
class ReadBufferFromRemoteFSGather;
/**
@@ -34,7 +35,8 @@ public:
explicit AsynchronousReadIndirectBufferFromRemoteFS(
IAsynchronousReader & reader_, const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
size_t min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE);
std::shared_ptr<AsyncReadCounters> async_read_counters_,
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log_);
~AsynchronousReadIndirectBufferFromRemoteFS() override;
@@ -83,8 +85,6 @@ private:
Memory<> prefetch_buffer;
size_t min_bytes_for_seek;
std::string query_id;
std::string current_reader_id;
@@ -95,6 +95,9 @@ private:
Poco::Logger * log;
std::shared_ptr<AsyncReadCounters> async_read_counters;
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log;
struct LastPrefetchInfo
{
UInt64 submit_time = 0;

View File

@@ -48,7 +48,8 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile(
size_t file_size_,
bool allow_seeks_after_first_read_,
bool use_external_buffer_,
std::optional<size_t> read_until_position_)
std::optional<size_t> read_until_position_,
std::shared_ptr<FilesystemCacheLog> cache_log_)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0, file_size_)
#ifndef NDEBUG
, log(&Poco::Logger::get("CachedOnDiskReadBufferFromFile(" + source_file_path_ + ")"))
@@ -62,12 +63,12 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile(
, read_until_position(read_until_position_ ? *read_until_position_ : file_size_)
, implementation_buffer_creator(implementation_buffer_creator_)
, query_id(query_id_)
, enable_logging(!query_id.empty() && settings_.enable_filesystem_cache_log)
, current_buffer_id(getRandomASCIIString(8))
, allow_seeks_after_first_read(allow_seeks_after_first_read_)
, use_external_buffer(use_external_buffer_)
, query_context_holder(cache_->getQueryContextHolder(query_id, settings_))
, is_persistent(settings_.is_file_cache_persistent)
, cache_log(cache_log_)
{
}
@@ -103,7 +104,7 @@ void CachedOnDiskReadBufferFromFile::appendFilesystemCacheLog(
break;
}
if (auto cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog())
if (cache_log)
cache_log->add(elem);
}
@@ -487,7 +488,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
auto * current_file_segment = &file_segments->front();
auto completed_range = current_file_segment->range();
if (enable_logging)
if (cache_log)
appendFilesystemCacheLog(completed_range, read_type);
chassert(file_offset_of_buffer_end > completed_range.right);
@@ -512,7 +513,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
CachedOnDiskReadBufferFromFile::~CachedOnDiskReadBufferFromFile()
{
if (enable_logging && file_segments && !file_segments->empty())
if (cache_log && file_segments && !file_segments->empty())
{
appendFilesystemCacheLog(file_segments->front().range(), read_type);
}

View File

@@ -32,7 +32,8 @@ public:
size_t file_size_,
bool allow_seeks_after_first_read_,
bool use_external_buffer_,
std::optional<size_t> read_until_position_ = std::nullopt);
std::optional<size_t> read_until_position_,
std::shared_ptr<FilesystemCacheLog> cache_log_);
~CachedOnDiskReadBufferFromFile() override;
@@ -137,7 +138,6 @@ private:
String last_caller_id;
String query_id;
bool enable_logging = false;
String current_buffer_id;
bool allow_seeks_after_first_read;
@@ -148,6 +148,8 @@ private:
FileCache::QueryContextHolderPtr query_context_holder;
bool is_persistent;
std::shared_ptr<FilesystemCacheLog> cache_log;
};
}

View File

@@ -153,8 +153,9 @@ FileSegment & FileSegmentRangeWriter::allocateFileSegment(size_t offset, FileSeg
void FileSegmentRangeWriter::appendFilesystemCacheLog(const FileSegment & file_segment)
{
if (cache_log)
{
if (!cache_log)
return;
auto file_segment_range = file_segment.range();
size_t file_segment_right_bound = file_segment_range.left + file_segment.getDownloadedSize(false) - 1;
@@ -173,7 +174,6 @@ void FileSegmentRangeWriter::appendFilesystemCacheLog(const FileSegment & file_s
};
cache_log->add(elem);
}
}
void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment)

View File

@@ -8,7 +8,6 @@
#include <iostream>
#include <base/hex.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/Context.h>
namespace DB
@@ -17,15 +16,18 @@ namespace DB
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_)
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_)
: ReadBuffer(nullptr, 0)
, read_buffer_creator(std::move(read_buffer_creator_))
, blobs_to_read(blobs_to_read_)
, settings(settings_)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
, log(&Poco::Logger::get("ReadBufferFromRemoteFSGather"))
, enable_cache_log(!query_id.empty() && settings.enable_filesystem_cache_log)
{
if (cache_log_ && settings.enable_filesystem_cache_log)
cache_log = cache_log_;
if (!blobs_to_read.empty())
current_object = blobs_to_read.front();
@@ -36,7 +38,7 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
{
if (current_buf != nullptr && !with_cache && enable_cache_log)
if (current_buf != nullptr && !with_cache)
{
appendFilesystemCacheLog();
}
@@ -61,7 +63,8 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
object.bytes_size,
/* allow_seeks */false,
/* use_external_buffer */true,
read_until_position ? std::optional<size_t>(read_until_position) : std::nullopt);
read_until_position ? std::optional<size_t>(read_until_position) : std::nullopt,
cache_log);
}
return current_read_buffer_creator();
@@ -69,7 +72,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
{
if (current_object.remote_path.empty())
if (!cache_log || current_object.remote_path.empty())
return;
FilesystemCacheLogElement elem
@@ -82,8 +85,6 @@ void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
.file_segment_size = total_bytes_read_from_current_file,
.read_from_cache_attempted = false,
};
if (auto cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog())
cache_log->add(elem);
}
@@ -267,10 +268,8 @@ size_t ReadBufferFromRemoteFSGather::getImplementationBufferOffset() const
ReadBufferFromRemoteFSGather::~ReadBufferFromRemoteFSGather()
{
if (!with_cache && enable_cache_log)
{
if (!with_cache)
appendFilesystemCacheLog();
}
}
}

View File

@@ -25,7 +25,8 @@ public:
ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_);
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_);
~ReadBufferFromRemoteFSGather() override;
@@ -93,7 +94,7 @@ private:
size_t total_bytes_read_from_current_file = 0;
bool enable_cache_log = false;
std::shared_ptr<FilesystemCacheLog> cache_log;
};
}

View File

@@ -11,7 +11,6 @@
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/AsyncReadCounters.h>
#include <Interpreters/Context.h>
#include <base/getThreadId.h>
#include <future>
@@ -75,17 +74,11 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
return scheduleFromThreadPool<Result>([request]() -> Result
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::RemoteRead};
std::optional<AsyncReadIncrement> increment;
if (CurrentThread::isInitialized())
{
auto query_context = CurrentThread::get().getQueryContext();
if (query_context)
increment.emplace(query_context->getAsyncReadCounters());
}
auto * remote_fs_fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());
auto async_read_counters = remote_fs_fd->getReadCounters();
std::optional<AsyncReadIncrement> increment = async_read_counters ? std::optional<AsyncReadIncrement>(async_read_counters) : std::nullopt;
auto watch = std::make_unique<Stopwatch>(CLOCK_MONOTONIC);
Result result = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore);
watch->stop();

View File

@@ -8,6 +8,8 @@
namespace DB
{
struct AsyncReadCounters;
class ThreadPoolRemoteFSReader : public IAsynchronousReader
{
public:
@@ -24,12 +26,19 @@ private:
class RemoteFSFileDescriptor : public IAsynchronousReader::IFileDescriptor
{
public:
explicit RemoteFSFileDescriptor(ReadBuffer & reader_) : reader(reader_) { }
explicit RemoteFSFileDescriptor(
ReadBuffer & reader_,
std::shared_ptr<AsyncReadCounters> async_read_counters_)
: reader(reader_)
, async_read_counters(async_read_counters_) {}
IAsynchronousReader::Result readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
std::shared_ptr<AsyncReadCounters> getReadCounters() const { return async_read_counters; }
private:
ReadBuffer & reader;
std::shared_ptr<AsyncReadCounters> async_read_counters;
};
}

View File

@@ -5,7 +5,9 @@
#include <IO/AsynchronousReadBufferFromFile.h>
#include <Disks/IO/IOUringReader.h>
#include <Disks/IO/ThreadPoolReader.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <IO/SynchronousReader.h>
#include <IO/AsynchronousReader.h>
#include <Common/ProfileEvents.h>
#include "config.h"
@@ -27,7 +29,6 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD;
}
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
const std::string & filename,
const ReadSettings & settings,
@@ -119,11 +120,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
}
else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async)
{
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER);
auto & reader = getThreadPoolReader(FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER);
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader,
settings.priority,
@ -137,11 +134,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
}
else if (settings.local_fs_method == LocalFSReadMethod::pread_threadpool)
{
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER);
auto & reader = getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER);
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader,
settings.priority,

View File

@ -0,0 +1,76 @@
#include <Common/ErrorCodes.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <IO/AsynchronousReader.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <IO/SynchronousReader.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Disks/IO/ThreadPoolReader.h>
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
#include <Interpreters/Context.h>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type)
{
#ifdef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
const auto & config = Poco::Util::Application::instance().config();
switch (type)
{
case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
static auto asynchronous_remote_fs_reader = createThreadPoolReader(type, config);
return *asynchronous_remote_fs_reader;
}
case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
static auto asynchronous_local_fs_reader = createThreadPoolReader(type, config);
return *asynchronous_local_fs_reader;
}
case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
static auto synchronous_local_fs_reader = createThreadPoolReader(type, config);
return *synchronous_local_fs_reader;
}
}
#else
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
return context->getThreadPoolReader(type);
#endif
}
std::unique_ptr<IAsynchronousReader> createThreadPoolReader(
FilesystemReaderType type, const Poco::Util::AbstractConfiguration & config)
{
switch (type)
{
case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
auto pool_size = config.getUInt(".threadpool_remote_fs_reader_pool_size", 250);
auto queue_size = config.getUInt(".threadpool_remote_fs_reader_queue_size", 1000000);
return std::make_unique<ThreadPoolRemoteFSReader>(pool_size, queue_size);
}
case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
auto pool_size = config.getUInt(".threadpool_local_fs_reader_pool_size", 100);
auto queue_size = config.getUInt(".threadpool_local_fs_reader_queue_size", 1000000);
return std::make_unique<ThreadPoolReader>(pool_size, queue_size);
}
case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
return std::make_unique<SynchronousReader>();
}
}
}
}
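
In the standalone branch above, each switch case holds its own function-local static, so every reader type is lazily constructed exactly once and initialization is thread-safe under C++11 semantics. A reduced sketch of the idiom with hypothetical names, showing why one static per case is needed (a single shared static would be initialized for whichever type happened to be requested first):

#include <memory>

enum class Kind { A, B };
struct Reader {};
std::unique_ptr<Reader> createReader(Kind) { return std::make_unique<Reader>(); }

Reader & getReader(Kind kind)
{
    switch (kind)
    {
        case Kind::A:
        {
            static auto reader_a = createReader(Kind::A);  // constructed on first request for A only
            return *reader_a;
        }
        case Kind::B:
        {
            static auto reader_b = createReader(Kind::B);  // lifetime independent of reader_a
            return *reader_b;
        }
    }
}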

View File

@ -0,0 +1,23 @@
#pragma once
namespace Poco::Util { class AbstractConfiguration; }
namespace DB
{
class IAsynchronousReader;
enum class FilesystemReaderType
{
SYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_REMOTE_FS_READER,
};
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type);
std::unique_ptr<IAsynchronousReader> createThreadPoolReader(
FilesystemReaderType type,
const Poco::Util::AbstractConfiguration & config);
}
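
Call sites can then drop the manual global-context lookup; a sketch of the simplified usage, matching the call sites that appear later in this diff:

#include <Disks/IO/getThreadPoolReader.h>

auto & reader = getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
// In a normal server build this forwards to the global Context's reader;
// in a standalone build it falls back to the function-local statics in the .cpp above.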

View File

@ -10,6 +10,7 @@
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>
@ -86,6 +87,7 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
{
ReadSettings disk_read_settings = patchSettings(read_settings);
auto settings_ptr = settings.get();
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
@ -104,12 +106,16 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
auto reader_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
disk_read_settings);
disk_read_settings,
global_context->getFilesystemCacheLog());
if (disk_read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = getThreadPoolReader();
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, disk_read_settings, std::move(reader_impl));
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, disk_read_settings, std::move(reader_impl),
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
else
{

View File

@ -4,6 +4,7 @@
#include <IO/BoundedReadBuffer.h>
#include <Disks/IO/CachedOnDiskWriteBufferFromFile.h>
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
#include <Interpreters/Context.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Common/CurrentThread.h>

View File

@ -74,7 +74,7 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLI
hdfs_uri, hdfs_path, config, disk_read_settings, /* read_until_position */0, /* use_external_buffer */true);
};
auto hdfs_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), objects, disk_read_settings);
auto hdfs_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), objects, disk_read_settings, nullptr);
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(hdfs_impl), read_settings);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
}

View File

@ -26,15 +26,6 @@ void IObjectStorage::getDirectoryContents(const std::string &,
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getDirectoryContents() is not supported");
}
IAsynchronousReader & IObjectStorage::getThreadPoolReader()
{
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
return context->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
}
ThreadPool & IObjectStorage::getThreadPoolWriter()
{
auto context = Context::getGlobalContextInstance();

View File

@ -157,8 +157,6 @@ public:
virtual const std::string & getCacheName() const;
static IAsynchronousReader & getThreadPoolReader();
static ThreadPool & getThreadPoolWriter();
virtual void shutdown() = 0;

View File

@ -51,6 +51,7 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
std::optional<size_t> file_size) const
{
auto modified_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator =
[=] (const std::string & file_path, size_t /* read_until_position */)
-> std::unique_ptr<ReadBufferFromFileBase>
@ -59,14 +60,18 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
};
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), objects, modified_settings);
std::move(read_buffer_creator), objects, modified_settings,
global_context->getFilesystemCacheLog());
/// We use `remote_fs_method` (not `local_fs_method`) because we are about to use
/// AsynchronousReadIndirectBufferFromRemoteFS, which is driven by the remote_fs_* settings.
if (modified_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = getThreadPoolReader();
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, modified_settings, std::move(impl));
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, modified_settings, std::move(impl),
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
else
{

View File

@ -98,6 +98,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
std::optional<size_t>) const
{
ReadSettings disk_read_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto settings_ptr = s3_settings.get();
@ -121,13 +122,16 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
auto s3_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
disk_read_settings);
disk_read_settings,
global_context->getFilesystemCacheLog());
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = getThreadPoolReader();
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, disk_read_settings, std::move(s3_impl), disk_read_settings.remote_read_min_bytes_for_seek);
reader, disk_read_settings, std::move(s3_impl),
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
else
{

View File

@ -13,6 +13,7 @@
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/ReadBufferFromWebServer.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <Storages/MergeTree/MergeTreeData.h>
@ -179,12 +180,20 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
read_until_position);
};
auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), StoredObjects{object}, read_settings);
auto global_context = Context::getGlobalContextInstance();
auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{object},
read_settings,
global_context->getFilesystemCacheLog());
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = IObjectStorage::getThreadPoolReader();
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings, std::move(web_impl), min_bytes_for_seek);
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, read_settings, std::move(web_impl),
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
else
{

View File

@ -580,7 +580,7 @@ private:
#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
unalignedStoreLittleEndian<UInt64>(buf + 8, 0x00000000FFFF0000ull | (static_cast<UInt64>(ntohl(in)) << 32));
#else
unalignedStoreLittleEndian<UInt64>(buf + 8, 0x00000000FFFF0000ull | (static_cast<UInt64>(in)) << 32));
unalignedStoreLittleEndian<UInt64>(buf + 8, 0x00000000FFFF0000ull | (static_cast<UInt64>(__builtin_bswap32(in)) << 32));
#endif
}
};

View File

@ -47,7 +47,7 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration &
else
bypass_cache_threashold = FILECACHE_BYPASS_THRESHOLD;
do_not_evict_index_and_mark_files = config.getUInt64(config_prefix + ".do_not_evict_index_and_mark_files", false);
do_not_evict_index_and_mark_files = config.getUInt64(config_prefix + ".do_not_evict_index_and_mark_files", true);
delayed_cleanup_interval_ms = config.getUInt64(config_prefix + ".delayed_cleanup_interval_ms", FILECACHE_DELAYED_CLEANUP_INTERVAL_MS);
}

View File

@ -61,7 +61,7 @@ struct CreateFileSegmentSettings
: kind(kind_), unbounded(unbounded_) {}
};
class FileSegment : private boost::noncopyable, public std::enable_shared_from_this<FileSegment>
class FileSegment : private boost::noncopyable
{
friend struct LockedKey;
friend class FileCache; /// Because of reserved_size in tryReserve().

View File

@ -1,8 +1,6 @@
#pragma once
#include <mutex>
#include <Interpreters/Cache/FileCache_fwd.h>
#include <boost/noncopyable.hpp>
#include <map>
namespace DB
{
@ -63,6 +61,8 @@ namespace DB
*/
struct CacheGuard : private boost::noncopyable
{
/// A struct (rather than keyword `using`) is used to make CacheGuard::Lock non-interchangeable with other guards' locks,
/// so we wouldn't be able to pass a CacheGuard::Lock to a function which accepts a KeyGuard::Lock, for example.
struct Lock : public std::unique_lock<std::mutex>
{
explicit Lock(std::mutex & mutex_) : std::unique_lock<std::mutex>(mutex_) {}
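
The idea behind that comment, as a self-contained sketch (only CacheGuard::Lock mirrors the real code; all other names here are illustrative): deriving a distinct Lock type per guard turns a mixed-up lock argument into a compile-time error rather than a runtime bug.

#include <mutex>

struct CacheGuard
{
    struct Lock : std::unique_lock<std::mutex>
    {
        explicit Lock(std::mutex & m) : std::unique_lock<std::mutex>(m) {}
    };
    Lock lock() { return Lock(mutex); }
    std::mutex mutex;
};

struct KeyGuard
{
    struct Lock : std::unique_lock<std::mutex>
    {
        explicit Lock(std::mutex & m) : std::unique_lock<std::mutex>(m) {}
    };
    Lock lock() { return Lock(mutex); }
    std::mutex mutex;
};

void removeKey(KeyGuard::Lock &) {}  // accepts only a key-level lock

// CacheGuard cache_guard;
// auto cache_lock = cache_guard.lock();
// removeKey(cache_lock);  // does not compile: CacheGuard::Lock is a distinct type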

View File

@ -208,10 +208,9 @@ LockedKeyPtr CacheMetadata::lockKeyMetadata(
chassert(key_not_found_policy == KeyNotFoundPolicy::CREATE_EMPTY);
}
/// Not we are at a case:
/// key_state == KeyMetadata::KeyState::REMOVED
/// and KeyNotFoundPolicy == CREATE_EMPTY
/// Retry.
/// Now we are at the case when the key was removed (key_state == KeyMetadata::KeyState::REMOVED)
/// but we need to return empty key (key_not_found_policy == KeyNotFoundPolicy::CREATE_EMPTY)
/// Retry
return lockKeyMetadata(key, key_not_found_policy);
}
@ -241,13 +240,6 @@ void CacheMetadata::doCleanup()
{
auto lock = guard.lock();
/// Let's mention this case.
/// This metadata cleanup is delayed so what is we marked key as deleted and
/// put it to deletion queue, but then the same key was added to cache before
/// we actually performed this delayed removal?
/// In this case it will work fine because on each attempt to add any key to cache
/// we perform this delayed removal.
FileCacheKey cleanup_key;
while (cleanup_queue->tryPop(cleanup_key))
{

View File

@ -669,13 +669,15 @@ SharedContextHolder Context::createShared()
ContextMutablePtr Context::createCopy(const ContextPtr & other)
{
auto lock = other->getLock();
return std::shared_ptr<Context>(new Context(*other));
}
ContextMutablePtr Context::createCopy(const ContextWeakPtr & other)
{
auto ptr = other.lock();
if (!ptr) throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't copy an expired context");
if (!ptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't copy an expired context");
return createCopy(ptr);
}
@ -4169,35 +4171,8 @@ OrdinaryBackgroundExecutorPtr Context::getCommonExecutor() const
return shared->common_executor;
}
static size_t getThreadPoolReaderSizeFromConfig(Context::FilesystemReaderType type, const Poco::Util::AbstractConfiguration & config)
{
switch (type)
{
case Context::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
return config.getUInt(".threadpool_remote_fs_reader_pool_size", 250);
}
case Context::FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
return config.getUInt(".threadpool_local_fs_reader_pool_size", 100);
}
case Context::FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
return std::numeric_limits<std::size_t>::max();
}
}
}
size_t Context::getThreadPoolReaderSize(FilesystemReaderType type) const
{
const auto & config = getConfigRef();
return getThreadPoolReaderSizeFromConfig(type, config);
}
IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) const
{
const auto & config = getConfigRef();
auto lock = getLock();
switch (type)
@ -4205,31 +4180,20 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
if (!shared->asynchronous_remote_fs_reader)
{
auto pool_size = getThreadPoolReaderSizeFromConfig(type, config);
auto queue_size = config.getUInt(".threadpool_remote_fs_reader_queue_size", 1000000);
shared->asynchronous_remote_fs_reader = std::make_unique<ThreadPoolRemoteFSReader>(pool_size, queue_size);
}
shared->asynchronous_remote_fs_reader = createThreadPoolReader(type, getConfigRef());
return *shared->asynchronous_remote_fs_reader;
}
case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
if (!shared->asynchronous_local_fs_reader)
{
auto pool_size = getThreadPoolReaderSizeFromConfig(type, config);
auto queue_size = config.getUInt(".threadpool_local_fs_reader_queue_size", 1000000);
shared->asynchronous_local_fs_reader = std::make_unique<ThreadPoolReader>(pool_size, queue_size);
}
shared->asynchronous_local_fs_reader = createThreadPoolReader(type, getConfigRef());
return *shared->asynchronous_local_fs_reader;
}
case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
if (!shared->synchronous_local_fs_reader)
{
shared->synchronous_local_fs_reader = std::make_unique<SynchronousReader>();
}
shared->synchronous_local_fs_reader = createThreadPoolReader(type, getConfigRef());
return *shared->synchronous_local_fs_reader;
}

View File

@ -11,6 +11,7 @@
#include <Core/Settings.h>
#include <Core/UUID.h>
#include <IO/AsyncReadCounters.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <Interpreters/ClientInfo.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/DatabaseCatalog.h>
@ -1096,17 +1097,8 @@ public:
OrdinaryBackgroundExecutorPtr getFetchesExecutor() const;
OrdinaryBackgroundExecutorPtr getCommonExecutor() const;
enum class FilesystemReaderType
{
SYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_REMOTE_FS_READER,
};
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) const;
size_t getThreadPoolReaderSize(FilesystemReaderType type) const;
std::shared_ptr<AsyncReadCounters> getAsyncReadCounters() const;
ThreadPool & getThreadPoolWriter() const;

View File

@ -241,13 +241,17 @@ Chain InterpreterInsertQuery::buildChain(
running_group = std::make_shared<ThreadGroup>(getContext());
auto sample = getSampleBlock(columns, table, metadata_snapshot);
return buildChainImpl(table, metadata_snapshot, sample, thread_status_holder, running_group, elapsed_counter_ms);
Chain sink = buildSink(table, metadata_snapshot, thread_status_holder, running_group, elapsed_counter_ms);
Chain chain = buildPreSinkChain(sink.getInputHeader(), table, metadata_snapshot, sample, thread_status_holder);
chain.appendChain(std::move(sink));
return chain;
}
Chain InterpreterInsertQuery::buildChainImpl(
Chain InterpreterInsertQuery::buildSink(
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder,
ThreadGroupPtr running_group,
std::atomic_uint64_t * elapsed_counter_ms)
@ -258,14 +262,7 @@ Chain InterpreterInsertQuery::buildChainImpl(
thread_status = nullptr;
auto context_ptr = getContext();
const ASTInsertQuery * query = nullptr;
if (query_ptr)
query = query_ptr->as<ASTInsertQuery>();
const Settings & settings = context_ptr->getSettingsRef();
bool null_as_default = query && query->select && context_ptr->getSettingsRef().insert_null_as_default;
/// We create a pipeline of several streams, into which we will write data.
Chain out;
/// Keep a reference to the context to make sure it stays alive until the chain is executed and destroyed
@ -286,16 +283,48 @@ Chain InterpreterInsertQuery::buildChainImpl(
thread_status_holder, running_group, elapsed_counter_ms);
}
return out;
}
Chain InterpreterInsertQuery::buildPreSinkChain(
const Block & subsequent_header,
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder)
{
ThreadStatus * thread_status = current_thread;
if (!thread_status_holder)
thread_status = nullptr;
auto context_ptr = getContext();
const ASTInsertQuery * query = nullptr;
if (query_ptr)
query = query_ptr->as<ASTInsertQuery>();
const Settings & settings = context_ptr->getSettingsRef();
bool null_as_default = query && query->select && context_ptr->getSettingsRef().insert_null_as_default;
/// We create a pipeline of several streams, into which we will write data.
Chain out;
auto input_header = [&]() -> const Block &
{
return out.empty() ? subsequent_header : out.getInputHeader();
};
/// Note that we wrap transforms one on top of another, so we write them in reverse of data processing order.
/// Checking constraints. It must be done after calculation of all defaults, so we can check them on calculated columns.
if (const auto & constraints = metadata_snapshot->getConstraints(); !constraints.empty())
out.addSource(std::make_shared<CheckConstraintsTransform>(
table->getStorageID(), out.getInputHeader(), metadata_snapshot->getConstraints(), context_ptr));
table->getStorageID(), input_header(), metadata_snapshot->getConstraints(), context_ptr));
auto adding_missing_defaults_dag = addMissingDefaults(
query_sample_block,
out.getInputHeader().getNamesAndTypesList(),
input_header().getNamesAndTypesList(),
metadata_snapshot->getColumns(),
context_ptr,
null_as_default);
@ -316,12 +345,12 @@ Chain InterpreterInsertQuery::buildChainImpl(
bool table_prefers_large_blocks = table->prefersLargeBlocks();
out.addSource(std::make_shared<SquashingChunksTransform>(
out.getInputHeader(),
input_header(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL));
}
auto counting = std::make_shared<CountingTransform>(out.getInputHeader(), thread_status, getContext()->getQuota());
auto counting = std::make_shared<CountingTransform>(input_header(), thread_status, getContext()->getQuota());
counting->setProcessListElement(context_ptr->getProcessListElement());
counting->setProgressCallback(context_ptr->getProgressCallback());
out.addSource(std::move(counting));
@ -362,10 +391,20 @@ BlockIO InterpreterInsertQuery::execute()
// Distributed INSERT SELECT
distributed_pipeline = table->distributedWrite(query, getContext());
std::vector<Chain> out_chains;
std::vector<Chain> presink_chains;
std::vector<Chain> sink_chains;
if (!distributed_pipeline || query.watch)
{
size_t out_streams_size = 1;
/// Number of streams works like this:
/// * For the SELECT, use `max_threads`, or `max_insert_threads`, or whatever
/// InterpreterSelectQuery ends up with.
/// * Use `max_insert_threads` streams for various insert-preparation steps, e.g.
/// materializing and squashing (too slow to do in one thread). That's `presink_chains`.
/// * If the table supports parallel inserts, use the same streams for writing to IStorage.
/// Otherwise, use a ResizeProcessor to funnel them down to 1 stream.
/// * If it's not an INSERT SELECT, forget all that and use one stream.
size_t pre_streams_size = 1;
size_t sink_streams_size = 1;
if (query.select)
{
@ -441,10 +480,14 @@ BlockIO InterpreterInsertQuery::execute()
pipeline.dropTotalsAndExtremes();
if (table->supportsParallelInsert() && settings.max_insert_threads > 1)
out_streams_size = std::min(static_cast<size_t>(settings.max_insert_threads), pipeline.getNumStreams());
if (settings.max_insert_threads > 1)
{
pre_streams_size = std::min(static_cast<size_t>(settings.max_insert_threads), pipeline.getNumStreams());
if (table->supportsParallelInsert())
sink_streams_size = pre_streams_size;
}
pipeline.resize(out_streams_size);
pipeline.resize(pre_streams_size);
/// Allow to insert Nullable into non-Nullable columns, NULL values will be added as defaults values.
if (getContext()->getSettingsRef().insert_null_as_default)
@ -476,13 +519,17 @@ BlockIO InterpreterInsertQuery::execute()
running_group = current_thread->getThreadGroup();
if (!running_group)
running_group = std::make_shared<ThreadGroup>(getContext());
for (size_t i = 0; i < out_streams_size; ++i)
for (size_t i = 0; i < sink_streams_size; ++i)
{
auto out = buildChainImpl(table, metadata_snapshot, query_sample_block,
/* thread_status_holder= */ nullptr,
running_group,
/* elapsed_counter_ms= */ nullptr);
out_chains.emplace_back(std::move(out));
auto out = buildSink(table, metadata_snapshot, /* thread_status_holder= */ nullptr,
running_group, /* elapsed_counter_ms= */ nullptr);
sink_chains.emplace_back(std::move(out));
}
for (size_t i = 0; i < pre_streams_size; ++i)
{
auto out = buildPreSinkChain(sink_chains[0].getInputHeader(), table, metadata_snapshot,
query_sample_block, /* thread_status_holder= */ nullptr);
presink_chains.emplace_back(std::move(out));
}
}
@ -495,7 +542,7 @@ BlockIO InterpreterInsertQuery::execute()
}
else if (query.select || query.watch)
{
const auto & header = out_chains.at(0).getInputHeader();
const auto & header = presink_chains.at(0).getInputHeader();
auto actions_dag = ActionsDAG::makeConvertingActions(
pipeline.getHeader().getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
@ -516,10 +563,14 @@ BlockIO InterpreterInsertQuery::execute()
size_t num_select_threads = pipeline.getNumThreads();
for (auto & chain : out_chains)
for (auto & chain : presink_chains)
resources = chain.detachResources();
for (auto & chain : sink_chains)
resources = chain.detachResources();
pipeline.addChains(std::move(out_chains));
pipeline.addChains(std::move(presink_chains));
pipeline.resize(sink_chains.size());
pipeline.addChains(std::move(sink_chains));
if (!settings.parallel_view_processing)
{
@ -552,7 +603,8 @@ BlockIO InterpreterInsertQuery::execute()
}
else
{
res.pipeline = QueryPipeline(std::move(out_chains.at(0)));
presink_chains.at(0).appendChain(std::move(sink_chains.at(0)));
res.pipeline = QueryPipeline(std::move(presink_chains[0]));
res.pipeline.setNumThreads(std::min<size_t>(res.pipeline.getNumThreads(), settings.max_threads));
if (query.hasInlinedData() && !async_insert)
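
To make the stream arithmetic above concrete, a worked sketch under stated assumptions: max_insert_threads = 4, an INSERT SELECT whose pipeline ends up with 8 streams, and `table_supports_parallel_insert` standing in for table->supportsParallelInsert().

size_t pre_streams_size = 1;
size_t sink_streams_size = 1;
bool table_supports_parallel_insert = true;  // assumption for this example

// settings.max_insert_threads == 4, pipeline.getNumStreams() == 8:
pre_streams_size = std::min<size_t>(4, 8);   // -> 4 presink chains (defaults, squashing, counting)
if (table_supports_parallel_insert)
    sink_streams_size = pre_streams_size;    // -> 4 sinks writing in parallel;
                                             // otherwise it stays 1 and pipeline.resize(1)
                                             // funnels the 4 presink streams into a single sink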

View File

@ -66,13 +66,19 @@ private:
std::vector<std::unique_ptr<ReadBuffer>> owned_buffers;
Chain buildChainImpl(
Chain buildSink(
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder,
ThreadGroupPtr running_group,
std::atomic_uint64_t * elapsed_counter_ms);
Chain buildPreSinkChain(
const Block & subsequent_header,
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder);
};

View File

@ -1,19 +1,14 @@
#include "OpenTelemetrySpanLog.h"
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeEnum.h>
#include <Interpreters/Context.h>
#include <base/hex.h>
#include <Common/CurrentThread.h>
#include <Core/Field.h>
namespace DB
@ -32,11 +27,13 @@ NamesAndTypesList OpenTelemetrySpanLogElement::getNamesAndTypes()
}
);
auto low_cardinality_string = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
return {
{"trace_id", std::make_shared<DataTypeUUID>()},
{"span_id", std::make_shared<DataTypeUInt64>()},
{"parent_span_id", std::make_shared<DataTypeUInt64>()},
{"operation_name", std::make_shared<DataTypeString>()},
{"operation_name", low_cardinality_string},
{"kind", std::move(span_kind_type)},
// DateTime64 is really unwieldy -- there is no "normal" way to convert
// it to an UInt64 count of microseconds, except:
@ -51,15 +48,17 @@ NamesAndTypesList OpenTelemetrySpanLogElement::getNamesAndTypes()
{"start_time_us", std::make_shared<DataTypeUInt64>()},
{"finish_time_us", std::make_shared<DataTypeUInt64>()},
{"finish_date", std::make_shared<DataTypeDate>()},
{"attribute", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>())},
{"attribute", std::make_shared<DataTypeMap>(low_cardinality_string, std::make_shared<DataTypeString>())},
};
}
NamesAndAliases OpenTelemetrySpanLogElement::getNamesAndAliases()
{
auto low_cardinality_string = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
return
{
{"attribute.names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "mapKeys(attribute)"},
{"attribute.names", std::make_shared<DataTypeArray>(low_cardinality_string), "mapKeys(attribute)"},
{"attribute.values", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "mapValues(attribute)"}
};
}
@ -83,4 +82,3 @@ void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const
}
}

View File

@ -84,6 +84,7 @@ ThreadGroupPtr ThreadGroup::createForBackgroundProcess(ContextPtr storage_contex
group->memory_tracker.setProfilerStep(settings.memory_profiler_step);
group->memory_tracker.setSampleProbability(settings.memory_profiler_sample_probability);
group->memory_tracker.setSoftLimit(settings.memory_overcommit_ratio_denominator);
group->memory_tracker.setParent(&background_memory_tracker);
if (settings.memory_tracker_fault_probability > 0.0)
group->memory_tracker.setFaultProbability(settings.memory_tracker_fault_probability);

View File

@ -0,0 +1,49 @@
#include <Interpreters/Context.h>
#include <Common/tests/gtest_global_context.h>
#include <gtest/gtest.h>
using namespace DB;
template <typename Ptr>
void run(Ptr context)
{
for (size_t i = 0; i < 100; ++i)
{
std::thread t1([context]
{
if constexpr (std::is_same_v<ContextWeakPtr, Ptr>)
context.lock()->getAsyncReadCounters();
else
context->getAsyncReadCounters();
});
std::thread t2([context]
{
Context::createCopy(context);
});
t1.join();
t2.join();
}
}
TEST(Context, MutableRace)
{
auto context = Context::createCopy(getContext().context);
context->makeQueryContext();
run<ContextMutablePtr>(context);
}
TEST(Context, ConstRace)
{
auto context = Context::createCopy(getContext().context);
context->makeQueryContext();
run<ContextPtr>(context);
}
TEST(Context, WeakRace)
{
auto context = Context::createCopy(getContext().context);
context->makeQueryContext();
run<ContextWeakPtr>(context);
}

View File

@ -168,7 +168,6 @@ void FinishAggregatingInOrderAlgorithm::addToAggregation()
accumulated_bytes += static_cast<size_t>(static_cast<double>(states[i].total_bytes) * current_rows / states[i].num_rows);
accumulated_rows += current_rows;
if (!states[i].isValid())
inputs_to_update.push_back(i);
}

View File

@ -99,6 +99,14 @@ void Chain::addSink(ProcessorPtr processor)
processors.emplace_back(std::move(processor));
}
void Chain::appendChain(Chain chain)
{
connect(getOutputPort(), chain.getInputPort());
processors.splice(processors.end(), std::move(chain.processors));
attachResources(chain.detachResources());
num_threads += chain.num_threads;
}
IProcessor & Chain::getSource()
{
checkInitialized(processors);
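
A usage sketch with hypothetical factories: appendChain glues two chains end to end, which is exactly how buildChain() in the InterpreterInsertQuery hunks earlier in this diff composes a presink chain with a sink chain.

Chain makePresinkChain();  // hypothetical: builds a chain ending with output header H
Chain makeSinkChain();     // hypothetical: builds a chain starting with input header H

Chain first = makePresinkChain();
Chain second = makeSinkChain();
first.appendChain(std::move(second));
// first's output port is now connected to second's former input port, second's
// processors are spliced onto first's processor list, its resource holder is
// merged in, and the thread counts are summed.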

View File

@ -7,6 +7,10 @@
namespace DB
{
/// Has one unconnected input port and one unconnected output port.
/// There may be other ports on the processors, but they must all be connected.
/// The unconnected input must be on the first processor, output - on the last.
/// The processors don't necessarily form an actual chain.
class Chain
{
public:
@ -27,6 +31,7 @@ public:
void addSource(ProcessorPtr processor);
void addSink(ProcessorPtr processor);
void appendChain(Chain chain);
IProcessor & getSource();
IProcessor & getSink();
@ -44,7 +49,11 @@ public:
void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); }
void addInterpreterContext(ContextPtr context) { holder.interpreter_context.emplace_back(std::move(context)); }
void attachResources(QueryPlanResourceHolder holder_) { holder = std::move(holder_); }
void attachResources(QueryPlanResourceHolder holder_)
{
/// This "=" operator merges holder_ into holder rather than replacing it.
holder = std::move(holder_);
}
QueryPlanResourceHolder detachResources() { return std::move(holder); }
void reset();

View File

@ -1,7 +1,7 @@
#include <Server/ProtocolServerAdapter.h>
#include <Server/TCPServer.h>
#if USE_GRPC && !defined(KEEPER_STANDALONE_BUILD)
#if USE_GRPC && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD)
#include <Server/GRPCServer.h>
#endif
@ -37,7 +37,7 @@ ProtocolServerAdapter::ProtocolServerAdapter(
{
}
#if USE_GRPC && !defined(KEEPER_STANDALONE_BUILD)
#if USE_GRPC && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD)
class ProtocolServerAdapter::GRPCServerAdapterImpl : public Impl
{
public:

View File

@ -23,7 +23,7 @@ public:
ProtocolServerAdapter & operator =(ProtocolServerAdapter && src) = default;
ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr<TCPServer> tcp_server_);
#if USE_GRPC && !defined(KEEPER_STANDALONE_BUILD)
#if USE_GRPC && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD)
ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr<GRPCServer> grpc_server_);
#endif

View File

@ -67,7 +67,7 @@ bool AsynchronousReadBufferFromHDFS::hasPendingDataToRead()
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromHDFS::asyncReadInto(char * data, size_t size, int64_t priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl);
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, nullptr);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;

View File

@ -21,6 +21,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <IO/ReadBufferFromString.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <Storages/Cache/ExternalDataSourceCache.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTCreateQuery.h>
@ -232,7 +233,7 @@ public:
if (thread_pool_read)
{
return std::make_unique<AsynchronousReadBufferFromHDFS>(
IObjectStorage::getThreadPoolReader(), read_settings, std::move(buf));
getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER), read_settings, std::move(buf));
}
else
{

View File

@ -146,6 +146,8 @@ public:
virtual bool supportsReplication() const { return false; }
/// Returns true if the storage supports parallel insert.
/// If false, each INSERT query will call write() only once.
/// Different INSERT queries may write in parallel regardless of this value.
virtual bool supportsParallelInsert() const { return false; }
/// Returns true if the storage supports deduplication of inserted data blocks.

View File

@ -80,4 +80,9 @@ MergeInfo MergeListElement::getInfo() const
return res;
}
MergeListElement::~MergeListElement()
{
background_memory_tracker.adjustOnBackgroundTaskEnd(&getMemoryTracker());
}
}

View File

@ -115,6 +115,8 @@ struct MergeListElement : boost::noncopyable
MergeListElement * ptr() { return this; }
MergeListElement & ref() { return *this; }
~MergeListElement();
};
/** Maintains a list of currently running merges.

View File

@ -166,8 +166,8 @@ struct Settings;
M(String, remote_fs_zero_copy_zookeeper_path, "/clickhouse/zero_copy", "ZooKeeper path for Zero-copy table-independent info.", 0) \
M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
/** Compress marks and primary key. */ \
M(Bool, compress_marks, false, "Marks support compression, reduce mark file size and speed up network transmission.", 0) \
M(Bool, compress_primary_key, false, "Primary key support compression, reduce primary key file size and speed up network transmission.", 0) \
M(Bool, compress_marks, true, "Marks support compression, reduce mark file size and speed up network transmission.", 0) \
M(Bool, compress_primary_key, true, "Primary key support compression, reduce primary key file size and speed up network transmission.", 0) \
M(String, marks_compression_codec, "ZSTD(3)", "Compression encoding used by marks, marks are small enough and cached, so the default compression is ZSTD(3).", 0) \
M(String, primary_key_compression_codec, "ZSTD(3)", "Compression encoding used by primary, primary key is small enough and cached, so the default compression is ZSTD(3).", 0) \
M(UInt64, marks_compress_block_size, 65536, "Mark compress block size, the actual size of the block to compress.", 0) \

View File

@ -535,7 +535,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (!args.storage_def->order_by)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"You must provide an ORDER BY or PRIMARY KEY expression in the table definition. "
"If you don't want this table to be sorted, use ORDER BY/PRIMARY KEY tuple()");
"If you don't want this table to be sorted, use ORDER BY/PRIMARY KEY ()");
/// Get sorting key from engine arguments.
///

View File

@ -8,6 +8,7 @@
#include <base/sort.h>
#include <Backups/BackupEntriesCollector.h>
#include <Databases/IDatabase.h>
#include <Common/MemoryTracker.h>
#include <Common/escapeForFileName.h>
#include <Common/ProfileEventsScope.h>
#include <Common/typeid_cast.h>
@ -42,6 +43,7 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <fmt/core.h>
namespace DB
{
@ -918,7 +920,14 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT;
if (partition_id.empty())
if (!canEnqueueBackgroundTask())
{
if (out_disable_reason)
*out_disable_reason = fmt::format("Current background tasks memory usage ({}) is more than the limit ({})",
formatReadableSizeWithBinarySuffix(background_memory_tracker.get()),
formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()));
}
else if (partition_id.empty())
{
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
bool merge_with_ttl_allowed = getTotalMergesWithTTLInMergeList() < data_settings->max_number_of_merges_with_ttl_in_pool;
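
A hedged sketch of what the gate amounts to. The real check is canEnqueueBackgroundTask(); judging from the messages it compares the background tracker's current usage to its soft limit (an assumption, as is treating a zero limit as unlimited):

// Assumed shape of the check; names from this diff, semantics inferred:
auto soft_limit = background_memory_tracker.getSoftLimit();
bool can_enqueue = (soft_limit == 0) || (background_memory_tracker.get() < soft_limit);

if (!can_enqueue)
{
    // Skip selecting parts and report, as the hunks above do:
    //   "Current background tasks memory usage (X) is more than the limit (Y)"
}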

View File

@ -5,6 +5,7 @@
#include <base/hex.h>
#include <Common/Macros.h>
#include <Common/MemoryTracker.h>
#include <Common/ProfileEventsScope.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ZooKeeper/KeeperException.h>
@ -3250,7 +3251,14 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
auto merges_and_mutations_queued = queue.countMergesAndPartMutations();
size_t merges_and_mutations_sum = merges_and_mutations_queued.merges + merges_and_mutations_queued.mutations;
if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue)
if (!canEnqueueBackgroundTask())
{
LOG_TRACE(log, "Reached memory limit for the background tasks ({}), so won't select new parts to merge or mutate."
"Current background tasks memory usage: {}.",
formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()),
formatReadableSizeWithBinarySuffix(background_memory_tracker.get()));
}
else if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue)
{
LOG_TRACE(log, "Number of queued merges ({}) and part mutations ({})"
" is greater than max_replicated_merges_in_queue ({}), so won't select new parts to merge or mutate.",

View File

@ -646,6 +646,7 @@ StorageS3Source::ReadBufferOrFactory StorageS3Source::createS3ReadBuffer(const S
std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
const String & key, const ReadSettings & read_settings, size_t object_size)
{
auto context = getContext();
auto read_buffer_creator =
[this, read_settings, object_size]
(const std::string & path, size_t read_until_position) -> std::unique_ptr<ReadBufferFromFileBase>
@ -667,10 +668,17 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
auto s3_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{StoredObject{key, object_size}},
read_settings);
read_settings,
/* cache_log */nullptr);
auto & pool_reader = getContext()->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
auto async_reader = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(pool_reader, read_settings, std::move(s3_impl));
auto modified_settings{read_settings};
/// FIXME: Changing this setting to default value breaks something around parquet reading
modified_settings.remote_read_min_bytes_for_seek = modified_settings.remote_fs_buffer_size;
auto & pool_reader = context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
auto async_reader = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
pool_reader, modified_settings, std::move(s3_impl),
context->getAsyncReadCounters(), context->getFilesystemReadPrefetchesLog());
async_reader->setReadUntilEnd();
if (read_settings.remote_fs_prefetch)

View File

@ -6,6 +6,7 @@
#include <IO/copyData.h>
#include <IO/WriteBufferFromString.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <Interpreters/Context.h>
#include <Common/Config/ConfigProcessor.h>
#include <Storages/HDFS/AsynchronousReadBufferFromHDFS.h>
@ -25,7 +26,7 @@ int main()
String path = "/path/to/hdfs/file";
ReadSettings settings = {};
auto in = std::make_unique<ReadBufferFromHDFS>(hdfs_namenode_url, path, *config, settings);
auto & reader = IObjectStorage::getThreadPoolReader();
auto & reader = getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
AsynchronousReadBufferFromHDFS buf(reader, {}, std::move(in));
String output;

View File

@ -123,7 +123,6 @@
02324_map_combinator_bug
02241_join_rocksdb_bs
02003_WithMergeableStateAfterAggregationAndLimit_LIMIT_BY_LIMIT_OFFSET
01626_cnf_fuzz_long
01115_join_with_dictionary
01009_global_array_join_names
00917_multiple_joins_denny_crane

View File

@ -2244,7 +2244,7 @@ def main(args):
"\nFound hung queries in processlist:", args, "red", attrs=["bold"]
)
)
print(processlist)
print(processlist.decode())
print(get_transactions_list(args))
print_stacktraces()

View File

@ -17,7 +17,6 @@ import urllib.parse
import shlex
import urllib3
import requests
import pyspark
try:
# Please, add modules that required for specific tests only here.
@ -33,6 +32,7 @@ try:
import nats
import ssl
import meilisearch
import pyspark
from confluent_kafka.avro.cached_schema_registry_client import (
CachedSchemaRegistryClient,
)

View File

@ -0,0 +1,6 @@
<clickhouse>
<merge_tree>
<compress_marks>0</compress_marks>
<compress_primary_key>0</compress_primary_key>
</merge_tree>
</clickhouse>

View File

@ -1,7 +1,5 @@
<clickhouse>
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
<compress_marks>0</compress_marks>
<compress_primary_key>0</compress_primary_key>
</merge_tree>
</clickhouse>

View File

@ -12,7 +12,9 @@ node1 = cluster.add_instance(
with_installed_binary=True,
)
node2 = cluster.add_instance(
"node2", main_configs=["configs/wide_parts_only.xml"], with_zookeeper=True
"node2",
main_configs=["configs/wide_parts_only.xml", "configs/no_compress_marks.xml"],
with_zookeeper=True,
)

View File

@ -14,6 +14,7 @@ node_old = cluster.add_instance(
)
node_new = cluster.add_instance(
"node2",
main_configs=["configs/no_compress_marks.xml"],
with_zookeeper=True,
stay_alive=True,
)
@ -29,7 +30,7 @@ def start_cluster():
cluster.shutdown()
def test_vertical_merges_from_comapact_parts(start_cluster):
def test_vertical_merges_from_compact_parts(start_cluster):
for i, node in enumerate([node_old, node_new]):
node.query(
"""
@ -41,7 +42,7 @@ def test_vertical_merges_from_comapact_parts(start_cluster):
vertical_merge_algorithm_min_rows_to_activate = 1,
vertical_merge_algorithm_min_columns_to_activate = 1,
min_bytes_for_wide_part = 0,
min_rows_for_wide_part = 100;
min_rows_for_wide_part = 100
""".format(
i
)
@ -104,8 +105,16 @@ def test_vertical_merges_from_comapact_parts(start_cluster):
node_old.query("SYSTEM FLUSH LOGS")
assert not (
# Now the old node is restarted as a new one; its config allows compressed indices and it merged the data into compressed indices,
# which is why an error about a different number of compressed files is expected and ok.
(
node_old.contains_in_log("CHECKSUM_DOESNT_MATCH")
or node_new.contains_in_log("CHECKSUM_DOESNT_MATCH")
and not node_old.contains_in_log("Different number of files")
)
or (
node_new.contains_in_log("CHECKSUM_DOESNT_MATCH")
and not node_new.contains_in_log("Different number of files")
)
)
assert node_new.query(check_query.format("all_0_3_3")) == "Vertical\tWide\n"

View File

@ -191,3 +191,27 @@ def test_dependent_dict_table_distr(node):
node.restart_clickhouse()
query("DROP DATABASE IF EXISTS test_db;")
def test_no_lazy_load():
node2.query("create database no_lazy")
node2.query(
"create table no_lazy.src (n int, m int) engine=MergeTree order by n partition by n % 100"
)
node2.query("insert into no_lazy.src select number, number from numbers(0, 99)")
node2.query("insert into no_lazy.src select number, number from numbers(100, 99)")
node2.query(
"create dictionary no_lazy.dict (n int, mm int) primary key n "
"source(clickhouse(query 'select n, m + sleepEachRow(0.1) as mm from no_lazy.src')) "
"lifetime(min 0 max 0) layout(complex_key_hashed_array(shards 10))"
)
node2.restart_clickhouse()
assert "42\n" == node2.query("select dictGet('no_lazy.dict', 'mm', 42)")
assert "some tables depend on it" in node2.query_and_get_error(
"drop table no_lazy.src", settings={"check_referential_table_dependencies": 1}
)
node2.query("drop database no_lazy")

View File

@ -227,7 +227,7 @@ def test_merge_tree_load_parts_filesystem_error(started_cluster):
# It can be a filesystem exception triggered at initialization of part storage, but it is hard
# to trigger it because it should be an exception on stat/listDirectory.
# The easiest way to trigger such an exception is to use chmod, but the clickhouse server
# is run with root user in integration test and this won't work. So let's do some
# is run with root user in integration test and this won't work. So let's do
# some stupid things: create a table without adaptive granularity and change mark
# extensions of data files in part to make clickhouse think that it's a compact part which
# cannot be created in such table. This will trigger a LOGICAL_ERROR on part creation.
@ -240,7 +240,8 @@ def test_merge_tree_load_parts_filesystem_error(started_cluster):
).strip()
node3.exec_in_container(
["bash", "-c", f"mv {part_path}id.mrk {part_path}id.mrk3"], privileged=True
["bash", "-c", f"mv {part_path}id.cmrk {part_path}id.cmrk3"],
privileged=True,
)
corrupt_part("mt_load_parts", "all_1_1_0")

Some files were not shown because too many files have changed in this diff.