Merge branch 'master' into dev_intel_iaa_deflate

jasperzhu 2022-05-15 10:59:07 +08:00 committed by GitHub
commit 5e607e6cf5
81 changed files with 1608 additions and 258 deletions

4
.gitmodules vendored
View File

@ -271,4 +271,6 @@
[submodule "contrib/wyhash"]
path = contrib/wyhash
url = https://github.com/wangyi-fudan/wyhash.git
[submodule "contrib/eigen"]
path = contrib/eigen
url = https://github.com/eigen-mirror/eigen

View File

@ -61,7 +61,7 @@ else ()
endif ()
if (ARCH_PPC64LE)
set (COMPILER_FLAGS "${COMPILER_FLAGS} -maltivec -mcpu=power8 -D__SSE2__=1 -DNO_WARN_X86_INTRINSICS")
set (COMPILER_FLAGS "${COMPILER_FLAGS} -maltivec -mcpu=power8 -DNO_WARN_X86_INTRINSICS")
endif ()
set (TEST_FLAG "-msse4.2")

View File

@ -153,6 +153,7 @@ endif()
add_contrib (sqlite-cmake sqlite-amalgamation)
add_contrib (s2geometry-cmake s2geometry)
add_contrib (qpl-cmake qpl)
add_contrib (eigen-cmake eigen)
# Put all targets defined here and in subdirectories under "contrib/<immediate-subdir>" folders in GUI-based IDEs.
# Some of third-party projects may override CMAKE_FOLDER or FOLDER property of their targets, so they would not appear

1
contrib/eigen vendored Submodule

@ -0,0 +1 @@
Subproject commit 3147391d946bb4b6c68edd901f2add6ac1f31f8c

View File

@ -0,0 +1,23 @@
set(EIGEN_LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/eigen")
add_library (_eigen INTERFACE)
option (ENABLE_MKL "Build Eigen with Intel MKL" OFF)
if (ENABLE_MKL)
set(MKL_THREADING sequential)
set(MKL_INTERFACE lp64)
find_package(MKL REQUIRED)
if (MKL_FOUND)
message("MKL INCLUDE: ${MKL_INCLUDE}")
message("MKL LIBRARIES: ${MKL_LIBRARIES}")
target_compile_definitions(_eigen INTERFACE EIGEN_USE_MKL_ALL)
target_include_directories(_eigen INTERFACE ${MKL_INCLUDE})
target_link_libraries(_eigen INTERFACE ${MKL_LIBRARIES})
endif()
endif()
# Only include MPL2 code from Eigen library
target_compile_definitions(_eigen INTERFACE EIGEN_MPL2_ONLY)
target_include_directories (_eigen SYSTEM INTERFACE ${EIGEN_LIBRARY_DIR})
add_library(ch_contrib::eigen ALIAS _eigen)
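
For orientation (not part of the diff): any target that links `ch_contrib::eigen` inherits these interface definitions, so consumers always see `EIGEN_MPL2_ONLY`, and see `EIGEN_USE_MKL_ALL` only when the build was configured with `-DENABLE_MKL=ON`. A minimal, hypothetical consumer translation unit that makes this visible:

``` cpp
// Hypothetical stand-alone check, assuming it is compiled inside a target that
// links ch_contrib::eigen; the macros below come from that interface library.
#include <Eigen/Core>
#include <cstdio>

int main()
{
#ifdef EIGEN_MPL2_ONLY
    std::puts("Eigen restricted to MPL2-licensed modules");
#endif
#ifdef EIGEN_USE_MKL_ALL
    std::puts("Eigen dispatches BLAS/LAPACK-backed paths to Intel MKL");
#endif
    Eigen::MatrixXd m = Eigen::MatrixXd::Identity(2, 2);
    std::printf("trace = %.1f\n", m.trace());  // 2.0
    return 0;
}
```

Both macros are standard Eigen configuration switches; the interface library merely propagates them (and, with MKL enabled, the MKL include path and libraries) to whatever links against it.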

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit 6c1a233744d13414e8e8db396c75177b857b2c22
Subproject commit de35b9fd72b57127abdc3a5beaf0e320d767e356

View File

@ -179,6 +179,7 @@ function clone_submodules
contrib/qpl
contrib/nasm
contrib/wyhash
contrib/eigen
)
git submodule sync

View File

@ -13,7 +13,7 @@ then
elif [ "${ARCH}" = "aarch64" ]
then
DIR="aarch64"
elif [ "${ARCH}" = "powerpc64le" ]
elif [ "${ARCH}" = "powerpc64le" ] || [ "${ARCH}" = "ppc64le" ]
then
DIR="powerpc64le"
fi
@ -25,7 +25,7 @@ then
elif [ "${ARCH}" = "aarch64" ]
then
DIR="freebsd-aarch64"
elif [ "${ARCH}" = "powerpc64le" ]
elif [ "${ARCH}" = "powerpc64le" ] || [ "${ARCH}" = "ppc64le" ]
then
DIR="freebsd-powerpc64le"
fi

View File

@ -10,21 +10,17 @@ description: How to build ClickHouse on Mac OS X
You can install pre-built ClickHouse as described in [Quick Start](https://clickhouse.com/#quick-start). Follow **macOS (Intel)** or **macOS (Apple silicon)** installation instructions.
:::
Build should work on x86_64 (Intel) and arm64 (Apple silicon) based macOS 10.15 (Catalina) and higher with Homebrew's vanilla Clang.
It is always recommended to use vanilla `clang` compiler.
The build works on x86_64 (Intel) and arm64 (Apple Silicon) machines running macOS 10.15 (Catalina) or higher with Homebrew's vanilla Clang.
:::note
It is possible to use XCode's `apple-clang` or `gcc`, but it's strongly discouraged.
It is also possible to compile with Apple's Xcode `apple-clang` or Homebrew's `gcc`, but it's strongly discouraged.
:::
## Install Homebrew {#install-homebrew}
``` bash
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
# ...and follow the printed instructions on any additional steps required to complete the installation.
```
First, install [Homebrew](https://brew.sh/).
## Install Xcode and Command Line Tools {#install-xcode-and-command-line-tools}
## For Apple's Clang (discouraged): Install Xcode and Command Line Tools {#install-xcode-and-command-line-tools}
Install the latest [Xcode](https://apps.apple.com/am/app/xcode/id497799835?mt=12) from App Store.
@ -57,12 +53,12 @@ To build using Homebrew's vanilla Clang compiler (the only **recommended** way):
``` bash
cd ClickHouse
rm -rf build
mkdir build
cd build
cmake -DCMAKE_C_COMPILER=$(brew --prefix llvm)/bin/clang -DCMAKE_CXX_COMPILER=$(brew --prefix llvm)/bin/clang++ -DCMAKE_AR=$(brew --prefix llvm)/bin/llvm-ar -DCMAKE_RANLIB=$(brew --prefix llvm)/bin/llvm-ranlib -DOBJCOPY_PATH=$(brew --prefix llvm)/bin/llvm-objcopy -DCMAKE_BUILD_TYPE=RelWithDebInfo ..
cmake --build . --config RelWithDebInfo
# The resulting binary will be created at: ./programs/clickhouse
export CC=$(brew --prefix llvm)/bin/clang
export CXX=$(brew --prefix llvm)/bin/clang++
cmake -G Ninja -DCMAKE_BUILD_TYPE=RelWithDebInfo -S . -B build
cmake --build build
# The resulting binary will be created at: build/programs/clickhouse
```
To build using Xcode's native AppleClang compiler in Xcode IDE (this option is only for development builds and workflows, and is **not recommended** unless you know what you are doing):
@ -82,12 +78,12 @@ To build using Homebrew's vanilla GCC compiler (this option is only for developm
``` bash
cd ClickHouse
rm -rf build
mkdir build
cd build
cmake -DCMAKE_C_COMPILER=$(brew --prefix gcc)/bin/gcc-11 -DCMAKE_CXX_COMPILER=$(brew --prefix gcc)/bin/g++-11 -DCMAKE_AR=$(brew --prefix gcc)/bin/gcc-ar-11 -DCMAKE_RANLIB=$(brew --prefix gcc)/bin/gcc-ranlib-11 -DOBJCOPY_PATH=$(brew --prefix binutils)/bin/objcopy -DCMAKE_BUILD_TYPE=RelWithDebInfo ..
cmake --build . --config RelWithDebInfo
# The resulting binary will be created at: ./programs/clickhouse
export CC=$(brew --prefix gcc)/bin/gcc-11
export CXX=$(brew --prefix gcc)/bin/g++-11
cmake -G Ninja -DCMAKE_BUILD_TYPE=RelWithDebInfo -S . -B build
cmake --build build
# The resulting binary will be created at: build/programs/clickhouse
```
## Caveats {#caveats}

View File

@ -1,5 +1,6 @@
---
sidebar_label: New York Taxi Data
sidebar_position: 2
description: Data for billions of taxi and for-hire vehicle (Uber, Lyft, etc.) trips originating in New York City since 2009
---

View File

@ -1,5 +1,6 @@
---
sidebar_label: UK Property Price Paid
sidebar_position: 1
---
# UK Property Price Paid

View File

@ -335,7 +335,7 @@ struct Checker
;
/// NOTE: We will migrate to full static linking or our own dynamic loader to make this code obsolete.
void checkHarmfulEnvironmentVariables()
void checkHarmfulEnvironmentVariables(char ** argv)
{
std::initializer_list<const char *> harmful_env_variables = {
/// The list is a selection from "man ld-linux".
@ -351,14 +351,39 @@ void checkHarmfulEnvironmentVariables()
"DYLD_INSERT_LIBRARIES",
};
bool require_reexec = false;
for (const auto * var : harmful_env_variables)
{
if (const char * value = getenv(var); value && value[0])
{
std::cerr << fmt::format("Environment variable {} is set to {}. It can compromise security.\n", var, value);
_exit(1);
/// NOTE: setenv() is used over unsetenv() since unsetenv() is marked as harmful
if (setenv(var, "", true))
{
fmt::print(stderr, "Cannot override {} environment variable", var);
_exit(1);
}
require_reexec = true;
}
}
if (require_reexec)
{
/// Use execvp() over execv() to search in PATH.
///
/// This should be safe, since:
/// - if argv[0] is a relative path - it is OK
/// - if argv[0] has only a basename, then it will be searched in PATH, like the shell does.
///
/// Also note that we search in PATH because there is no easy and
/// portable way to get the absolute path of argv[0]:
/// - on Linux there is /proc/self/exe and AT_EXECFN
/// - but on other OSes there is no such thing (especially on OSX).
///
/// And since static linking will be done someday anyway,
/// let's not pollute the code base with special cases.
int error = execvp(argv[0], argv);
_exit(error);
}
}
}
@ -381,7 +406,7 @@ int main(int argc_, char ** argv_)
inside_main = true;
SCOPE_EXIT({ inside_main = false; });
checkHarmfulEnvironmentVariables();
checkHarmfulEnvironmentVariables(argv_);
/// Reset new handler to default (that throws std::bad_alloc)
/// It is needed because LLVM library clobbers it.

View File

@ -334,7 +334,12 @@ Poco::Net::SocketAddress makeSocketAddress(const std::string & host, UInt16 port
return socket_address;
}
Poco::Net::SocketAddress Server::socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure) const
Poco::Net::SocketAddress Server::socketBindListen(
const Poco::Util::AbstractConfiguration & config,
Poco::Net::ServerSocket & socket,
const std::string & host,
UInt16 port,
[[maybe_unused]] bool secure) const
{
auto address = makeSocketAddress(host, port, &logger());
#if !defined(POCO_CLICKHOUSE_PATCH) || POCO_VERSION < 0x01090100
@ -347,7 +352,7 @@ Poco::Net::SocketAddress Server::socketBindListen(Poco::Net::ServerSocket & sock
#if POCO_VERSION < 0x01080000
socket.bind(address, /* reuseAddress = */ true);
#else
socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ config().getBool("listen_reuse_port", false));
socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ config.getBool("listen_reuse_port", false));
#endif
/// If caller requests any available port from the OS, discover it after binding.
@ -357,7 +362,7 @@ Poco::Net::SocketAddress Server::socketBindListen(Poco::Net::ServerSocket & sock
LOG_DEBUG(&logger(), "Requested any available port (port == 0), actual port is {:d}", address.port());
}
socket.listen(/* backlog = */ config().getUInt("listen_backlog", 4096));
socket.listen(/* backlog = */ config.getUInt("listen_backlog", 4096));
return address;
}
@ -1237,7 +1242,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
[&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port);
auto address = socketBindListen(config(), socket, listen_host, port);
socket.setReceiveTimeout(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC));
socket.setSendTimeout(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC));
return ProtocolServerAdapter(
@ -1260,7 +1265,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
{
#if USE_SSL
Poco::Net::SecureServerSocket socket;
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
auto address = socketBindListen(config(), socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC));
socket.setSendTimeout(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC));
return ProtocolServerAdapter(
@ -1797,7 +1802,7 @@ void Server::createServers(
createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port);
auto address = socketBindListen(config, socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
@ -1815,7 +1820,7 @@ void Server::createServers(
{
#if USE_SSL
Poco::Net::SecureServerSocket socket;
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
auto address = socketBindListen(config, socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
return ProtocolServerAdapter(
@ -1836,7 +1841,7 @@ void Server::createServers(
createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port);
auto address = socketBindListen(config, socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
return ProtocolServerAdapter(
@ -1855,7 +1860,7 @@ void Server::createServers(
createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port);
auto address = socketBindListen(config, socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
return ProtocolServerAdapter(
@ -1875,7 +1880,7 @@ void Server::createServers(
{
#if USE_SSL
Poco::Net::SecureServerSocket socket;
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
auto address = socketBindListen(config, socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
return ProtocolServerAdapter(
@ -1899,7 +1904,7 @@ void Server::createServers(
createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port);
auto address = socketBindListen(config, socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
return ProtocolServerAdapter(
@ -1919,7 +1924,7 @@ void Server::createServers(
{
#if USE_SSL
Poco::Net::SecureServerSocket socket;
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
auto address = socketBindListen(config, socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
return ProtocolServerAdapter(
@ -1943,7 +1948,7 @@ void Server::createServers(
createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
auto address = socketBindListen(config, socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout);
return ProtocolServerAdapter(
@ -1957,7 +1962,7 @@ void Server::createServers(
createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
auto address = socketBindListen(config, socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout);
return ProtocolServerAdapter(
@ -1985,7 +1990,7 @@ void Server::createServers(
createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port);
auto address = socketBindListen(config, socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
return ProtocolServerAdapter(

View File

@ -67,7 +67,12 @@ protected:
private:
ContextMutablePtr global_context;
Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure = false) const;
Poco::Net::SocketAddress socketBindListen(
const Poco::Util::AbstractConfiguration & config,
Poco::Net::ServerSocket & socket,
const std::string & host,
UInt16 port,
[[maybe_unused]] bool secure = false) const;
using CreateServerFunc = std::function<ProtocolServerAdapter(UInt16)>;
void createServer(

View File

@ -92,6 +92,7 @@
M(FilesystemCacheReadBuffers, "Number of active cache buffers") \
M(CacheFileSegments, "Number of existing cache file segments") \
M(CacheDetachedFileSegments, "Number of existing detached cache file segments") \
M(S3Requests, "S3 requests") \
namespace CurrentMetrics
{

View File

@ -400,7 +400,7 @@ LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
if (files[key].contains(offset))
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Cache already exists for key: `{}`, offset: {}, size: {}.\nCurrent cache structure: {}",
keyToStr(key), offset, size, dumpStructureUnlocked(key, cache_lock));
@ -609,7 +609,7 @@ void LRUFileCache::remove(const Key & key)
#endif
}
void LRUFileCache::remove(bool force_remove_unreleasable)
void LRUFileCache::remove()
{
/// Try remove all cached files by cache_base_path.
/// Only releasable file segments are evicted.
@ -626,7 +626,7 @@ void LRUFileCache::remove(bool force_remove_unreleasable)
ErrorCodes::LOGICAL_ERROR,
"Cache is in inconsistent state: LRU queue contains entries with no cache cell");
if (cell->releasable() || force_remove_unreleasable)
if (cell->releasable())
{
auto file_segment = cell->file_segment;
if (file_segment)
@ -647,7 +647,7 @@ void LRUFileCache::remove(
auto * cell = getCell(key, offset, cache_lock);
if (!cell)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "No cache cell for key: {}, offset: {}", keyToStr(key), offset);
throw Exception(ErrorCodes::LOGICAL_ERROR, "No cache cell for key: {}, offset: {}", keyToStr(key), offset);
if (cell->queue_iterator)
{

View File

@ -26,6 +26,7 @@ class IFileCache : private boost::noncopyable
{
friend class FileSegment;
friend struct FileSegmentsHolder;
friend class FileSegmentRangeWriter;
public:
using Key = UInt128;
@ -42,7 +43,7 @@ public:
virtual void remove(const Key & key) = 0;
virtual void remove(bool force_remove_unreleasable) = 0;
virtual void remove() = 0;
static bool isReadOnly();
@ -143,13 +144,11 @@ public:
FileSegments getSnapshot() const override;
FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) override;
void initialize() override;
void remove(const Key & key) override;
void remove(bool force_remove_unreleasable) override;
void remove() override;
std::vector<String> tryGetCachePaths(const Key & key) override;
@ -272,6 +271,8 @@ private:
void fillHolesWithEmptyFileSegments(
FileSegments & file_segments, const Key & key, const FileSegment::Range & range, bool fill_with_detached_file_segments, std::lock_guard<std::mutex> & cache_lock);
FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) override;
size_t getUsedCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const;
size_t getAvailableCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const;

View File

@ -107,8 +107,7 @@ String FileSegment::getOrSetDownloader()
{
std::lock_guard segment_lock(mutex);
if (detached)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cannot set downloader for a detached file segment");
assertNotDetached(segment_lock);
if (downloader_id.empty())
{
@ -132,6 +131,8 @@ void FileSegment::resetDownloader()
{
std::lock_guard segment_lock(mutex);
assertNotDetached(segment_lock);
if (downloader_id.empty())
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "There is no downloader");
@ -209,7 +210,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset_)
"Not enough space is reserved. Available: {}, expected: {}", availableSize(), size);
if (!isDownloader())
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Only downloader can do the downloading. (CallerId: {}, DownloaderId: {})",
getCallerId(), downloader_id);
@ -224,7 +225,10 @@ void FileSegment::write(const char * from, size_t size, size_t offset_)
"Attempt to write {} bytes to offset: {}, but current download offset is {}",
size, offset_, download_offset);
assertNotDetached();
{
std::lock_guard segment_lock(mutex);
assertNotDetached(segment_lock);
}
if (!cache_writer)
{
@ -273,9 +277,8 @@ void FileSegment::writeInMemory(const char * from, size_t size)
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Not enough space is reserved. Available: {}, expected: {}", availableSize(), size);
assertNotDetached();
std::lock_guard segment_lock(mutex);
assertNotDetached(segment_lock);
if (cache_writer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache writer already initialized");
@ -311,7 +314,7 @@ size_t FileSegment::finalizeWrite()
if (size == 0)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Writing zero size is not allowed");
assertNotDetached();
assertNotDetached(segment_lock);
try
{
@ -342,6 +345,11 @@ FileSegment::State FileSegment::wait()
{
std::unique_lock segment_lock(mutex);
if (is_detached)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cache file segment is in detached state, operation not allowed");
if (downloader_id.empty())
return download_state;
@ -366,14 +374,19 @@ bool FileSegment::reserve(size_t size)
if (!size)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Zero space reservation is not allowed");
assertNotDetached();
{
std::lock_guard segment_lock(mutex);
assertNotDetached(segment_lock);
auto caller_id = getCallerId();
if (downloader_id != caller_id)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Space can be reserved only by downloader (current: {}, expected: {})", caller_id, downloader_id);
bool is_downloader = caller_id == downloader_id;
if (!is_downloader)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Space can be reserved only by downloader (current: {}, expected: {})",
caller_id, downloader_id);
}
if (downloaded_size + size > range().size())
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
@ -392,6 +405,7 @@ bool FileSegment::reserve(size_t size)
size_t size_to_reserve = size - free_space;
std::lock_guard cache_lock(cache->mutex);
bool reserved = cache->tryReserve(key(), offset(), size_to_reserve, cache_lock);
if (reserved)
@ -437,6 +451,8 @@ void FileSegment::completeBatchAndResetDownloader()
{
std::lock_guard segment_lock(mutex);
assertNotDetached(segment_lock);
if (!isDownloaderImpl(segment_lock))
{
cv.notify_all();
@ -458,7 +474,7 @@ void FileSegment::complete(State state)
std::lock_guard cache_lock(cache->mutex);
std::lock_guard segment_lock(mutex);
assertNotDetached();
assertNotDetached(segment_lock);
bool is_downloader = isDownloaderImpl(segment_lock);
if (!is_downloader)
@ -501,12 +517,15 @@ void FileSegment::complete(State state)
void FileSegment::complete(std::lock_guard<std::mutex> & cache_lock)
{
std::lock_guard segment_lock(mutex);
assertNotDetached(segment_lock);
completeUnlocked(cache_lock, segment_lock);
}
void FileSegment::completeUnlocked(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock)
{
if (download_state == State::SKIP_CACHE || detached)
if (download_state == State::SKIP_CACHE || is_detached)
return;
if (isDownloaderImpl(segment_lock)
@ -516,7 +535,7 @@ void FileSegment::completeUnlocked(std::lock_guard<std::mutex> & cache_lock, std
setDownloaded(segment_lock);
}
assertNotDetached();
assertNotDetached(segment_lock);
if (download_state == State::DOWNLOADING || download_state == State::EMPTY)
{
@ -589,6 +608,7 @@ void FileSegment::completeImpl(std::lock_guard<std::mutex> & cache_lock, std::lo
downloader_id.clear();
}
LOG_TEST(log, "Completed file segment: {}", getInfoForLogImpl(segment_lock));
assertCorrectnessImpl(segment_lock);
}
@ -649,15 +669,40 @@ void FileSegment::assertCorrectnessImpl(std::lock_guard<std::mutex> & /* segment
assert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(cache->getPathInLocalCache(key(), offset())) > 0);
}
void FileSegment::assertNotDetached() const
void FileSegment::throwIfDetached() const
{
if (detached)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Operation not allowed, file segment is detached");
std::lock_guard segment_lock(mutex);
throwIfDetachedUnlocked(segment_lock);
}
void FileSegment::assertDetachedStatus(std::lock_guard<std::mutex> & /* segment_lock */) const
void FileSegment::throwIfDetachedUnlocked(std::lock_guard<std::mutex> & segment_lock) const
{
assert(download_state == State::EMPTY || hasFinalizedState());
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cache file segment is in detached state, operation not allowed. "
"It can happen when cache was concurrently dropped with SYSTEM DROP FILESYSTEM CACHE FORCE. "
"Please, retry. File segment info: {}", getInfoForLogImpl(segment_lock));
}
void FileSegment::assertNotDetached(std::lock_guard<std::mutex> & segment_lock) const
{
if (is_detached)
throwIfDetachedUnlocked(segment_lock);
}
void FileSegment::assertDetachedStatus(std::lock_guard<std::mutex> & segment_lock) const
{
/// Detached file segment is allowed to have only a certain subset of states.
/// It should be either EMPTY or one of the finalized states.
if (download_state != State::EMPTY && !hasFinalizedState())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Detached file segment has incorrect state: {}",
getInfoForLogImpl(segment_lock));
}
}
FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std::lock_guard<std::mutex> & /* cache_lock */)
@ -684,29 +729,35 @@ bool FileSegment::hasFinalizedState() const
|| download_state == State::SKIP_CACHE;
}
void FileSegment::detach(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock)
void FileSegment::detach(
std::lock_guard<std::mutex> & /* cache_lock */,
std::lock_guard<std::mutex> & segment_lock)
{
if (detached)
/// A detached status is now possible in only 2 cases, neither of which involves any complex logic:
/// 1. there is only 1 remaining file segment holder
/// && it does not need this segment anymore
/// && this file segment was in cache and needs to be removed
/// 2. the read_from_cache_if_exists_otherwise_bypass_cache case
if (is_detached)
return;
markAsDetached(segment_lock);
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
downloader_id.clear();
if (!hasFinalizedState())
{
completeUnlocked(cache_lock, segment_lock);
}
LOG_TEST(log, "Detached file segment: {}", getInfoForLogImpl(segment_lock));
}
void FileSegment::markAsDetached(std::lock_guard<std::mutex> & /* segment_lock */)
{
detached = true;
is_detached = true;
CurrentMetrics::add(CurrentMetrics::CacheDetachedFileSegments);
}
FileSegment::~FileSegment()
{
std::lock_guard segment_lock(mutex);
if (detached)
if (is_detached)
CurrentMetrics::sub(CurrentMetrics::CacheDetachedFileSegments);
}
@ -726,15 +777,18 @@ FileSegmentsHolder::~FileSegmentsHolder()
if (!cache)
cache = file_segment->cache;
try
{
bool detached = false;
bool is_detached = false;
{
std::lock_guard segment_lock(file_segment->mutex);
detached = file_segment->isDetached(segment_lock);
if (detached)
is_detached = file_segment->isDetached(segment_lock);
if (is_detached)
file_segment->assertDetachedStatus(segment_lock);
}
if (detached)
if (is_detached)
{
/// This file segment is not owned by cache, so it will be destructed
/// at this point, therefore no completion required.
@ -742,10 +796,6 @@ FileSegmentsHolder::~FileSegmentsHolder()
continue;
}
}
try
{
/// File segment pointer must be reset right after calling complete() and
/// under the same mutex, because complete() checks for segment pointers.
std::lock_guard cache_lock(cache->mutex);
@ -757,7 +807,6 @@ FileSegmentsHolder::~FileSegmentsHolder()
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
assert(false);
}
}
}
@ -774,5 +823,4 @@ String FileSegmentsHolder::toString()
return ranges;
}
}

View File

@ -25,8 +25,10 @@ using FileSegments = std::list<FileSegmentPtr>;
class FileSegment : boost::noncopyable
{
friend class LRUFileCache;
friend struct FileSegmentsHolder;
friend class FileSegmentRangeWriter;
public:
using Key = UInt128;
@ -149,9 +151,15 @@ public:
void assertCorrectness() const;
static FileSegmentPtr getSnapshot(const FileSegmentPtr & file_segment, std::lock_guard<std::mutex> & cache_lock);
static FileSegmentPtr getSnapshot(
const FileSegmentPtr & file_segment,
std::lock_guard<std::mutex> & cache_lock);
void detach(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock);
void detach(
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock);
[[noreturn]] void throwIfDetached() const;
private:
size_t availableSize() const { return reserved_size - downloaded_size; }
@ -159,11 +167,14 @@ private:
size_t getDownloadedSize(std::lock_guard<std::mutex> & segment_lock) const;
String getInfoForLogImpl(std::lock_guard<std::mutex> & segment_lock) const;
void assertCorrectnessImpl(std::lock_guard<std::mutex> & segment_lock) const;
void assertNotDetached() const;
void assertDetachedStatus(std::lock_guard<std::mutex> & segment_lock) const;
bool hasFinalizedState() const;
bool isDetached(std::lock_guard<std::mutex> & /* segment_lock */) const { return detached; }
bool isDetached(std::lock_guard<std::mutex> & /* segment_lock */) const { return is_detached; }
void markAsDetached(std::lock_guard<std::mutex> & segment_lock);
[[noreturn]] void throwIfDetachedUnlocked(std::lock_guard<std::mutex> & segment_lock) const;
void assertDetachedStatus(std::lock_guard<std::mutex> & segment_lock) const;
void assertNotDetached(std::lock_guard<std::mutex> & segment_lock) const;
void setDownloaded(std::lock_guard<std::mutex> & segment_lock);
void setDownloadFailed(std::lock_guard<std::mutex> & segment_lock);
@ -197,6 +208,10 @@ private:
size_t downloaded_size = 0;
size_t reserved_size = 0;
/// global locking order rule:
/// 1. cache lock
/// 2. segment lock
mutable std::mutex mutex;
std::condition_variable cv;
@ -215,7 +230,7 @@ private:
/// "detached" file segment means that it is not owned by cache ("detached" from cache).
/// In general case, all file segments are owned by cache.
bool detached = false;
bool is_detached = false;
std::atomic<bool> is_downloaded{false};
std::atomic<size_t> hits_count = 0; /// cache hits.
@ -227,6 +242,7 @@ private:
struct FileSegmentsHolder : private boost::noncopyable
{
explicit FileSegmentsHolder(FileSegments && file_segments_) : file_segments(std::move(file_segments_)) {}
FileSegmentsHolder(FileSegmentsHolder && other) noexcept : file_segments(std::move(other.file_segments)) {}
~FileSegmentsHolder();

View File

@ -260,10 +260,12 @@
\
M(QueryMemoryLimitExceeded, "Number of times when memory limit exceeded for query.") \
\
M(RemoteFSReadMicroseconds, "Time of reading from remote filesystem.") \
M(RemoteFSReadBytes, "Read bytes from remote filesystem.") \
M(RemoteFSCacheReadBytes, "Read bytes from cache of remote filesystem.") \
M(RemoteFSCacheDownloadBytes, "Bytes downloaded to cache from remote filesystem.") \
M(CachedReadBufferReadFromSourceMicroseconds, "Time reading from filesystem cache source (from remote filesystem, etc)") \
M(CachedReadBufferReadFromCacheMicroseconds, "Time reading from filesystem cache") \
M(CachedReadBufferReadFromSourceBytes, "Bytes read from filesystem cache source (from remote fs, etc)") \
M(CachedReadBufferReadFromCacheBytes, "Bytes read from filesystem cache") \
M(CachedReadBufferCacheWriteBytes, "Bytes written from source (remote fs, etc) to filesystem cache") \
M(CachedReadBufferCacheWriteMicroseconds, "Time spent writing data into filesystem cache") \
\
M(RemoteFSSeeks, "Total number of seeks for async buffer") \
M(RemoteFSPrefetches, "Number of prefetches made with asynchronous reading from remote filesystem") \
@ -275,6 +277,15 @@
M(RemoteFSSeeksWithReset, "Number of seeks which lead to a new connection") \
M(RemoteFSBuffers, "Number of buffers created for asynchronous reading from remote filesystem") \
\
M(ThreadpoolReaderTaskMicroseconds, "Time spent getting the data in asynchronous reading") \
M(ThreadpoolReaderReadBytes, "Bytes read from a threadpool task in asynchronous reading") \
\
M(FileSegmentWaitReadBufferMicroseconds, "Metric per file segment. Time spent waiting for internal read buffer (includes cache waiting)") \
M(FileSegmentReadMicroseconds, "Metric per file segment. Time spent reading from file") \
M(FileSegmentCacheWriteMicroseconds, "Metric per file segment. Time spent writing data to cache") \
M(FileSegmentPredownloadMicroseconds, "Metric per file segment. Time spent predownloading data to cache (predownloading means finishing a file segment download, started by someone else who failed to complete it, up to the point from which the current thread was asked to read)") \
M(FileSegmentUsedBytes, "Metric per file segment. How many bytes were actually used from current file segment") \
\
M(ReadBufferSeekCancelConnection, "Number of seeks which lead to new connection (s3, http)") \
\
M(SleepFunctionCalls, "Number of times a sleep function (sleep, sleepEachRow) has been called.") \

View File

@ -119,9 +119,9 @@ TEST(LRUFileCache, get)
assertRange(1, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::EMPTY);
/// Exception because space not reserved.
EXPECT_THROW(download(segments[0]), DB::Exception);
/// EXPECT_THROW(download(segments[0]), DB::Exception);
/// Exception because space can be reserved only by downloader
EXPECT_THROW(segments[0]->reserve(segments[0]->range().size()), DB::Exception);
/// EXPECT_THROW(segments[0]->reserve(segments[0]->range().size()), DB::Exception);
ASSERT_TRUE(segments[0]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(segments[0]->reserve(segments[0]->range().size()));

View File

@ -449,6 +449,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, optimize_normalize_count_variants, true, "Rewrite aggregate functions that semantically equals to count() as count().", 0) \
M(Bool, optimize_injective_functions_inside_uniq, true, "Delete injective functions of one argument inside uniq*() functions.", 0) \
M(Bool, convert_query_to_cnf, false, "Convert SELECT query to CNF", 0) \
M(Bool, optimize_or_like_chain, false, "Optimize multiple OR LIKE into multiMatchAny. This optimization should not be enabled by default, because it defies index analysis in some cases.", 0) \
M(Bool, optimize_arithmetic_operations_in_aggregate_functions, true, "Move arithmetic operations out of aggregation functions", 0) \
M(Bool, optimize_duplicate_order_by_and_distinct, true, "Remove duplicate ORDER BY and DISTINCT if it's possible", 0) \
M(Bool, optimize_redundant_functions_in_order_by, true, "Remove functions from ORDER BY if its argument is also in ORDER BY", 0) \

View File

@ -168,6 +168,9 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
if (!hasPendingDataToRead())
return false;
Stopwatch watch;
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
size_t size = 0;
if (prefetch_future.valid())
{
@ -175,15 +178,13 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
size_t offset = 0;
{
Stopwatch watch;
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
auto result = prefetch_future.get();
size = result.size;
offset = result.offset;
LOG_TEST(log, "Current size: {}, offset: {}", size, offset);
/// If prefetch_future is valid, size should always be greater than zero.
assert(offset < size);
assert(offset <= size);
ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds());
}
@ -200,7 +201,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
auto offset = result.offset;
LOG_TEST(log, "Current size: {}, offset: {}", size, offset);
assert(offset < size);
assert(offset <= size);
if (size)
{
@ -209,6 +210,9 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
}
}
watch.stop();
ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds());
file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd();
assert(file_offset_of_buffer_end == impl->getImplementationBufferOffset());

View File

@ -5,13 +5,23 @@
#include <base/scope_guard.h>
#include <Common/assert_cast.h>
#include <Common/hex.h>
#include <Common/getRandomASCIIString.h>
namespace ProfileEvents
{
extern const Event RemoteFSReadBytes;
extern const Event RemoteFSCacheReadBytes;
extern const Event RemoteFSCacheDownloadBytes;
extern const Event FileSegmentWaitReadBufferMicroseconds;
extern const Event FileSegmentReadMicroseconds;
extern const Event FileSegmentCacheWriteMicroseconds;
extern const Event FileSegmentPredownloadMicroseconds;
extern const Event FileSegmentUsedBytes;
extern const Event CachedReadBufferReadFromSourceMicroseconds;
extern const Event CachedReadBufferReadFromCacheMicroseconds;
extern const Event CachedReadBufferCacheWriteMicroseconds;
extern const Event CachedReadBufferReadFromSourceBytes;
extern const Event CachedReadBufferReadFromCacheBytes;
extern const Event CachedReadBufferCacheWriteBytes;
}
namespace DB
@ -44,6 +54,7 @@ CachedReadBufferFromRemoteFS::CachedReadBufferFromRemoteFS(
, remote_file_reader_creator(remote_file_reader_creator_)
, query_id(query_id_)
, enable_logging(!query_id.empty() && settings_.enable_filesystem_cache_log)
, current_buffer_id(getRandomASCIIString(8))
{
}
@ -56,10 +67,15 @@ void CachedReadBufferFromRemoteFS::appendFilesystemCacheLog(
.query_id = query_id,
.source_file_path = remote_fs_object_path,
.file_segment_range = { file_segment_range.left, file_segment_range.right },
.requested_range = { first_offset, read_until_position },
.file_segment_size = file_segment_range.size(),
.cache_attempted = true,
.read_buffer_id = current_buffer_id,
.profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(current_file_segment_counters.getPartiallyAtomicSnapshot()),
};
current_file_segment_counters.reset();
switch (type)
{
case CachedReadBufferFromRemoteFS::ReadType::CACHED:
@ -104,9 +120,16 @@ void CachedReadBufferFromRemoteFS::initialize(size_t offset, size_t size)
SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getCacheReadBuffer(size_t offset) const
{
auto path = cache->getPathInLocalCache(cache_key, offset);
auto buf = std::make_shared<ReadBufferFromFile>(path, settings.local_fs_buffer_size);
if (buf->size() == 0)
ReadSettings local_read_settings{settings};
/// Do not allow to use asynchronous version of LocalFSReadMethod.
local_read_settings.local_fs_method = LocalFSReadMethod::pread;
auto buf = createReadBufferFromFileBase(path, local_read_settings);
auto from_fd = dynamic_cast<ReadBufferFromFileDescriptor*>(buf.get());
if (from_fd && from_fd->size() == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read from an empty cache file: {}", path);
return buf;
}
@ -335,8 +358,13 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getImplementationBuffer(File
auto range = file_segment->range();
bytes_to_predownload = 0;
Stopwatch watch(CLOCK_MONOTONIC);
auto read_buffer_for_file_segment = getReadBufferForFileSegment(file_segment);
watch.stop();
current_file_segment_counters.increment(ProfileEvents::FileSegmentWaitReadBufferMicroseconds, watch.elapsedMicroseconds());
[[maybe_unused]] auto download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
assert(download_current_segment == file_segment->isDownloader());
@ -357,7 +385,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getImplementationBuffer(File
case ReadType::CACHED:
{
#ifndef NDEBUG
auto * file_reader = assert_cast<ReadBufferFromFile *>(read_buffer_for_file_segment.get());
auto * file_reader = dynamic_cast<ReadBufferFromFileDescriptor *>(read_buffer_for_file_segment.get());
size_t file_size = file_reader->size();
if (file_size == 0 || range.left + file_size <= file_offset_of_buffer_end)
@ -431,6 +459,9 @@ bool CachedReadBufferFromRemoteFS::completeFileSegmentAndGetNext()
{
LOG_TEST(log, "Completed segment: {}", (*current_file_segment_it)->range().toString());
if (enable_logging)
appendFilesystemCacheLog((*current_file_segment_it)->range(), read_type);
auto file_segment_it = current_file_segment_it++;
auto & file_segment = *file_segment_it;
@ -455,15 +486,29 @@ bool CachedReadBufferFromRemoteFS::completeFileSegmentAndGetNext()
if (read_type == ReadType::CACHED)
(*current_file_segment_it)->incrementHitsCount();
if (enable_logging)
appendFilesystemCacheLog((*current_file_segment_it)->range(), read_type);
LOG_TEST(log, "New segment: {}", (*current_file_segment_it)->range().toString());
return true;
}
CachedReadBufferFromRemoteFS::~CachedReadBufferFromRemoteFS()
{
if (enable_logging
&& file_segments_holder
&& current_file_segment_it != file_segments_holder->file_segments.end())
{
appendFilesystemCacheLog((*current_file_segment_it)->range(), read_type);
}
}
void CachedReadBufferFromRemoteFS::predownload(FileSegmentPtr & file_segment)
{
Stopwatch predownload_watch(CLOCK_MONOTONIC);
SCOPE_EXIT({
predownload_watch.stop();
current_file_segment_counters.increment(ProfileEvents::FileSegmentPredownloadMicroseconds, predownload_watch.elapsedMicroseconds());
});
if (bytes_to_predownload)
{
/// Consider this case. Some user needed segment [a, b] and downloaded it partially.
@ -479,7 +524,19 @@ void CachedReadBufferFromRemoteFS::predownload(FileSegmentPtr & file_segment)
while (true)
{
if (!bytes_to_predownload || implementation_buffer->eof())
bool has_more_data;
{
Stopwatch watch(CLOCK_MONOTONIC);
has_more_data = !implementation_buffer->eof();
watch.stop();
auto elapsed = watch.elapsedMicroseconds();
current_file_segment_counters.increment(ProfileEvents::FileSegmentReadMicroseconds, elapsed);
ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceMicroseconds, elapsed);
}
if (!bytes_to_predownload || !has_more_data)
{
if (bytes_to_predownload)
throw Exception(
@ -518,7 +575,7 @@ void CachedReadBufferFromRemoteFS::predownload(FileSegmentPtr & file_segment)
size_t current_impl_buffer_size = implementation_buffer->buffer().size();
size_t current_predownload_size = std::min(current_impl_buffer_size, bytes_to_predownload);
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, current_impl_buffer_size);
ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceBytes, current_impl_buffer_size);
if (file_segment->reserve(current_predownload_size))
{
@ -526,8 +583,15 @@ void CachedReadBufferFromRemoteFS::predownload(FileSegmentPtr & file_segment)
assert(file_segment->getDownloadOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
Stopwatch watch(CLOCK_MONOTONIC);
file_segment->write(implementation_buffer->buffer().begin(), current_predownload_size, current_offset);
ProfileEvents::increment(ProfileEvents::RemoteFSCacheDownloadBytes, current_predownload_size);
watch.stop();
auto elapsed = watch.elapsedMicroseconds();
current_file_segment_counters.increment(ProfileEvents::FileSegmentCacheWriteMicroseconds, elapsed);
ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteMicroseconds, elapsed);
ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, current_predownload_size);
current_offset += current_predownload_size;
@ -663,18 +727,18 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
return false;
SCOPE_EXIT({
/// Save state of current file segment before it is completed.
nextimpl_step_log_info = getInfoForLog();
if (current_file_segment_it == file_segments_holder->file_segments.end())
return;
auto & file_segment = *current_file_segment_it;
bool download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
if (download_current_segment)
try
{
try
/// Save state of current file segment before it is completed.
nextimpl_step_log_info = getInfoForLog();
if (current_file_segment_it == file_segments_holder->file_segments.end())
return;
auto & file_segment = *current_file_segment_it;
bool download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
if (download_current_segment)
{
bool need_complete_file_segment = file_segment->isDownloader();
if (need_complete_file_segment)
@ -683,13 +747,13 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
file_segment->completeBatchAndResetDownloader();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
assert(!file_segment->isDownloader());
assert(!file_segment->isDownloader());
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
});
bytes_to_predownload = 0;
@ -706,9 +770,6 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
if (read_type == ReadType::CACHED)
(*current_file_segment_it)->incrementHitsCount();
if (enable_logging)
appendFilesystemCacheLog((*current_file_segment_it)->range(), read_type);
}
assert(!internal_buffer.empty());
@ -742,18 +803,17 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
auto download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
if (download_current_segment != file_segment->isDownloader())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Incorrect segment state. Having read type: {}, Caller id: {}, downloader id: {}, file segment state: {}",
toString(read_type),
file_segment->getCallerId(),
file_segment->getDownloader(),
file_segment->state());
"Incorrect segment state. Having read type: {}, file segment info: {}",
toString(read_type), file_segment->getInfoForLog());
}
if (!result)
{
#ifndef NDEBUG
if (auto * cache_file_reader = typeid_cast<ReadBufferFromFile *>(implementation_buffer.get()))
if (auto * cache_file_reader = dynamic_cast<ReadBufferFromFileDescriptor *>(implementation_buffer.get()))
{
auto cache_file_size = cache_file_reader->size();
if (cache_file_size == 0)
@ -762,13 +822,26 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
}
#endif
Stopwatch watch(CLOCK_MONOTONIC);
result = implementation_buffer->next();
watch.stop();
auto elapsed = watch.elapsedMicroseconds();
current_file_segment_counters.increment(ProfileEvents::FileSegmentReadMicroseconds, elapsed);
size = implementation_buffer->buffer().size();
if (read_type == ReadType::CACHED)
ProfileEvents::increment(ProfileEvents::RemoteFSCacheReadBytes, size);
{
ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromCacheBytes, size);
ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromCacheMicroseconds, elapsed);
}
else
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, size);
{
ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceBytes, size);
ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceMicroseconds, elapsed);
}
}
if (result)
@ -781,12 +854,18 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
{
assert(file_segment->getDownloadOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
Stopwatch watch(CLOCK_MONOTONIC);
file_segment->write(
needed_to_predownload ? implementation_buffer->position() : implementation_buffer->buffer().begin(),
size,
file_offset_of_buffer_end);
ProfileEvents::increment(ProfileEvents::RemoteFSCacheDownloadBytes, size);
watch.stop();
auto elapsed = watch.elapsedMicroseconds();
current_file_segment_counters.increment(ProfileEvents::FileSegmentCacheWriteMicroseconds, elapsed);
ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteMicroseconds, elapsed);
ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, size);
assert(file_segment->getDownloadOffset() <= file_segment->range().right + 1);
assert(
@ -814,10 +893,13 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
}
file_offset_of_buffer_end += size;
}
swap(*implementation_buffer);
current_file_segment_counters.increment(ProfileEvents::FileSegmentUsedBytes, available());
if (download_current_segment)
file_segment->completeBatchAndResetDownloader();
@ -846,7 +928,7 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
if (size == 0 && file_offset_of_buffer_end < read_until_position)
{
std::optional<size_t> cache_file_size;
if (auto * cache_file_reader = dynamic_cast<ReadBufferFromFile *>(implementation_buffer.get()))
if (auto * cache_file_reader = dynamic_cast<ReadBufferFromFileDescriptor *>(implementation_buffer.get()))
cache_file_size = cache_file_reader->size();
throw Exception(

View File

@ -29,6 +29,8 @@ public:
const String & query_id_,
size_t read_until_position_);
~CachedReadBufferFromRemoteFS() override;
bool nextImpl() override;
off_t seek(off_t off, int whence) override;
@ -117,8 +119,10 @@ private:
String query_id;
bool enable_logging = false;
String current_buffer_id;
CurrentMetrics::Increment metric_increment{CurrentMetrics::FilesystemCacheReadBuffers};
ProfileEvents::Counters current_file_segment_counters;
};
}

View File

@ -16,8 +16,8 @@
namespace ProfileEvents
{
extern const Event RemoteFSReadMicroseconds;
extern const Event RemoteFSReadBytes;
extern const Event ThreadpoolReaderTaskMicroseconds;
extern const Event ThreadpoolReaderReadBytes;
}
namespace CurrentMetrics
@ -83,8 +83,8 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
watch.stop();
ProfileEvents::increment(ProfileEvents::RemoteFSReadMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, result.offset ? result.size - result.offset : result.size);
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, result.offset ? result.size - result.offset : result.size);
thread_status.detachQuery(/* if_not_detached */true);

View File

@ -63,7 +63,7 @@ Aws::Client::ClientConfigurationPerRequest ProxyResolverConfiguration::getConfig
{
auto resolved_endpoint = endpoint;
resolved_endpoint.setHost(resolved_hosts[i].toString());
session = makeHTTPSession(endpoint, timeouts, false);
session = makeHTTPSession(resolved_endpoint, timeouts, false);
try
{

View File

@ -1,7 +1,7 @@
include("${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake")
add_headers_and_sources(clickhouse_functions_array .)
add_library(clickhouse_functions_array ${clickhouse_functions_array_sources} ${clickhouse_functions_array_headers})
target_link_libraries(clickhouse_functions_array PRIVATE dbms clickhouse_functions_gatherutils)
target_link_libraries(clickhouse_functions_array PRIVATE dbms clickhouse_functions_gatherutils ch_contrib::eigen)
if (STRIP_DEBUG_SYMBOLS_FUNCTIONS)
target_compile_options(clickhouse_functions_array PRIVATE "-g0")

View File

@ -0,0 +1,247 @@
#include <Columns/ColumnArray.h>
#include <Columns/IColumn.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/getLeastSupertype.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Eigen/Core>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int LOGICAL_ERROR;
extern const int SIZES_OF_ARRAYS_DOESNT_MATCH;
}
template <const int N>
struct LpDistance
{
static inline String name = "L" + std::to_string(N);
template <typename T>
static void compute(const Eigen::MatrixX<T> & left, const Eigen::MatrixX<T> & right, PaddedPODArray<T> & array)
{
auto norms = (left - right).colwise().template lpNorm<N>();
array.reserve(norms.size());
// array.insert() failed to work with Eigen iterators
for (auto n : norms)
array.push_back(n);
}
};
struct LinfDistance : LpDistance<Eigen::Infinity>
{
static inline String name = "Linf";
};
struct CosineDistance
{
static inline String name = "Cosine";
template <typename T>
static void compute(const Eigen::MatrixX<T> & left, const Eigen::MatrixX<T> & right, PaddedPODArray<T> & array)
{
auto prod = left.cwiseProduct(right).colwise().sum();
auto nx = left.colwise().norm();
auto ny = right.colwise().norm();
auto nm = nx.cwiseProduct(ny).cwiseInverse();
auto dist = 1.0 - prod.cwiseProduct(nm).array();
array.reserve(dist.size());
for (auto d : dist)
array.push_back(d);
}
};
template <class Kernel>
class FunctionArrayDistance : public IFunction
{
public:
static inline auto name = "array" + Kernel::name + "Distance";
String getName() const override { return name; }
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionArrayDistance<Kernel>>(); }
size_t getNumberOfArguments() const override { return 2; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
DataTypes types;
for (const auto & argument : arguments)
{
const auto * array_type = checkAndGetDataType<DataTypeArray>(argument.type.get());
if (!array_type)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Argument of function {} must be array.", getName());
types.push_back(array_type->getNestedType());
}
const auto & common_type = getLeastSupertype(types);
switch (common_type->getTypeId())
{
case TypeIndex::UInt8:
case TypeIndex::UInt16:
case TypeIndex::UInt32:
case TypeIndex::Int8:
case TypeIndex::Int16:
case TypeIndex::Int32:
case TypeIndex::Float32:
return std::make_shared<DataTypeFloat32>();
case TypeIndex::UInt64:
case TypeIndex::Int64:
case TypeIndex::Float64:
return std::make_shared<DataTypeFloat64>();
default:
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Arguments of function {} has nested type {}. "
"Support: UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Float32, Float64.",
getName(), common_type->getName());
}
}
ColumnPtr
executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t /*input_rows_count*/) const override
{
DataTypePtr type_x = typeid_cast<const DataTypeArray *>(arguments[0].type.get())->getNestedType();
DataTypePtr type_y = typeid_cast<const DataTypeArray *>(arguments[1].type.get())->getNestedType();
ColumnPtr col_x = arguments[0].column->convertToFullColumnIfConst();
ColumnPtr col_y = arguments[1].column->convertToFullColumnIfConst();
const auto * arr_x = assert_cast<const ColumnArray *>(col_x.get());
const auto * arr_y = assert_cast<const ColumnArray *>(col_y.get());
auto result = result_type->createColumn();
switch (result_type->getTypeId())
{
case TypeIndex::Float32:
executeWithType<Float32>(*arr_x, *arr_y, type_x, type_y, result);
break;
case TypeIndex::Float64:
executeWithType<Float64>(*arr_x, *arr_y, type_x, type_y, result);
break;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected result type.");
}
return result;
}
private:
template <typename MatrixType>
void executeWithType(
const ColumnArray & array_x,
const ColumnArray & array_y,
const DataTypePtr & type_x,
const DataTypePtr & type_y,
MutableColumnPtr & column) const
{
Eigen::MatrixX<MatrixType> mx, my;
columnToMatrix(array_x, type_x, mx);
columnToMatrix(array_y, type_y, my);
if (mx.rows() && my.rows() && mx.rows() != my.rows())
{
throw Exception(
ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH,
"Arguments of function {} have different array sizes: {} and {}",
getName(), mx.rows(), my.rows());
}
auto & data = assert_cast<ColumnVector<MatrixType> &>(*column).getData();
Kernel::compute(mx, my, data);
}
template <typename MatrixType>
void columnToMatrix(const ColumnArray & array, const DataTypePtr & nested_type, Eigen::MatrixX<MatrixType> & mat) const
{
const auto & offsets = array.getOffsets();
size_t cols = offsets.size();
size_t rows = cols > 0 ? offsets.front() : 0;
ColumnArray::Offset prev = 0;
for (ColumnArray::Offset off : offsets)
{
if (off - prev != rows)
throw Exception(
ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH,
"Arrays in a column passed to function {} have different sizes: {} and {}",
getName(), rows, off - prev);
prev = off;
}
switch (nested_type->getTypeId())
{
case TypeIndex::UInt8:
fillMatrix<MatrixType, UInt8>(mat, array, rows, cols);
break;
case TypeIndex::UInt16:
fillMatrix<MatrixType, UInt16>(mat, array, rows, cols);
break;
case TypeIndex::UInt32:
fillMatrix<MatrixType, UInt32>(mat, array, rows, cols);
break;
case TypeIndex::UInt64:
fillMatrix<MatrixType, UInt64>(mat, array, rows, cols);
break;
case TypeIndex::Int8:
fillMatrix<MatrixType, Int8>(mat, array, rows, cols);
break;
case TypeIndex::Int16:
fillMatrix<MatrixType, Int16>(mat, array, rows, cols);
break;
case TypeIndex::Int32:
fillMatrix<MatrixType, Int32>(mat, array, rows, cols);
break;
case TypeIndex::Int64:
fillMatrix<MatrixType, Int64>(mat, array, rows, cols);
break;
case TypeIndex::Float32:
fillMatrix<MatrixType, Float32>(mat, array, rows, cols);
break;
case TypeIndex::Float64:
fillMatrix<MatrixType, Float64>(mat, array, rows, cols);
break;
default:
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Arguments of function {} has nested type {}. "
"Support: UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Float32, Float64.",
getName(), nested_type->getName());
}
}
// Fast path when the matrix and column element types match: wrap the column data in an Eigen::Map, avoiding per-element conversion.
template <typename MatrixType, typename DataType>
requires std::is_same_v<MatrixType, DataType>
void fillMatrix(Eigen::MatrixX<MatrixType> & mat, const ColumnArray & array, size_t rows, size_t cols) const
{
const auto & data = typeid_cast<const ColumnVector<DataType> &>(array.getData()).getData();
mat = Eigen::Map<const Eigen::MatrixX<MatrixType>>(data.data(), rows, cols);
}
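/// Generic path: copy element by element, converting to the matrix value type; each source array becomes one matrix column.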
template <typename MatrixType, typename DataType>
void fillMatrix(Eigen::MatrixX<MatrixType> & mat, const ColumnArray & array, size_t rows, size_t cols) const
{
const auto & data = typeid_cast<const ColumnVector<DataType> &>(array.getData()).getData();
mat.resize(rows, cols);
for (size_t col = 0; col < cols; ++col)
{
for (size_t row = 0; row < rows; ++row)
{
size_t off = col * rows;
mat(row, col) = static_cast<MatrixType>(data[off + row]);
}
}
}
};
void registerFunctionArrayDistance(FunctionFactory & factory)
{
factory.registerFunction<FunctionArrayDistance<LpDistance<1>>>();
factory.registerFunction<FunctionArrayDistance<LpDistance<2>>>();
factory.registerFunction<FunctionArrayDistance<LinfDistance>>();
factory.registerFunction<FunctionArrayDistance<CosineDistance>>();
}
}
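For a quick sanity check of the registered functions: assuming the distance kernels expose the names L1, L2, Linf and Cosine (mirroring the norm kernels below), the "array" + name + "Distance" pattern yields arrayL1Distance, arrayL2Distance, arrayLinfDistance and arrayCosineDistance. A usage sketch with expected values under the usual definitions:
SELECT arrayL1Distance([1, 2, 3], [4, 5, 6]);    -- 9
SELECT arrayL2Distance([0.0, 0.0], [3.0, 4.0]);  -- 5
SELECT arrayLinfDistance([1, 2], [4, 1]);        -- 3
SELECT arrayCosineDistance([1, 0], [0, 1]);      -- 1 for orthogonal vectors, assuming cosine distance = 1 - cosine similarity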

View File

@ -0,0 +1,205 @@
#include <Columns/ColumnArray.h>
#include <Columns/IColumn.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/getLeastSupertype.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Eigen/Core>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int LOGICAL_ERROR;
}
template <const int N>
struct LpNorm
{
static inline String name = "L" + std::to_string(N);
template <typename T>
static void compute(const std::vector<Eigen::VectorX<T>> & vec, PaddedPODArray<T> & array)
{
array.reserve(vec.size());
for (const auto & v : vec)
{
array.push_back(v.template lpNorm<N>());
}
}
};
struct LinfNorm : LpNorm<Eigen::Infinity>
{
static inline String name = "Linf";
};
template <class Kernel>
class FunctionArrayNorm : public IFunction
{
public:
static inline auto name = "array" + Kernel::name + "Norm";
String getName() const override { return name; }
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionArrayNorm<Kernel>>(); }
size_t getNumberOfArguments() const override { return 1; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
DataTypes types;
for (const auto & argument : arguments)
{
const auto * array_type = checkAndGetDataType<DataTypeArray>(argument.type.get());
if (!array_type)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Argument of function {} must be array.", getName());
types.push_back(array_type->getNestedType());
}
const auto & common_type = getLeastSupertype(types);
switch (common_type->getTypeId())
{
case TypeIndex::UInt8:
case TypeIndex::UInt16:
case TypeIndex::UInt32:
case TypeIndex::Int8:
case TypeIndex::Int16:
case TypeIndex::Int32:
case TypeIndex::Float32:
return std::make_shared<DataTypeFloat32>();
case TypeIndex::UInt64:
case TypeIndex::Int64:
case TypeIndex::Float64:
return std::make_shared<DataTypeFloat64>();
default:
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Arguments of function {} has nested type {}. "
"Support: UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Float32, Float64.",
getName(), common_type->getName());
}
}
ColumnPtr
executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t /*input_rows_count*/) const override
{
DataTypePtr type = typeid_cast<const DataTypeArray *>(arguments[0].type.get())->getNestedType();
ColumnPtr column = arguments[0].column->convertToFullColumnIfConst();
const auto * arr = assert_cast<const ColumnArray *>(column.get());
auto result = result_type->createColumn();
switch (result_type->getTypeId())
{
case TypeIndex::Float32:
executeWithType<Float32>(*arr, type, result);
break;
case TypeIndex::Float64:
executeWithType<Float64>(*arr, type, result);
break;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected result type.");
}
return result;
}
private:
template <typename MatrixType>
void executeWithType(const ColumnArray & array, const DataTypePtr & type, MutableColumnPtr & column) const
{
std::vector<Eigen::VectorX<MatrixType>> vec;
columnToVectors(array, type, vec);
auto & data = assert_cast<ColumnVector<MatrixType> &>(*column).getData();
Kernel::compute(vec, data);
}
template <typename MatrixType>
void columnToVectors(const ColumnArray & array, const DataTypePtr & nested_type, std::vector<Eigen::VectorX<MatrixType>> & vec) const
{
switch (nested_type->getTypeId())
{
case TypeIndex::UInt8:
fillVectors<MatrixType, UInt8>(vec, array);
break;
case TypeIndex::UInt16:
fillVectors<MatrixType, UInt16>(vec, array);
break;
case TypeIndex::UInt32:
fillVectors<MatrixType, UInt32>(vec, array);
break;
case TypeIndex::UInt64:
fillVectors<MatrixType, UInt64>(vec, array);
break;
case TypeIndex::Int8:
fillVectors<MatrixType, Int8>(vec, array);
break;
case TypeIndex::Int16:
fillVectors<MatrixType, Int16>(vec, array);
break;
case TypeIndex::Int32:
fillVectors<MatrixType, Int32>(vec, array);
break;
case TypeIndex::Int64:
fillVectors<MatrixType, Int64>(vec, array);
break;
case TypeIndex::Float32:
fillVectors<MatrixType, Float32>(vec, array);
break;
case TypeIndex::Float64:
fillVectors<MatrixType, Float64>(vec, array);
break;
default:
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Arguments of function {} has nested type {}. "
"Support: UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Float32, Float64.",
getName(), nested_type->getName());
}
}
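/// Fast path: the column element type already matches the vector value type (Float32/Float64), so each array slice can be mapped directly from the column memory.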
template <typename MatrixType, typename DataType>
requires std::is_same_v<MatrixType, DataType>
void fillVectors(std::vector<Eigen::VectorX<MatrixType>> & vec, const ColumnArray & array) const
{
const auto & data = typeid_cast<const ColumnVector<DataType> &>(array.getData()).getData();
const auto & offsets = array.getOffsets();
vec.reserve(offsets.size());
ColumnArray::Offset prev = 0;
for (auto off : offsets)
{
vec.emplace_back(Eigen::Map<const Eigen::VectorX<MatrixType>>(data.data() + prev, off - prev));
prev = off;
}
}
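/// Generic path: copy element by element, converting to the vector value type.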
template <typename MatrixType, typename DataType>
void fillVectors(std::vector<Eigen::VectorX<MatrixType>> & vec, const ColumnArray & array) const
{
const auto & data = typeid_cast<const ColumnVector<DataType> &>(array.getData()).getData();
const auto & offsets = array.getOffsets();
vec.reserve(offsets.size());
ColumnArray::Offset prev = 0;
for (auto off : offsets)
{
Eigen::VectorX<MatrixType> mat(off - prev);
for (ColumnArray::Offset row = 0; row + prev < off; ++row)
{
mat[row] = static_cast<MatrixType>(data[prev + row]);
}
prev = off;
vec.emplace_back(mat);
}
}
};
void registerFunctionArrayNorm(FunctionFactory & factory)
{
factory.registerFunction<FunctionArrayNorm<LpNorm<1>>>();
factory.registerFunction<FunctionArrayNorm<LpNorm<2>>>();
factory.registerFunction<FunctionArrayNorm<LinfNorm>>();
}
}
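The registered names follow the "array" + Kernel::name + "Norm" pattern above, i.e. arrayL1Norm, arrayL2Norm and arrayLinfNorm. A brief usage sketch, with expected values following the standard norm definitions:
SELECT arrayL1Norm([1, -2, 3]);    -- 6
SELECT arrayL2Norm([3, 4]);        -- 5
SELECT arrayLinfNorm([1, -7, 3]);  -- 7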

View File

@ -37,6 +37,8 @@ void registerFunctionArrayAUC(FunctionFactory &);
void registerFunctionArrayReduceInRanges(FunctionFactory &);
void registerFunctionMapOp(FunctionFactory &);
void registerFunctionMapPopulateSeries(FunctionFactory &);
void registerFunctionArrayDistance(FunctionFactory &);
void registerFunctionArrayNorm(FunctionFactory &);
void registerFunctionsArray(FunctionFactory & factory)
{
@ -75,6 +77,8 @@ void registerFunctionsArray(FunctionFactory & factory)
registerFunctionArrayAUC(factory);
registerFunctionMapOp(factory);
registerFunctionMapPopulateSeries(factory);
registerFunctionArrayDistance(factory);
registerFunctionArrayNorm(factory);
}
}

View File

@ -11,6 +11,7 @@
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Common/typeid_cast.h>
#include <Interpreters/castColumn.h>
#include <h3api.h>
@ -51,10 +52,10 @@ public:
arg->getName(), 1, getName());
arg = arguments[1].get();
if (!WhichDataType(arg).isUInt16())
if (!WhichDataType(arg).isNativeUInt())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}. Must be UInt16",
"Illegal type {} of argument {} of function {}. Must be unsigned native integer.",
arg->getName(),
2,
getName());
@ -80,7 +81,8 @@ public:
const auto & data_hindex = col_hindex->getData();
/// ColumnUInt16 is sufficient: the implementation below checks that the 2nd argument is positive and below 10000.
const auto * col_k = checkAndGetColumn<ColumnUInt16>(non_const_arguments[1].column.get());
auto cast_result = castColumnAccurate(non_const_arguments[1], std::make_shared<DataTypeUInt16>());
const auto * col_k = checkAndGetColumn<ColumnUInt16>(cast_result.get());
if (!col_k)
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,

View File

@ -74,8 +74,12 @@ namespace
if (https)
{
#if USE_SSL
/// Cannot resolve host in advance, otherwise SNI won't work in Poco.
session = std::make_shared<Poco::Net::HTTPSClientSession>(host, port);
String resolved_host = resolve_host ? DNSResolver::instance().resolveHost(host).toString() : host;
auto https_session = std::make_shared<Poco::Net::HTTPSClientSession>(host, port);
if (resolve_host)
https_session->setResolvedHost(DNSResolver::instance().resolveHost(host).toString());
session = std::move(https_session);
#else
throw Exception("ClickHouse was built without HTTPS support", ErrorCodes::FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME);
#endif

View File

@ -50,8 +50,6 @@ public:
return file_name;
}
Range getRemainingReadRange() const override { return Range{ .left = file_offset_of_buffer_end, .right = std::nullopt }; }
size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }
};

View File

@ -49,6 +49,8 @@ public:
return file_offset_of_buffer_end - (working_buffer.end() - pos);
}
Range getRemainingReadRange() const override { return Range{ .left = file_offset_of_buffer_end, .right = std::nullopt }; }
/// If the target position is close enough to stay within the buffer after the seek, no actual seek in the file is performed.
off_t seek(off_t off, int whence) override;

View File

@ -57,7 +57,7 @@ struct ReadSettings
/// Method to use reading from local filesystem.
LocalFSReadMethod local_fs_method = LocalFSReadMethod::pread;
/// Method to use reading from remote filesystem.
RemoteFSReadMethod remote_fs_method = RemoteFSReadMethod::read;
RemoteFSReadMethod remote_fs_method = RemoteFSReadMethod::threadpool;
size_t local_fs_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
size_t remote_fs_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;

View File

@ -37,6 +37,11 @@ namespace ProfileEvents
extern const Event S3WriteRequestsRedirects;
}
namespace CurrentMetrics
{
extern const Metric S3Requests;
}
namespace DB::ErrorCodes
{
extern const int NOT_IMPLEMENTED;
@ -160,6 +165,7 @@ void PocoHTTPClient::makeRequestInternal(
};
ProfileEvents::increment(select_metric(S3MetricType::Count));
CurrentMetrics::Increment metric_increment{CurrentMetrics::S3Requests};
try
{

View File

@ -21,7 +21,7 @@
namespace ProfileEvents
{
extern const Event WriteBufferFromS3Bytes;
extern const Event RemoteFSCacheDownloadBytes;
extern const Event CachedReadBufferCacheWriteBytes;
}
namespace DB
@ -490,7 +490,7 @@ void WriteBufferFromS3::finalizeCacheIfNeeded(std::optional<FileSegmentsHolder>
size_t size = (*file_segment_it)->finalizeWrite();
file_segment_it = file_segments.erase(file_segment_it);
ProfileEvents::increment(ProfileEvents::RemoteFSCacheDownloadBytes, size);
ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, size);
}
catch (...)
{

View File

@ -535,6 +535,7 @@ ContextMutablePtr Context::createCopy(const ContextMutablePtr & other)
Context::~Context() = default;
InterserverIOHandler & Context::getInterserverIOHandler() { return shared->interserver_io_handler; }
const InterserverIOHandler & Context::getInterserverIOHandler() const { return shared->interserver_io_handler; }
std::unique_lock<std::recursive_mutex> Context::getLock() const
{
@ -2226,7 +2227,7 @@ bool Context::hasAuxiliaryZooKeeper(const String & name) const
return getConfigRef().has("auxiliary_zookeepers." + name);
}
InterserverCredentialsPtr Context::getInterserverCredentials()
InterserverCredentialsPtr Context::getInterserverCredentials() const
{
return shared->interserver_io_credentials.get();
}

View File

@ -612,6 +612,7 @@ public:
OutputFormatPtr getOutputFormatParallelIfPossible(const String & name, WriteBuffer & buf, const Block & sample) const;
InterserverIOHandler & getInterserverIOHandler();
const InterserverIOHandler & getInterserverIOHandler() const;
/// How other servers can access this for downloading replicated data.
void setInterserverIOAddress(const String & host, UInt16 port);
@ -619,7 +620,7 @@ public:
/// Credentials which server will use to communicate with others
void updateInterserverCredentials(const Poco::Util::AbstractConfiguration & config);
InterserverCredentialsPtr getInterserverCredentials();
InterserverCredentialsPtr getInterserverCredentials() const;
/// Interserver requests scheme (http or https)
void setInterserverScheme(const String & scheme);

View File

@ -0,0 +1,76 @@
#include <Functions/likePatternToRegexp.h>
#include <Interpreters/ConvertFunctionOrLikeVisitor.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/IAST.h>
#include <Common/typeid_cast.h>
namespace DB
{
void ConvertFunctionOrLikeData::visit(ASTFunction & function, ASTPtr &)
{
if (function.name != "or")
return;
std::unordered_map<ASTPtr, std::shared_ptr<ASTLiteral>> identifier_to_literals;
for (auto & child : function.children)
{
if (auto expr_list_fn = child->as<ASTExpressionList>())
{
ASTs unique_elems;
for (const auto & child_expr_fn : expr_list_fn->children)
{
unique_elems.push_back(child_expr_fn);
if (const auto * child_fn = child_expr_fn->as<ASTFunction>())
{
const bool is_like = child_fn->name == "like";
const bool is_ilike = child_fn->name == "ilike";
/// Not (i)like -> keep the element as-is and move on.
if (!is_like && !is_ilike)
continue;
const auto & arguments = child_fn->arguments->children;
/// They should have 2 arguments.
if (arguments.size() != 2)
continue;
/// Second one is string literal.
auto identifier = arguments[0];
auto literal = arguments[1]->as<ASTLiteral>();
if (!identifier || !literal || literal->value.getType() != Field::Types::String)
continue;
String regexp = likePatternToRegexp(literal->value.get<String>());
/// Case insensitive. Works with UTF-8 as well.
if (is_ilike)
regexp = "(?i)" + regexp;
unique_elems.pop_back();
auto it = identifier_to_literals.find(identifier);
if (it == identifier_to_literals.end())
{
it = identifier_to_literals.insert({identifier, std::make_shared<ASTLiteral>(Field{Array{}})}).first;
auto match = makeASTFunction("multiMatchAny");
match->arguments->children.push_back(arguments[0]);
match->arguments->children.push_back(it->second);
unique_elems.push_back(std::move(match));
}
it->second->value.get<Array>().push_back(regexp);
}
}
/// OR must have at least two arguments.
if (unique_elems.size() == 1)
unique_elems.push_back(std::make_shared<ASTLiteral>(Field(false)));
expr_list_fn->children = std::move(unique_elems);
}
}
}
}
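In effect, a chain of LIKE/ILIKE predicates OR'ed over the same column is collapsed into one multiMatchAny call, with ILIKE patterns prefixed by (?i) (see the EXPLAIN SYNTAX tests further below). A minimal sketch, using a hypothetical table t with a String column s:
-- With optimize_or_like_chain = 0 the query is left as written:
SELECT * FROM t WHERE s LIKE 'hell%' OR s ILIKE 'world%';
-- With optimize_or_like_chain = 1 it is rewritten to roughly:
SELECT * FROM t WHERE multiMatchAny(s, ['^hell', '(?i)^world']) OR false;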

View File

@ -0,0 +1,22 @@
#pragma once
#include <Interpreters/InDepthNodeVisitor.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
class ASTFunction;
/// Replaces all the "or"'s with {i}like to multiMatchAny
class ConvertFunctionOrLikeData
{
public:
using TypeToVisit = ASTFunction;
void visit(ASTFunction & function, ASTPtr & ast);
};
using ConvertFunctionOrLikeVisitor = InDepthNodeVisitor<OneTypeMatcher<ConvertFunctionOrLikeData>, true>;
}

View File

@ -2,6 +2,8 @@
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeMap.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Interpreters/FilesystemCacheLog.h>
@ -34,10 +36,13 @@ NamesAndTypesList FilesystemCacheLogElement::getNamesAndTypes()
{"event_time", std::make_shared<DataTypeDateTime>()},
{"query_id", std::make_shared<DataTypeString>()},
{"source_file_path", std::make_shared<DataTypeString>()},
{"file_segment_range", std::make_shared<DataTypeTuple>(std::move(types))},
{"file_segment_range", std::make_shared<DataTypeTuple>(types)},
{"total_requested_range", std::make_shared<DataTypeTuple>(types)},
{"size", std::make_shared<DataTypeUInt64>()},
{"read_type", std::make_shared<DataTypeString>()},
{"cache_attempted", std::make_shared<DataTypeUInt8>()},
{"ProfileEvents", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt64>())},
{"read_buffer_id", std::make_shared<DataTypeString>()},
};
}
@ -52,9 +57,22 @@ void FilesystemCacheLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(source_file_path);
columns[i++]->insert(Tuple{file_segment_range.first, file_segment_range.second});
columns[i++]->insert(Tuple{requested_range.first, requested_range.second});
columns[i++]->insert(file_segment_size);
columns[i++]->insert(typeToString(read_type));
columns[i++]->insert(cache_attempted);
if (profile_counters)
{
auto * column = columns[i++].get();
ProfileEvents::dumpToMapColumn(*profile_counters, column, true);
}
else
{
columns[i++]->insertDefault();
}
columns[i++]->insert(read_buffer_id);
}
};
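Once flushed, the two new columns can be inspected with a query like the following (a sketch: the system table name is assumed to be system.filesystem_cache_log, and CachedReadBufferCacheWriteBytes is one of the renamed profile events used elsewhere in this change):
SYSTEM FLUSH LOGS;
SELECT read_buffer_id, ProfileEvents['CachedReadBufferCacheWriteBytes']
FROM system.filesystem_cache_log
ORDER BY event_time DESC
LIMIT 10;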

View File

@ -37,9 +37,12 @@ struct FilesystemCacheLogElement
String source_file_path;
std::pair<size_t, size_t> file_segment_range{};
std::pair<size_t, size_t> requested_range{};
ReadType read_type{};
size_t file_segment_size;
bool cache_attempted;
String read_buffer_id;
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters;
static std::string name() { return "FilesystemCacheLog"; }

View File

@ -312,12 +312,12 @@ BlockIO InterpreterSystemQuery::execute()
{
auto caches = FileCacheFactory::instance().getAll();
for (const auto & [_, cache_data] : caches)
cache_data.cache->remove(query.force_removal);
cache_data.cache->remove();
}
else
{
auto cache = FileCacheFactory::instance().get(query.filesystem_cache_path);
cache->remove(query.force_removal);
cache->remove();
}
break;
}

View File

@ -69,7 +69,7 @@ public:
return endpoint_map.erase(name);
}
InterserverIOEndpointPtr getEndpoint(const String & name)
InterserverIOEndpointPtr getEndpoint(const String & name) const
try
{
std::lock_guard lock(mutex);
@ -84,7 +84,7 @@ private:
using EndpointMap = std::map<String, InterserverIOEndpointPtr>;
EndpointMap endpoint_map;
std::mutex mutex;
mutable std::mutex mutex;
};
}

View File

@ -39,10 +39,8 @@ bool shardContains(
const std::string & sharding_column_name,
const OptimizeShardingKeyRewriteInMatcher::Data & data)
{
UInt64 field_value;
/// Convert value to numeric (if required).
if (!sharding_column_value.tryGet<UInt64>(field_value))
sharding_column_value = convertFieldToType(sharding_column_value, *data.sharding_key_type);
/// Implicit conversion.
sharding_column_value = convertFieldToType(sharding_column_value, *data.sharding_key_type);
/// NULL is not allowed in sharding key,
/// so it should be safe to assume that shard cannot contain it.

View File

@ -17,6 +17,7 @@
#include <Interpreters/RewriteCountVariantsVisitor.h>
#include <Interpreters/MonotonicityCheckVisitor.h>
#include <Interpreters/ConvertStringsToEnumVisitor.h>
#include <Interpreters/ConvertFunctionOrLikeVisitor.h>
#include <Interpreters/RewriteFunctionToSubcolumnVisitor.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionariesLoader.h>
@ -735,6 +736,12 @@ void optimizeFuseQuantileFunctions(ASTPtr & query)
}
}
void optimizeOrLikeChain(ASTPtr & query)
{
ConvertFunctionOrLikeVisitor::Data data = {};
ConvertFunctionOrLikeVisitor(data).visit(query);
}
}
void TreeOptimizer::optimizeIf(ASTPtr & query, Aliases & aliases, bool if_chain_to_multiif)
@ -847,6 +854,14 @@ void TreeOptimizer::apply(ASTPtr & query, TreeRewriterResult & result,
if (settings.optimize_syntax_fuse_functions)
optimizeFuseQuantileFunctions(query);
if (settings.optimize_or_like_chain
&& settings.allow_hyperscan
&& settings.max_hyperscan_regexp_length == 0
&& settings.max_hyperscan_regexp_total_length == 0)
{
optimizeOrLikeChain(query);
}
}
}

View File

@ -200,8 +200,6 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
{
if (!filesystem_cache_path.empty())
settings.ostr << (settings.hilite ? hilite_none : "") << " " << filesystem_cache_path;
if (force_removal)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FORCE";
}
}

View File

@ -91,9 +91,7 @@ public:
String disk;
UInt64 seconds{};
/// Values for `drop filesystem cache` system query.
String filesystem_cache_path;
bool force_removal = false;
String getID(char) const override { return "SYSTEM query"; }

View File

@ -360,8 +360,6 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
ASTPtr ast;
if (path_parser.parse(pos, ast, expected))
res->filesystem_cache_path = ast->as<ASTLiteral>()->value.safeGet<String>();
if (ParserKeyword{"FORCE"}.ignore(pos, expected))
res->force_removal = true;
break;
}
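With the FORCE modifier removed from both the AST and the parser, the statement is reduced to its basic form (the optional cache path remains supported; only the simplest variant is shown here):
-- Drops all filesystem caches; a trailing FORCE keyword is no longer accepted.
SYSTEM DROP FILESYSTEM CACHE;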

View File

@ -81,7 +81,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
if (!source_part_or_covering)
{
/// We do not have one of source parts locally, try to take some already merged part from someone.
LOG_DEBUG(log, "Don't have all parts for merge {}; will try to fetch it instead", entry.new_part_name);
LOG_DEBUG(log, "Don't have all parts (at least part {} is missing) for merge {}; will try to fetch it instead", source_part_name, entry.new_part_name);
return PrepareResult{
.prepared_successfully = false,
.need_to_check_missing_part_in_fetch = true,

View File

@ -155,19 +155,21 @@ ReplicatedMergeTreePartCheckThread::MissingPartSearchResult ReplicatedMergeTreeP
{
auto part_on_replica_info = MergeTreePartInfo::fromPartName(part_on_replica, storage.format_version);
/// All three of the following cases are "good" outcomes for the check thread and don't require
/// any special attention.
if (part_info == part_on_replica_info)
{
/// Found the missing part on our own replica. If we are here, then something is wrong with this part, so skip it.
if (replica_path == storage.replica_path)
continue;
LOG_WARNING(log, "Found the missing part {} at {} on {}", part_name, part_on_replica, replica);
LOG_INFO(log, "Found the missing part {} at {} on {}", part_name, part_on_replica, replica);
return MissingPartSearchResult::FoundAndNeedFetch;
}
if (part_on_replica_info.contains(part_info))
{
LOG_WARNING(log, "Found part {} on {} that covers the missing part {}", part_on_replica, replica, part_name);
LOG_INFO(log, "Found part {} on {} that covers the missing part {}", part_on_replica, replica, part_name);
return MissingPartSearchResult::FoundAndDontNeedFetch;
}
@ -181,7 +183,7 @@ ReplicatedMergeTreePartCheckThread::MissingPartSearchResult ReplicatedMergeTreeP
if (found_part_with_the_same_min_block && found_part_with_the_same_max_block)
{
/// FIXME It may never appear
LOG_WARNING(log, "Found parts with the same min block and with the same max block as the missing part {} on replica {}. Hoping that it will eventually appear as a result of a merge.", part_name, replica);
LOG_INFO(log, "Found parts with the same min block and with the same max block as the missing part {} on replica {}. Hoping that it will eventually appear as a result of a merge.", part_name, replica);
return MissingPartSearchResult::FoundAndDontNeedFetch;
}
}

View File

@ -37,6 +37,11 @@
#include <Processors/Transforms/WatermarkTransform.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Sinks/EmptySink.h>
#include <Storages/StorageFactory.h>
@ -927,6 +932,76 @@ void StorageWindowView::threadFuncFireEvent()
}
}
Pipe StorageWindowView::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
const size_t max_block_size,
const unsigned num_streams)
{
QueryPlan plan;
read(plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe(
QueryPlanOptimizationSettings::fromContext(local_context), BuildQueryPipelineSettings::fromContext(local_context));
}
void StorageWindowView::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
const size_t max_block_size,
const unsigned num_streams)
{
if (target_table_id.empty())
return;
auto storage = getTargetStorage();
auto lock = storage->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
auto target_metadata_snapshot = storage->getInMemoryMetadataPtr();
auto target_storage_snapshot = storage->getStorageSnapshot(target_metadata_snapshot, local_context);
if (query_info.order_optimizer)
query_info.input_order_info = query_info.order_optimizer->getInputOrder(target_metadata_snapshot, local_context);
storage->read(query_plan, column_names, target_storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
if (query_plan.isInitialized())
{
auto wv_header = getHeaderForProcessingStage(column_names, storage_snapshot, query_info, local_context, processed_stage);
auto target_header = query_plan.getCurrentDataStream().header;
if (!blocksHaveEqualStructure(wv_header, target_header))
{
auto converting_actions = ActionsDAG::makeConvertingActions(
target_header.getColumnsWithTypeAndName(), wv_header.getColumnsWithTypeAndName(), ActionsDAG::MatchColumnsMode::Name);
auto converting_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), converting_actions);
converting_step->setStepDescription("Convert Target table structure to WindowView structure");
query_plan.addStep(std::move(converting_step));
}
StreamLocalLimits limits;
SizeLimits leaf_limits;
/// Add table lock for target table.
auto adding_limits_and_quota = std::make_unique<SettingQuotaAndLimitsStep>(
query_plan.getCurrentDataStream(),
storage,
std::move(lock),
limits,
leaf_limits,
nullptr,
nullptr);
adding_limits_and_quota->setStepDescription("Lock target table for WindowView");
query_plan.addStep(std::move(adding_limits_and_quota));
}
}
Pipe StorageWindowView::watch(
const Names & /*column_names*/,
const SelectQueryInfo & query_info,
@ -1316,6 +1391,18 @@ void StorageWindowView::writeIntoWindowView(
auto metadata_snapshot = inner_storage->getInMemoryMetadataPtr();
auto output = inner_storage->write(window_view.getMergeableQuery(), metadata_snapshot, local_context);
if (!blocksHaveEqualStructure(builder.getHeader(), output->getHeader()))
{
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
builder.getHeader().getColumnsWithTypeAndName(),
output->getHeader().getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto convert_actions = std::make_shared<ExpressionActions>(
convert_actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
builder.addSimpleTransform([&](const Block & header) { return std::make_shared<ExpressionTransform>(header, convert_actions); });
}
builder.addChain(Chain(std::move(output)));
builder.setSinks([&](const Block & cur_header, Pipe::StreamType)
{

View File

@ -137,6 +137,25 @@ public:
void startup() override;
void shutdown() override;
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
Pipe watch(
const Names & column_names,
const SelectQueryInfo & query_info,

View File

@ -51,6 +51,8 @@ protected:
Block block;
UInt32 watermark;
std::tie(block, watermark) = generateImpl();
if (!block)
return Chunk();
if (is_events)
{
return Chunk(

View File

@ -254,6 +254,9 @@ def main():
logging.info("Got version from repo %s", version.string)
official_flag = pr_info.number == 0
if "official" in build_config:
official_flag = build_config["official"]
version_type = "testing"
if "release" in pr_info.labels or "release-lts" in pr_info.labels:
version_type = "stable"

View File

@ -37,6 +37,7 @@ CI_CONFIG = {
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": False,
"official": False,
},
# FIXME update to gcc-12 and turn on
# "binary_gcc": {
@ -191,6 +192,7 @@ CI_CONFIG = {
"build_type": "",
"sanitizer": "",
"package_type": "binary",
"static_binary_name": "powerpc64le",
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",

View File

@ -452,6 +452,7 @@ class SettingsRandomizer:
"prefer_localhost_replica": lambda: random.randint(0, 1),
"max_block_size": lambda: random.randint(8000, 100000),
"max_threads": lambda: random.randint(1, 64),
"optimize_or_like_chain": lambda: random.randint(0, 1),
}
@staticmethod

View File

@ -58,50 +58,50 @@ $CLICKHOUSE_CLIENT --query="INSERT INTO bloom_filter_idx2 VALUES
(13, 'abc')"
# EQUAL
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx2 WHERE lower(s) = 'aбвгдеёж' OR s = 'aбвгдеёж' ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx2 WHERE lower(s) = 'aбвгдеёж' OR s = 'aбвгдеёж' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx2 WHERE lower(s) = 'aбвгдеёж' OR s = 'aбвгдеёж' ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx2 WHERE lower(s) = 'aбвгдеёж' OR s = 'aбвгдеёж' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s = 'aбвгдеёж' ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s = 'aбвгдеёж' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s = 'aбвгдеёж' ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s = 'aбвгдеёж' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE lower(s) = 'abc' ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE lower(s) = 'abc' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE lower(s) = 'abc' ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE lower(s) = 'abc' ORDER BY k FORMAT JSON" | grep "rows_read"
# LIKE
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%database%' AND s LIKE '%ClickHouse%' ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%database%' AND s LIKE '%ClickHouse%' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%database%' AND s LIKE '%ClickHouse%' ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%database%' AND s LIKE '%ClickHouse%' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%database%' AND lower(s) LIKE '%clickhouse%' ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%database%' AND lower(s) LIKE '%clickhouse%' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%database%' AND lower(s) LIKE '%clickhouse%' ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%database%' AND lower(s) LIKE '%clickhouse%' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%базами данных%' AND s LIKE '%ClickHouse%' ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%базами данных%' AND s LIKE '%ClickHouse%' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%базами данных%' AND s LIKE '%ClickHouse%' ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%базами данных%' AND s LIKE '%ClickHouse%' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE (s LIKE '%базами данных%' AND s LIKE '%ClickHouse%') OR s LIKE '____строка' ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE (s LIKE '%базами данных%' AND s LIKE '%ClickHouse%') OR s LIKE '____строка' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE (s LIKE '%базами данных%' AND s LIKE '%ClickHouse%') OR s LIKE '____строка' ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE (s LIKE '%базами данных%' AND s LIKE '%ClickHouse%') OR s LIKE '____строка' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%%<div>_%_%_</div>%%' ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%%<div>_%_%_</div>%%' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%%<div>_%_%_</div>%%' ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%%<div>_%_%_</div>%%' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%2\\\\%2%' ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%2\\\\%2%' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%2\\\\%2%' ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%2\\\\%2%' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%_\\\\%2\\\\__\\\\' ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%_\\\\%2\\\\__\\\\' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%_\\\\%2\\\\__\\\\' ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '%_\\\\%2\\\\__\\\\' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '2\\\\_2\\\\%2_2\\\\' ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '2\\\\_2\\\\%2_2\\\\' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '2\\\\_2\\\\%2_2\\\\' ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '2\\\\_2\\\\%2_2\\\\' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '2\\\\_2\\\\%2_2_' ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '2\\\\_2\\\\%2_2_' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '2\\\\_2\\\\%2_2_' ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s LIKE '2\\\\_2\\\\%2_2_' ORDER BY k FORMAT JSON" | grep "rows_read"
# IN
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s IN ('aбвгдеёж', 'abc') ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE s IN ('aбвгдеёж', 'abc') ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s IN ('aбвгдеёж', 'abc') ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE s IN ('aбвгдеёж', 'abc') ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE (s, lower(s)) IN (('aбвгдеёж', 'aбвгдеёж'), ('abc', 'cba')) ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx WHERE (s, lower(s)) IN (('aбвгдеёж', 'aбвгдеёж'), ('abc', 'cba')) ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE (s, lower(s)) IN (('aбвгдеёж', 'aбвгдеёж'), ('abc', 'cba')) ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx WHERE (s, lower(s)) IN (('aбвгдеёж', 'aбвгдеёж'), ('abc', 'cba')) ORDER BY k FORMAT JSON" | grep "rows_read"
# TOKEN BF
@ -125,18 +125,18 @@ $CLICKHOUSE_CLIENT --query="INSERT INTO bloom_filter_idx3 VALUES
(13, 'abc')"
# EQUAL
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx3 WHERE lower(s) = 'column-oriented' OR s = 'column-oriented' ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx3 WHERE lower(s) = 'column-oriented' OR s = 'column-oriented' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx3 WHERE lower(s) = 'column-oriented' OR s = 'column-oriented' ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx3 WHERE lower(s) = 'column-oriented' OR s = 'column-oriented' ORDER BY k FORMAT JSON" | grep "rows_read"
# LIKE
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx3 WHERE lower(s) LIKE '%(dbms)%' ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx3 WHERE lower(s) LIKE '%(dbms)%' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx3 WHERE s LIKE 'column-%' AND s LIKE '%-oriented' ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx3 WHERE s LIKE 'column-%' AND s LIKE '%-oriented' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx3 WHERE lower(s) LIKE '%(dbms)%' ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx3 WHERE lower(s) LIKE '%(dbms)%' ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx3 WHERE s LIKE 'column-%' AND s LIKE '%-oriented' ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx3 WHERE s LIKE 'column-%' AND s LIKE '%-oriented' ORDER BY k FORMAT JSON" | grep "rows_read"
# IN
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx3 WHERE s IN ('some string', 'abc') ORDER BY k"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_idx3 WHERE s IN ('some string', 'abc') ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx3 WHERE s IN ('some string', 'abc') ORDER BY k"
$CLICKHOUSE_CLIENT --optimize_or_like_chain 0 --query="SELECT * FROM bloom_filter_idx3 WHERE s IN ('some string', 'abc') ORDER BY k FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --query="DROP TABLE bloom_filter_idx"
$CLICKHOUSE_CLIENT --query="DROP TABLE bloom_filter_idx2"

View File

@ -1,5 +1,7 @@
[581250224954015743,581259021047037951,581267817140060159,581276613233082367,581500913605148671,581518505791193087,581764796395814911]
[581276613233082367]
[581250224954015743,581259021047037951,581267817140060159,581276613233082367,581500913605148671,581518505791193087,581764796395814911]
[581276613233082367]
[578466261512486911,578712552117108735,578888473977552895,579205133326352383,579275502070530047,579662530163507199,579768083279773695]
[580995138256371711,581144671837749247,581162264023793663,581166662070304767,581171060116815871,581250224954015743,581254623000526847,581259021047037951,581263419093549055,581267817140060159,581272215186571263,581276613233082367,581531699930726399,581536097977237503,581549292116770815,581553690163281919,581558088209793023,581747204209770495,581764796395814911]
[589624655266971647,589625205022785535,589626854290227199,589627404046041087,589642797208829951,589644996232085503,589708218150682623,589708767906496511,589709317662310399,589709867418124287,589710417173938175,589710966929752063,589711516685565951,589714815220449279,589715914732077055,589725810336727039,589726909848354815,589727459604168703,589728009359982591,589729108871610367,589734606429749247,589735156185563135,589735705941377023,589736255697190911,589736805453004799,589737355208818687,589737904964632575,589742303011143679,589744502034399231,589745051790213119,589752198615793663,589752748371607551,589753298127421439,589753847883235327,589754397639049215,589754947394863103,589755497150676991]

View File

@ -5,6 +5,9 @@ SELECT h3kRing(581276613233082367, toUInt16(0));
SELECT h3kRing(581276613233082367, -1); -- { serverError 43 }
SELECT h3kRing(581276613233082367, toUInt16(-1)); -- { serverError 12 }
SELECT arraySort(h3kRing(581276613233082367, 1));
SELECT h3kRing(581276613233082367, 0);
SELECT h3kRing(581276613233082367, -1); -- { serverError 43 }
DROP TABLE IF EXISTS h3_indexes;

View File

@ -0,0 +1 @@
test1 test2

View File

@ -0,0 +1,27 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --multiquery <<EOF
SET allow_experimental_window_view = 1;
DROP TABLE IF EXISTS mt;
DROP TABLE IF EXISTS dst;
DROP TABLE IF EXISTS wv;
CREATE TABLE dst(time DateTime, colA String, colB String) Engine=MergeTree ORDER BY tuple();
CREATE TABLE mt(colA String, colB String) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW wv TO dst AS SELECT tumbleStart(w_id) AS time, colA, colB FROM mt GROUP BY tumble(now(), INTERVAL '1' SECOND, 'US/Samoa') AS w_id, colA, colB;
INSERT INTO mt VALUES ('test1', 'test2');
EOF
while true; do
$CLICKHOUSE_CLIENT --query="SELECT count(*) FROM dst" | grep -q "1" && break || sleep .5 ||:
done
$CLICKHOUSE_CLIENT --query="SELECT colA, colB FROM dst"
$CLICKHOUSE_CLIENT --query="DROP TABLE wv"
$CLICKHOUSE_CLIENT --query="DROP TABLE mt"
$CLICKHOUSE_CLIENT --query="DROP TABLE dst"

View File

@ -0,0 +1,14 @@
1 1 1990-01-01 12:00:05
1 2 1990-01-01 12:00:05
1 3 1990-01-01 12:00:05
1 4 1990-01-01 12:00:10
1 5 1990-01-01 12:00:10
1 6 1990-01-01 12:00:15
1 7 1990-01-01 12:00:15
1
2
3
4
5
6
7

View File

@ -0,0 +1,35 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --multiquery <<EOF
SET allow_experimental_window_view = 1;
DROP TABLE IF EXISTS mt;
DROP TABLE IF EXISTS dst;
DROP TABLE IF EXISTS wv;
CREATE TABLE dst(count UInt64, market Int32, w_end DateTime) Engine=MergeTree ORDER BY tuple();
CREATE TABLE mt(a Int32, market Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW wv TO dst WATERMARK=ASCENDING AS SELECT count(a) AS count, market, tumbleEnd(wid) AS w_end FROM mt GROUP BY tumble(timestamp, INTERVAL '5' SECOND, 'US/Samoa') AS wid, market;
INSERT INTO mt VALUES (1, 1, '1990/01/01 12:00:00');
INSERT INTO mt VALUES (1, 2, '1990/01/01 12:00:01');
INSERT INTO mt VALUES (1, 3, '1990/01/01 12:00:02');
INSERT INTO mt VALUES (1, 4, '1990/01/01 12:00:05');
INSERT INTO mt VALUES (1, 5, '1990/01/01 12:00:06');
INSERT INTO mt VALUES (1, 6, '1990/01/01 12:00:10');
INSERT INTO mt VALUES (1, 7, '1990/01/01 12:00:11');
INSERT INTO mt VALUES (1, 8, '1990/01/01 12:00:30');
EOF
while true; do
$CLICKHOUSE_CLIENT --query="SELECT count(*) FROM wv" | grep -q "7" && break || sleep .5 ||:
done
$CLICKHOUSE_CLIENT --query="SELECT * FROM wv ORDER BY market, w_end;"
$CLICKHOUSE_CLIENT --query="SELECT market FROM wv ORDER BY market, w_end;"
$CLICKHOUSE_CLIENT --query="DROP TABLE wv"
$CLICKHOUSE_CLIENT --query="DROP TABLE mt"
$CLICKHOUSE_CLIENT --query="DROP TABLE dst"

View File

@ -1,31 +1,161 @@
-- { echoOn }
-- SELECT
-- intHash64(0) % 2,
-- intHash64(2) % 2
-- ┌─modulo(intHash64(0), 2)─┬─modulo(intHash64(2), 2)─┐
-- │ 0 │ 1 │
-- └─────────────────────────┴─────────────────────────┘
create table dist_01756 as system.one engine=Distributed(test_cluster_two_shards, system, one, intHash64(dummy));
--
-- w/o optimize_skip_unused_shards_rewrite_in=1
--
select '(0, 2)';
(0, 2)
with (select currentDatabase()) as id_no select *, ignore(id_no) from dist_01756 where dummy in (0, 2);
0 0
0 0
system flush logs;
select query from system.query_log where
event_date >= yesterday() and
event_time > now() - interval 1 hour and
not is_initial_query and
query not like '%system%query_log%' and
query like concat('WITH%', currentDatabase(), '%AS `id_no` %') and
type = 'QueryFinish'
order by query;
WITH _CAST(\'default\', \'Nullable(String)\') AS `id_no` SELECT `one`.`dummy`, ignore(`id_no`) FROM `system`.`one` WHERE `dummy` IN (0, 2)
WITH _CAST(\'default\', \'Nullable(String)\') AS `id_no` SELECT `one`.`dummy`, ignore(`id_no`) FROM `system`.`one` WHERE `dummy` IN (0, 2)
--
-- w/ optimize_skip_unused_shards_rewrite_in=1
--
set optimize_skip_unused_shards_rewrite_in=1;
-- detailed coverage for realistic examples
select 'optimize_skip_unused_shards_rewrite_in(0, 2)';
optimize_skip_unused_shards_rewrite_in(0, 2)
with (select currentDatabase()) as id_02 select *, ignore(id_02) from dist_01756 where dummy in (0, 2);
0 0
system flush logs;
select query from system.query_log where
event_date >= yesterday() and
event_time > now() - interval 1 hour and
not is_initial_query and
query not like '%system%query_log%' and
query like concat('WITH%', currentDatabase(), '%AS `id_02` %') and
type = 'QueryFinish'
order by query;
WITH _CAST(\'default\', \'Nullable(String)\') AS `id_02` SELECT `one`.`dummy`, ignore(`id_02`) FROM `system`.`one` WHERE `dummy` IN tuple(0)
WITH _CAST(\'default\', \'Nullable(String)\') AS `id_02` SELECT `one`.`dummy`, ignore(`id_02`) FROM `system`.`one` WHERE `dummy` IN tuple(2)
select 'optimize_skip_unused_shards_rewrite_in(2,)';
optimize_skip_unused_shards_rewrite_in(2,)
with (select currentDatabase()) as id_2 select *, ignore(id_2) from dist_01756 where dummy in (2,);
system flush logs;
select query from system.query_log where
event_date >= yesterday() and
event_time > now() - interval 1 hour and
not is_initial_query and
query not like '%system%query_log%' and
query like concat('WITH%', currentDatabase(), '%AS `id_2` %') and
type = 'QueryFinish'
order by query;
WITH _CAST(\'default\', \'Nullable(String)\') AS `id_2` SELECT `one`.`dummy`, ignore(`id_2`) FROM `system`.`one` WHERE `dummy` IN tuple(2)
select 'optimize_skip_unused_shards_rewrite_in(0,)';
optimize_skip_unused_shards_rewrite_in(0,)
with (select currentDatabase()) as id_0 select *, ignore(id_0) from dist_01756 where dummy in (0,);
0 0
system flush logs;
select query from system.query_log where
event_date >= yesterday() and
event_time > now() - interval 1 hour and
not is_initial_query and
query not like '%system%query_log%' and
query like concat('WITH%', currentDatabase(), '%AS `id_0` %') and
type = 'QueryFinish'
order by query;
WITH _CAST(\'default\', \'Nullable(String)\') AS `id_0` SELECT `one`.`dummy`, ignore(`id_0`) FROM `system`.`one` WHERE `dummy` IN tuple(0)
-- signed column
select 'signed column';
signed column
create table data_01756_signed (key Int) engine=Null;
with (select currentDatabase()) as key_signed select *, ignore(key_signed) from cluster(test_cluster_two_shards, currentDatabase(), data_01756_signed, key) where key in (-1, -2);
system flush logs;
select query from system.query_log where
event_date >= yesterday() and
event_time > now() - interval 1 hour and
not is_initial_query and
query not like '%system%query_log%' and
query like concat('WITH%', currentDatabase(), '%AS `key_signed` %') and
type = 'QueryFinish'
order by query;
WITH _CAST(\'default\', \'Nullable(String)\') AS `key_signed` SELECT `key`, ignore(`key_signed`) FROM `default`.`data_01756_signed` WHERE `key` IN tuple(-1)
WITH _CAST(\'default\', \'Nullable(String)\') AS `key_signed` SELECT `key`, ignore(`key_signed`) FROM `default`.`data_01756_signed` WHERE `key` IN tuple(-2)
-- not tuple
select * from dist_01756 where dummy in (0);
0
select * from dist_01756 where dummy in ('0');
0
--
-- errors
--
select 'errors';
errors
-- optimize_skip_unused_shards does not support non-constants
select * from dist_01756 where dummy in (select * from system.one); -- { serverError 507 }
select * from dist_01756 where dummy in (toUInt8(0)); -- { serverError 507 }
-- NOT IN is not supported
select * from dist_01756 where dummy not in (0, 2); -- { serverError 507 }
--
-- others
--
select 'others';
others
select * from dist_01756 where dummy not in (2, 3) and dummy in (0, 2);
0
select * from dist_01756 where dummy in tuple(0, 2);
0
select * from dist_01756 where dummy in tuple(0);
0
select * from dist_01756 where dummy in tuple(2);
-- Identifier is NULL
select (2 IN (2,)), * from dist_01756 where dummy in (0, 2) format Null;
-- Literal is NULL
select (dummy IN (toUInt8(2),)), * from dist_01756 where dummy in (0, 2) format Null;
-- different type
select 'different types -- prohibited';
different types -- prohibited
create table data_01756_str (key String) engine=Memory();
insert into data_01756_str values (0)(1);
-- SELECT
-- cityHash64(0) % 2,
-- cityHash64(2) % 2
--
-- ┌─modulo(cityHash64(0), 2)─┬─modulo(cityHash64(2), 2)─┐
-- │ 0 │ 1 │
-- └──────────────────────────┴──────────────────────────┘
create table dist_01756_str as data_01756_str engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01756_str, cityHash64(key));
select * from dist_01756_str where key in ('0', '2');
0
select * from dist_01756_str where key in (0, 2);
0
select * from dist_01756_str where key in ('0', Null); -- { serverError 507 }
-- select * from dist_01756_str where key in (0, 2); -- { serverError 53 }
-- select * from dist_01756_str where key in (0, Null); -- { serverError 53 }
-- different type #2
select 'different types -- conversion';
different types -- conversion
create table dist_01756_column as system.one engine=Distributed(test_cluster_two_shards, system, one, dummy);
select * from dist_01756_column where dummy in (0, '255');
0
select * from dist_01756_column where dummy in (0, '255foo'); -- { serverError 53 }
-- intHash64 does not accept string, but implicit conversion should be done
select * from dist_01756 where dummy in ('0', '2');
0
-- optimize_skip_unused_shards_limit
select 'optimize_skip_unused_shards_limit';
optimize_skip_unused_shards_limit
select * from dist_01756 where dummy in (0, 2) settings optimize_skip_unused_shards_limit=1; -- { serverError 507 }
select * from dist_01756 where dummy in (0, 2) settings optimize_skip_unused_shards_limit=1, force_optimize_skip_unused_shards=0;
0
0

View File

@ -11,6 +11,15 @@ drop table if exists dist_01756_column;
drop table if exists data_01756_str;
drop table if exists data_01756_signed;
-- separate log entry for localhost queries
set prefer_localhost_replica=0;
set force_optimize_skip_unused_shards=2;
set optimize_skip_unused_shards=1;
set optimize_skip_unused_shards_rewrite_in=0;
set log_queries=1;
-- { echoOn }
-- SELECT
-- intHash64(0) % 2,
-- intHash64(2) % 2
@ -19,13 +28,6 @@ drop table if exists data_01756_signed;
-- └─────────────────────────┴─────────────────────────┘
create table dist_01756 as system.one engine=Distributed(test_cluster_two_shards, system, one, intHash64(dummy));
-- separate log entry for localhost queries
set prefer_localhost_replica=0;
set force_optimize_skip_unused_shards=2;
set optimize_skip_unused_shards=1;
set optimize_skip_unused_shards_rewrite_in=0;
set log_queries=1;
--
-- w/o optimize_skip_unused_shards_rewrite_in=1
--
@ -131,8 +133,17 @@ select (dummy IN (toUInt8(2),)), * from dist_01756 where dummy in (0, 2) format
-- different type
select 'different types -- prohibited';
create table data_01756_str (key String) engine=Memory();
insert into data_01756_str values (0)(1);
-- SELECT
-- cityHash64(0) % 2,
-- cityHash64(2) % 2
--
-- ┌─modulo(cityHash64(0), 2)─┬─modulo(cityHash64(2), 2)─┐
-- │ 0 │ 1 │
-- └──────────────────────────┴──────────────────────────┘
create table dist_01756_str as data_01756_str engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01756_str, cityHash64(key));
select * from dist_01756_str where key in ('0', '2');
select * from dist_01756_str where key in (0, 2);
select * from dist_01756_str where key in ('0', Null); -- { serverError 507 }
-- select * from dist_01756_str where key in (0, 2); -- { serverError 53 }
-- select * from dist_01756_str where key in (0, Null); -- { serverError 53 }
@ -150,6 +161,8 @@ select 'optimize_skip_unused_shards_limit';
select * from dist_01756 where dummy in (0, 2) settings optimize_skip_unused_shards_limit=1; -- { serverError 507 }
select * from dist_01756 where dummy in (0, 2) settings optimize_skip_unused_shards_limit=1, force_optimize_skip_unused_shards=0;
-- { echoOff }
drop table dist_01756;
drop table dist_01756_str;
drop table dist_01756_column;

View File

@ -0,0 +1,40 @@
SELECT materialize(\'Привет, World\') AS s
WHERE (s LIKE \'hell%\') OR (s ILIKE \'%привет%\') OR (s ILIKE \'world%\')
SETTINGS optimize_or_like_chain = 0
SELECT materialize(\'Привет, World\') AS s
WHERE multiMatchAny(s, [\'^hell\', \'(?i)привет\', \'(?i)^world\']) OR false
SETTINGS optimize_or_like_chain = 1
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE multiMatchAny(s1, [\'^hell\', \'(?i)^world\']) OR multiMatchAny(s2, [\'(?i)привет\'])
SETTINGS optimize_or_like_chain = 1
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE (s1 LIKE \'hell%\') OR (s2 ILIKE \'%привет%\') OR (s1 ILIKE \'world%\')
SETTINGS optimize_or_like_chain = 1
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE (s1 LIKE \'hell%\') OR (s2 ILIKE \'%привет%\') OR (s1 ILIKE \'world%\')
SETTINGS optimize_or_like_chain = 1
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE (s1 LIKE \'hell%\') OR (s2 ILIKE \'%привет%\') OR (s1 ILIKE \'world%\')
SETTINGS optimize_or_like_chain = 1
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE multiMatchAny(s1, [\'^hell\', \'(?i)^world\']) OR multiMatchAny(s2, [\'(?i)привет\']) OR (s1 = \'Привет\')
SETTINGS optimize_or_like_chain = 1
Привет, optimized World
Привет, World
Привет, optimized World
Привет, World
SELECT
(materialize(\'Привет, World\') AS s) LIKE \'hell%\' AS test,
s
WHERE multiMatchAny(s, [\'^hell\', \'(?i)привет\', \'(?i)^world\']) OR false
SETTINGS optimize_or_like_chain = 1

View File

@ -0,0 +1,19 @@
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s WHERE (s LIKE 'hell%') OR (s ILIKE '%привет%') OR (s ILIKE 'world%') SETTINGS optimize_or_like_chain = 0;
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s WHERE (s LIKE 'hell%') OR (s ILIKE '%привет%') OR (s ILIKE 'world%') SETTINGS optimize_or_like_chain = 1;
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s1, materialize('Привет, World') AS s2 WHERE (s1 LIKE 'hell%') OR (s2 ILIKE '%привет%') OR (s1 ILIKE 'world%') SETTINGS optimize_or_like_chain = 1;
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s1, materialize('Привет, World') AS s2 WHERE (s1 LIKE 'hell%') OR (s2 ILIKE '%привет%') OR (s1 ILIKE 'world%') SETTINGS optimize_or_like_chain = 1 SETTINGS allow_hyperscan = 0;
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s1, materialize('Привет, World') AS s2 WHERE (s1 LIKE 'hell%') OR (s2 ILIKE '%привет%') OR (s1 ILIKE 'world%') SETTINGS optimize_or_like_chain = 1 SETTINGS max_hyperscan_regexp_length = 10;
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s1, materialize('Привет, World') AS s2 WHERE (s1 LIKE 'hell%') OR (s2 ILIKE '%привет%') OR (s1 ILIKE 'world%') SETTINGS optimize_or_like_chain = 1 SETTINGS max_hyperscan_regexp_total_length = 10;
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s1, materialize('Привет, World') AS s2 WHERE (s1 LIKE 'hell%') OR (s2 ILIKE '%привет%') OR (s1 ILIKE 'world%') OR s1 == 'Привет' SETTINGS optimize_or_like_chain = 1;
SELECT materialize('Привет, optimized World') AS s WHERE (s LIKE 'hell%') OR (s LIKE '%привет%') OR (s ILIKE '%world') SETTINGS optimize_or_like_chain = 1;
SELECT materialize('Привет, World') AS s WHERE (s LIKE 'hell%') OR (s LIKE '%привет%') OR (s ILIKE '%world') SETTINGS optimize_or_like_chain = 0;
SELECT materialize('Привет, optimized World') AS s WHERE (s LIKE 'hell%') OR (s ILIKE '%привет%') OR (s LIKE 'world%') SETTINGS optimize_or_like_chain = 1;
SELECT materialize('Привет, World') AS s WHERE (s LIKE 'hell%') OR (s ILIKE '%привет%') OR (s LIKE 'world%') SETTINGS optimize_or_like_chain = 0;
-- Aliases
EXPLAIN SYNTAX SELECT test, materialize('Привет, World') AS s WHERE ((s LIKE 'hell%') AS test) OR (s ILIKE '%привет%') OR (s ILIKE 'world%') SETTINGS optimize_or_like_chain = 1;
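An illustration of the transformation this test checks, not part of the test itself: with the setting enabled, a chain of LIKE/ILIKE conditions over the same string is collapsed into a single multiMatchAny call, which the EXPLAIN SYNTAX output in the reference file above makes visible.

-- With optimize_or_like_chain = 1 the two pattern matches below are expected
-- to be folded into one multiMatchAny(s, [...]) call in the rewritten query.
EXPLAIN SYNTAX
SELECT materialize('hello, world') AS s
WHERE (s LIKE 'hell%') OR (s ILIKE 'world%')
SETTINGS optimize_or_like_chain = 1;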

View File

@ -1,4 +1,4 @@
SELECT 1, * FROM test LIMIT 10 FORMAT Null; 1 0 1
SELECT 2, * FROM test LIMIT 10 FORMAT Null; 0 1 0
0
SELECT 3, * FROM test LIMIT 10 FORMAT Null; 1 1 0
SELECT 3, * FROM test LIMIT 10 FORMAT Null; 0 1 0

View File

@ -13,9 +13,9 @@ SELECT 1, * FROM test LIMIT 10 FORMAT Null;
SYSTEM FLUSH LOGS;
SELECT query,
ProfileEvents['RemoteFSReadBytes'] > 0 as remote_fs_read,
ProfileEvents['RemoteFSCacheReadBytes'] > 0 as remote_fs_cache_read,
ProfileEvents['RemoteFSCacheDownloadBytes'] > 0 as remote_fs_read_and_download
ProfileEvents['CachedReadBufferReadFromSourceBytes'] > 0 as remote_fs_read,
ProfileEvents['CachedReadBufferReadFromCacheBytes'] > 0 as remote_fs_cache_read,
ProfileEvents['CachedReadBufferCacheWriteBytes'] > 0 as remote_fs_read_and_download
FROM system.query_log
WHERE query LIKE 'SELECT 1, * FROM test LIMIT%'
AND type = 'QueryFinish'
@ -29,9 +29,9 @@ SELECT 2, * FROM test LIMIT 10 FORMAT Null;
SYSTEM FLUSH LOGS;
SELECT query,
ProfileEvents['RemoteFSReadBytes'] > 0 as remote_fs_read,
ProfileEvents['RemoteFSCacheReadBytes'] > 0 as remote_fs_cache_read,
ProfileEvents['RemoteFSCacheDownloadBytes'] > 0 as remote_fs_read_and_download
ProfileEvents['CachedReadBufferReadFromSourceBytes'] > 0 as remote_fs_read,
ProfileEvents['CachedReadBufferReadFromCacheBytes'] > 0 as remote_fs_cache_read,
ProfileEvents['CachedReadBufferCacheWriteBytes'] > 0 as remote_fs_read_and_download
FROM system.query_log
WHERE query LIKE 'SELECT 2, * FROM test LIMIT%'
AND type = 'QueryFinish'
@ -56,9 +56,9 @@ SELECT 3, * FROM test LIMIT 10 FORMAT Null;
SYSTEM FLUSH LOGS;
SELECT query,
ProfileEvents['RemoteFSReadBytes'] > 0 as remote_fs_read,
ProfileEvents['RemoteFSCacheReadBytes'] > 0 as remote_fs_cache_read,
ProfileEvents['RemoteFSCacheDownloadBytes'] > 0 as remote_fs_read_and_download
ProfileEvents['CachedReadBufferReadFromSourceBytes'] > 0 as remote_fs_read,
ProfileEvents['CachedReadBufferReadFromCacheBytes'] > 0 as remote_fs_cache_read,
ProfileEvents['CachedReadBufferCacheWriteBytes'] > 0 as remote_fs_read_and_download
FROM system.query_log
WHERE query LIKE 'SELECT 3, * FROM test LIMIT%'
AND type = 'QueryFinish'
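A hedged aside on the rename this hunk performs: the test now reads the CachedReadBuffer* profile counters instead of the older RemoteFSCache* names. Assuming a build that exposes the new counters, they can be listed directly (system.events only reports events that have fired at least once):

SELECT event, description
FROM system.events
WHERE event LIKE 'CachedReadBuffer%';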

View File

@ -1,18 +0,0 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
static=$($CLICKHOUSE_LOCAL -q "SELECT value IN ('ON', '1') FROM system.build_options WHERE name = 'STATIC'")
clickhouse-local -q 'select 1'
if [ "$static" -eq 1 ]; then
# "grep -c" will also gives "1"
LD_LIBRARY_PATH=/ clickhouse-local -q 'select 1' |& grep -x -F -c 'Environment variable LD_LIBRARY_PATH is set to /. It can compromise security.'
else
# works because it does not use the main.cpp entrypoint
# (the shared build is always split, and a non-split build would have lots of ODR issues)
LD_LIBRARY_PATH=/ clickhouse-local -q 'select 1'
fi

View File

@ -0,0 +1,42 @@
6
3.7416575
3
0.0025851727
\N
nan
12
14
21
7.071068
9.165152
12.124355
2
5
4
0.16847819
0.35846698
0.07417989
6
8
9
0.020204102886728692
0.11808289631180302
0
1 1 218.74642854227358
1 2 1348.2117786164013
2 1 219.28064210048274
2 2 1347.4008312302617
3 1 214.35251339790725
3 2 1342.8856987845243
1 1 218.74643
1 2 1348.2118
2 1 219.28064
2 2 1347.4009
3 1 214.35251
3 2 1342.8857
1 1 218.74642854227358
1 2 1348.2117786164013
2 1 219.28064210048274
2 2 1347.4008312302617
3 1 214.35251339790725
3 2 1342.8856987845243

View File

@ -0,0 +1,41 @@
SELECT arrayL1Distance([0, 0, 0], [1, 2, 3]);
SELECT arrayL2Distance([1, 2, 3], [0, 0, 0]);
SELECT arrayLinfDistance([1, 2, 3], [0, 0, 0]);
SELECT arrayCosineDistance([1, 2, 3], [3, 5, 7]);
SELECT arrayL2Distance([1, 2, 3], NULL);
SELECT arrayCosineDistance([1, 2, 3], [0, 0, 0]);
DROP TABLE IF EXISTS vec1;
DROP TABLE IF EXISTS vec2;
DROP TABLE IF EXISTS vec2f;
DROP TABLE IF EXISTS vec2d;
CREATE TABLE vec1 (id UInt64, v Array(UInt8)) ENGINE = Memory;
CREATE TABLE vec2 (id UInt64, v Array(Int64)) ENGINE = Memory;
CREATE TABLE vec2f (id UInt64, v Array(Float32)) ENGINE = Memory;
CREATE TABLE vec2d (id UInt64, v Array(Float64)) ENGINE = Memory;
INSERT INTO vec1 VALUES (1, [3, 4, 5]), (2, [2, 4, 8]), (3, [7, 7, 7]);
SELECT arrayL1Distance(v, [0, 0, 0]) FROM vec1;
SELECT arrayL2Distance(v, [0, 0, 0]) FROM vec1;
SELECT arrayLinfDistance([5, 4, 3], v) FROM vec1;
SELECT arrayCosineDistance([3, 2, 1], v) FROM vec1;
SELECT arrayLinfDistance(v, materialize([0, -2, 0])) FROM vec1;
SELECT arrayCosineDistance(v, materialize([1., 1., 1.])) FROM vec1;
INSERT INTO vec2 VALUES (1, [100, 200, 0]), (2, [888, 777, 666]);
SELECT v1.id, v2.id, arrayL2Distance(v1.v, v2.v) as dist FROM vec1 v1, vec2 v2;
INSERT INTO vec2f VALUES (1, [100, 200, 0]), (2, [888, 777, 666]);
SELECT v1.id, v2.id, arrayL2Distance(v1.v, v2.v) as dist FROM vec1 v1, vec2f v2;
INSERT INTO vec2d VALUES (1, [100, 200, 0]), (2, [888, 777, 666]);
SELECT v1.id, v2.id, arrayL2Distance(v1.v, v2.v) as dist FROM vec1 v1, vec2d v2;
SELECT arrayL1Distance([0, 0], [1]); -- { serverError 190 }
SELECT arrayL2Distance((1, 2), (3,4)); -- { serverError 43 }
DROP TABLE vec1;
DROP TABLE vec2;
DROP TABLE vec2f;
DROP TABLE vec2d;
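A quick sanity check of the expected values, not part of the test: the first lines of the reference file above are the distances of [1, 2, 3] from the origin, i.e. an L1 distance of 1 + 2 + 3 = 6 and an L2 distance of sqrt(1 + 4 + 9) = sqrt(14) ≈ 3.7416575 (Float32).

SELECT arrayL1Distance([0, 0, 0], [1, 2, 3]) AS l1,   -- 6
       arrayL2Distance([1, 2, 3], [0, 0, 0]) AS l2,   -- 3.7416575
       sqrt(1*1 + 2*2 + 3*3)                 AS l2_by_hand;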

View File

@ -0,0 +1,27 @@
6
7.0710678118654755
2
1 5
2 2
3 5.196152
4 0
1 11
2 11
3 11
4 11
1 5
2 2
3 5.196152
4 0
1 11
2 11
3 11
4 11
1 5
2 2
3 5.196152422706632
4 0
1 11
2 11
3 11
4 11

View File

@ -0,0 +1,28 @@
SELECT arrayL1Norm([1, 2, 3]);
SELECT arrayL2Norm([3., 4., 5.]);
SELECT arrayLinfNorm([0, 0, 2]);
DROP TABLE IF EXISTS vec1;
DROP TABLE IF EXISTS vec1f;
DROP TABLE IF EXISTS vec1d;
CREATE TABLE vec1 (id UInt64, v Array(UInt8)) ENGINE = Memory;
CREATE TABLE vec1f (id UInt64, v Array(Float32)) ENGINE = Memory;
CREATE TABLE vec1d (id UInt64, v Array(Float64)) ENGINE = Memory;
INSERT INTO vec1 VALUES (1, [3, 4]), (2, [2]), (3, [3, 3, 3]), (4, NULL);
INSERT INTO vec1f VALUES (1, [3, 4]), (2, [2]), (3, [3, 3, 3]), (4, NULL);
INSERT INTO vec1d VALUES (1, [3, 4]), (2, [2]), (3, [3, 3, 3]), (4, NULL);
SELECT id, arrayL2Norm(v) FROM vec1;
SELECT id, arrayL1Norm(materialize([5., 6.])) FROM vec1;
SELECT id, arrayL2Norm(v) FROM vec1f;
SELECT id, arrayL1Norm(materialize([5., 6.])) FROM vec1f;
SELECT id, arrayL2Norm(v) FROM vec1d;
SELECT id, arrayL1Norm(materialize([5., 6.])) FROM vec1d;
SELECT arrayL1Norm((1, 2,)); -- { serverError 43 }
DROP TABLE vec1;
DROP TABLE vec1f;
DROP TABLE vec1d;
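The analogous check for the norm functions, again not part of the test: arrayL1Norm([1, 2, 3]) = 6, arrayL2Norm([3., 4., 5.]) = sqrt(9 + 16 + 25) = sqrt(50) ≈ 7.0710678, and arrayLinfNorm([0, 0, 2]) = 2, matching the first three lines of the reference file above.

SELECT arrayL1Norm([1, 2, 3])    AS l1,   -- 6
       arrayL2Norm([3., 4., 5.]) AS l2,   -- 7.0710678118654755
       arrayLinfNorm([0, 0, 2])  AS linf; -- 2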

View File

@ -28,5 +28,3 @@ SELECT count() FROM system.filesystem_cache;
SYSTEM DROP FILESYSTEM CACHE './s3_cache/';
SELECT count() FROM system.filesystem_cache;
2
EXPLAIN SYNTAX SYSTEM DROP FILESYSTEM CACHE './s3_cache/' FORCE;
SYSTEM DROP FILESYSTEM CACHE ./s3_cache/ FORCE

View File

@ -31,6 +31,4 @@ SELECT * FROM test2 FORMAT Null;
SELECT count() FROM system.filesystem_cache;
SYSTEM DROP FILESYSTEM CACHE './s3_cache/';
SELECT count() FROM system.filesystem_cache;
EXPLAIN SYNTAX SYSTEM DROP FILESYSTEM CACHE './s3_cache/' FORCE;
SELECT count() FROM system.filesystem_cache;

View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# NOTE: we can do a better test with strace, but I don't think that it is worth it.
$CLICKHOUSE_LOCAL -q "SELECT 1"
LD_LIBRARY_PATH=/tmp $CLICKHOUSE_LOCAL -q "SELECT 1"