Merge branch 'master' into change-hashed-path

Kseniia Sumarokova 2022-04-14 11:18:42 +02:00 committed by GitHub
commit 04926d882a
46 changed files with 608 additions and 168 deletions

View File

@ -153,13 +153,19 @@ jobs:
EOF
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Fast Test
run: |
sudo rm -fr "$GITHUB_WORKSPACE"
mkdir "$GITHUB_WORKSPACE"
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
- name: Check out repository code
uses: actions/checkout@v2
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ env.TEMP_PATH }}
- name: Fast Test
run: |
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 fast_test_check.py
- name: Cleanup

View File

@ -115,6 +115,7 @@ function start_server
function clone_root
{
git config --global --add safe.directory "$FASTTEST_SOURCE"
git clone --depth 1 https://github.com/ClickHouse/ClickHouse.git -- "$FASTTEST_SOURCE" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee "$FASTTEST_OUTPUT/clone_log.txt"
(

View File

@ -159,6 +159,10 @@ $ clickhouse-client --query "select count(*) from datasets.ontime"
!!! info "Info"
If you run the queries described below, you have to use the full table name, `datasets.ontime`.
!!! info "Info"
If you are using the prepared partitions or the Online Playground, replace any occurrence of `IATA_CODE_Reporting_Airline` or `IATA_CODE_Reporting_Airline AS Carrier` in the following queries with `Carrier` (see `describe ontime`).
## Queries {#queries}
Q0.

View File

@ -124,7 +124,7 @@ You can pass parameters to `clickhouse-client` (all parameters have a default va
- `--time, -t` If specified, print the query execution time to stderr in non-interactive mode.
- `--stacktrace` If specified, also print the stack trace if an exception occurs.
- `--config-file` The name of the configuration file.
- `--secure` If specified, will connect to server over secure connection.
- `--secure` If specified, will connect to the server over a secure connection (TLS). You might need to configure your CA certificates in the [configuration file](#configuration_files). The available configuration settings are the same as for [server-side TLS configuration](../operations/server-configuration-parameters/settings.md#server_configuration_parameters-openssl).
- `--history_file` — Path to a file containing command history.
- `--param_<name>` — Value for a [query with parameters](#cli-queries-with-parameters).
- `--hardware-utilization` — Print hardware utilization information in progress bar.
@ -148,7 +148,12 @@ Example of a config file:
<config>
<user>username</user>
<password>password</password>
<secure>False</secure>
<secure>true</secure>
<openSSL>
<client>
<caConfig>/etc/ssl/cert.pem</caConfig>
</client>
</openSSL>
</config>
```

View File

@ -3,13 +3,10 @@ toc_priority: 66
toc_title: ClickHouse Keeper
---
# [pre-production] ClickHouse Keeper {#clickHouse-keeper}
# ClickHouse Keeper {#clickHouse-keeper}
ClickHouse server uses [ZooKeeper](https://zookeeper.apache.org/) coordination system for data [replication](../engines/table-engines/mergetree-family/replication.md) and [distributed DDL](../sql-reference/distributed-ddl.md) queries execution. ClickHouse Keeper is an alternative coordination system compatible with ZooKeeper.
!!! warning "Warning"
This feature is currently in the pre-production stage. We test it in our CI and on small internal installations.
## Implementation details {#implementation-details}
ZooKeeper is one of the first well-known open-source coordination systems. It is implemented in Java and has a simple yet powerful data model. ZooKeeper's coordination algorithm, ZAB (ZooKeeper Atomic Broadcast), doesn't provide linearizability guarantees for reads, because each ZooKeeper node serves reads locally. Unlike ZooKeeper, ClickHouse Keeper is written in C++ and uses the [RAFT algorithm](https://raft.github.io/) [implementation](https://github.com/eBay/NuRaft). This algorithm provides linearizability for reads and writes, and has several open-source implementations in different languages.

View File

@ -941,30 +941,30 @@ For more information, see the MergeTreeSettings.h header file.
SSL client/server configuration.
Support for SSL is provided by the `libpoco` library. The interface is described in the file [SSLManager.h](https://github.com/ClickHouse-Extras/poco/blob/master/NetSSL_OpenSSL/include/Poco/Net/SSLManager.h)
Support for SSL is provided by the `libpoco` library. The available configuration options are explained in [SSLManager.h](https://github.com/ClickHouse-Extras/poco/blob/master/NetSSL_OpenSSL/include/Poco/Net/SSLManager.h). Default values can be found in [SSLManager.cpp](https://github.com/ClickHouse-Extras/poco/blob/master/NetSSL_OpenSSL/src/SSLManager.cpp).
Keys for server/client settings:
- privateKeyFile The path to the file with the secret key of the PEM certificate. The file may contain a key and certificate at the same time.
- certificateFile The path to the client/server certificate file in PEM format. You can omit it if `privateKeyFile` contains the certificate.
- caConfig The path to the file or directory that contains trusted root certificates.
- verificationMode The method for checking the node's certificates. Details are in the description of the [Context](https://github.com/ClickHouse-Extras/poco/blob/master/NetSSL_OpenSSL/include/Poco/Net/Context.h) class. Possible values: `none`, `relaxed`, `strict`, `once`.
- verificationDepth The maximum length of the verification chain. Verification will fail if the certificate chain length exceeds the set value.
- loadDefaultCAFile Indicates that built-in CA certificates for OpenSSL will be used. Acceptable values: `true`, `false`.
- cipherList Supported OpenSSL encryptions. For example: `ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH`.
- cacheSessions Enables or disables caching sessions. Must be used in combination with `sessionIdContext`. Acceptable values: `true`, `false`.
- sessionIdContext A unique set of random characters that the server appends to each generated identifier. The length of the string must not exceed `SSL_MAX_SSL_SESSION_ID_LENGTH`. This parameter is always recommended since it helps avoid problems both if the server caches the session and if the client requested caching. Default value: `${application.name}`.
- sessionCacheSize The maximum number of sessions that the server caches. Default value: 1024\*20. 0 Unlimited sessions.
- sessionTimeout Time for caching the session on the server.
- extendedVerification Automatically extended verification of certificates after the session ends. Acceptable values: `true`, `false`.
- requireTLSv1 Require a TLSv1 connection. Acceptable values: `true`, `false`.
- requireTLSv1_1 Require a TLSv1.1 connection. Acceptable values: `true`, `false`.
- requireTLSv1_2 Require a TLSv1.2 connection. Acceptable values: `true`, `false`.
- fips Activates OpenSSL FIPS mode. Supported if the library's OpenSSL version supports FIPS.
- privateKeyPassphraseHandler Class (PrivateKeyPassphraseHandler subclass) that requests the passphrase for accessing the private key. For example: `<privateKeyPassphraseHandler>`, `<name>KeyFileHandler</name>`, `<options><password>test</password></options>`, `</privateKeyPassphraseHandler>`.
- invalidCertificateHandler Class (a subclass of CertificateHandler) for verifying invalid certificates. For example: `<invalidCertificateHandler> <name>ConsoleCertificateHandler</name> </invalidCertificateHandler>` .
- disableProtocols Protocols that are not allowed to use.
- preferServerCiphers Preferred server ciphers on the client.
- caConfig (default: none) The path to the file or directory that contains trusted CA certificates. If this points to a file, it must be in PEM format and can contain several CA certificates. If this points to a directory, it must contain one .pem file per CA certificate. The filenames are looked up by the CA subject name hash value. Details can be found in the man page of [SSL_CTX_load_verify_locations](https://www.openssl.org/docs/man3.0/man3/SSL_CTX_load_verify_locations.html).
- verificationMode (default: relaxed) The method for checking the node's certificates. Details are in the description of the [Context](https://github.com/ClickHouse-Extras/poco/blob/master/NetSSL_OpenSSL/include/Poco/Net/Context.h) class. Possible values: `none`, `relaxed`, `strict`, `once`.
- verificationDepth (default: 9) The maximum length of the verification chain. Verification will fail if the certificate chain length exceeds the set value.
- loadDefaultCAFile (default: true) Whether built-in CA certificates for OpenSSL will be used. ClickHouse assumes that built-in CA certificates are in the file `/etc/ssl/cert.pem` (resp. the directory `/etc/ssl/certs`) or in the file (resp. directory) specified by the environment variable `SSL_CERT_FILE` (resp. `SSL_CERT_DIR`).
- cipherList (default: `ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH`) Supported OpenSSL encryptions.
- cacheSessions (default: false) Enables or disables caching sessions. Must be used in combination with `sessionIdContext`. Acceptable values: `true`, `false`.
- sessionIdContext (default: `${application.name}`) A unique set of random characters that the server appends to each generated identifier. The length of the string must not exceed `SSL_MAX_SSL_SESSION_ID_LENGTH`. This parameter is always recommended since it helps avoid problems both if the server caches the session and if the client requested caching.
- sessionCacheSize (default: [1024\*20](https://github.com/ClickHouse/boringssl/blob/master/include/openssl/ssl.h#L1978)) The maximum number of sessions that the server caches. A value of 0 means unlimited sessions.
- sessionTimeout (default: [2h](https://github.com/ClickHouse/boringssl/blob/master/include/openssl/ssl.h#L1926)) Time for caching the session on the server.
- extendedVerification (default: false) If enabled, verify that the certificate CN or SAN matches the peer hostname.
- requireTLSv1 (default: false) Require a TLSv1 connection. Acceptable values: `true`, `false`.
- requireTLSv1_1 (default: false) Require a TLSv1.1 connection. Acceptable values: `true`, `false`.
- requireTLSv1_2 (default: false) Require a TLSv1.2 connection. Acceptable values: `true`, `false`.
- fips (default: false) Activates OpenSSL FIPS mode. Supported if the library's OpenSSL version supports FIPS.
- privateKeyPassphraseHandler (default: `KeyConsoleHandler`) Class (PrivateKeyPassphraseHandler subclass) that requests the passphrase for accessing the private key. For example: `<privateKeyPassphraseHandler>`, `<name>KeyFileHandler</name>`, `<options><password>test</password></options>`, `</privateKeyPassphraseHandler>`.
- invalidCertificateHandler (default: `ConsoleCertificateHandler`) Class (a subclass of CertificateHandler) for verifying invalid certificates. For example: `<invalidCertificateHandler> <name>ConsoleCertificateHandler</name> </invalidCertificateHandler>`.
- disableProtocols (default: "") Protocols that are not allowed to be used.
- preferServerCiphers (default: false) Prefer server ciphers on the client.
**Example of settings:**

src/Common/Concepts.h Normal file
View File

@ -0,0 +1,14 @@
#pragma once
#include <concepts>
namespace DB
{
template <typename... T>
concept OptionalArgument = requires(T &&...)
{
requires(sizeof...(T) == 0 || sizeof...(T) == 1);
};
}
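/// Illustrative sketch (not part of the diff): OptionalArgument constrains a
/// parameter pack to zero or one element, modelling an optional trailing
/// argument. The function name pickAlignment is hypothetical; it assumes
/// <new> (std::align_val_t) and <cstddef> (std::max_align_t) are included.
///
///     template <std::same_as<std::align_val_t>... TAlign>
///     requires DB::OptionalArgument<TAlign...>
///     size_t pickAlignment(TAlign... align)
///     {
///         if constexpr (sizeof...(TAlign) == 1)
///             return (static_cast<size_t>(align), ...);   /// the single pack element
///         else
///             return alignof(std::max_align_t);           /// no alignment passed
///     }
///
///     pickAlignment();                          /// empty pack: default alignment
///     pickAlignment(std::align_val_t{64});      /// one value: 64
///     /// pickAlignment(std::align_val_t{8}, std::align_val_t{16});  -- rejected by the concept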

View File

@ -233,26 +233,8 @@ FileSegments LRUFileCache::splitRangeIntoCells(
return file_segments;
}
FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t size)
{
assertInitialized();
FileSegment::Range range(offset, offset + size - 1);
std::lock_guard cache_lock(mutex);
#ifndef NDEBUG
assertCacheCorrectness(key, cache_lock);
#endif
/// Get all segments which intersect with the given range.
auto file_segments = getImpl(key, range, cache_lock);
if (file_segments.empty())
{
file_segments = splitRangeIntoCells(key, offset, size, FileSegment::State::EMPTY, cache_lock);
}
else
void LRUFileCache::fillHolesWithEmptyFileSegments(
FileSegments & file_segments, const Key & key, const FileSegment::Range & range, bool fill_with_detached_file_segments, std::lock_guard<std::mutex> & cache_lock)
{
/// There are segments [segment1, ..., segmentN]
/// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
@ -295,7 +277,17 @@ FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t
assert(current_pos < segment_range.left);
auto hole_size = segment_range.left - current_pos;
if (fill_with_detached_file_segments)
{
auto file_segment = std::make_shared<FileSegment>(current_pos, hole_size, key, this, FileSegment::State::EMPTY);
file_segment->detached = true;
file_segments.insert(it, file_segment);
}
else
{
file_segments.splice(it, splitRangeIntoCells(key, current_pos, hole_size, FileSegment::State::EMPTY, cache_lock));
}
current_pos = segment_range.right + 1;
++it;
@ -309,14 +301,77 @@ FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t
/// segmentN
auto hole_size = range.right - current_pos + 1;
if (fill_with_detached_file_segments)
{
auto file_segment = std::make_shared<FileSegment>(current_pos, hole_size, key, this, FileSegment::State::EMPTY);
file_segment->detached = true;
file_segments.insert(file_segments.end(), file_segment);
}
else
{
file_segments.splice(file_segments.end(), splitRangeIntoCells(key, current_pos, hole_size, FileSegment::State::EMPTY, cache_lock));
}
}
}
FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t size)
{
assertInitialized();
FileSegment::Range range(offset, offset + size - 1);
std::lock_guard cache_lock(mutex);
#ifndef NDEBUG
assertCacheCorrectness(key, cache_lock);
#endif
/// Get all segments which intersect with the given range.
auto file_segments = getImpl(key, range, cache_lock);
if (file_segments.empty())
{
file_segments = splitRangeIntoCells(key, offset, size, FileSegment::State::EMPTY, cache_lock);
}
else
{
fillHolesWithEmptyFileSegments(file_segments, key, range, false, cache_lock);
}
assert(!file_segments.empty());
return FileSegmentsHolder(std::move(file_segments));
}
FileSegmentsHolder LRUFileCache::get(const Key & key, size_t offset, size_t size)
{
assertInitialized();
FileSegment::Range range(offset, offset + size - 1);
std::lock_guard cache_lock(mutex);
#ifndef NDEBUG
assertCacheCorrectness(key, cache_lock);
#endif
/// Get all segments which intersect with the given range.
auto file_segments = getImpl(key, range, cache_lock);
if (file_segments.empty())
{
auto file_segment = std::make_shared<FileSegment>(offset, size, key, this, FileSegment::State::EMPTY);
file_segment->detached = true;
file_segments = { file_segment };
}
else
{
fillHolesWithEmptyFileSegments(file_segments, key, range, true, cache_lock);
}
return FileSegmentsHolder(std::move(file_segments));
}
LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
const Key & key, size_t offset, size_t size, FileSegment::State state,
std::lock_guard<std::mutex> & cache_lock)

View File

@ -72,6 +72,17 @@ public:
*/
virtual FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) = 0;
/**
* Segments in the returned list are ordered in ascending order and represent a full contiguous
* interval (no holes). Each segment in the returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
*
* If a file segment has state EMPTY, it is also marked as "detached", i.e. it is "detached"
* from the cache (not owned by the cache), and as a result it will never change its state and will
* be destructed together with the holder, while in getOrSet() EMPTY file segments can eventually
* change their state (and become DOWNLOADED).
*/
virtual FileSegmentsHolder get(const Key & key, size_t offset, size_t size) = 0;
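/// Illustrative sketch (not part of the diff): a read-only consumer of get().
/// It assumes the holder exposes its `file_segments` list, which the destructor
/// of FileSegmentsHolder in this change iterates internally.
///
///     FileSegmentsHolder holder = cache.get(key, offset, size);
///     for (const auto & file_segment : holder.file_segments)
///     {
///         if (file_segment->state() == FileSegment::State::DOWNLOADED)
///             readFromLocalCacheFile(*file_segment);   /// hypothetical helper
///         else
///             readFromRemote(*file_segment);           /// DOWNLOADING or detached EMPTY
///     }
///
/// Unlike getOrSet(), get() never inserts new cells into the cache: detached
/// EMPTY segments are destroyed together with the holder.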
virtual FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) = 0;
virtual FileSegments getSnapshot() const = 0;
@ -124,6 +135,8 @@ public:
FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) override;
FileSegmentsHolder get(const Key & key, size_t offset, size_t size) override;
FileSegments getSnapshot() const override;
FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) override;
@ -213,6 +226,9 @@ private:
String dumpStructureImpl(const Key & key_, std::lock_guard<std::mutex> & cache_lock);
void fillHolesWithEmptyFileSegments(
FileSegments & file_segments, const Key & key, const FileSegment::Range & range, bool fill_with_detached_file_segments, std::lock_guard<std::mutex> & cache_lock);
public:
struct Stat
{

View File

@ -107,6 +107,9 @@ String FileSegment::getOrSetDownloader()
{
std::lock_guard segment_lock(mutex);
if (detached)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cannot set downloader for a detached file segment");
if (downloader_id.empty())
{
assert(download_state != State::DOWNLOADING);
@ -216,6 +219,8 @@ void FileSegment::write(const char * from, size_t size, size_t offset_)
"Attempt to write {} bytes to offset: {}, but current download offset is {}",
size, offset_, download_offset);
assertNotDetached();
if (!cache_writer)
{
if (downloaded_size > 0)
@ -263,6 +268,8 @@ void FileSegment::writeInMemory(const char * from, size_t size)
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Not enough space is reserved. Available: {}, expected: {}", availableSize(), size);
assertNotDetached();
std::lock_guard segment_lock(mutex);
if (cache_writer)
@ -297,7 +304,9 @@ size_t FileSegment::finalizeWrite()
size_t size = cache_writer->offset();
if (size == 0)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Writing size is not allowed");
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Writing zero size is not allowed");
assertNotDetached();
try
{
@ -352,6 +361,8 @@ bool FileSegment::reserve(size_t size)
if (!size)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Zero space reservation is not allowed");
assertNotDetached();
{
std::lock_guard segment_lock(mutex);
@ -419,7 +430,10 @@ void FileSegment::completeBatchAndResetDownloader()
if (!is_downloader)
{
cv.notify_all();
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "File segment can be completed only by downloader");
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"File segment can be completed only by downloader ({} != {})",
downloader_id, getCallerId());
}
resetDownloaderImpl(segment_lock);
@ -453,6 +467,8 @@ void FileSegment::complete(State state)
download_state = state;
assertNotDetached();
try
{
completeImpl(cache_lock, segment_lock);
@ -479,6 +495,8 @@ void FileSegment::complete(std::lock_guard<std::mutex> & cache_lock)
if (download_state != State::DOWNLOADED && getDownloadedSize(segment_lock) == range().size())
setDownloaded(segment_lock);
assertNotDetached();
if (download_state == State::DOWNLOADING || download_state == State::EMPTY)
{
/// Segment state can be changed from DOWNLOADING or EMPTY only if the caller is the
@ -608,6 +626,12 @@ void FileSegment::assertCorrectnessImpl(std::lock_guard<std::mutex> & /* segment
assert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(cache->getPathInLocalCache(key(), offset())) > 0);
}
void FileSegment::assertNotDetached() const
{
if (detached)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Operation not allowed, file segment is detached");
}
FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std::lock_guard<std::mutex> & /* cache_lock */)
{
auto snapshot = std::make_shared<FileSegment>(
@ -641,6 +665,15 @@ FileSegmentsHolder::~FileSegmentsHolder()
if (!cache)
cache = file_segment->cache;
if (file_segment->detached)
{
/// This file segment is not owned by cache, so it will be destructed
/// at this point, therefore no completion required.
assert(file_segment->state() == FileSegment::State::EMPTY);
file_segment_it = file_segments.erase(current_file_segment_it);
continue;
}
try
{
/// File segment pointer must be reset right after calling complete() and

View File

@ -150,6 +150,7 @@ private:
size_t getDownloadedSize(std::lock_guard<std::mutex> & segment_lock) const;
String getInfoForLogImpl(std::lock_guard<std::mutex> & segment_lock) const;
void assertCorrectnessImpl(std::lock_guard<std::mutex> & segment_lock) const;
void assertNotDetached() const;
void setDownloaded(std::lock_guard<std::mutex> & segment_lock);
void setDownloadFailed(std::lock_guard<std::mutex> & segment_lock);
@ -199,6 +200,8 @@ private:
Poco::Logger * log;
/// "detached" file segment means that it is not owned by cache ("detached" from cache).
/// In the general case, all file segments are owned by the cache.
bool detached = false;
std::atomic<bool> is_downloaded{false};

View File

@ -1,4 +1,5 @@
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
#include <Common/Exception.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
@ -243,6 +244,9 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
while (true)
{
/// This is inside the loop to also reset previous thread names set inside the jobs.
setThreadName("ThreadPool");
Job job;
bool need_shutdown = false;

View File

@ -57,6 +57,8 @@ struct ZooKeeperRequest : virtual Request
bool restored_from_zookeeper_log = false;
UInt64 request_created_time_ns = 0;
UInt64 thread_id = 0;
String query_id;
ZooKeeperRequest() = default;
ZooKeeperRequest(const ZooKeeperRequest &) = default;

View File

@ -8,6 +8,7 @@
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <base/logger_useful.h>
#include <base/getThreadId.h>
#include <Common/config.h>
@ -1016,6 +1017,11 @@ void ZooKeeper::pushRequest(RequestInfo && info)
try
{
info.time = clock::now();
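/// Capture the caller's thread id and query id only when the ZooKeeper log is
/// configured: they exist solely to enrich system.zookeeper_log entries.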
if (zk_log)
{
info.request->thread_id = getThreadId();
info.request->query_id = String(CurrentThread::getQueryId());
}
if (!info.request->xid)
{
@ -1269,6 +1275,11 @@ void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const
elem.event_time = event_time;
elem.address = socket_address;
elem.session_id = session_id;
if (request)
{
elem.thread_id = request->thread_id;
elem.query_id = request->query_id;
}
maybe_zk_log->add(elem);
}
}

View File

@ -234,6 +234,11 @@ bool createFile(const std::string & path)
DB::throwFromErrnoWithPath("Cannot create file: " + path, path, DB::ErrorCodes::CANNOT_CREATE_FILE);
}
bool exists(const std::string & path)
{
return faccessat(AT_FDCWD, path.c_str(), F_OK, AT_EACCESS) == 0;
}
bool canRead(const std::string & path)
{
struct stat st;
@ -249,7 +254,6 @@ bool canRead(const std::string & path)
DB::throwFromErrnoWithPath("Cannot check read access to file: " + path, path, DB::ErrorCodes::PATH_ACCESS_DENIED);
}
bool canWrite(const std::string & path)
{
struct stat st;
@ -265,6 +269,13 @@ bool canWrite(const std::string & path)
DB::throwFromErrnoWithPath("Cannot check write access to file: " + path, path, DB::ErrorCodes::PATH_ACCESS_DENIED);
}
bool canExecute(const std::string & path)
{
if (exists(path))
return faccessat(AT_FDCWD, path.c_str(), X_OK, AT_EACCESS) == 0;
DB::throwFromErrnoWithPath("Cannot check execute access to file: " + path, path, DB::ErrorCodes::PATH_ACCESS_DENIED);
}
time_t getModificationTime(const std::string & path)
{
struct stat st;

View File

@ -70,8 +70,10 @@ namespace FS
{
bool createFile(const std::string & path);
bool exists(const std::string & path);
bool canRead(const std::string & path);
bool canWrite(const std::string & path);
bool canExecute(const std::string & path);
time_t getModificationTime(const std::string & path);
Poco::Timestamp getModificationTimestamp(const std::string & path);
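/// Illustrative sketch (not part of the diff): the validation pattern that the
/// executable dictionary sources below adopt with these helpers. exists() is
/// checked first because canExecute() throws for a missing path.
///
///     if (!FS::exists(script_path))
///         throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "... does not exist ...");
///     if (!FS::canExecute(script_path))
///         throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "... is not executable ...");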

View File

@ -1,8 +1,11 @@
#pragma once
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wold-style-cast"
#include <new>
#include <base/defines.h>
#include <Common/Concepts.h>
#include <Common/CurrentMemoryTracker.h>
#include <Common/config.h>
@ -14,13 +17,24 @@
# include <cstdlib>
#endif
namespace Memory
{
inline ALWAYS_INLINE void * newImpl(std::size_t size)
inline ALWAYS_INLINE size_t alignToSizeT(std::align_val_t align) noexcept
{
auto * ptr = malloc(size);
return static_cast<size_t>(align);
}
template <std::same_as<std::align_val_t>... TAlign>
requires DB::OptionalArgument<TAlign...>
inline ALWAYS_INLINE void * newImpl(std::size_t size, TAlign... align)
{
void * ptr = nullptr;
if constexpr (sizeof...(TAlign) == 1)
ptr = aligned_alloc(alignToSizeT(align...), size);
else
ptr = malloc(size);
if (likely(ptr != nullptr))
return ptr;
@ -33,6 +47,11 @@ inline ALWAYS_INLINE void * newNoExept(std::size_t size) noexcept
return malloc(size);
}
inline ALWAYS_INLINE void * newNoExept(std::size_t size, std::align_val_t align) noexcept
{
return aligned_alloc(static_cast<size_t>(align), size);
}
inline ALWAYS_INLINE void deleteImpl(void * ptr) noexcept
{
free(ptr);
@ -40,17 +59,24 @@ inline ALWAYS_INLINE void deleteImpl(void * ptr) noexcept
#if USE_JEMALLOC
inline ALWAYS_INLINE void deleteSized(void * ptr, std::size_t size) noexcept
template <std::same_as<std::align_val_t>... TAlign>
requires DB::OptionalArgument<TAlign...>
inline ALWAYS_INLINE void deleteSized(void * ptr, std::size_t size, TAlign... align) noexcept
{
if (unlikely(ptr == nullptr))
return;
if constexpr (sizeof...(TAlign) == 1)
sdallocx(ptr, size, MALLOCX_ALIGN(alignToSizeT(align...)));
else
sdallocx(ptr, size, 0);
}
#else
inline ALWAYS_INLINE void deleteSized(void * ptr, std::size_t size [[maybe_unused]]) noexcept
template <std::same_as<std::align_val_t>... TAlign>
requires DB::OptionalArgument<TAlign...>
inline ALWAYS_INLINE void deleteSized(void * ptr, std::size_t size [[maybe_unused]], TAlign... /* align */) noexcept
{
free(ptr);
}
@ -63,8 +89,9 @@ inline ALWAYS_INLINE void deleteSized(void * ptr, std::size_t size [[maybe_unuse
# include <malloc/malloc.h>
#endif
inline ALWAYS_INLINE size_t getActualAllocationSize(size_t size)
template <std::same_as<std::align_val_t>... TAlign>
requires DB::OptionalArgument<TAlign...>
inline ALWAYS_INLINE size_t getActualAllocationSize(size_t size, TAlign... align [[maybe_unused]])
{
size_t actual_size = size;
@ -72,26 +99,41 @@ inline ALWAYS_INLINE size_t getActualAllocationSize(size_t size)
/// The nallocx() function allocates no memory, but it performs the same size computation as the mallocx() function
/// @note je_mallocx() != je_malloc(). It's expected they don't differ much in allocation logic.
if (likely(size != 0))
{
if constexpr (sizeof...(TAlign) == 1)
actual_size = nallocx(size, MALLOCX_ALIGN(alignToSizeT(align...)));
else
actual_size = nallocx(size, 0);
}
#endif
return actual_size;
}
inline ALWAYS_INLINE void trackMemory(std::size_t size)
template <std::same_as<std::align_val_t>... TAlign>
requires DB::OptionalArgument<TAlign...>
inline ALWAYS_INLINE void trackMemory(std::size_t size, TAlign... align)
{
std::size_t actual_size = getActualAllocationSize(size);
std::size_t actual_size = getActualAllocationSize(size, align...);
CurrentMemoryTracker::allocNoThrow(actual_size);
}
inline ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t size [[maybe_unused]] = 0) noexcept
template <std::same_as<std::align_val_t>... TAlign>
requires DB::OptionalArgument<TAlign...>
inline ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t size [[maybe_unused]] = 0, TAlign... align [[maybe_unused]]) noexcept
{
try
{
#if USE_JEMALLOC
/// @note It's also possible to use je_malloc_usable_size() here.
if (likely(ptr != nullptr))
{
if constexpr (sizeof...(TAlign) == 1)
CurrentMemoryTracker::free(sallocx(ptr, MALLOCX_ALIGN(alignToSizeT(align...))));
else
CurrentMemoryTracker::free(sallocx(ptr, 0));
}
#else
if (size)
CurrentMemoryTracker::free(size);
@ -103,7 +145,10 @@ inline ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t
#endif
}
catch (...)
{}
{
}
}
}
#pragma GCC diagnostic pop

View File

@ -1,6 +1,7 @@
#include <Common/memory.h>
#include <Common/config.h>
#include <cassert>
#include <new>
#include <Common/config.h>
#include <Common/memory.h>
#if defined(OS_DARWIN) && (USE_JEMALLOC)
/// In case of OSX jemalloc register itself as a default zone allocator.
@ -53,12 +54,24 @@ void * operator new(std::size_t size)
return Memory::newImpl(size);
}
void * operator new(std::size_t size, std::align_val_t align)
{
Memory::trackMemory(size, align);
return Memory::newImpl(size, align);
}
void * operator new[](std::size_t size)
{
Memory::trackMemory(size);
return Memory::newImpl(size);
}
void * operator new[](std::size_t size, std::align_val_t align)
{
Memory::trackMemory(size, align);
return Memory::newImpl(size, align);
}
void * operator new(std::size_t size, const std::nothrow_t &) noexcept
{
Memory::trackMemory(size);
@ -71,6 +84,18 @@ void * operator new[](std::size_t size, const std::nothrow_t &) noexcept
return Memory::newNoExept(size);
}
void * operator new(std::size_t size, std::align_val_t align, const std::nothrow_t &) noexcept
{
Memory::trackMemory(size, align);
return Memory::newNoExept(size, align);
}
void * operator new[](std::size_t size, std::align_val_t align, const std::nothrow_t &) noexcept
{
Memory::trackMemory(size, align);
return Memory::newNoExept(size, align);
}
/// delete
/// C++17 std 21.6.2.1 (11)
@ -81,26 +106,51 @@ void * operator new[](std::size_t size, const std::nothrow_t &) noexcept
/// It's unspecified whether size-aware or size-unaware version is called when deleting objects of
/// incomplete type and arrays of non-class and trivially-destructible class types.
void operator delete(void * ptr) noexcept
{
Memory::untrackMemory(ptr);
Memory::deleteImpl(ptr);
}
void operator delete(void * ptr, std::align_val_t align) noexcept
{
Memory::untrackMemory(ptr, 0, align);
Memory::deleteImpl(ptr);
}
void operator delete[](void * ptr) noexcept
{
Memory::untrackMemory(ptr);
Memory::deleteImpl(ptr);
}
void operator delete[](void * ptr, std::align_val_t align) noexcept
{
Memory::untrackMemory(ptr, 0, align);
Memory::deleteImpl(ptr);
}
void operator delete(void * ptr, std::size_t size) noexcept
{
Memory::untrackMemory(ptr, size);
Memory::deleteSized(ptr, size);
}
void operator delete(void * ptr, std::size_t size, std::align_val_t align) noexcept
{
Memory::untrackMemory(ptr, size, align);
Memory::deleteSized(ptr, size, align);
}
void operator delete[](void * ptr, std::size_t size) noexcept
{
Memory::untrackMemory(ptr, size);
Memory::deleteSized(ptr, size);
}
void operator delete[](void * ptr, std::size_t size, std::align_val_t align) noexcept
{
Memory::untrackMemory(ptr, size, align);
Memory::deleteSized(ptr, size, align);
}
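/// Illustrative sketch (not part of the diff): over-aligned types exercise the
/// aligned overloads implicitly. For a type declared alignas(64), a plain
/// new-expression calls operator new(sizeof(T), std::align_val_t{64}), and the
/// matching delete calls the aligned (and, with sized deallocation, the sized)
/// operator delete, so the allocation is both aligned and memory-tracked.
///
///     struct alignas(64) CacheLine { unsigned char bytes[64]; };
///     auto * line = new CacheLine;
///     delete line;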

View File

@ -139,11 +139,6 @@ void convertObjectsToTuples(Block & block, const NamesAndTypesList & extended_st
if (!isObject(column.type))
continue;
if (!isObject(column.type))
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Type for column '{}' mismatch in columns list and in block. In list: {}, in block: {}",
column.name, column.type->getName(), column.type->getName());
const auto & column_object = assert_cast<const ColumnObject &>(*column.column);
const auto & subcolumns = column_object.getSubcolumns();

View File

@ -136,10 +136,17 @@ void SerializationObject<Parser>::deserializeTextImpl(IColumn & column, Reader &
reader(buf);
std::optional<ParseResult> result;
/// Treat empty string as an empty object
/// for better CAST from String to Object.
if (!buf.empty())
{
auto parser = parsers_pool.get([] { return new Parser; });
result = parser->parse(buf.data(), buf.size());
}
else
{
result = ParseResult{};
}
if (!result)
throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot parse object");

View File

@ -7,7 +7,6 @@
#include <base/logger_useful.h>
#include <Common/LocalDateTime.h>
#include <Common/filesystemHelpers.h>
#include <Common/ShellCommand.h>
#include <Processors/Sources/ShellCommandSource.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
@ -15,12 +14,10 @@
#include <Interpreters/Context.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Dictionaries/DictionarySourceFactory.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/registerDictionaries.h>
namespace DB
@ -51,12 +48,18 @@ namespace
command,
user_scripts_path);
if (!std::filesystem::exists(std::filesystem::path(script_path)))
if (!FS::exists(script_path))
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Executable file {} does not exist inside user scripts folder {}",
command,
user_scripts_path);
if (!FS::canExecute(script_path))
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Executable file {} is not executable inside user scripts folder {}",
command,
user_scripts_path);
command = std::move(script_path);
}

View File

@ -7,7 +7,6 @@
#include <base/logger_useful.h>
#include <Common/LocalDateTime.h>
#include <Common/filesystemHelpers.h>
#include <Common/ShellCommand.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/ShellCommandSource.h>
@ -20,7 +19,6 @@
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryStructure.h>
namespace DB
{
@ -113,12 +111,18 @@ Pipe ExecutablePoolDictionarySource::getStreamForBlock(const Block & block)
command,
user_scripts_path);
if (!std::filesystem::exists(std::filesystem::path(script_path)))
if (!FS::exists(script_path))
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Executable file {} does not exist inside user scripts folder {}",
command,
user_scripts_path);
if (!FS::canExecute(script_path))
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Executable file {} is not executable inside user scripts folder {}",
command,
user_scripts_path);
command = std::move(script_path);
}

View File

@ -45,8 +45,16 @@ CachedReadBufferFromRemoteFS::CachedReadBufferFromRemoteFS(
}
void CachedReadBufferFromRemoteFS::initialize(size_t offset, size_t size)
{
if (settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
{
file_segments_holder.emplace(cache->get(cache_key, offset, size));
}
else
{
file_segments_holder.emplace(cache->getOrSet(cache_key, offset, size));
}
/**
* Segments in returned list are ordered in ascending order and represent a full contiguous
@ -326,6 +334,10 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getImplementationBuffer(File
#endif
size_t seek_offset = file_offset_of_buffer_end - range.left;
if (file_offset_of_buffer_end < range.left)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invariant failed. Expected {} > {} (current offset > file segment's start offset)", file_offset_of_buffer_end, range.left);
read_buffer_for_file_segment->seek(seek_offset, SEEK_SET);
break;
@ -577,6 +589,8 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
{
last_caller_id = FileSegment::getCallerId();
assertCorrectness();
if (!initialized)
initialize(file_offset_of_buffer_end, getTotalSizeToRead());
@ -597,8 +611,8 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
{
try
{
bool file_segment_already_completed = !file_segment->isDownloader();
if (!file_segment_already_completed)
bool need_complete_file_segment = file_segment->isDownloader();
if (need_complete_file_segment)
file_segment->completeBatchAndResetDownloader();
}
catch (...)
@ -820,6 +834,12 @@ std::optional<size_t> CachedReadBufferFromRemoteFS::getLastNonDownloadedOffset()
return std::nullopt;
}
void CachedReadBufferFromRemoteFS::assertCorrectness() const
{
if (IFileCache::isReadOnly() && !settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache usage is not allowed");
}
String CachedReadBufferFromRemoteFS::getInfoForLog()
{
String implementation_buffer_read_range_str;

View File

@ -50,6 +50,8 @@ private:
bool nextImplStep();
void assertCorrectness() const;
enum class ReadType
{
CACHED,

View File

@ -36,10 +36,12 @@ namespace ErrorCodes
SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path, size_t file_size)
{
current_path = path;
auto remote_path = fs::path(common_path_prefix) / path;
auto cache = settings.remote_fs_cache;
bool with_cache = cache && settings.enable_filesystem_cache;
auto remote_path = fs::path(common_path_prefix) / path;
bool with_cache = cache
&& settings.enable_filesystem_cache
&& (!IFileCache::isReadOnly() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
auto remote_file_reader_creator = [=, this]()
{

View File

@ -33,7 +33,6 @@ bool ParallelReadBuffer::addReaderToPool(std::unique_lock<std::mutex> & /*buffer
auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader)));
++active_working_reader;
schedule([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); });
return true;
@ -204,6 +203,11 @@ bool ParallelReadBuffer::nextImpl()
void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
{
{
std::lock_guard lock{mutex};
++active_working_reader;
}
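/// Incrementing inside the worker thread (under the mutex) keeps the counter
/// balanced with the SCOPE_EXIT decrement below, rather than incrementing at
/// schedule time before the worker is guaranteed to run.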
SCOPE_EXIT({
std::lock_guard lock{mutex};
--active_working_reader;

View File

@ -191,7 +191,7 @@ off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence)
off_t res = ::lseek(fd, seek_pos, SEEK_SET);
if (-1 == res)
throwFromErrnoWithPath("Cannot seek through file " + getFileName(), getFileName(),
throwFromErrnoWithPath(fmt::format("Cannot seek through file {} at offset {}", getFileName(), seek_pos), getFileName(),
ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
/// Also note that seeking past the file size is not allowed.

View File

@ -1071,6 +1071,38 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
/// Set and retrieve list of columns, indices and constraints. Set table engine if needed. Rewrite query in canonical way.
TableProperties properties = getTablePropertiesAndNormalizeCreateQuery(create);
/// Check that the types of the SELECT columns are compatible with the destination table of the materialized view.
if (create.select && create.is_materialized_view && create.to_table_id)
{
if (StoragePtr to_table = DatabaseCatalog::instance().tryGetTable(
{create.to_table_id.database_name, create.to_table_id.table_name, create.to_table_id.uuid},
getContext()
))
{
Block input_block = InterpreterSelectWithUnionQuery(
create.select->clone(), getContext(), SelectQueryOptions().analyze()).getSampleBlock();
Block output_block = to_table->getInMemoryMetadataPtr()->getSampleBlock();
ColumnsWithTypeAndName input_columns;
ColumnsWithTypeAndName output_columns;
for (const auto & input_column : input_block)
{
if (const auto * output_column = output_block.findByName(input_column.name))
{
input_columns.push_back(input_column.cloneEmpty());
output_columns.push_back(output_column->cloneEmpty());
}
}
ActionsDAG::makeConvertingActions(
input_columns,
output_columns,
ActionsDAG::MatchColumnsMode::Position
);
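/// makeConvertingActions throws when the matched SELECT columns cannot be
/// converted to the destination table's types, so an incompatible materialized
/// view definition fails here, at CREATE time, rather than at first insert.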
}
}
DatabasePtr database;
bool need_add_to_database = !create.temporary;
if (need_add_to_database)

View File

@ -4,8 +4,6 @@
#include <Common/filesystemHelpers.h>
#include <IO/WriteHelpers.h>
#include <Processors/Sources/ShellCommandSource.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Formats/formatBlock.h>
@ -78,12 +76,18 @@ public:
command,
user_scripts_path);
if (!std::filesystem::exists(std::filesystem::path(script_path)))
if (!FS::exists(script_path))
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Executable file {} does not exist inside user scripts folder {}",
command,
user_scripts_path);
if (!FS::canExecute(script_path))
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Executable file {} is not executable inside user scripts folder {}",
command,
user_scripts_path);
command = std::move(script_path);
}

View File

@ -116,6 +116,8 @@ NamesAndTypesList ZooKeeperLogElement::getNamesAndTypes()
{"type", std::move(type_enum)},
{"event_date", std::make_shared<DataTypeDate>()},
{"event_time", std::make_shared<DataTypeDateTime64>(6)},
{"thread_id", std::make_shared<DataTypeUInt64>()},
{"query_id", std::make_shared<DataTypeString>()},
{"address", DataTypeFactory::instance().get("IPv6")},
{"port", std::make_shared<DataTypeUInt16>()},
{"session_id", std::make_shared<DataTypeInt64>()},
@ -164,6 +166,8 @@ void ZooKeeperLogElement::appendToBlock(MutableColumns & columns) const
auto event_time_seconds = event_time / 1000000;
columns[i++]->insert(DateLUT::instance().toDayNum(event_time_seconds).toUnderType());
columns[i++]->insert(event_time);
columns[i++]->insert(thread_id);
columns[i++]->insert(query_id);
columns[i++]->insertData(IPv6ToBinary(address.host()).data(), 16);
columns[i++]->insert(address.port());
columns[i++]->insert(session_id);

View File

@ -22,6 +22,8 @@ struct ZooKeeperLogElement
Type type = UNKNOWN;
Decimal64 event_time = 0;
UInt64 thread_id = 0;
String query_id;
Poco::Net::SocketAddress address;
Int64 session_id = 0;

View File

@ -818,22 +818,27 @@ bool AlterCommand::isCommentAlter() const
bool AlterCommand::isTTLAlter(const StorageInMemoryMetadata & metadata) const
{
if (type == MODIFY_TTL)
{
if (!metadata.table_ttl.definition_ast)
return true;
/// If the TTL has not been changed, do not require mutations
return queryToString(metadata.table_ttl.definition_ast) != queryToString(ttl);
}
if (!ttl || type != MODIFY_COLUMN)
return false;
bool ttl_changed = true;
bool column_ttl_changed = true;
for (const auto & [name, ttl_ast] : metadata.columns.getColumnTTLs())
{
if (name == column_name && queryToString(*ttl) == queryToString(*ttl_ast))
{
ttl_changed = false;
column_ttl_changed = false;
break;
}
}
return ttl_changed;
return column_ttl_changed;
}
bool AlterCommand::isRemovingProperty() const

View File

@ -1,21 +1,19 @@
#include <Storages/StorageExecutable.h>
#include <filesystem>
#include <unistd.h>
#include <boost/algorithm/string/split.hpp>
#include <Common/ShellCommand.h>
#include <Common/filesystemHelpers.h>
#include <Core/Block.h>
#include <IO/ReadHelpers.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
@ -123,12 +121,18 @@ Pipe StorageExecutable::read(
script_name,
user_scripts_path);
if (!std::filesystem::exists(std::filesystem::path(script_path)))
if (!FS::exists(script_path))
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Executable file {} does not exist inside user scripts folder {}",
script_name,
user_scripts_path);
if (!FS::canExecute(script_path))
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Executable file {} is not executable inside user scripts folder {}",
script_name,
user_scripts_path);
Pipes inputs;
inputs.reserve(input_queries.size());

View File

@ -76,4 +76,5 @@ def get_images_with_versions(
def get_image_with_version(reports_path, image, pull=True, version=None):
logging.info("Looking for images file in %s", reports_path)
return get_images_with_versions(reports_path, [image], pull, version=version)[0]

View File

@ -80,6 +80,7 @@ TRUSTED_CONTRIBUTORS = {
"thomoco", # ClickHouse
"BoloniniD", # Seasoned contributor, HSE
"tonickkozlov", # Cloudflare
"tylerhannan", # ClickHouse Employee
]
}

View File

@ -121,6 +121,7 @@ TRUSTED_CONTRIBUTORS = {
"YiuRULE",
"zlobober", # Developer of YT
"BoloniniD", # Seasoned contributor, HSE
"tylerhannan", # ClickHouse Employee
]
}

View File

@ -1176,6 +1176,9 @@ def check_server_started(args):
except TimeoutError:
print("\nConnection timeout, will not retry")
break
except Exception as e:
print("\nUexpected exception, will not retry: ", str(e))
break
print('\nAll connection tries failed')
sys.stdout.flush()

View File

@ -1407,24 +1407,3 @@ def test_insert_select_schema_inference(started_cluster):
f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_insert_select.native')"
)
assert int(result) == 1
def test_parallel_reading_with_memory_limit(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_memory_limit.native') select * from numbers(100000)"
)
result = instance.query_and_get_error(
f"select * from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_memory_limit.native') settings max_memory_usage=10000"
)
assert "Memory limit (for query) exceeded" in result
sleep(5)
# Check that server didn't crash
result = instance.query("select 1")
assert int(result) == 1

View File

@ -0,0 +1,4 @@
1 (0,'')
2 (1,'v1')
3 (0,'')
4 (2,'')

View File

@ -0,0 +1,12 @@
-- Tags: no-fasttest
DROP TABLE IF EXISTS t_json_empty_str;
SET allow_experimental_object_type = 1;
CREATE TABLE t_json_empty_str(id UInt32, o JSON) ENGINE = Memory;
INSERT INTO t_json_empty_str VALUES (1, ''), (2, '{"k1": 1, "k2": "v1"}'), (3, '{}'), (4, '{"k1": 2}');
SELECT * FROM t_json_empty_str ORDER BY id;
DROP TABLE t_json_empty_str;

View File

@ -0,0 +1 @@
PushEvent some-repo (('https://avatars.githubusercontent.com/u/123213213?','github-actions','',123123123,'github-actions[bot]','https://api.github.com/users/github-actions[bot]'),'2022-01-04 07:00:00',(1001001010101,'some-repo','https://api.github.com/repos/some-repo'),'PushEvent')

View File

@ -0,0 +1,19 @@
-- Tags: no-fasttest
SET allow_experimental_object_type = 1;
DROP TABLE IF EXISTS t_github_json;
CREATE table t_github_json
(
event_type LowCardinality(String) DEFAULT JSONExtractString(message_raw, 'type'),
repo_name LowCardinality(String) DEFAULT JSONExtractString(message_raw, 'repo', 'name'),
message JSON DEFAULT message_raw,
message_raw String EPHEMERAL
) ENGINE = MergeTree ORDER BY (event_type, repo_name);
INSERT INTO t_github_json (message_raw) FORMAT JSONEachRow {"message_raw": "{\"type\":\"PushEvent\", \"created_at\": \"2022-01-04 07:00:00\", \"actor\":{\"avatar_url\":\"https://avatars.githubusercontent.com/u/123213213?\",\"display_login\":\"github-actions\",\"gravatar_id\":\"\",\"id\":123123123,\"login\":\"github-actions[bot]\",\"url\":\"https://api.github.com/users/github-actions[bot]\"},\"repo\":{\"id\":1001001010101,\"name\":\"some-repo\",\"url\":\"https://api.github.com/repos/some-repo\"}}"}
SELECT * FROM t_github_json ORDER BY event_type, repo_name;
DROP TABLE t_github_json;

View File

@ -0,0 +1,5 @@
----------test--------:
----------test--------:
100 \0\0\0\0\0\0\0
101 \0\0\0\0\0\0\0
102 \0\0\0\0\0\0\0

View File

@ -0,0 +1,23 @@
DROP TABLE IF EXISTS test_mv;
DROP TABLE IF EXISTS test;
DROP TABLE IF EXISTS test_input;
CREATE TABLE test_input(id Int32) ENGINE=MergeTree() order by id;
CREATE TABLE test(`id` Int32, `pv` AggregateFunction(sum, Int32)) ENGINE = AggregatingMergeTree() ORDER BY id;
CREATE MATERIALIZED VIEW test_mv to test(`id` Int32, `pv` AggregateFunction(sum, Int32)) as SELECT id, sumState(1) as pv from test_input group by id; -- { serverError 70 }
INSERT INTO test_input SELECT toInt32(number % 1000) AS id FROM numbers(10);
select '----------test--------:';
select * from test;
create MATERIALIZED VIEW test_mv to test(`id` Int32, `pv` AggregateFunction(sum, Int32)) as SELECT id, sumState(toInt32(1)) as pv from test_input group by id;
INSERT INTO test_input SELECT toInt32(number % 1000) AS id FROM numbers(100,3);
select '----------test--------:';
select * from test;
DROP TABLE test_mv;
DROP TABLE test;
DROP TABLE test_input;

View File

@ -0,0 +1,22 @@
-- { echoOn }
alter table per_table_ttl_02265 modify TTL date + interval 1 month;
select count() from system.mutations where database = currentDatabase() and table = 'per_table_ttl_02265';
1
alter table per_table_ttl_02265 modify TTL date + interval 1 month;
select count() from system.mutations where database = currentDatabase() and table = 'per_table_ttl_02265';
1
alter table per_table_ttl_02265 modify TTL date + interval 2 month;
select count() from system.mutations where database = currentDatabase() and table = 'per_table_ttl_02265';
2
alter table per_table_ttl_02265 modify TTL date + interval 2 month group by key set value = argMax(value, date);
select count() from system.mutations where database = currentDatabase() and table = 'per_table_ttl_02265';
3
alter table per_table_ttl_02265 modify TTL date + interval 2 month group by key set value = argMax(value, date);
select count() from system.mutations where database = currentDatabase() and table = 'per_table_ttl_02265';
3
alter table per_table_ttl_02265 modify TTL date + interval 2 month recompress codec(ZSTD(17));
select count() from system.mutations where database = currentDatabase() and table = 'per_table_ttl_02265';
4
alter table per_table_ttl_02265 modify TTL date + interval 2 month recompress codec(ZSTD(17));
select count() from system.mutations where database = currentDatabase() and table = 'per_table_ttl_02265';
4

View File

@ -0,0 +1,22 @@
drop table if exists per_table_ttl_02265;
create table per_table_ttl_02265 (key Int, date Date, value String) engine=MergeTree() order by key;
insert into per_table_ttl_02265 values (1, today(), '1');
-- { echoOn }
alter table per_table_ttl_02265 modify TTL date + interval 1 month;
select count() from system.mutations where database = currentDatabase() and table = 'per_table_ttl_02265';
alter table per_table_ttl_02265 modify TTL date + interval 1 month;
select count() from system.mutations where database = currentDatabase() and table = 'per_table_ttl_02265';
alter table per_table_ttl_02265 modify TTL date + interval 2 month;
select count() from system.mutations where database = currentDatabase() and table = 'per_table_ttl_02265';
alter table per_table_ttl_02265 modify TTL date + interval 2 month group by key set value = argMax(value, date);
select count() from system.mutations where database = currentDatabase() and table = 'per_table_ttl_02265';
alter table per_table_ttl_02265 modify TTL date + interval 2 month group by key set value = argMax(value, date);
select count() from system.mutations where database = currentDatabase() and table = 'per_table_ttl_02265';
alter table per_table_ttl_02265 modify TTL date + interval 2 month recompress codec(ZSTD(17));
select count() from system.mutations where database = currentDatabase() and table = 'per_table_ttl_02265';
alter table per_table_ttl_02265 modify TTL date + interval 2 month recompress codec(ZSTD(17));
select count() from system.mutations where database = currentDatabase() and table = 'per_table_ttl_02265';
-- { echoOff }
drop table per_table_ttl_02265;