From 0c9d7f7060a6268d7cfdb1e72ec844bae942031a Mon Sep 17 00:00:00 2001 From: Suzy Wang Date: Tue, 19 Jul 2022 12:42:50 -0700 Subject: [PATCH 01/32] Update libuv to 1.44.1 --- contrib/libuv | 2 +- contrib/libuv-cmake/CMakeLists.txt | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/contrib/libuv b/contrib/libuv index 95081e7c16c..9460f4a12d9 160000 --- a/contrib/libuv +++ b/contrib/libuv @@ -1 +1 @@ -Subproject commit 95081e7c16c9857babe6d4e2bc1c779198ea89ae +Subproject commit 9460f4a12d95884abd841539066b56c99b73fd60 diff --git a/contrib/libuv-cmake/CMakeLists.txt b/contrib/libuv-cmake/CMakeLists.txt index 1a7714e47ce..2452a1e7612 100644 --- a/contrib/libuv-cmake/CMakeLists.txt +++ b/contrib/libuv-cmake/CMakeLists.txt @@ -76,6 +76,7 @@ if(CMAKE_SYSTEM_NAME STREQUAL "Linux") list(APPEND uv_libraries rt) list(APPEND uv_sources src/unix/linux-core.c + src/unix/epoll.c src/unix/linux-inotify.c src/unix/linux-syscalls.c src/unix/procfs-exepath.c @@ -111,7 +112,8 @@ if(CMAKE_SYSTEM_NAME STREQUAL "OS/390") src/unix/pthread-fixes.c src/unix/pthread-barrier.c src/unix/os390.c - src/unix/os390-syscalls.c) + src/unix/os390-syscalls.c + src/unix/os390-proctitle.c) endif() if(CMAKE_SYSTEM_NAME STREQUAL "SunOS") From ec7f89f2872c5915b0a05354cec2182c0802020d Mon Sep 17 00:00:00 2001 From: Suzy Wang Date: Mon, 15 Aug 2022 12:47:23 -0700 Subject: [PATCH 02/32] Update libuv commits, pull changes from upstream --- contrib/libuv | 2 +- contrib/libuv-cmake/CMakeLists.txt | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/contrib/libuv b/contrib/libuv index 9460f4a12d9..eede294dc4c 160000 --- a/contrib/libuv +++ b/contrib/libuv @@ -1 +1 @@ -Subproject commit 9460f4a12d95884abd841539066b56c99b73fd60 +Subproject commit eede294dc4cd710490c63e2eee4229d160ff9eee diff --git a/contrib/libuv-cmake/CMakeLists.txt b/contrib/libuv-cmake/CMakeLists.txt index 2452a1e7612..b1257f5579a 100644 --- a/contrib/libuv-cmake/CMakeLists.txt +++ b/contrib/libuv-cmake/CMakeLists.txt @@ -15,6 +15,7 @@ set(uv_sources src/inet.c src/random.c src/strscpy.c + src/strtok.c src/threadpool.c src/timer.c src/uv-common.c @@ -75,8 +76,8 @@ if(CMAKE_SYSTEM_NAME STREQUAL "Linux") list(APPEND uv_defines _GNU_SOURCE _POSIX_C_SOURCE=200112) list(APPEND uv_libraries rt) list(APPEND uv_sources - src/unix/linux-core.c src/unix/epoll.c + src/unix/linux-core.c src/unix/linux-inotify.c src/unix/linux-syscalls.c src/unix/procfs-exepath.c @@ -112,8 +113,8 @@ if(CMAKE_SYSTEM_NAME STREQUAL "OS/390") src/unix/pthread-fixes.c src/unix/pthread-barrier.c src/unix/os390.c - src/unix/os390-syscalls.c - src/unix/os390-proctitle.c) + src/unix/os390-proctitle.c + src/unix/os390-syscalls.c) endif() if(CMAKE_SYSTEM_NAME STREQUAL "SunOS") From 9d14397a2ac7d062ab199cdbb9bdc63fbe328ea5 Mon Sep 17 00:00:00 2001 From: Suzy Wang Date: Wed, 17 Aug 2022 08:15:08 -0700 Subject: [PATCH 03/32] rm duplicated declaration --- contrib/libuv-cmake/CMakeLists.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/contrib/libuv-cmake/CMakeLists.txt b/contrib/libuv-cmake/CMakeLists.txt index b1257f5579a..ba6bc746c59 100644 --- a/contrib/libuv-cmake/CMakeLists.txt +++ b/contrib/libuv-cmake/CMakeLists.txt @@ -82,8 +82,7 @@ if(CMAKE_SYSTEM_NAME STREQUAL "Linux") src/unix/linux-syscalls.c src/unix/procfs-exepath.c src/unix/random-getrandom.c - src/unix/random-sysctl-linux.c - src/unix/sysinfo-loadavg.c) + src/unix/random-sysctl-linux.c) endif() if(CMAKE_SYSTEM_NAME STREQUAL "NetBSD") From 1171ba909530b07d104ebd1c35567d02f5195156 Mon Sep 17 00:00:00 2001 From: Suzy Wang Date: Thu, 18 Aug 2022 13:25:50 -0700 Subject: [PATCH 04/32] Add dup3() and inotify_init() from musl 1.2.3 --- base/glibc-compatibility/musl/dup3.c | 24 +++++++++++++++++++++++ base/glibc-compatibility/musl/inotify.c | 26 +++++++++++++++++++++++++ 2 files changed, 50 insertions(+) create mode 100644 base/glibc-compatibility/musl/dup3.c create mode 100644 base/glibc-compatibility/musl/inotify.c diff --git a/base/glibc-compatibility/musl/dup3.c b/base/glibc-compatibility/musl/dup3.c new file mode 100644 index 00000000000..f919f79125a --- /dev/null +++ b/base/glibc-compatibility/musl/dup3.c @@ -0,0 +1,24 @@ +#define _GNU_SOURCE +#include +#include +#include +#include "syscall.h" + +int __dup3(int old, int new, int flags) +{ + int r; +#ifdef SYS_dup2 + if (old==new) return __syscall_ret(-EINVAL); + if (flags & O_CLOEXEC) { + while ((r=__syscall(SYS_dup3, old, new, flags))==-EBUSY); + if (r!=-ENOSYS) return __syscall_ret(r); + } + while ((r=__syscall(SYS_dup2, old, new))==-EBUSY); + if (flags & O_CLOEXEC) __syscall(SYS_fcntl, new, F_SETFD, FD_CLOEXEC); +#else + while ((r=__syscall(SYS_dup3, old, new, flags))==-EBUSY); +#endif + return __syscall_ret(r); +} + +weak_alias(__dup3, dup3); diff --git a/base/glibc-compatibility/musl/inotify.c b/base/glibc-compatibility/musl/inotify.c new file mode 100644 index 00000000000..df5e48b31b7 --- /dev/null +++ b/base/glibc-compatibility/musl/inotify.c @@ -0,0 +1,26 @@ +#include +#include +#include "syscall.h" + +int inotify_init() +{ + return inotify_init1(0); +} +int inotify_init1(int flags) +{ + int r = __syscall(SYS_inotify_init1, flags); +#ifdef SYS_inotify_init + if (r==-ENOSYS && !flags) r = __syscall(SYS_inotify_init); +#endif + return __syscall_ret(r); +} + +int inotify_add_watch(int fd, const char *pathname, uint32_t mask) +{ + return syscall(SYS_inotify_add_watch, fd, pathname, mask); +} + +int inotify_rm_watch(int fd, int wd) +{ + return syscall(SYS_inotify_rm_watch, fd, wd); +} From 7a190f84c836ceb9deb1b0c1d00ff196c165178c Mon Sep 17 00:00:00 2001 From: Suzy Wang Date: Fri, 19 Aug 2022 11:59:47 -0700 Subject: [PATCH 05/32] Remove weak_alias --- base/glibc-compatibility/musl/dup3.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/base/glibc-compatibility/musl/dup3.c b/base/glibc-compatibility/musl/dup3.c index f919f79125a..7f05788ebfc 100644 --- a/base/glibc-compatibility/musl/dup3.c +++ b/base/glibc-compatibility/musl/dup3.c @@ -20,5 +20,3 @@ int __dup3(int old, int new, int flags) #endif return __syscall_ret(r); } - -weak_alias(__dup3, dup3); From a774dd708d44e312dec3cf4ca0ac5ac96f1f37d6 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Wed, 13 Jul 2022 10:26:38 +0800 Subject: [PATCH 06/32] Better logging support. --- src/Common/logger_useful.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Common/logger_useful.h b/src/Common/logger_useful.h index 1e84efd8085..cc14d768ad0 100644 --- a/src/Common/logger_useful.h +++ b/src/Common/logger_useful.h @@ -3,6 +3,7 @@ /// Macros for convenient usage of Poco logger. #include +#include #include #include #include From a8d8293466dcb7874ca692b4a8b437b617303828 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Wed, 13 Jul 2022 10:27:43 +0800 Subject: [PATCH 07/32] Fix unused columns introduced by with stmt --- src/Interpreters/RequiredSourceColumnsVisitor.cpp | 11 ----------- .../02354_with_statement_non_exist_column.reference | 1 + .../02354_with_statement_non_exist_column.sql | 9 +++++++++ 3 files changed, 10 insertions(+), 11 deletions(-) create mode 100644 tests/queries/0_stateless/02354_with_statement_non_exist_column.reference create mode 100644 tests/queries/0_stateless/02354_with_statement_non_exist_column.sql diff --git a/src/Interpreters/RequiredSourceColumnsVisitor.cpp b/src/Interpreters/RequiredSourceColumnsVisitor.cpp index fe0ee6f97ac..18cbfaee63f 100644 --- a/src/Interpreters/RequiredSourceColumnsVisitor.cpp +++ b/src/Interpreters/RequiredSourceColumnsVisitor.cpp @@ -154,17 +154,6 @@ void RequiredSourceColumnsMatcher::visit(const ASTSelectQuery & select, const AS find_columns(interpolate->as()->expr.get()); } - if (const auto & with = select.with()) - { - for (auto & node : with->children) - { - if (const auto * identifier = node->as()) - data.addColumnIdentifier(*identifier); - else - data.addColumnAliasIfAny(*node); - } - } - std::vector out; for (const auto & node : select.children) { diff --git a/tests/queries/0_stateless/02354_with_statement_non_exist_column.reference b/tests/queries/0_stateless/02354_with_statement_non_exist_column.reference new file mode 100644 index 00000000000..d00491fd7e5 --- /dev/null +++ b/tests/queries/0_stateless/02354_with_statement_non_exist_column.reference @@ -0,0 +1 @@ +1 diff --git a/tests/queries/0_stateless/02354_with_statement_non_exist_column.sql b/tests/queries/0_stateless/02354_with_statement_non_exist_column.sql new file mode 100644 index 00000000000..1a989c1d952 --- /dev/null +++ b/tests/queries/0_stateless/02354_with_statement_non_exist_column.sql @@ -0,0 +1,9 @@ +WITH x AS y SELECT 1; + +DROP TEMPORARY TABLE IF EXISTS t1; +DROP TEMPORARY TABLE IF EXISTS t2; + +CREATE TEMPORARY TABLE t1 (a Int64); +CREATE TEMPORARY TABLE t2 (a Int64, b Int64); + +WITH b AS bb SELECT bb FROM t2 WHERE a IN (SELECT a FROM t1); From d58de79f574a8cfe4b1a864dd9fc8d4c1cf76535 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Wed, 24 Aug 2022 00:58:42 +0800 Subject: [PATCH 08/32] Revert "Better logging support." This reverts commit a774dd708d44e312dec3cf4ca0ac5ac96f1f37d6. --- src/Common/logger_useful.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Common/logger_useful.h b/src/Common/logger_useful.h index cc14d768ad0..1e84efd8085 100644 --- a/src/Common/logger_useful.h +++ b/src/Common/logger_useful.h @@ -3,7 +3,6 @@ /// Macros for convenient usage of Poco logger. #include -#include #include #include #include From ee2f9caa16bce376776912662eed10a44337f898 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 24 Aug 2022 17:44:09 +0200 Subject: [PATCH 09/32] Fix --- src/Common/FileCache.cpp | 85 ++++++++++++++++++++++------------ src/Common/FileCache.h | 8 +++- src/Common/FileSegment.cpp | 94 +++++++++++++++++++++++++------------- src/Common/FileSegment.h | 7 ++- 4 files changed, 130 insertions(+), 64 deletions(-) diff --git a/src/Common/FileCache.cpp b/src/Common/FileCache.cpp index 1aa8a25bb79..f996527e54c 100644 --- a/src/Common/FileCache.cpp +++ b/src/Common/FileCache.cpp @@ -74,7 +74,12 @@ bool FileCache::isReadOnly() void FileCache::assertInitialized() const { if (!is_initialized) - throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cache not initialized"); + { + if (initialization_exception) + std::rethrow_exception(initialization_exception); + else + throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cache not initialized"); + } } void FileCache::initialize() @@ -90,27 +95,45 @@ void FileCache::initialize() } catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__); + initialization_exception = std::current_exception(); throw; } } else + { fs::create_directories(cache_base_path); + } is_initialized = true; } } void FileCache::useCell( - const FileSegmentCell & cell, FileSegments & result, std::lock_guard & cache_lock) const + const FileSegmentCell & cell, FileSegments & result, std::lock_guard & cache_lock) { auto file_segment = cell.file_segment; - if (file_segment->isDownloaded() - && fs::file_size(getPathInLocalCache(file_segment->key(), file_segment->offset(), file_segment->isPersistent())) == 0) - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Cannot have zero size downloaded file segments. Current file segment: {}", - file_segment->range().toString()); + if (file_segment->isDownloaded()) + { + fs::path path = file_segment->getPathInLocalCache(); + if (!fs::exists(path)) + { + remove(file_segment, cache_lock); + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "File path does not exist, but file has DOWNLOADED state. {}", + file_segment->getInfoForLog()); + } + + if (fs::file_size(path) == 0) + { + remove(file_segment, cache_lock); + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Cannot have zero size downloaded file segments. {}", + file_segment->getInfoForLog()); + } + } result.push_back(cell.file_segment); @@ -221,7 +244,12 @@ FileSegments FileCache::getImpl( } FileSegments FileCache::splitRangeIntoCells( - const Key & key, size_t offset, size_t size, FileSegment::State state, bool is_persistent, std::lock_guard & cache_lock) + const Key & key, + size_t offset, + size_t size, + FileSegment::State state, + bool is_persistent, + std::lock_guard & cache_lock) { assert(size > 0); @@ -416,7 +444,7 @@ FileCache::FileSegmentCell * FileCache::addCell( if (files[key].contains(offset)) throw Exception( ErrorCodes::LOGICAL_ERROR, - "Cache already exists for key: `{}`, offset: {}, size: {}.\nCurrent cache structure: {}", + "Cache cell already exists for key: `{}`, offset: {}, size: {}.\nCurrent cache structure: {}", key.toString(), offset, size, dumpStructureUnlocked(key, cache_lock)); auto skip_or_download = [&]() -> FileSegmentPtr @@ -604,9 +632,7 @@ bool FileCache::tryReserve(const Key & key, size_t offset, size_t size, std::loc auto remove_file_segment = [&](FileSegmentPtr file_segment, size_t file_segment_size) { query_context->remove(file_segment->key(), file_segment->offset(), file_segment_size, cache_lock); - - std::lock_guard segment_lock(file_segment->mutex); - remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock); + remove(file_segment, cache_lock); }; assert(trash.empty()); @@ -723,19 +749,13 @@ bool FileCache::tryReserveForMainList( } } - auto remove_file_segment = [&](FileSegmentPtr file_segment) - { - std::lock_guard segment_lock(file_segment->mutex); - remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock); - }; - /// This case is very unlikely, can happen in case of exception from /// file_segment->complete(), which would be a logical error. assert(trash.empty()); for (auto & cell : trash) { if (auto file_segment = cell->file_segment) - remove_file_segment(file_segment); + remove(file_segment, cache_lock); } if (is_overflow()) @@ -757,7 +777,7 @@ bool FileCache::tryReserveForMainList( for (auto & cell : to_evict) { if (auto file_segment = cell->file_segment) - remove_file_segment(file_segment); + remove(file_segment, cache_lock); } if (main_priority->getCacheSize(cache_lock) > (1ull << 63)) @@ -868,6 +888,12 @@ void FileCache::removeIfReleasable() #endif } +void FileCache::remove(FileSegmentPtr file_segment, std::lock_guard & cache_lock) +{ + std::lock_guard segment_lock(file_segment->mutex); + remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock); +} + void FileCache::remove( Key key, size_t offset, std::lock_guard & cache_lock, std::lock_guard & /* segment_lock */) @@ -878,7 +904,7 @@ void FileCache::remove( if (!cell) throw Exception(ErrorCodes::LOGICAL_ERROR, "No cache cell for key: {}, offset: {}", key.toString(), offset); - bool is_persistent_file_segment = cell->file_segment->isPersistent(); + auto cache_file_path = cell->file_segment->getPathInLocalCache(); if (cell->queue_iterator) { @@ -888,7 +914,6 @@ void FileCache::remove( auto & offsets = files[key]; offsets.erase(offset); - auto cache_file_path = getPathInLocalCache(key, offset, is_persistent_file_segment); if (fs::exists(cache_file_path)) { try @@ -907,9 +932,10 @@ void FileCache::remove( } catch (...) { - throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, - "Removal of cached file failed. Key: {}, offset: {}, path: {}, error: {}", - key.toString(), offset, cache_file_path, getCurrentExceptionMessage(false)); + throw Exception( + ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, + "Removal of cached file failed. Key: {}, offset: {}, path: {}, error: {}", + key.toString(), offset, cache_file_path, getCurrentExceptionMessage(false)); } } } @@ -1138,9 +1164,10 @@ FileCache::FileSegmentCell::FileSegmentCell( break; } default: - throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, - "Can create cell with either EMPTY, DOWNLOADED, DOWNLOADING state, got: {}", - FileSegment::stateToString(file_segment->download_state)); + throw Exception( + ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, + "Can create cell with either EMPTY, DOWNLOADED, DOWNLOADING state, got: {}", + FileSegment::stateToString(file_segment->download_state)); } } diff --git a/src/Common/FileCache.h b/src/Common/FileCache.h index 1690690d102..e1a06dc48e4 100644 --- a/src/Common/FileCache.h +++ b/src/Common/FileCache.h @@ -140,7 +140,9 @@ private: bool enable_filesystem_query_cache_limit; Poco::Logger * log; + bool is_initialized = false; + std::exception_ptr initialization_exception; mutable std::mutex mutex; @@ -152,6 +154,10 @@ private: std::lock_guard & cache_lock, std::lock_guard & segment_lock); + void remove( + FileSegmentPtr file_segment, + std::lock_guard & cache_lock); + bool isLastFileSegmentHolder( const Key & key, size_t offset, @@ -220,7 +226,7 @@ private: bool is_persistent, std::lock_guard & cache_lock); - void useCell(const FileSegmentCell & cell, FileSegments & result, std::lock_guard & cache_lock) const; + void useCell(const FileSegmentCell & cell, FileSegments & result, std::lock_guard & cache_lock); bool tryReserveForMainList( const Key & key, diff --git a/src/Common/FileSegment.cpp b/src/Common/FileSegment.cpp index 0b2e874e9ab..17a0fc5f9a7 100644 --- a/src/Common/FileSegment.cpp +++ b/src/Common/FileSegment.cpp @@ -89,7 +89,7 @@ size_t FileSegment::getDownloadedSize() const size_t FileSegment::getRemainingSizeToDownload() const { std::lock_guard segment_lock(mutex); - return range().size() - downloaded_size; + return range().size() - getDownloadedSize(segment_lock); } bool FileSegment::isDetached() const @@ -158,7 +158,7 @@ void FileSegment::resetDownloader() void FileSegment::resetDownloaderImpl(std::lock_guard & segment_lock) { - if (downloaded_size == range().size()) + if (getDownloadedSize(segment_lock) == range().size()) setDownloaded(segment_lock); else download_state = State::PARTIALLY_DOWNLOADED; @@ -240,14 +240,16 @@ void FileSegment::write(const char * from, size_t size, size_t offset_) "Not enough space is reserved. Available: {}, expected: {}", availableSize(), size); if (!isDownloader()) - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Only downloader can do the downloading. (CallerId: {}, DownloaderId: {})", - getCallerId(), downloader_id); + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Only downloader can do the downloading. (CallerId: {}, DownloaderId: {})", + getCallerId(), downloader_id); - if (downloaded_size == range().size()) - throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, - "Attempt to write {} bytes to offset: {}, but current file segment is already fully downloaded", - size, offset_); + if (getDownloadedSize() == range().size()) + throw Exception( + ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, + "Attempt to write {} bytes to offset: {}, but current file segment is already fully downloaded", + size, offset_); auto download_offset = range().left + downloaded_size; if (offset_ != download_offset) @@ -349,12 +351,16 @@ bool FileSegment::reserve(size_t size) caller_id, downloader_id); } - if (downloaded_size + size > range().size()) - throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, - "Attempt to reserve space too much space ({}) for file segment with range: {} (downloaded size: {})", - size, range().toString(), downloaded_size); + size_t current_downloaded_size = getDownloadedSize(segment_lock); + if (current_downloaded_size + size > range().size()) + { + throw Exception( + ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, + "Attempt to reserve space too much space: {} ({})", + size, getInfoForLogImpl(segment_lock)); + } - assert(reserved_size >= downloaded_size); + assert(reserved_size >= current_downloaded_size); } /** @@ -362,29 +368,45 @@ bool FileSegment::reserve(size_t size) * in case previous downloader did not fully download current file_segment * and the caller is going to continue; */ - size_t free_space = reserved_size - downloaded_size; - size_t size_to_reserve = size - free_space; - std::lock_guard cache_lock(cache->mutex); + size_t current_downloaded_size = getDownloadedSize(); + assert(reserved_size >= current_downloaded_size); + size_t free_space = reserved_size - current_downloaded_size; - bool reserved = cache->tryReserve(key(), offset(), size_to_reserve, cache_lock); - - if (reserved) + bool reserved = free_space >= size; + if (!reserved) { - std::lock_guard segment_lock(mutex); - reserved_size += size; + std::lock_guard cache_lock(cache->mutex); + + size_t size_to_reserve = size - free_space; + reserved = cache->tryReserve(key(), offset(), size_to_reserve, cache_lock); + + if (reserved) + { + std::lock_guard segment_lock(mutex); + reserved_size += size; + } } return reserved; } -void FileSegment::setDownloaded(std::lock_guard & /* segment_lock */) +bool FileSegment::isDownloaded() const +{ + std::lock_guard segment_lock(mutex); + return isDownloadedUnlocked(segment_lock); +} + +bool FileSegment::isDownloadedUnlocked(std::lock_guard & /* segment_lock */) const +{ + return is_downloaded; +} + +void FileSegment::setDownloaded([[maybe_unused]] std::lock_guard & segment_lock) { if (is_downloaded) return; - download_state = State::DOWNLOADED; - is_downloaded = true; downloader_id.clear(); if (cache_writer) @@ -393,6 +415,12 @@ void FileSegment::setDownloaded(std::lock_guard & /* segment_lock */ cache_writer.reset(); remote_file_reader.reset(); } + + download_state = State::DOWNLOADED; + is_downloaded = true; + + assert(getDownloadedSize(segment_lock) > 0); + assert(std::filesystem::file_size(getPathInLocalCache()) > 0); } void FileSegment::setDownloadFailed(std::lock_guard & /* segment_lock */) @@ -425,7 +453,7 @@ void FileSegment::completeBatchAndResetDownloader() resetDownloaderImpl(segment_lock); - LOG_TEST(log, "Complete batch. Current downloaded size: {}", downloaded_size); + LOG_TEST(log, "Complete batch. Current downloaded size: {}", getDownloadedSize(segment_lock)); cv.notify_all(); } @@ -513,8 +541,8 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard & cach } case State::DOWNLOADED: { - assert(downloaded_size == range().size()); - assert(is_downloaded); + assert(getDownloadedSize(segment_lock) == range().size()); + assert(isDownloadedUnlocked(segment_lock)); break; } case State::DOWNLOADING: @@ -574,6 +602,7 @@ String FileSegment::getInfoForLogImpl(std::lock_guard & segment_lock { WriteBufferFromOwnString info; info << "File segment: " << range().toString() << ", "; + info << "key: " << key().toString() << ", "; info << "state: " << download_state << ", "; info << "downloaded size: " << getDownloadedSize(segment_lock) << ", "; info << "reserved size: " << reserved_size << ", "; @@ -814,17 +843,18 @@ void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment) if (file_segment.isDetached()) return; - if (file_segment.getDownloadedSize() > 0) + size_t current_downloaded_size = file_segment.getDownloadedSize(); + if (current_downloaded_size > 0) { /// file_segment->complete(DOWNLOADED) is not enough, because file segment capacity /// was initially set with a margin as `max_file_segment_size`. => We need to always /// resize to actual size after download finished. file_segment.getOrSetDownloader(); - assert(file_segment.downloaded_size <= file_segment.range().size()); + assert(current_downloaded_size <= file_segment.range().size()); file_segment.segment_range = FileSegment::Range( - file_segment.segment_range.left, file_segment.segment_range.left + file_segment.downloaded_size - 1); - file_segment.reserved_size = file_segment.downloaded_size; + file_segment.segment_range.left, file_segment.segment_range.left + current_downloaded_size - 1); + file_segment.reserved_size = current_downloaded_size; file_segment.completeWithState(FileSegment::State::DOWNLOADED); diff --git a/src/Common/FileSegment.h b/src/Common/FileSegment.h index 2b25cfd172e..b0e81e0f268 100644 --- a/src/Common/FileSegment.h +++ b/src/Common/FileSegment.h @@ -130,7 +130,7 @@ public: bool isDownloader() const; - bool isDownloaded() const { return is_downloaded.load(); } + bool isDownloaded() const; static String getCallerId(); @@ -187,6 +187,8 @@ private: void setDownloadFailed(std::lock_guard & segment_lock); bool isDownloaderImpl(std::lock_guard & segment_lock) const; + bool isDownloadedUnlocked(std::lock_guard & segment_lock) const; + void wrapWithCacheInfo(Exception & e, const String & message, std::lock_guard & segment_lock) const; bool lastFileSegmentHolder() const; @@ -236,7 +238,8 @@ private: /// In general case, all file segments are owned by cache. bool is_detached = false; - std::atomic is_downloaded{false}; + bool is_downloaded{false}; + std::atomic hits_count = 0; /// cache hits. std::atomic ref_count = 0; /// Used for getting snapshot state From b52e098a4094f314c85f1ca80439352d3b70bf4a Mon Sep 17 00:00:00 2001 From: helifu Date: Thu, 25 Aug 2022 10:56:53 +0800 Subject: [PATCH 10/32] Fix a crash while the grpc port is conflict --- src/Server/GRPCServer.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Server/GRPCServer.cpp b/src/Server/GRPCServer.cpp index 573e43e9d7a..165dc7e1344 100644 --- a/src/Server/GRPCServer.cpp +++ b/src/Server/GRPCServer.cpp @@ -1862,6 +1862,11 @@ void GRPCServer::start() queue = builder.AddCompletionQueue(); grpc_server = builder.BuildAndStart(); + if (nullptr == grpc_server) + { + throw DB::Exception("Can't start grpc server, there is a port conflict", DB::ErrorCodes::NETWORK_ERROR); + } + runner->start(); } From 44577ea8f62520d5d0e22858ba554ace2c8740c4 Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Thu, 25 Aug 2022 13:02:41 +0200 Subject: [PATCH 11/32] Update FileSegment.cpp --- src/Common/FileSegment.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Common/FileSegment.cpp b/src/Common/FileSegment.cpp index 17a0fc5f9a7..af8f18af1cc 100644 --- a/src/Common/FileSegment.cpp +++ b/src/Common/FileSegment.cpp @@ -384,7 +384,7 @@ bool FileSegment::reserve(size_t size) if (reserved) { std::lock_guard segment_lock(mutex); - reserved_size += size; + reserved_size += size_to_reserve; } } From fac87d9c4dda28b93916edd6e206f7730540e61f Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Thu, 25 Aug 2022 13:18:03 +0200 Subject: [PATCH 12/32] Update FileSegment.cpp --- src/Common/FileSegment.cpp | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/Common/FileSegment.cpp b/src/Common/FileSegment.cpp index 5c7d85f49b5..edeab26dfe7 100644 --- a/src/Common/FileSegment.cpp +++ b/src/Common/FileSegment.cpp @@ -522,13 +522,6 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard & cach download_state = State::PARTIALLY_DOWNLOADED; resetDownloaderImpl(segment_lock); - - if (cache_writer) - { - cache_writer->finalize(); - cache_writer.reset(); - remote_file_reader.reset(); - } } switch (download_state) From 5b7fe91675581dc8ea50955dfb085b6aa14b7bd1 Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Fri, 26 Aug 2022 17:02:31 +0200 Subject: [PATCH 13/32] Avoid deadlock in case of new query and OOM --- src/Interpreters/ProcessList.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Interpreters/ProcessList.cpp b/src/Interpreters/ProcessList.cpp index a6e3387b647..9643e2750ba 100644 --- a/src/Interpreters/ProcessList.cpp +++ b/src/Interpreters/ProcessList.cpp @@ -83,6 +83,7 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as { std::unique_lock lock(mutex); + OvercommitTrackerBlockerInThread overcommit_blocker; // To avoid deadlock in case of OOM IAST::QueryKind query_kind = ast->getQueryKind(); const auto queue_max_wait_ms = settings.queue_max_wait_ms.totalMilliseconds(); From cad4eeb8917e6fafd3a41fd8e4b1dbc708bf68f4 Mon Sep 17 00:00:00 2001 From: Suzy Wang Date: Sun, 28 Aug 2022 18:03:07 -0700 Subject: [PATCH 14/32] Correct __dup3() to dup3() --- base/glibc-compatibility/musl/dup3.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/base/glibc-compatibility/musl/dup3.c b/base/glibc-compatibility/musl/dup3.c index 7f05788ebfc..4da17d4bc9f 100644 --- a/base/glibc-compatibility/musl/dup3.c +++ b/base/glibc-compatibility/musl/dup3.c @@ -4,7 +4,7 @@ #include #include "syscall.h" -int __dup3(int old, int new, int flags) +int dup3(int old, int new, int flags) { int r; #ifdef SYS_dup2 From 0a6c4b92657944b35648795a129a7efff2816a82 Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 29 Aug 2022 15:41:32 +0200 Subject: [PATCH 15/32] Fix --- docs/en/engines/database-engines/mysql.md | 4 +- src/Common/mysqlxx/Pool.cpp | 10 ++- src/Common/mysqlxx/PoolWithFailover.cpp | 12 ++- src/Common/mysqlxx/mysqlxx/Pool.h | 22 ++++- src/Databases/DatabaseFactory.cpp | 15 ++-- .../ExternalDataSourceConfiguration.cpp | 6 ++ src/Storages/MySQL/MySQLHelpers.cpp | 10 ++- src/Storages/MySQL/MySQLHelpers.h | 5 +- src/Storages/MySQL/MySQLSettings.cpp | 8 +- src/Storages/MySQL/MySQLSettings.h | 2 + src/TableFunctions/TableFunctionMySQL.cpp | 17 +++- .../configs/named_collections.xml | 11 +++ tests/integration/test_storage_mysql/test.py | 87 +++++++++++++++++++ 13 files changed, 184 insertions(+), 25 deletions(-) diff --git a/docs/en/engines/database-engines/mysql.md b/docs/en/engines/database-engines/mysql.md index 89a0786a9ec..45f2d9a29e1 100644 --- a/docs/en/engines/database-engines/mysql.md +++ b/docs/en/engines/database-engines/mysql.md @@ -3,7 +3,7 @@ sidebar_position: 50 sidebar_label: MySQL --- -# MySQL +# MySQL Allows to connect to databases on a remote MySQL server and perform `INSERT` and `SELECT` queries to exchange data between ClickHouse and MySQL. @@ -98,7 +98,7 @@ mysql> select * from mysql_table; Database in ClickHouse, exchanging data with the MySQL server: ``` sql -CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'test', 'my_user', 'user_password') +CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'test', 'my_user', 'user_password') SETTINGS read_write_timeout=10000, connect_timeout=100; ``` ``` sql diff --git a/src/Common/mysqlxx/Pool.cpp b/src/Common/mysqlxx/Pool.cpp index a74feb54cd3..bee62a0af2e 100644 --- a/src/Common/mysqlxx/Pool.cpp +++ b/src/Common/mysqlxx/Pool.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include @@ -260,7 +261,10 @@ void Pool::Entry::forceConnected() const else sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL); - pool->logger.debug("Entry: Reconnecting to MySQL server %s", pool->description); + pool->logger.debug( + "Creating a new MySQL connection to %s with settings: connect_timeout=%u, read_write_timeout=%u", + pool->description, pool->connect_timeout, pool->rw_timeout); + data->conn.connect( pool->db.c_str(), pool->server.c_str(), @@ -325,6 +329,10 @@ Pool::Connection * Pool::allocConnection(bool dont_throw_if_failed_first_time) { logger.debug("Connecting to %s", description); + logger.debug( + "Creating a new MySQL connection to %s with settings: connect_timeout=%u, read_write_timeout=%u", + description, connect_timeout, rw_timeout); + conn_ptr->conn.connect( db.c_str(), server.c_str(), diff --git a/src/Common/mysqlxx/PoolWithFailover.cpp b/src/Common/mysqlxx/PoolWithFailover.cpp index 36dd713d454..f3dee1a6776 100644 --- a/src/Common/mysqlxx/PoolWithFailover.cpp +++ b/src/Common/mysqlxx/PoolWithFailover.cpp @@ -168,6 +168,7 @@ PoolWithFailover::Entry PoolWithFailover::get() } app.logger().warning("Connection to " + pool->getDescription() + " failed: " + e.displayText()); + replica_name_to_error_detail.insert_or_assign(pool->getDescription(), ErrorDetail{e.code(), e.displayText()}); continue; @@ -177,7 +178,10 @@ PoolWithFailover::Entry PoolWithFailover::get() } } - app.logger().error("Connection to all replicas failed " + std::to_string(try_no + 1) + " times"); + if (replicas_by_priority.size() > 1) + app.logger().error("Connection to all mysql replicas failed " + std::to_string(try_no + 1) + " times"); + else + app.logger().error("Connection to mysql failed " + std::to_string(try_no + 1) + " times"); } if (full_pool) @@ -187,7 +191,11 @@ PoolWithFailover::Entry PoolWithFailover::get() } DB::WriteBufferFromOwnString message; - message << "Connections to all replicas failed: "; + if (replicas_by_priority.size() > 1) + message << "Connections to all mysql replicas failed: "; + else + message << "Connections to mysql failed: "; + for (auto it = replicas_by_priority.begin(); it != replicas_by_priority.end(); ++it) { for (auto jt = it->second.begin(); jt != it->second.end(); ++jt) diff --git a/src/Common/mysqlxx/mysqlxx/Pool.h b/src/Common/mysqlxx/mysqlxx/Pool.h index 5a436146f02..1fa8eaeb997 100644 --- a/src/Common/mysqlxx/mysqlxx/Pool.h +++ b/src/Common/mysqlxx/mysqlxx/Pool.h @@ -169,10 +169,24 @@ public: unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS, unsigned enable_local_infile_ = MYSQLXX_DEFAULT_ENABLE_LOCAL_INFILE, bool opt_reconnect_ = MYSQLXX_DEFAULT_MYSQL_OPT_RECONNECT) - : logger(Poco::Logger::get("mysqlxx::Pool")), default_connections(default_connections_), - max_connections(max_connections_), db(db_), server(server_), user(user_), password(password_), port(port_), socket(socket_), - connect_timeout(connect_timeout_), rw_timeout(rw_timeout_), enable_local_infile(enable_local_infile_), - opt_reconnect(opt_reconnect_) {} + : logger(Poco::Logger::get("mysqlxx::Pool")) + , default_connections(default_connections_) + , max_connections(max_connections_) + , db(db_) + , server(server_) + , user(user_) + , password(password_) + , port(port_) + , socket(socket_) + , connect_timeout(connect_timeout_) + , rw_timeout(rw_timeout_) + , enable_local_infile(enable_local_infile_) + , opt_reconnect(opt_reconnect_) + { + logger.debug( + "Created MySQL Pool with settings: connect_timeout=%u, read_write_timeout=%u, default_connections_number=%u, max_connections_number=%u", + connect_timeout, rw_timeout, default_connections, max_connections); + } Pool(const Pool & other) : logger(other.logger), default_connections{other.default_connections}, diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index c16de2d33a5..96db2a17b72 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -183,15 +183,15 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String StorageMySQLConfiguration configuration; ASTs & arguments = engine->arguments->children; - MySQLSettings mysql_settings; + auto mysql_settings = std::make_unique(); - if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true, true, mysql_settings)) + if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true, true, *mysql_settings)) { auto [common_configuration, storage_specific_args, settings_changes] = named_collection.value(); configuration.set(common_configuration); configuration.addresses = {std::make_pair(configuration.host, configuration.port)}; - mysql_settings.applyChanges(settings_changes); + mysql_settings->applyChanges(settings_changes); if (!storage_specific_args.empty()) throw Exception(ErrorCodes::BAD_ARGUMENTS, @@ -228,15 +228,14 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String { if (engine_name == "MySQL") { - auto mysql_database_settings = std::make_unique(); - auto mysql_pool = createMySQLPoolWithFailover(configuration, mysql_settings); + mysql_settings->loadFromQueryContext(context); + mysql_settings->loadFromQuery(*engine_define); /// higher priority - mysql_database_settings->loadFromQueryContext(context); - mysql_database_settings->loadFromQuery(*engine_define); /// higher priority + auto mysql_pool = createMySQLPoolWithFailover(configuration, *mysql_settings); return std::make_shared( context, database_name, metadata_path, engine_define, configuration.database, - std::move(mysql_database_settings), std::move(mysql_pool), create.attach); + std::move(mysql_settings), std::move(mysql_pool), create.attach); } MySQLClient client(configuration.host, configuration.port, configuration.username, configuration.password); diff --git a/src/Storages/ExternalDataSourceConfiguration.cpp b/src/Storages/ExternalDataSourceConfiguration.cpp index 5710aa6cd6a..53a9655e0c7 100644 --- a/src/Storages/ExternalDataSourceConfiguration.cpp +++ b/src/Storages/ExternalDataSourceConfiguration.cpp @@ -17,6 +17,7 @@ #endif #if USE_MYSQL #include +#include #endif #if USE_NATSIO #include @@ -575,6 +576,10 @@ template std::optional getExternalDataSourceConfiguration( const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings & storage_settings); +template +std::optional getExternalDataSourceConfiguration( + const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings & storage_settings); + template std::optional getExternalDataSourceConfiguration( const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, @@ -583,5 +588,6 @@ std::optional getExternalDataSourceConfiguration( template SettingsChanges getSettingsChangesFromConfig( const BaseSettings & settings, const Poco::Util::AbstractConfiguration & config, const String & config_prefix); + #endif } diff --git a/src/Storages/MySQL/MySQLHelpers.cpp b/src/Storages/MySQL/MySQLHelpers.cpp index edeb4ffca8a..94c07d2670f 100644 --- a/src/Storages/MySQL/MySQLHelpers.cpp +++ b/src/Storages/MySQL/MySQLHelpers.cpp @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB { @@ -13,8 +14,8 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } -mysqlxx::PoolWithFailover -createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings) +template mysqlxx::PoolWithFailover +createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const T & mysql_settings) { if (!mysql_settings.connection_pool_size) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Connection pool cannot have zero size"); @@ -29,6 +30,11 @@ createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, con mysql_settings.read_write_timeout); } +template +mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings); +template +mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const ConnectionMySQLSettings & mysql_settings); + } #endif diff --git a/src/Storages/MySQL/MySQLHelpers.h b/src/Storages/MySQL/MySQLHelpers.h index 712c5a2c719..59052be5c2a 100644 --- a/src/Storages/MySQL/MySQLHelpers.h +++ b/src/Storages/MySQL/MySQLHelpers.h @@ -9,10 +9,9 @@ namespace mysqlxx { class PoolWithFailover; } namespace DB { struct StorageMySQLConfiguration; -struct MySQLSettings; -mysqlxx::PoolWithFailover -createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings); +template mysqlxx::PoolWithFailover +createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const T & mysql_settings); } diff --git a/src/Storages/MySQL/MySQLSettings.cpp b/src/Storages/MySQL/MySQLSettings.cpp index 1a8f0804777..5c1a2246ae9 100644 --- a/src/Storages/MySQL/MySQLSettings.cpp +++ b/src/Storages/MySQL/MySQLSettings.cpp @@ -15,13 +15,18 @@ namespace ErrorCodes IMPLEMENT_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS) +void MySQLSettings::loadFromQuery(const ASTSetQuery & settings_def) +{ + applyChanges(settings_def.changes); +} + void MySQLSettings::loadFromQuery(ASTStorage & storage_def) { if (storage_def.settings) { try { - applyChanges(storage_def.settings->changes); + loadFromQuery(*storage_def.settings); } catch (Exception & e) { @@ -39,4 +44,3 @@ void MySQLSettings::loadFromQuery(ASTStorage & storage_def) } } - diff --git a/src/Storages/MySQL/MySQLSettings.h b/src/Storages/MySQL/MySQLSettings.h index be1e09c12e6..9fa9b846cd3 100644 --- a/src/Storages/MySQL/MySQLSettings.h +++ b/src/Storages/MySQL/MySQLSettings.h @@ -13,6 +13,7 @@ namespace Poco::Util namespace DB { class ASTStorage; +class ASTSetQuery; #define LIST_OF_MYSQL_SETTINGS(M) \ M(UInt64, connection_pool_size, 16, "Size of connection pool (if all connections are in use, the query will wait until some connection will be freed).", 0) \ @@ -32,6 +33,7 @@ using MySQLBaseSettings = BaseSettings; struct MySQLSettings : public MySQLBaseSettings { void loadFromQuery(ASTStorage & storage_def); + void loadFromQuery(const ASTSetQuery & settings_def); }; diff --git a/src/TableFunctions/TableFunctionMySQL.cpp b/src/TableFunctions/TableFunctionMySQL.cpp index bd554b6163e..c67d6b3b652 100644 --- a/src/TableFunctions/TableFunctionMySQL.cpp +++ b/src/TableFunctions/TableFunctionMySQL.cpp @@ -37,11 +37,26 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr if (!args_func.arguments) throw Exception("Table function 'mysql' must have arguments.", ErrorCodes::LOGICAL_ERROR); + auto & args = args_func.arguments->children; + MySQLSettings mysql_settings; - configuration = StorageMySQL::getConfiguration(args_func.arguments->children, context, mysql_settings); + const auto & settings = context->getSettingsRef(); mysql_settings.connect_timeout = settings.external_storage_connect_timeout_sec; mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec; + + for (auto it = args.begin(); it != args.end(); ++it) + { + const ASTSetQuery * settings_ast = (*it)->as(); + if (settings_ast) + { + mysql_settings.loadFromQuery(*settings_ast); + args.erase(it); + break; + } + } + + configuration = StorageMySQL::getConfiguration(args, context, mysql_settings); pool.emplace(createMySQLPoolWithFailover(*configuration, mysql_settings)); } diff --git a/tests/integration/test_storage_mysql/configs/named_collections.xml b/tests/integration/test_storage_mysql/configs/named_collections.xml index b4a79880d2a..4d3fbf6085c 100644 --- a/tests/integration/test_storage_mysql/configs/named_collections.xml +++ b/tests/integration/test_storage_mysql/configs/named_collections.xml @@ -30,5 +30,16 @@ test_table
0 + + root + clickhouse + mysql57 + 3306 + clickhouse + test_settings
+ 1 + 20123001 + 20123002 +
diff --git a/tests/integration/test_storage_mysql/test.py b/tests/integration/test_storage_mysql/test.py index 34ef17327f9..50f0c5519b5 100644 --- a/tests/integration/test_storage_mysql/test.py +++ b/tests/integration/test_storage_mysql/test.py @@ -732,6 +732,93 @@ def test_mysql_null(started_cluster): conn.close() +def test_settings(started_cluster): + table_name = "test_settings" + node1.query(f"DROP TABLE IF EXISTS {table_name}") + wait_timeout = 123 + rw_timeout = 10123001 + connect_timeout = 10123002 + connection_pool_size = 1 + + conn = get_mysql_conn(started_cluster, cluster.mysql_ip) + drop_mysql_table(conn, table_name) + create_mysql_table(conn, table_name) + + node1.query( + f""" + CREATE TABLE {table_name} + ( + id UInt32, + name String, + age UInt32, + money UInt32 + ) + ENGINE = MySQL('mysql57:3306', 'clickhouse', '{table_name}', 'root', 'clickhouse') + SETTINGS connection_wait_timeout={wait_timeout}, connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}, connection_pool_size={connection_pool_size} + """ + ) + + node1.query(f"SELECT * FROM {table_name}") + assert node1.contains_in_log( + f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}" + ) + + rw_timeout = 20123001 + connect_timeout = 20123002 + node1.query(f"SELECT * FROM mysql(mysql_with_settings)") + assert node1.contains_in_log( + f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}" + ) + + rw_timeout = 30123001 + connect_timeout = 30123002 + node1.query( + f""" + SELECT * + FROM mysql('mysql57:3306', 'clickhouse', '{table_name}', 'root', 'clickhouse', + SETTINGS + connection_wait_timeout={wait_timeout}, + connect_timeout={connect_timeout}, + read_write_timeout={rw_timeout}, + connection_pool_size={connection_pool_size}) + """ + ) + assert node1.contains_in_log( + f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}" + ) + + rw_timeout = 40123001 + connect_timeout = 40123002 + node1.query( + f""" + CREATE DATABASE m + ENGINE = MySQL(mysql_with_settings, connection_wait_timeout={wait_timeout}, connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}, connection_pool_size={connection_pool_size}) + """ + ) + assert node1.contains_in_log( + f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}" + ) + + rw_timeout = 50123001 + connect_timeout = 50123002 + node1.query( + f""" + CREATE DATABASE mm ENGINE = MySQL('mysql57:3306', 'clickhouse', 'root', 'clickhouse') + SETTINGS + connection_wait_timeout={wait_timeout}, + connect_timeout={connect_timeout}, + read_write_timeout={rw_timeout}, + connection_pool_size={connection_pool_size} + """ + ) + assert node1.contains_in_log( + f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}" + ) + + drop_mysql_table(conn, table_name) + conn.close() + + if __name__ == "__main__": with contextmanager(started_cluster)() as cluster: for name, instance in list(cluster.instances.items()): From b1dab84d97a7cbe3c9b1563c4b90a44b97322060 Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 29 Aug 2022 16:17:32 +0200 Subject: [PATCH 16/32] Review fixes --- src/Common/FileCache.cpp | 42 ++++++++++++++++++++------------------ src/Common/FileCache.h | 4 ++-- src/Common/FileSegment.cpp | 38 +++++++++++++++++----------------- src/Common/FileSegment.h | 2 +- 4 files changed, 44 insertions(+), 42 deletions(-) diff --git a/src/Common/FileCache.cpp b/src/Common/FileCache.cpp index 48c2276c741..44ecac2cc02 100644 --- a/src/Common/FileCache.cpp +++ b/src/Common/FileCache.cpp @@ -71,7 +71,7 @@ bool FileCache::isReadOnly() return !isQueryInitialized(); } -void FileCache::assertInitialized() const +void FileCache::assertInitialized(std::lock_guard & /* cache_lock */) const { if (!is_initialized) { @@ -118,7 +118,6 @@ void FileCache::useCell( fs::path path = file_segment->getPathInLocalCache(); if (!fs::exists(path)) { - remove(file_segment, cache_lock); throw Exception( ErrorCodes::LOGICAL_ERROR, "File path does not exist, but file has DOWNLOADED state. {}", @@ -127,7 +126,6 @@ void FileCache::useCell( if (fs::file_size(path) == 0) { - remove(file_segment, cache_lock); throw Exception( ErrorCodes::LOGICAL_ERROR, "Cannot have zero size downloaded file segments. {}", @@ -373,16 +371,16 @@ void FileCache::fillHolesWithEmptyFileSegments( FileSegmentsHolder FileCache::getOrSet(const Key & key, size_t offset, size_t size, bool is_persistent) { - assertInitialized(); - - FileSegment::Range range(offset, offset + size - 1); - std::lock_guard cache_lock(mutex); + assertInitialized(cache_lock); + #ifndef NDEBUG assertCacheCorrectness(key, cache_lock); #endif + FileSegment::Range range(offset, offset + size - 1); + /// Get all segments which intersect with the given range. auto file_segments = getImpl(key, range, cache_lock); @@ -401,16 +399,16 @@ FileSegmentsHolder FileCache::getOrSet(const Key & key, size_t offset, size_t si FileSegmentsHolder FileCache::get(const Key & key, size_t offset, size_t size) { - assertInitialized(); - - FileSegment::Range range(offset, offset + size - 1); - std::lock_guard cache_lock(mutex); + assertInitialized(cache_lock); + #ifndef NDEBUG assertCacheCorrectness(key, cache_lock); #endif + FileSegment::Range range(offset, offset + size - 1); + /// Get all segments which intersect with the given range. auto file_segments = getImpl(key, range, cache_lock); @@ -791,10 +789,10 @@ bool FileCache::tryReserveForMainList( void FileCache::removeIfExists(const Key & key) { - assertInitialized(); - std::lock_guard cache_lock(mutex); + assertInitialized(cache_lock); + auto it = files.find(key); if (it == files.end()) return; @@ -900,15 +898,19 @@ void FileCache::remove( { LOG_DEBUG(log, "Remove from cache. Key: {}, offset: {}", key.toString(), offset); - auto * cell = getCell(key, offset, cache_lock); - if (!cell) - throw Exception(ErrorCodes::LOGICAL_ERROR, "No cache cell for key: {}, offset: {}", key.toString(), offset); + String cache_file_path; - auto cache_file_path = cell->file_segment->getPathInLocalCache(); - - if (cell->queue_iterator) { - cell->queue_iterator->removeAndGetNext(cache_lock); + auto * cell = getCell(key, offset, cache_lock); + if (!cell) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No cache cell for key: {}, offset: {}", key.toString(), offset); + + if (cell->queue_iterator) + { + cell->queue_iterator->removeAndGetNext(cache_lock); + } + + cache_file_path = cell->file_segment->getPathInLocalCache(); } auto & offsets = files[key]; diff --git a/src/Common/FileCache.h b/src/Common/FileCache.h index e1a06dc48e4..b5b1e917e76 100644 --- a/src/Common/FileCache.h +++ b/src/Common/FileCache.h @@ -170,7 +170,7 @@ private: std::lock_guard & cache_lock, std::lock_guard & segment_lock); - void assertInitialized() const; + void assertInitialized(std::lock_guard & cache_lock) const; struct FileSegmentCell : private boost::noncopyable { @@ -226,7 +226,7 @@ private: bool is_persistent, std::lock_guard & cache_lock); - void useCell(const FileSegmentCell & cell, FileSegments & result, std::lock_guard & cache_lock); + static void useCell(const FileSegmentCell & cell, FileSegments & result, std::lock_guard & cache_lock); bool tryReserveForMainList( const Key & key, diff --git a/src/Common/FileSegment.cpp b/src/Common/FileSegment.cpp index 7d4ddc7293e..10c89e21c3e 100644 --- a/src/Common/FileSegment.cpp +++ b/src/Common/FileSegment.cpp @@ -78,19 +78,19 @@ FileSegment::State FileSegment::state() const size_t FileSegment::getDownloadOffset() const { std::lock_guard segment_lock(mutex); - return range().left + getDownloadedSize(segment_lock); + return range().left + getDownloadedSizeUnlocked(segment_lock); } size_t FileSegment::getDownloadedSize() const { std::lock_guard segment_lock(mutex); - return getDownloadedSize(segment_lock); + return getDownloadedSizeUnlocked(segment_lock); } size_t FileSegment::getRemainingSizeToDownload() const { std::lock_guard segment_lock(mutex); - return range().size() - getDownloadedSize(segment_lock); + return range().size() - getDownloadedSizeUnlocked(segment_lock); } bool FileSegment::isDetached() const @@ -99,7 +99,7 @@ bool FileSegment::isDetached() const return is_detached; } -size_t FileSegment::getDownloadedSize(std::lock_guard & /* segment_lock */) const +size_t FileSegment::getDownloadedSizeUnlocked(std::lock_guard & /* segment_lock */) const { if (download_state == State::DOWNLOADED) return downloaded_size; @@ -159,7 +159,7 @@ void FileSegment::resetDownloader() void FileSegment::resetDownloaderImpl(std::lock_guard & segment_lock) { - if (getDownloadedSize(segment_lock) == range().size()) + if (getDownloadedSizeUnlocked(segment_lock) == range().size()) setDownloaded(segment_lock); else download_state = State::PARTIALLY_DOWNLOADED; @@ -333,9 +333,9 @@ FileSegment::State FileSegment::wait() return download_state; } -bool FileSegment::reserve(size_t size) +bool FileSegment::reserve(size_t size_to_reserve) { - if (!size) + if (!size_to_reserve) throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Zero space reservation is not allowed"); { @@ -352,13 +352,13 @@ bool FileSegment::reserve(size_t size) caller_id, downloader_id); } - size_t current_downloaded_size = getDownloadedSize(segment_lock); - if (current_downloaded_size + size > range().size()) + size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock); + if (current_downloaded_size + size_to_reserve > range().size()) { throw Exception( ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Attempt to reserve space too much space: {} ({})", - size, getInfoForLogImpl(segment_lock)); + size_to_reserve, getInfoForLogImpl(segment_lock)); } assert(reserved_size >= current_downloaded_size); @@ -372,14 +372,14 @@ bool FileSegment::reserve(size_t size) size_t current_downloaded_size = getDownloadedSize(); assert(reserved_size >= current_downloaded_size); - size_t free_space = reserved_size - current_downloaded_size; + size_t already_reserved_size = reserved_size - current_downloaded_size; - bool reserved = free_space >= size; + bool reserved = already_reserved_size >= size_to_reserve; if (!reserved) { std::lock_guard cache_lock(cache->mutex); - size_t size_to_reserve = size - free_space; + size_to_reserve = size_to_reserve - already_reserved_size; reserved = cache->tryReserve(key(), offset(), size_to_reserve, cache_lock); if (reserved) @@ -420,7 +420,7 @@ void FileSegment::setDownloaded([[maybe_unused]] std::lock_guard & s download_state = State::DOWNLOADED; is_downloaded = true; - assert(getDownloadedSize(segment_lock) > 0); + assert(getDownloadedSizeUnlocked(segment_lock) > 0); assert(std::filesystem::file_size(getPathInLocalCache()) > 0); } @@ -454,7 +454,7 @@ void FileSegment::completeBatchAndResetDownloader() resetDownloaderImpl(segment_lock); - LOG_TEST(log, "Complete batch. Current downloaded size: {}", getDownloadedSize(segment_lock)); + LOG_TEST(log, "Complete batch. Current downloaded size: {}", getDownloadedSizeUnlocked(segment_lock)); cv.notify_all(); } @@ -503,7 +503,7 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard & cach bool is_downloader = isDownloaderImpl(segment_lock); bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock); bool can_update_segment_state = is_downloader || is_last_holder; - size_t current_downloaded_size = getDownloadedSize(segment_lock); + size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock); SCOPE_EXIT({ if (is_downloader) @@ -535,7 +535,7 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard & cach } case State::DOWNLOADED: { - assert(getDownloadedSize(segment_lock) == range().size()); + assert(getDownloadedSizeUnlocked(segment_lock) == range().size()); assert(isDownloadedUnlocked(segment_lock)); break; } @@ -598,7 +598,7 @@ String FileSegment::getInfoForLogImpl(std::lock_guard & segment_lock info << "File segment: " << range().toString() << ", "; info << "key: " << key().toString() << ", "; info << "state: " << download_state << ", "; - info << "downloaded size: " << getDownloadedSize(segment_lock) << ", "; + info << "downloaded size: " << getDownloadedSizeUnlocked(segment_lock) << ", "; info << "reserved size: " << reserved_size << ", "; info << "downloader id: " << (downloader_id.empty() ? "None" : downloader_id) << ", "; info << "caller id: " << getCallerId() << ", "; @@ -694,7 +694,7 @@ FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std snapshot->hits_count = file_segment->getHitsCount(); snapshot->ref_count = file_segment.use_count(); - snapshot->downloaded_size = file_segment->getDownloadedSize(segment_lock); + snapshot->downloaded_size = file_segment->getDownloadedSizeUnlocked(segment_lock); snapshot->download_state = file_segment->download_state; snapshot->is_persistent = file_segment->isPersistent(); diff --git a/src/Common/FileSegment.h b/src/Common/FileSegment.h index b0e81e0f268..8b826576e1f 100644 --- a/src/Common/FileSegment.h +++ b/src/Common/FileSegment.h @@ -171,7 +171,7 @@ public: private: size_t availableSize() const { return reserved_size - downloaded_size; } - size_t getDownloadedSize(std::lock_guard & segment_lock) const; + size_t getDownloadedSizeUnlocked(std::lock_guard & segment_lock) const; String getInfoForLogImpl(std::lock_guard & segment_lock) const; void assertCorrectnessImpl(std::lock_guard & segment_lock) const; bool hasFinalizedState() const; From ff2db8e2a72d10b4791c7c364a7a8c0a0de7b32d Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 29 Aug 2022 16:46:21 +0200 Subject: [PATCH 17/32] update submodule --- contrib/libuv | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/libuv b/contrib/libuv index eede294dc4c..3a85b2eb3d8 160000 --- a/contrib/libuv +++ b/contrib/libuv @@ -1 +1 @@ -Subproject commit eede294dc4cd710490c63e2eee4229d160ff9eee +Subproject commit 3a85b2eb3d83f369b8a8cafd329d7e9dc28f60cf From cfe509c3decb4ae2edb65610b6e24c9ff7eded2c Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Mon, 29 Aug 2022 17:49:01 +0200 Subject: [PATCH 18/32] Block overcommit tracker in ProcessList near allocations --- src/Interpreters/ProcessList.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Interpreters/ProcessList.cpp b/src/Interpreters/ProcessList.cpp index 9643e2750ba..c75dfd41e98 100644 --- a/src/Interpreters/ProcessList.cpp +++ b/src/Interpreters/ProcessList.cpp @@ -270,6 +270,7 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as ProcessListEntry::~ProcessListEntry() { + OvercommitTrackerBlockerInThread overcommit_tracker_blocker; std::lock_guard lock(parent.mutex); String user = it->getClientInfo().current_user; @@ -497,6 +498,7 @@ ProcessList::Info ProcessList::getInfo(bool get_thread_list, bool get_profile_ev Info per_query_infos; std::lock_guard lock(mutex); + OvercommitTrackerBlockerInThread overcommit_tracker_blocker; per_query_infos.reserve(processes.size()); for (const auto & process : processes) @@ -532,6 +534,7 @@ ProcessList::UserInfo ProcessList::getUserInfo(bool get_profile_events) const UserInfo per_user_infos; std::lock_guard lock(mutex); + OvercommitTrackerBlockerInThread overcommit_tracker_blocker; per_user_infos.reserve(user_to_queries.size()); From db2bc31e17772ac382f3d93f63a1636b1540d73a Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 29 Aug 2022 19:32:47 +0200 Subject: [PATCH 19/32] Remove incorrect assertion --- src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp index dac59c596f5..526611a663a 100644 --- a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp +++ b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp @@ -703,22 +703,6 @@ bool CachedOnDiskReadBufferFromFile::updateImplementationBufferIfNeeded() size_t download_offset = file_segment->getDownloadOffset(); bool cached_part_is_finished = download_offset == file_offset_of_buffer_end; -#ifndef NDEBUG - size_t cache_file_size = getFileSizeFromReadBuffer(*implementation_buffer); - size_t cache_file_read_offset = implementation_buffer->getFileOffsetOfBufferEnd(); - size_t implementation_buffer_finished = cache_file_size == cache_file_read_offset; - - if (cached_part_is_finished != implementation_buffer_finished) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Incorrect state of buffers. Current download offset: {}, file offset of buffer end: {}, " - "cache file size: {}, cache file offset: {}, file segment info: {}", - download_offset, file_offset_of_buffer_end, cache_file_size, cache_file_read_offset, - file_segment->getInfoForLog()); - } -#endif - if (cached_part_is_finished) { /// TODO: makes sense to reuse local file reader if we return here with CACHED read type again? From 116931558077887e546afb3762fd4f60b859bdc5 Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Mon, 29 Aug 2022 19:44:05 +0200 Subject: [PATCH 20/32] Add OvercommitTracker blocking --- src/Interpreters/ProcessList.cpp | 13 +++++-------- src/Interpreters/ProcessList.h | 8 ++++++++ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/Interpreters/ProcessList.cpp b/src/Interpreters/ProcessList.cpp index c75dfd41e98..93be4708dd9 100644 --- a/src/Interpreters/ProcessList.cpp +++ b/src/Interpreters/ProcessList.cpp @@ -270,8 +270,7 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as ProcessListEntry::~ProcessListEntry() { - OvercommitTrackerBlockerInThread overcommit_tracker_blocker; - std::lock_guard lock(parent.mutex); + auto lock = parent.safeLock(); String user = it->getClientInfo().current_user; String query_id = it->getClientInfo().current_query_id; @@ -432,7 +431,7 @@ QueryStatus * ProcessList::tryGetProcessListElement(const String & current_query CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill) { - std::lock_guard lock(mutex); + auto lock = safeLock(); QueryStatus * elem = tryGetProcessListElement(current_query_id, current_user); @@ -445,7 +444,7 @@ CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id, void ProcessList::killAllQueries() { - std::lock_guard lock(mutex); + auto lock = safeLock(); for (auto & process : processes) process.cancelQuery(true); @@ -497,8 +496,7 @@ ProcessList::Info ProcessList::getInfo(bool get_thread_list, bool get_profile_ev { Info per_query_infos; - std::lock_guard lock(mutex); - OvercommitTrackerBlockerInThread overcommit_tracker_blocker; + auto lock = safeLock(); per_query_infos.reserve(processes.size()); for (const auto & process : processes) @@ -533,8 +531,7 @@ ProcessList::UserInfo ProcessList::getUserInfo(bool get_profile_events) const { UserInfo per_user_infos; - std::lock_guard lock(mutex); - OvercommitTrackerBlockerInThread overcommit_tracker_blocker; + auto lock = safeLock(); per_user_infos.reserve(user_to_queries.size()); diff --git a/src/Interpreters/ProcessList.h b/src/Interpreters/ProcessList.h index 51cd7eb98d9..49a39d20913 100644 --- a/src/Interpreters/ProcessList.h +++ b/src/Interpreters/ProcessList.h @@ -310,6 +310,14 @@ protected: mutable std::mutex mutex; mutable std::condition_variable have_space; /// Number of currently running queries has become less than maximum. + struct LockAndBlocker + { + std::lock_guard guard; + OvercommitTrackerBlockerInThread blocker; + }; + + LockAndBlocker safeLock() const noexcept { return { std::lock_guard{mutex}, {} }; } + /// List of queries Container processes; size_t max_size = 0; /// 0 means no limit. Otherwise, when limit exceeded, an exception is thrown. From 961365c7a40eccc913ff91e289c59a619fd24ece Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 29 Aug 2022 15:11:39 -0300 Subject: [PATCH 21/32] Fix CaresPTRResolver not reading hosts file --- src/Access/Common/AllowedClientHosts.cpp | 4 +- src/Common/CaresPTRResolver.cpp | 26 +++++++---- src/Common/CaresPTRResolver.h | 8 ++-- src/Common/DNSPTRResolver.h | 6 +-- src/Common/DNSResolver.cpp | 4 +- src/Common/DNSResolver.h | 2 +- .../__init__.py | 0 .../configs/host_regexp.xml | 11 +++++ .../configs/listen_host.xml | 5 +++ .../test.py | 45 +++++++++++++++++++ 10 files changed, 91 insertions(+), 20 deletions(-) create mode 100644 tests/integration/test_host_regexp_hosts_file_resolution/__init__.py create mode 100644 tests/integration/test_host_regexp_hosts_file_resolution/configs/host_regexp.xml create mode 100644 tests/integration/test_host_regexp_hosts_file_resolution/configs/listen_host.xml create mode 100644 tests/integration/test_host_regexp_hosts_file_resolution/test.py diff --git a/src/Access/Common/AllowedClientHosts.cpp b/src/Access/Common/AllowedClientHosts.cpp index efbdf3924e8..2f8151bf757 100644 --- a/src/Access/Common/AllowedClientHosts.cpp +++ b/src/Access/Common/AllowedClientHosts.cpp @@ -110,7 +110,7 @@ namespace } /// Returns the host name by its address. - Strings getHostsByAddress(const IPAddress & address) + std::unordered_set getHostsByAddress(const IPAddress & address) { auto hosts = DNSResolver::instance().reverseResolve(address); @@ -526,7 +526,7 @@ bool AllowedClientHosts::contains(const IPAddress & client_address) const return true; /// Check `name_regexps`. - std::optional resolved_hosts; + std::optional> resolved_hosts; auto check_name_regexp = [&](const String & name_regexp_) { try diff --git a/src/Common/CaresPTRResolver.cpp b/src/Common/CaresPTRResolver.cpp index 376d3665f7e..e5d48b864c8 100644 --- a/src/Common/CaresPTRResolver.cpp +++ b/src/Common/CaresPTRResolver.cpp @@ -15,13 +15,23 @@ namespace DB static void callback(void * arg, int status, int, struct hostent * host) { - auto * ptr_records = reinterpret_cast*>(arg); + auto * ptr_records = reinterpret_cast*>(arg); if (status == ARES_SUCCESS && host->h_aliases) { + /* + * In some cases (e.g /etc/hosts), hostent::h_name is filled and hostent::h_aliases is empty. + * Thus, we can't rely solely on hostent::h_aliases. More info on: + * https://github.com/ClickHouse/ClickHouse/issues/40595#issuecomment-1230526931 + * */ + if (auto * ptr_record = host->h_name) + { + ptr_records->insert(ptr_record); + } + int i = 0; while (auto * ptr_record = host->h_aliases[i]) { - ptr_records->emplace_back(ptr_record); + ptr_records->insert(ptr_record); i++; } } @@ -58,9 +68,9 @@ namespace DB * */ } - std::vector CaresPTRResolver::resolve(const std::string & ip) + std::unordered_set CaresPTRResolver::resolve(const std::string & ip) { - std::vector ptr_records; + std::unordered_set ptr_records; resolve(ip, ptr_records); wait(); @@ -68,9 +78,9 @@ namespace DB return ptr_records; } - std::vector CaresPTRResolver::resolve_v6(const std::string & ip) + std::unordered_set CaresPTRResolver::resolve_v6(const std::string & ip) { - std::vector ptr_records; + std::unordered_set ptr_records; resolve_v6(ip, ptr_records); wait(); @@ -78,7 +88,7 @@ namespace DB return ptr_records; } - void CaresPTRResolver::resolve(const std::string & ip, std::vector & response) + void CaresPTRResolver::resolve(const std::string & ip, std::unordered_set & response) { in_addr addr; @@ -87,7 +97,7 @@ namespace DB ares_gethostbyaddr(channel, reinterpret_cast(&addr), sizeof(addr), AF_INET, callback, &response); } - void CaresPTRResolver::resolve_v6(const std::string & ip, std::vector & response) + void CaresPTRResolver::resolve_v6(const std::string & ip, std::unordered_set & response) { in6_addr addr; inet_pton(AF_INET6, ip.c_str(), &addr); diff --git a/src/Common/CaresPTRResolver.h b/src/Common/CaresPTRResolver.h index fd6a1cf7bc5..e5182d34682 100644 --- a/src/Common/CaresPTRResolver.h +++ b/src/Common/CaresPTRResolver.h @@ -25,16 +25,16 @@ namespace DB explicit CaresPTRResolver(provider_token); ~CaresPTRResolver() override; - std::vector resolve(const std::string & ip) override; + std::unordered_set resolve(const std::string & ip) override; - std::vector resolve_v6(const std::string & ip) override; + std::unordered_set resolve_v6(const std::string & ip) override; private: void wait(); - void resolve(const std::string & ip, std::vector & response); + void resolve(const std::string & ip, std::unordered_set & response); - void resolve_v6(const std::string & ip, std::vector & response); + void resolve_v6(const std::string & ip, std::unordered_set & response); ares_channel channel; }; diff --git a/src/Common/DNSPTRResolver.h b/src/Common/DNSPTRResolver.h index e6cce83f79d..38f890ae29a 100644 --- a/src/Common/DNSPTRResolver.h +++ b/src/Common/DNSPTRResolver.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include namespace DB { @@ -10,9 +10,9 @@ namespace DB virtual ~DNSPTRResolver() = default; - virtual std::vector resolve(const std::string & ip) = 0; + virtual std::unordered_set resolve(const std::string & ip) = 0; - virtual std::vector resolve_v6(const std::string & ip) = 0; + virtual std::unordered_set resolve_v6(const std::string & ip) = 0; }; } diff --git a/src/Common/DNSResolver.cpp b/src/Common/DNSResolver.cpp index cf61d2795f0..67d87f757c7 100644 --- a/src/Common/DNSResolver.cpp +++ b/src/Common/DNSResolver.cpp @@ -136,7 +136,7 @@ static DNSResolver::IPAddresses resolveIPAddressImpl(const std::string & host) return addresses; } -static Strings reverseResolveImpl(const Poco::Net::IPAddress & address) +static std::unordered_set reverseResolveImpl(const Poco::Net::IPAddress & address) { auto ptr_resolver = DB::DNSPTRResolverProvider::get(); @@ -234,7 +234,7 @@ std::vector DNSResolver::resolveAddressList(const std: return addresses; } -Strings DNSResolver::reverseResolve(const Poco::Net::IPAddress & address) +std::unordered_set DNSResolver::reverseResolve(const Poco::Net::IPAddress & address) { if (impl->disable_cache) return reverseResolveImpl(address); diff --git a/src/Common/DNSResolver.h b/src/Common/DNSResolver.h index 84c88586636..83de616d81a 100644 --- a/src/Common/DNSResolver.h +++ b/src/Common/DNSResolver.h @@ -37,7 +37,7 @@ public: std::vector resolveAddressList(const std::string & host, UInt16 port); /// Accepts host IP and resolves its host names - Strings reverseResolve(const Poco::Net::IPAddress & address); + std::unordered_set reverseResolve(const Poco::Net::IPAddress & address); /// Get this server host name String getHostName(); diff --git a/tests/integration/test_host_regexp_hosts_file_resolution/__init__.py b/tests/integration/test_host_regexp_hosts_file_resolution/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_host_regexp_hosts_file_resolution/configs/host_regexp.xml b/tests/integration/test_host_regexp_hosts_file_resolution/configs/host_regexp.xml new file mode 100644 index 00000000000..7a2141e6c7e --- /dev/null +++ b/tests/integration/test_host_regexp_hosts_file_resolution/configs/host_regexp.xml @@ -0,0 +1,11 @@ + + + + + + test1\.example\.com$ + + default + + + \ No newline at end of file diff --git a/tests/integration/test_host_regexp_hosts_file_resolution/configs/listen_host.xml b/tests/integration/test_host_regexp_hosts_file_resolution/configs/listen_host.xml new file mode 100644 index 00000000000..58ef55cd3f3 --- /dev/null +++ b/tests/integration/test_host_regexp_hosts_file_resolution/configs/listen_host.xml @@ -0,0 +1,5 @@ + + :: + 0.0.0.0 + 1 + diff --git a/tests/integration/test_host_regexp_hosts_file_resolution/test.py b/tests/integration/test_host_regexp_hosts_file_resolution/test.py new file mode 100644 index 00000000000..1aa48e79484 --- /dev/null +++ b/tests/integration/test_host_regexp_hosts_file_resolution/test.py @@ -0,0 +1,45 @@ +import pytest +from helpers.cluster import ClickHouseCluster, get_docker_compose_path, run_and_check +import os + +DOCKER_COMPOSE_PATH = get_docker_compose_path() +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) + +cluster = ClickHouseCluster(__file__) + +ch_server = cluster.add_instance( + "clickhouse-server", + main_configs=["configs/listen_host.xml"], + user_configs=["configs/host_regexp.xml"], +) + +client = cluster.add_instance( + "clickhouse-client", +) + + +def build_endpoint_v4(ip): + return f"'http://{ip}:8123/?query=SELECT+1&user=test_dns'" + + +@pytest.fixture(scope="module") +def started_cluster(): + global cluster + try: + cluster.start() + yield cluster + + finally: + cluster.shutdown() + + +def test_host_regexp_multiple_ptr_hosts_file_v4(started_cluster): + server_ip = cluster.get_instance_ip("clickhouse-server") + + ch_server.exec_in_container( + (["bash", "-c", f"echo '{server_ip} test1.example.com' > /etc/hosts"]) + ) + + endpoint = build_endpoint_v4(server_ip) + + assert "1\n" == client.exec_in_container((["bash", "-c", f"curl {endpoint}"])) From 865ee5d0d6db8027aa5ba37e827f0acb280b6cff Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Mon, 29 Aug 2022 20:24:35 +0200 Subject: [PATCH 22/32] Refactor code --- src/Common/OvercommitTracker.cpp | 17 +++++++-------- src/Common/OvercommitTracker.h | 31 ++++++++++++-------------- src/Interpreters/ProcessList.cpp | 3 +-- src/Interpreters/ProcessList.h | 37 +++++++++++++++++++++----------- 4 files changed, 47 insertions(+), 41 deletions(-) diff --git a/src/Common/OvercommitTracker.cpp b/src/Common/OvercommitTracker.cpp index 3da18e702e5..c7730667f55 100644 --- a/src/Common/OvercommitTracker.cpp +++ b/src/Common/OvercommitTracker.cpp @@ -14,10 +14,10 @@ using namespace std::chrono_literals; constexpr std::chrono::microseconds ZERO_MICROSEC = 0us; -OvercommitTracker::OvercommitTracker(std::mutex & global_mutex_) +OvercommitTracker::OvercommitTracker(DB::ProcessList * process_list_) : picked_tracker(nullptr) + , process_list(process_list_) , cancellation_state(QueryCancellationState::NONE) - , global_mutex(global_mutex_) , freed_memory(0) , required_memory(0) , next_id(0) @@ -33,11 +33,11 @@ OvercommitResult OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int return OvercommitResult::NONE; // NOTE: Do not change the order of locks // - // global_mutex must be acquired before overcommit_m, because + // global mutex must be acquired before overcommit_m, because // method OvercommitTracker::onQueryStop(MemoryTracker *) is - // always called with already acquired global_mutex in + // always called with already acquired global mutex in // ProcessListEntry::~ProcessListEntry(). - std::unique_lock global_lock(global_mutex); + auto global_lock = process_list->unsafeLock(); std::unique_lock lk(overcommit_m); size_t id = next_id++; @@ -137,8 +137,8 @@ void OvercommitTracker::releaseThreads() cv.notify_all(); } -UserOvercommitTracker::UserOvercommitTracker(DB::ProcessList * process_list, DB::ProcessListForUser * user_process_list_) - : OvercommitTracker(process_list->mutex) +UserOvercommitTracker::UserOvercommitTracker(DB::ProcessList * process_list_, DB::ProcessListForUser * user_process_list_) + : OvercommitTracker(process_list_) , user_process_list(user_process_list_) {} @@ -169,8 +169,7 @@ void UserOvercommitTracker::pickQueryToExcludeImpl() } GlobalOvercommitTracker::GlobalOvercommitTracker(DB::ProcessList * process_list_) - : OvercommitTracker(process_list_->mutex) - , process_list(process_list_) + : OvercommitTracker(process_list_) {} void GlobalOvercommitTracker::pickQueryToExcludeImpl() diff --git a/src/Common/OvercommitTracker.h b/src/Common/OvercommitTracker.h index 684971b4205..64fb6cdc926 100644 --- a/src/Common/OvercommitTracker.h +++ b/src/Common/OvercommitTracker.h @@ -36,6 +36,12 @@ struct OvercommitRatio class MemoryTracker; +namespace DB +{ + class ProcessList; + struct ProcessListForUser; +} + enum class OvercommitResult { NONE, @@ -71,7 +77,7 @@ struct OvercommitTracker : boost::noncopyable virtual ~OvercommitTracker() = default; protected: - explicit OvercommitTracker(std::mutex & global_mutex_); + explicit OvercommitTracker(DB::ProcessList * process_list_); virtual void pickQueryToExcludeImpl() = 0; @@ -86,6 +92,12 @@ protected: // overcommit tracker is in SELECTED state. MemoryTracker * picked_tracker; + // Global mutex stored in ProcessList is used to synchronize + // insertion and deletion of queries. + // OvercommitTracker::pickQueryToExcludeImpl() implementations + // require this mutex to be locked, because they read list (or sublist) + // of queries. + DB::ProcessList * process_list; private: void pickQueryToExclude() @@ -113,12 +125,6 @@ private: QueryCancellationState cancellation_state; - // Global mutex which is used in ProcessList to synchronize - // insertion and deletion of queries. - // OvercommitTracker::pickQueryToExcludeImpl() implementations - // require this mutex to be locked, because they read list (or sublist) - // of queries. - std::mutex & global_mutex; Int64 freed_memory; Int64 required_memory; @@ -128,15 +134,9 @@ private: bool allow_release; }; -namespace DB -{ - class ProcessList; - struct ProcessListForUser; -} - struct UserOvercommitTracker : OvercommitTracker { - explicit UserOvercommitTracker(DB::ProcessList * process_list, DB::ProcessListForUser * user_process_list_); + explicit UserOvercommitTracker(DB::ProcessList * process_list_, DB::ProcessListForUser * user_process_list_); ~UserOvercommitTracker() override = default; @@ -155,9 +155,6 @@ struct GlobalOvercommitTracker : OvercommitTracker protected: void pickQueryToExcludeImpl() override; - -private: - DB::ProcessList * process_list; }; // This class is used to disallow tracking during logging to avoid deadlocks. diff --git a/src/Interpreters/ProcessList.cpp b/src/Interpreters/ProcessList.cpp index 93be4708dd9..d0ffeda49e4 100644 --- a/src/Interpreters/ProcessList.cpp +++ b/src/Interpreters/ProcessList.cpp @@ -82,8 +82,7 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as bool is_unlimited_query = isUnlimitedQuery(ast); { - std::unique_lock lock(mutex); - OvercommitTrackerBlockerInThread overcommit_blocker; // To avoid deadlock in case of OOM + auto [lock, overcommit_blocker] = safeLock(); // To avoid deadlock in case of OOM IAST::QueryKind query_kind = ast->getQueryKind(); const auto queue_max_wait_ms = settings.queue_max_wait_ms.totalMilliseconds(); diff --git a/src/Interpreters/ProcessList.h b/src/Interpreters/ProcessList.h index 49a39d20913..542f7884c09 100644 --- a/src/Interpreters/ProcessList.h +++ b/src/Interpreters/ProcessList.h @@ -285,7 +285,26 @@ public: }; -class ProcessList +class ProcessListBase +{ + mutable std::mutex mutex; + +protected: + using Lock = std::unique_lock; + struct LockAndBlocker + { + Lock lock; + OvercommitTrackerBlockerInThread blocker; + }; + + // It is forbiden to do allocations/deallocations with acqiured mutex and + // enabled OvercommitTracker. This leads to deadlock in the case of OOM. + LockAndBlocker safeLock() const noexcept { return { std::unique_lock{mutex}, {} }; } + Lock unsafeLock() const noexcept { return std::unique_lock{mutex}; } +}; + + +class ProcessList : public ProcessListBase { public: using Element = QueryStatus; @@ -304,20 +323,12 @@ public: protected: friend class ProcessListEntry; + friend struct ::OvercommitTracker; friend struct ::UserOvercommitTracker; friend struct ::GlobalOvercommitTracker; - mutable std::mutex mutex; mutable std::condition_variable have_space; /// Number of currently running queries has become less than maximum. - struct LockAndBlocker - { - std::lock_guard guard; - OvercommitTrackerBlockerInThread blocker; - }; - - LockAndBlocker safeLock() const noexcept { return { std::lock_guard{mutex}, {} }; } - /// List of queries Container processes; size_t max_size = 0; /// 0 means no limit. Otherwise, when limit exceeded, an exception is thrown. @@ -368,19 +379,19 @@ public: void setMaxSize(size_t max_size_) { - std::lock_guard lock(mutex); + auto lock = unsafeLock(); max_size = max_size_; } void setMaxInsertQueriesAmount(size_t max_insert_queries_amount_) { - std::lock_guard lock(mutex); + auto lock = unsafeLock(); max_insert_queries_amount = max_insert_queries_amount_; } void setMaxSelectQueriesAmount(size_t max_select_queries_amount_) { - std::lock_guard lock(mutex); + auto lock = unsafeLock(); max_select_queries_amount = max_select_queries_amount_; } From e25ed9547e77aa458aac2acb536561f61b2d73d8 Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Mon, 29 Aug 2022 20:26:37 +0200 Subject: [PATCH 23/32] Update src/Interpreters/ProcessList.h --- src/Interpreters/ProcessList.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Interpreters/ProcessList.h b/src/Interpreters/ProcessList.h index 542f7884c09..203a1f114df 100644 --- a/src/Interpreters/ProcessList.h +++ b/src/Interpreters/ProcessList.h @@ -297,7 +297,7 @@ protected: OvercommitTrackerBlockerInThread blocker; }; - // It is forbiden to do allocations/deallocations with acqiured mutex and + // It is forbidden to do allocations/deallocations with acquired mutex and // enabled OvercommitTracker. This leads to deadlock in the case of OOM. LockAndBlocker safeLock() const noexcept { return { std::unique_lock{mutex}, {} }; } Lock unsafeLock() const noexcept { return std::unique_lock{mutex}; } From dd49b44abb8531a818db8480ad522e12e9d7fe8d Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 29 Aug 2022 15:58:18 -0300 Subject: [PATCH 24/32] Fix host_regexp hosts file tst --- .../integration/test_host_regexp_hosts_file_resolution/test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_host_regexp_hosts_file_resolution/test.py b/tests/integration/test_host_regexp_hosts_file_resolution/test.py index 1aa48e79484..3fd9e65cf2a 100644 --- a/tests/integration/test_host_regexp_hosts_file_resolution/test.py +++ b/tests/integration/test_host_regexp_hosts_file_resolution/test.py @@ -35,9 +35,10 @@ def started_cluster(): def test_host_regexp_multiple_ptr_hosts_file_v4(started_cluster): server_ip = cluster.get_instance_ip("clickhouse-server") + client_ip = cluster.get_instance_ip("clickhouse-client") ch_server.exec_in_container( - (["bash", "-c", f"echo '{server_ip} test1.example.com' > /etc/hosts"]) + (["bash", "-c", f"echo '{client_ip} test1.example.com' > /etc/hosts"]) ) endpoint = build_endpoint_v4(server_ip) From 80015600826ceefe8af4dba8a44c428beb65caaf Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Tue, 30 Aug 2022 00:14:00 +0200 Subject: [PATCH 25/32] Fix test --- tests/integration/test_mysql_protocol/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_mysql_protocol/test.py b/tests/integration/test_mysql_protocol/test.py index 1338b0b2378..02928a24fbd 100644 --- a/tests/integration/test_mysql_protocol/test.py +++ b/tests/integration/test_mysql_protocol/test.py @@ -250,7 +250,7 @@ def test_mysql_client_exception(started_cluster): expected_msg = "\n".join( [ "mysql: [Warning] Using a password on the command line interface can be insecure.", - "ERROR 1000 (00000) at line 1: Poco::Exception. Code: 1000, e.code() = 0, Exception: Connections to all replicas failed: default@127.0.0.1:10086 as user default", + "ERROR 1000 (00000) at line 1: Poco::Exception. Code: 1000, e.code() = 0, Exception: Connections to mysql failed: default@127.0.0.1:10086 as user default", ] ) assert stderr[: len(expected_msg)].decode() == expected_msg From ac41de73df41ed7c754bd9cad399936ae01effed Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 30 Aug 2022 00:17:01 +0200 Subject: [PATCH 26/32] Add cache to zero copy replication test --- .../configs/config.d/storage_conf.xml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_replicated_merge_tree_s3_zero_copy/configs/config.d/storage_conf.xml b/tests/integration/test_replicated_merge_tree_s3_zero_copy/configs/config.d/storage_conf.xml index f4a34256ef8..bd59694f65a 100644 --- a/tests/integration/test_replicated_merge_tree_s3_zero_copy/configs/config.d/storage_conf.xml +++ b/tests/integration/test_replicated_merge_tree_s3_zero_copy/configs/config.d/storage_conf.xml @@ -7,12 +7,18 @@ minio minio123 + + cache + s3 + 100000000 + ./cache_s3/ +
- s3 + cache_s3
From 14adea87925bdfc07a11ec9421d99af00544f295 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Tue, 30 Aug 2022 14:40:26 +0300 Subject: [PATCH 27/32] fix error in docs --- .../functions/string-functions.md | 2 +- .../functions/string-functions.md | 20 +++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/docs/en/sql-reference/functions/string-functions.md b/docs/en/sql-reference/functions/string-functions.md index 6ce654496e4..f0bf5069292 100644 --- a/docs/en/sql-reference/functions/string-functions.md +++ b/docs/en/sql-reference/functions/string-functions.md @@ -523,7 +523,7 @@ Query: ``` sql SELECT base58Encode('Encoded'); -SELECT base58Encode('3dc8KtHrwM'); +SELECT base58Decode('3dc8KtHrwM'); ``` Result: diff --git a/docs/ru/sql-reference/functions/string-functions.md b/docs/ru/sql-reference/functions/string-functions.md index 0ab34153edb..c65774c4906 100644 --- a/docs/ru/sql-reference/functions/string-functions.md +++ b/docs/ru/sql-reference/functions/string-functions.md @@ -16,7 +16,7 @@ sidebar_label: "Функции для работы со строками" empty(x) ``` -Строка считается непустой, если содержит хотя бы один байт, пусть даже это пробел или нулевой байт. +Строка считается непустой, если содержит хотя бы один байт, пусть даже это пробел или нулевой байт. Функция также поддерживает работу с типами [Array](array-functions.md#function-empty) и [UUID](uuid-functions.md#empty). @@ -56,7 +56,7 @@ SELECT empty('text'); notEmpty(x) ``` -Строка считается непустой, если содержит хотя бы один байт, пусть даже это пробел или нулевой байт. +Строка считается непустой, если содержит хотя бы один байт, пусть даже это пробел или нулевой байт. Функция также поддерживает работу с типами [Array](array-functions.md#function-notempty) и [UUID](uuid-functions.md#notempty). @@ -491,21 +491,21 @@ SELECT concat(key1, key2), sum(value) FROM key_val GROUP BY (key1, key2); Возвращает сконвертированную из кодировки from в кодировку to строку s. -## Base58Encode(plaintext), Base58Decode(encoded_text) {#base58} +## base58Encode(plaintext), base58Decode(encoded_text) {#base58} Принимает на вход строку или колонку строк и кодирует/раскодирует их с помощью схемы кодирования [Base58](https://tools.ietf.org/id/draft-msporny-base58-01.html) с использованием стандартного алфавита Bitcoin. **Синтаксис** ```sql -encodeBase58(decoded) -decodeBase58(encoded) +base58Encode(decoded) +base58Decode(encoded) ``` **Аргументы** - `decoded` — Колонка или строка типа [String](../../sql-reference/data-types/string.md). -- `encoded` — Колонка или строка типа [String](../../sql-reference/data-types/string.md). Если входная строка не является корректным кодом для какой-либо другой строки, возникнет исключение `1001`. +- `encoded` — Колонка или строка типа [String](../../sql-reference/data-types/string.md). Если входная строка не является корректным кодом для какой-либо другой строки, возникнет исключение. **Возвращаемое значение** @@ -518,16 +518,16 @@ decodeBase58(encoded) Запрос: ``` sql -SELECT encodeBase58('encode'); -SELECT decodeBase58('izCFiDUY'); +SELECT base58Encode('encode'); +SELECT base58Decode('izCFiDUY'); ``` Результат: ```text -┌─encodeBase58('encode', 'flickr')─┐ +┌─base58Encode('encode', 'flickr')─┐ │ SvyTHb1D │ └──────────────────────────────────┘ -┌─decodeBase58('izCFiDUY', 'ripple')─┐ +┌─base58Decode('izCFiDUY', 'ripple')─┐ │ decode │ └────────────────────────────────────┘ ``` From 93f9abf130b40ed2f4bcad1b1efe06a7390cb8ed Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Tue, 30 Aug 2022 14:41:40 +0300 Subject: [PATCH 28/32] upd2 --- docs/en/sql-reference/functions/string-functions.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/sql-reference/functions/string-functions.md b/docs/en/sql-reference/functions/string-functions.md index f0bf5069292..af265aba18f 100644 --- a/docs/en/sql-reference/functions/string-functions.md +++ b/docs/en/sql-reference/functions/string-functions.md @@ -495,7 +495,7 @@ If the ‘s’ string is non-empty and does not contain the ‘c’ character at Returns the string ‘s’ that was converted from the encoding in ‘from’ to the encoding in ‘to’. -## Base58Encode(plaintext), Base58Decode(encoded_text) +## base58Encode(plaintext), base58Decode(encoded_text) Accepts a String and encodes/decodes it using [Base58](https://tools.ietf.org/id/draft-msporny-base58-01.html) encoding scheme using "Bitcoin" alphabet. From c8c8428052c2006d93fd45d681f647ec65849e3b Mon Sep 17 00:00:00 2001 From: Vladimir C Date: Tue, 30 Aug 2022 13:49:21 +0200 Subject: [PATCH 29/32] Apply suggestions from code review --- .../sql-reference/functions/string-functions.md | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/ru/sql-reference/functions/string-functions.md b/docs/ru/sql-reference/functions/string-functions.md index c65774c4906..9638e25d488 100644 --- a/docs/ru/sql-reference/functions/string-functions.md +++ b/docs/ru/sql-reference/functions/string-functions.md @@ -518,18 +518,18 @@ base58Decode(encoded) Запрос: ``` sql -SELECT base58Encode('encode'); -SELECT base58Decode('izCFiDUY'); +SELECT base58Encode('Encoded'); +SELECT base58Decode('3dc8KtHrwM'); ``` Результат: ```text -┌─base58Encode('encode', 'flickr')─┐ -│ SvyTHb1D │ -└──────────────────────────────────┘ -┌─base58Decode('izCFiDUY', 'ripple')─┐ -│ decode │ -└────────────────────────────────────┘ +┌─base58Encode('Encoded')─┐ +│ 3dc8KtHrwM │ +└─────────────────────────┘ +┌─base58Decode('3dc8KtHrwM')─┐ +│ Encoded │ +└────────────────────────────┘ ``` ## base64Encode(s) {#base64encode} From 6fdfb964d0a70e5d2a78cb727bdf30d0ba5a1a34 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 30 Aug 2022 15:10:10 +0300 Subject: [PATCH 30/32] Revert "Add Annoy index" --- .gitmodules | 4 - contrib/CMakeLists.txt | 2 - contrib/annoy | 1 - contrib/annoy-cmake/CMakeLists.txt | 16 - .../mergetree-family/annindexes.md | 125 ---- .../mergetree-family/mergetree.md | 4 - src/CMakeLists.txt | 4 - src/Core/Settings.h | 2 - src/Storages/MergeTree/CommonANNIndexes.cpp | 595 ------------------ src/Storages/MergeTree/CommonANNIndexes.h | 236 ------- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 27 - .../MergeTree/MergeTreeIndexAnnoy.cpp | 317 ---------- src/Storages/MergeTree/MergeTreeIndexAnnoy.h | 123 ---- src/Storages/MergeTree/MergeTreeIndices.cpp | 5 - src/Storages/MergeTree/MergeTreeIndices.h | 5 - .../queries/0_stateless/02354_annoy.reference | 16 - tests/queries/0_stateless/02354_annoy.sql | 44 -- 17 files changed, 1526 deletions(-) delete mode 160000 contrib/annoy delete mode 100644 contrib/annoy-cmake/CMakeLists.txt delete mode 100644 docs/en/engines/table-engines/mergetree-family/annindexes.md delete mode 100644 src/Storages/MergeTree/CommonANNIndexes.cpp delete mode 100644 src/Storages/MergeTree/CommonANNIndexes.h delete mode 100644 src/Storages/MergeTree/MergeTreeIndexAnnoy.cpp delete mode 100644 src/Storages/MergeTree/MergeTreeIndexAnnoy.h delete mode 100644 tests/queries/0_stateless/02354_annoy.reference delete mode 100644 tests/queries/0_stateless/02354_annoy.sql diff --git a/.gitmodules b/.gitmodules index f372a309cad..62b2f9d7766 100644 --- a/.gitmodules +++ b/.gitmodules @@ -259,10 +259,6 @@ [submodule "contrib/minizip-ng"] path = contrib/minizip-ng url = https://github.com/zlib-ng/minizip-ng -[submodule "contrib/annoy"] - path = contrib/annoy - url = https://github.com/ClickHouse/annoy.git - branch = ClickHouse-master [submodule "contrib/qpl"] path = contrib/qpl url = https://github.com/intel/qpl.git diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 486fca60912..08b91c1b81c 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -159,8 +159,6 @@ add_contrib (s2geometry-cmake s2geometry) add_contrib (c-ares-cmake c-ares) add_contrib (qpl-cmake qpl) -add_contrib(annoy-cmake annoy) - # Put all targets defined here and in subdirectories under "contrib/" folders in GUI-based IDEs. # Some of third-party projects may override CMAKE_FOLDER or FOLDER property of their targets, so they would not appear # in "contrib/..." as originally planned, so we workaround this by fixing FOLDER properties of all targets manually, diff --git a/contrib/annoy b/contrib/annoy deleted file mode 160000 index 9d8a603a4cd..00000000000 --- a/contrib/annoy +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 9d8a603a4cd252448589e84c9846f94368d5a289 diff --git a/contrib/annoy-cmake/CMakeLists.txt b/contrib/annoy-cmake/CMakeLists.txt deleted file mode 100644 index f2535ba7fde..00000000000 --- a/contrib/annoy-cmake/CMakeLists.txt +++ /dev/null @@ -1,16 +0,0 @@ -option(ENABLE_ANNOY "Enable Annoy index support" ${ENABLE_LIBRARIES}) - -if ((NOT ENABLE_ANNOY) OR (SANITIZE STREQUAL "undefined")) - message (STATUS "Not using annoy") - return() -endif() - -set(ANNOY_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/annoy") -set(ANNOY_SOURCE_DIR "${ANNOY_PROJECT_DIR}/src") - -add_library(_annoy INTERFACE) -target_include_directories(_annoy SYSTEM INTERFACE ${ANNOY_SOURCE_DIR}) - -add_library(ch_contrib::annoy ALIAS _annoy) -target_compile_definitions(_annoy INTERFACE ENABLE_ANNOY) -target_compile_definitions(_annoy INTERFACE ANNOYLIB_MULTITHREADED_BUILD) diff --git a/docs/en/engines/table-engines/mergetree-family/annindexes.md b/docs/en/engines/table-engines/mergetree-family/annindexes.md deleted file mode 100644 index 6c669a4f7b6..00000000000 --- a/docs/en/engines/table-engines/mergetree-family/annindexes.md +++ /dev/null @@ -1,125 +0,0 @@ -# Approximate Nearest Neighbor Search Indexes [experimental] {#table_engines-ANNIndex} - -The main task that indexes achieve is to quickly find nearest neighbors for multidimensional data. An example of such a problem can be finding similar pictures (texts) for a given picture (text). That problem can be reduced to finding the nearest [embeddings](https://cloud.google.com/architecture/overview-extracting-and-serving-feature-embeddings-for-machine-learning). They can be created from data using [UDF](../../../sql-reference/functions/index.md#executable-user-defined-functions). - -The next query finds the closest neighbors in N-dimensional space using the L2 (Euclidean) distance: -``` sql -SELECT * -FROM table_name -WHERE L2Distance(Column, Point) < MaxDistance -LIMIT N -``` -But it will take some time for execution because of the long calculation of the distance between `TargetEmbedding` and all other vectors. This is where ANN indexes can help. They store a compact approximation of the search space (e.g. using clustering, search trees, etc.) and are able to compute approximate neighbors quickly. - -## Indexes Structure - -Approximate Nearest Neighbor Search Indexes (`ANNIndexes`) are similar to skip indexes. They are constructed by some granules and determine which of them should be skipped. Compared to skip indices, ANN indices use their results not only to skip some group of granules, but also to select particular granules from a set of granules. - -`ANNIndexes` are designed to speed up two types of queries: - -- ###### Type 1: Where - ``` sql - SELECT * - FROM table_name - WHERE DistanceFunction(Column, Point) < MaxDistance - LIMIT N - ``` -- ###### Type 2: Order by - ``` sql - SELECT * - FROM table_name [WHERE ...] - ORDER BY DistanceFunction(Column, Point) - LIMIT N - ``` - -In these queries, `DistanceFunction` is selected from [distance functions](../../../sql-reference/functions/distance-functions). `Point` is a known vector (something like `(0.1, 0.1, ... )`). To avoid writing large vectors, use [client parameters](../../../interfaces/cli.md#queries-with-parameters-cli-queries-with-parameters). `Value` - a float value that will bound the neighbourhood. - -!!! note "Note" - ANN index can't speed up query that satisfies both types(`where + order by`, only one of them). All queries must have the limit, as algorithms are used to find nearest neighbors and need a specific number of them. - -!!! note "Note" - Indexes are applied only to queries with a limit less than the `max_limit_for_ann_queries` setting. This helps to avoid memory overflows in queries with a large limit. `max_limit_for_ann_queries` setting can be changed if you know you can provide enough memory. The default value is `1000000`. - -Both types of queries are handled the same way. The indexes get `n` neighbors (where `n` is taken from the `LIMIT` clause) and work with them. In `ORDER BY` query they remember the numbers of all parts of the granule that have at least one of neighbor. In `WHERE` query they remember only those parts that satisfy the requirements. - - - -## Create table with ANNIndex - -```sql -CREATE TABLE t -( - `id` Int64, - `number` Tuple(Float32, Float32, Float32), - INDEX x number TYPE annoy GRANULARITY N -) -ENGINE = MergeTree -ORDER BY id; -``` - -```sql -CREATE TABLE t -( - `id` Int64, - `number` Array(Float32), - INDEX x number TYPE annoy GRANULARITY N -) -ENGINE = MergeTree -ORDER BY id; -``` - -With greater `GRANULARITY` indexes remember the data structure better. The `GRANULARITY` indicates how many granules will be used to construct the index. The more data is provided for the index, the more of it can be handled by one index and the more chances that with the right hyperparameters the index will remember the data structure better. But some indexes can't be built if they don't have enough data, so this granule will always participate in the query. For more information, see the description of indexes. - -As the indexes are built only during insertions into table, `INSERT` and `OPTIMIZE` queries are slower than for ordinary table. At this stage indexes remember all the information about the given data. ANNIndexes should be used if you have immutable or rarely changed data and many read requests. - -You can create your table with index which uses certain algorithm. Now only indices based on the following algorithms are supported: - -# Index list -- [Annoy](../../../engines/table-engines/mergetree-family/annindexes.md#annoy-annoy) - -# Annoy {#annoy} -Implementation of the algorithm was taken from [this repository](https://github.com/spotify/annoy). - -Short description of the algorithm: -The algorithm recursively divides in half all space by random linear surfaces (lines in 2D, planes in 3D e.t.c.). Thus it makes tree of polyhedrons and points that they contains. Repeating the operation several times for greater accuracy it creates a forest. -To find K Nearest Neighbours it goes down through the trees and fills the buffer of closest points using the priority queue of polyhedrons. Next, it sorts buffer and return the nearest K points. - -__Examples__: -```sql -CREATE TABLE t -( - id Int64, - number Tuple(Float32, Float32, Float32), - INDEX x number TYPE annoy(T) GRANULARITY N -) -ENGINE = MergeTree -ORDER BY id; -``` - -```sql -CREATE TABLE t -( - id Int64, - number Array(Float32), - INDEX x number TYPE annoy(T) GRANULARITY N -) -ENGINE = MergeTree -ORDER BY id; -``` -!!! note "Note" - Table with array field will work faster, but all arrays **must** have same length. Use [CONSTRAINT](../../../sql-reference/statements/create/table.md#constraints) to avoid errors. For example, `CONSTRAINT constraint_name_1 CHECK length(number) = 256`. - -Parameter `T` is the number of trees which algorithm will create. The bigger it is, the slower (approximately linear) it works (in both `CREATE` and `SELECT` requests), but the better accuracy you get (adjusted for randomness). - -Annoy supports only `L2Distance`. - -In the `SELECT` in the settings (`ann_index_select_query_params`) you can specify the size of the internal buffer (more details in the description above or in the [original repository](https://github.com/spotify/annoy)). During the query it will inspect up to `search_k` nodes which defaults to `n_trees * n` if not provided. `search_k` gives you a run-time tradeoff between better accuracy and speed. - -__Example__: -``` sql -SELECT * -FROM table_name [WHERE ...] -ORDER BY L2Distance(Column, Point) -LIMIT N -SETTING ann_index_select_query_params=`k_search=100` -``` diff --git a/docs/en/engines/table-engines/mergetree-family/mergetree.md b/docs/en/engines/table-engines/mergetree-family/mergetree.md index 9dc7e300d45..0ebe3c99f35 100644 --- a/docs/en/engines/table-engines/mergetree-family/mergetree.md +++ b/docs/en/engines/table-engines/mergetree-family/mergetree.md @@ -481,10 +481,6 @@ For example: - `NOT startsWith(s, 'test')` ::: - -## Approximate Nearest Neighbor Search Indexes [experimental] {#table_engines-ANNIndex} -In addition to skip indices, there are also [Approximate Nearest Neighbor Search Indexes](../../../engines/table-engines/mergetree-family/annindexes.md). - ## Projections {#projections} Projections are like [materialized views](../../../sql-reference/statements/create/view.md#materialized) but defined in part-level. It provides consistency guarantees along with automatic usage in queries. diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c8aa0c84a24..3ece5fd410b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -570,10 +570,6 @@ endif() dbms_target_link_libraries(PUBLIC ch_contrib::consistent_hashing) -if (TARGET ch_contrib::annoy) - dbms_target_link_libraries(PUBLIC ch_contrib::annoy) -endif() - include ("${ClickHouse_SOURCE_DIR}/cmake/add_check.cmake") if (ENABLE_TESTS) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 026b603177c..af32c15a867 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -629,8 +629,6 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(Bool, allow_experimental_hash_functions, false, "Enable experimental hash functions (hashid, etc)", 0) \ M(Bool, allow_experimental_object_type, false, "Allow Object and JSON data types", 0) \ M(String, insert_deduplication_token, "", "If not empty, used for duplicate detection instead of data digest", 0) \ - M(String, ann_index_select_query_params, "", "Parameters passed to ANN indexes in SELECT queries, the format is 'param1=x, param2=y, ...'", 0) \ - M(UInt64, max_limit_for_ann_queries, 1000000, "Maximum limit value for using ANN indexes is used to prevent memory overflow in search queries for indexes", 0) \ M(Bool, count_distinct_optimization, false, "Rewrite count distinct to subquery of group by", 0) \ M(Bool, throw_on_unsupported_query_inside_transaction, true, "Throw exception if unsupported query is used inside transaction", 0) \ M(TransactionsWaitCSNMode, wait_changes_become_visible_after_commit_mode, TransactionsWaitCSNMode::WAIT_UNKNOWN, "Wait for committed changes to become actually visible in the latest snapshot", 0) \ diff --git a/src/Storages/MergeTree/CommonANNIndexes.cpp b/src/Storages/MergeTree/CommonANNIndexes.cpp deleted file mode 100644 index 886f9ab1c0f..00000000000 --- a/src/Storages/MergeTree/CommonANNIndexes.cpp +++ /dev/null @@ -1,595 +0,0 @@ -#include -#include - -#include -#include -#include -#include -#include -#include - -#include - -#include - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; - extern const int INCORRECT_QUERY; -} - -namespace -{ - -namespace ANN = ApproximateNearestNeighbour; - -template -void extractTargetVectorFromLiteral(ANN::ANNQueryInformation::Embedding & target, Literal literal) -{ - Float64 float_element_of_target_vector; - Int64 int_element_of_target_vector; - - for (const auto & value : literal.value()) - { - if (value.tryGet(float_element_of_target_vector)) - { - target.emplace_back(float_element_of_target_vector); - } - else if (value.tryGet(int_element_of_target_vector)) - { - target.emplace_back(static_cast(int_element_of_target_vector)); - } - else - { - throw Exception(ErrorCodes::INCORRECT_QUERY, "Wrong type of elements in target vector. Only float or int are supported."); - } - } -} - -ANN::ANNQueryInformation::Metric castMetricFromStringToType(String metric_name) -{ - if (metric_name == "L2Distance") - return ANN::ANNQueryInformation::Metric::L2; - if (metric_name == "LpDistance") - return ANN::ANNQueryInformation::Metric::Lp; - return ANN::ANNQueryInformation::Metric::Unknown; -} - -} - -namespace ApproximateNearestNeighbour -{ - -ANNCondition::ANNCondition(const SelectQueryInfo & query_info, - ContextPtr context) : - block_with_constants{KeyCondition::getBlockWithConstants(query_info.query, query_info.syntax_analyzer_result, context)}, - ann_index_select_query_params{context->getSettings().get("ann_index_select_query_params").get()}, - index_granularity{context->getMergeTreeSettings().get("index_granularity").get()}, - limit_restriction{context->getSettings().get("max_limit_for_ann_queries").get()}, - index_is_useful{checkQueryStructure(query_info)} {} - -bool ANNCondition::alwaysUnknownOrTrue(String metric_name) const -{ - if (!index_is_useful) - { - return true; // Query isn't supported - } - // If query is supported, check metrics for match - return !(castMetricFromStringToType(metric_name) == query_information->metric); -} - -float ANNCondition::getComparisonDistanceForWhereQuery() const -{ - if (index_is_useful && query_information.has_value() - && query_information->query_type == ANNQueryInformation::Type::Where) - { - return query_information->distance; - } - throw Exception(ErrorCodes::LOGICAL_ERROR, "Not supported method for this query type"); -} - -UInt64 ANNCondition::getLimit() const -{ - if (index_is_useful && query_information.has_value()) - { - return query_information->limit; - } - throw Exception(ErrorCodes::LOGICAL_ERROR, "No LIMIT section in query, not supported"); -} - -std::vector ANNCondition::getTargetVector() const -{ - if (index_is_useful && query_information.has_value()) - { - return query_information->target; - } - throw Exception(ErrorCodes::LOGICAL_ERROR, "Target vector was requested for useless or uninitialized index."); -} - -size_t ANNCondition::getNumOfDimensions() const -{ - if (index_is_useful && query_information.has_value()) - { - return query_information->target.size(); - } - throw Exception(ErrorCodes::LOGICAL_ERROR, "Number of dimensions was requested for useless or uninitialized index."); -} - -String ANNCondition::getColumnName() const -{ - if (index_is_useful && query_information.has_value()) - { - return query_information->column_name; - } - throw Exception(ErrorCodes::LOGICAL_ERROR, "Column name was requested for useless or uninitialized index."); -} - -ANNQueryInformation::Metric ANNCondition::getMetricType() const -{ - if (index_is_useful && query_information.has_value()) - { - return query_information->metric; - } - throw Exception(ErrorCodes::LOGICAL_ERROR, "Metric name was requested for useless or uninitialized index."); -} - -float ANNCondition::getPValueForLpDistance() const -{ - if (index_is_useful && query_information.has_value()) - { - return query_information->p_for_lp_dist; - } - throw Exception(ErrorCodes::LOGICAL_ERROR, "P from LPDistance was requested for useless or uninitialized index."); -} - -ANNQueryInformation::Type ANNCondition::getQueryType() const -{ - if (index_is_useful && query_information.has_value()) - { - return query_information->query_type; - } - throw Exception(ErrorCodes::LOGICAL_ERROR, "Query type was requested for useless or uninitialized index."); -} - -bool ANNCondition::checkQueryStructure(const SelectQueryInfo & query) -{ - // RPN-s for different sections of the query - RPN rpn_prewhere_clause; - RPN rpn_where_clause; - RPN rpn_order_by_clause; - RPNElement rpn_limit; - UInt64 limit; - - ANNQueryInformation prewhere_info; - ANNQueryInformation where_info; - ANNQueryInformation order_by_info; - - // Build rpns for query sections - const auto & select = query.query->as(); - - if (select.prewhere()) // If query has PREWHERE clause - { - traverseAST(select.prewhere(), rpn_prewhere_clause); - } - - if (select.where()) // If query has WHERE clause - { - traverseAST(select.where(), rpn_where_clause); - } - - if (select.limitLength()) // If query has LIMIT clause - { - traverseAtomAST(select.limitLength(), rpn_limit); - } - - if (select.orderBy()) // If query has ORDERBY clause - { - traverseOrderByAST(select.orderBy(), rpn_order_by_clause); - } - - // Reverse RPNs for conveniences during parsing - std::reverse(rpn_prewhere_clause.begin(), rpn_prewhere_clause.end()); - std::reverse(rpn_where_clause.begin(), rpn_where_clause.end()); - std::reverse(rpn_order_by_clause.begin(), rpn_order_by_clause.end()); - - // Match rpns with supported types and extract information - const bool prewhere_is_valid = matchRPNWhere(rpn_prewhere_clause, prewhere_info); - const bool where_is_valid = matchRPNWhere(rpn_where_clause, where_info); - const bool order_by_is_valid = matchRPNOrderBy(rpn_order_by_clause, order_by_info); - const bool limit_is_valid = matchRPNLimit(rpn_limit, limit); - - // Query without a LIMIT clause or with a limit greater than a restriction is not supported - if (!limit_is_valid || limit_restriction < limit) - { - return false; - } - - // Search type query in both sections isn't supported - if (prewhere_is_valid && where_is_valid) - { - return false; - } - - // Search type should be in WHERE or PREWHERE clause - if (prewhere_is_valid || where_is_valid) - { - query_information = std::move(prewhere_is_valid ? prewhere_info : where_info); - } - - if (order_by_is_valid) - { - // Query with valid where and order by type is not supported - if (query_information.has_value()) - { - return false; - } - - query_information = std::move(order_by_info); - } - - if (query_information) - query_information->limit = limit; - - return query_information.has_value(); -} - -void ANNCondition::traverseAST(const ASTPtr & node, RPN & rpn) -{ - // If the node is ASTFunction, it may have children nodes - if (const auto * func = node->as()) - { - const ASTs & children = func->arguments->children; - // Traverse children nodes - for (const auto& child : children) - { - traverseAST(child, rpn); - } - } - - RPNElement element; - // Get the data behind node - if (!traverseAtomAST(node, element)) - { - element.function = RPNElement::FUNCTION_UNKNOWN; - } - - rpn.emplace_back(std::move(element)); -} - -bool ANNCondition::traverseAtomAST(const ASTPtr & node, RPNElement & out) -{ - // Match Functions - if (const auto * function = node->as()) - { - // Set the name - out.func_name = function->name; - - if (function->name == "L1Distance" || - function->name == "L2Distance" || - function->name == "LinfDistance" || - function->name == "cosineDistance" || - function->name == "dotProduct" || - function->name == "LpDistance") - { - out.function = RPNElement::FUNCTION_DISTANCE; - } - else if (function->name == "tuple") - { - out.function = RPNElement::FUNCTION_TUPLE; - } - else if (function->name == "array") - { - out.function = RPNElement::FUNCTION_ARRAY; - } - else if (function->name == "less" || - function->name == "greater" || - function->name == "lessOrEquals" || - function->name == "greaterOrEquals") - { - out.function = RPNElement::FUNCTION_COMPARISON; - } - else if (function->name == "_CAST") - { - out.function = RPNElement::FUNCTION_CAST; - } - else - { - return false; - } - - return true; - } - // Match identifier - else if (const auto * identifier = node->as()) - { - out.function = RPNElement::FUNCTION_IDENTIFIER; - out.identifier.emplace(identifier->name()); - out.func_name = "column identifier"; - - return true; - } - - // Check if we have constants behind the node - return tryCastToConstType(node, out); -} - -bool ANNCondition::tryCastToConstType(const ASTPtr & node, RPNElement & out) -{ - Field const_value; - DataTypePtr const_type; - - if (KeyCondition::getConstant(node, block_with_constants, const_value, const_type)) - { - /// Check for constant types - if (const_value.getType() == Field::Types::Float64) - { - out.function = RPNElement::FUNCTION_FLOAT_LITERAL; - out.float_literal.emplace(const_value.get()); - out.func_name = "Float literal"; - return true; - } - - if (const_value.getType() == Field::Types::UInt64) - { - out.function = RPNElement::FUNCTION_INT_LITERAL; - out.int_literal.emplace(const_value.get()); - out.func_name = "Int literal"; - return true; - } - - if (const_value.getType() == Field::Types::Int64) - { - out.function = RPNElement::FUNCTION_INT_LITERAL; - out.int_literal.emplace(const_value.get()); - out.func_name = "Int literal"; - return true; - } - - if (const_value.getType() == Field::Types::Tuple) - { - out.function = RPNElement::FUNCTION_LITERAL_TUPLE; - out.tuple_literal = const_value.get(); - out.func_name = "Tuple literal"; - return true; - } - - if (const_value.getType() == Field::Types::Array) - { - out.function = RPNElement::FUNCTION_LITERAL_ARRAY; - out.array_literal = const_value.get(); - out.func_name = "Array literal"; - return true; - } - - if (const_value.getType() == Field::Types::String) - { - out.function = RPNElement::FUNCTION_STRING_LITERAL; - out.func_name = const_value.get(); - return true; - } - } - - return false; -} - -void ANNCondition::traverseOrderByAST(const ASTPtr & node, RPN & rpn) -{ - if (const auto * expr_list = node->as()) - { - if (const auto * order_by_element = expr_list->children.front()->as()) - { - traverseAST(order_by_element->children.front(), rpn); - } - } -} - -// Returns true and stores ANNQueryInformation if the query has valid WHERE clause -bool ANNCondition::matchRPNWhere(RPN & rpn, ANNQueryInformation & expr) -{ - // WHERE section must have at least 5 expressions - // Operator->Distance(float)->DistanceFunc->Column->Tuple(Array)Func(TargetVector(floats)) - if (rpn.size() < 5) - { - return false; - } - - auto iter = rpn.begin(); - - // Query starts from operator less - if (iter->function != RPNElement::FUNCTION_COMPARISON) - { - return false; - } - - const bool greater_case = iter->func_name == "greater" || iter->func_name == "greaterOrEquals"; - const bool less_case = iter->func_name == "less" || iter->func_name == "lessOrEquals"; - - ++iter; - - if (less_case) - { - if (iter->function != RPNElement::FUNCTION_FLOAT_LITERAL) - { - return false; - } - - expr.distance = getFloatOrIntLiteralOrPanic(iter); - ++iter; - - } - else if (!greater_case) - { - return false; - } - - auto end = rpn.end(); - if (!matchMainParts(iter, end, expr)) - { - return false; - } - - if (greater_case) - { - if (expr.target.size() < 2) - { - return false; - } - expr.distance = expr.target.back(); - expr.target.pop_back(); - } - - // query is ok - return true; -} - -// Returns true and stores ANNExpr if the query has valid ORDERBY clause -bool ANNCondition::matchRPNOrderBy(RPN & rpn, ANNQueryInformation & expr) -{ - // ORDER BY clause must have at least 3 expressions - if (rpn.size() < 3) - { - return false; - } - - auto iter = rpn.begin(); - auto end = rpn.end(); - - return ANNCondition::matchMainParts(iter, end, expr); -} - -// Returns true and stores Length if we have valid LIMIT clause in query -bool ANNCondition::matchRPNLimit(RPNElement & rpn, UInt64 & limit) -{ - if (rpn.function == RPNElement::FUNCTION_INT_LITERAL) - { - limit = rpn.int_literal.value(); - return true; - } - - return false; -} - -/* Matches dist function, target vector, column name */ -bool ANNCondition::matchMainParts(RPN::iterator & iter, const RPN::iterator & end, ANNQueryInformation & expr) -{ - bool identifier_found = false; - - // Matches DistanceFunc->[Column]->[Tuple(array)Func]->TargetVector(floats)->[Column] - if (iter->function != RPNElement::FUNCTION_DISTANCE) - { - return false; - } - - expr.metric = castMetricFromStringToType(iter->func_name); - ++iter; - - if (expr.metric == ANN::ANNQueryInformation::Metric::Lp) - { - if (iter->function != RPNElement::FUNCTION_FLOAT_LITERAL && - iter->function != RPNElement::FUNCTION_INT_LITERAL) - { - return false; - } - expr.p_for_lp_dist = getFloatOrIntLiteralOrPanic(iter); - ++iter; - } - - if (iter->function == RPNElement::FUNCTION_IDENTIFIER) - { - identifier_found = true; - expr.column_name = std::move(iter->identifier.value()); - ++iter; - } - - if (iter->function == RPNElement::FUNCTION_TUPLE || iter->function == RPNElement::FUNCTION_ARRAY) - { - ++iter; - } - - if (iter->function == RPNElement::FUNCTION_LITERAL_TUPLE) - { - extractTargetVectorFromLiteral(expr.target, iter->tuple_literal); - ++iter; - } - - if (iter->function == RPNElement::FUNCTION_LITERAL_ARRAY) - { - extractTargetVectorFromLiteral(expr.target, iter->array_literal); - ++iter; - } - - /// further conditions are possible if there is no tuple or array, or no identifier is found - /// the tuple or array can be inside a cast function. For other cases, see the loop after this condition - if (iter != end && iter->function == RPNElement::FUNCTION_CAST) - { - ++iter; - /// Cast should be made to array or tuple - if (!iter->func_name.starts_with("Array") && !iter->func_name.starts_with("Tuple")) - { - return false; - } - ++iter; - if (iter->function == RPNElement::FUNCTION_LITERAL_TUPLE) - { - extractTargetVectorFromLiteral(expr.target, iter->tuple_literal); - ++iter; - } - else if (iter->function == RPNElement::FUNCTION_LITERAL_ARRAY) - { - extractTargetVectorFromLiteral(expr.target, iter->array_literal); - ++iter; - } - else - { - return false; - } - } - - while (iter != end) - { - if (iter->function == RPNElement::FUNCTION_FLOAT_LITERAL || - iter->function == RPNElement::FUNCTION_INT_LITERAL) - { - expr.target.emplace_back(getFloatOrIntLiteralOrPanic(iter)); - } - else if (iter->function == RPNElement::FUNCTION_IDENTIFIER) - { - if (identifier_found) - { - return false; - } - expr.column_name = std::move(iter->identifier.value()); - identifier_found = true; - } - else - { - return false; - } - - ++iter; - } - - // Final checks of correctness - return identifier_found && !expr.target.empty(); -} - -// Gets float or int from AST node -float ANNCondition::getFloatOrIntLiteralOrPanic(const RPN::iterator& iter) -{ - if (iter->float_literal.has_value()) - { - return iter->float_literal.value(); - } - if (iter->int_literal.has_value()) - { - return static_cast(iter->int_literal.value()); - } - throw Exception("Wrong parsed AST in buildRPN\n", ErrorCodes::INCORRECT_QUERY); -} - -} - -} diff --git a/src/Storages/MergeTree/CommonANNIndexes.h b/src/Storages/MergeTree/CommonANNIndexes.h deleted file mode 100644 index fefb9584863..00000000000 --- a/src/Storages/MergeTree/CommonANNIndexes.h +++ /dev/null @@ -1,236 +0,0 @@ -#pragma once - -#include -#include "base/types.h" - -#include -#include - -namespace DB -{ - -namespace ApproximateNearestNeighbour -{ - -/** - * Queries for Approximate Nearest Neighbour Search - * have similar structure: - * 1) target vector from which all distances are calculated - * 2) metric name (e.g L2Distance, LpDistance, etc.) - * 3) name of column with embeddings - * 4) type of query - * 5) Number of elements, that should be taken (limit) - * - * And two optional parameters: - * 1) p for LpDistance function - * 2) distance to compare with (only for where queries) - */ -struct ANNQueryInformation -{ - using Embedding = std::vector; - - // Extracted data from valid query - Embedding target; - enum class Metric - { - Unknown, - L2, - Lp - } metric; - String column_name; - UInt64 limit; - - enum class Type - { - OrderBy, - Where - } query_type; - - float p_for_lp_dist = -1.0; - float distance = -1.0; -}; - -/** - Class ANNCondition, is responsible for recognizing special query types which - can be speeded up by ANN Indexes. It parses the SQL query and checks - if it matches ANNIndexes. The recognizing method - alwaysUnknownOrTrue - returns false if we can speed up the query, and true otherwise. - It has only one argument, name of the metric with which index was built. - There are two main patterns of queries being supported - - 1) Search query type - SELECT * FROM * WHERE DistanceFunc(column, target_vector) < floatLiteral LIMIT count - - 2) OrderBy query type - SELECT * FROM * WHERE * ORDERBY DistanceFunc(column, target_vector) LIMIT count - - *Query without LIMIT count is not supported* - - target_vector(should have float coordinates) examples: - tuple(0.1, 0.1, ...., 0.1) or (0.1, 0.1, ...., 0.1) - [the word tuple is not needed] - - If the query matches one of these two types, than the class extracts useful information - from the query. If the query has both 1 and 2 types, than we can't speed and alwaysUnknownOrTrue - returns true. - - From matching query it extracts - * targetVector - * metricName(DistanceFunction) - * dimension size if query uses LpDistance - * distance to compare(ONLY for search types, otherwise you get exception) - * spaceDimension(which is targetVector's components count) - * column - * objects count from LIMIT clause(for both queries) - * settings str, if query has settings section with new 'ann_index_select_query_params' value, - than you can get the new value(empty by default) calling method getSettingsStr - * queryHasOrderByClause and queryHasWhereClause return true if query matches the type - - Search query type is also recognized for PREWHERE clause -*/ - -class ANNCondition -{ -public: - ANNCondition(const SelectQueryInfo & query_info, - ContextPtr context); - - // false if query can be speeded up, true otherwise - bool alwaysUnknownOrTrue(String metric_name) const; - - // returns the distance to compare with for search query - float getComparisonDistanceForWhereQuery() const; - - // distance should be calculated regarding to targetVector - std::vector getTargetVector() const; - - // targetVector dimension size - size_t getNumOfDimensions() const; - - String getColumnName() const; - - ANNQueryInformation::Metric getMetricType() const; - - // the P- value if the metric is 'LpDistance' - float getPValueForLpDistance() const; - - ANNQueryInformation::Type getQueryType() const; - - UInt64 getIndexGranularity() const { return index_granularity; } - - // length's value from LIMIT clause - UInt64 getLimit() const; - - // value of 'ann_index_select_query_params' if have in SETTINGS clause, empty string otherwise - String getParamsStr() const { return ann_index_select_query_params; } - -private: - - struct RPNElement - { - enum Function - { - // DistanceFunctions - FUNCTION_DISTANCE, - - //tuple(0.1, ..., 0.1) - FUNCTION_TUPLE, - - //array(0.1, ..., 0.1) - FUNCTION_ARRAY, - - // Operators <, >, <=, >= - FUNCTION_COMPARISON, - - // Numeric float value - FUNCTION_FLOAT_LITERAL, - - // Numeric int value - FUNCTION_INT_LITERAL, - - // Column identifier - FUNCTION_IDENTIFIER, - - // Unknown, can be any value - FUNCTION_UNKNOWN, - - // (0.1, ...., 0.1) vector without word 'tuple' - FUNCTION_LITERAL_TUPLE, - - // [0.1, ...., 0.1] vector without word 'array' - FUNCTION_LITERAL_ARRAY, - - // if client parameters are used, cast will always be in the query - FUNCTION_CAST, - - // name of type in cast function - FUNCTION_STRING_LITERAL, - }; - - explicit RPNElement(Function function_ = FUNCTION_UNKNOWN) - : function(function_), func_name("Unknown"), float_literal(std::nullopt), identifier(std::nullopt) {} - - Function function; - String func_name; - - std::optional float_literal; - std::optional identifier; - std::optional int_literal; - - std::optional tuple_literal; - std::optional array_literal; - - UInt32 dim = 0; - }; - - using RPN = std::vector; - - bool checkQueryStructure(const SelectQueryInfo & query); - - // Util functions for the traversal of AST, parses AST and builds rpn - void traverseAST(const ASTPtr & node, RPN & rpn); - // Return true if we can identify our node type - bool traverseAtomAST(const ASTPtr & node, RPNElement & out); - // Checks if the AST stores ConstType expression - bool tryCastToConstType(const ASTPtr & node, RPNElement & out); - // Traverses the AST of ORDERBY section - void traverseOrderByAST(const ASTPtr & node, RPN & rpn); - - // Returns true and stores ANNExpr if the query has valid WHERE section - static bool matchRPNWhere(RPN & rpn, ANNQueryInformation & expr); - - // Returns true and stores ANNExpr if the query has valid ORDERBY section - static bool matchRPNOrderBy(RPN & rpn, ANNQueryInformation & expr); - - // Returns true and stores Length if we have valid LIMIT clause in query - static bool matchRPNLimit(RPNElement & rpn, UInt64 & limit); - - /* Matches dist function, target vector, column name */ - static bool matchMainParts(RPN::iterator & iter, const RPN::iterator & end, ANNQueryInformation & expr); - - // Gets float or int from AST node - static float getFloatOrIntLiteralOrPanic(const RPN::iterator& iter); - - Block block_with_constants; - - // true if we have one of two supported query types - std::optional query_information; - - // Get from settings ANNIndex parameters - String ann_index_select_query_params; - UInt64 index_granularity; - /// only queries with a lower limit can be considered to avoid memory overflow - UInt64 limit_restriction; - bool index_is_useful = false; -}; - -// condition interface for Ann indexes. Returns vector of indexes of ranges in granule which are useful for query. -class IMergeTreeIndexConditionAnn : public IMergeTreeIndexCondition -{ -public: - virtual std::vector getUsefulRanges(MergeTreeIndexGranulePtr idx_granule) const = 0; -}; - -} - -} diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 12aec29eab6..c5f546a9c36 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -43,8 +43,6 @@ #include -#include - namespace DB { @@ -1671,31 +1669,6 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex( { if (index_mark != index_range.begin || !granule || last_index_mark != index_range.begin) granule = reader.read(); - // Cast to Ann condition - auto ann_condition = std::dynamic_pointer_cast(condition); - if (ann_condition != nullptr) - { - // vector of indexes of useful ranges - auto result = ann_condition->getUsefulRanges(granule); - if (result.empty()) - { - ++granules_dropped; - } - - for (auto range : result) - { - // range for corresponding index - MarkRange data_range( - std::max(ranges[i].begin, index_mark * index_granularity + range), - std::min(ranges[i].end, index_mark * index_granularity + range + 1)); - - if (res.empty() || res.back().end - data_range.begin > min_marks_for_seek) - res.push_back(data_range); - else - res.back().end = data_range.end; - } - continue; - } if (!condition->mayBeTrueOnGranule(granule)) { diff --git a/src/Storages/MergeTree/MergeTreeIndexAnnoy.cpp b/src/Storages/MergeTree/MergeTreeIndexAnnoy.cpp deleted file mode 100644 index a8b825d832d..00000000000 --- a/src/Storages/MergeTree/MergeTreeIndexAnnoy.cpp +++ /dev/null @@ -1,317 +0,0 @@ -#ifdef ENABLE_ANNOY - -#include - -#include -#include -#include -#include -#include -#include -#include - - -namespace DB -{ - -namespace ApproximateNearestNeighbour -{ - -template -void AnnoyIndex::serialize(WriteBuffer& ostr) const -{ - assert(Base::_built); - writeIntBinary(Base::_s, ostr); - writeIntBinary(Base::_n_items, ostr); - writeIntBinary(Base::_n_nodes, ostr); - writeIntBinary(Base::_nodes_size, ostr); - writeIntBinary(Base::_K, ostr); - writeIntBinary(Base::_seed, ostr); - writeVectorBinary(Base::_roots, ostr); - ostr.write(reinterpret_cast(Base::_nodes), Base::_s * Base::_n_nodes); -} - -template -void AnnoyIndex::deserialize(ReadBuffer& istr) -{ - assert(!Base::_built); - readIntBinary(Base::_s, istr); - readIntBinary(Base::_n_items, istr); - readIntBinary(Base::_n_nodes, istr); - readIntBinary(Base::_nodes_size, istr); - readIntBinary(Base::_K, istr); - readIntBinary(Base::_seed, istr); - readVectorBinary(Base::_roots, istr); - Base::_nodes = realloc(Base::_nodes, Base::_s * Base::_n_nodes); - istr.read(reinterpret_cast(Base::_nodes), Base::_s * Base::_n_nodes); - - Base::_fd = 0; - // set flags - Base::_loaded = false; - Base::_verbose = false; - Base::_on_disk = false; - Base::_built = true; -} - -template -uint64_t AnnoyIndex::getNumOfDimensions() const -{ - return Base::get_f(); -} - -} - - -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; - extern const int INCORRECT_QUERY; - extern const int INCORRECT_DATA; -} - -MergeTreeIndexGranuleAnnoy::MergeTreeIndexGranuleAnnoy(const String & index_name_, const Block & index_sample_block_) - : index_name(index_name_) - , index_sample_block(index_sample_block_) - , index(nullptr) -{} - -MergeTreeIndexGranuleAnnoy::MergeTreeIndexGranuleAnnoy( - const String & index_name_, - const Block & index_sample_block_, - AnnoyIndexPtr index_base_) - : index_name(index_name_) - , index_sample_block(index_sample_block_) - , index(std::move(index_base_)) -{} - -void MergeTreeIndexGranuleAnnoy::serializeBinary(WriteBuffer & ostr) const -{ - /// number of dimensions is required in the constructor, - /// so it must be written and read separately from the other part - writeIntBinary(index->getNumOfDimensions(), ostr); // write dimension - index->serialize(ostr); -} - -void MergeTreeIndexGranuleAnnoy::deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion /*version*/) -{ - uint64_t dimension; - readIntBinary(dimension, istr); - index = std::make_shared(dimension); - index->deserialize(istr); -} - - -MergeTreeIndexAggregatorAnnoy::MergeTreeIndexAggregatorAnnoy( - const String & index_name_, - const Block & index_sample_block_, - uint64_t number_of_trees_) - : index_name(index_name_) - , index_sample_block(index_sample_block_) - , number_of_trees(number_of_trees_) -{} - -MergeTreeIndexGranulePtr MergeTreeIndexAggregatorAnnoy::getGranuleAndReset() -{ - // NOLINTNEXTLINE(*) - index->build(number_of_trees, /*number_of_threads=*/1); - auto granule = std::make_shared(index_name, index_sample_block, index); - index = nullptr; - return granule; -} - -void MergeTreeIndexAggregatorAnnoy::update(const Block & block, size_t * pos, size_t limit) -{ - if (*pos >= block.rows()) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "The provided position is not less than the number of block rows. Position: {}, Block rows: {}.", - toString(*pos), toString(block.rows())); - - size_t rows_read = std::min(limit, block.rows() - *pos); - - if (index_sample_block.columns() > 1) - { - throw Exception("Only one column is supported", ErrorCodes::LOGICAL_ERROR); - } - - auto index_column_name = index_sample_block.getByPosition(0).name; - const auto & column_cut = block.getByName(index_column_name).column->cut(*pos, rows_read); - const auto & column_array = typeid_cast(column_cut.get()); - if (column_array) - { - const auto & data = column_array->getData(); - const auto & array = typeid_cast(data).getData(); - const auto & offsets = column_array->getOffsets(); - size_t num_rows = column_array->size(); - - /// All sizes are the same - size_t size = offsets[0]; - for (size_t i = 0; i < num_rows - 1; ++i) - { - if (offsets[i + 1] - offsets[i] != size) - { - throw Exception(ErrorCodes::INCORRECT_DATA, "Arrays should have same length"); - } - } - index = std::make_shared(size); - - for (size_t current_row = 0; current_row < num_rows; ++current_row) - { - index->add_item(index->get_n_items(), &array[offsets[current_row]]); - } - } - else - { - /// Other possible type of column is Tuple - const auto & column_tuple = typeid_cast(column_cut.get()); - - if (!column_tuple) - throw Exception(ErrorCodes::INCORRECT_QUERY, "Wrong type was given to index."); - - const auto & columns = column_tuple->getColumns(); - - std::vector> data{column_tuple->size(), std::vector()}; - for (const auto& column : columns) - { - const auto& pod_array = typeid_cast(column.get())->getData(); - for (size_t i = 0; i < pod_array.size(); ++i) - { - data[i].push_back(pod_array[i]); - } - } - assert(!data.empty()); - if (!index) - { - index = std::make_shared(data[0].size()); - } - for (const auto& item : data) - { - index->add_item(index->get_n_items(), item.data()); - } - } - - *pos += rows_read; -} - - -MergeTreeIndexConditionAnnoy::MergeTreeIndexConditionAnnoy( - const IndexDescription & /*index*/, - const SelectQueryInfo & query, - ContextPtr context) - : condition(query, context) -{} - - -bool MergeTreeIndexConditionAnnoy::mayBeTrueOnGranule(MergeTreeIndexGranulePtr /* idx_granule */) const -{ - throw Exception("mayBeTrueOnGranule is not supported for ANN skip indexes", ErrorCodes::LOGICAL_ERROR); -} - -bool MergeTreeIndexConditionAnnoy::alwaysUnknownOrTrue() const -{ - return condition.alwaysUnknownOrTrue("L2Distance"); -} - -std::vector MergeTreeIndexConditionAnnoy::getUsefulRanges(MergeTreeIndexGranulePtr idx_granule) const -{ - UInt64 limit = condition.getLimit(); - UInt64 index_granularity = condition.getIndexGranularity(); - std::optional comp_dist = condition.getQueryType() == ANN::ANNQueryInformation::Type::Where ? - std::optional(condition.getComparisonDistanceForWhereQuery()) : std::nullopt; - - if (comp_dist && comp_dist.value() < 0) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to optimize query with where without distance"); - - std::vector target_vec = condition.getTargetVector(); - - auto granule = std::dynamic_pointer_cast(idx_granule); - if (granule == nullptr) - { - throw Exception("Granule has the wrong type", ErrorCodes::LOGICAL_ERROR); - } - auto annoy = granule->index; - - if (condition.getNumOfDimensions() != annoy->getNumOfDimensions()) - { - throw Exception("The dimension of the space in the request (" + toString(condition.getNumOfDimensions()) + ") " - + "does not match with the dimension in the index (" + toString(annoy->getNumOfDimensions()) + ")", ErrorCodes::INCORRECT_QUERY); - } - - /// neighbors contain indexes of dots which were closest to target vector - std::vector neighbors; - std::vector distances; - neighbors.reserve(limit); - distances.reserve(limit); - - int k_search = -1; - String params_str = condition.getParamsStr(); - if (!params_str.empty()) - { - try - { - /// k_search=... (algorithm will inspect up to search_k nodes which defaults to n_trees * n if not provided) - k_search = std::stoi(params_str.data() + 9); - } - catch (...) - { - throw Exception("Setting of the annoy index should be int", ErrorCodes::INCORRECT_QUERY); - } - } - annoy->get_nns_by_vector(target_vec.data(), limit, k_search, &neighbors, &distances); - std::unordered_set granule_numbers; - for (size_t i = 0; i < neighbors.size(); ++i) - { - if (comp_dist && distances[i] > comp_dist) - { - continue; - } - granule_numbers.insert(neighbors[i] / index_granularity); - } - - std::vector result_vector; - result_vector.reserve(granule_numbers.size()); - for (auto granule_number : granule_numbers) - { - result_vector.push_back(granule_number); - } - - return result_vector; -} - - -MergeTreeIndexGranulePtr MergeTreeIndexAnnoy::createIndexGranule() const -{ - return std::make_shared(index.name, index.sample_block); -} - -MergeTreeIndexAggregatorPtr MergeTreeIndexAnnoy::createIndexAggregator() const -{ - return std::make_shared(index.name, index.sample_block, number_of_trees); -} - -MergeTreeIndexConditionPtr MergeTreeIndexAnnoy::createIndexCondition( - const SelectQueryInfo & query, ContextPtr context) const -{ - return std::make_shared(index, query, context); -}; - -MergeTreeIndexPtr annoyIndexCreator(const IndexDescription & index) -{ - uint64_t param = index.arguments[0].get(); - return std::make_shared(index, param); -} - -void annoyIndexValidator(const IndexDescription & index, bool /* attach */) -{ - if (index.arguments.size() != 1) - { - throw Exception("Annoy index must have exactly one argument.", ErrorCodes::INCORRECT_QUERY); - } - if (index.arguments[0].getType() != Field::Types::UInt64) - { - throw Exception("Annoy index argument must be UInt64.", ErrorCodes::INCORRECT_QUERY); - } -} - -} -#endif // ENABLE_ANNOY diff --git a/src/Storages/MergeTree/MergeTreeIndexAnnoy.h b/src/Storages/MergeTree/MergeTreeIndexAnnoy.h deleted file mode 100644 index 85bbb0a1bd2..00000000000 --- a/src/Storages/MergeTree/MergeTreeIndexAnnoy.h +++ /dev/null @@ -1,123 +0,0 @@ -#pragma once - -#ifdef ENABLE_ANNOY - -#include - -#include -#include - -namespace DB -{ - -namespace ANN = ApproximateNearestNeighbour; - -// auxiliary namespace for working with spotify-annoy library -// mainly for serialization and deserialization of the index -namespace ApproximateNearestNeighbour -{ - using AnnoyIndexThreadedBuildPolicy = ::Annoy::AnnoyIndexMultiThreadedBuildPolicy; - // TODO: Support different metrics. List of available metrics can be taken from here: - // https://github.com/spotify/annoy/blob/master/src/annoymodule.cc#L151-L171 - template - class AnnoyIndex : public ::Annoy::AnnoyIndex - { - using Base = ::Annoy::AnnoyIndex; - public: - explicit AnnoyIndex(const uint64_t dim) : Base::AnnoyIndex(dim) {} - void serialize(WriteBuffer& ostr) const; - void deserialize(ReadBuffer& istr); - uint64_t getNumOfDimensions() const; - }; -} - -struct MergeTreeIndexGranuleAnnoy final : public IMergeTreeIndexGranule -{ - using AnnoyIndex = ANN::AnnoyIndex<>; - using AnnoyIndexPtr = std::shared_ptr; - - MergeTreeIndexGranuleAnnoy(const String & index_name_, const Block & index_sample_block_); - MergeTreeIndexGranuleAnnoy( - const String & index_name_, - const Block & index_sample_block_, - AnnoyIndexPtr index_base_); - - ~MergeTreeIndexGranuleAnnoy() override = default; - - void serializeBinary(WriteBuffer & ostr) const override; - void deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version) override; - - bool empty() const override { return !index.get(); } - - String index_name; - Block index_sample_block; - AnnoyIndexPtr index; -}; - - -struct MergeTreeIndexAggregatorAnnoy final : IMergeTreeIndexAggregator -{ - using AnnoyIndex = ANN::AnnoyIndex<>; - using AnnoyIndexPtr = std::shared_ptr; - - MergeTreeIndexAggregatorAnnoy(const String & index_name_, const Block & index_sample_block, uint64_t number_of_trees); - ~MergeTreeIndexAggregatorAnnoy() override = default; - - bool empty() const override { return !index || index->get_n_items() == 0; } - MergeTreeIndexGranulePtr getGranuleAndReset() override; - void update(const Block & block, size_t * pos, size_t limit) override; - - String index_name; - Block index_sample_block; - const uint64_t number_of_trees; - AnnoyIndexPtr index; -}; - - -class MergeTreeIndexConditionAnnoy final : public ANN::IMergeTreeIndexConditionAnn -{ -public: - MergeTreeIndexConditionAnnoy( - const IndexDescription & index, - const SelectQueryInfo & query, - ContextPtr context); - - bool alwaysUnknownOrTrue() const override; - - bool mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const override; - - std::vector getUsefulRanges(MergeTreeIndexGranulePtr idx_granule) const override; - - ~MergeTreeIndexConditionAnnoy() override = default; - -private: - ANN::ANNCondition condition; -}; - - -class MergeTreeIndexAnnoy : public IMergeTreeIndex -{ -public: - MergeTreeIndexAnnoy(const IndexDescription & index_, uint64_t number_of_trees_) - : IMergeTreeIndex(index_) - , number_of_trees(number_of_trees_) - {} - - ~MergeTreeIndexAnnoy() override = default; - - MergeTreeIndexGranulePtr createIndexGranule() const override; - MergeTreeIndexAggregatorPtr createIndexAggregator() const override; - - MergeTreeIndexConditionPtr createIndexCondition( - const SelectQueryInfo & query, ContextPtr context) const override; - - bool mayBenefitFromIndexForIn(const ASTPtr & /*node*/) const override { return false; } - -private: - const uint64_t number_of_trees; -}; - - -} - -#endif // ENABLE_ANNOY diff --git a/src/Storages/MergeTree/MergeTreeIndices.cpp b/src/Storages/MergeTree/MergeTreeIndices.cpp index eeeef27699f..9d7e0cdfdbe 100644 --- a/src/Storages/MergeTree/MergeTreeIndices.cpp +++ b/src/Storages/MergeTree/MergeTreeIndices.cpp @@ -101,11 +101,6 @@ MergeTreeIndexFactory::MergeTreeIndexFactory() registerCreator("hypothesis", hypothesisIndexCreator); registerValidator("hypothesis", hypothesisIndexValidator); - -#ifdef ENABLE_ANNOY - registerCreator("annoy", annoyIndexCreator); - registerValidator("annoy", annoyIndexValidator); -#endif } MergeTreeIndexFactory & MergeTreeIndexFactory::instance() diff --git a/src/Storages/MergeTree/MergeTreeIndices.h b/src/Storages/MergeTree/MergeTreeIndices.h index 14002534c94..051edd630cb 100644 --- a/src/Storages/MergeTree/MergeTreeIndices.h +++ b/src/Storages/MergeTree/MergeTreeIndices.h @@ -224,9 +224,4 @@ void bloomFilterIndexValidatorNew(const IndexDescription & index, bool attach); MergeTreeIndexPtr hypothesisIndexCreator(const IndexDescription & index); void hypothesisIndexValidator(const IndexDescription & index, bool attach); -#ifdef ENABLE_ANNOY -MergeTreeIndexPtr annoyIndexCreator(const IndexDescription & index); -void annoyIndexValidator(const IndexDescription & index, bool attach); -#endif - } diff --git a/tests/queries/0_stateless/02354_annoy.reference b/tests/queries/0_stateless/02354_annoy.reference deleted file mode 100644 index 2cc62ef4c86..00000000000 --- a/tests/queries/0_stateless/02354_annoy.reference +++ /dev/null @@ -1,16 +0,0 @@ -1 [0,0,10] -2 [0,0,10.5] -3 [0,0,9.5] -4 [0,0,9.7] -5 [0,0,10.2] -1 [0,0,10] -5 [0,0,10.2] -4 [0,0,9.7] -1 [0,0,10] -2 [0,0,10.5] -3 [0,0,9.5] -4 [0,0,9.7] -5 [0,0,10.2] -1 [0,0,10] -5 [0,0,10.2] -4 [0,0,9.7] diff --git a/tests/queries/0_stateless/02354_annoy.sql b/tests/queries/0_stateless/02354_annoy.sql deleted file mode 100644 index da0799cecaa..00000000000 --- a/tests/queries/0_stateless/02354_annoy.sql +++ /dev/null @@ -1,44 +0,0 @@ --- Tags: no-fasttest, no-ubsan - -DROP TABLE IF EXISTS 02354_annoy; - -CREATE TABLE 02354_annoy -( - id Int32, - embedding Array(Float32), - INDEX annoy_index embedding TYPE annoy(100) GRANULARITY 1 -) -ENGINE = MergeTree -ORDER BY id -SETTINGS index_granularity=5; - -INSERT INTO 02354_annoy VALUES (1, [0.0, 0.0, 10.0]), (2, [0.0, 0.0, 10.5]), (3, [0.0, 0.0, 9.5]), (4, [0.0, 0.0, 9.7]), (5, [0.0, 0.0, 10.2]), (6, [10.0, 0.0, 0.0]), (7, [9.5, 0.0, 0.0]), (8, [9.7, 0.0, 0.0]), (9, [10.2, 0.0, 0.0]), (10, [10.5, 0.0, 0.0]), (11, [0.0, 10.0, 0.0]), (12, [0.0, 9.5, 0.0]), (13, [0.0, 9.7, 0.0]), (14, [0.0, 10.2, 0.0]), (15, [0.0, 10.5, 0.0]); - -SELECT * -FROM 02354_annoy -WHERE L2Distance(embedding, [0.0, 0.0, 10.0]) < 1.0 -LIMIT 5; - -SELECT * -FROM 02354_annoy -ORDER BY L2Distance(embedding, [0.0, 0.0, 10.0]) -LIMIT 3; - -SET param_02354_target_vector='[0.0, 0.0, 10.0]'; - -SELECT * -FROM 02354_annoy -WHERE L2Distance(embedding, {02354_target_vector: Array(Float32)}) < 1.0 -LIMIT 5; - -SELECT * -FROM 02354_annoy -ORDER BY L2Distance(embedding, {02354_target_vector: Array(Float32)}) -LIMIT 3; - -SELECT * -FROM 02354_annoy -ORDER BY L2Distance(embedding, [0.0, 0.0]) -LIMIT 3; -- { serverError 80 } - -DROP TABLE IF EXISTS 02354_annoy; From 0d7cc822676c8795a28c62d80d4eafdcbc3b85d5 Mon Sep 17 00:00:00 2001 From: Denny Crane Date: Tue, 30 Aug 2022 11:08:23 -0300 Subject: [PATCH 31/32] Update string-functions.md --- .../functions/string-functions.md | 55 ++++++++++++++----- 1 file changed, 42 insertions(+), 13 deletions(-) diff --git a/docs/en/sql-reference/functions/string-functions.md b/docs/en/sql-reference/functions/string-functions.md index af265aba18f..45187abf61b 100644 --- a/docs/en/sql-reference/functions/string-functions.md +++ b/docs/en/sql-reference/functions/string-functions.md @@ -495,25 +495,23 @@ If the ‘s’ string is non-empty and does not contain the ‘c’ character at Returns the string ‘s’ that was converted from the encoding in ‘from’ to the encoding in ‘to’. -## base58Encode(plaintext), base58Decode(encoded_text) +## base58Encode(plaintext) -Accepts a String and encodes/decodes it using [Base58](https://tools.ietf.org/id/draft-msporny-base58-01.html) encoding scheme using "Bitcoin" alphabet. +Accepts a String and encodes it using [Base58](https://tools.ietf.org/id/draft-msporny-base58-01.html) encoding scheme using "Bitcoin" alphabet. **Syntax** ```sql -base58Encode(decoded) -base58Decode(encoded) +base58Encode(plaintext) ``` **Arguments** -- `decoded` — [String](../../sql-reference/data-types/string.md) column or constant. -- `encoded` — [String](../../sql-reference/data-types/string.md) column or constant. If the string is not a valid base58-encoded value, an exception is thrown. +- `plaintext` — [String](../../sql-reference/data-types/string.md) column or constant. **Returned value** -- A string containing encoded/decoded value of 1st argument. +- A string containing encoded value of 1st argument. Type: [String](../../sql-reference/data-types/string.md). @@ -523,17 +521,48 @@ Query: ``` sql SELECT base58Encode('Encoded'); +``` + +Result: +```text +┌─base58Encode('Encoded')─┐ +│ 3dc8KtHrwM │ +└─────────────────────────┘ +``` + +## base58Decode(encoded_text) + +Accepts a String and decodes it using [Base58](https://tools.ietf.org/id/draft-msporny-base58-01.html) encoding scheme using "Bitcoin" alphabet. + +**Syntax** + +```sql +base58Decode(encoded_text) +``` + +**Arguments** + +- `encoded_text` — [String](../../sql-reference/data-types/string.md) column or constant. If the string is not a valid base58-encoded value, an exception is thrown. + +**Returned value** + +- A string containing decoded value of 1st argument. + +Type: [String](../../sql-reference/data-types/string.md). + +**Example** + +Query: + +``` sql SELECT base58Decode('3dc8KtHrwM'); ``` Result: ```text -┌─encodeBase58('Encoded')─┐ -│ 3dc8KtHrwM │ -└──────────────────────────────────┘ -┌─decodeBase58('3dc8KtHrwM')─┐ -│ Encoded │ -└────────────────────────────────────┘ +┌─base58Decode('3dc8KtHrwM')─┐ +│ Encoded │ +└────────────────────────────┘ ``` ## base64Encode(s) From 40468d3304b51251d9d74f0d315bcbaf4e0939f0 Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Tue, 30 Aug 2022 20:45:31 +0000 Subject: [PATCH 32/32] Fix typo in docs --- docs/en/sql-reference/data-types/decimal.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/sql-reference/data-types/decimal.md b/docs/en/sql-reference/data-types/decimal.md index c11d5c879d7..2644c6073b4 100644 --- a/docs/en/sql-reference/data-types/decimal.md +++ b/docs/en/sql-reference/data-types/decimal.md @@ -46,7 +46,7 @@ Binary operations on Decimal result in wider result type (with any order of argu Rules for scale: - add, subtract: S = max(S1, S2). -- multuply: S = S1 + S2. +- multiply: S = S1 + S2. - divide: S = S1. For similar operations between Decimal and integers, the result is Decimal of the same size as an argument.