mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge branch 'master' of github.com:ClickHouse/ClickHouse into tighten-limits-functional-tests
This commit is contained in:
commit
6c255e209f
18
.github/actions/debug/action.yml
vendored
Normal file
18
.github/actions/debug/action.yml
vendored
Normal file
@ -0,0 +1,18 @@
|
||||
name: DebugInfo
|
||||
description: Prints workflow debug info
|
||||
|
||||
runs:
|
||||
using: "composite"
|
||||
steps:
|
||||
- name: Print envs
|
||||
shell: bash
|
||||
run: |
|
||||
echo "::group::Envs"
|
||||
env
|
||||
echo "::endgroup::"
|
||||
- name: Print Event.json
|
||||
shell: bash
|
||||
run: |
|
||||
echo "::group::Event.json"
|
||||
python3 -m json.tool "$GITHUB_EVENT_PATH"
|
||||
echo "::endgroup::"
|
109
.github/workflows/auto_releases.yml
vendored
Normal file
109
.github/workflows/auto_releases.yml
vendored
Normal file
@ -0,0 +1,109 @@
|
||||
name: AutoReleases
|
||||
|
||||
env:
|
||||
PYTHONUNBUFFERED: 1
|
||||
|
||||
concurrency:
|
||||
group: autoreleases
|
||||
|
||||
on:
|
||||
# schedule:
|
||||
# - cron: '0 9 * * *'
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
dry-run:
|
||||
description: 'Dry run'
|
||||
required: false
|
||||
default: true
|
||||
type: boolean
|
||||
|
||||
jobs:
|
||||
AutoReleaseInfo:
|
||||
runs-on: [self-hosted, style-checker-aarch64]
|
||||
outputs:
|
||||
data: ${{ steps.info.outputs.AUTO_RELEASE_PARAMS }}
|
||||
dry_run: ${{ steps.info.outputs.DRY_RUN }}
|
||||
steps:
|
||||
- name: Debug Info
|
||||
uses: ./.github/actions/debug
|
||||
- name: Set envs
|
||||
run: |
|
||||
cat >> "$GITHUB_ENV" << 'EOF'
|
||||
ROBOT_CLICKHOUSE_SSH_KEY<<RCSK
|
||||
${{secrets.ROBOT_CLICKHOUSE_SSH_KEY}}
|
||||
RCSK
|
||||
EOF
|
||||
echo "DRY_RUN=true" >> "$GITHUB_ENV"
|
||||
- name: Check out repository code
|
||||
uses: ClickHouse/checkout@v1
|
||||
- name: Prepare Info
|
||||
id: info
|
||||
run: |
|
||||
cd "$GITHUB_WORKSPACE/tests/ci"
|
||||
python3 auto_release.py --prepare
|
||||
echo "::group::Auto Release Info"
|
||||
python3 -m json.tool /tmp/autorelease_info.json
|
||||
echo "::endgroup::"
|
||||
{
|
||||
echo 'AUTO_RELEASE_PARAMS<<EOF'
|
||||
cat /tmp/autorelease_info.json
|
||||
echo 'EOF'
|
||||
} >> "$GITHUB_ENV"
|
||||
{
|
||||
echo 'AUTO_RELEASE_PARAMS<<EOF'
|
||||
cat /tmp/autorelease_info.json
|
||||
echo 'EOF'
|
||||
} >> "$GITHUB_OUTPUT"
|
||||
echo "DRY_RUN=true" >> "$GITHUB_OUTPUT"
|
||||
- name: Post Release Branch statuses
|
||||
run: |
|
||||
cd "$GITHUB_WORKSPACE/tests/ci"
|
||||
python3 auto_release.py --post-status
|
||||
- name: Clean up
|
||||
uses: ./.github/actions/clean
|
||||
|
||||
Release_0:
|
||||
needs: AutoReleaseInfo
|
||||
name: Release ${{ fromJson(needs.AutoReleaseInfo.outputs.data).releases[0].release_branch }}
|
||||
if: ${{ fromJson(needs.AutoReleaseInfo.outputs.data).releases[0] && fromJson(needs.AutoReleaseInfo.outputs.data).releases[0].ready }}
|
||||
uses: ./.github/workflows/create_release.yml
|
||||
with:
|
||||
ref: ${{ fromJson(needs.AutoReleaseInfo.outputs.data).releases[0].commit_sha }}
|
||||
type: patch
|
||||
dry-run: ${{ needs.AutoReleaseInfo.outputs.dry_run }}
|
||||
#
|
||||
# Release_1:
|
||||
# needs: [AutoReleaseInfo, Release_0]
|
||||
# name: Release ${{ fromJson(needs.AutoReleaseInfo.outputs.data).releases[1].release_branch }}
|
||||
# if: ${{ fromJson(needs.AutoReleaseInfo.outputs.data).releases[1] && fromJson(needs.AutoReleaseInfo.outputs.data).releases[1].ready }}
|
||||
# uses: ./.github/workflows/create_release.yml
|
||||
# with:
|
||||
# ref: ${{ fromJson(needs.AutoReleaseInfo.outputs.data).releases[1].commit_sha }}
|
||||
# type: patch
|
||||
# dry-run: ${{ env.DRY_RUN }}
|
||||
#
|
||||
# Release_2:
|
||||
# needs: [AutoReleaseInfo, Release_1]
|
||||
# name: Release ${{ fromJson(needs.AutoReleaseInfo.outputs.data).releases[2].release_branch }}
|
||||
# if: ${{ fromJson(needs.AutoReleaseInfo.outputs.data).releases[0] && fromJson(needs.AutoReleaseInfo.outputs.data).releases[2].ready }}
|
||||
# uses: ./.github/workflow/create_release.yml
|
||||
# with:
|
||||
# ref: ${{ fromJson(needs.AutoReleaseInfo.outputs.data).releases[0].commit_sha }}
|
||||
# type: patch
|
||||
# dry-run: ${{ env.DRY_RUN }}
|
||||
#
|
||||
# Release_3:
|
||||
# needs: [AutoReleaseInfo, Release_2]
|
||||
# name: Release ${{ fromJson(needs.AutoReleaseInfo.outputs.data).releases[3].release_branch }}
|
||||
# if: ${{ fromJson(needs.AutoReleaseInfo.outputs.data).releases[3] && fromJson(needs.AutoReleaseInfo.outputs.data).releases[3].ready }}
|
||||
# uses: ./.github/workflow/create_release.yml
|
||||
# with:
|
||||
# ref: ${{ fromJson(needs.AutoReleaseInfo.outputs.data).releases[3].commit_sha }}
|
||||
# type: patch
|
||||
# dry-run: ${{ env.DRY_RUN }}
|
||||
|
||||
# - name: Post Slack Message
|
||||
# if: ${{ !cancelled() }}
|
||||
# run: |
|
||||
# cd "$GITHUB_WORKSPACE/tests/ci"
|
||||
# python3 auto_release.py --post-auto-release-complete --wf-status ${{ job.status }}
|
22
.github/workflows/create_release.yml
vendored
22
.github/workflows/create_release.yml
vendored
@ -2,6 +2,7 @@ name: CreateRelease
|
||||
|
||||
concurrency:
|
||||
group: release
|
||||
|
||||
'on':
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
@ -26,6 +27,26 @@ concurrency:
|
||||
required: false
|
||||
default: false
|
||||
type: boolean
|
||||
workflow_call:
|
||||
inputs:
|
||||
ref:
|
||||
description: 'Git reference (branch or commit sha) from which to create the release'
|
||||
required: true
|
||||
type: string
|
||||
type:
|
||||
description: 'The type of release: "new" for a new release or "patch" for a patch release'
|
||||
required: true
|
||||
type: string
|
||||
only-repo:
|
||||
description: 'Run only repos updates including docker (repo-recovery, tests)'
|
||||
required: false
|
||||
default: false
|
||||
type: boolean
|
||||
dry-run:
|
||||
description: 'Dry run'
|
||||
required: false
|
||||
default: false
|
||||
type: boolean
|
||||
|
||||
jobs:
|
||||
CreateRelease:
|
||||
@ -101,6 +122,7 @@ jobs:
|
||||
--volume=".:/wd" --workdir="/wd" \
|
||||
clickhouse/style-test \
|
||||
./tests/ci/changelog.py -v --debug-helpers \
|
||||
--gh-user-or-token ${{ secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN }} \
|
||||
--jobs=5 \
|
||||
--output="./docs/changelogs/${{ env.RELEASE_TAG }}.md" ${{ env.RELEASE_TAG }}
|
||||
git add ./docs/changelogs/${{ env.RELEASE_TAG }}.md
|
||||
|
3
.gitmodules
vendored
3
.gitmodules
vendored
@ -345,9 +345,6 @@
|
||||
[submodule "contrib/FP16"]
|
||||
path = contrib/FP16
|
||||
url = https://github.com/Maratyszcza/FP16.git
|
||||
[submodule "contrib/robin-map"]
|
||||
path = contrib/robin-map
|
||||
url = https://github.com/Tessil/robin-map.git
|
||||
[submodule "contrib/aklomp-base64"]
|
||||
path = contrib/aklomp-base64
|
||||
url = https://github.com/aklomp/base64.git
|
||||
|
@ -322,17 +322,21 @@ if (DISABLE_OMIT_FRAME_POINTER)
|
||||
set (CMAKE_ASM_FLAGS_ADD "${CMAKE_ASM_FLAGS_ADD} -fno-omit-frame-pointer -mno-omit-leaf-frame-pointer")
|
||||
endif()
|
||||
|
||||
# Before you start hating your debugger because it refuses to show variables ('<optimized out>'), try building with -DDEBUG_O_LEVEL="0"
|
||||
# https://stackoverflow.com/questions/63386189/whats-the-difference-between-a-compilers-o0-option-and-og-option/63386263#63386263
|
||||
set(DEBUG_O_LEVEL "g" CACHE STRING "The -Ox level used for debug builds")
|
||||
|
||||
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${COMPILER_FLAGS} ${CMAKE_CXX_FLAGS_ADD}")
|
||||
set (CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -O3 ${DEBUG_INFO_FLAGS} ${CMAKE_CXX_FLAGS_ADD}")
|
||||
set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Og ${DEBUG_INFO_FLAGS} ${CMAKE_CXX_FLAGS_ADD}")
|
||||
set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O${DEBUG_O_LEVEL} ${DEBUG_INFO_FLAGS} ${CMAKE_CXX_FLAGS_ADD}")
|
||||
|
||||
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${COMPILER_FLAGS} ${CMAKE_C_FLAGS_ADD}")
|
||||
set (CMAKE_C_FLAGS_RELWITHDEBINFO "${CMAKE_C_FLAGS_RELWITHDEBINFO} -O3 ${DEBUG_INFO_FLAGS} ${CMAKE_C_FLAGS_ADD}")
|
||||
set (CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -Og ${DEBUG_INFO_FLAGS} ${CMAKE_C_FLAGS_ADD}")
|
||||
set (CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O${DEBUG_O_LEVEL} ${DEBUG_INFO_FLAGS} ${CMAKE_C_FLAGS_ADD}")
|
||||
|
||||
set (CMAKE_ASM_FLAGS "${CMAKE_ASM_FLAGS} ${COMPILER_FLAGS} ${CMAKE_ASM_FLAGS_ADD}")
|
||||
set (CMAKE_ASM_FLAGS_RELWITHDEBINFO "${CMAKE_ASM_FLAGS_RELWITHDEBINFO} -O3 ${DEBUG_INFO_FLAGS} ${CMAKE_ASM_FLAGS_ADD}")
|
||||
set (CMAKE_ASM_FLAGS_DEBUG "${CMAKE_ASM_FLAGS_DEBUG} -Og ${DEBUG_INFO_FLAGS} ${CMAKE_ASM_FLAGS_ADD}")
|
||||
set (CMAKE_ASM_FLAGS_DEBUG "${CMAKE_ASM_FLAGS_DEBUG} -O${DEBUG_O_LEVEL} ${DEBUG_INFO_FLAGS} ${CMAKE_ASM_FLAGS_ADD}")
|
||||
|
||||
if (OS_DARWIN)
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++")
|
||||
|
3
contrib/CMakeLists.txt
vendored
3
contrib/CMakeLists.txt
vendored
@ -209,9 +209,8 @@ endif()
|
||||
option(ENABLE_USEARCH "Enable USearch" ${ENABLE_LIBRARIES})
|
||||
if (ENABLE_USEARCH)
|
||||
add_contrib (FP16-cmake FP16)
|
||||
add_contrib (robin-map-cmake robin-map)
|
||||
add_contrib (SimSIMD-cmake SimSIMD)
|
||||
add_contrib (usearch-cmake usearch) # requires: FP16, robin-map, SimdSIMD
|
||||
add_contrib (usearch-cmake usearch) # requires: FP16, SimdSIMD
|
||||
else ()
|
||||
message(STATUS "Not using USearch")
|
||||
endif ()
|
||||
|
2
contrib/SimSIMD
vendored
2
contrib/SimSIMD
vendored
@ -1 +1 @@
|
||||
Subproject commit de2cb75b9e9e3389d5e1e51fd9f8ed151f3c17cf
|
||||
Subproject commit 91a76d1ac519b3b9dc8957734a3dabd985f00c26
|
1
contrib/robin-map
vendored
1
contrib/robin-map
vendored
@ -1 +0,0 @@
|
||||
Subproject commit 851a59e0e3063ee0e23089062090a73fd3de482d
|
@ -1 +0,0 @@
|
||||
# See contrib/usearch-cmake/CMakeLists.txt
|
2
contrib/usearch
vendored
2
contrib/usearch
vendored
@ -1 +1 @@
|
||||
Subproject commit 30810452bec5d3d3aa0931bb5d761e2f09aa6356
|
||||
Subproject commit e21a5778a0d4469ddaf38c94b7be0196bb701ee4
|
@ -1,5 +1,4 @@
|
||||
set(FP16_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/FP16")
|
||||
set(ROBIN_MAP_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/robin-map")
|
||||
set(SIMSIMD_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/SimSIMD")
|
||||
set(USEARCH_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/usearch")
|
||||
|
||||
@ -7,7 +6,6 @@ add_library(_usearch INTERFACE)
|
||||
|
||||
target_include_directories(_usearch SYSTEM INTERFACE
|
||||
${FP16_PROJECT_DIR}/include
|
||||
${ROBIN_MAP_PROJECT_DIR}/include
|
||||
${SIMSIMD_PROJECT_DIR}/include
|
||||
${USEARCH_PROJECT_DIR}/include)
|
||||
|
||||
|
@ -59,6 +59,8 @@ Parameters:
|
||||
- `ef_construction`: (optional, default: 128)
|
||||
- `ef_search`: (optional, default: 64)
|
||||
|
||||
Value 0 for parameters `m`, `ef_construction`, and `ef_search` refers to the default value.
|
||||
|
||||
Example:
|
||||
|
||||
```sql
|
||||
|
@ -1042,10 +1042,23 @@ Compression rates of LZ4 or ZSTD improve on average by 20-40%.
|
||||
This setting works best for tables with no primary key or a low-cardinality primary key, i.e. a table with only few distinct primary key values.
|
||||
High-cardinality primary keys, e.g. involving timestamp columns of type `DateTime64`, are not expected to benefit from this setting.
|
||||
|
||||
### deduplicate_merge_projection_mode
|
||||
## lightweight_mutation_projection_mode
|
||||
|
||||
By default, lightweight delete `DELETE` does not work for tables with projections. This is because rows in a projection may be affected by a `DELETE` operation. So the default value would be `throw`.
|
||||
However, this option can change the behavior. With the value either `drop` or `rebuild`, deletes will work with projections. `drop` would delete the projection so it might be fast in the current query as projection gets deleted but slow in future queries as no projection attached.
|
||||
`rebuild` would rebuild the projection which might affect the performance of the current query, but might speedup for future queries. A good thing is that these options would only work in the part level,
|
||||
which means projections in the part that don't get touched would stay intact instead of triggering any action like drop or rebuild.
|
||||
|
||||
Possible values:
|
||||
|
||||
- throw, drop, rebuild
|
||||
|
||||
Default value: throw
|
||||
|
||||
## deduplicate_merge_projection_mode
|
||||
|
||||
Whether to allow create projection for the table with non-classic MergeTree, that is not (Replicated, Shared) MergeTree. If allowed, what is the action when merge projections, either drop or rebuild. So classic MergeTree would ignore this setting.
|
||||
It also controls `OPTIMIZE DEDUPLICATE` as well, but has effect on all MergeTree family members.
|
||||
It also controls `OPTIMIZE DEDUPLICATE` as well, but has effect on all MergeTree family members. Similar to the option `lightweight_mutation_projection_mode`, it is also part level.
|
||||
|
||||
Possible values:
|
||||
|
||||
|
@ -38,8 +38,7 @@ If you anticipate frequent deletes, consider using a [custom partitioning key](/
|
||||
|
||||
### Lightweight `DELETE`s with projections
|
||||
|
||||
By default, `DELETE` does not work for tables with projections. This is because rows in a projection may be affected by a `DELETE` operation and may require the projection to be rebuilt, negatively affecting `DELETE` performance.
|
||||
However, there is an option to change this behavior. By changing setting `lightweight_mutation_projection_mode = 'drop'`, deletes will work with projections.
|
||||
By default, `DELETE` does not work for tables with projections. This is because rows in a projection may be affected by a `DELETE` operation. But there is a [MergeTree setting](https://clickhouse.com/docs/en/operations/settings/merge-tree-settings) `lightweight_mutation_projection_mode` can change the behavior.
|
||||
|
||||
## Performance considerations when using lightweight `DELETE`
|
||||
|
||||
|
@ -490,8 +490,6 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
|
||||
|
||||
/// process_list_element_holder is used to make an element in ProcessList live while BACKUP is working asynchronously.
|
||||
auto process_list_element = context_in_use->getProcessListElement();
|
||||
/// Update context to preserve query information in processlist (settings, current_database)
|
||||
process_list_element->updateContext(context_in_use);
|
||||
|
||||
thread_pool.scheduleOrThrowOnError(
|
||||
[this,
|
||||
@ -855,8 +853,6 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
|
||||
|
||||
/// process_list_element_holder is used to make an element in ProcessList live while RESTORE is working asynchronously.
|
||||
auto process_list_element = context_in_use->getProcessListElement();
|
||||
/// Update context to preserve query information in processlist (settings, current_database)
|
||||
process_list_element->updateContext(context_in_use);
|
||||
|
||||
thread_pool.scheduleOrThrowOnError(
|
||||
[this,
|
||||
|
@ -184,14 +184,20 @@ void DynamicResourceManager::updateConfiguration(const Poco::Util::AbstractConfi
|
||||
|
||||
// Resource update leads to loss of runtime data of nodes and may lead to temporary violation of constraints (e.g. limits)
|
||||
// Try to minimise this by reusing "equal" resources (initialized with the same configuration).
|
||||
std::vector<State::ResourcePtr> resources_to_attach;
|
||||
for (auto & [name, new_resource] : new_state->resources)
|
||||
{
|
||||
if (auto iter = state->resources.find(name); iter != state->resources.end()) // Resource update
|
||||
{
|
||||
State::ResourcePtr old_resource = iter->second;
|
||||
if (old_resource->equals(*new_resource))
|
||||
{
|
||||
new_resource = old_resource; // Rewrite with older version to avoid loss of runtime data
|
||||
continue;
|
||||
}
|
||||
}
|
||||
// It is new or updated resource
|
||||
resources_to_attach.emplace_back(new_resource);
|
||||
}
|
||||
|
||||
// Commit new state
|
||||
@ -199,17 +205,14 @@ void DynamicResourceManager::updateConfiguration(const Poco::Util::AbstractConfi
|
||||
state = new_state;
|
||||
|
||||
// Attach new and updated resources to the scheduler
|
||||
for (auto & [name, resource] : new_state->resources)
|
||||
for (auto & resource : resources_to_attach)
|
||||
{
|
||||
const SchedulerNodePtr & root = resource->nodes.find("/")->second.ptr;
|
||||
if (root->parent == nullptr)
|
||||
resource->attached_to = &scheduler;
|
||||
scheduler.event_queue->enqueue([this, root]
|
||||
{
|
||||
resource->attached_to = &scheduler;
|
||||
scheduler.event_queue->enqueue([this, root]
|
||||
{
|
||||
scheduler.attachChild(root);
|
||||
});
|
||||
}
|
||||
scheduler.attachChild(root);
|
||||
});
|
||||
}
|
||||
|
||||
// NOTE: after mutex unlock `state` became available for Classifier(s) and must be immutable
|
||||
|
@ -325,6 +325,7 @@ class IColumn;
|
||||
\
|
||||
M(Bool, join_use_nulls, false, "Use NULLs for non-joined rows of outer JOINs for types that can be inside Nullable. If false, use default value of corresponding columns data type.", IMPORTANT) \
|
||||
\
|
||||
M(Int32, join_output_by_rowlist_perkey_rows_threshold, 5, "The lower limit of per-key average rows in the right table to determine whether to output by row list in hash join.", 0) \
|
||||
M(JoinStrictness, join_default_strictness, JoinStrictness::All, "Set default strictness in JOIN query. Possible values: empty string, 'ANY', 'ALL'. If empty, query without strictness will throw exception.", 0) \
|
||||
M(Bool, any_join_distinct_right_table_keys, false, "Enable old ANY JOIN logic with many-to-one left-to-right table keys mapping for all ANY JOINs. It leads to confusing not equal results for 't1 ANY LEFT JOIN t2' and 't2 ANY RIGHT JOIN t1'. ANY RIGHT JOIN needs one-to-many keys mapping to be consistent with LEFT one.", IMPORTANT) \
|
||||
M(Bool, single_join_prefer_left_table, true, "For single JOIN in case of identifier ambiguity prefer left table", IMPORTANT) \
|
||||
|
@ -87,6 +87,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
|
||||
{"allow_experimental_time_series_table", false, false, "Added new setting to allow the TimeSeries table engine"},
|
||||
{"enable_analyzer", 1, 1, "Added an alias to a setting `allow_experimental_analyzer`."},
|
||||
{"optimize_functions_to_subcolumns", false, true, "Enabled settings by default"},
|
||||
{"join_output_by_rowlist_perkey_rows_threshold", 0, 5, "The lower limit of per-key average rows in the right table to determine whether to output by row list in hash join."},
|
||||
{"allow_experimental_vector_similarity_index", false, false, "Added new setting to allow experimental vector similarity indexes"},
|
||||
}
|
||||
},
|
||||
|
@ -645,8 +645,9 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment)
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceBytes, current_impl_buffer_size);
|
||||
|
||||
std::string failure_reason;
|
||||
bool continue_predownload = file_segment.reserve(
|
||||
current_predownload_size, settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds);
|
||||
current_predownload_size, settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds, failure_reason);
|
||||
if (continue_predownload)
|
||||
{
|
||||
LOG_TEST(log, "Left to predownload: {}, buffer size: {}", bytes_to_predownload, current_impl_buffer_size);
|
||||
@ -1002,7 +1003,8 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
|
||||
{
|
||||
chassert(file_offset_of_buffer_end + size - 1 <= file_segment.range().right);
|
||||
|
||||
bool success = file_segment.reserve(size, settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds);
|
||||
std::string failure_reason;
|
||||
bool success = file_segment.reserve(size, settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds, failure_reason);
|
||||
if (success)
|
||||
{
|
||||
chassert(file_segment.getCurrentWriteOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
|
||||
@ -1028,7 +1030,8 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
|
||||
LOG_TRACE(log, "Bypassing cache because writeCache method failed");
|
||||
}
|
||||
else
|
||||
LOG_TRACE(log, "No space left in cache to reserve {} bytes, will continue without cache download", size);
|
||||
LOG_TRACE(log, "No space left in cache to reserve {} bytes, reason: {}, "
|
||||
"will continue without cache download", failure_reason, size);
|
||||
|
||||
if (!success)
|
||||
{
|
||||
|
@ -91,7 +91,8 @@ bool FileSegmentRangeWriter::write(char * data, size_t size, size_t offset, File
|
||||
|
||||
size_t size_to_write = std::min(available_size, size);
|
||||
|
||||
bool reserved = file_segment->reserve(size_to_write, reserve_space_lock_wait_timeout_milliseconds);
|
||||
std::string failure_reason;
|
||||
bool reserved = file_segment->reserve(size_to_write, reserve_space_lock_wait_timeout_milliseconds, failure_reason);
|
||||
if (!reserved)
|
||||
{
|
||||
appendFilesystemCacheLog(*file_segment);
|
||||
|
@ -804,7 +804,8 @@ bool FileCache::tryReserve(
|
||||
const size_t size,
|
||||
FileCacheReserveStat & reserve_stat,
|
||||
const UserInfo & user,
|
||||
size_t lock_wait_timeout_milliseconds)
|
||||
size_t lock_wait_timeout_milliseconds,
|
||||
std::string & failure_reason)
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheReserveMicroseconds);
|
||||
|
||||
@ -817,6 +818,7 @@ bool FileCache::tryReserve(
|
||||
if (cache_is_being_resized.load(std::memory_order_relaxed))
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::FilesystemCacheFailToReserveSpaceBecauseOfCacheResize);
|
||||
failure_reason = "cache is being resized";
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -824,6 +826,7 @@ bool FileCache::tryReserve(
|
||||
if (!cache_lock)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::FilesystemCacheFailToReserveSpaceBecauseOfLockContention);
|
||||
failure_reason = "cache contention";
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -847,6 +850,7 @@ bool FileCache::tryReserve(
|
||||
LOG_TEST(log, "Query limit exceeded, space reservation failed, "
|
||||
"recache_on_query_limit_exceeded is disabled (while reserving for {}:{})",
|
||||
file_segment.key(), file_segment.offset());
|
||||
failure_reason = "query limit exceeded";
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -877,6 +881,7 @@ bool FileCache::tryReserve(
|
||||
if (!query_priority->collectCandidatesForEviction(
|
||||
size, required_elements_num, reserve_stat, eviction_candidates, {}, user.user_id, cache_lock))
|
||||
{
|
||||
failure_reason = "cannot evict enough space for query limit";
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -891,11 +896,15 @@ bool FileCache::tryReserve(
|
||||
if (!main_priority->collectCandidatesForEviction(
|
||||
size, required_elements_num, reserve_stat, eviction_candidates, queue_iterator, user.user_id, cache_lock))
|
||||
{
|
||||
failure_reason = "cannot evict enough space";
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!file_segment.getKeyMetadata()->createBaseDirectory())
|
||||
{
|
||||
failure_reason = "not enough space on device";
|
||||
return false;
|
||||
}
|
||||
|
||||
if (eviction_candidates.size() > 0)
|
||||
{
|
||||
|
@ -165,7 +165,8 @@ public:
|
||||
size_t size,
|
||||
FileCacheReserveStat & stat,
|
||||
const UserInfo & user,
|
||||
size_t lock_wait_timeout_milliseconds);
|
||||
size_t lock_wait_timeout_milliseconds,
|
||||
std::string & failure_reason);
|
||||
|
||||
std::vector<FileSegment::Info> getFileSegmentInfos(const UserID & user_id);
|
||||
|
||||
|
@ -502,7 +502,11 @@ LockedKeyPtr FileSegment::lockKeyMetadata(bool assert_exists) const
|
||||
return metadata->tryLock();
|
||||
}
|
||||
|
||||
bool FileSegment::reserve(size_t size_to_reserve, size_t lock_wait_timeout_milliseconds, FileCacheReserveStat * reserve_stat)
|
||||
bool FileSegment::reserve(
|
||||
size_t size_to_reserve,
|
||||
size_t lock_wait_timeout_milliseconds,
|
||||
std::string & failure_reason,
|
||||
FileCacheReserveStat * reserve_stat)
|
||||
{
|
||||
if (!size_to_reserve)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Zero space reservation is not allowed");
|
||||
@ -554,7 +558,7 @@ bool FileSegment::reserve(size_t size_to_reserve, size_t lock_wait_timeout_milli
|
||||
if (!reserve_stat)
|
||||
reserve_stat = &dummy_stat;
|
||||
|
||||
bool reserved = cache->tryReserve(*this, size_to_reserve, *reserve_stat, getKeyMetadata()->user, lock_wait_timeout_milliseconds);
|
||||
bool reserved = cache->tryReserve(*this, size_to_reserve, *reserve_stat, getKeyMetadata()->user, lock_wait_timeout_milliseconds, failure_reason);
|
||||
|
||||
if (!reserved)
|
||||
setDownloadFailedUnlocked(lock());
|
||||
|
@ -201,7 +201,11 @@ public:
|
||||
|
||||
/// Try to reserve exactly `size` bytes (in addition to the getDownloadedSize() bytes already downloaded).
|
||||
/// Returns true if reservation was successful, false otherwise.
|
||||
bool reserve(size_t size_to_reserve, size_t lock_wait_timeout_milliseconds, FileCacheReserveStat * reserve_stat = nullptr);
|
||||
bool reserve(
|
||||
size_t size_to_reserve,
|
||||
size_t lock_wait_timeout_milliseconds,
|
||||
std::string & failure_reason,
|
||||
FileCacheReserveStat * reserve_stat = nullptr);
|
||||
|
||||
/// Write data into reserved space.
|
||||
void write(char * from, size_t size, size_t offset_in_file);
|
||||
|
@ -705,7 +705,8 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
|
||||
{
|
||||
auto size = reader->available();
|
||||
|
||||
if (!file_segment.reserve(size, reserve_space_lock_wait_timeout_milliseconds))
|
||||
std::string failure_reason;
|
||||
if (!file_segment.reserve(size, reserve_space_lock_wait_timeout_milliseconds, failure_reason))
|
||||
{
|
||||
LOG_TEST(
|
||||
log, "Failed to reserve space during background download "
|
||||
|
@ -75,7 +75,8 @@ void WriteBufferToFileSegment::nextImpl()
|
||||
FileCacheReserveStat reserve_stat;
|
||||
/// In case of an error, we don't need to finalize the file segment
|
||||
/// because it will be deleted soon and completed in the holder's destructor.
|
||||
bool ok = file_segment->reserve(bytes_to_write, reserve_space_lock_wait_timeout_milliseconds, &reserve_stat);
|
||||
std::string failure_reason;
|
||||
bool ok = file_segment->reserve(bytes_to_write, reserve_space_lock_wait_timeout_milliseconds, failure_reason, &reserve_stat);
|
||||
|
||||
if (!ok)
|
||||
{
|
||||
@ -84,9 +85,10 @@ void WriteBufferToFileSegment::nextImpl()
|
||||
reserve_stat_msg += fmt::format("{} hold {}, can release {}; ",
|
||||
toString(kind), ReadableSize(stat.non_releasable_size), ReadableSize(stat.releasable_size));
|
||||
|
||||
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Failed to reserve {} bytes for {}: {}(segment info: {})",
|
||||
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Failed to reserve {} bytes for {}: reason {}, {}(segment info: {})",
|
||||
bytes_to_write,
|
||||
file_segment->getKind() == FileSegmentKind::Temporary ? "temporary file" : "the file in cache",
|
||||
failure_reason,
|
||||
reserve_stat_msg,
|
||||
file_segment->getInfoForLog()
|
||||
);
|
||||
|
@ -15,48 +15,115 @@ JoinOnKeyColumns::JoinOnKeyColumns(const Block & block, const Names & key_names_
|
||||
{
|
||||
}
|
||||
|
||||
template<> void AddedColumns<false>::buildOutput()
|
||||
{
|
||||
}
|
||||
template<>
|
||||
void AddedColumns<false>::buildOutput() {}
|
||||
|
||||
template<>
|
||||
void AddedColumns<false>::buildJoinGetOutput() {}
|
||||
|
||||
template<>
|
||||
template<bool from_row_list>
|
||||
void AddedColumns<false>::buildOutputFromBlocks() {}
|
||||
|
||||
template<>
|
||||
void AddedColumns<true>::buildOutput()
|
||||
{
|
||||
for (size_t i = 0; i < this->size(); ++i)
|
||||
if (!output_by_row_list)
|
||||
buildOutputFromBlocks<false>();
|
||||
else
|
||||
{
|
||||
auto& col = columns[i];
|
||||
size_t default_count = 0;
|
||||
auto apply_default = [&]()
|
||||
if (join_data_avg_perkey_rows < output_by_row_list_threshold)
|
||||
buildOutputFromBlocks<true>();
|
||||
else
|
||||
{
|
||||
if (default_count > 0)
|
||||
for (size_t i = 0; i < this->size(); ++i)
|
||||
{
|
||||
JoinCommon::addDefaultValues(*col, type_name[i].type, default_count);
|
||||
default_count = 0;
|
||||
}
|
||||
};
|
||||
|
||||
for (size_t j = 0; j < lazy_output.blocks.size(); ++j)
|
||||
{
|
||||
if (!lazy_output.blocks[j])
|
||||
{
|
||||
default_count++;
|
||||
continue;
|
||||
}
|
||||
apply_default();
|
||||
const auto & column_from_block = reinterpret_cast<const Block *>(lazy_output.blocks[j])->getByPosition(right_indexes[i]);
|
||||
/// If it's joinGetOrNull, we need to wrap not-nullable columns in StorageJoin.
|
||||
if (is_join_get)
|
||||
{
|
||||
if (auto * nullable_col = typeid_cast<ColumnNullable *>(col.get());
|
||||
nullable_col && !column_from_block.column->isNullable())
|
||||
auto & col = columns[i];
|
||||
for (auto row_ref_i : lazy_output.row_refs)
|
||||
{
|
||||
nullable_col->insertFromNotNullable(*column_from_block.column, lazy_output.row_nums[j]);
|
||||
continue;
|
||||
if (row_ref_i)
|
||||
{
|
||||
const RowRefList * row_ref_list = reinterpret_cast<const RowRefList *>(row_ref_i);
|
||||
for (auto it = row_ref_list->begin(); it.ok(); ++it)
|
||||
col->insertFrom(*it->block->getByPosition(right_indexes[i]).column, it->row_num);
|
||||
}
|
||||
else
|
||||
type_name[i].type->insertDefaultInto(*col);
|
||||
}
|
||||
}
|
||||
col->insertFrom(*column_from_block.column, lazy_output.row_nums[j]);
|
||||
}
|
||||
apply_default();
|
||||
}
|
||||
}
|
||||
|
||||
template<>
|
||||
void AddedColumns<true>::buildJoinGetOutput()
|
||||
{
|
||||
for (size_t i = 0; i < this->size(); ++i)
|
||||
{
|
||||
auto & col = columns[i];
|
||||
for (auto row_ref_i : lazy_output.row_refs)
|
||||
{
|
||||
if (!row_ref_i)
|
||||
{
|
||||
type_name[i].type->insertDefaultInto(*col);
|
||||
continue;
|
||||
}
|
||||
const auto * row_ref = reinterpret_cast<const RowRef *>(row_ref_i);
|
||||
const auto & column_from_block = row_ref->block->getByPosition(right_indexes[i]);
|
||||
if (auto * nullable_col = typeid_cast<ColumnNullable *>(col.get()); nullable_col && !column_from_block.column->isNullable())
|
||||
nullable_col->insertFromNotNullable(*column_from_block.column, row_ref->row_num);
|
||||
else
|
||||
col->insertFrom(*column_from_block.column, row_ref->row_num);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template<>
|
||||
template<bool from_row_list>
|
||||
void AddedColumns<true>::buildOutputFromBlocks()
|
||||
{
|
||||
if (this->size() == 0)
|
||||
return;
|
||||
std::vector<const Block *> blocks;
|
||||
std::vector<UInt32> row_nums;
|
||||
blocks.reserve(lazy_output.row_refs.size());
|
||||
row_nums.reserve(lazy_output.row_refs.size());
|
||||
for (auto row_ref_i : lazy_output.row_refs)
|
||||
{
|
||||
if (row_ref_i)
|
||||
{
|
||||
if constexpr (from_row_list)
|
||||
{
|
||||
const RowRefList * row_ref_list = reinterpret_cast<const RowRefList *>(row_ref_i);
|
||||
for (auto it = row_ref_list->begin(); it.ok(); ++it)
|
||||
{
|
||||
blocks.emplace_back(it->block);
|
||||
row_nums.emplace_back(it->row_num);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
const RowRef * row_ref = reinterpret_cast<const RowRefList *>(row_ref_i);
|
||||
blocks.emplace_back(row_ref->block);
|
||||
row_nums.emplace_back(row_ref->row_num);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
blocks.emplace_back(nullptr);
|
||||
row_nums.emplace_back(0);
|
||||
}
|
||||
}
|
||||
for (size_t i = 0; i < this->size(); ++i)
|
||||
{
|
||||
auto & col = columns[i];
|
||||
for (size_t j = 0; j < blocks.size(); ++j)
|
||||
{
|
||||
if (blocks[j])
|
||||
col->insertFrom(*blocks[j]->getByPosition(right_indexes[i]).column, row_nums[j]);
|
||||
else
|
||||
type_name[i].type->insertDefaultInto(*col);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -72,29 +139,27 @@ void AddedColumns<false>::applyLazyDefaults()
|
||||
}
|
||||
|
||||
template<>
|
||||
void AddedColumns<true>::applyLazyDefaults()
|
||||
{
|
||||
}
|
||||
void AddedColumns<true>::applyLazyDefaults() {}
|
||||
|
||||
template <>
|
||||
void AddedColumns<false>::appendFromBlock(const Block & block, size_t row_num,const bool has_defaults)
|
||||
void AddedColumns<false>::appendFromBlock(const RowRef * row_ref, const bool has_defaults)
|
||||
{
|
||||
if (has_defaults)
|
||||
applyLazyDefaults();
|
||||
|
||||
#ifndef NDEBUG
|
||||
checkBlock(block);
|
||||
checkBlock(*row_ref->block);
|
||||
#endif
|
||||
if (is_join_get)
|
||||
{
|
||||
size_t right_indexes_size = right_indexes.size();
|
||||
for (size_t j = 0; j < right_indexes_size; ++j)
|
||||
{
|
||||
const auto & column_from_block = block.getByPosition(right_indexes[j]);
|
||||
const auto & column_from_block = row_ref->block->getByPosition(right_indexes[j]);
|
||||
if (auto * nullable_col = nullable_column_ptrs[j])
|
||||
nullable_col->insertFromNotNullable(*column_from_block.column, row_num);
|
||||
nullable_col->insertFromNotNullable(*column_from_block.column, row_ref->row_num);
|
||||
else
|
||||
columns[j]->insertFrom(*column_from_block.column, row_num);
|
||||
columns[j]->insertFrom(*column_from_block.column, row_ref->row_num);
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -102,22 +167,21 @@ void AddedColumns<false>::appendFromBlock(const Block & block, size_t row_num,co
|
||||
size_t right_indexes_size = right_indexes.size();
|
||||
for (size_t j = 0; j < right_indexes_size; ++j)
|
||||
{
|
||||
const auto & column_from_block = block.getByPosition(right_indexes[j]);
|
||||
columns[j]->insertFrom(*column_from_block.column, row_num);
|
||||
const auto & column_from_block = row_ref->block->getByPosition(right_indexes[j]);
|
||||
columns[j]->insertFrom(*column_from_block.column, row_ref->row_num);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template <>
|
||||
void AddedColumns<true>::appendFromBlock(const Block & block, size_t row_num, bool)
|
||||
void AddedColumns<true>::appendFromBlock(const RowRef * row_ref, bool)
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
checkBlock(block);
|
||||
checkBlock(*row_ref->block);
|
||||
#endif
|
||||
if (has_columns_to_add)
|
||||
{
|
||||
lazy_output.blocks.emplace_back(reinterpret_cast<UInt64>(&block));
|
||||
lazy_output.row_nums.emplace_back(static_cast<uint32_t>(row_num));
|
||||
lazy_output.row_refs.emplace_back(reinterpret_cast<UInt64>(row_ref));
|
||||
}
|
||||
}
|
||||
template<>
|
||||
@ -131,8 +195,7 @@ void AddedColumns<true>::appendDefaultRow()
|
||||
{
|
||||
if (has_columns_to_add)
|
||||
{
|
||||
lazy_output.blocks.emplace_back(0);
|
||||
lazy_output.row_nums.emplace_back(0);
|
||||
lazy_output.row_refs.emplace_back(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -50,8 +50,7 @@ public:
|
||||
|
||||
struct LazyOutput
|
||||
{
|
||||
PaddedPODArray<UInt64> blocks;
|
||||
PaddedPODArray<UInt32> row_nums;
|
||||
PaddedPODArray<UInt64> row_refs;
|
||||
};
|
||||
|
||||
AddedColumns(
|
||||
@ -76,8 +75,7 @@ public:
|
||||
if constexpr (lazy)
|
||||
{
|
||||
has_columns_to_add = num_columns_to_add > 0;
|
||||
lazy_output.blocks.reserve(rows_to_add);
|
||||
lazy_output.row_nums.reserve(rows_to_add);
|
||||
lazy_output.row_refs.reserve(rows_to_add);
|
||||
}
|
||||
|
||||
columns.reserve(num_columns_to_add);
|
||||
@ -115,18 +113,22 @@ public:
|
||||
if (columns[j]->isNullable() && !saved_column->isNullable())
|
||||
nullable_column_ptrs[j] = typeid_cast<ColumnNullable *>(columns[j].get());
|
||||
}
|
||||
join_data_avg_perkey_rows = join.getJoinedData()->avgPerKeyRows();
|
||||
output_by_row_list_threshold = join.getTableJoin().outputByRowListPerkeyRowsThreshold();
|
||||
}
|
||||
|
||||
size_t size() const { return columns.size(); }
|
||||
|
||||
void buildOutput();
|
||||
|
||||
void buildJoinGetOutput();
|
||||
|
||||
ColumnWithTypeAndName moveColumn(size_t i)
|
||||
{
|
||||
return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].type, type_name[i].qualified_name);
|
||||
}
|
||||
|
||||
void appendFromBlock(const Block & block, size_t row_num, bool has_default);
|
||||
void appendFromBlock(const RowRef * row_ref, bool has_default);
|
||||
|
||||
void appendDefaultRow();
|
||||
|
||||
@ -134,6 +136,8 @@ public:
|
||||
|
||||
const IColumn & leftAsofKey() const { return *left_asof_key; }
|
||||
|
||||
static constexpr bool isLazy() { return lazy; }
|
||||
|
||||
Block left_block;
|
||||
std::vector<JoinOnKeyColumns> join_on_keys;
|
||||
ExpressionActionsPtr additional_filter_expression;
|
||||
@ -142,6 +146,9 @@ public:
|
||||
size_t rows_to_add;
|
||||
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
|
||||
bool need_filter = false;
|
||||
bool output_by_row_list = false;
|
||||
size_t join_data_avg_perkey_rows = 0;
|
||||
size_t output_by_row_list_threshold = 0;
|
||||
IColumn::Filter filter;
|
||||
|
||||
void reserve(bool need_replicate)
|
||||
@ -212,15 +219,22 @@ private:
|
||||
columns.back()->reserve(src_column.column->size());
|
||||
type_name.emplace_back(src_column.type, src_column.name, qualified_name);
|
||||
}
|
||||
|
||||
/** Build output from the blocks that extract from `RowRef` or `RowRefList`, to avoid block cache miss which may cause performance slow down.
|
||||
* And This problem would happen it we directly build output from `RowRef` or `RowRefList`.
|
||||
*/
|
||||
template<bool from_row_list>
|
||||
void buildOutputFromBlocks();
|
||||
};
|
||||
|
||||
/// Adapter class to pass into addFoundRowAll
|
||||
/// In joinRightColumnsWithAdditionalFilter we don't want to add rows directly into AddedColumns,
|
||||
/// because they need to be filtered by additional_filter_expression.
|
||||
class PreSelectedRows : public std::vector<RowRef>
|
||||
class PreSelectedRows : public std::vector<const RowRef *>
|
||||
{
|
||||
public:
|
||||
void appendFromBlock(const Block & block, size_t row_num, bool /* has_default */) { this->emplace_back(&block, row_num); }
|
||||
void appendFromBlock(const RowRef * row_ref, bool /* has_default */) { this->emplace_back(row_ref); }
|
||||
static constexpr bool isLazy() { return false; }
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -495,7 +495,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
|
||||
}
|
||||
|
||||
size_t rows = source_block.rows();
|
||||
|
||||
data->rows_to_join += rows;
|
||||
const auto & right_key_names = table_join->getAllNames(JoinTableSide::Right);
|
||||
ColumnPtrMap all_key_columns(right_key_names.size());
|
||||
for (const auto & column_name : right_key_names)
|
||||
@ -647,7 +647,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
|
||||
total_bytes = getTotalByteCount();
|
||||
}
|
||||
}
|
||||
|
||||
data->keys_to_join = total_rows;
|
||||
shrinkStoredBlocksToFit(total_bytes);
|
||||
|
||||
return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
|
||||
|
@ -345,6 +345,18 @@ public:
|
||||
|
||||
size_t blocks_allocated_size = 0;
|
||||
size_t blocks_nullmaps_allocated_size = 0;
|
||||
|
||||
/// Number of rows of right table to join
|
||||
size_t rows_to_join = 0;
|
||||
/// Number of keys of right table to join
|
||||
size_t keys_to_join = 0;
|
||||
|
||||
size_t avgPerKeyRows() const
|
||||
{
|
||||
if (keys_to_join == 0)
|
||||
return 0;
|
||||
return rows_to_join / keys_to_join;
|
||||
}
|
||||
};
|
||||
|
||||
using RightTableDataPtr = std::shared_ptr<RightTableData>;
|
||||
|
@ -83,6 +83,7 @@ public:
|
||||
const Block & block_with_columns_to_add,
|
||||
const MapsTemplateVector & maps_,
|
||||
bool is_join_get = false);
|
||||
|
||||
private:
|
||||
template <typename KeyGetter, bool is_asof_join>
|
||||
static KeyGetter createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes & key_sizes);
|
||||
@ -128,7 +129,7 @@ private:
|
||||
template <typename AddedColumns>
|
||||
static ColumnPtr buildAdditionalFilter(
|
||||
size_t left_start_row,
|
||||
const std::vector<RowRef> & selected_rows,
|
||||
const std::vector<const RowRef *> & selected_rows,
|
||||
const std::vector<size_t> & row_replicate_offset,
|
||||
AddedColumns & added_columns);
|
||||
|
||||
|
@ -95,7 +95,10 @@ Block HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinBlockImpl(
|
||||
added_columns.join_on_keys.clear();
|
||||
Block remaining_block = sliceBlock(block, num_joined);
|
||||
|
||||
added_columns.buildOutput();
|
||||
if (is_join_get)
|
||||
added_columns.buildJoinGetOutput();
|
||||
else
|
||||
added_columns.buildOutput();
|
||||
for (size_t i = 0; i < added_columns.size(); ++i)
|
||||
block.insert(added_columns.moveColumn(i));
|
||||
|
||||
@ -339,6 +342,8 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumns(
|
||||
size_t rows = added_columns.rows_to_add;
|
||||
if constexpr (need_filter)
|
||||
added_columns.filter = IColumn::Filter(rows, 0);
|
||||
if constexpr (!flag_per_row && (STRICTNESS == JoinStrictness::All || (STRICTNESS == JoinStrictness::Semi && KIND == JoinKind::Right)))
|
||||
added_columns.output_by_row_list = true;
|
||||
|
||||
Arena pool;
|
||||
|
||||
@ -354,8 +359,8 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumns(
|
||||
{
|
||||
if (unlikely(current_offset >= max_joined_block_rows))
|
||||
{
|
||||
added_columns.offsets_to_replicate->resize_assume_reserved(i);
|
||||
added_columns.filter.resize_assume_reserved(i);
|
||||
added_columns.offsets_to_replicate->resize(i);
|
||||
added_columns.filter.resize(i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -381,15 +386,15 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumns(
|
||||
const IColumn & left_asof_key = added_columns.leftAsofKey();
|
||||
|
||||
auto row_ref = mapped->findAsof(left_asof_key, i);
|
||||
if (row_ref.block)
|
||||
if (row_ref && row_ref->block)
|
||||
{
|
||||
setUsed<need_filter>(added_columns.filter, i);
|
||||
if constexpr (flag_per_row)
|
||||
used_flags.template setUsed<join_features.need_flags, flag_per_row>(row_ref.block, row_ref.row_num, 0);
|
||||
used_flags.template setUsed<join_features.need_flags, flag_per_row>(row_ref->block, row_ref->row_num, 0);
|
||||
else
|
||||
used_flags.template setUsed<join_features.need_flags, flag_per_row>(find_result);
|
||||
|
||||
added_columns.appendFromBlock(*row_ref.block, row_ref.row_num, join_features.add_missing);
|
||||
added_columns.appendFromBlock(row_ref, join_features.add_missing);
|
||||
}
|
||||
else
|
||||
addNotFoundRow<join_features.add_missing, join_features.need_replication>(added_columns, current_offset);
|
||||
@ -420,7 +425,7 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumns(
|
||||
if (used_once)
|
||||
{
|
||||
setUsed<need_filter>(added_columns.filter, i);
|
||||
added_columns.appendFromBlock(*mapped.block, mapped.row_num, join_features.add_missing);
|
||||
added_columns.appendFromBlock(&mapped, join_features.add_missing);
|
||||
}
|
||||
|
||||
break;
|
||||
@ -438,7 +443,7 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumns(
|
||||
{
|
||||
setUsed<need_filter>(added_columns.filter, i);
|
||||
used_flags.template setUsed<join_features.need_flags, flag_per_row>(find_result);
|
||||
added_columns.appendFromBlock(*mapped.block, mapped.row_num, join_features.add_missing);
|
||||
added_columns.appendFromBlock(&mapped, join_features.add_missing);
|
||||
|
||||
if (join_features.is_any_or_semi_join)
|
||||
{
|
||||
@ -477,7 +482,7 @@ template <JoinKind KIND, JoinStrictness STRICTNESS, typename MapsTemplate>
|
||||
template <typename AddedColumns>
|
||||
ColumnPtr HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::buildAdditionalFilter(
|
||||
size_t left_start_row,
|
||||
const std::vector<RowRef> & selected_rows,
|
||||
const std::vector<const RowRef *> & selected_rows,
|
||||
const std::vector<size_t> & row_replicate_offset,
|
||||
AddedColumns & added_columns)
|
||||
{
|
||||
@ -489,7 +494,7 @@ ColumnPtr HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::buildAdditionalFilter
|
||||
result_column = ColumnUInt8::create();
|
||||
break;
|
||||
}
|
||||
const Block & sample_right_block = *selected_rows.begin()->block;
|
||||
const Block & sample_right_block = *((*selected_rows.begin())->block);
|
||||
if (!sample_right_block || !added_columns.additional_filter_expression)
|
||||
{
|
||||
auto filter = ColumnUInt8::create();
|
||||
@ -519,8 +524,8 @@ ColumnPtr HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::buildAdditionalFilter
|
||||
auto new_col = col.column->cloneEmpty();
|
||||
for (const auto & selected_row : selected_rows)
|
||||
{
|
||||
const auto & src_col = selected_row.block->getByPosition(right_col_pos);
|
||||
new_col->insertFrom(*src_col.column, selected_row.row_num);
|
||||
const auto & src_col = selected_row->block->getByPosition(right_col_pos);
|
||||
new_col->insertFrom(*src_col.column, selected_row->row_num);
|
||||
}
|
||||
executed_block.insert({std::move(new_col), col.type, col.name});
|
||||
}
|
||||
@ -700,26 +705,24 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumnsWithAddt
|
||||
{
|
||||
// For inner join, we need mark each right row'flag, because we only use each right row once.
|
||||
auto used_once = used_flags.template setUsedOnce<join_features.need_flags, true>(
|
||||
selected_right_row_it->block, selected_right_row_it->row_num, 0);
|
||||
(*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0);
|
||||
if (used_once)
|
||||
{
|
||||
any_matched = true;
|
||||
total_added_rows += 1;
|
||||
added_columns.appendFromBlock(
|
||||
*selected_right_row_it->block, selected_right_row_it->row_num, join_features.add_missing);
|
||||
added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
auto used_once = used_flags.template setUsedOnce<join_features.need_flags, true>(
|
||||
selected_right_row_it->block, selected_right_row_it->row_num, 0);
|
||||
(*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0);
|
||||
if (used_once)
|
||||
{
|
||||
any_matched = true;
|
||||
total_added_rows += 1;
|
||||
added_columns.appendFromBlock(
|
||||
*selected_right_row_it->block, selected_right_row_it->row_num, join_features.add_missing);
|
||||
added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -727,16 +730,14 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumnsWithAddt
|
||||
{
|
||||
any_matched = true;
|
||||
if constexpr (join_features.right && join_features.need_flags)
|
||||
used_flags.template setUsed<true, true>(selected_right_row_it->block, selected_right_row_it->row_num, 0);
|
||||
used_flags.template setUsed<true, true>((*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
any_matched = true;
|
||||
total_added_rows += 1;
|
||||
added_columns.appendFromBlock(
|
||||
*selected_right_row_it->block, selected_right_row_it->row_num, join_features.add_missing);
|
||||
used_flags.template setUsed<join_features.need_flags, true>(
|
||||
selected_right_row_it->block, selected_right_row_it->row_num, 0);
|
||||
added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing);
|
||||
used_flags.template setUsed<join_features.need_flags, true>((*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0);
|
||||
}
|
||||
}
|
||||
|
||||
@ -756,8 +757,7 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumnsWithAddt
|
||||
if (filter_flags[replicated_row])
|
||||
{
|
||||
any_matched = true;
|
||||
added_columns.appendFromBlock(
|
||||
*selected_right_row_it->block, selected_right_row_it->row_num, join_features.add_missing);
|
||||
added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing);
|
||||
total_added_rows += 1;
|
||||
}
|
||||
++selected_right_row_it;
|
||||
@ -767,8 +767,7 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumnsWithAddt
|
||||
if (filter_flags[replicated_row])
|
||||
{
|
||||
any_matched = true;
|
||||
added_columns.appendFromBlock(
|
||||
*selected_right_row_it->block, selected_right_row_it->row_num, join_features.add_missing);
|
||||
added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing);
|
||||
total_added_rows += 1;
|
||||
selected_right_row_it = selected_right_row_it + row_replicate_offset[i] - replicated_row;
|
||||
break;
|
||||
|
@ -18,11 +18,25 @@ struct JoinFeatures
|
||||
static constexpr bool inner = KIND == JoinKind::Inner;
|
||||
static constexpr bool full = KIND == JoinKind::Full;
|
||||
|
||||
/** Whether we may need duplicate rows from the left table.
|
||||
* For example, when we have row (key1, attr1) in left table
|
||||
* and rows (key1, attr2), (key1, attr3) in right table,
|
||||
* then we need to duplicate row (key1, attr1) for each of joined rows from right table, so result will be
|
||||
* (key1, attr1, key1, attr2)
|
||||
* (key1, attr1, key1, attr3)
|
||||
*/
|
||||
static constexpr bool need_replication = is_all_join || (is_any_join && right) || (is_semi_join && right);
|
||||
|
||||
/// Whether we need to filter rows from the left table that do not have matches in the right table.
|
||||
static constexpr bool need_filter = !need_replication && (inner || right || (is_semi_join && left) || (is_anti_join && left));
|
||||
|
||||
/// Whether we need to add default values for columns from the left table.
|
||||
static constexpr bool add_missing = (left || full) && !is_semi_join;
|
||||
|
||||
/// Whether we need to store flags for rows from the right table table
|
||||
/// that indicates if they have matches in the left table.
|
||||
static constexpr bool need_flags = MapGetter<KIND, STRICTNESS, std::is_same_v<std::decay_t<Map>, HashJoin::MapsAll>>::flagged;
|
||||
|
||||
static constexpr bool is_maps_all = std::is_same_v<std::decay_t<Map>, HashJoin::MapsAll>;
|
||||
};
|
||||
|
||||
|
@ -104,7 +104,7 @@ void addFoundRowAll(
|
||||
{
|
||||
if (!known_rows.isKnown(std::make_pair(it->block, it->row_num)))
|
||||
{
|
||||
added.appendFromBlock(*it->block, it->row_num, false);
|
||||
added.appendFromBlock(*it, false);
|
||||
++current_offset;
|
||||
if (!new_known_rows_ptr)
|
||||
{
|
||||
@ -124,11 +124,16 @@ void addFoundRowAll(
|
||||
known_rows.add(std::cbegin(*new_known_rows_ptr), std::cend(*new_known_rows_ptr));
|
||||
}
|
||||
}
|
||||
else if constexpr (AddedColumns::isLazy())
|
||||
{
|
||||
added.appendFromBlock(&mapped, false);
|
||||
current_offset += mapped.rows;
|
||||
}
|
||||
else
|
||||
{
|
||||
for (auto it = mapped.begin(); it.ok(); ++it)
|
||||
{
|
||||
added.appendFromBlock(*it->block, it->row_num, false);
|
||||
added.appendFromBlock(*it, false);
|
||||
++current_offset;
|
||||
}
|
||||
}
|
||||
|
@ -244,9 +244,6 @@ public:
|
||||
/// Same as checkTimeLimit but it never throws
|
||||
[[nodiscard]] bool checkTimeLimitSoft();
|
||||
|
||||
/// Use it in case of the query left in background to execute asynchronously
|
||||
void updateContext(ContextWeakPtr weak_context) { context = std::move(weak_context); }
|
||||
|
||||
/// Get the reference for the start of the query. Used to synchronize with other Stopwatches
|
||||
UInt64 getQueryCPUStartTime() { return watch.getStart(); }
|
||||
};
|
||||
|
@ -144,7 +144,7 @@ public:
|
||||
return low;
|
||||
}
|
||||
|
||||
RowRef findAsof(const IColumn & asof_column, size_t row_num) override
|
||||
RowRef * findAsof(const IColumn & asof_column, size_t row_num) override
|
||||
{
|
||||
sort();
|
||||
|
||||
@ -156,10 +156,10 @@ public:
|
||||
if (pos != entries.size())
|
||||
{
|
||||
size_t row_ref_index = entries[pos].row_ref_index;
|
||||
return row_refs[row_ref_index];
|
||||
return &row_refs[row_ref_index];
|
||||
}
|
||||
|
||||
return {nullptr, 0};
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -122,7 +122,7 @@ struct RowRefList : RowRef
|
||||
};
|
||||
|
||||
RowRefList() {} /// NOLINT
|
||||
RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}
|
||||
RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_), rows(1) {}
|
||||
|
||||
ForwardIterator begin() const { return ForwardIterator(this); }
|
||||
|
||||
@ -135,8 +135,11 @@ struct RowRefList : RowRef
|
||||
*next = Batch(nullptr);
|
||||
}
|
||||
next = next->insert(std::move(row_ref), pool);
|
||||
++rows;
|
||||
}
|
||||
|
||||
public:
|
||||
SizeT rows = 0;
|
||||
private:
|
||||
Batch * next = nullptr;
|
||||
};
|
||||
@ -158,7 +161,7 @@ struct SortedLookupVectorBase
|
||||
virtual void insert(const IColumn &, const Block *, size_t) = 0;
|
||||
|
||||
// This needs to be synchronized internally
|
||||
virtual RowRef findAsof(const IColumn &, size_t) = 0;
|
||||
virtual RowRef * findAsof(const IColumn &, size_t) = 0;
|
||||
};
|
||||
|
||||
|
||||
|
@ -115,6 +115,7 @@ TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_, Temporary
|
||||
, partial_merge_join_left_table_buffer_bytes(settings.partial_merge_join_left_table_buffer_bytes)
|
||||
, max_files_to_merge(settings.join_on_disk_max_files_to_merge)
|
||||
, temporary_files_codec(settings.temporary_files_codec)
|
||||
, output_by_rowlist_perkey_rows_threshold(settings.join_output_by_rowlist_perkey_rows_threshold)
|
||||
, max_memory_usage(settings.max_memory_usage)
|
||||
, tmp_volume(tmp_volume_)
|
||||
, tmp_data(tmp_data_)
|
||||
|
@ -148,6 +148,7 @@ private:
|
||||
const size_t partial_merge_join_left_table_buffer_bytes = 0;
|
||||
const size_t max_files_to_merge = 0;
|
||||
const String temporary_files_codec = "LZ4";
|
||||
const size_t output_by_rowlist_perkey_rows_threshold = 0;
|
||||
|
||||
/// Value if setting max_memory_usage for query, can be used when max_bytes_in_join is not specified.
|
||||
size_t max_memory_usage = 0;
|
||||
@ -295,6 +296,7 @@ public:
|
||||
return join_use_nulls && isRightOrFull(kind());
|
||||
}
|
||||
|
||||
size_t outputByRowListPerkeyRowsThreshold() const { return output_by_rowlist_perkey_rows_threshold; }
|
||||
size_t defaultMaxBytes() const { return default_max_bytes; }
|
||||
size_t maxJoinedBlockRows() const { return max_joined_block_rows; }
|
||||
size_t maxRowsInRightBlock() const { return partial_merge_join_rows_in_right_blocks; }
|
||||
|
@ -246,7 +246,8 @@ void download(FileSegment & file_segment)
|
||||
ASSERT_EQ(file_segment.state(), State::DOWNLOADING);
|
||||
ASSERT_EQ(file_segment.getDownloadedSize(), 0);
|
||||
|
||||
ASSERT_TRUE(file_segment.reserve(file_segment.range().size(), 1000));
|
||||
std::string failure_reason;
|
||||
ASSERT_TRUE(file_segment.reserve(file_segment.range().size(), 1000, failure_reason));
|
||||
download(cache_base_path, file_segment);
|
||||
ASSERT_EQ(file_segment.state(), State::DOWNLOADING);
|
||||
|
||||
@ -258,7 +259,8 @@ void assertDownloadFails(FileSegment & file_segment)
|
||||
{
|
||||
ASSERT_EQ(file_segment.getOrSetDownloader(), FileSegment::getCallerId());
|
||||
ASSERT_EQ(file_segment.getDownloadedSize(), 0);
|
||||
ASSERT_FALSE(file_segment.reserve(file_segment.range().size(), 1000));
|
||||
std::string failure_reason;
|
||||
ASSERT_FALSE(file_segment.reserve(file_segment.range().size(), 1000, failure_reason));
|
||||
file_segment.complete();
|
||||
}
|
||||
|
||||
@ -957,10 +959,11 @@ TEST_F(FileCacheTest, temporaryData)
|
||||
|
||||
{
|
||||
ASSERT_EQ(some_data_holder->size(), 5);
|
||||
std::string failure_reason;
|
||||
for (auto & segment : *some_data_holder)
|
||||
{
|
||||
ASSERT_TRUE(segment->getOrSetDownloader() == DB::FileSegment::getCallerId());
|
||||
ASSERT_TRUE(segment->reserve(segment->range().size(), 1000));
|
||||
ASSERT_TRUE(segment->reserve(segment->range().size(), 1000, failure_reason));
|
||||
download(*segment);
|
||||
segment->complete();
|
||||
}
|
||||
|
@ -198,6 +198,29 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & s
|
||||
print_database_table();
|
||||
}
|
||||
|
||||
if (sync_replica_mode != SyncReplicaMode::DEFAULT)
|
||||
{
|
||||
settings.ostr << ' ';
|
||||
print_keyword(magic_enum::enum_name(sync_replica_mode));
|
||||
|
||||
// If the mode is LIGHTWEIGHT and specific source replicas are specified
|
||||
if (sync_replica_mode == SyncReplicaMode::LIGHTWEIGHT && !src_replicas.empty())
|
||||
{
|
||||
settings.ostr << ' ';
|
||||
print_keyword("FROM");
|
||||
settings.ostr << ' ';
|
||||
|
||||
bool first = true;
|
||||
for (const auto & src : src_replicas)
|
||||
{
|
||||
if (!first)
|
||||
settings.ostr << ", ";
|
||||
first = false;
|
||||
settings.ostr << quoteString(src);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (query_settings)
|
||||
{
|
||||
settings.ostr << (settings.hilite ? hilite_keyword : "") << settings.nl_or_ws << "SETTINGS " << (settings.hilite ? hilite_none : "");
|
||||
@ -233,28 +256,6 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & s
|
||||
print_identifier(disk);
|
||||
}
|
||||
|
||||
if (sync_replica_mode != SyncReplicaMode::DEFAULT)
|
||||
{
|
||||
settings.ostr << ' ';
|
||||
print_keyword(magic_enum::enum_name(sync_replica_mode));
|
||||
|
||||
// If the mode is LIGHTWEIGHT and specific source replicas are specified
|
||||
if (sync_replica_mode == SyncReplicaMode::LIGHTWEIGHT && !src_replicas.empty())
|
||||
{
|
||||
settings.ostr << ' ';
|
||||
print_keyword("FROM");
|
||||
settings.ostr << ' ';
|
||||
|
||||
bool first = true;
|
||||
for (const auto & src : src_replicas)
|
||||
{
|
||||
if (!first)
|
||||
settings.ostr << ", ";
|
||||
first = false;
|
||||
settings.ostr << quoteString(src);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Type::SYNC_DATABASE_REPLICA:
|
||||
|
@ -96,7 +96,7 @@ bool ExecutingGraph::addEdges(uint64_t node)
|
||||
return was_edge_added;
|
||||
}
|
||||
|
||||
bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
|
||||
ExecutingGraph::UpdateNodeStatus ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
|
||||
{
|
||||
auto & cur_node = *nodes[pid];
|
||||
Processors new_processors;
|
||||
@ -108,7 +108,7 @@ bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
|
||||
catch (...)
|
||||
{
|
||||
cur_node.exception = std::current_exception();
|
||||
return false;
|
||||
return UpdateNodeStatus::Exception;
|
||||
}
|
||||
|
||||
{
|
||||
@ -118,7 +118,7 @@ bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
|
||||
{
|
||||
for (auto & processor : new_processors)
|
||||
processor->cancel();
|
||||
return false;
|
||||
return UpdateNodeStatus::Cancelled;
|
||||
}
|
||||
processors->insert(processors->end(), new_processors.begin(), new_processors.end());
|
||||
|
||||
@ -178,7 +178,7 @@ bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
return UpdateNodeStatus::Done;
|
||||
}
|
||||
|
||||
void ExecutingGraph::initializeExecution(Queue & queue)
|
||||
@ -213,7 +213,7 @@ void ExecutingGraph::initializeExecution(Queue & queue)
|
||||
}
|
||||
|
||||
|
||||
bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue)
|
||||
ExecutingGraph::UpdateNodeStatus ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue)
|
||||
{
|
||||
std::stack<Edge *> updated_edges;
|
||||
std::stack<uint64_t> updated_processors;
|
||||
@ -309,7 +309,7 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue
|
||||
catch (...)
|
||||
{
|
||||
node.exception = std::current_exception();
|
||||
return false;
|
||||
return UpdateNodeStatus::Exception;
|
||||
}
|
||||
|
||||
#ifndef NDEBUG
|
||||
@ -386,8 +386,9 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue
|
||||
read_lock.unlock();
|
||||
{
|
||||
std::unique_lock lock(nodes_mutex);
|
||||
if (!expandPipeline(updated_processors, pid))
|
||||
return false;
|
||||
auto status = expandPipeline(updated_processors, pid);
|
||||
if (status != UpdateNodeStatus::Done)
|
||||
return status;
|
||||
}
|
||||
read_lock.lock();
|
||||
|
||||
@ -397,7 +398,7 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
return UpdateNodeStatus::Done;
|
||||
}
|
||||
|
||||
void ExecutingGraph::cancel(bool cancel_all_processors)
|
||||
|
@ -138,10 +138,17 @@ public:
|
||||
/// Traverse graph the first time to update all the childless nodes.
|
||||
void initializeExecution(Queue & queue);
|
||||
|
||||
enum class UpdateNodeStatus
|
||||
{
|
||||
Done,
|
||||
Exception,
|
||||
Cancelled,
|
||||
};
|
||||
|
||||
/// Update processor with pid number (call IProcessor::prepare).
|
||||
/// Check parents and children of current processor and push them to stacks if they also need to be updated.
|
||||
/// If processor wants to be expanded, lock will be upgraded to get write access to pipeline.
|
||||
bool updateNode(uint64_t pid, Queue & queue, Queue & async_queue);
|
||||
UpdateNodeStatus updateNode(uint64_t pid, Queue & queue, Queue & async_queue);
|
||||
|
||||
void cancel(bool cancel_all_processors = true);
|
||||
|
||||
@ -155,7 +162,7 @@ private:
|
||||
|
||||
/// Update graph after processor (pid) returned ExpandPipeline status.
|
||||
/// All new nodes and nodes with updated ports are pushed into stack.
|
||||
bool expandPipeline(std::stack<uint64_t> & stack, uint64_t pid);
|
||||
UpdateNodeStatus expandPipeline(std::stack<uint64_t> & stack, uint64_t pid);
|
||||
|
||||
std::shared_ptr<Processors> processors;
|
||||
std::vector<bool> source_processors;
|
||||
|
@ -77,9 +77,9 @@ const Processors & PipelineExecutor::getProcessors() const
|
||||
return graph->getProcessors();
|
||||
}
|
||||
|
||||
void PipelineExecutor::cancel()
|
||||
void PipelineExecutor::cancel(ExecutionStatus reason)
|
||||
{
|
||||
cancelled = true;
|
||||
tryUpdateExecutionStatus(ExecutionStatus::Executing, reason);
|
||||
finish();
|
||||
graph->cancel();
|
||||
}
|
||||
@ -98,6 +98,11 @@ void PipelineExecutor::finish()
|
||||
tasks.finish();
|
||||
}
|
||||
|
||||
bool PipelineExecutor::tryUpdateExecutionStatus(ExecutionStatus expected, ExecutionStatus desired)
|
||||
{
|
||||
return execution_status.compare_exchange_strong(expected, desired);
|
||||
}
|
||||
|
||||
void PipelineExecutor::execute(size_t num_threads, bool concurrency_control)
|
||||
{
|
||||
checkTimeLimit();
|
||||
@ -120,7 +125,7 @@ void PipelineExecutor::execute(size_t num_threads, bool concurrency_control)
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
span.addAttribute(ExecutionStatus::fromCurrentException());
|
||||
span.addAttribute(DB::ExecutionStatus::fromCurrentException());
|
||||
|
||||
#ifndef NDEBUG
|
||||
LOG_TRACE(log, "Exception while executing query. Current state:\n{}", dumpPipeline());
|
||||
@ -169,7 +174,7 @@ bool PipelineExecutor::checkTimeLimitSoft()
|
||||
// We call cancel here so that all processors are notified and tasks waken up
|
||||
// so that the "break" is faster and doesn't wait for long events
|
||||
if (!continuing)
|
||||
cancel();
|
||||
cancel(ExecutionStatus::CancelledByTimeout);
|
||||
|
||||
return continuing;
|
||||
}
|
||||
@ -195,7 +200,8 @@ void PipelineExecutor::finalizeExecution()
|
||||
{
|
||||
checkTimeLimit();
|
||||
|
||||
if (cancelled)
|
||||
auto status = execution_status.load();
|
||||
if (status == ExecutionStatus::CancelledByTimeout || status == ExecutionStatus::CancelledByUser)
|
||||
return;
|
||||
|
||||
bool all_processors_finished = true;
|
||||
@ -271,7 +277,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie
|
||||
break;
|
||||
|
||||
if (!context.executeTask())
|
||||
cancel();
|
||||
cancel(ExecutionStatus::Exception);
|
||||
|
||||
if (tasks.isFinished())
|
||||
break;
|
||||
@ -289,11 +295,13 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie
|
||||
Queue async_queue;
|
||||
|
||||
/// Prepare processor after execution.
|
||||
if (!graph->updateNode(context.getProcessorID(), queue, async_queue))
|
||||
cancel();
|
||||
auto status = graph->updateNode(context.getProcessorID(), queue, async_queue);
|
||||
if (status == ExecutingGraph::UpdateNodeStatus::Exception)
|
||||
cancel(ExecutionStatus::Exception);
|
||||
|
||||
/// Push other tasks to global queue.
|
||||
tasks.pushTasks(queue, async_queue, context);
|
||||
if (status == ExecutingGraph::UpdateNodeStatus::Done)
|
||||
tasks.pushTasks(queue, async_queue, context);
|
||||
}
|
||||
|
||||
#ifndef NDEBUG
|
||||
@ -309,7 +317,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie
|
||||
{
|
||||
/// spawnThreads can throw an exception, for example CANNOT_SCHEDULE_TASK.
|
||||
/// We should cancel execution properly before rethrow.
|
||||
cancel();
|
||||
cancel(ExecutionStatus::Exception);
|
||||
throw;
|
||||
}
|
||||
|
||||
@ -328,6 +336,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie
|
||||
void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_control)
|
||||
{
|
||||
is_execution_initialized = true;
|
||||
tryUpdateExecutionStatus(ExecutionStatus::NotStarted, ExecutionStatus::Executing);
|
||||
|
||||
size_t use_threads = num_threads;
|
||||
|
||||
@ -393,7 +402,7 @@ void PipelineExecutor::executeImpl(size_t num_threads, bool concurrency_control)
|
||||
{
|
||||
/// If finished_flag is not set, there was an exception.
|
||||
/// Cancel execution in this case.
|
||||
cancel();
|
||||
cancel(ExecutionStatus::Exception);
|
||||
if (pool)
|
||||
pool->wait();
|
||||
}
|
||||
|
@ -48,8 +48,20 @@ public:
|
||||
|
||||
const Processors & getProcessors() const;
|
||||
|
||||
enum class ExecutionStatus
|
||||
{
|
||||
NotStarted,
|
||||
Executing,
|
||||
Finished,
|
||||
Exception,
|
||||
CancelledByUser,
|
||||
CancelledByTimeout,
|
||||
};
|
||||
|
||||
/// Cancel execution. May be called from another thread.
|
||||
void cancel();
|
||||
void cancel() { cancel(ExecutionStatus::CancelledByUser); }
|
||||
|
||||
ExecutionStatus getExecutionStatus() const { return execution_status.load(); }
|
||||
|
||||
/// Cancel processors which only read data from source. May be called from another thread.
|
||||
void cancelReading();
|
||||
@ -81,7 +93,7 @@ private:
|
||||
/// system.opentelemetry_span_log
|
||||
bool trace_processors = false;
|
||||
|
||||
std::atomic_bool cancelled = false;
|
||||
std::atomic<ExecutionStatus> execution_status = ExecutionStatus::NotStarted;
|
||||
std::atomic_bool cancelled_reading = false;
|
||||
|
||||
LoggerPtr log = getLogger("PipelineExecutor");
|
||||
@ -105,6 +117,10 @@ private:
|
||||
void executeStepImpl(size_t thread_num, std::atomic_bool * yield_flag = nullptr);
|
||||
void executeSingleThread(size_t thread_num);
|
||||
void finish();
|
||||
void cancel(ExecutionStatus reason);
|
||||
|
||||
/// If execution_status == from, change it to desired.
|
||||
bool tryUpdateExecutionStatus(ExecutionStatus expected, ExecutionStatus desired);
|
||||
|
||||
String dumpPipeline() const;
|
||||
};
|
||||
|
@ -15,6 +15,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int QUERY_WAS_CANCELLED;
|
||||
}
|
||||
|
||||
class PushingAsyncSource : public ISource
|
||||
@ -176,6 +177,16 @@ void PushingAsyncPipelineExecutor::start()
|
||||
data->thread = ThreadFromGlobalPool(std::move(func));
|
||||
}
|
||||
|
||||
[[noreturn]] static void throwOnExecutionStatus(PipelineExecutor::ExecutionStatus status)
|
||||
{
|
||||
if (status == PipelineExecutor::ExecutionStatus::CancelledByTimeout
|
||||
|| status == PipelineExecutor::ExecutionStatus::CancelledByUser)
|
||||
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
|
||||
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Pipeline for PushingPipelineExecutor was finished before all data was inserted");
|
||||
}
|
||||
|
||||
void PushingAsyncPipelineExecutor::push(Chunk chunk)
|
||||
{
|
||||
if (!started)
|
||||
@ -185,8 +196,7 @@ void PushingAsyncPipelineExecutor::push(Chunk chunk)
|
||||
data->rethrowExceptionIfHas();
|
||||
|
||||
if (!is_pushed)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Pipeline for PushingAsyncPipelineExecutor was finished before all data was inserted");
|
||||
throwOnExecutionStatus(data->executor->getExecutionStatus());
|
||||
}
|
||||
|
||||
void PushingAsyncPipelineExecutor::push(Block block)
|
||||
|
@ -11,6 +11,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int QUERY_WAS_CANCELLED;
|
||||
}
|
||||
|
||||
class PushingSource : public ISource
|
||||
@ -80,6 +81,15 @@ const Block & PushingPipelineExecutor::getHeader() const
|
||||
return pushing_source->getPort().getHeader();
|
||||
}
|
||||
|
||||
[[noreturn]] static void throwOnExecutionStatus(PipelineExecutor::ExecutionStatus status)
|
||||
{
|
||||
if (status == PipelineExecutor::ExecutionStatus::CancelledByTimeout
|
||||
|| status == PipelineExecutor::ExecutionStatus::CancelledByUser)
|
||||
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
|
||||
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Pipeline for PushingPipelineExecutor was finished before all data was inserted");
|
||||
}
|
||||
|
||||
void PushingPipelineExecutor::start()
|
||||
{
|
||||
@ -91,8 +101,7 @@ void PushingPipelineExecutor::start()
|
||||
executor->setReadProgressCallback(pipeline.getReadProgressCallback());
|
||||
|
||||
if (!executor->executeStep(&input_wait_flag))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Pipeline for PushingPipelineExecutor was finished before all data was inserted");
|
||||
throwOnExecutionStatus(executor->getExecutionStatus());
|
||||
}
|
||||
|
||||
void PushingPipelineExecutor::push(Chunk chunk)
|
||||
@ -103,8 +112,7 @@ void PushingPipelineExecutor::push(Chunk chunk)
|
||||
pushing_source->setData(std::move(chunk));
|
||||
|
||||
if (!executor->executeStep(&input_wait_flag))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Pipeline for PushingPipelineExecutor was finished before all data was inserted");
|
||||
throwOnExecutionStatus(executor->getExecutionStatus());
|
||||
}
|
||||
|
||||
void PushingPipelineExecutor::push(Block block)
|
||||
|
@ -133,16 +133,31 @@ static ColumnWithTypeAndName readColumnWithStringData(const std::shared_ptr<arro
|
||||
std::shared_ptr<arrow::Buffer> buffer = chunk.value_data();
|
||||
const size_t chunk_length = chunk.length();
|
||||
|
||||
for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i)
|
||||
const size_t null_count = chunk.null_count();
|
||||
if (null_count == 0)
|
||||
{
|
||||
if (!chunk.IsNull(offset_i) && buffer)
|
||||
for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i)
|
||||
{
|
||||
const auto * raw_data = buffer->data() + chunk.value_offset(offset_i);
|
||||
column_chars_t.insert_assume_reserved(raw_data, raw_data + chunk.value_length(offset_i));
|
||||
}
|
||||
column_chars_t.emplace_back('\0');
|
||||
column_chars_t.emplace_back('\0');
|
||||
|
||||
column_offsets.emplace_back(column_chars_t.size());
|
||||
column_offsets.emplace_back(column_chars_t.size());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i)
|
||||
{
|
||||
if (!chunk.IsNull(offset_i) && buffer)
|
||||
{
|
||||
const auto * raw_data = buffer->data() + chunk.value_offset(offset_i);
|
||||
column_chars_t.insert_assume_reserved(raw_data, raw_data + chunk.value_length(offset_i));
|
||||
}
|
||||
column_chars_t.emplace_back('\0');
|
||||
|
||||
column_offsets.emplace_back(column_chars_t.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
|
@ -1143,24 +1143,42 @@ readColumnWithStringData(const orc::ColumnVectorBatch * orc_column, const orc::T
|
||||
reserver_size += 1;
|
||||
}
|
||||
|
||||
column_chars_t.reserve(reserver_size);
|
||||
column_offsets.reserve(orc_str_column->numElements);
|
||||
column_chars_t.resize_exact(reserver_size);
|
||||
column_offsets.resize_exact(orc_str_column->numElements);
|
||||
|
||||
size_t curr_offset = 0;
|
||||
for (size_t i = 0; i < orc_str_column->numElements; ++i)
|
||||
if (!orc_str_column->hasNulls)
|
||||
{
|
||||
if (!orc_str_column->hasNulls || orc_str_column->notNull[i])
|
||||
for (size_t i = 0; i < orc_str_column->numElements; ++i)
|
||||
{
|
||||
const auto * buf = orc_str_column->data[i];
|
||||
size_t buf_size = orc_str_column->length[i];
|
||||
column_chars_t.insert_assume_reserved(buf, buf + buf_size);
|
||||
memcpy(&column_chars_t[curr_offset], buf, buf_size);
|
||||
curr_offset += buf_size;
|
||||
|
||||
column_chars_t[curr_offset] = 0;
|
||||
++curr_offset;
|
||||
|
||||
column_offsets[i] = curr_offset;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (size_t i = 0; i < orc_str_column->numElements; ++i)
|
||||
{
|
||||
if (orc_str_column->notNull[i])
|
||||
{
|
||||
const auto * buf = orc_str_column->data[i];
|
||||
size_t buf_size = orc_str_column->length[i];
|
||||
memcpy(&column_chars_t[curr_offset], buf, buf_size);
|
||||
curr_offset += buf_size;
|
||||
}
|
||||
|
||||
column_chars_t.push_back(0);
|
||||
++curr_offset;
|
||||
column_chars_t[curr_offset] = 0;
|
||||
++curr_offset;
|
||||
|
||||
column_offsets.push_back(curr_offset);
|
||||
column_offsets[i] = curr_offset;
|
||||
}
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
|
@ -1157,8 +1157,7 @@ void WindowTransform::appendChunk(Chunk & chunk)
|
||||
// Initialize output columns.
|
||||
for (auto & ws : workspaces)
|
||||
{
|
||||
if (ws.window_function_impl)
|
||||
block.casted_columns.push_back(ws.window_function_impl->castColumn(block.input_columns, ws.argument_column_indices));
|
||||
block.casted_columns.push_back(ws.window_function_impl ? ws.window_function_impl->castColumn(block.input_columns, ws.argument_column_indices) : nullptr);
|
||||
|
||||
block.output_columns.push_back(ws.aggregate_function->getResultType()
|
||||
->createColumn());
|
||||
|
@ -2,9 +2,6 @@
|
||||
|
||||
#if USE_USEARCH
|
||||
|
||||
#pragma clang diagnostic push
|
||||
#pragma clang diagnostic ignored "-Wpass-failed"
|
||||
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <Common/BitHelpers.h>
|
||||
#include <Common/formatReadable.h>
|
||||
@ -46,15 +43,15 @@ namespace
|
||||
{
|
||||
|
||||
/// The only indexing method currently supported by USearch
|
||||
std::set<String> methods = {"hnsw"};
|
||||
const std::set<String> methods = {"hnsw"};
|
||||
|
||||
/// Maps from user-facing name to internal name
|
||||
std::unordered_map<String, unum::usearch::metric_kind_t> distanceFunctionToMetricKind = {
|
||||
const std::unordered_map<String, unum::usearch::metric_kind_t> distanceFunctionToMetricKind = {
|
||||
{"L2Distance", unum::usearch::metric_kind_t::l2sq_k},
|
||||
{"cosineDistance", unum::usearch::metric_kind_t::cos_k}};
|
||||
|
||||
/// Maps from user-facing name to internal name
|
||||
std::unordered_map<String, unum::usearch::scalar_kind_t> quantizationToScalarKind = {
|
||||
const std::unordered_map<String, unum::usearch::scalar_kind_t> quantizationToScalarKind = {
|
||||
{"f32", unum::usearch::scalar_kind_t::f32_k},
|
||||
{"f16", unum::usearch::scalar_kind_t::f16_k},
|
||||
{"i8", unum::usearch::scalar_kind_t::i8_k}};
|
||||
@ -95,9 +92,19 @@ USearchIndexWithSerialization::USearchIndexWithSerialization(
|
||||
unum::usearch::metric_kind_t metric_kind,
|
||||
unum::usearch::scalar_kind_t scalar_kind,
|
||||
UsearchHnswParams usearch_hnsw_params)
|
||||
: Base(Base::make(unum::usearch::metric_punned_t(dimensions, metric_kind, scalar_kind),
|
||||
unum::usearch::index_dense_config_t(usearch_hnsw_params.m, usearch_hnsw_params.ef_construction, usearch_hnsw_params.ef_search)))
|
||||
{
|
||||
USearchIndex::metric_t metric(dimensions, metric_kind, scalar_kind);
|
||||
|
||||
unum::usearch::index_dense_config_t config(usearch_hnsw_params.m, usearch_hnsw_params.ef_construction, usearch_hnsw_params.ef_search);
|
||||
config.enable_key_lookups = false; /// we don't do row-to-vector lookups
|
||||
|
||||
if (auto error = config.validate(); error) /// already called in vectorSimilarityIndexValidator, call again because usearch may change the config in-place
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid parameters passed to vector similarity index. Error: {}", String(error.release()));
|
||||
|
||||
if (auto result = USearchIndex::make(metric, config); !result)
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Could not create vector similarity index. Error: {}", String(result.error.release()));
|
||||
else
|
||||
swap(result.index);
|
||||
}
|
||||
|
||||
void USearchIndexWithSerialization::serialize(WriteBuffer & ostr) const
|
||||
@ -108,9 +115,8 @@ void USearchIndexWithSerialization::serialize(WriteBuffer & ostr) const
|
||||
return true;
|
||||
};
|
||||
|
||||
auto result = Base::save_to_stream(callback);
|
||||
if (result.error)
|
||||
throw Exception::createRuntime(ErrorCodes::INCORRECT_DATA, "Could not save vector similarity index, error: " + String(result.error.release()));
|
||||
if (auto result = Base::save_to_stream(callback); !result)
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Could not save vector similarity index. Error: {}", String(result.error.release()));
|
||||
}
|
||||
|
||||
void USearchIndexWithSerialization::deserialize(ReadBuffer & istr)
|
||||
@ -121,26 +127,43 @@ void USearchIndexWithSerialization::deserialize(ReadBuffer & istr)
|
||||
return true;
|
||||
};
|
||||
|
||||
auto result = Base::load_from_stream(callback);
|
||||
if (result.error)
|
||||
if (auto result = Base::load_from_stream(callback); !result)
|
||||
/// See the comment in MergeTreeIndexGranuleVectorSimilarity::deserializeBinary why we throw here
|
||||
throw Exception::createRuntime(ErrorCodes::INCORRECT_DATA, "Could not load vector similarity index, error: " + String(result.error.release()) + " Please drop the index and create it again.");
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Could not load vector similarity index. Please drop the index and create it again. Error: {}", String(result.error.release()));
|
||||
|
||||
if (!try_reserve(limits()))
|
||||
throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for usearch index");
|
||||
}
|
||||
|
||||
USearchIndexWithSerialization::Statistics USearchIndexWithSerialization::getStatistics() const
|
||||
{
|
||||
USearchIndex::stats_t global_stats = Base::stats();
|
||||
|
||||
Statistics statistics = {
|
||||
.max_level = max_level(),
|
||||
.connectivity = connectivity(),
|
||||
.size = size(), /// number of vectors
|
||||
.capacity = capacity(), /// number of vectors reserved
|
||||
.memory_usage = memory_usage(), /// in bytes, the value is not exact
|
||||
.size = size(),
|
||||
.capacity = capacity(),
|
||||
.memory_usage = memory_usage(),
|
||||
.bytes_per_vector = bytes_per_vector(),
|
||||
.scalar_words = scalar_words(),
|
||||
.statistics = stats()};
|
||||
.nodes = global_stats.nodes,
|
||||
.edges = global_stats.edges,
|
||||
.max_edges = global_stats.max_edges,
|
||||
.level_stats = {}};
|
||||
|
||||
for (size_t i = 0; i < statistics.max_level; ++i)
|
||||
statistics.level_stats.push_back(Base::stats(i));
|
||||
|
||||
return statistics;
|
||||
}
|
||||
|
||||
String USearchIndexWithSerialization::Statistics::toString() const
|
||||
{
|
||||
return fmt::format("max_level = {}, connectivity = {}, size = {}, capacity = {}, memory_usage = {}, bytes_per_vector = {}, scalar_words = {}, nodes = {}, edges = {}, max_edges = {}",
|
||||
max_level, connectivity, size, capacity, ReadableSize(memory_usage), bytes_per_vector, scalar_words, nodes, edges, max_edges);
|
||||
|
||||
}
|
||||
MergeTreeIndexGranuleVectorSimilarity::MergeTreeIndexGranuleVectorSimilarity(
|
||||
const String & index_name_,
|
||||
const Block & index_sample_block_,
|
||||
@ -181,8 +204,7 @@ void MergeTreeIndexGranuleVectorSimilarity::serializeBinary(WriteBuffer & ostr)
|
||||
index->serialize(ostr);
|
||||
|
||||
auto statistics = index->getStatistics();
|
||||
LOG_TRACE(logger, "Wrote vector similarity index: max_level = {}, connectivity = {}, size = {}, capacity = {}, memory_usage = {}",
|
||||
statistics.max_level, statistics.connectivity, statistics.size, statistics.capacity, ReadableSize(statistics.memory_usage));
|
||||
LOG_TRACE(logger, "Wrote vector similarity index: {}", statistics.toString());
|
||||
}
|
||||
|
||||
void MergeTreeIndexGranuleVectorSimilarity::deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion /*version*/)
|
||||
@ -204,8 +226,7 @@ void MergeTreeIndexGranuleVectorSimilarity::deserializeBinary(ReadBuffer & istr,
|
||||
index->deserialize(istr);
|
||||
|
||||
auto statistics = index->getStatistics();
|
||||
LOG_TRACE(logger, "Loaded vector similarity index: max_level = {}, connectivity = {}, size = {}, capacity = {}, memory_usage = {}",
|
||||
statistics.max_level, statistics.connectivity, statistics.size, statistics.capacity, ReadableSize(statistics.memory_usage));
|
||||
LOG_TRACE(logger, "Loaded vector similarity index: {}", statistics.toString());
|
||||
}
|
||||
|
||||
MergeTreeIndexAggregatorVectorSimilarity::MergeTreeIndexAggregatorVectorSimilarity(
|
||||
@ -285,19 +306,24 @@ void MergeTreeIndexAggregatorVectorSimilarity::update(const Block & block, size_
|
||||
if (!index)
|
||||
index = std::make_shared<USearchIndexWithSerialization>(dimensions, metric_kind, scalar_kind, usearch_hnsw_params);
|
||||
|
||||
/// We use Usearch's index_dense_t as index type which supports only 4 bio entries according to https://github.com/unum-cloud/usearch/tree/main/cpp
|
||||
if (index->size() + num_rows > std::numeric_limits<UInt32>::max())
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Size of vector similarity index in column {} would exceed 4 billion entries", index_column_name);
|
||||
|
||||
/// Reserving space is mandatory
|
||||
if (!index->reserve(roundUpToPowerOfTwoOrZero(index->size() + num_rows)))
|
||||
if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + num_rows)))
|
||||
throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index");
|
||||
|
||||
for (size_t row = 0; row < num_rows; ++row)
|
||||
{
|
||||
auto rc = index->add(static_cast<UInt32>(index->size()), &column_array_data_float_data[column_array_offsets[row - 1]]);
|
||||
if (!rc)
|
||||
throw Exception::createRuntime(ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index, error: " + String(rc.error.release()));
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::USearchAddCount);
|
||||
ProfileEvents::increment(ProfileEvents::USearchAddVisitedMembers, rc.visited_members);
|
||||
ProfileEvents::increment(ProfileEvents::USearchAddComputedDistances, rc.computed_distances);
|
||||
if (auto result = index->add(static_cast<UInt32>(index->size()), &column_array_data_float_data[column_array_offsets[row - 1]]); !result)
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release()));
|
||||
else
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::USearchAddCount);
|
||||
ProfileEvents::increment(ProfileEvents::USearchAddVisitedMembers, result.visited_members);
|
||||
ProfileEvents::increment(ProfileEvents::USearchAddComputedDistances, result.computed_distances);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -351,17 +377,16 @@ std::vector<size_t> MergeTreeIndexConditionVectorSimilarity::getUsefulRanges(Mer
|
||||
|
||||
const std::vector<float> reference_vector = vector_similarity_condition.getReferenceVector();
|
||||
|
||||
auto result = index->search(reference_vector.data(), limit);
|
||||
if (result.error)
|
||||
throw Exception::createRuntime(ErrorCodes::INCORRECT_DATA, "Could not search in vector similarity index, error: " + String(result.error.release()));
|
||||
auto search_result = index->search(reference_vector.data(), limit);
|
||||
if (!search_result)
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Could not search in vector similarity index. Error: {}", String(search_result.error.release()));
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::USearchSearchCount);
|
||||
ProfileEvents::increment(ProfileEvents::USearchSearchVisitedMembers, result.visited_members);
|
||||
ProfileEvents::increment(ProfileEvents::USearchSearchComputedDistances, result.computed_distances);
|
||||
ProfileEvents::increment(ProfileEvents::USearchSearchVisitedMembers, search_result.visited_members);
|
||||
ProfileEvents::increment(ProfileEvents::USearchSearchComputedDistances, search_result.computed_distances);
|
||||
|
||||
std::vector<USearchIndex::key_t> neighbors(result.size()); /// indexes of dots which were closest to the reference vector
|
||||
std::vector<USearchIndex::distance_t> distances(result.size());
|
||||
result.dump_to(neighbors.data(), distances.data());
|
||||
std::vector<USearchIndex::vector_key_t> neighbors(search_result.size()); /// indexes of vectors which were closest to the reference vector
|
||||
search_result.dump_to(neighbors.data());
|
||||
|
||||
std::vector<size_t> granules;
|
||||
granules.reserve(neighbors.size());
|
||||
@ -409,14 +434,13 @@ MergeTreeIndexConditionPtr MergeTreeIndexVectorSimilarity::createIndexCondition(
|
||||
|
||||
MergeTreeIndexPtr vectorSimilarityIndexCreator(const IndexDescription & index)
|
||||
{
|
||||
const bool has_six_args = (index.arguments.size() == 6);
|
||||
|
||||
/// Default parameters:
|
||||
unum::usearch::metric_kind_t metric_kind = distanceFunctionToMetricKind.at(index.arguments[1].safeGet<String>());
|
||||
|
||||
/// use defaults for the other parameters
|
||||
unum::usearch::scalar_kind_t scalar_kind = unum::usearch::scalar_kind_t::f32_k;
|
||||
UsearchHnswParams usearch_hnsw_params;
|
||||
|
||||
/// Optional parameters:
|
||||
const bool has_six_args = (index.arguments.size() == 6);
|
||||
if (has_six_args)
|
||||
{
|
||||
scalar_kind = quantizationToScalarKind.at(index.arguments[2].safeGet<String>());
|
||||
@ -461,12 +485,16 @@ void vectorSimilarityIndexValidator(const IndexDescription & index, bool /* atta
|
||||
{
|
||||
if (!quantizationToScalarKind.contains(index.arguments[2].safeGet<String>()))
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Third argument (quantization) of vector similarity index is not supported. Supported quantizations are: {}", joinByComma(quantizationToScalarKind));
|
||||
if (index.arguments[3].safeGet<UInt64>() < 2)
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Fourth argument (M) of vector similarity index must be > 1");
|
||||
if (index.arguments[4].safeGet<UInt64>() < 1)
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Fifth argument (ef_construction) of vector similarity index must be > 0");
|
||||
if (index.arguments[5].safeGet<UInt64>() < 1)
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Sixth argument (ef_search) of vector similarity index must be > 0");
|
||||
|
||||
/// Call Usearche's own parameter validation method for HNSW-specific parameters
|
||||
UInt64 m = index.arguments[3].safeGet<UInt64>();
|
||||
UInt64 ef_construction = index.arguments[4].safeGet<UInt64>();
|
||||
UInt64 ef_search = index.arguments[5].safeGet<UInt64>();
|
||||
|
||||
unum::usearch::index_dense_config_t config(m, ef_construction, ef_search);
|
||||
|
||||
if (auto error = config.validate(); error)
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid parameters passed to vector similarity index. Error: {}", String(error.release()));
|
||||
}
|
||||
|
||||
/// Check that the index is created on a single column
|
||||
|
@ -4,12 +4,9 @@
|
||||
|
||||
#if USE_USEARCH
|
||||
|
||||
#pragma clang diagnostic push
|
||||
#pragma clang diagnostic ignored "-Wpass-failed"
|
||||
# include <Storages/MergeTree/VectorSimilarityCondition.h>
|
||||
# include <Common/Logger.h>
|
||||
# include <usearch/index_dense.hpp>
|
||||
#pragma clang diagnostic pop
|
||||
#include <Storages/MergeTree/VectorSimilarityCondition.h>
|
||||
#include <Common/Logger.h>
|
||||
#include <usearch/index_dense.hpp>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -21,7 +18,7 @@ struct UsearchHnswParams
|
||||
size_t ef_search = unum::usearch::default_expansion_search();
|
||||
};
|
||||
|
||||
using USearchIndex = unum::usearch::index_dense_gt</*key_at*/ uint32_t, /*compressed_slot_at*/ uint32_t>;
|
||||
using USearchIndex = unum::usearch::index_dense_t;
|
||||
|
||||
class USearchIndexWithSerialization : public USearchIndex
|
||||
{
|
||||
@ -41,13 +38,18 @@ public:
|
||||
{
|
||||
size_t max_level;
|
||||
size_t connectivity;
|
||||
size_t size;
|
||||
size_t capacity;
|
||||
size_t memory_usage;
|
||||
/// advanced stats:
|
||||
size_t size; /// number of indexed vectors
|
||||
size_t capacity; /// reserved number of indexed vectors
|
||||
size_t memory_usage; /// byte size (not exact)
|
||||
size_t bytes_per_vector;
|
||||
size_t scalar_words;
|
||||
Base::stats_t statistics;
|
||||
size_t nodes;
|
||||
size_t edges;
|
||||
size_t max_edges;
|
||||
|
||||
std::vector<USearchIndex::stats_t> level_stats; /// for debugging, excluded from getStatistics()
|
||||
|
||||
String toString() const;
|
||||
};
|
||||
|
||||
Statistics getStatistics() const;
|
||||
|
@ -19,7 +19,6 @@ from env_helper import TEMP_PATH
|
||||
from git_helper import git_runner, is_shallow
|
||||
from github_helper import GitHub, PullRequest, PullRequests, Repository
|
||||
from s3_helper import S3Helper
|
||||
from get_robot_token import get_best_robot_token
|
||||
from ci_utils import Shell
|
||||
from version_helper import (
|
||||
FILE_WITH_VERSION_PATH,
|
||||
@ -172,7 +171,6 @@ def parse_args() -> argparse.Namespace:
|
||||
parser.add_argument(
|
||||
"--gh-user-or-token",
|
||||
help="user name or GH token to authenticate",
|
||||
default=get_best_robot_token(),
|
||||
)
|
||||
parser.add_argument(
|
||||
"--gh-password",
|
||||
|
@ -484,7 +484,7 @@ class ReleaseInfo:
|
||||
)
|
||||
else:
|
||||
if not dry_run:
|
||||
assert not self.changelog_pr
|
||||
assert not self.version_bump_pr
|
||||
|
||||
self.prs_merged = res
|
||||
|
||||
|
@ -570,6 +570,8 @@ class ClickHouseCluster:
|
||||
self.spark_session = None
|
||||
|
||||
self.with_azurite = False
|
||||
self.azurite_container = "azurite-container"
|
||||
self.blob_service_client = None
|
||||
self._azurite_port = 0
|
||||
|
||||
# available when with_hdfs == True
|
||||
@ -2692,6 +2694,32 @@ class ClickHouseCluster:
|
||||
connection_string
|
||||
)
|
||||
logging.debug(blob_service_client.get_account_information())
|
||||
containers = [
|
||||
c
|
||||
for c in blob_service_client.list_containers(
|
||||
name_starts_with=self.azurite_container
|
||||
)
|
||||
if c.name == self.azurite_container
|
||||
]
|
||||
if len(containers) > 0:
|
||||
for c in containers:
|
||||
blob_service_client.delete_container(c)
|
||||
|
||||
container_client = blob_service_client.get_container_client(
|
||||
self.azurite_container
|
||||
)
|
||||
if container_client.exists():
|
||||
logging.debug(
|
||||
f"azurite container '{self.azurite_container}' exist, deleting all blobs"
|
||||
)
|
||||
for b in container_client.list_blobs():
|
||||
container_client.delete_blob(b.name)
|
||||
else:
|
||||
logging.debug(
|
||||
f"azurite container '{self.azurite_container}' doesn't exist, creating it"
|
||||
)
|
||||
container_client.create_container()
|
||||
|
||||
self.blob_service_client = blob_service_client
|
||||
return
|
||||
except Exception as ex:
|
||||
|
@ -20,21 +20,30 @@ node_1_2 = cluster.add_instance("node_1_2", with_zookeeper=True)
|
||||
node_2_1 = cluster.add_instance("node_2_1", with_zookeeper=True)
|
||||
node_2_2 = cluster.add_instance("node_2_2", with_zookeeper=True)
|
||||
|
||||
# For test to be runnable multiple times
|
||||
seqno = 0
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def create_tables():
|
||||
global seqno
|
||||
try:
|
||||
seqno += 1
|
||||
for shard in (1, 2):
|
||||
for replica in (1, 2):
|
||||
node = cluster.instances["node_{}_{}".format(shard, replica)]
|
||||
node.query(
|
||||
"""
|
||||
CREATE TABLE replicated (d Date, x UInt32) ENGINE =
|
||||
ReplicatedMergeTree('/clickhouse/tables/{shard}/replicated', '{instance}') PARTITION BY toYYYYMM(d) ORDER BY d""".format(
|
||||
shard=shard, instance=node.name
|
||||
)
|
||||
f"CREATE TABLE replicated (d Date, x UInt32) ENGINE = "
|
||||
f"ReplicatedMergeTree('/clickhouse/tables/{shard}/replicated_{seqno}', '{node.name}') PARTITION BY toYYYYMM(d) ORDER BY d"
|
||||
)
|
||||
|
||||
node_1_1.query(
|
||||
@ -42,10 +51,15 @@ CREATE TABLE replicated (d Date, x UInt32) ENGINE =
|
||||
"Distributed('test_cluster', 'default', 'replicated')"
|
||||
)
|
||||
|
||||
yield cluster
|
||||
yield
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
node_1_1.query("DROP TABLE distributed")
|
||||
|
||||
node_1_1.query("DROP TABLE replicated")
|
||||
node_1_2.query("DROP TABLE replicated")
|
||||
node_2_1.query("DROP TABLE replicated")
|
||||
node_2_2.query("DROP TABLE replicated")
|
||||
|
||||
|
||||
def test(started_cluster):
|
||||
@ -101,7 +115,9 @@ SELECT sum(x) FROM distributed WITH TOTALS SETTINGS
|
||||
# allow pings to zookeeper to timeout (must be greater than ZK session timeout).
|
||||
for _ in range(30):
|
||||
try:
|
||||
node_2_2.query("SELECT * FROM system.zookeeper where path = '/'")
|
||||
node_2_2.query(
|
||||
"SELECT * FROM system.zookeeper where path = '/' SETTINGS insert_keeper_max_retries = 0"
|
||||
)
|
||||
time.sleep(0.5)
|
||||
except:
|
||||
break
|
||||
@ -120,7 +136,7 @@ SELECT sum(x) FROM distributed SETTINGS
|
||||
== "3"
|
||||
)
|
||||
|
||||
# Regression for skip_unavailable_shards in conjunction with skip_unavailable_shards
|
||||
# Prefer fallback_to_stale_replicas over skip_unavailable_shards
|
||||
assert (
|
||||
instance_with_dist_table.query(
|
||||
"""
|
||||
|
@ -202,6 +202,10 @@ def test_create_table():
|
||||
f"S3Queue('http://minio1:9001/root/data/', 'CSV', 'gzip') settings mode = 'ordered'",
|
||||
f"S3Queue('http://minio1:9001/root/data/', 'minio', '{password}', 'CSV') settings mode = 'ordered'",
|
||||
f"S3Queue('http://minio1:9001/root/data/', 'minio', '{password}', 'CSV', 'gzip') settings mode = 'ordered'",
|
||||
(
|
||||
f"Iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')",
|
||||
"DNS_ERROR",
|
||||
),
|
||||
]
|
||||
|
||||
def make_test_case(i):
|
||||
@ -266,6 +270,7 @@ def test_create_table():
|
||||
# due to sensitive data substituion the query will be normalized, so not "settings" but "SETTINGS"
|
||||
"CREATE TABLE table19 (`x` int) ENGINE = S3Queue('http://minio1:9001/root/data/', 'minio', '[HIDDEN]', 'CSV') SETTINGS mode = 'ordered'",
|
||||
"CREATE TABLE table20 (`x` int) ENGINE = S3Queue('http://minio1:9001/root/data/', 'minio', '[HIDDEN]', 'CSV', 'gzip') SETTINGS mode = 'ordered'",
|
||||
"CREATE TABLE table21 (`x` int) ENGINE = Iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
|
||||
],
|
||||
must_not_contain=[password],
|
||||
)
|
||||
@ -387,6 +392,7 @@ def test_table_functions():
|
||||
f"azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_15.csv', '{azure_account_name}', '{azure_account_key}', 'CSV', 'none', 'auto')",
|
||||
f"azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
|
||||
f"azureBlobStorageCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_17.csv', account_name = '{azure_account_name}', account_key = '{azure_account_key}')",
|
||||
f"iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')",
|
||||
]
|
||||
|
||||
def make_test_case(i):
|
||||
@ -478,6 +484,7 @@ def test_table_functions():
|
||||
f"CREATE TABLE tablefunc48 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_15.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
|
||||
f"CREATE TABLE tablefunc49 (x int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
|
||||
f"CREATE TABLE tablefunc50 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_17.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
|
||||
"CREATE TABLE tablefunc51 (`x` int) AS iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
|
||||
],
|
||||
must_not_contain=[password],
|
||||
)
|
||||
|
@ -445,7 +445,7 @@ def test_mysql_distributed(started_cluster):
|
||||
query = "SELECT * FROM ("
|
||||
for i in range(3):
|
||||
query += "SELECT name FROM test_replicas UNION DISTINCT "
|
||||
query += "SELECT name FROM test_replicas)"
|
||||
query += "SELECT name FROM test_replicas) ORDER BY name"
|
||||
|
||||
result = node2.query(query)
|
||||
assert result == "host2\nhost3\nhost4\n"
|
||||
@ -827,6 +827,9 @@ def test_settings(started_cluster):
|
||||
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
|
||||
)
|
||||
|
||||
node1.query("DROP DATABASE IF EXISTS m")
|
||||
node1.query("DROP DATABASE IF EXISTS mm")
|
||||
|
||||
rw_timeout = 40123001
|
||||
connect_timeout = 40123002
|
||||
node1.query(
|
||||
@ -855,6 +858,9 @@ def test_settings(started_cluster):
|
||||
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
|
||||
)
|
||||
|
||||
node1.query("DROP DATABASE m")
|
||||
node1.query("DROP DATABASE mm")
|
||||
|
||||
drop_mysql_table(conn, table_name)
|
||||
conn.close()
|
||||
|
||||
@ -930,6 +936,9 @@ def test_joins(started_cluster):
|
||||
|
||||
conn.commit()
|
||||
|
||||
node1.query("DROP TABLE IF EXISTS test_joins_table_users")
|
||||
node1.query("DROP TABLE IF EXISTS test_joins_table_tickets")
|
||||
|
||||
node1.query(
|
||||
"""
|
||||
CREATE TABLE test_joins_table_users
|
||||
@ -964,6 +973,9 @@ def test_joins(started_cluster):
|
||||
"""
|
||||
) == "281607\tFeedback\t2024-06-25 12:09:41\tuser@example.com\n"
|
||||
|
||||
node1.query("DROP TABLE test_joins_table_users")
|
||||
node1.query("DROP TABLE test_joins_table_tickets")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
with contextmanager(started_cluster)() as cluster:
|
||||
|
@ -1,6 +1,7 @@
|
||||
import io
|
||||
import logging
|
||||
import random
|
||||
import string
|
||||
import time
|
||||
|
||||
import pytest
|
||||
@ -13,7 +14,6 @@ from uuid import uuid4
|
||||
AVAILABLE_MODES = ["unordered", "ordered"]
|
||||
DEFAULT_AUTH = ["'minio'", "'minio123'"]
|
||||
NO_AUTH = ["NOSIGN"]
|
||||
AZURE_CONTAINER_NAME = "cont"
|
||||
|
||||
|
||||
def prepare_public_s3_bucket(started_cluster):
|
||||
@ -68,13 +68,24 @@ def s3_queue_setup_teardown(started_cluster):
|
||||
instance = started_cluster.instances["instance"]
|
||||
instance_2 = started_cluster.instances["instance2"]
|
||||
|
||||
instance.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
|
||||
instance_2.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
|
||||
instance.query("DROP DATABASE IF EXISTS default; CREATE DATABASE default;")
|
||||
instance_2.query("DROP DATABASE IF EXISTS default; CREATE DATABASE default;")
|
||||
|
||||
minio = started_cluster.minio_client
|
||||
objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
|
||||
for obj in objects:
|
||||
minio.remove_object(started_cluster.minio_bucket, obj.object_name)
|
||||
|
||||
container_client = started_cluster.blob_service_client.get_container_client(
|
||||
started_cluster.azurite_container
|
||||
)
|
||||
|
||||
if container_client.exists():
|
||||
blob_names = [b.name for b in container_client.list_blobs()]
|
||||
logging.debug(f"Deleting blobs: {blob_names}")
|
||||
for b in blob_names:
|
||||
container_client.delete_blob(b)
|
||||
|
||||
yield # run test
|
||||
|
||||
|
||||
@ -129,11 +140,6 @@ def started_cluster():
|
||||
cluster.start()
|
||||
logging.info("Cluster started")
|
||||
|
||||
container_client = cluster.blob_service_client.get_container_client(
|
||||
AZURE_CONTAINER_NAME
|
||||
)
|
||||
container_client.create_container()
|
||||
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
@ -190,7 +196,7 @@ def put_s3_file_content(started_cluster, filename, data, bucket=None):
|
||||
|
||||
def put_azure_file_content(started_cluster, filename, data, bucket=None):
|
||||
client = started_cluster.blob_service_client.get_blob_client(
|
||||
AZURE_CONTAINER_NAME, filename
|
||||
started_cluster.azurite_container, filename
|
||||
)
|
||||
buf = io.BytesIO(data)
|
||||
client.upload_blob(buf, "BlockBlob", len(data))
|
||||
@ -226,7 +232,7 @@ def create_table(
|
||||
url = f"http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/"
|
||||
engine_def = f"{engine_name}('{url}', {auth_params}, {file_format})"
|
||||
else:
|
||||
engine_def = f"{engine_name}('{started_cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', '{files_path}/', 'CSV')"
|
||||
engine_def = f"{engine_name}('{started_cluster.env_variables['AZURITE_CONNECTION_STRING']}', '{started_cluster.azurite_container}', '{files_path}/', 'CSV')"
|
||||
|
||||
node.query(f"DROP TABLE IF EXISTS {table_name}")
|
||||
create_query = f"""
|
||||
@ -262,15 +268,21 @@ def create_mv(
|
||||
)
|
||||
|
||||
|
||||
def generate_random_string(length=6):
|
||||
return "".join(random.choice(string.ascii_lowercase) for i in range(length))
|
||||
|
||||
|
||||
@pytest.mark.parametrize("mode", ["unordered", "ordered"])
|
||||
@pytest.mark.parametrize("engine_name", ["S3Queue", "AzureQueue"])
|
||||
def test_delete_after_processing(started_cluster, mode, engine_name):
|
||||
node = started_cluster.instances["instance"]
|
||||
table_name = f"test.delete_after_processing_{mode}_{engine_name}"
|
||||
table_name = f"delete_after_processing_{mode}_{engine_name}"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
files_path = f"{table_name}_data"
|
||||
files_num = 5
|
||||
row_num = 10
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
|
||||
if engine_name == "S3Queue":
|
||||
storage = "s3"
|
||||
else:
|
||||
@ -285,7 +297,7 @@ def test_delete_after_processing(started_cluster, mode, engine_name):
|
||||
table_name,
|
||||
mode,
|
||||
files_path,
|
||||
additional_settings={"after_processing": "delete"},
|
||||
additional_settings={"after_processing": "delete", "keeper_path": keeper_path},
|
||||
engine_name=engine_name,
|
||||
)
|
||||
create_mv(node, table_name, dst_table_name)
|
||||
@ -313,7 +325,7 @@ def test_delete_after_processing(started_cluster, mode, engine_name):
|
||||
assert len(objects) == 0
|
||||
else:
|
||||
client = started_cluster.blob_service_client.get_container_client(
|
||||
AZURE_CONTAINER_NAME
|
||||
started_cluster.azurite_container
|
||||
)
|
||||
objects_iterator = client.list_blobs(files_path)
|
||||
for objects in objects_iterator:
|
||||
@ -324,11 +336,12 @@ def test_delete_after_processing(started_cluster, mode, engine_name):
|
||||
@pytest.mark.parametrize("engine_name", ["S3Queue", "AzureQueue"])
|
||||
def test_failed_retry(started_cluster, mode, engine_name):
|
||||
node = started_cluster.instances["instance"]
|
||||
table_name = f"test.failed_retry_{mode}_{engine_name}"
|
||||
table_name = f"failed_retry_{mode}_{engine_name}"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
files_path = f"{table_name}_data"
|
||||
file_path = f"{files_path}/trash_test.csv"
|
||||
keeper_path = f"/clickhouse/test_{table_name}"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
|
||||
retries_num = 3
|
||||
|
||||
values = [
|
||||
@ -385,8 +398,9 @@ def test_failed_retry(started_cluster, mode, engine_name):
|
||||
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
|
||||
def test_direct_select_file(started_cluster, mode):
|
||||
node = started_cluster.instances["instance"]
|
||||
table_name = f"test.direct_select_file_{mode}"
|
||||
keeper_path = f"/clickhouse/test_{table_name}"
|
||||
table_name = f"direct_select_file_{mode}"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{mode}_{generate_random_string()}"
|
||||
files_path = f"{table_name}_data"
|
||||
file_path = f"{files_path}/test.csv"
|
||||
|
||||
@ -447,7 +461,7 @@ def test_direct_select_file(started_cluster, mode):
|
||||
] == []
|
||||
|
||||
# New table with different zookeeper path
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{mode}_2"
|
||||
keeper_path = f"{keeper_path}_2"
|
||||
create_table(
|
||||
started_cluster,
|
||||
node,
|
||||
@ -491,8 +505,17 @@ def test_direct_select_multiple_files(started_cluster, mode):
|
||||
node = started_cluster.instances["instance"]
|
||||
table_name = f"direct_select_multiple_files_{mode}"
|
||||
files_path = f"{table_name}_data"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
|
||||
|
||||
create_table(started_cluster, node, table_name, mode, files_path)
|
||||
create_table(
|
||||
started_cluster,
|
||||
node,
|
||||
table_name,
|
||||
mode,
|
||||
files_path,
|
||||
additional_settings={"keeper_path": keeper_path},
|
||||
)
|
||||
for i in range(5):
|
||||
rand_values = [[random.randint(0, 50) for _ in range(3)] for _ in range(10)]
|
||||
values_csv = (
|
||||
@ -515,14 +538,23 @@ def test_direct_select_multiple_files(started_cluster, mode):
|
||||
|
||||
|
||||
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
|
||||
def test_streaming_to_view_(started_cluster, mode):
|
||||
def test_streaming_to_view(started_cluster, mode):
|
||||
node = started_cluster.instances["instance"]
|
||||
table_name = f"streaming_to_view_{mode}"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
files_path = f"{table_name}_data"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
|
||||
|
||||
total_values = generate_random_files(started_cluster, files_path, 10)
|
||||
create_table(started_cluster, node, table_name, mode, files_path)
|
||||
create_table(
|
||||
started_cluster,
|
||||
node,
|
||||
table_name,
|
||||
mode,
|
||||
files_path,
|
||||
additional_settings={"keeper_path": keeper_path},
|
||||
)
|
||||
create_mv(node, table_name, dst_table_name)
|
||||
|
||||
expected_values = set([tuple(i) for i in total_values])
|
||||
@ -544,7 +576,8 @@ def test_streaming_to_many_views(started_cluster, mode):
|
||||
node = started_cluster.instances["instance"]
|
||||
table_name = f"streaming_to_many_views_{mode}"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
keeper_path = f"/clickhouse/test_{table_name}"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
|
||||
files_path = f"{table_name}_data"
|
||||
|
||||
for i in range(3):
|
||||
@ -582,7 +615,8 @@ def test_streaming_to_many_views(started_cluster, mode):
|
||||
def test_multiple_tables_meta_mismatch(started_cluster):
|
||||
node = started_cluster.instances["instance"]
|
||||
table_name = f"multiple_tables_meta_mismatch"
|
||||
keeper_path = f"/clickhouse/test_{table_name}"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
|
||||
files_path = f"{table_name}_data"
|
||||
|
||||
create_table(
|
||||
@ -675,7 +709,8 @@ def test_multiple_tables_streaming_sync(started_cluster, mode):
|
||||
node = started_cluster.instances["instance"]
|
||||
table_name = f"multiple_tables_streaming_sync_{mode}"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
keeper_path = f"/clickhouse/test_{table_name}"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
|
||||
files_path = f"{table_name}_data"
|
||||
files_to_generate = 300
|
||||
|
||||
@ -756,7 +791,10 @@ def test_multiple_tables_streaming_sync(started_cluster, mode):
|
||||
def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
|
||||
node = started_cluster.instances["instance"]
|
||||
node_2 = started_cluster.instances["instance2"]
|
||||
table_name = f"multiple_tables_streaming_sync_distributed_{mode}"
|
||||
# A unique table name is necessary for repeatable tests
|
||||
table_name = (
|
||||
f"multiple_tables_streaming_sync_distributed_{mode}_{generate_random_string()}"
|
||||
)
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
keeper_path = f"/clickhouse/test_{table_name}"
|
||||
files_path = f"{table_name}_data"
|
||||
@ -833,7 +871,8 @@ def test_max_set_age(started_cluster):
|
||||
node = started_cluster.instances["instance"]
|
||||
table_name = "max_set_age"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
keeper_path = f"/clickhouse/test_{table_name}"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
|
||||
files_path = f"{table_name}_data"
|
||||
max_age = 20
|
||||
files_to_generate = 10
|
||||
@ -944,10 +983,9 @@ def test_max_set_age(started_cluster):
|
||||
def test_max_set_size(started_cluster):
|
||||
node = started_cluster.instances["instance"]
|
||||
table_name = f"max_set_size"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
keeper_path = f"/clickhouse/test_{table_name}"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
|
||||
files_path = f"{table_name}_data"
|
||||
max_age = 10
|
||||
files_to_generate = 10
|
||||
|
||||
create_table(
|
||||
@ -991,7 +1029,8 @@ def test_drop_table(started_cluster):
|
||||
node = started_cluster.instances["instance"]
|
||||
table_name = f"test_drop"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
keeper_path = f"/clickhouse/test_{table_name}"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
|
||||
files_path = f"{table_name}_data"
|
||||
files_to_generate = 300
|
||||
|
||||
@ -1021,9 +1060,11 @@ def test_drop_table(started_cluster):
|
||||
|
||||
def test_s3_client_reused(started_cluster):
|
||||
node = started_cluster.instances["instance"]
|
||||
table_name = f"test.test_s3_client_reused"
|
||||
table_name = f"test_s3_client_reused"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
files_path = f"{table_name}_data"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
|
||||
row_num = 10
|
||||
|
||||
def get_created_s3_clients_count():
|
||||
@ -1057,6 +1098,7 @@ def test_s3_client_reused(started_cluster):
|
||||
additional_settings={
|
||||
"after_processing": "delete",
|
||||
"s3queue_processing_threads_num": 1,
|
||||
"keeper_path": keeper_path,
|
||||
},
|
||||
auth=NO_AUTH,
|
||||
bucket=started_cluster.minio_public_bucket,
|
||||
@ -1114,7 +1156,8 @@ def test_processing_threads(started_cluster, mode):
|
||||
node = started_cluster.instances["instance"]
|
||||
table_name = f"processing_threads_{mode}"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
keeper_path = f"/clickhouse/test_{table_name}"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
|
||||
files_path = f"{table_name}_data"
|
||||
files_to_generate = 300
|
||||
processing_threads = 32
|
||||
@ -1181,7 +1224,8 @@ def test_shards(started_cluster, mode, processing_threads):
|
||||
node = started_cluster.instances["instance"]
|
||||
table_name = f"test_shards_{mode}_{processing_threads}"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
keeper_path = f"/clickhouse/test_{table_name}"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
|
||||
files_path = f"{table_name}_data"
|
||||
files_to_generate = 300
|
||||
shards_num = 3
|
||||
@ -1300,7 +1344,7 @@ where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_pr
|
||||
pytest.param("unordered", 1),
|
||||
pytest.param("unordered", 8),
|
||||
pytest.param("ordered", 1),
|
||||
pytest.param("ordered", 8),
|
||||
pytest.param("ordered", 2),
|
||||
],
|
||||
)
|
||||
def test_shards_distributed(started_cluster, mode, processing_threads):
|
||||
@ -1308,10 +1352,11 @@ def test_shards_distributed(started_cluster, mode, processing_threads):
|
||||
node_2 = started_cluster.instances["instance2"]
|
||||
table_name = f"test_shards_distributed_{mode}_{processing_threads}"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
keeper_path = f"/clickhouse/test_{table_name}"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
|
||||
files_path = f"{table_name}_data"
|
||||
files_to_generate = 300
|
||||
row_num = 50
|
||||
row_num = 300
|
||||
total_rows = row_num * files_to_generate
|
||||
shards_num = 2
|
||||
|
||||
@ -1461,8 +1506,8 @@ def test_settings_check(started_cluster):
|
||||
node = started_cluster.instances["instance"]
|
||||
node_2 = started_cluster.instances["instance2"]
|
||||
table_name = f"test_settings_check"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
keeper_path = f"/clickhouse/test_{table_name}"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
|
||||
files_path = f"{table_name}_data"
|
||||
mode = "ordered"
|
||||
|
||||
@ -1504,7 +1549,10 @@ def test_processed_file_setting(started_cluster, processing_threads):
|
||||
node = started_cluster.instances["instance"]
|
||||
table_name = f"test_processed_file_setting_{processing_threads}"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{processing_threads}"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = (
|
||||
f"/clickhouse/test_{table_name}_{processing_threads}_{generate_random_string()}"
|
||||
)
|
||||
files_path = f"{table_name}_data"
|
||||
files_to_generate = 10
|
||||
|
||||
@ -1555,7 +1603,10 @@ def test_processed_file_setting_distributed(started_cluster, processing_threads)
|
||||
node_2 = started_cluster.instances["instance2"]
|
||||
table_name = f"test_processed_file_setting_distributed_{processing_threads}"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
keeper_path = f"/clickhouse/test_{table_name}"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = (
|
||||
f"/clickhouse/test_{table_name}_{processing_threads}_{generate_random_string()}"
|
||||
)
|
||||
files_path = f"{table_name}_data"
|
||||
files_to_generate = 10
|
||||
|
||||
@ -1609,7 +1660,8 @@ def test_upgrade(started_cluster):
|
||||
|
||||
table_name = f"test_upgrade"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
keeper_path = f"/clickhouse/test_{table_name}"
|
||||
# A unique path is necessary for repeatable tests
|
||||
keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
|
||||
files_path = f"{table_name}_data"
|
||||
files_to_generate = 10
|
||||
|
||||
@ -1648,7 +1700,8 @@ def test_upgrade(started_cluster):
|
||||
def test_exception_during_insert(started_cluster):
|
||||
node = started_cluster.instances["instance_too_many_parts"]
|
||||
|
||||
table_name = f"test_exception_during_insert"
|
||||
# A unique table name is necessary for repeatable tests
|
||||
table_name = f"test_exception_during_insert_{generate_random_string()}"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
keeper_path = f"/clickhouse/test_{table_name}"
|
||||
files_path = f"{table_name}_data"
|
||||
@ -1664,6 +1717,7 @@ def test_exception_during_insert(started_cluster):
|
||||
"keeper_path": keeper_path,
|
||||
},
|
||||
)
|
||||
node.rotate_logs()
|
||||
total_values = generate_random_files(
|
||||
started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
|
||||
)
|
||||
@ -1680,33 +1734,49 @@ def test_exception_during_insert(started_cluster):
|
||||
)
|
||||
assert "Too many parts" in exception
|
||||
|
||||
original_parts_to_throw_insert = 0
|
||||
modified_parts_to_throw_insert = 10
|
||||
node.replace_in_config(
|
||||
"/etc/clickhouse-server/config.d/merge_tree.xml",
|
||||
"parts_to_throw_insert>0",
|
||||
"parts_to_throw_insert>10",
|
||||
f"parts_to_throw_insert>{original_parts_to_throw_insert}",
|
||||
f"parts_to_throw_insert>{modified_parts_to_throw_insert}",
|
||||
)
|
||||
node.restart_clickhouse()
|
||||
try:
|
||||
node.restart_clickhouse()
|
||||
|
||||
def get_count():
|
||||
return int(node.query(f"SELECT count() FROM {dst_table_name}"))
|
||||
def get_count():
|
||||
return int(node.query(f"SELECT count() FROM {dst_table_name}"))
|
||||
|
||||
expected_rows = 10
|
||||
for _ in range(20):
|
||||
if expected_rows == get_count():
|
||||
break
|
||||
time.sleep(1)
|
||||
assert expected_rows == get_count()
|
||||
expected_rows = 10
|
||||
for _ in range(20):
|
||||
if expected_rows == get_count():
|
||||
break
|
||||
time.sleep(1)
|
||||
assert expected_rows == get_count()
|
||||
finally:
|
||||
node.replace_in_config(
|
||||
"/etc/clickhouse-server/config.d/merge_tree.xml",
|
||||
f"parts_to_throw_insert>{modified_parts_to_throw_insert}",
|
||||
f"parts_to_throw_insert>{original_parts_to_throw_insert}",
|
||||
)
|
||||
node.restart_clickhouse()
|
||||
|
||||
|
||||
def test_commit_on_limit(started_cluster):
|
||||
node = started_cluster.instances["instance"]
|
||||
|
||||
table_name = f"test_commit_on_limit"
|
||||
# A unique table name is necessary for repeatable tests
|
||||
table_name = f"test_commit_on_limit_{generate_random_string()}"
|
||||
dst_table_name = f"{table_name}_dst"
|
||||
keeper_path = f"/clickhouse/test_{table_name}"
|
||||
files_path = f"{table_name}_data"
|
||||
files_to_generate = 10
|
||||
|
||||
failed_files_event_before = int(
|
||||
node.query(
|
||||
"SELECT value FROM system.events WHERE name = 'ObjectStorageQueueFailedFiles' SETTINGS system_events_show_zero_values=1"
|
||||
)
|
||||
)
|
||||
create_table(
|
||||
started_cluster,
|
||||
node,
|
||||
@ -1782,6 +1852,9 @@ def test_commit_on_limit(started_cluster):
|
||||
assert "test_999999.csv" in get_processed_files()
|
||||
|
||||
assert 1 == int(
|
||||
node.count_in_log(f"Setting file {files_path}/test_9999.csv as failed")
|
||||
)
|
||||
assert failed_files_event_before + 1 == int(
|
||||
node.query(
|
||||
"SELECT value FROM system.events WHERE name = 'ObjectStorageQueueFailedFiles' SETTINGS system_events_show_zero_values=1"
|
||||
)
|
||||
|
15
tests/performance/all_join_opt.xml
Normal file
15
tests/performance/all_join_opt.xml
Normal file
@ -0,0 +1,15 @@
|
||||
<test>
|
||||
<create_query>CREATE TABLE test (a Int64, b String, c LowCardinality(String)) ENGINE = MergeTree() ORDER BY a</create_query>
|
||||
<create_query>CREATE TABLE test1 (a Int64, b String, c LowCardinality(String)) ENGINE = MergeTree() ORDER BY a</create_query>
|
||||
|
||||
<fill_query>INSERT INTO test SELECT number % 10000, number % 10000, number % 10000 FROM numbers(10000000)</fill_query>
|
||||
<fill_query>INSERT INTO test1 SELECT number % 1000 , number % 1000, number % 1000 FROM numbers(100000)</fill_query>
|
||||
|
||||
<query tag='INNER'>SELECT MAX(test1.a) FROM test INNER JOIN test1 on test.b = test1.b</query>
|
||||
<query tag='LEFT'>SELECT MAX(test1.a) FROM test LEFT JOIN test1 on test.b = test1.b</query>
|
||||
<query tag='RIGHT'>SELECT MAX(test1.a) FROM test RIGHT JOIN test1 on test.b = test1.b</query>
|
||||
<query tag='FULL'>SELECT MAX(test1.a) FROM test FULL JOIN test1 on test.b = test1.b</query>
|
||||
|
||||
<drop_query>DROP TABLE IF EXISTS test</drop_query>
|
||||
<drop_query>DROP TABLE IF EXISTS test1</drop_query>
|
||||
</test>
|
@ -14,4 +14,4 @@
|
||||
========
|
||||
201902_4_5_1 1
|
||||
========
|
||||
201801_1_1_0 1
|
||||
201801_1_1_2 1
|
||||
|
@ -39,6 +39,6 @@ CHECK TABLE mt_table PARTITION 201902 SETTINGS max_threads = 1;
|
||||
|
||||
SELECT '========';
|
||||
|
||||
CHECK TABLE mt_table PART '201801_1_1_0';
|
||||
CHECK TABLE mt_table PART '201801_1_1_2';
|
||||
|
||||
DROP TABLE IF EXISTS mt_table;
|
||||
|
@ -11,33 +11,40 @@ function query()
|
||||
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&database_atomic_wait_for_drop_and_detach_synchronously=1" -d "$*"
|
||||
}
|
||||
|
||||
# NOTE: database = $CLICKHOUSE_DATABASE is unwanted
|
||||
verify_sql="SELECT
|
||||
(SELECT sumIf(value, metric = 'PartsActive'), sumIf(value, metric = 'PartsOutdated') FROM system.metrics)
|
||||
= (SELECT sum(active), sum(NOT active) FROM
|
||||
(SELECT active FROM system.parts UNION ALL SELECT active FROM system.projection_parts UNION ALL SELECT 1 FROM system.dropped_tables_parts))"
|
||||
|
||||
# The query is not atomic - it can compare states between system.parts and system.metrics from different points in time.
|
||||
# So, there is inherent race condition. But it should get expected result eventually.
|
||||
# In case of test failure, this code will do infinite loop and timeout.
|
||||
verify()
|
||||
{
|
||||
for i in {1..5000}
|
||||
do
|
||||
result=$( query "$verify_sql" )
|
||||
[ "$result" = "1" ] && echo "$result" && break
|
||||
sleep 0.1
|
||||
local result
|
||||
|
||||
if [[ $i -eq 5000 ]]
|
||||
then
|
||||
query "
|
||||
SELECT sumIf(value, metric = 'PartsActive'), sumIf(value, metric = 'PartsOutdated') FROM system.metrics;
|
||||
SELECT sum(active), sum(NOT active) FROM system.parts;
|
||||
SELECT sum(active), sum(NOT active) FROM system.projection_parts;
|
||||
SELECT count() FROM system.dropped_tables_parts;
|
||||
"
|
||||
for _ in {1..100}; do
|
||||
# NOTE: database = $CLICKHOUSE_DATABASE is unwanted
|
||||
result=$( query "SELECT
|
||||
(SELECT sumIf(value, metric = 'PartsActive'), sumIf(value, metric = 'PartsOutdated') FROM system.metrics)
|
||||
=
|
||||
(SELECT sum(active), sum(NOT active) FROM (
|
||||
SELECT active FROM system.parts
|
||||
UNION ALL SELECT active FROM system.projection_parts
|
||||
UNION ALL SELECT 1 FROM system.dropped_tables_parts
|
||||
))"
|
||||
)
|
||||
|
||||
if [ "$result" = "1" ]; then
|
||||
echo "$result"
|
||||
return
|
||||
fi
|
||||
|
||||
sleep 0.5
|
||||
done
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "
|
||||
SELECT sumIf(value, metric = 'PartsActive'), sumIf(value, metric = 'PartsOutdated') FROM system.metrics;
|
||||
SELECT sum(active), sum(NOT active) FROM system.parts;
|
||||
SELECT sum(active), sum(NOT active) FROM system.projection_parts;
|
||||
SELECT count() FROM system.dropped_tables_parts;
|
||||
"
|
||||
}
|
||||
|
||||
query "DROP TABLE IF EXISTS test_table"
|
||||
|
@ -3,8 +3,6 @@ Two or six index arguments
|
||||
2nd argument (distance function) must be String and L2Distance or cosineDistance
|
||||
3nd argument (quantization), if given, must be String and f32, f16, ...
|
||||
4nd argument (M), if given, must be UInt64 and > 1
|
||||
5nd argument (ef_construction), if given, must be UInt64 and > 0
|
||||
6nd argument (ef_search), if given, must be UInt64 and > 0
|
||||
Must be created on single column
|
||||
Must be created on Array(Float32) columns
|
||||
Rejects INSERTs of Arrays with different sizes
|
||||
|
@ -27,12 +27,6 @@ CREATE TABLE tab(id Int32, vec Array(Float32), INDEX idx vec TYPE vector_similar
|
||||
SELECT '4nd argument (M), if given, must be UInt64 and > 1';
|
||||
CREATE TABLE tab(id Int32, vec Array(Float32), INDEX idx vec TYPE vector_similarity('hnsw', 'L2Distance', 'f32', 'invalid', 1, 1)) ENGINE = MergeTree ORDER BY id; -- { serverError INCORRECT_QUERY }
|
||||
CREATE TABLE tab(id Int32, vec Array(Float32), INDEX idx vec TYPE vector_similarity('hnsw', 'L2Distance', 'f32', 1, 1, 1)) ENGINE = MergeTree ORDER BY id; -- { serverError INCORRECT_DATA }
|
||||
SELECT '5nd argument (ef_construction), if given, must be UInt64 and > 0';
|
||||
CREATE TABLE tab(id Int32, vec Array(Float32), INDEX idx vec TYPE vector_similarity('hnsw', 'L2Distance', 'f32', 2, 'invalid', 1)) ENGINE = MergeTree ORDER BY id; -- { serverError INCORRECT_QUERY }
|
||||
CREATE TABLE tab(id Int32, vec Array(Float32), INDEX idx vec TYPE vector_similarity('hnsw', 'L2Distance', 'f32', 2, 0, 1)) ENGINE = MergeTree ORDER BY id; -- { serverError INCORRECT_DATA }
|
||||
SELECT '6nd argument (ef_search), if given, must be UInt64 and > 0';
|
||||
CREATE TABLE tab(id Int32, vec Array(Float32), INDEX idx vec TYPE vector_similarity('hnsw', 'L2Distance', 'f32', 2, 1, 'invalid')) ENGINE = MergeTree ORDER BY id; -- { serverError INCORRECT_QUERY }
|
||||
CREATE TABLE tab(id Int32, vec Array(Float32), INDEX idx vec TYPE vector_similarity('hnsw', 'L2Distance', 'f32', 2, 1, 0)) ENGINE = MergeTree ORDER BY id; -- { serverError INCORRECT_DATA }
|
||||
|
||||
SELECT 'Must be created on single column';
|
||||
CREATE TABLE tab(id Int32, vec Array(Float32), INDEX idx (vec, id) TYPE vector_similarity('hnsw', 'L2Distance')) ENGINE = MergeTree ORDER BY id; -- { serverError INCORRECT_NUMBER_OF_COLUMNS }
|
||||
|
@ -0,0 +1 @@
|
||||
SYSTEM SYNC REPLICA db.`table` LIGHTWEIGHT
|
@ -0,0 +1 @@
|
||||
SELECT formatQuery('SYSTEM SYNC REPLICA db.table LIGHTWEIGHT');
|
@ -38,3 +38,19 @@
|
||||
7
|
||||
8
|
||||
9
|
||||
15 \N 3 15 15 15 15
|
||||
14 \N 2 10 10 10 154
|
||||
13 \N 2 10 10 10 143
|
||||
12 \N 2 10 10 10 14
|
||||
11 \N 2 10 10 10 12
|
||||
10 \N 2 10 10 10 10
|
||||
9 \N 1 5 5 5 99
|
||||
8 \N 1 5 5 5 88
|
||||
7 \N 1 5 5 5 9
|
||||
6 \N 1 5 5 5 7
|
||||
5 \N 1 5 5 5 5
|
||||
4 \N 0 0 0 0 44
|
||||
3 \N 0 0 0 0 33
|
||||
2 \N 0 0 0 0 4
|
||||
1 \N 0 0 0 0 2
|
||||
0 \N 0 0 0 0 0
|
||||
|
@ -2,3 +2,23 @@ SELECT lagInFrame(2::UInt128, 2, number) OVER w FROM numbers(10) WINDOW w AS (OR
|
||||
SELECT leadInFrame(2::UInt128, 2, number) OVER w FROM numbers(10) WINDOW w AS (ORDER BY number);
|
||||
SELECT lagInFrame(2::UInt64, 2, number) OVER w FROM numbers(10) WINDOW w AS (ORDER BY number);
|
||||
SELECT leadInFrame(2::UInt64, 2, number) OVER w FROM numbers(10) WINDOW w AS (ORDER BY number);
|
||||
|
||||
SELECT
|
||||
number,
|
||||
YYYYMMDDToDate(1, toLowCardinality(11), max(YYYYMMDDToDate(YYYYMMDDToDate(toLowCardinality(1), 11, materialize(NULL), 19700101.1, 1, 27, 7, materialize(toUInt256(37)), 9, 19, 9), 1, toUInt128(11), NULL, 19700101.1, 1, 27, 7, 37, 9, 19, 9), toUInt256(30)) IGNORE NULLS OVER w, NULL, 19700101.1, toNullable(1), 27, materialize(7), 37, 9, 19, 9),
|
||||
p,
|
||||
pp,
|
||||
lagInFrame(number, number - pp) OVER w AS lag2,
|
||||
lagInFrame(number, number - pp, number * 11) OVER w AS lag,
|
||||
leadInFrame(number, number - pp, number * 11) OVER w AS lead
|
||||
FROM
|
||||
(
|
||||
SELECT
|
||||
number,
|
||||
intDiv(number, 5) AS p,
|
||||
p * 5 AS pp
|
||||
FROM numbers(16)
|
||||
)
|
||||
WHERE toLowCardinality(1)
|
||||
WINDOW w AS (PARTITION BY p ORDER BY number ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
|
||||
ORDER BY number DESC NULLS LAST;
|
||||
|
@ -0,0 +1,2 @@
|
||||
QUERY_WAS_CANCELLED
|
||||
QUERY_WAS_CANCELLED
|
8
tests/queries/0_stateless/03221_insert_timeout_overflow_mode.sh
Executable file
8
tests/queries/0_stateless/03221_insert_timeout_overflow_mode.sh
Executable file
@ -0,0 +1,8 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query "create table null_t (number UInt64) engine = Null;"
|
||||
${CLICKHOUSE_CLIENT} --query "select sleep(0.1) from system.numbers settings max_block_size = 1 format Native" 2>/dev/null | ${CLICKHOUSE_CLIENT} --max_execution_time 0.3 --timeout_overflow_mode break --query "insert into null_t format Native" 2>&1 | grep -o "QUERY_WAS_CANCELLED"
|
Loading…
Reference in New Issue
Block a user