Merge branch 'master' into use-new-named-collections-code-2

This commit is contained in:
Kseniia Sumarokova 2022-12-22 19:04:30 +01:00 committed by GitHub
commit 380f13138a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
89 changed files with 1575 additions and 443 deletions

View File

@ -16,6 +16,6 @@ ClickHouse® is an open-source column-oriented database management system that a
* [Contacts](https://clickhouse.com/company/contact) can help to get your questions answered if there are any.
## Upcoming events
* [**v22.12 Release Webinar**](https://clickhouse.com/company/events/v22-12-release-webinar) 22.12 is the ClickHouse Christmas release. There are plenty of gifts (a new JOIN algorithm among them) and we adopted something from MongoDB. Original creator, co-founder, and CTO of ClickHouse Alexey Milovidov will walk us through the highlights of the release.
* **Recording available**: [**v22.12 Release Webinar**](https://www.youtube.com/watch?v=sREupr6uc2k) 22.12 is the ClickHouse Christmas release. There are plenty of gifts (a new JOIN algorithm among them) and we adopted something from MongoDB. Original creator, co-founder, and CTO of ClickHouse Alexey Milovidov will walk us through the highlights of the release.
* [**ClickHouse Meetup at the CHEQ office in Tel Aviv**](https://www.meetup.com/clickhouse-tel-aviv-user-group/events/289599423/) - Jan 16 - We are very excited to be holding our next in-person ClickHouse meetup at the CHEQ office in Tel Aviv! Hear from CHEQ, ServiceNow and Contentsquare, as well as a deep dive presentation from ClickHouse CTO Alexey Milovidov. Join us for a fun evening of talks, food and discussion!
* [**ClickHouse Meetup at Microsoft Office in Seattle**](https://www.meetup.com/clickhouse-seattle-user-group/events/290310025/) - Jan 18 - Keep an eye on this space as we will be announcing speakers soon!

View File

@ -1,15 +1,15 @@
---
slug: /en/development/build-cross-osx
sidebar_position: 66
title: How to Build ClickHouse on Linux for Mac OS X
sidebar_label: Build on Linux for Mac OS X
title: How to Build ClickHouse on Linux for macOS
sidebar_label: Build on Linux for macOS
---
This is for the case when you have a Linux machine and want to use it to build `clickhouse` binary that will run on OS X.
This is intended for continuous integration checks that run on Linux servers. If you want to build ClickHouse directly on Mac OS X, then proceed with [another instruction](../development/build-osx.md).
This is intended for continuous integration checks that run on Linux servers. If you want to build ClickHouse directly on macOS, then proceed with [another instruction](../development/build-osx.md).
The cross-build for Mac OS X is based on the [Build instructions](../development/build.md), follow them first.
The cross-build for macOS is based on the [Build instructions](../development/build.md), follow them first.
## Install Clang-14

View File

@ -1,9 +1,9 @@
---
slug: /en/development/build-osx
sidebar_position: 65
sidebar_label: Build on Mac OS X
title: How to Build ClickHouse on Mac OS X
description: How to build ClickHouse on Mac OS X
sidebar_label: Build on macOS
title: How to Build ClickHouse on macOS
description: How to build ClickHouse on macOS
---
:::info You don't have to build ClickHouse yourself!

View File

@ -7,7 +7,7 @@ description: Prerequisites and an overview of how to build ClickHouse
# Getting Started Guide for Building ClickHouse
The building of ClickHouse is supported on Linux, FreeBSD and Mac OS X.
The building of ClickHouse is supported on Linux, FreeBSD and macOS.
If you use Windows, you need to create a virtual machine with Ubuntu. To start working with a virtual machine please install VirtualBox. You can download Ubuntu from the website: https://www.ubuntu.com/#download. Please create a virtual machine from the downloaded image (you should reserve at least 4GB of RAM for it). To run a command-line terminal in Ubuntu, please locate a program containing the word “terminal” in its name (gnome-terminal, konsole etc.) or just press Ctrl+Alt+T.
@ -194,7 +194,7 @@ In this case, ClickHouse will use config files located in the current directory.
To connect to ClickHouse with clickhouse-client in another terminal navigate to `ClickHouse/build/programs/` and run `./clickhouse client`.
If you get `Connection refused` message on Mac OS X or FreeBSD, try specifying host address 127.0.0.1:
If you get `Connection refused` message on macOS or FreeBSD, try specifying host address 127.0.0.1:
clickhouse client --host 127.0.0.1
@ -213,7 +213,7 @@ You can also run your custom-built ClickHouse binary with the config file from t
## IDE (Integrated Development Environment) {#ide-integrated-development-environment}
If you do not know which IDE to use, we recommend that you use CLion. CLion is commercial software, but it offers 30 days free trial period. It is also free of charge for students. CLion can be used both on Linux and on Mac OS X.
If you do not know which IDE to use, we recommend that you use CLion. CLion is commercial software, but it offers 30 days free trial period. It is also free of charge for students. CLion can be used both on Linux and on macOS.
KDevelop and QTCreator are other great alternatives of an IDE for developing ClickHouse. KDevelop comes in as a very handy IDE although unstable. If KDevelop crashes after a while upon opening project, you should click “Stop All” button as soon as it has opened the list of projects files. After doing so KDevelop should be fine to work with.

View File

@ -139,7 +139,7 @@ If the system clickhouse-server is already running and you do not want to stop i
Build tests allow to check that build is not broken on various alternative configurations and on some foreign systems. These tests are automated as well.
Examples:
- cross-compile for Darwin x86_64 (Mac OS X)
- cross-compile for Darwin x86_64 (macOS)
- cross-compile for FreeBSD x86_64
- cross-compile for Linux AArch64
- build on Ubuntu with libraries from system packages (discouraged)

View File

@ -9,7 +9,7 @@ slug: /en/install
You have three options for getting up and running with ClickHouse:
- **[ClickHouse Cloud](https://clickhouse.com/cloud/):** The official ClickHouse as a service, - built by, maintained and supported by the creators of ClickHouse
- **[Self-managed ClickHouse](#self-managed-install):** ClickHouse can run on any Linux, FreeBSD, or Mac OS X with x86-64, ARM, or PowerPC64LE CPU architecture
- **[Self-managed ClickHouse](#self-managed-install):** ClickHouse can run on any Linux, FreeBSD, or macOS with x86-64, ARM, or PowerPC64LE CPU architecture
- **[Docker Image](https://hub.docker.com/r/clickhouse/clickhouse-server/):** Read the guide with the official image in Docker Hub
## ClickHouse Cloud
@ -257,7 +257,7 @@ To run ClickHouse inside Docker follow the guide on [Docker Hub](https://hub.doc
### From Sources {#from-sources}
To manually compile ClickHouse, follow the instructions for [Linux](/docs/en/development/build.md) or [Mac OS X](/docs/en/development/build-osx.md).
To manually compile ClickHouse, follow the instructions for [Linux](/docs/en/development/build.md) or [macOS](/docs/en/development/build-osx.md).
You can compile packages and install them or use programs without installing packages.
@ -352,7 +352,7 @@ To continue experimenting, you can download one of the test data sets or go thro
## Recommendations for Self-Managed ClickHouse
ClickHouse can run on any Linux, FreeBSD, or Mac OS X with x86-64, ARM, or PowerPC64LE CPU architecture.
ClickHouse can run on any Linux, FreeBSD, or macOS with x86-64, ARM, or PowerPC64LE CPU architecture.
ClickHouse uses all hardware resources available to process data.

View File

@ -890,7 +890,7 @@ The maximum number of open files.
By default: `maximum`.
We recommend using this option in Mac OS X since the `getrlimit()` function returns an incorrect value.
We recommend using this option in macOS since the `getrlimit()` function returns an incorrect value.
**Example**

View File

@ -3447,13 +3447,45 @@ Default value: 2.
## compatibility {#compatibility}
This setting changes other settings according to provided ClickHouse version.
If a behaviour in ClickHouse was changed by using a different default value for some setting, this compatibility setting allows you to use default values from previous versions for all the settings that were not set by the user.
The `compatibility` setting causes ClickHouse to use the default settings of a previous version of ClickHouse, where the previous version is provided as the setting.
This setting takes ClickHouse version number as a string, like `21.3`, `21.8`. Empty value means that this setting is disabled.
If settings are set to non-default values, then those settings are honored (only settings that have not been modified are affected by the `compatibility` setting).
This setting takes a ClickHouse version number as a string, like `22.3`, `22.8`. An empty value means that this setting is disabled.
Disabled by default.
:::note
In ClickHouse Cloud the compatibility setting must be set by ClickHouse Cloud support. Please [open a case](https://clickhouse.cloud/support) to have it set.
:::
## allow_settings_after_format_in_insert {#allow_settings_after_format_in_insert}
Control whether `SETTINGS` after `FORMAT` in `INSERT` queries is allowed or not. It is not recommended to use this, since this may interpret part of `SETTINGS` as values.
Example:
```sql
INSERT INTO FUNCTION null('foo String') SETTINGS max_threads=1 VALUES ('bar');
```
But the following query will work only with `allow_settings_after_format_in_insert`:
```sql
SET allow_settings_after_format_in_insert=1;
INSERT INTO FUNCTION null('foo String') VALUES ('bar') SETTINGS max_threads=1;
```
Possible values:
- 0 — Disallow.
- 1 — Allow.
Default value: `0`.
!!! note "Warning"
Use this setting only for backward compatibility if your use cases depend on old syntax.
# Format settings {#format-settings}
## input_format_skip_unknown_fields {#input_format_skip_unknown_fields}

View File

@ -1104,6 +1104,7 @@ Using replacement fields, you can define a pattern for the resulting string. “
| %d | day of the month, zero-padded (01-31) | 02 |
| %D | Short MM/DD/YY date, equivalent to %m/%d/%y | 01/02/18 |
| %e | day of the month, space-padded ( 1-31) |   2 |
| %f | fractional second from the fractional part of DateTime64 | 1234560 |
| %F | short YYYY-MM-DD date, equivalent to %Y-%m-%d | 2018-01-02 |
| %G | four-digit year format for ISO week number, calculated from the week-based year [defined by the ISO 8601](https://en.wikipedia.org/wiki/ISO_8601#Week_dates) standard, normally useful only with %V | 2018 |
| %g | two-digit year format, aligned to ISO 8601, abbreviated from four-digit notation | 18 |
@ -1143,6 +1144,20 @@ Result:
└────────────────────────────────────────────┘
```
Query:
``` sql
SELECT formatDateTime(toDateTime64('2010-01-04 12:34:56.123456', 7), '%f')
```
Result:
```
┌─formatDateTime(toDateTime64('2010-01-04 12:34:56.123456', 7), '%f')─┐
│ 1234560 │
└─────────────────────────────────────────────────────────────────────┘
```
## dateName
Returns specified part of date.

View File

@ -595,9 +595,9 @@ SELECT xxHash64('')
**Returned value**
A `Uint32` or `Uint64` data type hash value.
A `UInt32` or `UInt64` data type hash value.
Type: `xxHash`.
Type: `UInt32` for `xxHash32` and `UInt64` for `xxHash64`.
**Example**

View File

@ -21,12 +21,11 @@ Subquery is another `SELECT` query that may be specified in parenthesis inside `
When `FINAL` is specified, ClickHouse fully merges the data before returning the result and thus performs all data transformations that happen during merges for the given table engine.
It is applicable when selecting data from tables that use the [MergeTree](../../../engines/table-engines/mergetree-family/mergetree.md)-engine family. Also supported for:
It is applicable when selecting data from ReplacingMergeTree, SummingMergeTree, AggregatingMergeTree, CollapsingMergeTree and VersionedCollapsingMergeTree tables.
- [Replicated](../../../engines/table-engines/mergetree-family/replication.md) versions of `MergeTree` engines.
- [View](../../../engines/table-engines/special/view.md), [Buffer](../../../engines/table-engines/special/buffer.md), [Distributed](../../../engines/table-engines/special/distributed.md), and [MaterializedView](../../../engines/table-engines/special/materializedview.md) engines that operate over other engines, provided they were created over `MergeTree`-engine tables.
`SELECT` queries with `FINAL` are executed in parallel. The [max_final_threads](../../../operations/settings/settings.md#max-final-threads) setting limits the number of threads used.
Now `SELECT` queries with `FINAL` are executed in parallel and slightly faster. But there are drawbacks (see below). The [max_final_threads](../../../operations/settings/settings.md#max-final-threads) setting limits the number of threads used.
There are drawbacks to using `FINAL` (see below).
### Drawbacks

View File

@ -91,7 +91,8 @@ bool SortNode::isEqualImpl(const IQueryTreeNode & rhs) const
void SortNode::updateTreeHashImpl(HashState & hash_state) const
{
hash_state.update(sort_direction);
hash_state.update(nulls_sort_direction);
/// use some determined value if `nulls_sort_direction` is `nullopt`
hash_state.update(nulls_sort_direction.value_or(sort_direction));
hash_state.update(with_fill);
if (collator)

View File

@ -0,0 +1,76 @@
#pragma once
#include <Common/hex.h>
namespace DB
{
static void inline hexStringDecode(const char * pos, const char * end, char *& out, size_t word_size = 2)
{
if ((end - pos) & 1)
{
*out = unhex(*pos);
++out;
++pos;
}
while (pos < end)
{
*out = unhex2(pos);
pos += word_size;
++out;
}
*out = '\0';
++out;
}
static void inline binStringDecode(const char * pos, const char * end, char *& out)
{
if (pos == end)
{
*out = '\0';
++out;
return;
}
UInt8 left = 0;
/// end - pos is the length of input.
/// (length & 7) to make remain bits length mod 8 is zero to split.
/// e.g. the length is 9 and the input is "101000001",
/// first left_cnt is 1, left is 0, right shift, pos is 1, left = 1
/// then, left_cnt is 0, remain input is '01000001'.
for (UInt8 left_cnt = (end - pos) & 7; left_cnt > 0; --left_cnt)
{
left = left << 1;
if (*pos != '0')
left += 1;
++pos;
}
if (left != 0 || end - pos == 0)
{
*out = left;
++out;
}
assert((end - pos) % 8 == 0);
while (end - pos != 0)
{
UInt8 c = 0;
for (UInt8 i = 0; i < 8; ++i)
{
c = c << 1;
if (*pos != '0')
c += 1;
++pos;
}
*out = c;
++out;
}
*out = '\0';
++out;
}
}

View File

@ -240,24 +240,52 @@ TEST(Common, RWLockPerfTestReaders)
for (auto pool_size : pool_sizes)
{
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
auto func = [&] ()
auto func = [&] ()
{
for (auto i = 0; i < cycles; ++i)
{
for (auto i = 0; i < cycles; ++i)
{
auto lock = fifo_lock->getLock(RWLockImpl::Read, RWLockImpl::NO_QUERY);
}
};
auto lock = fifo_lock->getLock(RWLockImpl::Read, RWLockImpl::NO_QUERY);
}
};
std::list<std::thread> threads;
for (size_t thread = 0; thread < pool_size; ++thread)
threads.emplace_back(func);
std::list<std::thread> threads;
for (size_t thread = 0; thread < pool_size; ++thread)
threads.emplace_back(func);
for (auto & thread : threads)
thread.join();
for (auto & thread : threads)
thread.join();
auto total_time = watch.elapsedSeconds();
std::cout << "Threads " << pool_size << ", total_time " << std::setprecision(2) << total_time << "\n";
auto total_time = watch.elapsedSeconds();
std::cout << "Threads " << pool_size << ", total_time " << std::setprecision(2) << total_time << "\n";
}
}
TEST(Common, RWLockNotUpgradeableWithNoQuery)
{
updatePHDRCache();
static auto rw_lock = RWLockImpl::create();
std::thread read_thread([&] ()
{
auto lock = rw_lock->getLock(RWLockImpl::Read, RWLockImpl::NO_QUERY, std::chrono::duration<int, std::milli>(50000));
auto sleep_for = std::chrono::duration<int, std::milli>(5000);
std::this_thread::sleep_for(sleep_for);
});
{
auto sleep_for = std::chrono::duration<int, std::milli>(500);
std::this_thread::sleep_for(sleep_for);
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
auto get_lock = rw_lock->getLock(RWLockImpl::Write, RWLockImpl::NO_QUERY, std::chrono::duration<int, std::milli>(50000));
EXPECT_NE(get_lock.get(), nullptr);
/// It took some time
EXPECT_GT(watch.elapsedMilliseconds(), 3000);
}
read_thread.join();
}

View File

@ -620,6 +620,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, enable_filesystem_cache_on_lower_level, true, "If read buffer supports caching inside threadpool, allow it to do it, otherwise cache outside ot threadpool. Do not use this setting, it is needed for testing", 0) \
M(Bool, skip_download_if_exceeds_query_cache, true, "Skip download from remote filesystem if exceeds query cache size", 0) \
M(UInt64, max_query_cache_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be used by a single query", 0) \
M(Bool, throw_on_error_from_cache_on_write_operations, false, "Ignore error from cache when caching on write operations (INSERT, merges)", 0) \
\
M(Bool, load_marks_asynchronously, false, "Load MergeTree marks asynchronously", 0) \
\

View File

@ -1025,9 +1025,6 @@ void BaseDaemon::setupWatchdog()
#if defined(OS_LINUX)
if (0 != prctl(PR_SET_PDEATHSIG, SIGKILL))
logger().warning("Cannot do prctl to ask termination with parent.");
if (getppid() == 1)
throw Poco::Exception("Parent watchdog process has exited.");
#endif
{

View File

@ -256,6 +256,9 @@ void DatabaseOrdinary::startupTables(ThreadPool & thread_pool, LoadingStrictness
auto startup_one_table = [&](const StoragePtr & table)
{
/// Since startup() method can use physical paths on disk we don't allow any exclusive actions (rename, drop so on)
/// until startup finished.
auto table_lock_holder = table->lockForShare(RWLockImpl::NO_QUERY, getContext()->getSettingsRef().lock_acquire_timeout);
table->startup();
logAboutProgress(log, ++tables_processed, total_tables, watch);
};

View File

@ -44,10 +44,10 @@ FileSegmentRangeWriter::FileSegmentRangeWriter(
const String & source_path_)
: cache(cache_)
, key(key_)
, log(&Poco::Logger::get("FileSegmentRangeWriter"))
, cache_log(cache_log_)
, query_id(query_id_)
, source_path(source_path_)
, current_file_segment_it(file_segments_holder.file_segments.end())
{
}
@ -56,69 +56,68 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset
if (finalized)
return false;
if (expected_write_offset != offset)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot write file segment at offset {}, because expected write offset is: {}",
offset, expected_write_offset);
}
auto & file_segments = file_segments_holder.file_segments;
if (current_file_segment_it == file_segments.end())
if (file_segments.empty() || file_segments.back()->isDownloaded())
{
current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent);
}
else
{
auto file_segment = *current_file_segment_it;
assert(file_segment->getCurrentWriteOffset() == current_file_segment_write_offset);
if (current_file_segment_write_offset != offset)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot write file segment at offset {}, because current write offset is: {}",
offset, current_file_segment_write_offset);
}
if (file_segment->range().size() == file_segment->getDownloadedSize())
{
completeFileSegment(*file_segment);
current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent);
}
allocateFileSegment(expected_write_offset, is_persistent);
}
auto & file_segment = *current_file_segment_it;
auto downloader = file_segment->getOrSetDownloader();
if (downloader != FileSegment::getCallerId())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to set a downloader. ({})", file_segment->getInfoForLog());
auto & file_segment = file_segments.back();
SCOPE_EXIT({
if (file_segment->isDownloader())
file_segment->completePartAndResetDownloader();
if (file_segments.back()->isDownloader())
file_segments.back()->completePartAndResetDownloader();
});
bool reserved = file_segment->reserve(size);
if (!reserved)
while (size > 0)
{
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
appendFilesystemCacheLog(*file_segment);
size_t available_size = file_segment->range().size() - file_segment->getDownloadedSize();
if (available_size == 0)
{
completeFileSegment(*file_segment);
file_segment = allocateFileSegment(expected_write_offset, is_persistent);
continue;
}
LOG_DEBUG(
&Poco::Logger::get("FileSegmentRangeWriter"),
"Unsuccessful space reservation attempt (size: {}, file segment info: {}",
size, file_segment->getInfoForLog());
if (!file_segment->isDownloader()
&& file_segment->getOrSetDownloader() != FileSegment::getCallerId())
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Failed to set a downloader. ({})", file_segment->getInfoForLog());
}
return false;
}
size_t size_to_write = std::min(available_size, size);
try
{
file_segment->write(data, size, offset);
}
catch (...)
{
bool reserved = file_segment->reserve(size_to_write);
if (!reserved)
{
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
appendFilesystemCacheLog(*file_segment);
LOG_DEBUG(
log, "Failed to reserve space in cache (size: {}, file segment info: {}",
size, file_segment->getInfoForLog());
return false;
}
file_segment->write(data, size_to_write, offset);
file_segment->completePartAndResetDownloader();
throw;
}
file_segment->completePartAndResetDownloader();
current_file_segment_write_offset += size;
size -= size_to_write;
expected_write_offset += size_to_write;
offset += size_to_write;
data += size_to_write;
}
return true;
}
@ -129,10 +128,10 @@ void FileSegmentRangeWriter::finalize()
return;
auto & file_segments = file_segments_holder.file_segments;
if (file_segments.empty() || current_file_segment_it == file_segments.end())
if (file_segments.empty())
return;
completeFileSegment(**current_file_segment_it);
completeFileSegment(*file_segments.back());
finalized = true;
}
@ -149,7 +148,7 @@ FileSegmentRangeWriter::~FileSegmentRangeWriter()
}
}
FileSegments::iterator FileSegmentRangeWriter::allocateFileSegment(size_t offset, bool is_persistent)
FileSegmentPtr & FileSegmentRangeWriter::allocateFileSegment(size_t offset, bool is_persistent)
{
/**
* Allocate a new file segment starting `offset`.
@ -168,7 +167,8 @@ FileSegments::iterator FileSegmentRangeWriter::allocateFileSegment(size_t offset
auto file_segment = cache->createFileSegmentForDownload(
key, offset, cache->max_file_segment_size, create_settings, cache_lock);
return file_segments_holder.add(std::move(file_segment));
auto & file_segments = file_segments_holder.file_segments;
return *file_segments.insert(file_segments.end(), file_segment);
}
void FileSegmentRangeWriter::appendFilesystemCacheLog(const FileSegment & file_segment)
@ -199,7 +199,7 @@ void FileSegmentRangeWriter::appendFilesystemCacheLog(const FileSegment & file_s
void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment)
{
/// File segment can be detached if space reservation failed.
if (file_segment.isDetached())
if (file_segment.isDetached() || file_segment.isCompleted())
return;
file_segment.completeWithoutState();
@ -223,6 +223,7 @@ CachedOnDiskWriteBufferFromFile::CachedOnDiskWriteBufferFromFile(
, is_persistent_cache_file(is_persistent_cache_file_)
, query_id(query_id_)
, enable_cache_log(!query_id_.empty() && settings_.enable_filesystem_cache_log)
, throw_on_error_from_cache(settings_.throw_on_error_from_cache)
{
}
@ -246,11 +247,11 @@ void CachedOnDiskWriteBufferFromFile::nextImpl()
}
/// Write data to cache.
cacheData(working_buffer.begin(), size);
cacheData(working_buffer.begin(), size, throw_on_error_from_cache);
current_download_offset += size;
}
void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size)
void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size, bool throw_on_error)
{
if (cache_in_error_state_or_disabled)
return;
@ -285,11 +286,17 @@ void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size)
return;
}
if (throw_on_error)
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
return;
}
catch (...)
{
if (throw_on_error)
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
return;
}

View File

@ -39,7 +39,7 @@ public:
~FileSegmentRangeWriter();
private:
FileSegments::iterator allocateFileSegment(size_t offset, bool is_persistent);
FileSegmentPtr & allocateFileSegment(size_t offset, bool is_persistent);
void appendFilesystemCacheLog(const FileSegment & file_segment);
@ -48,14 +48,14 @@ private:
FileCache * cache;
FileSegment::Key key;
Poco::Logger * log;
std::shared_ptr<FilesystemCacheLog> cache_log;
String query_id;
String source_path;
FileSegmentsHolder file_segments_holder{};
FileSegments::iterator current_file_segment_it;
size_t current_file_segment_write_offset = 0;
size_t expected_write_offset = 0;
bool finalized = false;
};
@ -81,7 +81,7 @@ public:
void finalizeImpl() override;
private:
void cacheData(char * data, size_t size);
void cacheData(char * data, size_t size, bool throw_on_error);
Poco::Logger * log;
@ -95,6 +95,7 @@ private:
bool enable_cache_log;
bool throw_on_error_from_cache;
bool cache_in_error_state_or_disabled = false;
std::unique_ptr<FileSegmentRangeWriter> cache_writer;

View File

@ -7,6 +7,8 @@ namespace DB
{
static const uint8_t BSON_DOCUMENT_END = 0x00;
static const size_t BSON_OBJECT_ID_SIZE = 12;
static const size_t BSON_DB_POINTER_SIZE = 12;
using BSONSizeT = uint32_t;
static const BSONSizeT MAX_BSON_SIZE = std::numeric_limits<BSONSizeT>::max();

View File

@ -32,6 +32,16 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
std::pair<String, String> splitCapnProtoFieldName(const String & name)
{
const auto * begin = name.data();
const auto * end = name.data() + name.size();
const auto * it = find_first_symbols<'_', '.'>(begin, end);
String first = String(begin, it);
String second = it == end ? "" : String(it + 1, end);
return {first, second};
}
capnp::StructSchema CapnProtoSchemaParser::getMessageSchema(const FormatSchemaInfo & schema_info)
{
capnp::ParsedSchema schema;
@ -201,9 +211,9 @@ static bool checkEnums(const capnp::Type & capnp_type, const DataTypePtr column_
return result;
}
static bool checkCapnProtoType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message);
static bool checkCapnProtoType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message, const String & column_name);
static bool checkNullableType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message)
static bool checkNullableType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message, const String & column_name)
{
if (!capnp_type.isStruct())
return false;
@ -222,9 +232,9 @@ static bool checkNullableType(const capnp::Type & capnp_type, const DataTypePtr
auto nested_type = assert_cast<const DataTypeNullable *>(data_type.get())->getNestedType();
if (first.getType().isVoid())
return checkCapnProtoType(second.getType(), nested_type, mode, error_message);
return checkCapnProtoType(second.getType(), nested_type, mode, error_message, column_name);
if (second.getType().isVoid())
return checkCapnProtoType(first.getType(), nested_type, mode, error_message);
return checkCapnProtoType(first.getType(), nested_type, mode, error_message, column_name);
return false;
}
@ -260,7 +270,7 @@ static bool checkTupleType(const capnp::Type & capnp_type, const DataTypePtr & d
{
KJ_IF_MAYBE(field, struct_schema.findFieldByName(name))
{
if (!checkCapnProtoType(field->getType(), nested_types[tuple_data_type->getPositionByName(name)], mode, error_message))
if (!checkCapnProtoType(field->getType(), nested_types[tuple_data_type->getPositionByName(name)], mode, error_message, name))
return false;
}
else
@ -273,16 +283,28 @@ static bool checkTupleType(const capnp::Type & capnp_type, const DataTypePtr & d
return true;
}
static bool checkArrayType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message)
static bool checkArrayType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message, const String & column_name)
{
if (!capnp_type.isList())
return false;
auto list_schema = capnp_type.asList();
auto nested_type = assert_cast<const DataTypeArray *>(data_type.get())->getNestedType();
return checkCapnProtoType(list_schema.getElementType(), nested_type, mode, error_message);
auto [field_name, nested_name] = splitCapnProtoFieldName(column_name);
if (!nested_name.empty() && list_schema.getElementType().isStruct())
{
auto struct_schema = list_schema.getElementType().asStruct();
KJ_IF_MAYBE(field, struct_schema.findFieldByName(nested_name))
return checkCapnProtoType(field->getType(), nested_type, mode, error_message, nested_name);
error_message += "Element type of List {} doesn't contain field with name " + nested_name;
return false;
}
return checkCapnProtoType(list_schema.getElementType(), nested_type, mode, error_message, column_name);
}
static bool checkCapnProtoType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message)
static bool checkCapnProtoType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message, const String & column_name)
{
switch (data_type->getTypeId())
{
@ -301,9 +323,11 @@ static bool checkCapnProtoType(const capnp::Type & capnp_type, const DataTypePtr
case TypeIndex::Int16:
return capnp_type.isInt16();
case TypeIndex::Date32: [[fallthrough]];
case TypeIndex::Decimal32: [[fallthrough]];
case TypeIndex::Int32:
return capnp_type.isInt32();
case TypeIndex::DateTime64: [[fallthrough]];
case TypeIndex::Decimal64: [[fallthrough]];
case TypeIndex::Int64:
return capnp_type.isInt64();
case TypeIndex::Float32:
@ -318,15 +342,15 @@ static bool checkCapnProtoType(const capnp::Type & capnp_type, const DataTypePtr
return checkTupleType(capnp_type, data_type, mode, error_message);
case TypeIndex::Nullable:
{
auto result = checkNullableType(capnp_type, data_type, mode, error_message);
auto result = checkNullableType(capnp_type, data_type, mode, error_message, column_name);
if (!result)
error_message += "Nullable can be represented only as a named union of type Void and nested type";
return result;
}
case TypeIndex::Array:
return checkArrayType(capnp_type, data_type, mode, error_message);
return checkArrayType(capnp_type, data_type, mode, error_message, column_name);
case TypeIndex::LowCardinality:
return checkCapnProtoType(capnp_type, assert_cast<const DataTypeLowCardinality *>(data_type.get())->getDictionaryType(), mode, error_message);
return checkCapnProtoType(capnp_type, assert_cast<const DataTypeLowCardinality *>(data_type.get())->getDictionaryType(), mode, error_message, column_name);
case TypeIndex::FixedString: [[fallthrough]];
case TypeIndex::String:
return capnp_type.isText() || capnp_type.isData();
@ -335,19 +359,9 @@ static bool checkCapnProtoType(const capnp::Type & capnp_type, const DataTypePtr
}
}
static std::pair<String, String> splitFieldName(const String & name)
{
const auto * begin = name.data();
const auto * end = name.data() + name.size();
const auto * it = find_first_symbols<'_', '.'>(begin, end);
String first = String(begin, it);
String second = it == end ? "" : String(it + 1, end);
return {first, second};
}
capnp::DynamicValue::Reader getReaderByColumnName(const capnp::DynamicStruct::Reader & struct_reader, const String & name)
{
auto [field_name, nested_name] = splitFieldName(name);
auto [field_name, nested_name] = splitCapnProtoFieldName(name);
KJ_IF_MAYBE(field, struct_reader.getSchema().findFieldByName(field_name))
{
capnp::DynamicValue::Reader field_reader;
@ -363,6 +377,20 @@ capnp::DynamicValue::Reader getReaderByColumnName(const capnp::DynamicStruct::Re
if (nested_name.empty())
return field_reader;
/// Support reading Nested as List of Structs.
if (field_reader.getType() == capnp::DynamicValue::LIST)
{
auto list_schema = field->getType().asList();
if (!list_schema.getElementType().isStruct())
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Element type of List {} is not a struct", field_name);
auto struct_schema = list_schema.getElementType().asStruct();
KJ_IF_MAYBE(nested_field, struct_schema.findFieldByName(nested_name))
return field_reader;
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Element type of List {} doesn't contain field with name \"{}\"", field_name, nested_name);
}
if (field_reader.getType() != capnp::DynamicValue::STRUCT)
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name);
@ -374,13 +402,28 @@ capnp::DynamicValue::Reader getReaderByColumnName(const capnp::DynamicStruct::Re
std::pair<capnp::DynamicStruct::Builder, capnp::StructSchema::Field> getStructBuilderAndFieldByColumnName(capnp::DynamicStruct::Builder struct_builder, const String & name)
{
auto [field_name, nested_name] = splitFieldName(name);
auto [field_name, nested_name] = splitCapnProtoFieldName(name);
KJ_IF_MAYBE(field, struct_builder.getSchema().findFieldByName(field_name))
{
if (nested_name.empty())
return {struct_builder, *field};
auto field_builder = struct_builder.get(*field);
/// Support reading Nested as List of Structs.
if (field_builder.getType() == capnp::DynamicValue::LIST)
{
auto list_schema = field->getType().asList();
if (!list_schema.getElementType().isStruct())
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Element type of List {} is not a struct", field_name);
auto struct_schema = list_schema.getElementType().asStruct();
KJ_IF_MAYBE(nested_field, struct_schema.findFieldByName(nested_name))
return {struct_builder, *field};
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Element type of List {} doesn't contain field with name \"{}\"", field_name, nested_name);
}
if (field_builder.getType() != capnp::DynamicValue::STRUCT)
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name);
@ -390,13 +433,27 @@ std::pair<capnp::DynamicStruct::Builder, capnp::StructSchema::Field> getStructBu
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Capnproto struct doesn't contain field with name {}", field_name);
}
static capnp::StructSchema::Field getFieldByName(const capnp::StructSchema & schema, const String & name)
static std::pair<capnp::StructSchema::Field, String> getFieldByName(const capnp::StructSchema & schema, const String & name)
{
auto [field_name, nested_name] = splitFieldName(name);
auto [field_name, nested_name] = splitCapnProtoFieldName(name);
KJ_IF_MAYBE(field, schema.findFieldByName(field_name))
{
if (nested_name.empty())
return *field;
return {*field, name};
/// Support reading Nested as List of Structs.
if (field->getType().isList())
{
auto list_schema = field->getType().asList();
if (!list_schema.getElementType().isStruct())
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Element type of List {} is not a struct", field_name);
auto struct_schema = list_schema.getElementType().asStruct();
KJ_IF_MAYBE(nested_field, struct_schema.findFieldByName(nested_name))
return {*field, name};
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Element type of List {} doesn't contain field with name \"{}\"", field_name, nested_name);
}
if (!field->getType().isStruct())
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name);
@ -416,8 +473,8 @@ void checkCapnProtoSchemaStructure(const capnp::StructSchema & schema, const Blo
String additional_error_message;
for (auto & [name, type] : names_and_types)
{
auto field = getFieldByName(schema, name);
if (!checkCapnProtoType(field.getType(), type, mode, additional_error_message))
auto [field, field_name] = getFieldByName(schema, name);
if (!checkCapnProtoType(field.getType(), type, mode, additional_error_message, field_name))
{
auto e = Exception(
ErrorCodes::CAPN_PROTO_BAD_CAST,

View File

@ -30,6 +30,8 @@ public:
capnp::StructSchema getMessageSchema(const FormatSchemaInfo & schema_info);
};
std::pair<String, String> splitCapnProtoFieldName(const String & name);
bool compareEnumNames(const String & first, const String & second, FormatSettings::EnumComparingMode mode);
std::pair<capnp::DynamicStruct::Builder, capnp::StructSchema::Field> getStructBuilderAndFieldByColumnName(capnp::DynamicStruct::Builder struct_builder, const String & name);

View File

@ -685,37 +685,27 @@ public:
}
else if constexpr (std::is_same_v<ResultDataType, DataTypeDateTime64>)
{
if (typeid_cast<const DataTypeDateTime64 *>(arguments[0].type.get()))
static constexpr auto target_scale = std::invoke(
[]() -> std::optional<UInt32>
{
if constexpr (std::is_base_of_v<AddNanosecondsImpl, Transform>)
return 9;
else if constexpr (std::is_base_of_v<AddMicrosecondsImpl, Transform>)
return 6;
else if constexpr (std::is_base_of_v<AddMillisecondsImpl, Transform>)
return 3;
return {};
});
auto timezone = extractTimeZoneNameFromFunctionArguments(arguments, 2, 0);
if (const auto* datetime64_type = typeid_cast<const DataTypeDateTime64 *>(arguments[0].type.get()))
{
const auto & datetime64_type = assert_cast<const DataTypeDateTime64 &>(*arguments[0].type);
auto from_scale = datetime64_type.getScale();
auto scale = from_scale;
if (std::is_same_v<Transform, AddNanosecondsImpl>)
scale = 9;
else if (std::is_same_v<Transform, AddMicrosecondsImpl>)
scale = 6;
else if (std::is_same_v<Transform, AddMillisecondsImpl>)
scale = 3;
scale = std::max(scale, from_scale);
return std::make_shared<DataTypeDateTime64>(scale, extractTimeZoneNameFromFunctionArguments(arguments, 2, 0));
const auto from_scale = datetime64_type->getScale();
return std::make_shared<DataTypeDateTime64>(std::max(from_scale, target_scale.value_or(from_scale)), std::move(timezone));
}
else
{
auto scale = DataTypeDateTime64::default_scale;
if (std::is_same_v<Transform, AddNanosecondsImpl>)
scale = 9;
else if (std::is_same_v<Transform, AddMicrosecondsImpl>)
scale = 6;
else if (std::is_same_v<Transform, AddMillisecondsImpl>)
scale = 3;
return std::make_shared<DataTypeDateTime64>(scale, extractTimeZoneNameFromFunctionArguments(arguments, 2, 0));
}
return std::make_shared<DataTypeDateTime64>(target_scale.value_or(DataTypeDateTime64::default_scale), std::move(timezone));
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected result type in datetime add interval function");

View File

@ -4,7 +4,7 @@
#include <Columns/ColumnVector.h>
#include <Columns/ColumnsNumber.h>
#include <Common/BitHelpers.h>
#include <Common/hex.h>
#include <Common/BinStringDecodeHelper.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
@ -126,20 +126,7 @@ struct UnhexImpl
static void decode(const char * pos, const char * end, char *& out)
{
if ((end - pos) & 1)
{
*out = unhex(*pos);
++out;
++pos;
}
while (pos < end)
{
*out = unhex2(pos);
pos += word_size;
++out;
}
*out = '\0';
++out;
hexStringDecode(pos, end, out, word_size);
}
};
@ -233,52 +220,7 @@ struct UnbinImpl
static void decode(const char * pos, const char * end, char *& out)
{
if (pos == end)
{
*out = '\0';
++out;
return;
}
UInt8 left = 0;
/// end - pos is the length of input.
/// (length & 7) to make remain bits length mod 8 is zero to split.
/// e.g. the length is 9 and the input is "101000001",
/// first left_cnt is 1, left is 0, right shift, pos is 1, left = 1
/// then, left_cnt is 0, remain input is '01000001'.
for (UInt8 left_cnt = (end - pos) & 7; left_cnt > 0; --left_cnt)
{
left = left << 1;
if (*pos != '0')
left += 1;
++pos;
}
if (left != 0 || end - pos == 0)
{
*out = left;
++out;
}
assert((end - pos) % 8 == 0);
while (end - pos != 0)
{
UInt8 c = 0;
for (UInt8 i = 0; i < 8; ++i)
{
c = c << 1;
if (*pos != '0')
c += 1;
++pos;
}
*out = c;
++out;
}
*out = '\0';
++out;
binStringDecode(pos, end, out);
}
};

View File

@ -2,6 +2,7 @@
#include <Functions/FunctionHelpers.h>
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypeArray.h>
#include <Interpreters/ArrayJoinAction.h>
namespace DB
@ -52,11 +53,11 @@ public:
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
const DataTypeArray * arr = checkAndGetDataType<DataTypeArray>(arguments[0].get());
const auto & arr = getArrayJoinDataType(arguments[0]);
if (!arr)
throw Exception("Argument for function " + getName() + " must be Array.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception("Argument for function " + getName() + " must be Array or Map", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return arr->getNestedType();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t /*input_rows_count*/) const override

View File

@ -48,7 +48,6 @@ template <> struct ActionValueTypeMap<DataTypeUInt64> { using ActionValueTyp
template <> struct ActionValueTypeMap<DataTypeDate> { using ActionValueType = UInt16; };
template <> struct ActionValueTypeMap<DataTypeDate32> { using ActionValueType = Int32; };
template <> struct ActionValueTypeMap<DataTypeDateTime> { using ActionValueType = UInt32; };
// TODO(vnemkov): to add sub-second format instruction, make that DateTime64 and do some math in Action<T>.
template <> struct ActionValueTypeMap<DataTypeDateTime64> { using ActionValueType = Int64; };
@ -113,16 +112,16 @@ private:
class Action
{
public:
using Func = void (*)(char *, Time, const DateLUTImpl &);
using Func = void (*)(char *, Time, UInt64, UInt32, const DateLUTImpl &);
Func func;
size_t shift;
explicit Action(Func func_, size_t shift_ = 0) : func(func_), shift(shift_) {}
void perform(char *& target, Time source, const DateLUTImpl & timezone)
void perform(char *& target, Time source, UInt64 fractional_second, UInt32 scale, const DateLUTImpl & timezone)
{
func(target, source, timezone);
func(target, source, fractional_second, scale, timezone);
target += shift;
}
@ -148,30 +147,30 @@ private:
}
public:
static void noop(char *, Time, const DateLUTImpl &)
static void noop(char *, Time, UInt64 , UInt32 , const DateLUTImpl &)
{
}
static void century(char * target, Time source, const DateLUTImpl & timezone)
static void century(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone)
{
auto year = ToYearImpl::execute(source, timezone);
auto century = year / 100;
writeNumber2(target, century);
}
static void dayOfMonth(char * target, Time source, const DateLUTImpl & timezone)
static void dayOfMonth(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone)
{
writeNumber2(target, ToDayOfMonthImpl::execute(source, timezone));
}
static void americanDate(char * target, Time source, const DateLUTImpl & timezone)
static void americanDate(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone)
{
writeNumber2(target, ToMonthImpl::execute(source, timezone));
writeNumber2(target + 3, ToDayOfMonthImpl::execute(source, timezone));
writeNumber2(target + 6, ToYearImpl::execute(source, timezone) % 100);
}
static void dayOfMonthSpacePadded(char * target, Time source, const DateLUTImpl & timezone)
static void dayOfMonthSpacePadded(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone)
{
auto day = ToDayOfMonthImpl::execute(source, timezone);
if (day < 10)
@ -180,101 +179,107 @@ private:
writeNumber2(target, day);
}
static void ISO8601Date(char * target, Time source, const DateLUTImpl & timezone) // NOLINT
static void ISO8601Date(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone) // NOLINT
{
writeNumber4(target, ToYearImpl::execute(source, timezone));
writeNumber2(target + 5, ToMonthImpl::execute(source, timezone));
writeNumber2(target + 8, ToDayOfMonthImpl::execute(source, timezone));
}
static void dayOfYear(char * target, Time source, const DateLUTImpl & timezone)
static void dayOfYear(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone)
{
writeNumber3(target, ToDayOfYearImpl::execute(source, timezone));
}
static void month(char * target, Time source, const DateLUTImpl & timezone)
static void month(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone)
{
writeNumber2(target, ToMonthImpl::execute(source, timezone));
}
static void dayOfWeek(char * target, Time source, const DateLUTImpl & timezone)
static void dayOfWeek(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone)
{
*target += ToDayOfWeekImpl::execute(source, timezone);
}
static void dayOfWeek0To6(char * target, Time source, const DateLUTImpl & timezone)
static void dayOfWeek0To6(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone)
{
auto day = ToDayOfWeekImpl::execute(source, timezone);
*target += (day == 7 ? 0 : day);
}
static void ISO8601Week(char * target, Time source, const DateLUTImpl & timezone) // NOLINT
static void ISO8601Week(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone) // NOLINT
{
writeNumber2(target, ToISOWeekImpl::execute(source, timezone));
}
static void ISO8601Year2(char * target, Time source, const DateLUTImpl & timezone) // NOLINT
static void ISO8601Year2(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone) // NOLINT
{
writeNumber2(target, ToISOYearImpl::execute(source, timezone) % 100);
}
static void ISO8601Year4(char * target, Time source, const DateLUTImpl & timezone) // NOLINT
static void ISO8601Year4(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone) // NOLINT
{
writeNumber4(target, ToISOYearImpl::execute(source, timezone));
}
static void year2(char * target, Time source, const DateLUTImpl & timezone)
static void year2(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone)
{
writeNumber2(target, ToYearImpl::execute(source, timezone) % 100);
}
static void year4(char * target, Time source, const DateLUTImpl & timezone)
static void year4(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone)
{
writeNumber4(target, ToYearImpl::execute(source, timezone));
}
static void hour24(char * target, Time source, const DateLUTImpl & timezone)
static void hour24(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone)
{
writeNumber2(target, ToHourImpl::execute(source, timezone));
}
static void hour12(char * target, Time source, const DateLUTImpl & timezone)
static void hour12(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone)
{
auto x = ToHourImpl::execute(source, timezone);
writeNumber2(target, x == 0 ? 12 : (x > 12 ? x - 12 : x));
}
static void minute(char * target, Time source, const DateLUTImpl & timezone)
static void minute(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone)
{
writeNumber2(target, ToMinuteImpl::execute(source, timezone));
}
static void AMPM(char * target, Time source, const DateLUTImpl & timezone) // NOLINT
static void AMPM(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone) // NOLINT
{
auto hour = ToHourImpl::execute(source, timezone);
if (hour >= 12)
*target = 'P';
}
static void hhmm24(char * target, Time source, const DateLUTImpl & timezone)
static void hhmm24(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone)
{
writeNumber2(target, ToHourImpl::execute(source, timezone));
writeNumber2(target + 3, ToMinuteImpl::execute(source, timezone));
}
static void second(char * target, Time source, const DateLUTImpl & timezone)
static void second(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone)
{
writeNumber2(target, ToSecondImpl::execute(source, timezone));
}
static void ISO8601Time(char * target, Time source, const DateLUTImpl & timezone) // NOLINT
static void fractionalSecond(char * target, Time /*source*/, UInt64 fractional_second, UInt32 scale, const DateLUTImpl & /*timezone*/)
{
for (Int64 i = scale, value = fractional_second; i > 0; --i, value /= 10)
target[i - 1] += value % 10;
}
static void ISO8601Time(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone) // NOLINT
{
writeNumber2(target, ToHourImpl::execute(source, timezone));
writeNumber2(target + 3, ToMinuteImpl::execute(source, timezone));
writeNumber2(target + 6, ToSecondImpl::execute(source, timezone));
}
static void timezoneOffset(char * target, Time source, const DateLUTImpl & timezone)
static void timezoneOffset(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone)
{
auto offset = TimezoneOffsetImpl::execute(source, timezone);
if (offset < 0)
@ -287,7 +292,7 @@ private:
writeNumber2(target + 3, offset % 3600 / 60);
}
static void quarter(char * target, Time source, const DateLUTImpl & timezone)
static void quarter(char * target, Time source, UInt64 /*fractional_second*/, UInt32 /*scale*/, const DateLUTImpl & timezone)
{
*target += ToQuarterImpl::execute(source, timezone);
}
@ -426,9 +431,15 @@ public:
String pattern = pattern_column->getValue<String>();
UInt32 scale [[maybe_unused]] = 0;
if constexpr (std::is_same_v<DataType, DataTypeDateTime64>)
{
scale = times->getScale();
}
using T = typename ActionValueTypeMap<DataType>::ActionValueType;
std::vector<Action<T>> instructions;
String pattern_to_fill = parsePattern(pattern, instructions);
String pattern_to_fill = parsePattern(pattern, instructions, scale);
size_t result_size = pattern_to_fill.size();
const DateLUTImpl * time_zone_tmp = nullptr;
@ -444,12 +455,6 @@ public:
const DateLUTImpl & time_zone = *time_zone_tmp;
const auto & vec = times->getData();
UInt32 scale [[maybe_unused]] = 0;
if constexpr (std::is_same_v<DataType, DataTypeDateTime64>)
{
scale = times->getScale();
}
auto col_res = ColumnString::create();
auto & dst_data = col_res->getChars();
auto & dst_offsets = col_res->getOffsets();
@ -484,16 +489,16 @@ public:
{
if constexpr (std::is_same_v<DataType, DataTypeDateTime64>)
{
const auto c = DecimalUtils::split(vec[i], scale);
for (auto & instruction : instructions)
{
const auto c = DecimalUtils::split(vec[i], scale);
instruction.perform(pos, static_cast<Int64>(c.whole), time_zone);
instruction.perform(pos, static_cast<Int64>(c.whole), c.fractional, scale, time_zone);
}
}
else
{
for (auto & instruction : instructions)
instruction.perform(pos, static_cast<UInt32>(vec[i]), time_zone);
instruction.perform(pos, static_cast<UInt32>(vec[i]), 0, 0, time_zone);
}
dst_offsets[i] = pos - begin;
@ -504,7 +509,7 @@ public:
}
template <typename T>
String parsePattern(const String & pattern, std::vector<Action<T>> & instructions) const
String parsePattern(const String & pattern, std::vector<Action<T>> & instructions, UInt32 scale) const
{
String result;
@ -573,6 +578,16 @@ public:
result.append(" 0");
break;
// Fractional seconds
case 'f':
{
/// If the time data type has no fractional part, then we print '0' as the fractional part.
const auto actual_scale = std::max<UInt32>(1, scale);
instructions.emplace_back(&Action<T>::fractionalSecond, actual_scale);
result.append(actual_scale, '0');
break;
}
// Short YYYY-MM-DD date, equivalent to %Y-%m-%d 2001-08-23
case 'F':
instructions.emplace_back(&Action<T>::ISO8601Date, 10);

View File

@ -15,6 +15,8 @@ struct WriteSettings
bool enable_filesystem_cache_on_write_operations = false;
bool enable_filesystem_cache_log = false;
bool is_file_cache_persistent = false;
bool throw_on_error_from_cache = false;
bool s3_allow_parallel_part_upload = true;
/// Monitoring

View File

@ -9,6 +9,7 @@
#include <Functions/FunctionsLogical.h>
#include <Functions/CastOverloadResolver.h>
#include <Interpreters/Context.h>
#include <Interpreters/ArrayJoinAction.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Core/SortDescription.h>
@ -141,7 +142,7 @@ const ActionsDAG::Node & ActionsDAG::addAlias(const Node & child, std::string al
const ActionsDAG::Node & ActionsDAG::addArrayJoin(const Node & child, std::string result_name)
{
const DataTypeArray * array_type = typeid_cast<const DataTypeArray *>(child.result_type.get());
const auto & array_type = getArrayJoinDataType(child.result_type);
if (!array_type)
throw Exception("ARRAY JOIN requires array argument", ErrorCodes::TYPE_MISMATCH);
@ -463,11 +464,10 @@ static ColumnWithTypeAndName executeActionForHeader(const ActionsDAG::Node * nod
auto key = arguments.at(0);
key.column = key.column->convertToFullColumnIfConst();
const ColumnArray * array = typeid_cast<const ColumnArray *>(key.column.get());
const auto * array = getArrayJoinColumnRawPtr(key.column);
if (!array)
throw Exception(ErrorCodes::TYPE_MISMATCH,
"ARRAY JOIN of not array: {}", node->result_name);
"ARRAY JOIN of not array nor map: {}", node->result_name);
res_column.column = array->getDataPtr()->cloneEmpty();
break;
}

View File

@ -1,6 +1,8 @@
#include <Common/typeid_cast.h>
#include <Columns/ColumnArray.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeMap.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnMap.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>
@ -16,6 +18,46 @@ namespace ErrorCodes
extern const int TYPE_MISMATCH;
}
std::shared_ptr<const DataTypeArray> getArrayJoinDataType(DataTypePtr type)
{
if (const auto * array_type = typeid_cast<const DataTypeArray *>(type.get()))
return std::shared_ptr<const DataTypeArray>{type, array_type};
else if (const auto * map_type = typeid_cast<const DataTypeMap *>(type.get()))
{
const auto & nested_type = map_type->getNestedType();
const auto * nested_array_type = typeid_cast<const DataTypeArray *>(nested_type.get());
return std::shared_ptr<const DataTypeArray>{nested_type, nested_array_type};
}
else
return nullptr;
}
ColumnPtr getArrayJoinColumn(const ColumnPtr & column)
{
if (typeid_cast<const ColumnArray *>(column.get()))
return column;
else if (const auto * map = typeid_cast<const ColumnMap *>(column.get()))
return map->getNestedColumnPtr();
else
return nullptr;
}
const ColumnArray * getArrayJoinColumnRawPtr(const ColumnPtr & column)
{
if (const auto & col_arr = getArrayJoinColumn(column))
return typeid_cast<const ColumnArray *>(col_arr.get());
return nullptr;
}
ColumnWithTypeAndName convertArrayJoinColumn(const ColumnWithTypeAndName & src_col)
{
ColumnWithTypeAndName array_col;
array_col.name = src_col.name;
array_col.type = getArrayJoinDataType(src_col.type);
array_col.column = getArrayJoinColumn(src_col.column->convertToFullColumnIfConst());
return array_col;
}
ArrayJoinAction::ArrayJoinAction(const NameSet & array_joined_columns_, bool array_join_is_left, ContextPtr context)
: columns(array_joined_columns_)
, is_left(array_join_is_left)
@ -28,13 +70,12 @@ ArrayJoinAction::ArrayJoinAction(const NameSet & array_joined_columns_, bool arr
{
function_length = FunctionFactory::instance().get("length", context);
function_greatest = FunctionFactory::instance().get("greatest", context);
function_arrayResize = FunctionFactory::instance().get("arrayResize", context);
function_array_resize = FunctionFactory::instance().get("arrayResize", context);
}
else if (is_left)
function_builder = FunctionFactory::instance().get("emptyArrayToSingle", context);
}
void ArrayJoinAction::prepare(ColumnsWithTypeAndName & sample) const
{
for (auto & current : sample)
@ -42,11 +83,13 @@ void ArrayJoinAction::prepare(ColumnsWithTypeAndName & sample) const
if (!columns.contains(current.name))
continue;
const DataTypeArray * array_type = typeid_cast<const DataTypeArray *>(&*current.type);
if (!array_type)
throw Exception("ARRAY JOIN requires array argument", ErrorCodes::TYPE_MISMATCH);
current.type = array_type->getNestedType();
current.column = nullptr;
if (const auto & type = getArrayJoinDataType(current.type))
{
current.column = nullptr;
current.type = type->getNestedType();
}
else
throw Exception("ARRAY JOIN requires array or map argument", ErrorCodes::TYPE_MISMATCH);
}
}
@ -55,10 +98,10 @@ void ArrayJoinAction::execute(Block & block)
if (columns.empty())
throw Exception("No arrays to join", ErrorCodes::LOGICAL_ERROR);
ColumnPtr any_array_ptr = block.getByName(*columns.begin()).column->convertToFullColumnIfConst();
const ColumnArray * any_array = typeid_cast<const ColumnArray *>(&*any_array_ptr);
ColumnPtr any_array_map_ptr = block.getByName(*columns.begin()).column->convertToFullColumnIfConst();
const auto * any_array = getArrayJoinColumnRawPtr(any_array_map_ptr);
if (!any_array)
throw Exception("ARRAY JOIN of not array: " + *columns.begin(), ErrorCodes::TYPE_MISMATCH);
throw Exception("ARRAY JOIN requires array or map argument", ErrorCodes::TYPE_MISMATCH);
/// If LEFT ARRAY JOIN, then we create columns in which empty arrays are replaced by arrays with one element - the default value.
std::map<String, ColumnPtr> non_empty_array_columns;
@ -78,7 +121,8 @@ void ArrayJoinAction::execute(Block & block)
{
auto & src_col = block.getByName(name);
ColumnsWithTypeAndName tmp_block{src_col}; //, {{}, uint64, {}}};
ColumnWithTypeAndName array_col = convertArrayJoinColumn(src_col);
ColumnsWithTypeAndName tmp_block{array_col}; //, {{}, uint64, {}}};
auto len_col = function_length->build(tmp_block)->execute(tmp_block, uint64, rows);
ColumnsWithTypeAndName tmp_block2{column_of_max_length, {len_col, uint64, {}}};
@ -89,28 +133,35 @@ void ArrayJoinAction::execute(Block & block)
{
auto & src_col = block.getByName(name);
ColumnsWithTypeAndName tmp_block{src_col, column_of_max_length};
src_col.column = function_arrayResize->build(tmp_block)->execute(tmp_block, src_col.type, rows);
any_array_ptr = src_col.column->convertToFullColumnIfConst();
ColumnWithTypeAndName array_col = convertArrayJoinColumn(src_col);
ColumnsWithTypeAndName tmp_block{array_col, column_of_max_length};
array_col.column = function_array_resize->build(tmp_block)->execute(tmp_block, array_col.type, rows);
src_col = std::move(array_col);
any_array_map_ptr = src_col.column->convertToFullColumnIfConst();
}
any_array = typeid_cast<const ColumnArray *>(&*any_array_ptr);
any_array = getArrayJoinColumnRawPtr(any_array_map_ptr);
if (!any_array)
throw Exception("ARRAY JOIN requires array or map argument", ErrorCodes::TYPE_MISMATCH);
}
else if (is_left)
{
for (const auto & name : columns)
{
auto src_col = block.getByName(name);
ColumnsWithTypeAndName tmp_block{src_col};
non_empty_array_columns[name] = function_builder->build(tmp_block)->execute(tmp_block, src_col.type, src_col.column->size());
const auto & src_col = block.getByName(name);
ColumnWithTypeAndName array_col = convertArrayJoinColumn(src_col);
ColumnsWithTypeAndName tmp_block{array_col};
non_empty_array_columns[name] = function_builder->build(tmp_block)->execute(tmp_block, array_col.type, array_col.column->size());
}
any_array_ptr = non_empty_array_columns.begin()->second->convertToFullColumnIfConst();
any_array = &typeid_cast<const ColumnArray &>(*any_array_ptr);
any_array_map_ptr = non_empty_array_columns.begin()->second->convertToFullColumnIfConst();
any_array = getArrayJoinColumnRawPtr(any_array_map_ptr);
if (!any_array)
throw Exception("ARRAY JOIN requires array or map argument", ErrorCodes::TYPE_MISMATCH);
}
size_t num_columns = block.columns();
for (size_t i = 0; i < num_columns; ++i)
{
@ -118,18 +169,30 @@ void ArrayJoinAction::execute(Block & block)
if (columns.contains(current.name))
{
if (!typeid_cast<const DataTypeArray *>(&*current.type))
throw Exception("ARRAY JOIN of not array: " + current.name, ErrorCodes::TYPE_MISMATCH);
if (const auto & type = getArrayJoinDataType(current.type))
{
ColumnPtr array_ptr;
if (typeid_cast<const DataTypeArray *>(current.type.get()))
{
array_ptr = (is_left && !is_unaligned) ? non_empty_array_columns[current.name] : current.column;
array_ptr = array_ptr->convertToFullColumnIfConst();
}
else
{
ColumnPtr map_ptr = current.column->convertToFullColumnIfConst();
const ColumnMap & map = typeid_cast<const ColumnMap &>(*map_ptr);
array_ptr = (is_left && !is_unaligned) ? non_empty_array_columns[current.name] : map.getNestedColumnPtr();
}
ColumnPtr array_ptr = (is_left && !is_unaligned) ? non_empty_array_columns[current.name] : current.column;
array_ptr = array_ptr->convertToFullColumnIfConst();
const ColumnArray & array = typeid_cast<const ColumnArray &>(*array_ptr);
if (!is_unaligned && !array.hasEqualOffsets(*any_array))
throw Exception("Sizes of ARRAY-JOIN-ed arrays do not match", ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
const ColumnArray & array = typeid_cast<const ColumnArray &>(*array_ptr);
if (!is_unaligned && !array.hasEqualOffsets(typeid_cast<const ColumnArray &>(*any_array_ptr)))
throw Exception("Sizes of ARRAY-JOIN-ed arrays do not match", ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
current.column = typeid_cast<const ColumnArray &>(*array_ptr).getDataPtr();
current.type = typeid_cast<const DataTypeArray &>(*current.type).getNestedType();
current.column = typeid_cast<const ColumnArray &>(*array_ptr).getDataPtr();
current.type = type->getNestedType();
}
else
throw Exception("ARRAY JOIN of not array nor map: " + current.name, ErrorCodes::TYPE_MISMATCH);
}
else
{

View File

@ -11,6 +11,15 @@ namespace DB
class IFunctionOverloadResolver;
using FunctionOverloadResolverPtr = std::shared_ptr<IFunctionOverloadResolver>;
class DataTypeArray;
class ColumnArray;
std::shared_ptr<const DataTypeArray> getArrayJoinDataType(DataTypePtr type);
const ColumnArray * getArrayJoinColumnRawPtr(const ColumnPtr & column);
/// If input array join column has map type, convert it to array type.
/// Otherwise do nothing.
ColumnWithTypeAndName convertArrayJoinColumn(const ColumnWithTypeAndName & src_col);
class ArrayJoinAction
{
public:
@ -21,7 +30,7 @@ public:
/// For unaligned [LEFT] ARRAY JOIN
FunctionOverloadResolverPtr function_length;
FunctionOverloadResolverPtr function_greatest;
FunctionOverloadResolverPtr function_arrayResize;
FunctionOverloadResolverPtr function_array_resize;
/// For LEFT ARRAY JOIN.
FunctionOverloadResolverPtr function_builder;

View File

@ -18,7 +18,6 @@ namespace DB
{
namespace ErrorCodes
{
extern const int REMOTE_FS_OBJECT_CACHE_ERROR;
extern const int LOGICAL_ERROR;
}
@ -98,7 +97,7 @@ void FileCache::assertInitialized(std::lock_guard<std::mutex> & /* cache_lock */
if (initialization_exception)
std::rethrow_exception(initialization_exception);
else
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cache not initialized");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache not initialized");
}
}
@ -541,12 +540,12 @@ FileSegmentPtr FileCache::createFileSegmentForDownload(
#endif
if (size > max_file_segment_size)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Requested size exceeds max file segment size");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Requested size exceeds max file segment size");
auto * cell = getCell(key, offset, cache_lock);
if (cell)
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Cache cell already exists for key `{}` and offset {}",
key.toString(), offset);
@ -738,7 +737,7 @@ bool FileCache::tryReserveForMainList(
auto * cell = getCell(entry_key, entry_offset, cache_lock);
if (!cell)
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Cache became inconsistent. Key: {}, offset: {}",
key.toString(), offset);
@ -964,7 +963,7 @@ void FileCache::remove(
catch (...)
{
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Removal of cached file failed. Key: {}, offset: {}, path: {}, error: {}",
key.toString(), offset, cache_file_path, getCurrentExceptionMessage(false));
}
@ -981,7 +980,7 @@ void FileCache::loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock
/// cache_base_path / key_prefix / key / offset
if (!files.empty())
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Cache initialization is partially made. "
"This can be a result of a failed first attempt to initialize cache. "
"Please, check log for error messages");
@ -1214,7 +1213,7 @@ FileCache::FileSegmentCell::FileSegmentCell(
}
default:
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Can create cell with either EMPTY, DOWNLOADED, DOWNLOADING state, got: {}",
FileSegment::stateToString(file_segment->download_state));
}

View File

@ -19,7 +19,6 @@ namespace DB
namespace ErrorCodes
{
extern const int REMOTE_FS_OBJECT_CACHE_ERROR;
extern const int LOGICAL_ERROR;
}
@ -66,7 +65,7 @@ FileSegment::FileSegment(
default:
{
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Can only create cell with either EMPTY, DOWNLOADED or SKIP_CACHE state");
}
}
@ -278,7 +277,7 @@ void FileSegment::resetRemoteFileReader()
void FileSegment::write(const char * from, size_t size, size_t offset)
{
if (!size)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Writing zero size is not allowed");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing zero size is not allowed");
{
std::unique_lock segment_lock(mutex);
@ -294,7 +293,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
size_t first_non_downloaded_offset = getFirstNonDownloadedOffsetUnlocked(segment_lock);
if (offset != first_non_downloaded_offset)
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Attempt to write {} bytes to offset: {}, but current write offset is {}",
size, offset, first_non_downloaded_offset);
@ -304,7 +303,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
if (free_reserved_size < size)
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Not enough space is reserved. Available: {}, expected: {}", free_reserved_size, size);
if (current_downloaded_size == range().size())
@ -364,7 +363,7 @@ FileSegment::State FileSegment::wait()
return download_state;
if (download_state == State::EMPTY)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cannot wait on a file segment with empty state");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot wait on a file segment with empty state");
if (download_state == State::DOWNLOADING)
{
@ -382,7 +381,7 @@ FileSegment::State FileSegment::wait()
bool FileSegment::reserve(size_t size_to_reserve)
{
if (!size_to_reserve)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Zero space reservation is not allowed");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Zero space reservation is not allowed");
size_t expected_downloaded_size;
@ -396,7 +395,7 @@ bool FileSegment::reserve(size_t size_to_reserve)
if (expected_downloaded_size + size_to_reserve > range().size())
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Attempt to reserve space too much space ({}) for file segment with range: {} (downloaded size: {})",
size_to_reserve, range().toString(), downloaded_size);
@ -434,9 +433,6 @@ void FileSegment::setDownloadedUnlocked([[maybe_unused]] std::unique_lock<std::m
if (is_downloaded)
return;
setDownloadState(State::DOWNLOADED);
is_downloaded = true;
if (cache_writer)
{
cache_writer->finalize();
@ -498,7 +494,7 @@ void FileSegment::completeWithState(State state)
{
cv.notify_all();
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Cannot complete file segment with state: {}", stateToString(state));
}
@ -559,8 +555,7 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cach
{
if (is_last_holder)
cache->remove(key(), offset(), cache_lock, segment_lock);
return;
break;
}
case State::DOWNLOADED:
{
@ -613,6 +608,7 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cach
}
}
is_completed = true;
LOG_TEST(log, "Completed file segment: {}", getInfoForLogUnlocked(segment_lock));
}
@ -748,6 +744,12 @@ bool FileSegment::isDetached() const
return is_detached;
}
bool FileSegment::isCompleted() const
{
std::unique_lock segment_lock(mutex);
return is_completed;
}
void FileSegment::detach(std::lock_guard<std::mutex> & /* cache_lock */, std::unique_lock<std::mutex> & segment_lock)
{
if (is_detached)

View File

@ -181,6 +181,8 @@ public:
bool isDetached() const;
bool isCompleted() const;
void assertCorrectness() const;
/**
@ -294,6 +296,7 @@ private:
/// "detached" file segment means that it is not owned by cache ("detached" from cache).
/// In general case, all file segments are owned by cache.
bool is_detached = false;
bool is_completed = false;
bool is_downloaded{false};
@ -317,11 +320,6 @@ struct FileSegmentsHolder : private boost::noncopyable
String toString();
FileSegments::iterator add(FileSegmentPtr && file_segment)
{
return file_segments.insert(file_segments.end(), file_segment);
}
FileSegments file_segments{};
};

View File

@ -3743,6 +3743,8 @@ WriteSettings Context::getWriteSettings() const
res.enable_filesystem_cache_on_write_operations = settings.enable_filesystem_cache_on_write_operations;
res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log;
res.throw_on_error_from_cache = settings.throw_on_error_from_cache_on_write_operations;
res.s3_allow_parallel_part_upload = settings.s3_allow_parallel_part_upload;
res.remote_throttler = getRemoteWriteThrottler();

View File

@ -620,9 +620,9 @@ static void executeAction(const ExpressionActions::Action & action, ExecutionCon
array_join_key.column = array_join_key.column->convertToFullColumnIfConst();
const ColumnArray * array = typeid_cast<const ColumnArray *>(array_join_key.column.get());
const auto * array = getArrayJoinColumnRawPtr(array_join_key.column);
if (!array)
throw Exception("ARRAY JOIN of not array: " + action.node->result_name, ErrorCodes::TYPE_MISMATCH);
throw Exception("ARRAY JOIN of not array nor map: " + action.node->result_name, ErrorCodes::TYPE_MISMATCH);
for (auto & column : columns)
if (column.column)
@ -635,7 +635,7 @@ static void executeAction(const ExpressionActions::Action & action, ExecutionCon
auto & res_column = columns[action.result_position];
res_column.column = array->getDataPtr();
res_column.type = assert_cast<const DataTypeArray &>(*array_join_key.type).getNestedType();
res_column.type = getArrayJoinDataType(array_join_key.type)->getNestedType();
res_column.name = action.node->result_name;
num_rows = res_column.column->size();
@ -1008,7 +1008,7 @@ ExpressionActionsChain::ArrayJoinStep::ArrayJoinStep(ArrayJoinActionPtr array_jo
if (array_join->columns.contains(column.name))
{
const auto * array = typeid_cast<const DataTypeArray *>(column.type.get());
const auto & array = getArrayJoinDataType(column.type);
column.type = array->getNestedType();
/// Arrays are materialized
column.column = nullptr;

View File

@ -220,8 +220,13 @@ bool isStorageTouchedByMutations(
if (all_commands_can_be_skipped)
return false;
/// We must read with one thread because it guarantees that
/// output stream will be sorted after reading from MergeTree parts.
/// Disable all settings that can enable reading with several streams.
context_copy->setSetting("max_streams_to_max_threads_ratio", 1);
context_copy->setSetting("max_threads", 1);
context_copy->setSetting("allow_asynchronous_read_from_io_pool_for_merge_tree", false);
context_copy->setSetting("max_streams_for_merge_tree_reading", Field(0));
ASTPtr select_query = prepareQueryAffectedAST(commands, storage, context_copy);

View File

@ -8,6 +8,7 @@
#include <Parsers/DumpASTNode.h>
#include <Common/typeid_cast.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/BinStringDecodeHelper.h>
#include <Parsers/ASTAsterisk.h>
#include <Parsers/ASTCollation.h>
@ -986,6 +987,38 @@ bool ParserUnsignedInteger::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
return true;
}
inline static bool makeStringLiteral(IParser::Pos & pos, ASTPtr & node, String str)
{
auto literal = std::make_shared<ASTLiteral>(str);
literal->begin = pos;
literal->end = ++pos;
node = literal;
return true;
}
inline static bool makeHexOrBinStringLiteral(IParser::Pos & pos, ASTPtr & node, bool hex, size_t word_size)
{
const char * str_begin = pos->begin + 2;
const char * str_end = pos->end - 1;
if (str_begin == str_end)
return makeStringLiteral(pos, node, "");
PODArray<UInt8> res;
res.resize((pos->size() + word_size) / word_size + 1);
char * res_begin = reinterpret_cast<char *>(res.data());
char * res_pos = res_begin;
if (hex)
{
hexStringDecode(str_begin, str_end, res_pos);
}
else
{
binStringDecode(str_begin, str_end, res_pos);
}
return makeStringLiteral(pos, node, String(reinterpret_cast<char *>(res.data()), (res_pos - res_begin - 1)));
}
bool ParserStringLiteral::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
@ -996,6 +1029,18 @@ bool ParserStringLiteral::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
if (pos->type == TokenType::StringLiteral)
{
if (*pos->begin == 'x' || *pos->begin == 'X')
{
constexpr size_t word_size = 2;
return makeHexOrBinStringLiteral(pos, node, true, word_size);
}
if (*pos->begin == 'b' || *pos->begin == 'B')
{
constexpr size_t word_size = 8;
return makeHexOrBinStringLiteral(pos, node, false, word_size);
}
ReadBufferFromMemory in(pos->begin, pos->size());
try
@ -1022,11 +1067,7 @@ bool ParserStringLiteral::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
s = String(pos->begin + heredoc_size, pos->size() - heredoc_size * 2);
}
auto literal = std::make_shared<ASTLiteral>(s);
literal->begin = pos;
literal->end = ++pos;
node = literal;
return true;
return makeStringLiteral(pos, node, s);
}
template <typename Collection>
@ -1128,36 +1169,42 @@ class ICollection
{
public:
virtual ~ICollection() = default;
virtual bool parse(IParser::Pos & pos, Collections & collections, ASTPtr & node, Expected & expected) = 0;
virtual bool parse(IParser::Pos & pos, Collections & collections, ASTPtr & node, Expected & expected, bool allow_map) = 0;
};
template <class Container, TokenType end_token>
class CommonCollection : public ICollection
{
public:
bool parse(IParser::Pos & pos, Collections & collections, ASTPtr & node, Expected & expected) override;
explicit CommonCollection(const IParser::Pos & pos) : begin(pos) {}
bool parse(IParser::Pos & pos, Collections & collections, ASTPtr & node, Expected & expected, bool allow_map) override;
private:
Container container;
IParser::Pos begin;
};
class MapCollection : public ICollection
{
public:
bool parse(IParser::Pos & pos, Collections & collections, ASTPtr & node, Expected & expected) override;
explicit MapCollection(const IParser::Pos & pos) : begin(pos) {}
bool parse(IParser::Pos & pos, Collections & collections, ASTPtr & node, Expected & expected, bool allow_map) override;
private:
Map container;
IParser::Pos begin;
};
bool parseAllCollectionsStart(IParser::Pos & pos, Collections & collections, Expected & /*expected*/)
bool parseAllCollectionsStart(IParser::Pos & pos, Collections & collections, Expected & /*expected*/, bool allow_map)
{
if (pos->type == TokenType::OpeningCurlyBrace)
collections.push_back(std::make_unique<MapCollection>());
if (allow_map && pos->type == TokenType::OpeningCurlyBrace)
collections.push_back(std::make_unique<MapCollection>(pos));
else if (pos->type == TokenType::OpeningRoundBracket)
collections.push_back(std::make_unique<CommonCollection<Tuple, TokenType::ClosingRoundBracket>>());
collections.push_back(std::make_unique<CommonCollection<Tuple, TokenType::ClosingRoundBracket>>(pos));
else if (pos->type == TokenType::OpeningSquareBracket)
collections.push_back(std::make_unique<CommonCollection<Array, TokenType::ClosingSquareBracket>>());
collections.push_back(std::make_unique<CommonCollection<Array, TokenType::ClosingSquareBracket>>(pos));
else
return false;
@ -1166,7 +1213,7 @@ bool parseAllCollectionsStart(IParser::Pos & pos, Collections & collections, Exp
}
template <class Container, TokenType end_token>
bool CommonCollection<Container, end_token>::parse(IParser::Pos & pos, Collections & collections, ASTPtr & node, Expected & expected)
bool CommonCollection<Container, end_token>::parse(IParser::Pos & pos, Collections & collections, ASTPtr & node, Expected & expected, bool allow_map)
{
if (node)
{
@ -1183,23 +1230,27 @@ bool CommonCollection<Container, end_token>::parse(IParser::Pos & pos, Collectio
{
if (end_p.ignore(pos, expected))
{
node = std::make_shared<ASTLiteral>(std::move(container));
auto result = std::make_shared<ASTLiteral>(std::move(container));
result->begin = begin;
result->end = pos;
node = std::move(result);
break;
}
if (!container.empty() && !comma_p.ignore(pos, expected))
return false;
return false;
if (literal_p.parse(pos, literal, expected))
container.push_back(std::move(literal->as<ASTLiteral &>().value));
else
return parseAllCollectionsStart(pos, collections, expected);
return parseAllCollectionsStart(pos, collections, expected, allow_map);
}
return true;
}
bool MapCollection::parse(IParser::Pos & pos, Collections & collections, ASTPtr & node, Expected & expected)
bool MapCollection::parse(IParser::Pos & pos, Collections & collections, ASTPtr & node, Expected & expected, bool allow_map)
{
if (node)
{
@ -1217,7 +1268,11 @@ bool MapCollection::parse(IParser::Pos & pos, Collections & collections, ASTPtr
{
if (end_p.ignore(pos, expected))
{
node = std::make_shared<ASTLiteral>(std::move(container));
auto result = std::make_shared<ASTLiteral>(std::move(container));
result->begin = begin;
result->end = pos;
node = std::move(result);
break;
}
@ -1235,7 +1290,7 @@ bool MapCollection::parse(IParser::Pos & pos, Collections & collections, ASTPtr
if (literal_p.parse(pos, literal, expected))
container.push_back(std::move(literal->as<ASTLiteral &>().value));
else
return parseAllCollectionsStart(pos, collections, expected);
return parseAllCollectionsStart(pos, collections, expected, allow_map);
}
return true;
@ -1248,12 +1303,12 @@ bool ParserAllCollectionsOfLiterals::parseImpl(Pos & pos, ASTPtr & node, Expecte
{
Collections collections;
if (!parseAllCollectionsStart(pos, collections, expected))
if (!parseAllCollectionsStart(pos, collections, expected, allow_map))
return false;
while (!collections.empty())
{
if (!collections.back()->parse(pos, collections, node, expected))
if (!collections.back()->parse(pos, collections, node, expected, allow_map))
return false;
if (node)

View File

@ -307,9 +307,14 @@ protected:
class ParserAllCollectionsOfLiterals : public IParserBase
{
public:
explicit ParserAllCollectionsOfLiterals(bool allow_map_ = true) : allow_map(allow_map_) {}
protected:
const char * getName() const override { return "combination of maps, arrays, tuples"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
private:
bool allow_map;
};

View File

@ -1,3 +1,4 @@
#include <cassert>
#include <base/defines.h>
#include <Parsers/Lexer.h>
#include <Common/StringUtils/StringUtils.h>
@ -44,6 +45,36 @@ Token quotedString(const char *& pos, const char * const token_begin, const char
}
}
Token quotedHexOrBinString(const char *& pos, const char * const token_begin, const char * const end)
{
constexpr char quote = '\'';
assert(pos[1] == quote);
bool hex = (*pos == 'x' || *pos == 'X');
pos += 2;
if (hex)
{
while (pos < end && isHexDigit(*pos))
++pos;
}
else
{
pos = find_first_not_symbols<'0', '1'>(pos, end);
}
if (pos >= end || *pos != quote)
{
pos = end;
return Token(TokenType::ErrorSingleQuoteIsNotClosed, token_begin, end);
}
++pos;
return Token(TokenType::StringLiteral, token_begin, pos);
}
}
@ -420,6 +451,12 @@ Token Lexer::nextTokenImpl()
return Token(TokenType::DollarSign, token_begin, ++pos);
}
}
if (pos + 2 < end && pos[1] == '\'' && (*pos == 'x' || *pos == 'b' || *pos == 'X' || *pos == 'B'))
{
return quotedHexOrBinString(pos, token_begin, end);
}
if (isWordCharASCII(*pos) || *pos == '$')
{
++pos;

View File

@ -18,6 +18,7 @@
#include <Columns/ColumnMap.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeLowCardinality.h>
@ -282,7 +283,7 @@ static void readAndInsertString(ReadBuffer & in, IColumn & column, BSONType bson
}
else if (bson_type == BSONType::OBJECT_ID)
{
readAndInsertStringImpl<is_fixed_string>(in, column, 12);
readAndInsertStringImpl<is_fixed_string>(in, column, BSON_OBJECT_ID_SIZE);
}
else
{
@ -664,7 +665,7 @@ static void skipBSONField(ReadBuffer & in, BSONType type)
}
case BSONType::OBJECT_ID:
{
in.ignore(12);
in.ignore(BSON_OBJECT_ID_SIZE);
break;
}
case BSONType::REGEXP:
@ -677,7 +678,7 @@ static void skipBSONField(ReadBuffer & in, BSONType type)
{
BSONSizeT size;
readBinary(size, in);
in.ignore(size + 12);
in.ignore(size + BSON_DB_POINTER_SIZE);
break;
}
case BSONType::JAVA_SCRIPT_CODE_W_SCOPE:
@ -796,7 +797,6 @@ DataTypePtr BSONEachRowSchemaReader::getDataTypeFromBSONField(BSONType type, boo
}
case BSONType::SYMBOL: [[fallthrough]];
case BSONType::JAVA_SCRIPT_CODE: [[fallthrough]];
case BSONType::OBJECT_ID: [[fallthrough]];
case BSONType::STRING:
{
BSONSizeT size;
@ -804,6 +804,11 @@ DataTypePtr BSONEachRowSchemaReader::getDataTypeFromBSONField(BSONType type, boo
in.ignore(size);
return std::make_shared<DataTypeString>();
}
case BSONType::OBJECT_ID:;
{
in.ignore(BSON_OBJECT_ID_SIZE);
return makeNullable(std::make_shared<DataTypeFixedString>(BSON_OBJECT_ID_SIZE));
}
case BSONType::DOCUMENT:
{
auto nested_names_and_types = getDataTypesFromBSONDocument(false);
@ -954,6 +959,7 @@ void registerInputFormatBSONEachRow(FormatFactory & factory)
"BSONEachRow",
[](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings)
{ return std::make_shared<BSONEachRowRowInputFormat>(buf, sample, std::move(params), settings); });
factory.registerFileExtension("bson", "BSONEachRow");
}
void registerFileSegmentationEngineBSONEachRow(FormatFactory & factory)

View File

@ -99,6 +99,12 @@ static void insertSignedInteger(IColumn & column, const DataTypePtr & column_typ
case TypeIndex::DateTime64:
assert_cast<ColumnDecimal<DateTime64> &>(column).insertValue(value);
break;
case TypeIndex::Decimal32:
assert_cast<ColumnDecimal<Decimal32> &>(column).insertValue(static_cast<Int32>(value));
break;
case TypeIndex::Decimal64:
assert_cast<ColumnDecimal<Decimal64> &>(column).insertValue(value);
break;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column type is not a signed integer.");
}
@ -178,14 +184,14 @@ static void insertEnum(IColumn & column, const DataTypePtr & column_type, const
}
}
static void insertValue(IColumn & column, const DataTypePtr & column_type, const capnp::DynamicValue::Reader & value, FormatSettings::EnumComparingMode enum_comparing_mode)
static void insertValue(IColumn & column, const DataTypePtr & column_type, const String & column_name, const capnp::DynamicValue::Reader & value, FormatSettings::EnumComparingMode enum_comparing_mode)
{
if (column_type->lowCardinality())
{
auto & lc_column = assert_cast<ColumnLowCardinality &>(column);
auto tmp_column = lc_column.getDictionary().getNestedColumn()->cloneEmpty();
auto dict_type = assert_cast<const DataTypeLowCardinality *>(column_type.get())->getDictionaryType();
insertValue(*tmp_column, dict_type, value, enum_comparing_mode);
insertValue(*tmp_column, dict_type, column_name, value, enum_comparing_mode);
lc_column.insertFromFullColumn(*tmp_column, 0);
return;
}
@ -226,7 +232,7 @@ static void insertValue(IColumn & column, const DataTypePtr & column_type, const
auto & nested_column = column_array.getData();
auto nested_type = assert_cast<const DataTypeArray *>(column_type.get())->getNestedType();
for (const auto & nested_value : list_value)
insertValue(nested_column, nested_type, nested_value, enum_comparing_mode);
insertValue(nested_column, nested_type, column_name, nested_value, enum_comparing_mode);
break;
}
case capnp::DynamicValue::Type::STRUCT:
@ -243,11 +249,11 @@ static void insertValue(IColumn & column, const DataTypePtr & column_type, const
auto & nested_column = nullable_column.getNestedColumn();
auto nested_type = assert_cast<const DataTypeNullable *>(column_type.get())->getNestedType();
auto nested_value = struct_value.get(field);
insertValue(nested_column, nested_type, nested_value, enum_comparing_mode);
insertValue(nested_column, nested_type, column_name, nested_value, enum_comparing_mode);
nullable_column.getNullMapData().push_back(0);
}
}
else
else if (isTuple(column_type))
{
auto & tuple_column = assert_cast<ColumnTuple &>(column);
const auto * tuple_type = assert_cast<const DataTypeTuple *>(column_type.get());
@ -255,9 +261,16 @@ static void insertValue(IColumn & column, const DataTypePtr & column_type, const
insertValue(
tuple_column.getColumn(i),
tuple_type->getElements()[i],
tuple_type->getElementNames()[i],
struct_value.get(tuple_type->getElementNames()[i]),
enum_comparing_mode);
}
else
{
/// It can be nested column from Nested type.
auto [field_name, nested_name] = splitCapnProtoFieldName(column_name);
insertValue(column, column_type, nested_name, struct_value.get(nested_name), enum_comparing_mode);
}
break;
}
default:
@ -278,7 +291,7 @@ bool CapnProtoRowInputFormat::readRow(MutableColumns & columns, RowReadExtension
for (size_t i = 0; i != columns.size(); ++i)
{
auto value = getReaderByColumnName(root_reader, column_names[i]);
insertValue(*columns[i], column_types[i], value, format_settings.capn_proto.enum_comparing_mode);
insertValue(*columns[i], column_types[i], column_names[i], value, format_settings.capn_proto.enum_comparing_mode);
}
}
catch (const kj::Exception & e)

View File

@ -92,6 +92,7 @@ static std::optional<capnp::DynamicValue::Reader> convertToDynamicValue(
const ColumnPtr & column,
const DataTypePtr & data_type,
size_t row_num,
const String & column_name,
capnp::DynamicValue::Builder builder,
FormatSettings::EnumComparingMode enum_comparing_mode,
std::vector<std::unique_ptr<String>> & temporary_text_data_storage)
@ -103,15 +104,12 @@ static std::optional<capnp::DynamicValue::Reader> convertToDynamicValue(
const auto * lc_column = assert_cast<const ColumnLowCardinality *>(column.get());
const auto & dict_type = assert_cast<const DataTypeLowCardinality *>(data_type.get())->getDictionaryType();
size_t index = lc_column->getIndexAt(row_num);
return convertToDynamicValue(lc_column->getDictionary().getNestedColumn(), dict_type, index, builder, enum_comparing_mode, temporary_text_data_storage);
return convertToDynamicValue(lc_column->getDictionary().getNestedColumn(), dict_type, index, column_name, builder, enum_comparing_mode, temporary_text_data_storage);
}
switch (builder.getType())
{
case capnp::DynamicValue::Type::INT:
/// We allow output DateTime64 as Int64.
if (WhichDataType(data_type).isDateTime64())
return capnp::DynamicValue::Reader(assert_cast<const ColumnDecimal<DateTime64> *>(column.get())->getElement(row_num));
return capnp::DynamicValue::Reader(column->getInt(row_num));
case capnp::DynamicValue::Type::UINT:
return capnp::DynamicValue::Reader(column->getUInt(row_num));
@ -150,7 +148,7 @@ static std::optional<capnp::DynamicValue::Reader> convertToDynamicValue(
{
auto struct_builder = builder.as<capnp::DynamicStruct>();
auto nested_struct_schema = struct_builder.getSchema();
/// Struct can be represent Tuple or Naullable (named union with two fields)
/// Struct can represent Tuple, Nullable (named union with two fields) or single column when it contains one nested column.
if (data_type->isNullable())
{
const auto * nullable_type = assert_cast<const DataTypeNullable *>(data_type.get());
@ -167,12 +165,12 @@ static std::optional<capnp::DynamicValue::Reader> convertToDynamicValue(
struct_builder.clear(value_field);
const auto & nested_column = nullable_column->getNestedColumnPtr();
auto value_builder = initStructFieldBuilder(nested_column, row_num, struct_builder, value_field);
auto value = convertToDynamicValue(nested_column, nullable_type->getNestedType(), row_num, value_builder, enum_comparing_mode, temporary_text_data_storage);
auto value = convertToDynamicValue(nested_column, nullable_type->getNestedType(), row_num, column_name, value_builder, enum_comparing_mode, temporary_text_data_storage);
if (value)
struct_builder.set(value_field, *value);
}
}
else
else if (isTuple(data_type))
{
const auto * tuple_data_type = assert_cast<const DataTypeTuple *>(data_type.get());
auto nested_types = tuple_data_type->getElements();
@ -182,11 +180,21 @@ static std::optional<capnp::DynamicValue::Reader> convertToDynamicValue(
auto pos = tuple_data_type->getPositionByName(name);
auto field_builder
= initStructFieldBuilder(nested_columns[pos], row_num, struct_builder, nested_struct_schema.getFieldByName(name));
auto value = convertToDynamicValue(nested_columns[pos], nested_types[pos], row_num, field_builder, enum_comparing_mode, temporary_text_data_storage);
auto value = convertToDynamicValue(nested_columns[pos], nested_types[pos], row_num, column_name, field_builder, enum_comparing_mode, temporary_text_data_storage);
if (value)
struct_builder.set(name, *value);
}
}
else
{
/// It can be nested column from Nested type.
auto [field_name, nested_name] = splitCapnProtoFieldName(column_name);
auto nested_field = nested_struct_schema.getFieldByName(nested_name);
auto field_builder = initStructFieldBuilder(column, row_num, struct_builder, nested_field);
auto value = convertToDynamicValue(column, data_type, row_num, nested_name, field_builder, enum_comparing_mode, temporary_text_data_storage);
if (value)
struct_builder.set(nested_field, *value);
}
return std::nullopt;
}
case capnp::DynamicValue::Type::LIST:
@ -213,7 +221,7 @@ static std::optional<capnp::DynamicValue::Reader> convertToDynamicValue(
else
value_builder = list_builder[i];
auto value = convertToDynamicValue(nested_column, nested_type, offset + i, value_builder, enum_comparing_mode, temporary_text_data_storage);
auto value = convertToDynamicValue(nested_column, nested_type, offset + i, column_name, value_builder, enum_comparing_mode, temporary_text_data_storage);
if (value)
list_builder.set(i, *value);
}
@ -231,11 +239,19 @@ void CapnProtoRowOutputFormat::write(const Columns & columns, size_t row_num)
/// See comment in convertToDynamicValue() for more details.
std::vector<std::unique_ptr<String>> temporary_text_data_storage;
capnp::DynamicStruct::Builder root = message.initRoot<capnp::DynamicStruct>(schema);
/// Some columns can share same field builder. For example when we have
/// column with Nested type that was flattened into several columns.
std::unordered_map<size_t, capnp::DynamicValue::Builder> field_builders;
for (size_t i = 0; i != columns.size(); ++i)
{
auto [struct_builder, field] = getStructBuilderAndFieldByColumnName(root, column_names[i]);
auto field_builder = initStructFieldBuilder(columns[i], row_num, struct_builder, field);
auto value = convertToDynamicValue(columns[i], column_types[i], row_num, field_builder, format_settings.capn_proto.enum_comparing_mode, temporary_text_data_storage);
if (!field_builders.contains(field.getIndex()))
{
auto field_builder = initStructFieldBuilder(columns[i], row_num, struct_builder, field);
field_builders[field.getIndex()] = field_builder;
}
auto value = convertToDynamicValue(columns[i], column_types[i], row_num, column_names[i], field_builders[field.getIndex()], format_settings.capn_proto.enum_comparing_mode, temporary_text_data_storage);
if (value)
struct_builder.set(field, *value);
}

View File

@ -297,9 +297,14 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite
{
part = merge_task->getFuture().get();
/// Task is not needed
merge_task.reset();
storage.merger_mutator.renameMergedTemporaryPart(part, parts, NO_TRANSACTION_PTR, *transaction_ptr);
/// Why we reset task here? Because it holds shared pointer to part and tryRemovePartImmediately will
/// not able to remove the part and will throw an exception (because someone holds the pointer).
///
/// Why we cannot reset task right after obtaining part from getFuture()? Because it holds RAII wrapper for
/// temp directories which guards temporary dir from background removal. So it's right place to reset the task
/// and it's really needed.
merge_task.reset();
try
{

View File

@ -1348,6 +1348,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
loadDataPartsFromDisk(
broken_parts_to_detach, duplicate_parts_to_remove, pool, num_parts, parts_queue, skip_sanity_checks, settings);
bool is_static_storage = isStaticStorage();
if (settings->in_memory_parts_enable_wal)
{
std::map<String, MutableDataPartsVector> disk_wal_part_map;
@ -1376,13 +1378,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
ErrorCodes::CORRUPTED_DATA);
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext(), part_lock))
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext(), part_lock, is_static_storage))
disk_wal_parts.push_back(std::move(part));
}
else
{
MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
for (auto && part : wal.restore(metadata_snapshot, getContext(), part_lock))
for (auto && part : wal.restore(metadata_snapshot, getContext(), part_lock, is_static_storage))
disk_wal_parts.push_back(std::move(part));
}
}
@ -1408,11 +1410,17 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
return;
}
for (auto & part : broken_parts_to_detach)
part->renameToDetached("broken-on-start"); /// detached parts must not have '_' in prefixes
if (!is_static_storage)
{
for (auto & part : broken_parts_to_detach)
{
/// detached parts must not have '_' in prefixes
part->renameToDetached("broken-on-start");
}
for (auto & part : duplicate_parts_to_remove)
part->remove();
for (auto & part : duplicate_parts_to_remove)
part->remove();
}
auto deactivate_part = [&] (DataPartIteratorByStateAndInfo it)
{
@ -2167,6 +2175,8 @@ size_t MergeTreeData::clearEmptyParts()
void MergeTreeData::rename(const String & new_table_path, const StorageID & new_table_id)
{
LOG_INFO(log, "Renaming table to path {} with ID {}", new_table_path, new_table_id.getFullTableName());
auto disks = getStoragePolicy()->getDisks();
for (const auto & disk : disks)

View File

@ -9,6 +9,10 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Functions/FunctionFactory.h>
#include <Planner/PlannerActionsVisitor.h>
#include <Storages/MergeTree/MergeTreeIndexUtils.h>
namespace DB
{
@ -242,67 +246,78 @@ MergeTreeIndexGranulePtr MergeTreeIndexAggregatorSet::getGranuleAndReset()
MergeTreeIndexConditionSet::MergeTreeIndexConditionSet(
const String & index_name_,
const Block & index_sample_block_,
const Block & index_sample_block,
size_t max_rows_,
const SelectQueryInfo & query,
const SelectQueryInfo & query_info,
ContextPtr context)
: index_name(index_name_)
, max_rows(max_rows_)
, index_sample_block(index_sample_block_)
{
for (const auto & name : index_sample_block.getNames())
if (!key_columns.contains(name))
key_columns.insert(name);
const auto & select = query.query->as<ASTSelectQuery &>();
if (select.where() && select.prewhere())
expression_ast = makeASTFunction(
"and",
select.where()->clone(),
select.prewhere()->clone());
else if (select.where())
expression_ast = select.where()->clone();
else if (select.prewhere())
expression_ast = select.prewhere()->clone();
useless = checkASTUseless(expression_ast);
/// Do not proceed if index is useless for this query.
if (useless)
ASTPtr ast_filter_node = buildFilterNode(query_info.query);
if (!ast_filter_node)
return;
/// Replace logical functions with bit functions.
/// Working with UInt8: last bit = can be true, previous = can be false (Like src/Storages/MergeTree/BoolMask.h).
traverseAST(expression_ast);
if (context->getSettingsRef().allow_experimental_analyzer)
{
if (!query_info.filter_actions_dag)
return;
auto syntax_analyzer_result = TreeRewriter(context).analyze(
expression_ast, index_sample_block.getNamesAndTypesList());
actions = ExpressionAnalyzer(expression_ast, syntax_analyzer_result, context).getActions(true);
if (checkDAGUseless(*query_info.filter_actions_dag->getOutputs().at(0), context))
return;
const auto * filter_node = query_info.filter_actions_dag->getOutputs().at(0);
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG({filter_node}, {}, context);
const auto * filter_actions_dag_node = filter_actions_dag->getOutputs().at(0);
std::unordered_map<const ActionsDAG::Node *, const ActionsDAG::Node *> node_to_result_node;
filter_actions_dag->getOutputs()[0] = &traverseDAG(*filter_actions_dag_node, filter_actions_dag, context, node_to_result_node);
filter_actions_dag->removeUnusedActions();
actions = std::make_shared<ExpressionActions>(filter_actions_dag);
}
else
{
if (checkASTUseless(ast_filter_node))
return;
auto expression_ast = ast_filter_node->clone();
/// Replace logical functions with bit functions.
/// Working with UInt8: last bit = can be true, previous = can be false (Like src/Storages/MergeTree/BoolMask.h).
traverseAST(expression_ast);
auto syntax_analyzer_result = TreeRewriter(context).analyze(expression_ast, index_sample_block.getNamesAndTypesList());
actions = ExpressionAnalyzer(expression_ast, syntax_analyzer_result, context).getActions(true);
}
}
bool MergeTreeIndexConditionSet::alwaysUnknownOrTrue() const
{
return useless;
return isUseless();
}
bool MergeTreeIndexConditionSet::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const
{
if (useless)
if (isUseless())
return true;
auto granule = std::dynamic_pointer_cast<MergeTreeIndexGranuleSet>(idx_granule);
if (!granule)
throw Exception(
"Set index condition got a granule with the wrong type.", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Set index condition got a granule with the wrong type");
if (useless || granule->empty() || (max_rows != 0 && granule->size() > max_rows))
if (isUseless() || granule->empty() || (max_rows != 0 && granule->size() > max_rows))
return true;
Block result = granule->block;
actions->execute(result);
auto column
= result.getByName(expression_ast->getColumnName()).column->convertToFullColumnIfConst()->convertToFullColumnIfLowCardinality();
const auto & filter_node_name = actions->getActionsDAG().getOutputs().at(0)->result_name;
auto column = result.getByName(filter_node_name).column->convertToFullColumnIfConst()->convertToFullColumnIfLowCardinality();
if (column->onlyNull())
return false;
@ -318,17 +333,214 @@ bool MergeTreeIndexConditionSet::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx
}
if (!col_uint8)
throw Exception("ColumnUInt8 expected as Set index condition result.", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR,
"ColumnUInt8 expected as Set index condition result");
const auto & condition = col_uint8->getData();
size_t column_size = column->size();
for (size_t i = 0; i < column->size(); ++i)
for (size_t i = 0; i < column_size; ++i)
if ((!null_map || (*null_map)[i] == 0) && condition[i] & 1)
return true;
return false;
}
const ActionsDAG::Node & MergeTreeIndexConditionSet::traverseDAG(const ActionsDAG::Node & node,
ActionsDAGPtr & result_dag,
const ContextPtr & context,
std::unordered_map<const ActionsDAG::Node *, const ActionsDAG::Node *> & node_to_result_node) const
{
auto result_node_it = node_to_result_node.find(&node);
if (result_node_it != node_to_result_node.end())
return *result_node_it->second;
const ActionsDAG::Node * result_node = nullptr;
if (const auto * operator_node_ptr = operatorFromDAG(node, result_dag, context, node_to_result_node))
{
result_node = operator_node_ptr;
}
else if (const auto * atom_node_ptr = atomFromDAG(node, result_dag, context))
{
result_node = atom_node_ptr;
if (atom_node_ptr->type == ActionsDAG::ActionType::INPUT ||
atom_node_ptr->type == ActionsDAG::ActionType::FUNCTION)
{
auto bit_wrapper_function = FunctionFactory::instance().get("__bitWrapperFunc", context);
result_node = &result_dag->addFunction(bit_wrapper_function, {atom_node_ptr}, {});
}
}
else
{
ColumnWithTypeAndName unknown_field_column_with_type;
unknown_field_column_with_type.name = calculateConstantActionNodeName(UNKNOWN_FIELD);
unknown_field_column_with_type.type = std::make_shared<DataTypeUInt8>();
unknown_field_column_with_type.column = unknown_field_column_with_type.type->createColumnConst(1, UNKNOWN_FIELD);
result_node = &result_dag->addColumn(unknown_field_column_with_type);
}
node_to_result_node.emplace(&node, result_node);
return *result_node;
}
const ActionsDAG::Node * MergeTreeIndexConditionSet::atomFromDAG(const ActionsDAG::Node & node, ActionsDAGPtr & result_dag, const ContextPtr & context) const
{
/// Function, literal or column
const auto * node_to_check = &node;
while (node_to_check->type == ActionsDAG::ActionType::ALIAS)
node_to_check = node_to_check->children[0];
if (node_to_check->column && isColumnConst(*node_to_check->column))
return &node;
RPNBuilderTreeContext tree_context(context);
RPNBuilderTreeNode tree_node(node_to_check, tree_context);
auto column_name = tree_node.getColumnName();
if (key_columns.contains(column_name))
{
const auto * result_node = node_to_check;
if (node.type != ActionsDAG::ActionType::INPUT)
result_node = &result_dag->addInput(column_name, node.result_type);
return result_node;
}
if (node.type != ActionsDAG::ActionType::FUNCTION)
return nullptr;
const auto & arguments = node.children;
size_t arguments_size = arguments.size();
ActionsDAG::NodeRawConstPtrs children(arguments_size);
for (size_t i = 0; i < arguments_size; ++i)
{
children[i] = atomFromDAG(*arguments[i], result_dag, context);
if (!children[i])
return nullptr;
}
return &result_dag->addFunction(node.function_builder, children, {});
}
const ActionsDAG::Node * MergeTreeIndexConditionSet::operatorFromDAG(const ActionsDAG::Node & node,
ActionsDAGPtr & result_dag,
const ContextPtr & context,
std::unordered_map<const ActionsDAG::Node *, const ActionsDAG::Node *> & node_to_result_node) const
{
/// Functions AND, OR, NOT. Replace with bit*.
const auto * node_to_check = &node;
while (node_to_check->type == ActionsDAG::ActionType::ALIAS)
node_to_check = node_to_check->children[0];
if (node_to_check->column && isColumnConst(*node_to_check->column))
return nullptr;
if (node_to_check->type != ActionsDAG::ActionType::FUNCTION)
return nullptr;
auto function_name = node_to_check->function->getName();
const auto & arguments = node_to_check->children;
size_t arguments_size = arguments.size();
if (function_name == "not")
{
if (arguments_size != 1)
return nullptr;
auto bit_swap_last_two_function = FunctionFactory::instance().get("__bitSwapLastTwo", context);
return &result_dag->addFunction(bit_swap_last_two_function, {arguments[0]}, {});
}
else if (function_name == "and" || function_name == "indexHint" || function_name == "or")
{
if (arguments_size < 2)
return nullptr;
ActionsDAG::NodeRawConstPtrs children;
children.resize(arguments_size);
for (size_t i = 0; i < arguments_size; ++i)
children[i] = &traverseDAG(*arguments[i], result_dag, context, node_to_result_node);
FunctionOverloadResolverPtr function;
if (function_name == "and" || function_name == "indexHint")
function = FunctionFactory::instance().get("__bitBoolMaskAnd", context);
else
function = FunctionFactory::instance().get("__bitBoolMaskOr", context);
const auto * last_argument = children.back();
children.pop_back();
const auto * before_last_argument = children.back();
children.pop_back();
while (true)
{
last_argument = &result_dag->addFunction(function, {before_last_argument, last_argument}, {});
if (children.empty())
break;
before_last_argument = children.back();
children.pop_back();
}
return last_argument;
}
return nullptr;
}
bool MergeTreeIndexConditionSet::checkDAGUseless(const ActionsDAG::Node & node, const ContextPtr & context, bool atomic) const
{
const auto * node_to_check = &node;
while (node_to_check->type == ActionsDAG::ActionType::ALIAS)
node_to_check = node_to_check->children[0];
RPNBuilderTreeContext tree_context(context);
RPNBuilderTreeNode tree_node(node_to_check, tree_context);
if (node.column && isColumnConst(*node.column))
{
Field literal;
node.column->get(0, literal);
return !atomic && literal.safeGet<bool>();
}
else if (node.type == ActionsDAG::ActionType::FUNCTION)
{
auto column_name = tree_node.getColumnName();
if (key_columns.contains(column_name))
return false;
auto function_name = node.function_builder->getName();
const auto & arguments = node.children;
if (function_name == "and" || function_name == "indexHint")
return std::all_of(arguments.begin(), arguments.end(), [&, atomic](const auto & arg) { return checkDAGUseless(*arg, context, atomic); });
else if (function_name == "or")
return std::any_of(arguments.begin(), arguments.end(), [&, atomic](const auto & arg) { return checkDAGUseless(*arg, context, atomic); });
else if (function_name == "not")
return checkDAGUseless(*arguments.at(0), context, atomic);
else
return std::any_of(arguments.begin(), arguments.end(),
[&](const auto & arg) { return checkDAGUseless(*arg, context, true /*atomic*/); });
}
auto column_name = tree_node.getColumnName();
return !key_columns.contains(column_name);
}
void MergeTreeIndexConditionSet::traverseAST(ASTPtr & node) const
{
if (operatorFromAST(node))
@ -465,7 +677,7 @@ bool MergeTreeIndexConditionSet::checkASTUseless(const ASTPtr & node, bool atomi
else if (const auto * literal = node->as<ASTLiteral>())
return !atomic && literal->value.safeGet<bool>();
else if (const auto * identifier = node->as<ASTIdentifier>())
return key_columns.find(identifier->getColumnName()) == std::end(key_columns);
return !key_columns.contains(identifier->getColumnName());
else
return true;
}

View File

@ -84,9 +84,9 @@ class MergeTreeIndexConditionSet final : public IMergeTreeIndexCondition
public:
MergeTreeIndexConditionSet(
const String & index_name_,
const Block & index_sample_block_,
const Block & index_sample_block,
size_t max_rows_,
const SelectQueryInfo & query,
const SelectQueryInfo & query_info,
ContextPtr context);
bool alwaysUnknownOrTrue() const override;
@ -95,20 +95,39 @@ public:
~MergeTreeIndexConditionSet() override = default;
private:
const ActionsDAG::Node & traverseDAG(const ActionsDAG::Node & node,
ActionsDAGPtr & result_dag,
const ContextPtr & context,
std::unordered_map<const ActionsDAG::Node *, const ActionsDAG::Node *> & node_to_result_node) const;
const ActionsDAG::Node * atomFromDAG(const ActionsDAG::Node & node,
ActionsDAGPtr & result_dag,
const ContextPtr & context) const;
const ActionsDAG::Node * operatorFromDAG(const ActionsDAG::Node & node,
ActionsDAGPtr & result_dag,
const ContextPtr & context,
std::unordered_map<const ActionsDAG::Node *, const ActionsDAG::Node *> & node_to_result_node) const;
bool checkDAGUseless(const ActionsDAG::Node & node, const ContextPtr & context, bool atomic = false) const;
void traverseAST(ASTPtr & node) const;
bool atomFromAST(ASTPtr & node) const;
static bool operatorFromAST(ASTPtr & node);
bool checkASTUseless(const ASTPtr & node, bool atomic = false) const;
String index_name;
size_t max_rows;
Block index_sample_block;
bool useless;
std::set<String> key_columns;
ASTPtr expression_ast;
bool isUseless() const
{
return actions == nullptr;
}
std::unordered_set<String> key_columns;
ExpressionActionsPtr actions;
};

View File

@ -138,7 +138,8 @@ void MergeTreeWriteAheadLog::rotate(const std::unique_lock<std::mutex> &)
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
std::unique_lock<std::mutex> & parts_lock)
std::unique_lock<std::mutex> & parts_lock,
bool readonly)
{
std::unique_lock lock(write_mutex);
@ -207,7 +208,10 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(
/// If file is broken, do not write new parts to it.
/// But if it contains any part rotate and save them.
if (max_block_number == -1)
disk->removeFile(path);
{
if (!readonly)
disk->removeFile(path);
}
else if (name == DEFAULT_WAL_FILE_NAME)
rotate(lock);
@ -256,7 +260,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(
[&dropped_parts](const auto & part) { return dropped_parts.count(part->name) == 0; });
/// All parts in WAL had been already committed into the disk -> clear the WAL
if (result.empty())
if (!readonly && result.empty())
{
LOG_DEBUG(log, "WAL file '{}' had been completely processed. Removing.", path);
disk->removeFile(path);

View File

@ -65,7 +65,8 @@ public:
std::vector<MergeTreeMutableDataPartPtr> restore(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
std::unique_lock<std::mutex> & parts_lock);
std::unique_lock<std::mutex> & parts_lock,
bool readonly);
using MinMaxBlockNumber = std::pair<Int64, Int64>;
static std::optional<MinMaxBlockNumber> tryParseMinMaxBlockNumber(const String & filename);

View File

@ -1514,8 +1514,14 @@ bool MutateTask::prepare()
ctx->num_mutations = std::make_unique<CurrentMetrics::Increment>(CurrentMetrics::PartMutation);
auto context_for_reading = Context::createCopy(ctx->context);
/// We must read with one thread because it guarantees that output stream will be sorted.
/// Disable all settings that can enable reading with several streams.
context_for_reading->setSetting("max_streams_to_max_threads_ratio", 1);
context_for_reading->setSetting("max_threads", 1);
context_for_reading->setSetting("allow_asynchronous_read_from_io_pool_for_merge_tree", false);
context_for_reading->setSetting("max_streams_for_merge_tree_reading", Field(0));
/// Allow mutations to work when force_index_by_date or force_primary_key is on.
context_for_reading->setSetting("force_index_by_date", false);
context_for_reading->setSetting("force_primary_key", false);

View File

@ -1105,7 +1105,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
auto metadata_snapshot = getInMemoryMetadataPtr();
MergeMutateSelectedEntryPtr merge_entry, mutate_entry;
auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
auto shared_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
MergeTreeTransactionHolder transaction_for_merge;
MergeTreeTransactionPtr txn;
@ -1122,17 +1122,17 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
if (merger_mutator.merges_blocker.isCancelled())
return false;
merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr, share_lock, lock, txn);
merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr, shared_lock, lock, txn);
if (!merge_entry && !current_mutations_by_version.empty())
mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr, share_lock, lock);
mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr, shared_lock, lock);
has_mutations = !current_mutations_by_version.empty();
}
if (merge_entry)
{
auto task = std::make_shared<MergePlainMergeTreeTask>(*this, metadata_snapshot, false, Names{}, merge_entry, share_lock, common_assignee_trigger);
auto task = std::make_shared<MergePlainMergeTreeTask>(*this, metadata_snapshot, false, Names{}, merge_entry, shared_lock, common_assignee_trigger);
task->setCurrentTransaction(std::move(transaction_for_merge), std::move(txn));
bool scheduled = assignee.scheduleMergeMutateTask(task);
/// The problem that we already booked a slot for TTL merge, but a merge list entry will be created only in a prepare method
@ -1143,7 +1143,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
}
if (mutate_entry)
{
auto task = std::make_shared<MutatePlainMergeTreeTask>(*this, metadata_snapshot, mutate_entry, share_lock, common_assignee_trigger);
auto task = std::make_shared<MutatePlainMergeTreeTask>(*this, metadata_snapshot, mutate_entry, shared_lock, common_assignee_trigger);
assignee.scheduleMergeMutateTask(task);
return true;
}
@ -1160,7 +1160,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
getSettings()->merge_tree_clear_old_temporary_directories_interval_seconds))
{
assignee.scheduleCommonTask(std::make_shared<ExecutableLambdaAdapter>(
[this, share_lock] ()
[this, shared_lock] ()
{
return clearOldTemporaryDirectories(getSettings()->temporary_directories_lifetime.totalSeconds());
}, common_assignee_trigger, getStorageID()), /* need_trigger */ false);
@ -1171,7 +1171,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
getSettings()->merge_tree_clear_old_parts_interval_seconds))
{
assignee.scheduleCommonTask(std::make_shared<ExecutableLambdaAdapter>(
[this, share_lock] ()
[this, shared_lock] ()
{
/// All use relative_data_path which changes during rename
/// so execute under share lock.

View File

@ -456,6 +456,7 @@ class SettingsRandomizer:
"merge_tree_coarse_index_granularity": lambda: random.randint(2, 32),
"optimize_distinct_in_order": lambda: random.randint(0, 1),
"optimize_sorting_by_input_stream_properties": lambda: random.randint(0, 1),
"enable_memory_bound_merging_of_aggregation_results": lambda: random.randint(0, 1),
}
@staticmethod

View File

@ -100,7 +100,7 @@
<max_size>22548578304</max_size>
<do_not_evict_index_and_mark_files>0</do_not_evict_index_and_mark_files>
<enable_bypass_cache_with_threashold>1</enable_bypass_cache_with_threashold>
<bypass_cache_threashold>100</bypass_cache_threashold>
<bypass_cache_threashold>100</bypass_cache_threashold>
</s3_cache_6>
<s3_cache_small>
<type>cache</type>
@ -109,6 +109,15 @@
<max_size>1000</max_size>
<do_not_evict_index_and_mark_files>1</do_not_evict_index_and_mark_files>
</s3_cache_small>
<s3_cache_small_segment_size>
<type>cache</type>
<disk>s3_disk_6</disk>
<path>s3_cache_small_segment_size/</path>
<max_size>22548578304</max_size>
<max_file_segment_size>10Ki</max_file_segment_size>
<do_not_evict_index_and_mark_files>0</do_not_evict_index_and_mark_files>
<cache_on_write_operations>1</cache_on_write_operations>
</s3_cache_small_segment_size>
<!-- local disks -->
<local_disk>
<type>local</type>
@ -234,6 +243,13 @@
</main>
</volumes>
</local_cache_3>
<s3_cache_small_segment_size>
<volumes>
<main>
<disk>s3_cache_small_segment_size</disk>
</main>
</volumes>
</s3_cache_small_segment_size>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,8 @@
<clickhouse>
<profiles>
<default>
<allow_asynchronous_read_from_io_pool_for_merge_tree>1</allow_asynchronous_read_from_io_pool_for_merge_tree>
<max_streams_for_merge_tree_reading>64</max_streams_for_merge_tree_reading>
</default>
</profiles>
</clickhouse>

View File

@ -19,6 +19,14 @@ node2 = cluster.add_instance(
stay_alive=True,
)
node3 = cluster.add_instance(
"node3",
main_configs=["configs/logs_config.xml", "configs/cluster.xml"],
user_configs=["configs/max_threads.xml"],
with_zookeeper=True,
stay_alive=True,
)
@pytest.fixture(scope="module")
def started_cluster():
@ -180,3 +188,20 @@ def test_trivial_alter_in_partition_replicated_merge_tree(started_cluster):
finally:
node1.query("DROP TABLE IF EXISTS {}".format(name))
node2.query("DROP TABLE IF EXISTS {}".format(name))
def test_mutation_max_streams(started_cluster):
try:
node3.query("DROP TABLE IF EXISTS t_mutations")
node3.query("CREATE TABLE t_mutations (a UInt32) ENGINE = MergeTree ORDER BY a")
node3.query("INSERT INTO t_mutations SELECT number FROM numbers(10000000)")
node3.query(
"ALTER TABLE t_mutations DELETE WHERE a = 300000",
settings={"mutations_sync": "2"},
)
assert node3.query("SELECT count() FROM t_mutations") == "9999999\n"
finally:
node3.query("DROP TABLE IF EXISTS t_mutations")

View File

@ -33,6 +33,9 @@ instance = cluster.add_instance(
],
user_configs=["configs/default_passwd.xml"],
with_zookeeper=True,
# Bug in TSAN reproduces in this test https://github.com/grpc/grpc/issues/29550#issuecomment-1188085387
# second_deadlock_stack -- just ordinary option we use everywhere, don't want to overwrite it
env_variables={"TSAN_OPTIONS": "report_atomic_races=0 second_deadlock_stack=1"},
)

View File

@ -34,3 +34,11 @@ no formatting pattern no formatting pattern
-1100
+0300
+0530
1234560
000340
2022-12-08 18:11:29.123400000
2022-12-08 18:11:29.1
2022-12-08 18:11:29.0
2022-12-08 18:11:29.0
2022-12-08 00:00:00.0
2022-12-08 00:00:00.0

View File

@ -54,3 +54,13 @@ SELECT formatDateTime(toDateTime('2020-01-01 01:00:00', 'UTC'), '%z');
SELECT formatDateTime(toDateTime('2020-01-01 01:00:00', 'US/Samoa'), '%z');
SELECT formatDateTime(toDateTime('2020-01-01 01:00:00', 'Europe/Moscow'), '%z');
SELECT formatDateTime(toDateTime('1970-01-01 00:00:00', 'Asia/Kolkata'), '%z');
select formatDateTime(toDateTime64('2010-01-04 12:34:56.123456', 7), '%f');
select formatDateTime(toDateTime64('2022-12-08 18:11:29.00034', 6, 'UTC'), '%f');
select formatDateTime(toDateTime64('2022-12-08 18:11:29.1234', 9, 'UTC'), '%F %T.%f');
select formatDateTime(toDateTime64('2022-12-08 18:11:29.1234', 1, 'UTC'), '%F %T.%f');
select formatDateTime(toDateTime64('2022-12-08 18:11:29.1234', 0, 'UTC'), '%F %T.%f');
select formatDateTime(toDateTime('2022-12-08 18:11:29', 'UTC'), '%F %T.%f');
select formatDateTime(toDate32('2022-12-08 18:11:29', 'UTC'), '%F %T.%f');
select formatDateTime(toDate('2022-12-08 18:11:29', 'UTC'), '%F %T.%f');

View File

@ -1,4 +1,5 @@
#!/usr/bin/env bash
# Tags: no-random-settings, no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
@ -18,7 +19,7 @@ INSERT INTO mt VALUES ('test1', 'test2');
EOF
while true; do
$CLICKHOUSE_CLIENT --query="SELECT count(*) FROM dst" | grep -q "1" && break || sleep .5 ||:
$CLICKHOUSE_CLIENT --query="SELECT count(*) FROM dst" | grep -q "1" && break || sleep .1 ||:
done
$CLICKHOUSE_CLIENT --query="SELECT colA, colB FROM dst"

View File

@ -6,8 +6,9 @@ SET memory_profiler_step = 1000000;
SET memory_profiler_sample_probability = 1;
SET log_queries = 1;
SELECT ignore(groupArray(number), 'test memory profiler') FROM numbers(10000000);
SELECT ignore(groupArray(number), 'test memory profiler') FROM numbers(10000000) SETTINGS log_comment = '01092_memory_profiler';
SYSTEM FLUSH LOGS;
WITH addressToSymbol(arrayJoin(trace)) AS symbol SELECT count() > 0 FROM system.trace_log t WHERE event_date >= yesterday() AND trace_type = 'Memory' AND query_id = (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND event_date >= yesterday() AND query LIKE '%test memory profiler%' ORDER BY event_time DESC LIMIT 1);
WITH addressToSymbol(arrayJoin(trace)) AS symbol SELECT count() > 0 FROM system.trace_log t WHERE event_date >= yesterday() AND trace_type = 'MemoryPeak' AND query_id = (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND event_date >= yesterday() AND query LIKE '%test memory profiler%' ORDER BY event_time DESC LIMIT 1);
WITH addressToSymbol(arrayJoin(trace)) AS symbol SELECT count() > 0 FROM system.trace_log t WHERE event_date >= yesterday() AND trace_type = 'MemorySample' AND query_id = (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND event_date >= yesterday() AND query LIKE '%test memory profiler%' ORDER BY event_time DESC LIMIT 1);
WITH addressToSymbol(arrayJoin(trace)) AS symbol SELECT count() > 0 FROM system.trace_log t WHERE event_date >= yesterday() AND trace_type = 'Memory' AND query_id = (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND event_date >= yesterday() AND query LIKE '%test memory profiler%' AND has(used_table_functions, 'numbers') AND log_comment = '01092_memory_profiler' ORDER BY event_time DESC LIMIT 1);
WITH addressToSymbol(arrayJoin(trace)) AS symbol SELECT count() > 0 FROM system.trace_log t WHERE event_date >= yesterday() AND trace_type = 'MemoryPeak' AND query_id = (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND event_date >= yesterday() AND query LIKE '%test memory profiler%' AND has(used_table_functions, 'numbers') AND log_comment = '01092_memory_profiler' ORDER BY event_time DESC LIMIT 1);
WITH addressToSymbol(arrayJoin(trace)) AS symbol SELECT count() > 0 FROM system.trace_log t WHERE event_date >= yesterday() AND trace_type = 'MemorySample' AND query_id = (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND event_date >= yesterday() AND query LIKE '%test memory profiler%' AND has(used_table_functions, 'numbers') AND log_comment = '01092_memory_profiler' ORDER BY event_time DESC LIMIT 1);

View File

@ -1,4 +1,4 @@
-- Tags: no-replicated-database
SET max_memory_usage = '100M';
SET max_memory_usage = '75M';
SELECT cityHash64(rand() % 1000) as n, groupBitmapState(number) FROM numbers_mt(2000000000) GROUP BY n FORMAT Null; -- { serverError 241 }

View File

@ -60,3 +60,19 @@ test add[...]seconds()
2220-12-12 12:12:12.124
2220-12-12 12:12:12.121
2220-12-12 12:12:12.124456
test subtract[...]seconds()
- test nanoseconds
2022-12-31 23:59:59.999999999
2022-12-31 23:59:59.999999900
2023-01-01 00:00:00.000000001
2023-01-01 00:00:00.000000100
- test microseconds
2022-12-31 23:59:59.999999
2022-12-31 23:59:59.999900
2023-01-01 00:00:00.000001
2023-01-01 00:00:00.000100
- test milliseconds
2022-12-31 23:59:59.999
2022-12-31 23:59:59.900
2023-01-01 00:00:00.001
2023-01-01 00:00:00.100

View File

@ -92,3 +92,22 @@ select addMilliseconds(toDateTime64('1930-12-12 12:12:12.123456', 6), 1); -- Bel
select addMilliseconds(toDateTime64('2220-12-12 12:12:12.123', 3), 1); -- Above normal range, source scale matches result
select addMilliseconds(toDateTime64('2220-12-12 12:12:12.12', 2), 1); -- Above normal range, source scale less than result
select addMilliseconds(toDateTime64('2220-12-12 12:12:12.123456', 6), 1); -- Above normal range, source scale greater than result
select 'test subtract[...]seconds()';
select '- test nanoseconds';
select subtractNanoseconds(toDateTime64('2023-01-01 00:00:00.0000000', 7, 'UTC'), 1);
select subtractNanoseconds(toDateTime64('2023-01-01 00:00:00.0000000', 7, 'UTC'), 100);
select subtractNanoseconds(toDateTime64('2023-01-01 00:00:00.0000000', 7, 'UTC'), -1);
select subtractNanoseconds(toDateTime64('2023-01-01 00:00:00.0000000', 7, 'UTC'), -100);
select '- test microseconds';
select subtractMicroseconds(toDateTime64('2023-01-01 00:00:00.0000', 4, 'UTC'), 1);
select subtractMicroseconds(toDateTime64('2023-01-01 00:00:00.0000', 4, 'UTC'), 100);
select subtractMicroseconds(toDateTime64('2023-01-01 00:00:00.0000', 4, 'UTC'), -1);
select subtractMicroseconds(toDateTime64('2023-01-01 00:00:00.0000', 4, 'UTC'), -100);
select '- test milliseconds';
select subtractMilliseconds(toDateTime64('2023-01-01 00:00:00.0', 1, 'UTC'), 1);
select subtractMilliseconds(toDateTime64('2023-01-01 00:00:00.0', 1, 'UTC'), 100);
select subtractMilliseconds(toDateTime64('2023-01-01 00:00:00.0', 1, 'UTC'), -1);
select subtractMilliseconds(toDateTime64('2023-01-01 00:00:00.0', 1, 'UTC'), -100);

View File

@ -1,3 +1,6 @@
-- produces different pipeline if enabled
set enable_memory_bound_merging_of_aggregation_results = 0;
set max_threads = 16;
set prefer_localhost_replica = 1;
set optimize_aggregation_in_order = 0;

View File

@ -5,6 +5,7 @@ s3_cache_3
s3_cache_multi
s3_cache_4
s3_cache_5
s3_cache_small_segment_size
local_cache
s3_cache_6
s3_cache_small

View File

@ -0,0 +1,33 @@
Hello 1 (1,'1')
Hello 2 (2,'2')
World 3 (3,'3')
World 4 (4,'4')
World 5 (5,'5')
Hello 1 (1,'1')
Hello 2 (2,'2')
World 3 (3,'3')
World 4 (4,'4')
World 5 (5,'5')
Goodbye 0 (0,'')
Hello (1,'1')
Hello (2,'2')
World (3,'3')
World (4,'4')
World (5,'5')
Hello (1,'1')
Hello (2,'2')
World (3,'3')
World (4,'4')
World (5,'5')
Goodbye (0,'')
Hello (1,'1') (1,'1')
Hello (2,'2') (0,'')
World (3,'3') (3,'3')
World (4,'4') (4,'4')
World (5,'5') (0,'')
Hello (1,'1') (1,'1')
Hello (2,'2') (0,'')
World (3,'3') (3,'3')
World (4,'4') (4,'4')
World (5,'5') (0,'')
Goodbye (0,'') (0,'')

View File

@ -0,0 +1,25 @@
DROP TABLE IF EXISTS arrays_test;
CREATE TABLE arrays_test
(
s String,
arr1 Array(UInt8),
map1 Map(UInt8, String),
map2 Map(UInt8, String)
) ENGINE = Memory;
INSERT INTO arrays_test
VALUES ('Hello', [1,2], map(1, '1', 2, '2'), map(1, '1')), ('World', [3,4,5], map(3, '3', 4, '4', 5, '5'), map(3, '3', 4, '4')), ('Goodbye', [], map(), map());
select s, arr1, map1 from arrays_test array join arr1, map1 settings enable_unaligned_array_join = 1;
select s, arr1, map1 from arrays_test left array join arr1, map1 settings enable_unaligned_array_join = 1;
select s, map1 from arrays_test array join map1;
select s, map1 from arrays_test left array join map1;
select s, map1, map2 from arrays_test array join map1, map2 settings enable_unaligned_array_join = 1;
select s, map1, map2 from arrays_test left array join map1, map2 settings enable_unaligned_array_join = 1;

View File

@ -0,0 +1,4 @@
[(1,3),(2,4)]
[1,2] [3,4]
[1,2] [3,4]
[1,2]

View File

@ -0,0 +1,25 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
USER_FILES_PATH=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
touch $USER_FILES_PATH/data.capnp
SCHEMADIR=$(clickhouse-client --query "select * from file('data.capnp', 'CapnProto', 'val1 char') settings format_schema='nonexist:Message'" 2>&1 | grep Exception | grep -oP "file \K.*(?=/nonexist.capnp)")
CLIENT_SCHEMADIR=$CURDIR/format_schemas
SERVER_SCHEMADIR=test_02482
mkdir -p $SCHEMADIR/$SERVER_SCHEMADIR
cp -r $CLIENT_SCHEMADIR/02482_* $SCHEMADIR/$SERVER_SCHEMADIR/
$CLICKHOUSE_CLIENT -q "insert into function file(02482_data.capnp, auto, 'nested Nested(x Int64, y Int64)') select [1,2], [3,4] settings format_schema='$SERVER_SCHEMADIR/02482_list_of_structs.capnp:Nested', engine_file_truncate_on_insert=1"
$CLICKHOUSE_CLIENT -q "select * from file(02482_data.capnp) settings format_schema='$SERVER_SCHEMADIR/02482_list_of_structs.capnp:Nested'"
$CLICKHOUSE_CLIENT -q "select * from file(02482_data.capnp, auto, 'nested Nested(x Int64, y Int64)') settings format_schema='$SERVER_SCHEMADIR/02482_list_of_structs.capnp:Nested'"
$CLICKHOUSE_CLIENT -q "select * from file(02482_data.capnp, auto, '\`nested.x\` Array(Int64), \`nested.y\` Array(Int64)') settings format_schema='$SERVER_SCHEMADIR/02482_list_of_structs.capnp:Nested'"
$CLICKHOUSE_CLIENT -q "select * from file(02482_data.capnp, auto, '\`nested.x\` Array(Int64)') settings format_schema='$SERVER_SCHEMADIR/02482_list_of_structs.capnp:Nested'"
rm $USER_FILES_PATH/data.capnp
rm $USER_FILES_PATH/02482_data.capnp

View File

@ -0,0 +1,2 @@
4242424242 42420
4242.424242 42.42

View File

@ -0,0 +1,24 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
USER_FILES_PATH=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
touch $USER_FILES_PATH/data.capnp
SCHEMADIR=$(clickhouse-client --query "select * from file('data.capnp', 'CapnProto', 'val1 char') settings format_schema='nonexist:Message'" 2>&1 | grep Exception | grep -oP "file \K.*(?=/nonexist.capnp)")
CLIENT_SCHEMADIR=$CURDIR/format_schemas
SERVER_SCHEMADIR=test_02483
mkdir -p $SCHEMADIR/$SERVER_SCHEMADIR
cp -r $CLIENT_SCHEMADIR/02483_* $SCHEMADIR/$SERVER_SCHEMADIR/
$CLICKHOUSE_CLIENT -q "insert into function file(02483_data.capnp, auto, 'decimal32 Decimal32(3), decimal64 Decimal64(6)') select 42.42, 4242.424242 settings format_schema='$SERVER_SCHEMADIR/02483_decimals.capnp:Message', engine_file_truncate_on_insert=1"
$CLICKHOUSE_CLIENT -q "select * from file(02483_data.capnp) settings format_schema='$SERVER_SCHEMADIR/02483_decimals.capnp:Message'"
$CLICKHOUSE_CLIENT -q "select * from file(02483_data.capnp, auto, 'decimal64 Decimal64(6), decimal32 Decimal32(3)') settings format_schema='$SERVER_SCHEMADIR/02483_decimals.capnp:Message'"
rm $USER_FILES_PATH/data.capnp
rm $USER_FILES_PATH/02483_data.capnp

View File

@ -0,0 +1,24 @@
1
0
10
1
1
0
10
1
1
0
10
1
1
0
10
1
1
1
1
1

View File

@ -0,0 +1,29 @@
select b'';
select b'0' == '\0';
select b'00110000'; -- 0
select b'0011000100110000'; -- 10
select b'111001101011010110001011111010001010111110010101' == '测试';
select B'';
select B'0' == '\0';
select B'00110000'; -- 0
select B'0011000100110000'; -- 10
select B'111001101011010110001011111010001010111110010101' == '测试';
select x'';
select x'0' == '\0';
select x'30'; -- 0
select x'3130'; -- 10
select x'e6b58be8af95' == '测试';
select X'';
select X'0' == '\0';
select X'30'; -- 0
select X'3130'; -- 10
select X'e6b58be8af95' == '测试';
select x'' == b'';
select x'0' == b'0';
select X'' == X'';
select X'0' == X'0';

View File

@ -0,0 +1,6 @@
Syntax error
Syntax error
Syntax error
Syntax error
Syntax error
Syntax error

View File

@ -0,0 +1,15 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --query="SELECT b '0';" 2>&1 | grep -o 'Syntax error'
$CLICKHOUSE_CLIENT --query="SELECT x 'a'" 2>&1 | grep -o 'Syntax error'
$CLICKHOUSE_CLIENT --query="SELECT b'3';" 2>&1 | grep -o 'Syntax error'
$CLICKHOUSE_CLIENT --query="SELECT x'k'" 2>&1 | grep -o 'Syntax error'
$CLICKHOUSE_CLIENT --query="SELECT b'1" 2>&1 | grep -o 'Syntax error'
$CLICKHOUSE_CLIENT --query="SELECT x'a" 2>&1 | grep -o 'Syntax error'

View File

@ -0,0 +1,2 @@
1
1

View File

@ -0,0 +1,18 @@
SET allow_experimental_analyzer = 1;
DROP TABLE IF EXISTS test_table;
CREATE TABLE test_table
(
id UInt64,
value String,
INDEX value_idx (value) TYPE set(1000) GRANULARITY 1
) ENGINE=MergeTree ORDER BY id;
INSERT INTO test_table SELECT number, toString(number) FROM numbers(10);
SELECT count() FROM test_table WHERE value = '1' SETTINGS force_data_skipping_indices = 'value_idx';
SELECT count() FROM test_table AS t1 INNER JOIN (SELECT number AS id FROM numbers(10)) AS t2 ON t1.id = t2.id
WHERE t1.value = '1' SETTINGS force_data_skipping_indices = 'value_idx';
DROP TABLE test_table;

View File

@ -0,0 +1,6 @@
_id Nullable(FixedString(12))
name Nullable(String)
email Nullable(String)
movie_id Nullable(FixedString(12))
text Nullable(String)
date Nullable(DateTime64(6, \'UTC\'))

View File

@ -0,0 +1,10 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_LOCAL -q "desc file('$CURDIR/data_bson/comments.bson')"
$CLICKHOUSE_LOCAL -q "select _id from file('$CURDIR/data_bson/comments.bson') format Null"

View File

@ -0,0 +1,3 @@
0
83
100000

View File

@ -0,0 +1,37 @@
#!/usr/bin/env bash
# Tags: no-parallel, no-fasttest, no-s3-storage, no-random-settings
CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=none
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function random {
cat /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z' | fold -w ${1:-8} | head -n 1
}
${CLICKHOUSE_CLIENT} --multiline --multiquery -q "
drop table if exists ttt;
create table ttt (id Int32, value String) engine=MergeTree() order by tuple() settings storage_policy='s3_cache_small_segment_size', min_bytes_for_wide_part=0;
insert into ttt select number, toString(number) from numbers(100000) settings throw_on_error_from_cache_on_write_operations = 1;
"
query_id=$(random 8)
${CLICKHOUSE_CLIENT} --query_id "$query_id" -q "
select * from ttt format Null settings enable_filesystem_cache_log=1;
"
${CLICKHOUSE_CLIENT} --query_id "$query_id" -q " system flush logs"
${CLICKHOUSE_CLIENT} -q "
select count() from system.filesystem_cache_log where query_id = '$query_id' AND read_type != 'READ_FROM_CACHE';
"
${CLICKHOUSE_CLIENT} -q "
select count() from system.filesystem_cache_log where query_id = '$query_id' AND read_type == 'READ_FROM_CACHE';
"
${CLICKHOUSE_CLIENT} --multiline --multiquery -q "
select count() from ttt;
drop table ttt no delay;
"

View File

@ -0,0 +1 @@
1

View File

@ -0,0 +1,8 @@
DROP TABLE IF EXISTS products;
SET allow_experimental_analyzer = 1;
CREATE TABLE products (`price` UInt32) ENGINE = Memory;
INSERT INTO products VALUES (1);
SELECT rank() OVER (ORDER BY price) AS rank FROM products ORDER BY rank;

View File

@ -0,0 +1,27 @@
Date
2
2
2
2
DateTime
3
3
3
3
3
Date String
2
2
2
DateTime String
3
3
3
Date LC
2
2
2
DateTime LC
3
3
3

View File

@ -0,0 +1,65 @@
CREATE TABLE datetime_date_table (
col_date Date,
col_datetime DateTime,
col_datetime64 DateTime64(3),
col_date_string String,
col_datetime_string String,
col_datetime64_string DateTime64,
col_date_lc LowCardinality(String),
col_datetime_lc LowCardinality(String),
col_datetime64_lc LowCardinality(String),
PRIMARY KEY col_date
) ENGINE = MergeTree;
INSERT INTO datetime_date_table VALUES ('2020-03-04', '2020-03-04 10:23:45', '2020-03-04 10:23:45.123', '2020-03-04', '2020-03-04 10:23:45', '2020-03-04 10:23:45.123', '2020-03-04', '2020-03-04 10:23:45', '2020-03-04 10:23:45.123');
INSERT INTO datetime_date_table VALUES ('2020-03-05', '2020-03-05 12:23:45', '2020-03-05 12:23:45.123', '2020-03-05', '2020-03-05 12:23:45', '2020-03-05 12:23:45.123', '2020-03-05', '2020-03-05 12:23:45', '2020-03-05 12:23:45.123');
INSERT INTO datetime_date_table VALUES ('2020-04-05', '2020-04-05 00:10:45', '2020-04-05 00:10:45.123', '2020-04-05', '2020-04-05 00:10:45', '2020-04-05 00:10:45.123', '2020-04-05', '2020-04-05 00:10:45', '2020-04-05 00:10:45.123');
SELECT 'Date';
SELECT count() FROM datetime_date_table WHERE col_date > '2020-03-04';
SELECT count() FROM datetime_date_table WHERE col_date > '2020-03-04'::Date;
SELECT count() FROM datetime_date_table WHERE col_date > '2020-03-04 10:20:45'; -- { serverError TYPE_MISMATCH }
SELECT count() FROM datetime_date_table WHERE col_date > '2020-03-04 10:20:45'::DateTime;
SELECT count() FROM datetime_date_table WHERE col_date > '2020-03-04 10:20:45.100'; -- { serverError TYPE_MISMATCH }
SELECT count() FROM datetime_date_table WHERE col_date > '2020-03-04 10:20:45.100'::DateTime64(3);
SELECT 'DateTime';
SELECT count() FROM datetime_date_table WHERE col_datetime > '2020-03-04';
SELECT count() FROM datetime_date_table WHERE col_datetime > '2020-03-04'::Date;
SELECT count() FROM datetime_date_table WHERE col_datetime > '2020-03-04 10:20:45';
SELECT count() FROM datetime_date_table WHERE col_datetime > '2020-03-04 10:20:45'::DateTime;
SELECT count() FROM datetime_date_table WHERE col_datetime > '2020-03-04 10:20:45.100'; -- { serverError TYPE_MISMATCH }
SELECT count() FROM datetime_date_table WHERE col_datetime > '2020-03-04 10:20:45.100'::DateTime64(3);
SELECT 'Date String';
SELECT count() FROM datetime_date_table WHERE col_date_string > '2020-03-04';
SELECT count() FROM datetime_date_table WHERE col_date_string > '2020-03-04'::Date; -- { serverError NO_COMMON_TYPE }
SELECT count() FROM datetime_date_table WHERE col_date_string > '2020-03-04 10:20:45';
SELECT count() FROM datetime_date_table WHERE col_date_string > '2020-03-04 10:20:45'::DateTime; -- { serverError NO_COMMON_TYPE }
SELECT count() FROM datetime_date_table WHERE col_date_string > '2020-03-04 10:20:45.100';
SELECT count() FROM datetime_date_table WHERE col_date_string > '2020-03-04 10:20:45.100'::DateTime64(3); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
SELECT 'DateTime String';
SELECT count() FROM datetime_date_table WHERE col_datetime_string > '2020-03-04';
SELECT count() FROM datetime_date_table WHERE col_datetime_string > '2020-03-04'::Date; -- { serverError NO_COMMON_TYPE }
SELECT count() FROM datetime_date_table WHERE col_datetime_string > '2020-03-04 10:20:45';
SELECT count() FROM datetime_date_table WHERE col_datetime_string > '2020-03-04 10:20:45'::DateTime; -- { serverError NO_COMMON_TYPE }
SELECT count() FROM datetime_date_table WHERE col_datetime_string > '2020-03-04 10:20:45.100';
SELECT count() FROM datetime_date_table WHERE col_datetime_string > '2020-03-04 10:20:45.100'::DateTime64(3); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
SELECT 'Date LC';
SELECT count() FROM datetime_date_table WHERE col_date_lc > '2020-03-04';
SELECT count() FROM datetime_date_table WHERE col_date_lc > '2020-03-04'::Date; -- { serverError NO_COMMON_TYPE }
SELECT count() FROM datetime_date_table WHERE col_date_lc > '2020-03-04 10:20:45';
SELECT count() FROM datetime_date_table WHERE col_date_lc > '2020-03-04 10:20:45'::DateTime; -- { serverError NO_COMMON_TYPE }
SELECT count() FROM datetime_date_table WHERE col_date_lc > '2020-03-04 10:20:45.100';
SELECT count() FROM datetime_date_table WHERE col_date_lc > '2020-03-04 10:20:45.100'::DateTime64(3); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
SELECT 'DateTime LC';
SELECT count() FROM datetime_date_table WHERE col_datetime_lc > '2020-03-04';
SELECT count() FROM datetime_date_table WHERE col_datetime_lc > '2020-03-04'::Date; -- { serverError NO_COMMON_TYPE }
SELECT count() FROM datetime_date_table WHERE col_datetime_lc > '2020-03-04 10:20:45';
SELECT count() FROM datetime_date_table WHERE col_datetime_lc > '2020-03-04 10:20:45'::DateTime; -- { serverError NO_COMMON_TYPE }
SELECT count() FROM datetime_date_table WHERE col_datetime_lc > '2020-03-04 10:20:45.100';
SELECT count() FROM datetime_date_table WHERE col_datetime_lc > '2020-03-04 10:20:45.100'::DateTime64(3); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }

Binary file not shown.

View File

@ -0,0 +1,11 @@
@0xb6ecde1cd54a101d;
struct Nested {
nested @0 :List(MyField);
}
struct MyField {
x @0 :Int64;
y @1 :Int64;
}

View File

@ -0,0 +1,7 @@
@0xb6acde1cd54a101d;
struct Message {
decimal64 @0 :Int64;
decimal32 @1 :Int32;
}

View File

@ -1,4 +1,4 @@
{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge', 'grace_hash'] -%}
{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge'] -%}
--- {{ join_algorithm }} ---
2014-03-17 1406958 265108
2014-03-19 1405797 261624

View File

@ -1,4 +1,4 @@
{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge', 'grace_hash'] -%}
{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge'] -%}
SET max_bytes_in_join = '{% if join_algorithm == 'grace_hash' %}20K{% else %}0{% endif %}';