Merge branch 'master' into fp16

Commit d33f368fd9 by Alexey Milovidov, 2024-11-13 18:21:20 +01:00, committed by GitHub
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
72 changed files with 778 additions and 345 deletions

View File

@@ -145,6 +145,7 @@
 #define TSA_TRY_ACQUIRE_SHARED(...) __attribute__((try_acquire_shared_capability(__VA_ARGS__))) /// function tries to acquire a shared capability and returns a boolean value indicating success or failure
 #define TSA_RELEASE_SHARED(...) __attribute__((release_shared_capability(__VA_ARGS__))) /// function releases the given shared capability
 #define TSA_SCOPED_LOCKABLE __attribute__((scoped_lockable)) /// object of a class has scoped lockable capability
+#define TSA_RETURN_CAPABILITY(...) __attribute__((lock_returned(__VA_ARGS__))) /// to return capabilities in functions

 /// Macros for suppressing TSA warnings for specific reads/writes (instead of suppressing it for the whole function)
 /// They use a lambda function to apply function attribute to a single statement. This enable us to suppress warnings locally instead of
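The new TSA_RETURN_CAPABILITY macro wraps Clang's lock_returned attribute: it tells the thread-safety analysis which mutex a getter returns, so annotations such as TSA_GUARDED_BY(getMutex()) resolve to the right capability (QueryMetricLogStatus::getMutex() further down in this commit is the motivating use). A minimal standalone sketch, assuming the TSA_* macros from this header; the class and member names are illustrative, not from the commit:

#include <mutex>

class Counter
{
public:
    /// lock_returned: the analysis treats calls to getMutex() as naming the `mutex` capability.
    std::mutex & getMutex() TSA_RETURN_CAPABILITY(mutex) { return mutex; }

    /// Callers must hold the capability returned by getMutex() to call this.
    void increment() TSA_REQUIRES(getMutex()) { ++value; }

private:
    std::mutex mutex;
    int value TSA_GUARDED_BY(getMutex()) = 0;
};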

View File

@@ -12,9 +12,12 @@
 #include <Compression/ParallelCompressedWriteBuffer.h>
 #include <Compression/CompressedReadBuffer.h>
 #include <Compression/CompressedReadBufferFromFile.h>
+#include <Compression/getCompressionCodecForFile.h>
+#include <IO/ReadHelpers.h>
 #include <IO/WriteHelpers.h>
 #include <IO/copyData.h>
 #include <Parsers/parseQuery.h>
+#include <Parsers/queryToString.h>
 #include <Parsers/ExpressionElementParsers.h>
 #include <Compression/CompressionFactory.h>
 #include <Common/TerminalSize.h>
@@ -43,29 +46,24 @@ namespace CurrentMetrics
 namespace
 {

-/// Outputs sizes of uncompressed and compressed blocks for compressed file.
+/// Outputs method, sizes of uncompressed and compressed blocks for compressed file.
 void checkAndWriteHeader(DB::ReadBuffer & in, DB::WriteBuffer & out)
 {
     while (!in.eof())
     {
-        in.ignore(16); /// checksum
-
-        char header[COMPRESSED_BLOCK_HEADER_SIZE];
-        in.readStrict(header, COMPRESSED_BLOCK_HEADER_SIZE);
-
-        UInt32 size_compressed = unalignedLoad<UInt32>(&header[1]);
+        UInt32 size_compressed;
+        UInt32 size_decompressed;
+
+        auto codec = DB::getCompressionCodecForFile(in, size_compressed, size_decompressed, true /* skip_to_next_block */);

         if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
             throw DB::Exception(DB::ErrorCodes::TOO_LARGE_SIZE_COMPRESSED, "Too large size_compressed. Most likely corrupted data.");

-        UInt32 size_decompressed = unalignedLoad<UInt32>(&header[5]);
-
+        DB::writeText(queryToString(codec->getFullCodecDesc()), out);
+        DB::writeChar('\t', out);
         DB::writeText(size_decompressed, out);
         DB::writeChar('\t', out);
         DB::writeText(size_compressed, out);
         DB::writeChar('\n', out);
-
-        in.ignore(size_compressed - COMPRESSED_BLOCK_HEADER_SIZE);
     }
 }

View File

@@ -1,23 +1,47 @@
 #pragma once

-#include <Common/OvercommitTracker.h>
 #include <base/defines.h>
+#include <Common/Exception.h>
+#include <Common/OvercommitTracker.h>

 namespace DB
 {

+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+};
+
 /** LockGuard provides RAII-style locking mechanism for a mutex.
-  ** It's intended to be used like std::unique_ptr but with TSA annotations
+  ** It's intended to be used like std::unique_lock but with TSA annotations
   */
 template <typename Mutex>
 class TSA_SCOPED_LOCKABLE LockGuard
 {
 public:
-    explicit LockGuard(Mutex & mutex_) TSA_ACQUIRE(mutex_) : mutex(mutex_) { mutex.lock(); }
-    ~LockGuard() TSA_RELEASE() { mutex.unlock(); }
+    explicit LockGuard(Mutex & mutex_) TSA_ACQUIRE(mutex_) : mutex(mutex_) { lock(); }
+    ~LockGuard() TSA_RELEASE() { if (locked) unlock(); }
+
+    void lock() TSA_ACQUIRE()
+    {
+        /// Don't allow recursive_mutex for now.
+        if (locked)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't lock twice the same mutex");
+        mutex.lock();
+        locked = true;
+    }
+
+    void unlock() TSA_RELEASE()
+    {
+        if (!locked)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't unlock the mutex without locking it first");
+        mutex.unlock();
+        locked = false;
+    }

 private:
     Mutex & mutex;
+    bool locked = false;
 };

 template <template<typename> typename TLockGuard, typename Mutex>
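LockGuard now tracks whether it holds the mutex and exposes lock()/unlock(), which QueryMetricLog uses later in this commit to drop a global lock early while a per-query lock stays held. A small usage sketch assuming the class above; the mutexes and function are hypothetical:

#include <mutex>
#include <Common/LockGuard.h>

std::mutex registry_mutex;   /// hypothetical shared mutex
std::mutex entry_mutex;      /// hypothetical per-entry mutex

void example()
{
    DB::LockGuard global_lock(registry_mutex);   /// locked on construction, as before
    DB::LockGuard entry_lock(entry_mutex);

    /// New: a guard can be released before it goes out of scope to shorten the critical section.
    global_lock.unlock();

    /// ... work protected by entry_mutex only ...

    /// Locking twice or unlocking an unlocked guard now throws LOGICAL_ERROR instead of being undefined.
}   /// entry_lock releases here; global_lock's destructor is a no-op because it is already unlocked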

View File

@@ -10,33 +10,50 @@
 namespace DB
 {

 using Checksum = CityHash_v1_0_2::uint128;

-CompressionCodecPtr getCompressionCodecForFile(const IDataPartStorage & data_part_storage, const String & relative_path)
+CompressionCodecPtr
+getCompressionCodecForFile(ReadBuffer & read_buffer, UInt32 & size_compressed, UInt32 & size_decompressed, bool skip_to_next_block)
 {
-    auto read_buffer = data_part_storage.readFile(relative_path, {}, std::nullopt, std::nullopt);
-    read_buffer->ignore(sizeof(Checksum));
+    read_buffer.ignore(sizeof(Checksum));

     UInt8 header_size = ICompressionCodec::getHeaderSize();
+    size_t starting_bytes = read_buffer.count();
     PODArray<char> compressed_buffer;
     compressed_buffer.resize(header_size);
-    read_buffer->readStrict(compressed_buffer.data(), header_size);
+    read_buffer.readStrict(compressed_buffer.data(), header_size);
     uint8_t method = ICompressionCodec::readMethod(compressed_buffer.data());
+    size_compressed = unalignedLoad<UInt32>(&compressed_buffer[1]);
+    size_decompressed = unalignedLoad<UInt32>(&compressed_buffer[5]);

     if (method == static_cast<uint8_t>(CompressionMethodByte::Multiple))
     {
         compressed_buffer.resize(1);
-        read_buffer->readStrict(compressed_buffer.data(), 1);
+        read_buffer.readStrict(compressed_buffer.data(), 1);
         compressed_buffer.resize(1 + compressed_buffer[0]);
-        read_buffer->readStrict(compressed_buffer.data() + 1, compressed_buffer[0]);
+        read_buffer.readStrict(compressed_buffer.data() + 1, compressed_buffer[0]);
         auto codecs_bytes = CompressionCodecMultiple::getCodecsBytesFromData(compressed_buffer.data());
         Codecs codecs;
         for (auto byte : codecs_bytes)
             codecs.push_back(CompressionCodecFactory::instance().get(byte));

+        if (skip_to_next_block)
+            read_buffer.ignore(size_compressed - (read_buffer.count() - starting_bytes));
+
         return std::make_shared<CompressionCodecMultiple>(codecs);
     }

+    if (skip_to_next_block)
+        read_buffer.ignore(size_compressed - (read_buffer.count() - starting_bytes));
+
     return CompressionCodecFactory::instance().get(method);
 }

+CompressionCodecPtr getCompressionCodecForFile(const IDataPartStorage & data_part_storage, const String & relative_path)
+{
+    auto read_buffer = data_part_storage.readFile(relative_path, {}, std::nullopt, std::nullopt);
+    UInt32 size_compressed;
+    UInt32 size_decompressed;
+    return getCompressionCodecForFile(*read_buffer, size_compressed, size_decompressed, false);
+}
+
 }

View File

@@ -13,4 +13,8 @@ namespace DB
 /// from metadata.
 CompressionCodecPtr getCompressionCodecForFile(const IDataPartStorage & data_part_storage, const String & relative_path);

+/// Same as above which is used by clickhouse-compressor to print compression statistics of each data block.
+CompressionCodecPtr
+getCompressionCodecForFile(ReadBuffer & read_buffer, UInt32 & size_compressed, UInt32 & size_decompressed, bool skip_to_next_block);
+
 }
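A sketch of how the new ReadBuffer overload can be driven, mirroring the reworked checkAndWriteHeader() in the compressor diff above. It assumes compilation inside the ClickHouse tree (UInt32, ReadBufferFromFile and queryToString come from there); the function name and file path are illustrative:

#include <iostream>
#include <IO/ReadBufferFromFile.h>
#include <Compression/getCompressionCodecForFile.h>
#include <Parsers/queryToString.h>

/// Walk a ClickHouse-compressed file block by block and print each block's codec and sizes.
void printBlockStats(const std::string & path)
{
    DB::ReadBufferFromFile in(path);
    while (!in.eof())
    {
        UInt32 size_compressed = 0;
        UInt32 size_decompressed = 0;
        /// skip_to_next_block = true leaves the buffer positioned at the next block header.
        auto codec = DB::getCompressionCodecForFile(in, size_compressed, size_decompressed, /* skip_to_next_block */ true);
        std::cout << DB::queryToString(codec->getFullCodecDesc()) << '\t' << size_decompressed << '\t' << size_compressed << '\n';
    }
}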

View File

@@ -4560,7 +4560,7 @@ Possible values:
 - 0 - Disable
 - 1 - Enable
 )", 0) \
-    DECLARE(Bool, query_plan_merge_filters, false, R"(
+    DECLARE(Bool, query_plan_merge_filters, true, R"(
 Allow to merge filters in the query plan
 )", 0) \
     DECLARE(Bool, query_plan_filter_push_down, true, R"(
@@ -4861,9 +4861,9 @@ Allows to record the filesystem caching log for each query
     DECLARE(Bool, read_from_filesystem_cache_if_exists_otherwise_bypass_cache, false, R"(
 Allow to use the filesystem cache in passive mode - benefit from the existing cache entries, but don't put more entries into the cache. If you set this setting for heavy ad-hoc queries and leave it disabled for short real-time queries, this will allows to avoid cache threshing by too heavy queries and to improve the overall system efficiency.
 )", 0) \
-    DECLARE(Bool, skip_download_if_exceeds_query_cache, true, R"(
+    DECLARE(Bool, filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit, true, R"(
 Skip download from remote filesystem if exceeds query cache size
-)", 0) \
+)", 0) ALIAS(skip_download_if_exceeds_query_cache) \
     DECLARE(UInt64, filesystem_cache_max_download_size, (128UL * 1024 * 1024 * 1024), R"(
 Max remote filesystem cache size that can be downloaded by a single query
 )", 0) \

View File

@@ -76,8 +76,10 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
             {"backup_restore_keeper_max_retries_while_initializing", 0, 20, "New setting."},
             {"backup_restore_keeper_max_retries_while_handling_error", 0, 20, "New setting."},
             {"backup_restore_finish_timeout_after_error_sec", 0, 180, "New setting."},
+            {"query_plan_merge_filters", false, true, "Allow to merge filters in the query plan. This is required to properly support filter-push-down with a new analyzer."},
             {"parallel_replicas_local_plan", false, true, "Use local plan for local replica in a query with parallel replicas"},
             {"allow_experimental_bfloat16_type", false, false, "Add new experimental BFloat16 type"},
+            {"filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit", 1, 1, "Rename of setting skip_download_if_exceeds_query_cache_limit"},
             {"filesystem_cache_prefer_bigger_buffer_size", true, true, "New setting"},
             {"read_in_order_use_virtual_row", false, false, "Use virtual row while reading in order of primary key or its monotonic function fashion. It is useful when searching over multiple parts as only relevant ones are touched."},
         }

View File

@@ -277,19 +277,6 @@ void AzureObjectStorage::removeObjectImpl(const StoredObject & object, const Sha
     }
 }

-/// Remove file. Throws exception if file doesn't exists or it's a directory.
-void AzureObjectStorage::removeObject(const StoredObject & object)
-{
-    removeObjectImpl(object, client.get(), false);
-}
-
-void AzureObjectStorage::removeObjects(const StoredObjects & objects)
-{
-    auto client_ptr = client.get();
-    for (const auto & object : objects)
-        removeObjectImpl(object, client_ptr, false);
-}
-
 void AzureObjectStorage::removeObjectIfExists(const StoredObject & object)
 {
     removeObjectImpl(object, client.get(), true);

View File

@@ -59,11 +59,6 @@ public:
         size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
         const WriteSettings & write_settings = {}) override;

-    /// Remove file. Throws exception if file doesn't exists or it's a directory.
-    void removeObject(const StoredObject & object) override;
-
-    void removeObjects(const StoredObjects & objects) override;
-
     void removeObjectIfExists(const StoredObject & object) override;

     void removeObjectsIfExist(const StoredObjects & objects) override;

View File

@@ -148,20 +148,6 @@ void CachedObjectStorage::removeCacheIfExists(const std::string & path_key_for_c
     cache->removeKeyIfExists(getCacheKey(path_key_for_cache), FileCache::getCommonUser().user_id);
 }

-void CachedObjectStorage::removeObject(const StoredObject & object)
-{
-    removeCacheIfExists(object.remote_path);
-    object_storage->removeObject(object);
-}
-
-void CachedObjectStorage::removeObjects(const StoredObjects & objects)
-{
-    for (const auto & object : objects)
-        removeCacheIfExists(object.remote_path);
-
-    object_storage->removeObjects(objects);
-}
-
 void CachedObjectStorage::removeObjectIfExists(const StoredObject & object)
 {
     removeCacheIfExists(object.remote_path);

View File

@@ -45,10 +45,6 @@ public:
         size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
         const WriteSettings & write_settings = {}) override;

-    void removeObject(const StoredObject & object) override;
-
-    void removeObjects(const StoredObjects & objects) override;
-
     void removeObjectIfExists(const StoredObject & object) override;

     void removeObjectsIfExist(const StoredObjects & objects) override;

View File

@@ -480,8 +480,7 @@ struct WriteFileObjectStorageOperation final : public IDiskObjectStorageOperatio
     void undo() override
     {
-        if (object_storage.exists(object))
-            object_storage.removeObject(object);
+        object_storage.removeObjectIfExists(object);
     }

     void finalize() override
@@ -543,8 +542,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
     void undo() override
     {
-        for (const auto & object : created_objects)
-            destination_object_storage.removeObject(object);
+        destination_object_storage.removeObjectsIfExist(created_objects);
     }

     void finalize() override

View File

@@ -77,11 +77,6 @@ public:
         size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
         const WriteSettings & write_settings = {}) override;

-    /// Remove file. Throws exception if file doesn't exists or it's a directory.
-    void removeObject(const StoredObject & object) override;
-
-    void removeObjects(const StoredObjects & objects) override;
-
     void removeObjectIfExists(const StoredObject & object) override;

     void removeObjectsIfExist(const StoredObjects & objects) override;
@@ -117,6 +112,11 @@ private:
     void initializeHDFSFS() const;
     std::string extractObjectKeyFromURL(const StoredObject & object) const;

+    /// Remove file. Throws exception if file doesn't exists or it's a directory.
+    void removeObject(const StoredObject & object);
+
+    void removeObjects(const StoredObjects & objects);
+
     const Poco::Util::AbstractConfiguration & config;

     mutable HDFSBuilderWrapper hdfs_builder;

View File

@@ -161,11 +161,11 @@ public:
     virtual bool isRemote() const = 0;

     /// Remove object. Throws exception if object doesn't exists.
-    virtual void removeObject(const StoredObject & object) = 0;
+    // virtual void removeObject(const StoredObject & object) = 0;

     /// Remove multiple objects. Some object storages can do batch remove in a more
     /// optimal way.
-    virtual void removeObjects(const StoredObjects & objects) = 0;
+    // virtual void removeObjects(const StoredObjects & objects) = 0;

     /// Remove object on path if exists
     virtual void removeObjectIfExists(const StoredObject & object) = 0;

View File

@@ -81,7 +81,7 @@ std::unique_ptr<WriteBufferFromFileBase> LocalObjectStorage::writeObject( /// NO
     return std::make_unique<WriteBufferFromFile>(object.remote_path, buf_size);
 }

-void LocalObjectStorage::removeObject(const StoredObject & object)
+void LocalObjectStorage::removeObject(const StoredObject & object) const
 {
     /// For local object storage files are actually removed when "metadata" is removed.
     if (!exists(object))
@@ -91,7 +91,7 @@ void LocalObjectStorage::removeObject(const StoredObject & object)
         ErrnoException::throwFromPath(ErrorCodes::CANNOT_UNLINK, object.remote_path, "Cannot unlink file {}", object.remote_path);
 }

-void LocalObjectStorage::removeObjects(const StoredObjects & objects)
+void LocalObjectStorage::removeObjects(const StoredObjects & objects) const
 {
     for (const auto & object : objects)
         removeObject(object);

View File

@@ -42,10 +42,6 @@ public:
         size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
         const WriteSettings & write_settings = {}) override;

-    void removeObject(const StoredObject & object) override;
-
-    void removeObjects(const StoredObjects & objects) override;
-
     void removeObjectIfExists(const StoredObject & object) override;

     void removeObjectsIfExist(const StoredObjects & objects) override;
@@ -82,6 +78,10 @@ public:
     ReadSettings patchSettings(const ReadSettings & read_settings) const override;

 private:
+    void removeObject(const StoredObject & object) const;
+
+    void removeObjects(const StoredObjects & objects) const;
+
     String key_prefix;
     LoggerPtr log;
     std::string description;

View File

@@ -203,7 +203,7 @@ void MetadataStorageFromPlainObjectStorageTransaction::unlinkFile(const std::str
 {
     auto object_key = metadata_storage.object_storage->generateObjectKeyForPath(path, std::nullopt /* key_prefix */);
     auto object = StoredObject(object_key.serialize());
-    metadata_storage.object_storage->removeObject(object);
+    metadata_storage.object_storage->removeObjectIfExists(object);
 }

 void MetadataStorageFromPlainObjectStorageTransaction::removeDirectory(const std::string & path)
@@ -211,7 +211,7 @@ void MetadataStorageFromPlainObjectStorageTransaction::removeDirectory(const std
     if (metadata_storage.object_storage->isWriteOnce())
     {
         for (auto it = metadata_storage.iterateDirectory(path); it->isValid(); it->next())
-            metadata_storage.object_storage->removeObject(StoredObject(it->path()));
+            metadata_storage.object_storage->removeObjectIfExists(StoredObject(it->path()));
     }
     else
     {

View File

@@ -107,7 +107,7 @@ void MetadataStorageFromPlainObjectStorageCreateDirectoryOperation::undo(std::un
         auto metric = object_storage->getMetadataStorageMetrics().directory_map_size;
         CurrentMetrics::sub(metric, 1);

-        object_storage->removeObject(StoredObject(metadata_object_key.serialize(), path / PREFIX_PATH_FILE_NAME));
+        object_storage->removeObjectIfExists(StoredObject(metadata_object_key.serialize(), path / PREFIX_PATH_FILE_NAME));
     }
     else if (write_created)
         object_storage->removeObjectIfExists(StoredObject(metadata_object_key.serialize(), path / PREFIX_PATH_FILE_NAME));
@@ -247,7 +247,7 @@ void MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation::execute(std:
     auto metadata_object_key = createMetadataObjectKey(key_prefix, metadata_key_prefix);

     auto metadata_object = StoredObject(/*remote_path*/ metadata_object_key.serialize(), /*local_path*/ path / PREFIX_PATH_FILE_NAME);
-    object_storage->removeObject(metadata_object);
+    object_storage->removeObjectIfExists(metadata_object);

     {
         std::lock_guard lock(path_map.mutex);
View File

@@ -326,21 +326,11 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
         ProfileEvents::DiskS3DeleteObjects);
 }

-void S3ObjectStorage::removeObject(const StoredObject & object)
-{
-    removeObjectImpl(object, false);
-}
-
 void S3ObjectStorage::removeObjectIfExists(const StoredObject & object)
 {
     removeObjectImpl(object, true);
 }

-void S3ObjectStorage::removeObjects(const StoredObjects & objects)
-{
-    removeObjectsImpl(objects, false);
-}
-
 void S3ObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
 {
     removeObjectsImpl(objects, true);

View File

@@ -101,13 +101,6 @@ public:

     ObjectStorageIteratorPtr iterate(const std::string & path_prefix, size_t max_keys) const override;

-    /// Uses `DeleteObjectRequest`.
-    void removeObject(const StoredObject & object) override;
-
-    /// Uses `DeleteObjectsRequest` if it is allowed by `s3_capabilities`, otherwise `DeleteObjectRequest`.
-    /// `DeleteObjectsRequest` is not supported on GCS, see https://issuetracker.google.com/issues/162653700 .
-    void removeObjects(const StoredObjects & objects) override;
-
     /// Uses `DeleteObjectRequest`.
     void removeObjectIfExists(const StoredObject & object) override;

View File

@@ -254,16 +254,6 @@ std::unique_ptr<WriteBufferFromFileBase> WebObjectStorage::writeObject( /// NOLI
     throwNotAllowed();
 }

-void WebObjectStorage::removeObject(const StoredObject &)
-{
-    throwNotAllowed();
-}
-
-void WebObjectStorage::removeObjects(const StoredObjects &)
-{
-    throwNotAllowed();
-}
-
 void WebObjectStorage::removeObjectIfExists(const StoredObject &)
 {
     throwNotAllowed();

View File

@@ -47,10 +47,6 @@ public:
         size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
         const WriteSettings & write_settings = {}) override;

-    void removeObject(const StoredObject & object) override;
-
-    void removeObjects(const StoredObjects & objects) override;
-
     void removeObjectIfExists(const StoredObject & object) override;

     void removeObjectsIfExist(const StoredObjects & objects) override;

View File

@@ -1035,6 +1035,9 @@ private:
         size_t tuple_size,
         size_t input_rows_count) const
     {
+        if (0 == tuple_size)
+            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Comparison of zero-sized tuples is not implemented");
+
         ColumnsWithTypeAndName less_columns(tuple_size);
         ColumnsWithTypeAndName equal_columns(tuple_size - 1);
         ColumnsWithTypeAndName tmp_columns(2);

View File

@@ -668,6 +668,9 @@ private:
         temporary_columns[0] = arguments[0];

         size_t tuple_size = type1.getElements().size();
+        if (tuple_size == 0)
+            return ColumnTuple::create(input_rows_count);
+
         Columns tuple_columns(tuple_size);

         for (size_t i = 0; i < tuple_size; ++i)

View File

@@ -69,7 +69,7 @@ struct ReadSettings
     std::shared_ptr<PageCache> page_cache;

     size_t filesystem_cache_max_download_size = (128UL * 1024 * 1024 * 1024);
-    bool skip_download_if_exceeds_query_cache = true;
+    bool filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit = true;

     size_t remote_read_min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE;

View File

@@ -37,7 +37,7 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
     /// Case when bucket name represented in domain name of S3 URL.
     /// E.g. (https://bucket-name.s3.region.amazonaws.com/key)
     /// https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html#virtual-hosted-style-access
-    static const RE2 virtual_hosted_style_pattern(R"((.+)\.(s3express[\-a-z0-9]+|s3|cos|obs|oss|eos)([.\-][a-z0-9\-.:]+))");
+    static const RE2 virtual_hosted_style_pattern(R"((.+)\.(s3express[\-a-z0-9]+|s3|cos|obs|oss-data-acc|oss|eos)([.\-][a-z0-9\-.:]+))");

     /// Case when AWS Private Link Interface is being used
     /// E.g. (bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w.s3.us-east-1.vpce.amazonaws.com/bucket-name/key)
@@ -115,7 +115,15 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
         && re2::RE2::FullMatch(uri.getAuthority(), virtual_hosted_style_pattern, &bucket, &name, &endpoint_authority_from_uri))
     {
         is_virtual_hosted_style = true;
-        endpoint = uri.getScheme() + "://" + name + endpoint_authority_from_uri;
+        if (name == "oss-data-acc")
+        {
+            bucket = bucket.substr(0, bucket.find('.'));
+            endpoint = uri.getScheme() + "://" + uri.getHost().substr(bucket.length() + 1);
+        }
+        else
+        {
+            endpoint = uri.getScheme() + "://" + name + endpoint_authority_from_uri;
+        }
         validateBucket(bucket, uri);

         if (!uri.getPath().empty())
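A worked, standalone trace of the new oss-data-acc branch: for the host used in the test added below, the regex captures the bucket group as "bucket-test.cn-beijing-internal", so the code re-splits it on the first dot and rebuilds the endpoint from the remainder of the host. The snippet is plain C++ illustrating only the string arithmetic, not ClickHouse code:

#include <iostream>
#include <string>

int main()
{
    std::string scheme = "https";
    std::string host = "bucket-test.cn-beijing-internal.oss-data-acc.aliyuncs.com";
    std::string bucket = "bucket-test.cn-beijing-internal";   /// first capture group of virtual_hosted_style_pattern

    bucket = bucket.substr(0, bucket.find('.'));                               /// "bucket-test"
    std::string endpoint = scheme + "://" + host.substr(bucket.length() + 1);  /// "https://cn-beijing-internal.oss-data-acc.aliyuncs.com"

    std::cout << bucket << '\n' << endpoint << '\n';
}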

View File

@@ -212,6 +212,22 @@ TEST(S3UriTest, validPatterns)
         ASSERT_EQ("", uri.version_id);
         ASSERT_EQ(true, uri.is_virtual_hosted_style);
     }
+    {
+        S3::URI uri("https://bucket-test1.oss-cn-beijing-internal.aliyuncs.com/ab-test");
+        ASSERT_EQ("https://oss-cn-beijing-internal.aliyuncs.com", uri.endpoint);
+        ASSERT_EQ("bucket-test1", uri.bucket);
+        ASSERT_EQ("ab-test", uri.key);
+        ASSERT_EQ("", uri.version_id);
+        ASSERT_EQ(true, uri.is_virtual_hosted_style);
+    }
+    {
+        S3::URI uri("https://bucket-test.cn-beijing-internal.oss-data-acc.aliyuncs.com/ab-test");
+        ASSERT_EQ("https://cn-beijing-internal.oss-data-acc.aliyuncs.com", uri.endpoint);
+        ASSERT_EQ("bucket-test", uri.bucket);
+        ASSERT_EQ("ab-test", uri.key);
+        ASSERT_EQ("", uri.version_id);
+        ASSERT_EQ(true, uri.is_virtual_hosted_style);
+    }
 }

 TEST(S3UriTest, versionIdChecks)
TEST(S3UriTest, versionIdChecks) TEST(S3UriTest, versionIdChecks)

View File

@@ -53,7 +53,7 @@ FileCacheQueryLimit::QueryContextPtr FileCacheQueryLimit::getOrSetQueryContext(
     {
         it->second = std::make_shared<QueryContext>(
             settings.filesystem_cache_max_download_size,
-            !settings.skip_download_if_exceeds_query_cache);
+            !settings.filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit);
     }

     return it->second;

View File

@@ -237,7 +237,7 @@ namespace Setting
     extern const SettingsUInt64 remote_fs_read_backoff_max_tries;
     extern const SettingsUInt64 remote_read_min_bytes_for_seek;
     extern const SettingsBool throw_on_error_from_cache_on_write_operations;
-    extern const SettingsBool skip_download_if_exceeds_query_cache;
+    extern const SettingsBool filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit;
     extern const SettingsBool s3_allow_parallel_part_upload;
     extern const SettingsBool use_page_cache_for_disks_without_file_cache;
     extern const SettingsUInt64 use_structure_from_insertion_table_in_table_functions;
@@ -5755,7 +5755,7 @@ ReadSettings Context::getReadSettings() const
     res.filesystem_cache_prefer_bigger_buffer_size = settings_ref[Setting::filesystem_cache_prefer_bigger_buffer_size];
     res.filesystem_cache_max_download_size = settings_ref[Setting::filesystem_cache_max_download_size];
-    res.skip_download_if_exceeds_query_cache = settings_ref[Setting::skip_download_if_exceeds_query_cache];
+    res.filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit = settings_ref[Setting::filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit];

     res.page_cache = getPageCache();
     res.use_page_cache_for_disks_without_file_cache = settings_ref[Setting::use_page_cache_for_disks_without_file_cache];

View File

@@ -1,6 +1,7 @@
 #include <base/getFQDNOrHostName.h>
 #include <Common/DateLUT.h>
 #include <Common/DateLUTImpl.h>
+#include <Common/LockGuard.h>
 #include <DataTypes/DataTypeDate.h>
 #include <DataTypes/DataTypeDateTime.h>
 #include <DataTypes/DataTypeDateTime64.h>
@@ -16,7 +17,6 @@
 #include <chrono>
 #include <fmt/chrono.h>
-#include <mutex>

 namespace DB
@@ -24,6 +24,15 @@ namespace DB

 static auto logger = getLogger("QueryMetricLog");

+String timePointToString(QueryMetricLog::TimePoint time)
+{
+    /// fmtlib supports subsecond formatting in 10.0.0. We're in 9.1.0, so we need to add the milliseconds ourselves.
+    auto seconds = std::chrono::time_point_cast<std::chrono::seconds>(time);
+    auto microseconds = std::chrono::duration_cast<std::chrono::microseconds>(time - seconds).count();
+    return fmt::format("{:%Y.%m.%d %H:%M:%S}.{:06}", seconds, microseconds);
+}
+
 ColumnsDescription QueryMetricLogElement::getColumnsDescription()
 {
     ColumnsDescription result;
@@ -87,36 +96,73 @@
     Base::shutdown();
 }

-void QueryMetricLog::startQuery(const String & query_id, TimePoint start_time, UInt64 interval_milliseconds)
+void QueryMetricLog::collectMetric(const ProcessList & process_list, String query_id)
 {
-    QueryMetricLogStatus status;
-    status.interval_milliseconds = interval_milliseconds;
-    status.next_collect_time = start_time + std::chrono::milliseconds(interval_milliseconds);
+    auto current_time = std::chrono::system_clock::now();
+    const auto query_info = process_list.getQueryInfo(query_id, false, true, false);
+    if (!query_info)
+    {
+        /// TODO: remove trace before 24.11 release after checking everything is fine on the CI
+        LOG_TRACE(logger, "Query {} is not running anymore, so we couldn't get its QueryStatusInfo", query_id);
+        return;
+    }
+
+    LockGuard global_lock(queries_mutex);
+    auto it = queries.find(query_id);
+
+    /// The query might have finished while the scheduled task is running.
+    if (it == queries.end())
+    {
+        global_lock.unlock();
+        /// TODO: remove trace before 24.11 release after checking everything is fine on the CI
+        LOG_TRACE(logger, "Query {} not found in the list. Finished while this collecting task was running", query_id);
+        return;
+    }
+
+    auto & query_status = it->second;
+    if (!query_status.mutex)
+    {
+        global_lock.unlock();
+        /// TODO: remove trace before 24.11 release after checking everything is fine on the CI
+        LOG_TRACE(logger, "Query {} finished while this collecting task was running", query_id);
+        return;
+    }
+
+    LockGuard query_lock(query_status.getMutex());
+    global_lock.unlock();
+
+    auto elem = query_status.createLogMetricElement(query_id, *query_info, current_time);
+    if (elem)
+        add(std::move(elem.value()));
+}
+
+/// We use TSA_NO_THREAD_SAFETY_ANALYSIS to prevent TSA complaining that we're modifying the query_status fields
+/// without locking the mutex. Since we're building it from scratch, there's no harm in not holding it.
+/// If we locked it to make TSA happy, TSAN build would falsely complain about
+/// lock-order-inversion (potential deadlock)
+/// which is not a real issue since QueryMetricLogStatus's mutex cannot be locked by anything else
+/// until we add it to the queries map.
+void QueryMetricLog::startQuery(const String & query_id, TimePoint start_time, UInt64 interval_milliseconds) TSA_NO_THREAD_SAFETY_ANALYSIS
+{
+    QueryMetricLogStatus query_status;
+    QueryMetricLogStatusInfo & info = query_status.info;
+    info.interval_milliseconds = interval_milliseconds;
+    info.next_collect_time = start_time;

     auto context = getContext();
     const auto & process_list = context->getProcessList();
-    status.task = context->getSchedulePool().createTask("QueryMetricLog", [this, &process_list, query_id] {
-        auto current_time = std::chrono::system_clock::now();
-        const auto query_info = process_list.getQueryInfo(query_id, false, true, false);
-        if (!query_info)
-        {
-            LOG_TRACE(logger, "Query {} is not running anymore, so we couldn't get its QueryStatusInfo", query_id);
-            return;
-        }
-
-        auto elem = createLogMetricElement(query_id, *query_info, current_time);
-        if (elem)
-            add(std::move(elem.value()));
+    info.task = context->getSchedulePool().createTask("QueryMetricLog", [this, &process_list, query_id] {
+        collectMetric(process_list, query_id);
     });

-    std::lock_guard lock(queries_mutex);
-    status.task->scheduleAfter(interval_milliseconds);
-    queries.emplace(query_id, std::move(status));
+    LockGuard global_lock(queries_mutex);
+    query_status.scheduleNext(query_id);
+    queries.emplace(query_id, std::move(query_status));
 }

 void QueryMetricLog::finishQuery(const String & query_id, TimePoint finish_time, QueryStatusInfoPtr query_info)
 {
-    std::unique_lock lock(queries_mutex);
+    LockGuard global_lock(queries_mutex);
     auto it = queries.find(query_id);

     /// finishQuery may be called from logExceptionBeforeStart when the query has not even started
@@ -124,9 +170,19 @@ void QueryMetricLog::finishQuery(const String & query_id, TimePoint finish_time,
     if (it == queries.end())
         return;

+    auto & query_status = it->second;
+    decltype(query_status.mutex) query_mutex;
+    LockGuard query_lock(query_status.getMutex());
+    /// Move the query mutex here so that we hold it until the end, after removing the query from queries.
+    query_mutex = std::move(query_status.mutex);
+    query_status.mutex = {};
+    global_lock.unlock();
+
     if (query_info)
     {
-        auto elem = createLogMetricElement(query_id, *query_info, finish_time, false);
+        auto elem = query_status.createLogMetricElement(query_id, *query_info, finish_time, false);
         if (elem)
             add(std::move(elem.value()));
     }
@@ -139,51 +195,58 @@ void QueryMetricLog::finishQuery(const String & query_id, TimePoint finish_time,
     /// that order.
     {
         /// Take ownership of the task so that we can destroy it in this scope after unlocking `queries_mutex`.
-        auto task = std::move(it->second.task);
+        auto task = std::move(query_status.info.task);

         /// Build an empty task for the old task to make sure it does not lock any mutex on its destruction.
-        it->second.task = {};
+        query_status.info.task = {};

+        query_lock.unlock();
+        global_lock.lock();
         queries.erase(query_id);

         /// Ensure `queries_mutex` is unlocked before calling task's destructor at the end of this
         /// scope which will lock `exec_mutex`.
-        lock.unlock();
+        global_lock.unlock();
     }
 }

-std::optional<QueryMetricLogElement> QueryMetricLog::createLogMetricElement(const String & query_id, const QueryStatusInfo & query_info, TimePoint query_info_time, bool schedule_next)
+void QueryMetricLogStatus::scheduleNext(String query_id)
 {
-    /// fmtlib supports subsecond formatting in 10.0.0. We're in 9.1.0, so we need to add the milliseconds ourselves.
-    auto seconds = std::chrono::time_point_cast<std::chrono::seconds>(query_info_time);
-    auto microseconds = std::chrono::duration_cast<std::chrono::microseconds>(query_info_time - seconds).count();
-    LOG_DEBUG(logger, "Collecting query_metric_log for query {} with QueryStatusInfo from {:%Y.%m.%d %H:%M:%S}.{:06}. Schedule next: {}", query_id, seconds, microseconds, schedule_next);
-
-    std::unique_lock lock(queries_mutex);
-    auto query_status_it = queries.find(query_id);
-
-    /// The query might have finished while the scheduled task is running.
-    if (query_status_it == queries.end())
+    info.next_collect_time += std::chrono::milliseconds(info.interval_milliseconds);
+    const auto now = std::chrono::system_clock::now();
+    if (info.next_collect_time > now)
     {
-        lock.unlock();
-        LOG_TRACE(logger, "Query {} finished already while this collecting task was running", query_id);
-        return {};
+        const auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(info.next_collect_time - now).count();
+        info.task->scheduleAfter(wait_time);
     }
-
-    auto & query_status = query_status_it->second;
-    if (query_info_time <= query_status.last_collect_time)
+    else
     {
-        lock.unlock();
+        LOG_TRACE(logger, "The next collecting task for query {} should have already run at {}. Scheduling it right now",
+            query_id, timePointToString(info.next_collect_time));
+        info.task->schedule();
+    }
+}
+
+std::optional<QueryMetricLogElement> QueryMetricLogStatus::createLogMetricElement(const String & query_id, const QueryStatusInfo & query_info, TimePoint query_info_time, bool schedule_next)
+{
+    /// TODO: remove trace before 24.11 release after checking everything is fine on the CI
+    LOG_TRACE(logger, "Collecting query_metric_log for query {} and interval {} ms with QueryStatusInfo from {}. Next collection time: {}",
+        query_id, info.interval_milliseconds, timePointToString(query_info_time),
+        schedule_next ? timePointToString(info.next_collect_time + std::chrono::milliseconds(info.interval_milliseconds)) : "finished");
+
+    if (query_info_time <= info.last_collect_time)
+    {
+        /// TODO: remove trace before 24.11 release after checking everything is fine on the CI
         LOG_TRACE(logger, "Query {} has a more recent metrics collected. Skipping this one", query_id);
         return {};
     }

-    query_status.last_collect_time = query_info_time;
+    info.last_collect_time = query_info_time;

     QueryMetricLogElement elem;
     elem.event_time = timeInSeconds(query_info_time);
     elem.event_time_microseconds = timeInMicroseconds(query_info_time);
-    elem.query_id = query_status_it->first;
+    elem.query_id = query_id;
     elem.memory_usage = query_info.memory_usage > 0 ? query_info.memory_usage : 0;
     elem.peak_memory_usage = query_info.peak_memory_usage > 0 ? query_info.peak_memory_usage : 0;
@@ -192,7 +255,7 @@ std::optional<QueryMetricLogElement> QueryMetricLog::createLogMetricElement(cons
     for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i)
     {
         const auto & new_value = (*(query_info.profile_counters))[i];
-        auto & old_value = query_status.last_profile_events[i];
+        auto & old_value = info.last_profile_events[i];

         /// Profile event counters are supposed to be monotonic. However, at least the `NetworkReceiveBytes` can be inaccurate.
         /// So, since in the future the counter should always have a bigger value than in the past, we skip this event.
@@ -208,16 +271,13 @@ std::optional<QueryMetricLogElement> QueryMetricLog::createLogMetricElement(cons
     }
     else
     {
-        LOG_TRACE(logger, "Query {} has no profile counters", query_id);
+        /// TODO: remove trace before 24.11 release after checking everything is fine on the CI
+        LOG_DEBUG(logger, "Query {} has no profile counters", query_id);
         elem.profile_events = std::vector<ProfileEvents::Count>(ProfileEvents::end());
     }

     if (schedule_next)
-    {
-        query_status.next_collect_time += std::chrono::milliseconds(query_status.interval_milliseconds);
-        const auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(query_status.next_collect_time - std::chrono::system_clock::now()).count();
-        query_status.task->scheduleAfter(wait_time);
-    }
+        scheduleNext(query_id);

     return elem;
 }
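The rework above introduces a two-level locking scheme: the global queries_mutex is always acquired first, the per-query mutex second, and the global lock is released as soon as the per-query lock is held, so a slow collection for one query cannot block startQuery()/finishQuery() for others. A simplified sketch of that pattern using plain std types, with the TSA annotations omitted; this is not ClickHouse code:

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

struct Entry
{
    /// unique_ptr keeps Entry movable inside the hash map, like QueryMetricLogStatus::mutex above.
    std::unique_ptr<std::mutex> mutex = std::make_unique<std::mutex>();
    int data = 0;
};

std::mutex registry_mutex;
std::unordered_map<std::string, Entry> registry;

void collectFor(const std::string & id)
{
    std::unique_lock global_lock(registry_mutex);     /// 1. global lock first
    auto it = registry.find(id);
    if (it == registry.end() || !it->second.mutex)
        return;                                       /// entry gone or being finished

    Entry & entry = it->second;                       /// reference stays valid across rehashes
    std::unique_lock entry_lock(*entry.mutex);        /// 2. per-entry lock second
    global_lock.unlock();                             /// 3. drop the global lock early

    ++entry.data;                                     /// work under the per-entry lock only
}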

View File

@@ -1,5 +1,6 @@
 #pragma once

+#include <base/defines.h>
 #include <Common/ProfileEvents.h>
 #include <Common/CurrentMetrics.h>
 #include <Core/BackgroundSchedulePool.h>
@@ -11,11 +12,17 @@
 #include <chrono>
 #include <ctime>
-#include <mutex>

 namespace DB
 {

+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+};
+
 /** QueryMetricLogElement is a log of query metric values measured at regular time interval.
   */
@@ -34,7 +41,7 @@
     void appendToBlock(MutableColumns & columns) const;
 };

-struct QueryMetricLogStatus
+struct QueryMetricLogStatusInfo
 {
     UInt64 interval_milliseconds;
     std::chrono::system_clock::time_point last_collect_time;
@@ -43,24 +50,47 @@
     BackgroundSchedulePool::TaskHolder task;
 };

+struct QueryMetricLogStatus
+{
+    using TimePoint = std::chrono::system_clock::time_point;
+    using Mutex = std::mutex;
+
+    QueryMetricLogStatusInfo info TSA_GUARDED_BY(getMutex());
+
+    /// We need to be able to move it for the hash map, so we need to add an indirection here.
+    std::unique_ptr<Mutex> mutex = std::make_unique<Mutex>();
+
+    /// Return a reference to the mutex, used for Thread Sanitizer annotations.
+    Mutex & getMutex() const TSA_RETURN_CAPABILITY(mutex)
+    {
+        if (!mutex)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutex cannot be NULL");
+        return *mutex;
+    }
+
+    void scheduleNext(String query_id) TSA_REQUIRES(getMutex());
+    std::optional<QueryMetricLogElement> createLogMetricElement(const String & query_id, const QueryStatusInfo & query_info, TimePoint query_info_time, bool schedule_next = true) TSA_REQUIRES(getMutex());
+};
+
 class QueryMetricLog : public SystemLog<QueryMetricLogElement>
 {
     using SystemLog<QueryMetricLogElement>::SystemLog;
-    using TimePoint = std::chrono::system_clock::time_point;
     using Base = SystemLog<QueryMetricLogElement>;

 public:
+    using TimePoint = std::chrono::system_clock::time_point;
+
     void shutdown() final;

-    // Both startQuery and finishQuery are called from the thread that executes the query
+    /// Both startQuery and finishQuery are called from the thread that executes the query.
     void startQuery(const String & query_id, TimePoint start_time, UInt64 interval_milliseconds);
     void finishQuery(const String & query_id, TimePoint finish_time, QueryStatusInfoPtr query_info = nullptr);

 private:
-    std::optional<QueryMetricLogElement> createLogMetricElement(const String & query_id, const QueryStatusInfo & query_info, TimePoint query_info_time, bool schedule_next = true);
+    void collectMetric(const ProcessList & process_list, String query_id);

-    std::recursive_mutex queries_mutex;
-    std::unordered_map<String, QueryMetricLogStatus> queries;
+    std::mutex queries_mutex;
+    std::unordered_map<String, QueryMetricLogStatus> queries TSA_GUARDED_BY(queries_mutex);
 };

 }

View File

@@ -505,6 +505,7 @@ void logQueryFinish(
         auto time_now = std::chrono::system_clock::now();
         QueryStatusInfo info = process_list_elem->getInfo(true, settings[Setting::log_profile_events]);
+        logQueryMetricLogFinish(context, internal, elem.client_info.current_query_id, time_now, std::make_shared<QueryStatusInfo>(info));
         elem.type = QueryLogElementType::QUERY_FINISH;

         addStatusInfoToQueryLogElement(elem, info, query_ast, context);
@@ -623,6 +624,7 @@
     {
         elem.query_duration_ms = start_watch.elapsedMilliseconds();
     }
+    logQueryMetricLogFinish(context, internal, elem.client_info.current_query_id, time_now, info);

     elem.query_cache_usage = QueryCache::Usage::None;
@@ -652,8 +654,6 @@
         query_span->addAttribute("clickhouse.exception_code", elem.exception_code);
         query_span->finish();
     }
-
-    logQueryMetricLogFinish(context, internal, elem.client_info.current_query_id, time_now, info);
 }

 void logExceptionBeforeStart(
@@ -707,6 +707,8 @@
     elem.client_info = context->getClientInfo();

+    logQueryMetricLogFinish(context, false, elem.client_info.current_query_id, std::chrono::system_clock::now(), nullptr);
+
     elem.log_comment = settings[Setting::log_comment];
     if (elem.log_comment.size() > settings[Setting::max_query_size])
         elem.log_comment.resize(settings[Setting::max_query_size]);
@@ -751,8 +753,6 @@
             ProfileEvents::increment(ProfileEvents::FailedInsertQuery);
         }
     }
-
-    logQueryMetricLogFinish(context, false, elem.client_info.current_query_id, std::chrono::system_clock::now(), nullptr);
 }

 void validateAnalyzerSettings(ASTPtr ast, bool context_value)
void validateAnalyzerSettings(ASTPtr ast, bool context_value) void validateAnalyzerSettings(ASTPtr ast, bool context_value)

View File

@@ -6,12 +6,23 @@
 namespace DB
 {

+namespace Setting
+{
+    extern const SettingsBool query_plan_merge_filters;
+}
+
 BuildQueryPipelineSettings BuildQueryPipelineSettings::fromContext(ContextPtr from)
 {
+    const auto & query_settings = from->getSettingsRef();
     BuildQueryPipelineSettings settings;
-    settings.actions_settings = ExpressionActionsSettings::fromSettings(from->getSettingsRef(), CompileExpressions::yes);
+    settings.actions_settings = ExpressionActionsSettings::fromSettings(query_settings, CompileExpressions::yes);
     settings.process_list_element = from->getProcessListElement();
     settings.progress_callback = from->getProgressCallback();
+
+    /// Setting query_plan_merge_filters is enabled by default.
+    /// But it can brake short-circuit without splitting filter step into smaller steps.
+    /// So, enable and disable this optimizations together.
+    settings.enable_multiple_filters_transforms_for_and_chain = query_settings[Setting::query_plan_merge_filters];
+
     return settings;
 }

View File

@@ -17,6 +17,8 @@ using TemporaryFileLookupPtr = std::shared_ptr<ITemporaryFileLookup>;

 struct BuildQueryPipelineSettings
 {
+    bool enable_multiple_filters_transforms_for_and_chain = true;
+
     ExpressionActionsSettings actions_settings;
     QueryStatusPtr process_list_element;
     ProgressCallback progress_callback = nullptr;

View File

@ -5,6 +5,11 @@
#include <Interpreters/ExpressionActions.h> #include <Interpreters/ExpressionActions.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <Common/JSONBuilder.h> #include <Common/JSONBuilder.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/IFunction.h>
#include <stack>
#include <ranges>
namespace DB namespace DB
{ {
@ -24,6 +29,92 @@ static ITransformingStep::Traits getTraits()
}; };
} }
static bool isTrivialSubtree(const ActionsDAG::Node * node)
{
while (node->type == ActionsDAG::ActionType::ALIAS)
node = node->children.at(0);
return node->type != ActionsDAG::ActionType::FUNCTION && node->type != ActionsDAG::ActionType::ARRAY_JOIN;
}
struct ActionsAndName
{
ActionsDAG dag;
std::string name;
};
static ActionsAndName splitSingleAndFilter(ActionsDAG & dag, const ActionsDAG::Node * filter_node)
{
auto split_result = dag.split({filter_node}, true);
dag = std::move(split_result.second);
const auto * split_filter_node = split_result.split_nodes_mapping[filter_node];
auto filter_type = removeLowCardinality(split_filter_node->result_type);
if (!filter_type->onlyNull() && !isUInt8(removeNullable(filter_type)))
{
DataTypePtr cast_type = std::make_shared<DataTypeUInt8>();
if (filter_type->isNullable())
cast_type = std::make_shared<DataTypeNullable>(std::move(cast_type));
split_filter_node = &split_result.first.addCast(*split_filter_node, cast_type, {});
}
split_result.first.getOutputs().emplace(split_result.first.getOutputs().begin(), split_filter_node);
auto name = split_filter_node->result_name;
return ActionsAndName{std::move(split_result.first), std::move(name)};
}
/// Try to split the left most AND atom to a separate DAG.
static std::optional<ActionsAndName> trySplitSingleAndFilter(ActionsDAG & dag, const std::string & filter_name)
{
const auto * filter = &dag.findInOutputs(filter_name);
while (filter->type == ActionsDAG::ActionType::ALIAS)
filter = filter->children.at(0);
if (filter->type != ActionsDAG::ActionType::FUNCTION || filter->function_base->getName() != "and")
return {};
const ActionsDAG::Node * condition_to_split = nullptr;
std::stack<const ActionsDAG::Node *> nodes;
nodes.push(filter);
while (!nodes.empty())
{
const auto * node = nodes.top();
nodes.pop();
if (node->type == ActionsDAG::ActionType::FUNCTION && node->function_base->getName() == "and")
{
/// The order is important. We should take the left-most atom, so put conditions on stack in reverse order.
for (const auto * child : node->children | std::ranges::views::reverse)
nodes.push(child);
continue;
}
if (isTrivialSubtree(node))
continue;
/// Do not split the subtree if it's the last non-trivial one.
/// So, split the first found condition only when another one is found after it.
if (condition_to_split)
return splitSingleAndFilter(dag, condition_to_split);
condition_to_split = node;
}
return {};
}
std::vector<ActionsAndName> splitAndChainIntoMultipleFilters(ActionsDAG & dag, const std::string & filter_name)
{
std::vector<ActionsAndName> res;
while (auto condition = trySplitSingleAndFilter(dag, filter_name))
res.push_back(std::move(*condition));
return res;
}
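As a rough illustration of the splitting order (expected behaviour under the new defaults, not a guaranteed plan): the left-most non-trivial atom is split off first, then the next one, and the last non-trivial condition stays as the remaining filter; bare column references and constants are trivial subtrees and are never split out on their own.
-- Expected split order for this WHERE clause: first (number % 2) = 0, then number < 7,
-- with number != 4 kept as the remaining filter condition.
SELECT count() FROM numbers(10) WHERE (number % 2) = 0 AND number < 7 AND number != 4;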
FilterStep::FilterStep( FilterStep::FilterStep(
const Header & input_header_, const Header & input_header_,
ActionsDAG actions_dag_, ActionsDAG actions_dag_,
@ -50,6 +141,23 @@ FilterStep::FilterStep(
void FilterStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) void FilterStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
{ {
std::vector<ActionsAndName> and_atoms;
/// Split the AND filter condition into separate steps under the setting, which is enabled together with the merge_filters optimization.
/// This is needed to support short-circuit evaluation properly.
if (settings.enable_multiple_filters_transforms_for_and_chain && !actions_dag.hasStatefulFunctions())
and_atoms = splitAndChainIntoMultipleFilters(actions_dag, filter_column_name);
for (auto & and_atom : and_atoms)
{
auto expression = std::make_shared<ExpressionActions>(std::move(and_atom.dag), settings.getActionsSettings());
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type)
{
bool on_totals = stream_type == QueryPipelineBuilder::StreamType::Totals;
return std::make_shared<FilterTransform>(header, expression, and_atom.name, true, on_totals);
});
}
auto expression = std::make_shared<ExpressionActions>(std::move(actions_dag), settings.getActionsSettings()); auto expression = std::make_shared<ExpressionActions>(std::move(actions_dag), settings.getActionsSettings());
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type)
@ -76,18 +184,45 @@ void FilterStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQ
void FilterStep::describeActions(FormatSettings & settings) const void FilterStep::describeActions(FormatSettings & settings) const
{ {
String prefix(settings.offset, settings.indent_char); String prefix(settings.offset, settings.indent_char);
auto cloned_dag = actions_dag.clone();
std::vector<ActionsAndName> and_atoms;
if (!actions_dag.hasStatefulFunctions())
and_atoms = splitAndChainIntoMultipleFilters(cloned_dag, filter_column_name);
for (auto & and_atom : and_atoms)
{
auto expression = std::make_shared<ExpressionActions>(std::move(and_atom.dag));
settings.out << prefix << "AND column: " << and_atom.name << '\n';
expression->describeActions(settings.out, prefix);
}
settings.out << prefix << "Filter column: " << filter_column_name; settings.out << prefix << "Filter column: " << filter_column_name;
if (remove_filter_column) if (remove_filter_column)
settings.out << " (removed)"; settings.out << " (removed)";
settings.out << '\n'; settings.out << '\n';
auto expression = std::make_shared<ExpressionActions>(actions_dag.clone()); auto expression = std::make_shared<ExpressionActions>(std::move(cloned_dag));
expression->describeActions(settings.out, prefix); expression->describeActions(settings.out, prefix);
} }
void FilterStep::describeActions(JSONBuilder::JSONMap & map) const void FilterStep::describeActions(JSONBuilder::JSONMap & map) const
{ {
auto cloned_dag = actions_dag.clone();
std::vector<ActionsAndName> and_atoms;
if (!actions_dag.hasStatefulFunctions())
and_atoms = splitAndChainIntoMultipleFilters(cloned_dag, filter_column_name);
for (auto & and_atom : and_atoms)
{
auto expression = std::make_shared<ExpressionActions>(std::move(and_atom.dag));
map.add("AND column", and_atom.name);
map.add("Expression", expression->toTree());
}
map.add("Filter Column", filter_column_name); map.add("Filter Column", filter_column_name);
map.add("Removes Filter", remove_filter_column); map.add("Removes Filter", remove_filter_column);

View File

@ -32,7 +32,7 @@ struct QueryPlanOptimizationSettings
bool merge_expressions = true; bool merge_expressions = true;
/// If merge-filters optimization is enabled. /// If merge-filters optimization is enabled.
bool merge_filters = false; bool merge_filters = true;
/// If filter push down optimization is enabled. /// If filter push down optimization is enabled.
bool filter_push_down = true; bool filter_push_down = true;
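Since merge_filters now defaults to true at the plan level, the previous behaviour can still be requested per query (setting name taken from this diff; the exact plan shape depends on the query):
-- Opt out of filter merging for a single query:
SELECT count() FROM numbers(10) WHERE number != 3 AND number != 5
SETTINGS query_plan_merge_filters = 0;
-- Or session-wide:
SET query_plan_merge_filters = 0;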

View File

@ -176,6 +176,7 @@ namespace Setting
extern const SettingsBool use_skip_indexes; extern const SettingsBool use_skip_indexes;
extern const SettingsBool use_skip_indexes_if_final; extern const SettingsBool use_skip_indexes_if_final;
extern const SettingsBool use_uncompressed_cache; extern const SettingsBool use_uncompressed_cache;
extern const SettingsBool query_plan_merge_filters;
extern const SettingsUInt64 merge_tree_min_read_task_size; extern const SettingsUInt64 merge_tree_min_read_task_size;
extern const SettingsBool read_in_order_use_virtual_row; extern const SettingsBool read_in_order_use_virtual_row;
} }
@ -208,6 +209,7 @@ static MergeTreeReaderSettings getMergeTreeReaderSettings(
.use_asynchronous_read_from_pool = settings[Setting::allow_asynchronous_read_from_io_pool_for_merge_tree] .use_asynchronous_read_from_pool = settings[Setting::allow_asynchronous_read_from_io_pool_for_merge_tree]
&& (settings[Setting::max_streams_to_max_threads_ratio] > 1 || settings[Setting::max_streams_for_merge_tree_reading] > 1), && (settings[Setting::max_streams_to_max_threads_ratio] > 1 || settings[Setting::max_streams_for_merge_tree_reading] > 1),
.enable_multiple_prewhere_read_steps = settings[Setting::enable_multiple_prewhere_read_steps], .enable_multiple_prewhere_read_steps = settings[Setting::enable_multiple_prewhere_read_steps],
.force_short_circuit_execution = settings[Setting::query_plan_merge_filters]
}; };
} }

View File

@ -2263,18 +2263,18 @@ void IMergeTreeDataPart::checkConsistencyWithProjections(bool require_part_metad
proj_part->checkConsistency(require_part_metadata); proj_part->checkConsistency(require_part_metadata);
} }
void IMergeTreeDataPart::calculateColumnsAndSecondaryIndicesSizesOnDisk() void IMergeTreeDataPart::calculateColumnsAndSecondaryIndicesSizesOnDisk(std::optional<Block> columns_sample)
{ {
calculateColumnsSizesOnDisk(); calculateColumnsSizesOnDisk(columns_sample);
calculateSecondaryIndicesSizesOnDisk(); calculateSecondaryIndicesSizesOnDisk();
} }
void IMergeTreeDataPart::calculateColumnsSizesOnDisk() void IMergeTreeDataPart::calculateColumnsSizesOnDisk(std::optional<Block> columns_sample)
{ {
if (getColumns().empty() || checksums.empty()) if (getColumns().empty() || checksums.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot calculate columns sizes when columns or checksums are not initialized"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot calculate columns sizes when columns or checksums are not initialized");
calculateEachColumnSizes(columns_sizes, total_columns_size); calculateEachColumnSizes(columns_sizes, total_columns_size, columns_sample);
} }
void IMergeTreeDataPart::calculateSecondaryIndicesSizesOnDisk() void IMergeTreeDataPart::calculateSecondaryIndicesSizesOnDisk()
@ -2521,22 +2521,24 @@ ColumnPtr IMergeTreeDataPart::getColumnSample(const NameAndTypePair & column) co
StorageMetadataPtr metadata_ptr = storage.getInMemoryMetadataPtr(); StorageMetadataPtr metadata_ptr = storage.getInMemoryMetadataPtr();
StorageSnapshotPtr storage_snapshot_ptr = std::make_shared<StorageSnapshot>(storage, metadata_ptr); StorageSnapshotPtr storage_snapshot_ptr = std::make_shared<StorageSnapshot>(storage, metadata_ptr);
MergeTreeReaderSettings settings;
settings.can_read_part_without_marks = true;
MergeTreeReaderPtr reader = getReader( MergeTreeReaderPtr reader = getReader(
cols, cols,
storage_snapshot_ptr, storage_snapshot_ptr,
MarkRanges{MarkRange(0, 1)}, MarkRanges{MarkRange(0, total_mark)},
/*virtual_fields=*/ {}, /*virtual_fields=*/ {},
/*uncompressed_cache=*/{}, /*uncompressed_cache=*/{},
storage.getContext()->getMarkCache().get(), storage.getContext()->getMarkCache().get(),
std::make_shared<AlterConversions>(), std::make_shared<AlterConversions>(),
MergeTreeReaderSettings{}, settings,
ValueSizeMap{}, ValueSizeMap{},
ReadBufferFromFileBase::ProfileCallback{}); ReadBufferFromFileBase::ProfileCallback{});
Columns result; Columns result;
result.resize(1); result.resize(1);
reader->readRows(0, 1, false, 0, result); reader->readRows(0, total_mark, false, 0, result);
return result[0]; return result[0];
} }

View File

@ -428,7 +428,7 @@ public:
bool shallParticipateInMerges(const StoragePolicyPtr & storage_policy) const; bool shallParticipateInMerges(const StoragePolicyPtr & storage_policy) const;
/// Calculate column and secondary indices sizes on disk. /// Calculate column and secondary indices sizes on disk.
void calculateColumnsAndSecondaryIndicesSizesOnDisk(); void calculateColumnsAndSecondaryIndicesSizesOnDisk(std::optional<Block> columns_sample = std::nullopt);
std::optional<String> getRelativePathForPrefix(const String & prefix, bool detached = false, bool broken = false) const; std::optional<String> getRelativePathForPrefix(const String & prefix, bool detached = false, bool broken = false) const;
@ -633,7 +633,7 @@ protected:
/// Fill each_columns_size and total_size with sizes from columns files on /// Fill each_columns_size and total_size with sizes from columns files on
/// disk using columns and checksums. /// disk using columns and checksums.
virtual void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const = 0; virtual void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional<Block> columns_sample) const = 0;
std::optional<String> getRelativePathForDetachedPart(const String & prefix, bool broken) const; std::optional<String> getRelativePathForDetachedPart(const String & prefix, bool broken) const;
@ -715,7 +715,7 @@ private:
void loadPartitionAndMinMaxIndex(); void loadPartitionAndMinMaxIndex();
void calculateColumnsSizesOnDisk(); void calculateColumnsSizesOnDisk(std::optional<Block> columns_sample = std::nullopt);
void calculateSecondaryIndicesSizesOnDisk(); void calculateSecondaryIndicesSizesOnDisk();

View File

@ -54,6 +54,8 @@ public:
const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; } const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; }
virtual Block getColumnsSample() const = 0;
protected: protected:
SerializationPtr getSerialization(const String & column_name) const; SerializationPtr getSerialization(const String & column_name) const;

View File

@ -330,7 +330,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
auto prewhere_actions = MergeTreeSelectProcessor::getPrewhereActions( auto prewhere_actions = MergeTreeSelectProcessor::getPrewhereActions(
prewhere_info, prewhere_info,
actions_settings, actions_settings,
reader_settings.enable_multiple_prewhere_read_steps); reader_settings.enable_multiple_prewhere_read_steps, reader_settings.force_short_circuit_execution);
for (const auto & step : prewhere_actions.steps) for (const auto & step : prewhere_actions.steps)
add_step(*step); add_step(*step);

View File

@ -80,7 +80,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartCompactWriter(
} }
void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*each_columns_size*/, ColumnSize & total_size) const void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*each_columns_size*/, ColumnSize & total_size, std::optional<Block> /*columns_sample*/) const
{ {
auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION); auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION);
if (bin_checksum != checksums.files.end()) if (bin_checksum != checksums.files.end())

View File

@ -70,7 +70,7 @@ private:
void loadIndexGranularity() override; void loadIndexGranularity() override;
/// Compact parts don't support per column size, only total size /// Compact parts don't support per column size, only total size
void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override; void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional<Block> columns_sample) const override;
}; };
} }

View File

@ -82,7 +82,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWideWriter(
/// Takes into account the fact that several columns can e.g. share their .size substreams. /// Takes into account the fact that several columns can e.g. share their .size substreams.
/// When calculating totals these should be counted only once. /// When calculating totals these should be counted only once.
ColumnSize MergeTreeDataPartWide::getColumnSizeImpl( ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
const NameAndTypePair & column, std::unordered_set<String> * processed_substreams) const const NameAndTypePair & column, std::unordered_set<String> * processed_substreams, std::optional<Block> columns_sample) const
{ {
ColumnSize size; ColumnSize size;
if (checksums.empty()) if (checksums.empty())
@ -108,7 +108,7 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
auto mrk_checksum = checksums.files.find(*stream_name + getMarksFileExtension()); auto mrk_checksum = checksums.files.find(*stream_name + getMarksFileExtension());
if (mrk_checksum != checksums.files.end()) if (mrk_checksum != checksums.files.end())
size.marks += mrk_checksum->second.file_size; size.marks += mrk_checksum->second.file_size;
}); }, column.type, columns_sample && columns_sample->has(column.name) ? columns_sample->getByName(column.name).column : getColumnSample(column));
return size; return size;
} }
@ -374,12 +374,12 @@ std::optional<String> MergeTreeDataPartWide::getFileNameForColumn(const NameAndT
return filename; return filename;
} }
void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional<Block> columns_sample) const
{ {
std::unordered_set<String> processed_substreams; std::unordered_set<String> processed_substreams;
for (const auto & column : columns) for (const auto & column : columns)
{ {
ColumnSize size = getColumnSizeImpl(column, &processed_substreams); ColumnSize size = getColumnSizeImpl(column, &processed_substreams, columns_sample);
each_columns_size[column.name] = size; each_columns_size[column.name] = size;
total_size.add(size); total_size.add(size);

View File

@ -64,9 +64,9 @@ private:
/// Loads marks index granularity into memory /// Loads marks index granularity into memory
void loadIndexGranularity() override; void loadIndexGranularity() override;
ColumnSize getColumnSizeImpl(const NameAndTypePair & column, std::unordered_set<String> * processed_substreams) const; ColumnSize getColumnSizeImpl(const NameAndTypePair & column, std::unordered_set<String> * processed_substreams, std::optional<Block> columns_sample) const;
void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override; void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional<Block> columns_sample) const override;
}; };

View File

@ -123,6 +123,8 @@ public:
written_offset_columns = written_offset_columns_; written_offset_columns = written_offset_columns_;
} }
Block getColumnsSample() const override { return block_sample; }
protected: protected:
/// Count index_granularity for block and store in `index_granularity` /// Count index_granularity for block and store in `index_granularity`
size_t computeIndexGranularity(const Block & block) const; size_t computeIndexGranularity(const Block & block) const;

View File

@ -45,6 +45,8 @@ struct MergeTreeReaderSettings
bool use_asynchronous_read_from_pool = false; bool use_asynchronous_read_from_pool = false;
/// If PREWHERE has multiple conditions combined with AND, execute them in separate read/filtering steps. /// If PREWHERE has multiple conditions combined with AND, execute them in separate read/filtering steps.
bool enable_multiple_prewhere_read_steps = false; bool enable_multiple_prewhere_read_steps = false;
/// In case of multiple prewhere steps, execute filtering earlier to support short-circuit properly.
bool force_short_circuit_execution = false;
/// If true, try to lower size of read buffer according to granule size and compressed block size. /// If true, try to lower size of read buffer according to granule size and compressed block size.
bool adjust_read_buffer_size = true; bool adjust_read_buffer_size = true;
/// If true, it's allowed to read the whole part without reading marks. /// If true, it's allowed to read the whole part without reading marks.
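A hedged PREWHERE sketch (hypothetical MergeTree table t with an Int64 column x): with multiple prewhere read steps and the merged-filters default, each generated step is forced to filter its rows immediately, so the guard below is expected to protect the modulo even when the two conditions land in separate read steps.
SET enable_multiple_prewhere_read_steps = 1, query_plan_merge_filters = 1;
SELECT count() FROM t PREWHERE x != 0 AND (100 % x) = 0;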

View File

@ -172,7 +172,7 @@ size_t MergeTreeReaderWide::readRows(
throw; throw;
} }
if (column->empty()) if (column->empty() && max_rows_to_read > 0)
res_columns[pos] = nullptr; res_columns[pos] = nullptr;
} }

View File

@ -91,7 +91,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
, algorithm(std::move(algorithm_)) , algorithm(std::move(algorithm_))
, prewhere_info(prewhere_info_) , prewhere_info(prewhere_info_)
, actions_settings(actions_settings_) , actions_settings(actions_settings_)
, prewhere_actions(getPrewhereActions(prewhere_info, actions_settings, reader_settings_.enable_multiple_prewhere_read_steps)) , prewhere_actions(getPrewhereActions(prewhere_info, actions_settings, reader_settings_.enable_multiple_prewhere_read_steps, reader_settings_.force_short_circuit_execution))
, reader_settings(reader_settings_) , reader_settings(reader_settings_)
, result_header(transformHeader(pool->getHeader(), prewhere_info)) , result_header(transformHeader(pool->getHeader(), prewhere_info))
{ {
@ -124,9 +124,9 @@ String MergeTreeSelectProcessor::getName() const
return fmt::format("MergeTreeSelect(pool: {}, algorithm: {})", pool->getName(), algorithm->getName()); return fmt::format("MergeTreeSelect(pool: {}, algorithm: {})", pool->getName(), algorithm->getName());
} }
bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, PrewhereExprInfo & prewhere); bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, PrewhereExprInfo & prewhere, bool force_short_circuit_execution);
PrewhereExprInfo MergeTreeSelectProcessor::getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, bool enable_multiple_prewhere_read_steps) PrewhereExprInfo MergeTreeSelectProcessor::getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, bool enable_multiple_prewhere_read_steps, bool force_short_circuit_execution)
{ {
PrewhereExprInfo prewhere_actions; PrewhereExprInfo prewhere_actions;
if (prewhere_info) if (prewhere_info)
@ -147,7 +147,7 @@ PrewhereExprInfo MergeTreeSelectProcessor::getPrewhereActions(PrewhereInfoPtr pr
} }
if (!enable_multiple_prewhere_read_steps || if (!enable_multiple_prewhere_read_steps ||
!tryBuildPrewhereSteps(prewhere_info, actions_settings, prewhere_actions)) !tryBuildPrewhereSteps(prewhere_info, actions_settings, prewhere_actions, force_short_circuit_execution))
{ {
PrewhereExprStep prewhere_step PrewhereExprStep prewhere_step
{ {

View File

@ -73,7 +73,8 @@ public:
static PrewhereExprInfo getPrewhereActions( static PrewhereExprInfo getPrewhereActions(
PrewhereInfoPtr prewhere_info, PrewhereInfoPtr prewhere_info,
const ExpressionActionsSettings & actions_settings, const ExpressionActionsSettings & actions_settings,
bool enable_multiple_prewhere_read_steps); bool enable_multiple_prewhere_read_steps,
bool force_short_circuit_execution);
void addPartLevelToChunk(bool add_part_level_) { add_part_level = add_part_level_; } void addPartLevelToChunk(bool add_part_level_) { add_part_level = add_part_level_; }

View File

@ -4,6 +4,7 @@
#include <Storages/SelectQueryInfo.h> #include <Storages/SelectQueryInfo.h>
#include <Storages/MergeTree/MergeTreeRangeReader.h> #include <Storages/MergeTree/MergeTreeRangeReader.h>
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Interpreters/ExpressionActions.h> #include <Interpreters/ExpressionActions.h>
@ -57,9 +58,9 @@ struct DAGNodeRef
const ActionsDAG::Node * node; const ActionsDAG::Node * node;
}; };
/// Result name -> DAGNodeRef /// ResultNode -> DAGNodeRef
using OriginalToNewNodeMap = std::unordered_map<String, DAGNodeRef>; using OriginalToNewNodeMap = std::unordered_map<const ActionsDAG::Node *, DAGNodeRef>;
using NodeNameToLastUsedStepMap = std::unordered_map<String, size_t>; using NodeNameToLastUsedStepMap = std::unordered_map<const ActionsDAG::Node *, size_t>;
/// Clones the part of original DAG responsible for computing the original_dag_node and adds it to the new DAG. /// Clones the part of original DAG responsible for computing the original_dag_node and adds it to the new DAG.
const ActionsDAG::Node & addClonedDAGToDAG( const ActionsDAG::Node & addClonedDAGToDAG(
@ -69,25 +70,28 @@ const ActionsDAG::Node & addClonedDAGToDAG(
OriginalToNewNodeMap & node_remap, OriginalToNewNodeMap & node_remap,
NodeNameToLastUsedStepMap & node_to_step_map) NodeNameToLastUsedStepMap & node_to_step_map)
{ {
const String & node_name = original_dag_node->result_name;
/// Look for the node in the map of already known nodes /// Look for the node in the map of already known nodes
if (node_remap.contains(node_name)) if (node_remap.contains(original_dag_node))
{ {
/// If the node is already in the new DAG, return it /// If the node is already in the new DAG, return it
const auto & node_ref = node_remap.at(node_name); const auto & node_ref = node_remap.at(original_dag_node);
if (node_ref.dag == new_dag.get()) if (node_ref.dag == new_dag.get())
return *node_ref.node; return *node_ref.node;
/// If the node is known from the previous steps, add it as an input, except for constants /// If the node is known from the previous steps, add it as an input, except for constants
if (original_dag_node->type != ActionsDAG::ActionType::COLUMN) if (original_dag_node->type != ActionsDAG::ActionType::COLUMN)
{ {
node_ref.dag->addOrReplaceInOutputs(*node_ref.node); /// If the node was found in node_remap, it was not added to outputs yet.
/// The only exception is the filter node, which is always the first one.
if (node_ref.dag->getOutputs().at(0) != node_ref.node)
node_ref.dag->getOutputs().push_back(node_ref.node);
const auto & new_node = new_dag->addInput(node_ref.node->result_name, node_ref.node->result_type); const auto & new_node = new_dag->addInput(node_ref.node->result_name, node_ref.node->result_type);
node_remap[node_name] = {new_dag.get(), &new_node}; /// TODO: here we update the node reference. Is it always correct? node_remap[original_dag_node] = {new_dag.get(), &new_node};
/// Remember the index of the last step which reuses this node. /// Remember the index of the last step which reuses this node.
/// We cannot remove this node from the outputs before that step. /// We cannot remove this node from the outputs before that step.
node_to_step_map[node_name] = step; node_to_step_map[original_dag_node] = step;
return new_node; return new_node;
} }
} }
@ -96,7 +100,7 @@ const ActionsDAG::Node & addClonedDAGToDAG(
if (original_dag_node->type == ActionsDAG::ActionType::INPUT) if (original_dag_node->type == ActionsDAG::ActionType::INPUT)
{ {
const auto & new_node = new_dag->addInput(original_dag_node->result_name, original_dag_node->result_type); const auto & new_node = new_dag->addInput(original_dag_node->result_name, original_dag_node->result_type);
node_remap[node_name] = {new_dag.get(), &new_node}; node_remap[original_dag_node] = {new_dag.get(), &new_node};
return new_node; return new_node;
} }
@ -105,7 +109,7 @@ const ActionsDAG::Node & addClonedDAGToDAG(
{ {
const auto & new_node = new_dag->addColumn( const auto & new_node = new_dag->addColumn(
ColumnWithTypeAndName(original_dag_node->column, original_dag_node->result_type, original_dag_node->result_name)); ColumnWithTypeAndName(original_dag_node->column, original_dag_node->result_type, original_dag_node->result_name));
node_remap[node_name] = {new_dag.get(), &new_node}; node_remap[original_dag_node] = {new_dag.get(), &new_node};
return new_node; return new_node;
} }
@ -113,7 +117,7 @@ const ActionsDAG::Node & addClonedDAGToDAG(
{ {
const auto & alias_child = addClonedDAGToDAG(step, original_dag_node->children[0], new_dag, node_remap, node_to_step_map); const auto & alias_child = addClonedDAGToDAG(step, original_dag_node->children[0], new_dag, node_remap, node_to_step_map);
const auto & new_node = new_dag->addAlias(alias_child, original_dag_node->result_name); const auto & new_node = new_dag->addAlias(alias_child, original_dag_node->result_name);
node_remap[node_name] = {new_dag.get(), &new_node}; node_remap[original_dag_node] = {new_dag.get(), &new_node};
return new_node; return new_node;
} }
@ -128,7 +132,7 @@ const ActionsDAG::Node & addClonedDAGToDAG(
} }
const auto & new_node = new_dag->addFunction(original_dag_node->function_base, new_children, original_dag_node->result_name); const auto & new_node = new_dag->addFunction(original_dag_node->function_base, new_children, original_dag_node->result_name);
node_remap[node_name] = {new_dag.get(), &new_node}; node_remap[original_dag_node] = {new_dag.get(), &new_node};
return new_node; return new_node;
} }
@ -138,11 +142,9 @@ const ActionsDAG::Node & addClonedDAGToDAG(
const ActionsDAG::Node & addFunction( const ActionsDAG::Node & addFunction(
const ActionsDAGPtr & new_dag, const ActionsDAGPtr & new_dag,
const FunctionOverloadResolverPtr & function, const FunctionOverloadResolverPtr & function,
ActionsDAG::NodeRawConstPtrs children, ActionsDAG::NodeRawConstPtrs children)
OriginalToNewNodeMap & node_remap)
{ {
const auto & new_node = new_dag->addFunction(function, children, ""); const auto & new_node = new_dag->addFunction(function, children, "");
node_remap[new_node.result_name] = {new_dag.get(), &new_node};
return new_node; return new_node;
} }
@ -152,14 +154,12 @@ const ActionsDAG::Node & addFunction(
const ActionsDAG::Node & addCast( const ActionsDAG::Node & addCast(
const ActionsDAGPtr & dag, const ActionsDAGPtr & dag,
const ActionsDAG::Node & node_to_cast, const ActionsDAG::Node & node_to_cast,
const DataTypePtr & to_type, const DataTypePtr & to_type)
OriginalToNewNodeMap & node_remap)
{ {
if (!node_to_cast.result_type->equals(*to_type)) if (!node_to_cast.result_type->equals(*to_type))
return node_to_cast; return node_to_cast;
const auto & new_node = dag->addCast(node_to_cast, to_type, {}); const auto & new_node = dag->addCast(node_to_cast, to_type, {});
node_remap[new_node.result_name] = {dag.get(), &new_node};
return new_node; return new_node;
} }
@ -169,8 +169,7 @@ const ActionsDAG::Node & addCast(
/// 2. makes sure that the result contains only 0 or 1 values even if the source column contains non-boolean values. /// 2. makes sure that the result contains only 0 or 1 values even if the source column contains non-boolean values.
const ActionsDAG::Node & addAndTrue( const ActionsDAG::Node & addAndTrue(
const ActionsDAGPtr & dag, const ActionsDAGPtr & dag,
const ActionsDAG::Node & filter_node_to_normalize, const ActionsDAG::Node & filter_node_to_normalize)
OriginalToNewNodeMap & node_remap)
{ {
Field const_true_value(true); Field const_true_value(true);
@ -181,7 +180,7 @@ const ActionsDAG::Node & addAndTrue(
const auto * const_true_node = &dag->addColumn(std::move(const_true_column)); const auto * const_true_node = &dag->addColumn(std::move(const_true_column));
ActionsDAG::NodeRawConstPtrs children = {&filter_node_to_normalize, const_true_node}; ActionsDAG::NodeRawConstPtrs children = {&filter_node_to_normalize, const_true_node};
FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>()); FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());
return addFunction(dag, func_builder_and, children, node_remap); return addFunction(dag, func_builder_and, children);
} }
} }
@ -206,7 +205,11 @@ const ActionsDAG::Node & addAndTrue(
/// 6. Find all outputs of the original DAG /// 6. Find all outputs of the original DAG
/// 7. Find all outputs that were computed in the already built DAGs, mark these nodes as outputs in the steps where they were computed /// 7. Find all outputs that were computed in the already built DAGs, mark these nodes as outputs in the steps where they were computed
/// 8. Add computation of the remaining outputs to the last step with the procedure similar to 4 /// 8. Add computation of the remaining outputs to the last step with the procedure similar to 4
bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, PrewhereExprInfo & prewhere) bool tryBuildPrewhereSteps(
PrewhereInfoPtr prewhere_info,
const ExpressionActionsSettings & actions_settings,
PrewhereExprInfo & prewhere,
bool force_short_circuit_execution)
{ {
if (!prewhere_info) if (!prewhere_info)
return true; return true;
@ -243,7 +246,10 @@ bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionAction
struct Step struct Step
{ {
ActionsDAGPtr actions; ActionsDAGPtr actions;
String column_name; /// Original condition, in case we have only one condition and it was not cast
const ActionsDAG::Node * original_node;
/// Result condition node
const ActionsDAG::Node * result_node;
}; };
std::vector<Step> steps; std::vector<Step> steps;
@ -254,7 +260,8 @@ bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionAction
{ {
const auto & condition_group = condition_groups[step_index]; const auto & condition_group = condition_groups[step_index];
ActionsDAGPtr step_dag = std::make_unique<ActionsDAG>(); ActionsDAGPtr step_dag = std::make_unique<ActionsDAG>();
String result_name; const ActionsDAG::Node * original_node = nullptr;
const ActionsDAG::Node * result_node;
std::vector<const ActionsDAG::Node *> new_condition_nodes; std::vector<const ActionsDAG::Node *> new_condition_nodes;
for (const auto * node : condition_group) for (const auto * node : condition_group)
@ -267,48 +274,37 @@ bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionAction
{ {
/// Add AND function to combine the conditions /// Add AND function to combine the conditions
FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>()); FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());
const auto & and_function_node = addFunction(step_dag, func_builder_and, new_condition_nodes, node_remap); const auto & and_function_node = addFunction(step_dag, func_builder_and, new_condition_nodes);
step_dag->addOrReplaceInOutputs(and_function_node); result_node = &and_function_node;
result_name = and_function_node.result_name;
} }
else else
{ {
const auto & result_node = *new_condition_nodes.front(); result_node = new_condition_nodes.front();
/// Check if explicit cast is needed for the condition to serve as a filter. /// Check if explicit cast is needed for the condition to serve as a filter.
const auto result_type_name = result_node.result_type->getName(); if (!isUInt8(removeNullable(removeLowCardinality(result_node->result_type))))
if (result_type_name == "UInt8" ||
result_type_name == "Nullable(UInt8)" ||
result_type_name == "LowCardinality(UInt8)" ||
result_type_name == "LowCardinality(Nullable(UInt8))")
{
/// No need to cast
step_dag->addOrReplaceInOutputs(result_node);
result_name = result_node.result_name;
}
else
{ {
/// Build "condition AND True" expression to "cast" the condition to UInt8 or Nullable(UInt8) depending on its type. /// Build "condition AND True" expression to "cast" the condition to UInt8 or Nullable(UInt8) depending on its type.
const auto & cast_node = addAndTrue(step_dag, result_node, node_remap); result_node = &addAndTrue(step_dag, *result_node);
step_dag->addOrReplaceInOutputs(cast_node);
result_name = cast_node.result_name;
} }
} }
steps.push_back({std::move(step_dag), result_name}); step_dag->getOutputs().insert(step_dag->getOutputs().begin(), result_node);
steps.push_back({std::move(step_dag), original_node, result_node});
} }
/// 6. Find all outputs of the original DAG /// 6. Find all outputs of the original DAG
auto original_outputs = prewhere_info->prewhere_actions.getOutputs(); auto original_outputs = prewhere_info->prewhere_actions.getOutputs();
steps.back().actions->getOutputs().clear();
/// 7. Find all outputs that were computed in the already built DAGs, mark these nodes as outputs in the steps where they were computed /// 7. Find all outputs that were computed in the already built DAGs, mark these nodes as outputs in the steps where they were computed
/// 8. Add computation of the remaining outputs to the last step with the procedure similar to 4 /// 8. Add computation of the remaining outputs to the last step with the procedure similar to 4
NameSet all_output_names; std::unordered_set<const ActionsDAG::Node *> all_outputs;
for (const auto * output : original_outputs) for (const auto * output : original_outputs)
{ {
all_output_names.insert(output->result_name); all_outputs.insert(output);
if (node_remap.contains(output->result_name)) if (node_remap.contains(output))
{ {
const auto & new_node_info = node_remap[output->result_name]; const auto & new_node_info = node_remap[output];
new_node_info.dag->addOrReplaceInOutputs(*new_node_info.node); new_node_info.dag->getOutputs().push_back(new_node_info.node);
} }
else if (output->result_name == prewhere_info->prewhere_column_name) else if (output->result_name == prewhere_info->prewhere_column_name)
{ {
@ -319,20 +315,21 @@ bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionAction
/// 1. AND the last condition with constant True. This is needed in case the filter in the last step has UInt8 type /// 1. AND the last condition with constant True. This is needed in case the filter in the last step has UInt8 type
/// but contains values other than 0 and 1 (e.g. if it is (number%5) it contains 2, 3, 4) /// but contains values other than 0 and 1 (e.g. if it is (number%5) it contains 2, 3, 4)
/// 2. CAST the result to the exact type of the PREWHERE column from the original DAG /// 2. CAST the result to the exact type of the PREWHERE column from the original DAG
const auto & last_step_result_node_info = node_remap[steps.back().column_name];
auto & last_step_dag = steps.back().actions; auto & last_step_dag = steps.back().actions;
auto & last_step_result_node = steps.back().result_node;
/// Build AND(last_step_result_node, true) /// Build AND(last_step_result_node, true)
const auto & and_node = addAndTrue(last_step_dag, *last_step_result_node_info.node, node_remap); const auto & and_node = addAndTrue(last_step_dag, *last_step_result_node);
/// Build CAST(and_node, type of PREWHERE column) /// Build CAST(and_node, type of PREWHERE column)
const auto & cast_node = addCast(last_step_dag, and_node, output->result_type, node_remap); const auto & cast_node = addCast(last_step_dag, and_node, output->result_type);
/// Add alias for the result with the name of the PREWHERE column /// Add alias for the result with the name of the PREWHERE column
const auto & prewhere_result_node = last_step_dag->addAlias(cast_node, output->result_name); const auto & prewhere_result_node = last_step_dag->addAlias(cast_node, output->result_name);
last_step_dag->addOrReplaceInOutputs(prewhere_result_node); last_step_dag->getOutputs().push_back(&prewhere_result_node);
steps.back().result_node = &prewhere_result_node;
} }
else else
{ {
const auto & node_in_new_dag = addClonedDAGToDAG(steps.size() - 1, output, steps.back().actions, node_remap, node_to_step); const auto & node_in_new_dag = addClonedDAGToDAG(steps.size() - 1, output, steps.back().actions, node_remap, node_to_step);
steps.back().actions->addOrReplaceInOutputs(node_in_new_dag); steps.back().actions->getOutputs().push_back(&node_in_new_dag);
} }
} }
@ -345,17 +342,18 @@ bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionAction
{ {
.type = PrewhereExprStep::Filter, .type = PrewhereExprStep::Filter,
.actions = std::make_shared<ExpressionActions>(std::move(*step.actions), actions_settings), .actions = std::make_shared<ExpressionActions>(std::move(*step.actions), actions_settings),
.filter_column_name = step.column_name, .filter_column_name = step.result_node->result_name,
/// Don't remove if it's in the list of original outputs /// Don't remove if it's in the list of original outputs
.remove_filter_column = .remove_filter_column =
!all_output_names.contains(step.column_name) && node_to_step[step.column_name] <= step_index, step.original_node && !all_outputs.contains(step.original_node) && node_to_step[step.original_node] <= step_index,
.need_filter = false, .need_filter = force_short_circuit_execution,
.perform_alter_conversions = true, .perform_alter_conversions = true,
}; };
prewhere.steps.push_back(std::make_shared<PrewhereExprStep>(std::move(new_step))); prewhere.steps.push_back(std::make_shared<PrewhereExprStep>(std::move(new_step)));
} }
prewhere.steps.back()->remove_filter_column = prewhere_info->remove_prewhere_column;
prewhere.steps.back()->need_filter = prewhere_info->need_filter; prewhere.steps.back()->need_filter = prewhere_info->need_filter;
} }

View File

@ -209,7 +209,7 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
new_part->index_granularity = writer->getIndexGranularity(); new_part->index_granularity = writer->getIndexGranularity();
/// Just in case /// Just in case
new_part->index_granularity.shrinkToFitInMemory(); new_part->index_granularity.shrinkToFitInMemory();
new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(); new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(writer->getColumnsSample());
/// In mutation, existing_rows_count is already calculated in PartMergerWriter /// In mutation, existing_rows_count is already calculated in PartMergerWriter
/// In merge situation, lightweight deleted rows was physically deleted, existing_rows_count equals rows_count /// In merge situation, lightweight deleted rows was physically deleted, existing_rows_count equals rows_count

View File

@ -659,7 +659,7 @@ void ObjectStorageQueueSource::applyActionAfterProcessing(const String & path)
{ {
if (files_metadata->getTableMetadata().after_processing == ObjectStorageQueueAction::DELETE) if (files_metadata->getTableMetadata().after_processing == ObjectStorageQueueAction::DELETE)
{ {
object_storage->removeObject(StoredObject(path)); object_storage->removeObjectIfExists(StoredObject(path));
} }
} }

View File

@ -798,10 +798,6 @@ def _upload_build_profile_data(
logging.info("Unknown CI logs host, skip uploading build profile data") logging.info("Unknown CI logs host, skip uploading build profile data")
return return
if not pr_info.number == 0:
logging.info("Skipping uploading build profile data for PRs")
return
instance_type = get_instance_type() instance_type = get_instance_type()
instance_id = get_instance_id() instance_id = get_instance_id()
auth = { auth = {

View File

@ -82,12 +82,12 @@ Filter column: notEquals(__table1.y, 0_UInt8)
9 10 9 10
> one condition of filter should be pushed down after aggregating, other two conditions are ANDed > one condition of filter should be pushed down after aggregating, other two conditions are ANDed
Filter column Filter column
FUNCTION and(minus(s, 8) :: 5, minus(s, 4) :: 2) -> and(notEquals(y, 0), minus(s, 8), minus(s, 4)) FUNCTION and(minus(s, 8) :: 3, minus(s, 4) :: 5) -> and(notEquals(y, 0), minus(s, 8), minus(s, 4))
Aggregating Aggregating
Filter column: notEquals(y, 0) Filter column: notEquals(y, 0)
> (analyzer) one condition of filter should be pushed down after aggregating, other two conditions are ANDed > (analyzer) one condition of filter should be pushed down after aggregating, other two conditions are ANDed
Filter column Filter column
FUNCTION and(minus(__table1.s, 8_UInt8) :: 1, minus(__table1.s, 4_UInt8) :: 2) -> and(notEquals(__table1.y, 0_UInt8), minus(__table1.s, 8_UInt8), minus(__table1.s, 4_UInt8)) FUNCTION and(minus(__table1.s, 8_UInt8) :: 3, minus(__table1.s, 4_UInt8) :: 5) -> and(notEquals(__table1.y, 0_UInt8), minus(__table1.s, 8_UInt8), minus(__table1.s, 4_UInt8))
Aggregating Aggregating
Filter column: notEquals(__table1.y, 0_UInt8) Filter column: notEquals(__table1.y, 0_UInt8)
0 1 0 1
@ -163,7 +163,6 @@ Filter column: notEquals(__table1.y, 2_UInt8)
> filter is pushed down before CreatingSets > filter is pushed down before CreatingSets
CreatingSets CreatingSets
Filter Filter
Filter
1 1
3 3
> one condition of filter is pushed down before LEFT JOIN > one condition of filter is pushed down before LEFT JOIN

View File

@ -89,14 +89,14 @@ $CLICKHOUSE_CLIENT --enable_analyzer=0 --convert_query_to_cnf=0 -q "
select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
) where y != 0 and s - 8 and s - 4 ) where y != 0 and s - 8 and s - 4
settings enable_optimize_predicate_expression=0" | settings enable_optimize_predicate_expression=0" |
grep -o "Aggregating\|Filter column\|Filter column: notEquals(y, 0)\|FUNCTION and(minus(s, 8) :: 5, minus(s, 4) :: 2) -> and(notEquals(y, 0), minus(s, 8), minus(s, 4))" grep -o "Aggregating\|Filter column\|Filter column: notEquals(y, 0)\|FUNCTION and(minus(s, 8) :: 3, minus(s, 4) :: 5) -> and(notEquals(y, 0), minus(s, 8), minus(s, 4))"
echo "> (analyzer) one condition of filter should be pushed down after aggregating, other two conditions are ANDed" echo "> (analyzer) one condition of filter should be pushed down after aggregating, other two conditions are ANDed"
$CLICKHOUSE_CLIENT --enable_analyzer=1 --convert_query_to_cnf=0 -q " $CLICKHOUSE_CLIENT --enable_analyzer=1 --convert_query_to_cnf=0 -q "
explain actions = 1 select s, y from ( explain actions = 1 select s, y from (
select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
) where y != 0 and s - 8 and s - 4 ) where y != 0 and s - 8 and s - 4
settings enable_optimize_predicate_expression=0" | settings enable_optimize_predicate_expression=0" |
grep -o "Aggregating\|Filter column\|Filter column: notEquals(__table1.y, 0_UInt8)\|FUNCTION and(minus(__table1.s, 8_UInt8) :: 1, minus(__table1.s, 4_UInt8) :: 2) -> and(notEquals(__table1.y, 0_UInt8), minus(__table1.s, 8_UInt8), minus(__table1.s, 4_UInt8))" grep -o "Aggregating\|Filter column\|Filter column: notEquals(__table1.y, 0_UInt8)\|FUNCTION and(minus(__table1.s, 8_UInt8) :: 3, minus(__table1.s, 4_UInt8) :: 5) -> and(notEquals(__table1.y, 0_UInt8), minus(__table1.s, 8_UInt8), minus(__table1.s, 4_UInt8))"
$CLICKHOUSE_CLIENT -q " $CLICKHOUSE_CLIENT -q "
select s, y from ( select s, y from (
select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y

View File

@ -332,13 +332,12 @@ SETTINGS optimize_aggregators_of_group_by_keys=0 -- avoid removing any() as it d
Expression (Projection) Expression (Projection)
Sorting (Sorting for ORDER BY) Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY) Expression (Before ORDER BY)
Filter ((WHERE + (Projection + Before ORDER BY))) Filter (((WHERE + (Projection + Before ORDER BY)) + HAVING))
Filter (HAVING) Aggregating
Aggregating Expression ((Before GROUP BY + Projection))
Expression ((Before GROUP BY + Projection)) Sorting (Sorting for ORDER BY)
Sorting (Sorting for ORDER BY) Expression ((Before ORDER BY + (Projection + Before ORDER BY)))
Expression ((Before ORDER BY + (Projection + Before ORDER BY))) ReadFromSystemNumbers
ReadFromSystemNumbers
-- execute -- execute
1 1
2 2

View File

@ -28,21 +28,17 @@ WHERE type_1 = \'all\'
(Expression) (Expression)
ExpressionTransform × 2 ExpressionTransform × 2
(Filter) (Filter)
FilterTransform × 2 FilterTransform × 6
(Filter) (Aggregating)
FilterTransform × 2 ExpressionTransform × 2
(Filter) AggregatingTransform × 2
FilterTransform × 2 Copy 1 → 2
(Aggregating) (Expression)
ExpressionTransform × 2 ExpressionTransform
AggregatingTransform × 2 (Expression)
Copy 1 → 2 ExpressionTransform
(Expression) (ReadFromMergeTree)
ExpressionTransform MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
(Expression) (Expression)
ExpressionTransform × 2 ExpressionTransform × 2
(Filter) (Filter)
@ -68,14 +64,10 @@ ExpressionTransform × 2
ExpressionTransform × 2 ExpressionTransform × 2
AggregatingTransform × 2 AggregatingTransform × 2
Copy 1 → 2 Copy 1 → 2
(Filter) (Expression)
FilterTransform ExpressionTransform
(Filter) (ReadFromMergeTree)
FilterTransform MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
(Expression) (Expression)
ExpressionTransform × 2 ExpressionTransform × 2
(Aggregating) (Aggregating)

View File

@ -163,17 +163,21 @@ Positions: 4 2 0 1
Filter (( + (JOIN actions + Change column names to column identifiers))) Filter (( + (JOIN actions + Change column names to column identifiers)))
Header: __table1.id UInt64 Header: __table1.id UInt64
__table1.value String __table1.value String
Filter column: and(equals(__table1.id, 5_UInt8), equals(__table1.id, 6_UInt8)) (removed) AND column: equals(__table1.id, 5_UInt8)
Actions: INPUT : 0 -> id UInt64 : 0 Actions: INPUT : 0 -> id UInt64 : 0
INPUT : 1 -> value String : 1 COLUMN Const(UInt8) -> 5_UInt8 UInt8 : 1
FUNCTION equals(id : 0, 5_UInt8 :: 1) -> equals(__table1.id, 5_UInt8) UInt8 : 2
Positions: 2 0 2
Filter column: and(equals(__table1.id, 5_UInt8), equals(__table1.id, 6_UInt8)) (removed)
Actions: INPUT : 2 -> value String : 0
INPUT : 1 -> id UInt64 : 1
COLUMN Const(UInt8) -> 6_UInt8 UInt8 : 2 COLUMN Const(UInt8) -> 6_UInt8 UInt8 : 2
COLUMN Const(UInt8) -> 5_UInt8 UInt8 : 3 INPUT : 0 -> equals(__table1.id, 5_UInt8) UInt8 : 3
ALIAS id : 0 -> __table1.id UInt64 : 4 ALIAS value :: 0 -> __table1.value String : 4
ALIAS value :: 1 -> __table1.value String : 5 ALIAS id : 1 -> __table1.id UInt64 : 0
FUNCTION equals(id : 0, 6_UInt8 :: 2) -> equals(__table1.id, 6_UInt8) UInt8 : 1 FUNCTION equals(id :: 1, 6_UInt8 :: 2) -> equals(__table1.id, 6_UInt8) UInt8 : 5
FUNCTION equals(id :: 0, 5_UInt8 :: 3) -> equals(__table1.id, 5_UInt8) UInt8 : 2 FUNCTION and(equals(__table1.id, 5_UInt8) :: 3, equals(__table1.id, 6_UInt8) :: 5) -> and(equals(__table1.id, 5_UInt8), equals(__table1.id, 6_UInt8)) UInt8 : 2
FUNCTION and(equals(__table1.id, 5_UInt8) :: 2, equals(__table1.id, 6_UInt8) :: 1) -> and(equals(__table1.id, 5_UInt8), equals(__table1.id, 6_UInt8)) UInt8 : 3 Positions: 2 0 4
Positions: 3 4 5
ReadFromMergeTree (default.test_table_1) ReadFromMergeTree (default.test_table_1)
Header: id UInt64 Header: id UInt64
value String value String
@ -183,17 +187,21 @@ Positions: 4 2 0 1
Filter (( + (JOIN actions + Change column names to column identifiers))) Filter (( + (JOIN actions + Change column names to column identifiers)))
Header: __table2.id UInt64 Header: __table2.id UInt64
__table2.value String __table2.value String
Filter column: and(equals(__table2.id, 6_UInt8), equals(__table2.id, 5_UInt8)) (removed) AND column: equals(__table2.id, 6_UInt8)
Actions: INPUT : 0 -> id UInt64 : 0 Actions: INPUT : 0 -> id UInt64 : 0
INPUT : 1 -> value String : 1 COLUMN Const(UInt8) -> 6_UInt8 UInt8 : 1
FUNCTION equals(id : 0, 6_UInt8 :: 1) -> equals(__table2.id, 6_UInt8) UInt8 : 2
Positions: 2 0 2
Filter column: and(equals(__table2.id, 6_UInt8), equals(__table2.id, 5_UInt8)) (removed)
Actions: INPUT : 2 -> value String : 0
INPUT : 1 -> id UInt64 : 1
COLUMN Const(UInt8) -> 5_UInt8 UInt8 : 2 COLUMN Const(UInt8) -> 5_UInt8 UInt8 : 2
COLUMN Const(UInt8) -> 6_UInt8 UInt8 : 3 INPUT : 0 -> equals(__table2.id, 6_UInt8) UInt8 : 3
ALIAS id : 0 -> __table2.id UInt64 : 4 ALIAS value :: 0 -> __table2.value String : 4
ALIAS value :: 1 -> __table2.value String : 5 ALIAS id : 1 -> __table2.id UInt64 : 0
FUNCTION equals(id : 0, 5_UInt8 :: 2) -> equals(__table2.id, 5_UInt8) UInt8 : 1 FUNCTION equals(id :: 1, 5_UInt8 :: 2) -> equals(__table2.id, 5_UInt8) UInt8 : 5
FUNCTION equals(id :: 0, 6_UInt8 :: 3) -> equals(__table2.id, 6_UInt8) UInt8 : 2 FUNCTION and(equals(__table2.id, 6_UInt8) :: 3, equals(__table2.id, 5_UInt8) :: 5) -> and(equals(__table2.id, 6_UInt8), equals(__table2.id, 5_UInt8)) UInt8 : 2
FUNCTION and(equals(__table2.id, 6_UInt8) :: 2, equals(__table2.id, 5_UInt8) :: 1) -> and(equals(__table2.id, 6_UInt8), equals(__table2.id, 5_UInt8)) UInt8 : 3 Positions: 2 0 4
Positions: 3 4 5
ReadFromMergeTree (default.test_table_2) ReadFromMergeTree (default.test_table_2)
Header: id UInt64 Header: id UInt64
value String value String
@ -656,17 +664,21 @@ Positions: 4 2 0 1
__table1.value String __table1.value String
__table2.value String __table2.value String
__table2.id UInt64 __table2.id UInt64
Filter column: and(equals(__table1.id, 5_UInt8), equals(__table2.id, 6_UInt8)) (removed) AND column: equals(__table1.id, 5_UInt8)
Actions: INPUT : 0 -> __table1.id UInt64 : 0 Actions: INPUT : 0 -> __table1.id UInt64 : 0
INPUT :: 1 -> __table1.value String : 1 COLUMN Const(UInt8) -> 5_UInt8 UInt8 : 1
INPUT :: 2 -> __table2.value String : 2 FUNCTION equals(__table1.id : 0, 5_UInt8 :: 1) -> equals(__table1.id, 5_UInt8) UInt8 : 2
INPUT : 3 -> __table2.id UInt64 : 3 Positions: 2 0 2
COLUMN Const(UInt8) -> 5_UInt8 UInt8 : 4 Filter column: and(equals(__table1.id, 5_UInt8), equals(__table2.id, 6_UInt8)) (removed)
COLUMN Const(UInt8) -> 6_UInt8 UInt8 : 5 Actions: INPUT :: 1 -> __table1.id UInt64 : 0
FUNCTION equals(__table1.id : 0, 5_UInt8 :: 4) -> equals(__table1.id, 5_UInt8) UInt8 : 6 INPUT :: 2 -> __table1.value String : 1
FUNCTION equals(__table2.id : 3, 6_UInt8 :: 5) -> equals(__table2.id, 6_UInt8) UInt8 : 4 INPUT :: 3 -> __table2.value String : 2
FUNCTION and(equals(__table1.id, 5_UInt8) :: 6, equals(__table2.id, 6_UInt8) :: 4) -> and(equals(__table1.id, 5_UInt8), equals(__table2.id, 6_UInt8)) UInt8 : 5 INPUT : 4 -> __table2.id UInt64 : 3
Positions: 5 0 1 2 3 COLUMN Const(UInt8) -> 6_UInt8 UInt8 : 4
INPUT : 0 -> equals(__table1.id, 5_UInt8) UInt8 : 5
FUNCTION equals(__table2.id : 3, 6_UInt8 :: 4) -> equals(__table2.id, 6_UInt8) UInt8 : 6
FUNCTION and(equals(__table1.id, 5_UInt8) :: 5, equals(__table2.id, 6_UInt8) :: 6) -> and(equals(__table1.id, 5_UInt8), equals(__table2.id, 6_UInt8)) UInt8 : 4
Positions: 4 0 1 2 3
Join (JOIN FillRightFirst) Join (JOIN FillRightFirst)
Header: __table1.id UInt64 Header: __table1.id UInt64
__table1.value String __table1.value String

View File

@ -1,3 +1,5 @@
set allow_reorder_prewhere_conditions=0;
drop table if exists t1; drop table if exists t1;
drop table if exists t2; drop table if exists t2;
@ -49,7 +51,23 @@ tmp1 AS
fs1 fs1
FROM t2 FROM t2
LEFT JOIN tmp1 USING (fs1) LEFT JOIN tmp1 USING (fs1)
WHERE (fs1 IN ('test')) SETTINGS enable_multiple_prewhere_read_steps = 0; WHERE (fs1 IN ('test')) SETTINGS enable_multiple_prewhere_read_steps = 0, query_plan_merge_filters=0;
WITH
tmp1 AS
(
SELECT
CAST(s1, 'FixedString(10)') AS fs1,
s2 AS sector,
s3
FROM t1
WHERE (s3 != 'test')
)
SELECT
fs1
FROM t2
LEFT JOIN tmp1 USING (fs1)
WHERE (fs1 IN ('test')) SETTINGS enable_multiple_prewhere_read_steps = 1, query_plan_merge_filters=1;
optimize table t1 final; optimize table t1 final;
@ -67,4 +85,20 @@ tmp1 AS
fs1 fs1
FROM t2 FROM t2
LEFT JOIN tmp1 USING (fs1) LEFT JOIN tmp1 USING (fs1)
WHERE (fs1 IN ('test')); WHERE (fs1 IN ('test')) SETTINGS enable_multiple_prewhere_read_steps = 0, query_plan_merge_filters=0;
WITH
tmp1 AS
(
SELECT
CAST(s1, 'FixedString(10)') AS fs1,
s2 AS sector,
s3
FROM t1
WHERE (s3 != 'test')
)
SELECT
fs1
FROM t2
LEFT JOIN tmp1 USING (fs1)
WHERE (fs1 IN ('test')) SETTINGS enable_multiple_prewhere_read_steps = 1, query_plan_merge_filters=1;

View File

@ -23,8 +23,8 @@
--Interval 123: check that the SleepFunctionCalls, SleepFunctionMilliseconds and ProfileEvent_SleepFunctionElapsedMicroseconds are correct --Interval 123: check that the SleepFunctionCalls, SleepFunctionMilliseconds and ProfileEvent_SleepFunctionElapsedMicroseconds are correct
1 1
--Check that a query_metric_log_interval=0 disables the collection --Check that a query_metric_log_interval=0 disables the collection
0 1
-Check that a query which execution time is less than query_metric_log_interval is never collected -Check that a query which execution time is less than query_metric_log_interval is never collected
0 1
--Check that there is a final event when queries finish --Check that there is a final event when queries finish
3 1

View File

@ -84,17 +84,17 @@ check_log 123
# query_metric_log_interval=0 disables the collection altogether # query_metric_log_interval=0 disables the collection altogether
$CLICKHOUSE_CLIENT -m -q """ $CLICKHOUSE_CLIENT -m -q """
SELECT '--Check that a query_metric_log_interval=0 disables the collection'; SELECT '--Check that a query_metric_log_interval=0 disables the collection';
SELECT count() FROM system.query_metric_log WHERE event_date >= yesterday() AND query_id = '${query_prefix}_0' SELECT count() == 0 FROM system.query_metric_log WHERE event_date >= yesterday() AND query_id = '${query_prefix}_0'
""" """
# a quick query that takes less than query_metric_log_interval is never collected # a quick query that takes less than query_metric_log_interval is never collected
$CLICKHOUSE_CLIENT -m -q """ $CLICKHOUSE_CLIENT -m -q """
SELECT '-Check that a query which execution time is less than query_metric_log_interval is never collected'; SELECT '-Check that a query which execution time is less than query_metric_log_interval is never collected';
SELECT count() FROM system.query_metric_log WHERE event_date >= yesterday() AND query_id = '${query_prefix}_fast' SELECT count() == 0 FROM system.query_metric_log WHERE event_date >= yesterday() AND query_id = '${query_prefix}_fast'
""" """
# a query that takes more than query_metric_log_interval is collected including the final row # a query that takes more than query_metric_log_interval is collected including the final row
$CLICKHOUSE_CLIENT -m -q """ $CLICKHOUSE_CLIENT -m -q """
SELECT '--Check that there is a final event when queries finish'; SELECT '--Check that there is a final event when queries finish';
SELECT count() FROM system.query_metric_log WHERE event_date >= yesterday() AND query_id = '${query_prefix}_1000' SELECT count() > 2 FROM system.query_metric_log WHERE event_date >= yesterday() AND query_id = '${query_prefix}_1000'
""" """

View File

@ -1,14 +1,44 @@
parseDateTime64InJodaSyntax
2077-10-09 10:30:10.123
1970-01-01 08:00:00
1970-01-01 08:00:01
2024-01-02 00:00:00
1970-01-01 10:30:50
2025-01-01 15:30:10.123456
2024-01-01 00:00:01.123456
2024-10-09 10:30:10.123 2024-10-09 10:30:10.123
2024-10-09 10:30:10.123456 2024-10-09 10:30:10.123456
2024-10-10 02:30:10.123456 2024-10-10 02:30:10.123456
2024-11-05 17:02:03.123456
2024-10-10 01:30:10.123456 2024-10-10 01:30:10.123456
2024-10-09 08:00:10.123456
2024-09-10 10:30:10.123
2024-09-10 10:30:10.000999999
2024-10-10 03:15:10.123456
2024-03-01 03:23:34
2024-02-29 15:22:33
2023-03-01 15:22:33
2024-03-04 16:22:33
2023-03-04 16:22:33
parseDateTime64InJodaSyntaxOrZero
2024-10-09 10:30:10.123
2024-10-09 10:30:10.123456
1970-01-01 08:00:00.000000000
2024-10-10 02:30:10.123456
2024-10-10 01:30:10.123456
2024-09-10 10:30:10.123
1970-01-01 08:00:00.000
1970-01-01 08:00:00
1970-01-01 08:00:00
1970-01-01 08:00:00.000
parseDateTime64InJodaSyntaxOrNull
2024-10-09 10:30:10.123
2024-10-09 10:30:10.123456
\N
2024-10-10 02:30:10.123456
2024-10-10 01:30:10.123456
2024-09-10 10:30:10.123
\N
\N
\N
1970-01-01 00:00:00

View File

@ -1,19 +1,54 @@
set session_timezone = 'Asia/Shanghai';
select 'parseDateTime64InJodaSyntax';
select parseDateTime64InJodaSyntax('', ''); -- { serverError VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE }
select parseDateTime64InJodaSyntax('2077-10-09 10:30:10.123', 'yyyy-MM-dd HH:mm:ss.SSS');
select parseDateTime64InJodaSyntax('2177-10-09 10:30:10.123', 'yyyy-MM-dd HH:mm:ss.SSS'); -- { serverError CANNOT_PARSE_DATETIME }
select parseDateTime64InJodaSyntax('+0000', 'Z');
select parseDateTime64InJodaSyntax('08:01', 'HH:ss');
select parseDateTime64InJodaSyntax('2024-01-02', 'yyyy-MM-dd');
select parseDateTime64InJodaSyntax('10:30:50', 'HH:mm:ss');
select parseDateTime64InJodaSyntax('2024-12-31 23:30:10.123456-0800', 'yyyy-MM-dd HH:mm:ss.SSSSSSZ');
select parseDateTime64InJodaSyntax('2024-01-01 00:00:01.123456+0800', 'yyyy-MM-dd HH:mm:ss.SSSSSSZ');
select parseDateTime64InJodaSyntax('2024-10-09 10:30:10.123', 'yyyy-MM-dd HH:mm:ss.SSS');
select parseDateTime64InJodaSyntax('2024-10-09 10:30:10.123456', 'yyyy-MM-dd HH:mm:ss.SSSSSS');
select parseDateTime64InJodaSyntax('2024-10-09 10:30:10.123456789', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS'); -- { serverError CANNOT_PARSE_DATETIME }
select parseDateTime64InJodaSyntax('2024-10-09 10:30:10.123456-0800', 'yyyy-MM-dd HH:mm:ss.SSSSSSZ');
select parseDateTime64InJodaSyntax('2024-11-05-0800 01:02:03.123456', 'yyyy-MM-ddZ HH:mm:ss.SSSSSS');
select parseDateTime64InJodaSyntax('2024-10-09 10:30:10.123456America/Los_Angeles', 'yyyy-MM-dd HH:mm:ss.SSSSSSz');
select parseDateTime64InJodaSyntax('2024-10-09 10:30:10.123456Australia/Adelaide', 'yyyy-MM-dd HH:mm:ss.SSSSSSz');
select parseDateTime64InJodaSyntax('2024-10-09 10:30:10.123', 'yyyy-dd-MM HH:mm:ss.SSS');
select parseDateTime64InJodaSyntax('999999 10-09-202410:30:10', 'SSSSSSSSS dd-MM-yyyyHH:mm:ss');
select parseDateTime64InJodaSyntax('2024-10-09 10:30:10.123456-0845', 'yyyy-MM-dd HH:mm:ss.SSSSSSZ');
select parseDateTime64InJodaSyntax('2023-02-29 11:22:33Not/Timezone', 'yyyy-MM-dd HH:mm:ssz'); -- { serverError BAD_ARGUMENTS }
--leap years and non-leap years
select parseDateTime64InJodaSyntax('2024-02-29 11:23:34America/Los_Angeles', 'yyyy-MM-dd HH:mm:ssz');
select parseDateTime64InJodaSyntax('2023-02-29 11:22:33America/Los_Angeles', 'yyyy-MM-dd HH:mm:ssz'); -- { serverError CANNOT_PARSE_DATETIME }
select parseDateTime64InJodaSyntax('2024-02-28 23:22:33America/Los_Angeles', 'yyyy-MM-dd HH:mm:ssz');
select parseDateTime64InJodaSyntax('2023-02-28 23:22:33America/Los_Angeles', 'yyyy-MM-dd HH:mm:ssz');
select parseDateTime64InJodaSyntax('2024-03-01 00:22:33-8000', 'yyyy-MM-dd HH:mm:ssZ');
select parseDateTime64InJodaSyntax('2023-03-01 00:22:33-8000', 'yyyy-MM-dd HH:mm:ssZ');
select 'parseDateTime64InJodaSyntaxOrZero';
select parseDateTime64InJodaSyntaxOrZero('2024-10-09 10:30:10.123', 'yyyy-MM-dd HH:mm:ss.SSS');
select parseDateTime64InJodaSyntaxOrZero('2024-10-09 10:30:10.123456', 'yyyy-MM-dd HH:mm:ss.SSSSSS');
select parseDateTime64InJodaSyntaxOrZero('2024-10-09 10:30:10.123456789', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS');
select parseDateTime64InJodaSyntaxOrZero('2024-10-09 10:30:10.123456-0800', 'yyyy-MM-dd HH:mm:ss.SSSSSSZ');
select parseDateTime64InJodaSyntaxOrZero('2024-10-09 10:30:10.123456America/Los_Angeles', 'yyyy-MM-dd HH:mm:ss.SSSSSSz');
select parseDateTime64InJodaSyntaxOrZero('2024-10-09 10:30:10.123', 'yyyy-dd-MM HH:mm:ss.SSS');
select parseDateTime64InJodaSyntaxOrZero('wrong value', 'yyyy-dd-MM HH:mm:ss.SSS');
select parseDateTime64InJodaSyntaxOrZero('2023-02-29 11:22:33America/Los_Angeles', 'yyyy-MM-dd HH:mm:ssz');
select parseDateTime64InJodaSyntaxOrZero('', '');
select parseDateTime64InJodaSyntaxOrZero('2177-10-09 10:30:10.123', 'yyyy-MM-dd HH:mm:ss.SSS');
select 'parseDateTime64InJodaSyntaxOrNull';
select parseDateTime64InJodaSyntaxOrNull('2024-10-09 10:30:10.123', 'yyyy-MM-dd HH:mm:ss.SSS');
select parseDateTime64InJodaSyntaxOrNull('2024-10-09 10:30:10.123456', 'yyyy-MM-dd HH:mm:ss.SSSSSS');
select parseDateTime64InJodaSyntaxOrNull('2024-10-09 10:30:10.123456789', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS');
select parseDateTime64InJodaSyntaxOrNull('2024-10-09 10:30:10.123456-0800', 'yyyy-MM-dd HH:mm:ss.SSSSSSZ');
select parseDateTime64InJodaSyntaxOrNull('2024-10-09 10:30:10.123456America/Los_Angeles', 'yyyy-MM-dd HH:mm:ss.SSSSSSz');
select parseDateTime64InJodaSyntaxOrNull('2024-10-09 10:30:10.123', 'yyyy-dd-MM HH:mm:ss.SSS');
select parseDateTime64InJodaSyntaxOrNull('2023-02-29 11:22:33America/Los_Angeles', 'yyyy-MM-dd HH:mm:ssz');
select parseDateTime64InJodaSyntaxOrNull('', '');
select parseDateTime64InJodaSyntaxOrNull('2177-10-09 10:30:10.123', 'yyyy-MM-dd HH:mm:ss.SSS');
set session_timezone = 'UTC';
select parseDateTime64InJodaSyntax('', '');
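As a compact recap, a sketch of how the three variants surface the same parse failure, reusing the non-leap-year case already exercised above:

select parseDateTime64InJodaSyntax('2023-02-29 11:22:33America/Los_Angeles', 'yyyy-MM-dd HH:mm:ssz'); -- throws CANNOT_PARSE_DATETIME, 2023 is not a leap year
select parseDateTime64InJodaSyntaxOrZero('2023-02-29 11:22:33America/Los_Angeles', 'yyyy-MM-dd HH:mm:ssz'); -- returns the zero value instead of throwing
select parseDateTime64InJodaSyntaxOrNull('2023-02-29 11:22:33America/Los_Angeles', 'yyyy-MM-dd HH:mm:ssz'); -- returns NULL instead of throwing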

View File

@ -0,0 +1 @@
CODEC(Delta(1), LZ4) 14 48

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
echo "Hello, World!" | $CLICKHOUSE_COMPRESSOR --codec 'Delta' --codec 'LZ4' | $CLICKHOUSE_COMPRESSOR --stat

View File

@ -0,0 +1 @@
test 10.00 million 352.87 MiB 39.43 MiB 39.45 MiB

View File

@ -0,0 +1,23 @@
-- Tags: no-random-settings
set allow_experimental_dynamic_type = 1;
set allow_experimental_json_type = 1;
drop table if exists test;
create table test (d Dynamic, json JSON) engine=MergeTree order by tuple() settings min_rows_for_wide_part=0, min_bytes_for_wide_part=1;
insert into test select number, '{"a" : 42, "b" : "Hello, World"}' from numbers(10000000);
SELECT
`table`,
formatReadableQuantity(sum(rows)) AS rows,
formatReadableSize(sum(data_uncompressed_bytes)) AS data_size_uncompressed,
formatReadableSize(sum(data_compressed_bytes)) AS data_size_compressed,
formatReadableSize(sum(bytes_on_disk)) AS total_size_on_disk
FROM system.parts
WHERE active AND (database = currentDatabase()) AND (`table` = 'test')
GROUP BY `table`
ORDER BY `table` ASC;
drop table test;
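A follow-up sketch, assuming the standard system.parts_columns table, that one might run before the final drop to break the same sizes down per column:

-- per-column compressed and uncompressed sizes for the `test` table created above
SELECT
    column,
    formatReadableSize(sum(column_data_uncompressed_bytes)) AS uncompressed,
    formatReadableSize(sum(column_data_compressed_bytes)) AS compressed
FROM system.parts_columns
WHERE active AND (database = currentDatabase()) AND (`table` = 'test')
GROUP BY column
ORDER BY column;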

View File

@ -0,0 +1,2 @@
Condition: and((materialize(auid) in [1, 1]), (_CAST(toDate(ts)) in (-Inf, 1703980800]))
Granules: 1/3

View File

@ -0,0 +1,36 @@
DROP TABLE IF EXISTS alpha;
DROP TABLE IF EXISTS alpha__day;
SET session_timezone = 'Etc/UTC';
CREATE TABLE alpha
(
`ts` DateTime64(6),
`auid` Int64,
)
ENGINE = MergeTree
ORDER BY (auid, ts)
SETTINGS index_granularity = 1;
CREATE VIEW alpha__day
(
`ts_date` Date,
`auid` Int64,
)
AS SELECT
ts_date,
auid,
FROM
(
SELECT
toDate(ts) AS ts_date,
auid
FROM alpha
)
WHERE ts_date <= toDateTime('2024-01-01 00:00:00') - INTERVAL 1 DAY;
INSERT INTO alpha VALUES (toDateTime64('2024-01-01 00:00:00.000', 3) - INTERVAL 3 DAY, 1);
INSERT INTO alpha VALUES (toDateTime64('2024-01-01 00:00:00.000', 3) - INTERVAL 3 DAY, 2);
INSERT INTO alpha VALUES (toDateTime64('2024-01-01 00:00:00.000', 3) - INTERVAL 3 DAY, 3);
select trimLeft(explain) from (EXPLAIN indexes = 1 SELECT auid FROM alpha__day WHERE auid = 1) where explain like '%Condition:%' or explain like '%Granules:%' settings allow_experimental_analyzer = 1;
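For comparison, a hedged sketch pushing the same predicate directly against the base table; with index_granularity = 1 and three inserted rows one would expect similar granule pruning, though the exact EXPLAIN output is not asserted here:

-- same filter as the view, applied to `alpha` directly
EXPLAIN indexes = 1
SELECT auid FROM alpha
WHERE auid = 1 AND toDate(ts) <= toDateTime('2024-01-01 00:00:00') - INTERVAL 1 DAY
SETTINGS allow_experimental_analyzer = 1;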

View File

@ -0,0 +1 @@
() 2

View File

@ -0,0 +1,11 @@
DROP TABLE IF EXISTS t0;
CREATE TABLE t0 (c0 Tuple(), c1 int) ENGINE = Memory();
INSERT INTO t0 VALUES ((), 1);
ALTER TABLE t0 UPDATE c0 = (), c1 = 2 WHERE EXISTS (SELECT 1);
SELECT * FROM t0;
DROP TABLE t0;