Merge remote-tracking branch 'origin/master' into pr-3-way-joins

Igor Nikonov 2024-12-10 22:03:23 +00:00
commit 020e7822a4
139 changed files with 2534 additions and 1320 deletions

View File

@ -101,3 +101,4 @@ wadllib==1.3.6
websocket-client==1.8.0
wheel==0.38.1
zipp==1.0.0
jinja2==3.1.3

View File

@ -36,6 +36,8 @@ Upper and lower bounds can be specified to limit Memory engine table size, effec
- Requires `max_rows_to_keep`
- `max_rows_to_keep` — Maximum rows to keep within the memory table, where the oldest rows are deleted on each insertion (i.e. a circular buffer). The row count can exceed the stated limit if the oldest batch of rows to remove falls under the `min_rows_to_keep` limit when adding a large block.
- Default value: `0`
- `compress` — Whether to compress data in memory.
- Default value: `false`
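For illustration, a minimal sketch combining these settings (the table and column names are placeholders, not part of this commit):

```sql
-- Keep roughly the most recent 1,000–10,000 rows and store the in-memory blocks compressed.
CREATE TABLE recent_events
(
    event_time DateTime,
    message String
)
ENGINE = Memory
SETTINGS min_rows_to_keep = 1000, max_rows_to_keep = 10000, compress = true;
```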
## Usage {#usage}

View File

@ -157,13 +157,14 @@ For your convenience, the old documentation is located [here](https://pastila.nl
## Refreshable Materialized View {#refreshable-materialized-view}
```sql
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
REFRESH EVERY|AFTER interval [OFFSET interval]
RANDOMIZE FOR interval
DEPENDS ON [db.]name [, [db.]name [, ...]]
SETTINGS name = value [, name = value [, ...]]
[RANDOMIZE FOR interval]
[DEPENDS ON [db.]name [, [db.]name [, ...]]]
[SETTINGS name = value [, name = value [, ...]]]
[APPEND]
[TO[db.]name] [(columns)] [ENGINE = engine] [EMPTY]
[TO[db.]name] [(columns)] [ENGINE = engine]
[EMPTY]
AS SELECT ...
[COMMENT 'comment']
```
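A hedged usage sketch of the syntax above (database, table names, and the query body are placeholders, not part of this commit):

```sql
CREATE MATERIALIZED VIEW IF NOT EXISTS db.daily_totals
REFRESH EVERY 1 DAY OFFSET 2 HOUR
TO db.daily_totals_data
AS SELECT toDate(event_time) AS day, count() AS hits
FROM db.events
GROUP BY day;
```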
@ -281,7 +282,7 @@ This replaces *all* refresh parameters at once: schedule, dependencies, settings
The status of all refreshable materialized views is available in table [`system.view_refreshes`](../../../operations/system-tables/view_refreshes.md). In particular, it contains the refresh progress (if running), the last and next refresh times, and the exception message if a refresh failed.
To manually stop, start, trigger, or cancel refreshes use [`SYSTEM STOP|START|REFRESH|CANCEL VIEW`](../system.md#refreshable-materialized-views).
To manually stop, start, trigger, or cancel refreshes use [`SYSTEM STOP|START|REFRESH|WAIT|CANCEL VIEW`](../system.md#refreshable-materialized-views).
To wait for a refresh to complete, use [`SYSTEM WAIT VIEW`](../system.md#refreshable-materialized-views). This is particularly useful for waiting for the initial refresh after creating a view.
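For example, a sketch of the typical sequence (the view name `db.daily_totals` is a placeholder):

```sql
-- Trigger a refresh out of schedule and block until it finishes.
SYSTEM REFRESH VIEW db.daily_totals;
SYSTEM WAIT VIEW db.daily_totals;

-- Pause and resume the periodic refreshes.
SYSTEM STOP VIEW db.daily_totals;
SYSTEM START VIEW db.daily_totals;
```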

View File

@ -39,7 +39,7 @@ The GCS Table Function integrates with Google Cloud Storage by using the GCS XML
- `hmac_key` and `hmac_secret` — Keys that specify credentials to use with the given endpoint. Optional.
- `format` — The [format](../../interfaces/formats.md#formats) of the file.
- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.
- `compression_method` — Parameter is optional. Supported values: `none`, `gzip/gz`, `brotli/br`, `xz/LZMA`, `zstd/zst`. By default, it will autodetect compression method by file extension.
- `compression_method` — Parameter is optional. Supported values: `none`, `gzip` or `gz`, `brotli` or `br`, `xz` or `LZMA`, `zstd` or `zst`. By default, it will autodetect compression method by file extension.
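A hypothetical call showing where `compression_method` fits among these arguments (the bucket, credentials, and schema are placeholders):

```sql
SELECT count()
FROM gcs(
    'https://storage.googleapis.com/my-bucket/data.csv.gz',
    'my_hmac_key', 'my_hmac_secret',
    'CSV',
    'id UInt64, name String',
    'gzip');
```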
Arguments can also be passed using [named collections](/docs/en/operations/named-collections.md). In this case `url`, `format`, `structure`, `compression_method` work in the same way, and some extra parameters are supported:

View File

@ -43,7 +43,7 @@ For GCS, substitute your HMAC key and HMAC secret where you see `access_key_id`
- `session_token` - Session token to use with the given keys. Optional when passing keys.
- `format` — The [format](../../interfaces/formats.md#formats) of the file.
- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.
- `compression_method` — Parameter is optional. Supported values: `none`, `gzip/gz`, `brotli/br`, `xz/LZMA`, `zstd/zst`. By default, it will autodetect compression method by file extension.
- `compression_method` — Parameter is optional. Supported values: `none`, `gzip` or `gz`, `brotli` or `br`, `xz` or `LZMA`, `zstd` or `zst`. By default, it will autodetect compression method by file extension.
- `headers` - Parameter is optional. Allows headers to be passed in the S3 request. Pass in the format `headers(key=value)` e.g. `headers('x-amz-request-payer' = 'requester')`.
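A similar sketch for `s3`, additionally passing `headers(...)`; the URL and credentials are placeholders, and appending `headers(...)` after the positional arguments is an assumption for illustration:

```sql
SELECT count()
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/*.tsv.zst',
    'my_access_key_id', 'my_secret_access_key',
    'TSV',
    'id UInt64, value String',
    'zstd',
    headers('x-amz-request-payer' = 'requester'));
```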
Arguments can also be passed using [named collections](/docs/en/operations/named-collections.md). In this case `url`, `access_key_id`, `secret_access_key`, `format`, `structure`, `compression_method` work in the same way, and some extra parameters are supported:

View File

@ -24,7 +24,7 @@ s3Cluster(cluster_name, named_collection[, option=value [,..]])
- `session_token` - Session token to use with the given keys. Optional when passing keys.
- `format` — The [format](../../interfaces/formats.md#formats) of the file.
- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.
- `compression_method` — Parameter is optional. Supported values: `none`, `gzip/gz`, `brotli/br`, `xz/LZMA`, `zstd/zst`. By default, it will autodetect compression method by file extension.
- `compression_method` — Parameter is optional. Supported values: `none`, `gzip` or `gz`, `brotli` or `br`, `xz` or `LZMA`, `zstd` or `zst`. By default, it will autodetect compression method by file extension.
Arguments can also be passed using [named collections](/docs/en/operations/named-collections.md). In this case `url`, `access_key_id`, `secret_access_key`, `format`, `structure`, `compression_method` work in the same way, and some extra parameters are supported:

View File

@ -182,6 +182,7 @@ BackupCoordinationOnCluster::BackupCoordinationOnCluster(
, current_host(current_host_)
, current_host_index(findCurrentHostIndex(current_host, all_hosts))
, plain_backup(is_plain_backup_)
, process_list_element(process_list_element_)
, log(getLogger("BackupCoordinationOnCluster"))
, with_retries(log, get_zookeeper_, keeper_settings, process_list_element_, [root_zookeeper_path_](Coordination::ZooKeeperWithFaultInjection::Ptr zk) { zk->sync(root_zookeeper_path_); })
, cleaner(/* is_restore = */ false, zookeeper_path, with_retries, log)
@ -273,7 +274,8 @@ ZooKeeperRetriesInfo BackupCoordinationOnCluster::getOnClusterInitializationKeep
{
return ZooKeeperRetriesInfo{keeper_settings.max_retries_while_initializing,
static_cast<UInt64>(keeper_settings.retry_initial_backoff_ms.count()),
static_cast<UInt64>(keeper_settings.retry_max_backoff_ms.count())};
static_cast<UInt64>(keeper_settings.retry_max_backoff_ms.count()),
process_list_element};
}
void BackupCoordinationOnCluster::serializeToMultipleZooKeeperNodes(const String & path, const String & value, const String & logging_name)

View File

@ -107,7 +107,8 @@ private:
const String current_host;
const size_t current_host_index;
const bool plain_backup;
LoggerPtr const log;
const QueryStatusPtr process_list_element;
const LoggerPtr log;
/// The order is important: `stage_sync` must be initialized after `with_retries` and `cleaner`.
const WithRetries with_retries;

View File

@ -112,10 +112,11 @@ BackupEntriesCollector::BackupEntriesCollector(
context->getConfigRef().getUInt64("backups.max_sleep_before_next_attempt_to_collect_metadata", 5000))
, compare_collected_metadata(context->getConfigRef().getBool("backups.compare_collected_metadata", true))
, log(getLogger("BackupEntriesCollector"))
, global_zookeeper_retries_info(
, zookeeper_retries_info(
context->getSettingsRef()[Setting::backup_restore_keeper_max_retries],
context->getSettingsRef()[Setting::backup_restore_keeper_retry_initial_backoff_ms],
context->getSettingsRef()[Setting::backup_restore_keeper_retry_max_backoff_ms])
context->getSettingsRef()[Setting::backup_restore_keeper_retry_max_backoff_ms],
context->getProcessListElementSafe())
, threadpool(threadpool_)
{
}
@ -583,8 +584,7 @@ std::vector<std::pair<ASTPtr, StoragePtr>> BackupEntriesCollector::findTablesInD
try
{
/// Database or table could be replicated - so may use ZooKeeper. We need to retry.
auto zookeeper_retries_info = global_zookeeper_retries_info;
ZooKeeperRetriesControl retries_ctl("getTablesForBackup", log, zookeeper_retries_info, nullptr);
ZooKeeperRetriesControl retries_ctl("getTablesForBackup", log, zookeeper_retries_info);
retries_ctl.retryLoop([&](){ db_tables = database->getTablesForBackup(filter_by_table_name, context); });
}
catch (Exception & e)

View File

@ -48,7 +48,7 @@ public:
std::shared_ptr<IBackupCoordination> getBackupCoordination() const { return backup_coordination; }
const ReadSettings & getReadSettings() const { return read_settings; }
ContextPtr getContext() const { return context; }
const ZooKeeperRetriesInfo & getZooKeeperRetriesInfo() const { return global_zookeeper_retries_info; }
const ZooKeeperRetriesInfo & getZooKeeperRetriesInfo() const { return zookeeper_retries_info; }
/// Returns all access entities which can be put into a backup.
std::unordered_map<UUID, AccessEntityPtr> getAllAccessEntities();
@ -129,7 +129,7 @@ private:
LoggerPtr log;
/// Unfortunately we can use ZooKeeper for collecting information for backup
/// and we need to retry...
ZooKeeperRetriesInfo global_zookeeper_retries_info;
ZooKeeperRetriesInfo zookeeper_retries_info;
Strings all_hosts;
DDLRenamingMap renaming_map;

View File

@ -33,6 +33,7 @@ RestoreCoordinationOnCluster::RestoreCoordinationOnCluster(
, all_hosts_without_initiator(BackupCoordinationOnCluster::excludeInitiator(all_hosts))
, current_host(current_host_)
, current_host_index(BackupCoordinationOnCluster::findCurrentHostIndex(current_host, all_hosts))
, process_list_element(process_list_element_)
, log(getLogger("RestoreCoordinationOnCluster"))
, with_retries(log, get_zookeeper_, keeper_settings, process_list_element_, [root_zookeeper_path_](Coordination::ZooKeeperWithFaultInjection::Ptr zk) { zk->sync(root_zookeeper_path_); })
, cleaner(/* is_restore = */ true, zookeeper_path, with_retries, log)
@ -122,7 +123,8 @@ ZooKeeperRetriesInfo RestoreCoordinationOnCluster::getOnClusterInitializationKee
{
return ZooKeeperRetriesInfo{keeper_settings.max_retries_while_initializing,
static_cast<UInt64>(keeper_settings.retry_initial_backoff_ms.count()),
static_cast<UInt64>(keeper_settings.retry_max_backoff_ms.count())};
static_cast<UInt64>(keeper_settings.retry_max_backoff_ms.count()),
process_list_element};
}
bool RestoreCoordinationOnCluster::acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name)

View File

@ -75,7 +75,8 @@ private:
const Strings all_hosts_without_initiator;
const String current_host;
const size_t current_host_index;
LoggerPtr const log;
const QueryStatusPtr process_list_element;
const LoggerPtr log;
/// The order is important: `stage_sync` must be initialized after `with_retries` and `cleaner`.
const WithRetries with_retries;

View File

@ -20,6 +20,7 @@
#include <Databases/IDatabase.h>
#include <Databases/DDLDependencyVisitor.h>
#include <Storages/IStorage.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Common/quoteString.h>
#include <Common/escapeForFileName.h>
#include <base/insertAtEnd.h>
@ -39,6 +40,9 @@ namespace DB
{
namespace Setting
{
extern const SettingsUInt64 backup_restore_keeper_retry_initial_backoff_ms;
extern const SettingsUInt64 backup_restore_keeper_retry_max_backoff_ms;
extern const SettingsUInt64 backup_restore_keeper_max_retries;
extern const SettingsSeconds lock_acquire_timeout;
}
@ -103,6 +107,11 @@ RestorerFromBackup::RestorerFromBackup(
, after_task_callback(after_task_callback_)
, create_table_timeout(context->getConfigRef().getUInt64("backups.create_table_timeout", 300000))
, log(getLogger("RestorerFromBackup"))
, zookeeper_retries_info(
context->getSettingsRef()[Setting::backup_restore_keeper_max_retries],
context->getSettingsRef()[Setting::backup_restore_keeper_retry_initial_backoff_ms],
context->getSettingsRef()[Setting::backup_restore_keeper_retry_max_backoff_ms],
context->getProcessListElementSafe())
, tables_dependencies("RestorerFromBackup")
, thread_pool(thread_pool_)
{
@ -977,6 +986,11 @@ void RestorerFromBackup::createTable(const QualifiedTableName & table_name)
query_context->setSetting("database_replicated_allow_explicit_uuid", 3);
query_context->setSetting("database_replicated_allow_replicated_engine_arguments", 3);
/// Creating of replicated tables may need retries.
query_context->setSetting("keeper_max_retries", zookeeper_retries_info.max_retries);
query_context->setSetting("keeper_initial_backoff_ms", zookeeper_retries_info.initial_backoff_ms);
query_context->setSetting("keeper_max_backoff_ms", zookeeper_retries_info.max_backoff_ms);
/// Execute CREATE TABLE query (we call IDatabase::createTableRestoredFromBackup() to allow the database to do some
/// database-specific things).
database->createTableRestoredFromBackup(

View File

@ -1,6 +1,7 @@
#pragma once
#include <Backups/RestoreSettings.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Databases/DDLRenamingVisitor.h>
#include <Databases/TablesDependencyGraph.h>
#include <Parsers/ASTBackupQuery.h>
@ -85,6 +86,7 @@ private:
std::chrono::milliseconds create_table_timeout;
LoggerPtr log;
const ZooKeeperRetriesInfo zookeeper_retries_info;
Mode mode = Mode::RESTORE;
Strings all_hosts;
DDLRenamingMap renaming_map;
@ -172,7 +174,6 @@ private:
TablesDependencyGraph tables_dependencies TSA_GUARDED_BY(mutex);
std::vector<DataRestoreTask> data_restore_tasks TSA_GUARDED_BY(mutex);
std::unique_ptr<AccessRestorerFromBackup> access_restorer TSA_GUARDED_BY(mutex);
bool access_restored TSA_GUARDED_BY(mutex) = false;
std::vector<std::future<void>> futures TSA_GUARDED_BY(mutex);
std::atomic<bool> exception_caught = false;

View File

@ -20,9 +20,10 @@ WithRetries::RetriesControlHolder::RetriesControlHolder(const WithRetries * pare
: (kind == kErrorHandling) ? parent->settings.max_retries_while_handling_error
: parent->settings.max_retries,
parent->settings.retry_initial_backoff_ms.count(),
parent->settings.retry_max_backoff_ms.count())
parent->settings.retry_max_backoff_ms.count(),
(kind == kErrorHandling) ? nullptr : parent->process_list_element)
/// We don't use process_list_element while handling an error because the error handling can't be cancellable.
, retries_ctl(name, parent->log, info, (kind == kErrorHandling) ? nullptr : parent->process_list_element)
, retries_ctl(name, parent->log, info)
, faulty_zookeeper(parent->getFaultyZooKeeper())
{}

View File

@ -1024,10 +1024,10 @@ void ColumnArray::updatePermutationWithCollation(const Collator & collator, Perm
DefaultPartialSort());
}
ColumnPtr ColumnArray::compress() const
ColumnPtr ColumnArray::compress(bool force_compression) const
{
ColumnPtr data_compressed = data->compress();
ColumnPtr offsets_compressed = offsets->compress();
ColumnPtr data_compressed = data->compress(force_compression);
ColumnPtr offsets_compressed = offsets->compress(force_compression);
size_t byte_size = data_compressed->byteSize() + offsets_compressed->byteSize();

View File

@ -159,7 +159,7 @@ public:
/// For example, `getDataInRange(0, size())` is the same as `getDataPtr()->clone()`.
MutableColumnPtr getDataInRange(size_t start, size_t length) const;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;

View File

@ -16,7 +16,7 @@ namespace ErrorCodes
}
std::shared_ptr<Memory<>> ColumnCompressed::compressBuffer(const void * data, size_t data_size, bool always_compress)
std::shared_ptr<Memory<>> ColumnCompressed::compressBuffer(const void * data, size_t data_size, bool force_compression)
{
size_t max_dest_size = LZ4_COMPRESSBOUND(data_size);
@ -35,7 +35,8 @@ std::shared_ptr<Memory<>> ColumnCompressed::compressBuffer(const void * data, si
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress column");
/// If compression is inefficient.
if (!always_compress && static_cast<size_t>(compressed_size) * 2 > data_size)
const size_t threshold = force_compression ? 1 : 2;
if (static_cast<size_t>(compressed_size) * threshold > data_size)
return {};
/// Shrink to fit.

View File

@ -70,9 +70,11 @@ public:
/// Helper methods for compression.
/// If data is not worth to be compressed and not 'always_compress' - returns nullptr.
/// If the data is not worth compressing, returns nullptr.
/// By default it requires the compressed data to be at least 50% smaller than the original.
/// With `force_compression` set to true, it only requires the compressed data to be no larger than the source data.
/// Note: shared_ptr is to allow to be captured by std::function.
static std::shared_ptr<Memory<>> compressBuffer(const void * data, size_t data_size, bool always_compress);
static std::shared_ptr<Memory<>> compressBuffer(const void * data, size_t data_size, bool force_compression);
static void decompressBuffer(
const void * compressed_data, void * decompressed_data, size_t compressed_size, size_t decompressed_size);

View File

@ -478,7 +478,7 @@ ColumnPtr ColumnDecimal<T>::replicate(const IColumn::Offsets & offsets) const
}
template <is_decimal T>
ColumnPtr ColumnDecimal<T>::compress() const
ColumnPtr ColumnDecimal<T>::compress(bool force_compression) const
{
const size_t data_size = data.size();
const size_t source_size = data_size * sizeof(T);
@ -487,7 +487,7 @@ ColumnPtr ColumnDecimal<T>::compress() const
if (source_size < 4096) /// A wild guess.
return ColumnCompressed::wrap(this->getPtr());
auto compressed = ColumnCompressed::compressBuffer(data.data(), source_size, false);
auto compressed = ColumnCompressed::compressBuffer(data.data(), source_size, force_compression);
if (!compressed)
return ColumnCompressed::wrap(this->getPtr());

View File

@ -140,7 +140,7 @@ public:
return false;
}
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
void insertValue(const T value) { data.push_back(value); }
Container & getData() { return data; }

View File

@ -991,9 +991,9 @@ void ColumnDynamic::updatePermutation(IColumn::PermutationSortDirection directio
updatePermutationImpl(limit, res, equal_ranges, ComparatorDescendingStable(*this, nan_direction_hint), comparator_equal, DefaultSort(), DefaultPartialSort());
}
ColumnPtr ColumnDynamic::compress() const
ColumnPtr ColumnDynamic::compress(bool force_compression) const
{
ColumnPtr variant_compressed = variant_column_ptr->compress();
ColumnPtr variant_compressed = variant_column_ptr->compress(force_compression);
size_t byte_size = variant_compressed->byteSize();
return ColumnCompressed::create(size(), byte_size,
[my_variant_compressed = std::move(variant_compressed), my_variant_info = variant_info, my_max_dynamic_types = max_dynamic_types, my_global_max_dynamic_types = global_max_dynamic_types, my_statistics = statistics]() mutable

View File

@ -335,7 +335,7 @@ public:
return false;
}
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
double getRatioOfDefaultRows(double sample_ratio) const override
{

View File

@ -419,7 +419,7 @@ void ColumnFixedString::getExtremes(Field & min, Field & max) const
get(max_idx, max);
}
ColumnPtr ColumnFixedString::compress() const
ColumnPtr ColumnFixedString::compress(bool force_compression) const
{
size_t source_size = chars.size();
@ -427,7 +427,7 @@ ColumnPtr ColumnFixedString::compress() const
if (source_size < 4096) /// A wild guess.
return ColumnCompressed::wrap(this->getPtr());
auto compressed = ColumnCompressed::compressBuffer(chars.data(), source_size, false);
auto compressed = ColumnCompressed::compressBuffer(chars.data(), source_size, force_compression);
if (!compressed)
return ColumnCompressed::wrap(this->getPtr());

View File

@ -175,7 +175,7 @@ public:
ColumnPtr replicate(const Offsets & offsets) const override;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
void reserve(size_t size) override
{

View File

@ -352,9 +352,9 @@ bool ColumnMap::dynamicStructureEquals(const IColumn & rhs) const
return false;
}
ColumnPtr ColumnMap::compress() const
ColumnPtr ColumnMap::compress(bool force_compression) const
{
auto compressed = nested->compress();
auto compressed = nested->compress(force_compression);
const auto byte_size = compressed->byteSize();
/// The order of evaluation of function arguments is unspecified
/// and could cause interacting with object in moved-from state

View File

@ -120,7 +120,7 @@ public:
const ColumnTuple & getNestedData() const { return assert_cast<const ColumnTuple &>(getNestedColumn().getData()); }
ColumnTuple & getNestedData() { return assert_cast<ColumnTuple &>(getNestedColumn().getData()); }
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
bool hasDynamicStructure() const override { return nested->hasDynamicStructure(); }
bool dynamicStructureEquals(const IColumn & rhs) const override;

View File

@ -773,10 +773,10 @@ void ColumnNullable::protect()
getNullMapColumn().protect();
}
ColumnPtr ColumnNullable::compress() const
ColumnPtr ColumnNullable::compress(bool force_compression) const
{
ColumnPtr nested_compressed = nested_column->compress();
ColumnPtr null_map_compressed = null_map->compress();
ColumnPtr nested_compressed = nested_column->compress(force_compression);
ColumnPtr null_map_compressed = null_map->compress(force_compression);
size_t byte_size = nested_column->byteSize() + null_map->byteSize();

View File

@ -141,7 +141,7 @@ public:
// Special function for nullable minmax index
void getExtremesNullLast(Field & min, Field & max) const;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;

View File

@ -1225,14 +1225,14 @@ bool ColumnObject::structureEquals(const IColumn & rhs) const
return true;
}
ColumnPtr ColumnObject::compress() const
ColumnPtr ColumnObject::compress(bool force_compression) const
{
std::unordered_map<String, ColumnPtr> compressed_typed_paths;
compressed_typed_paths.reserve(typed_paths.size());
size_t byte_size = 0;
for (const auto & [path, column] : typed_paths)
{
auto compressed_column = column->compress();
auto compressed_column = column->compress(force_compression);
byte_size += compressed_column->byteSize();
compressed_typed_paths[path] = std::move(compressed_column);
}
@ -1241,12 +1241,12 @@ ColumnPtr ColumnObject::compress() const
compressed_dynamic_paths.reserve(dynamic_paths_ptrs.size());
for (const auto & [path, column] : dynamic_paths_ptrs)
{
auto compressed_column = column->compress();
auto compressed_column = column->compress(force_compression);
byte_size += compressed_column->byteSize();
compressed_dynamic_paths[path] = std::move(compressed_column);
}
auto compressed_shared_data = shared_data->compress();
auto compressed_shared_data = shared_data->compress(force_compression);
byte_size += compressed_shared_data->byteSize();
auto decompress =

View File

@ -171,7 +171,7 @@ public:
bool structureEquals(const IColumn & rhs) const override;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
void finalize() override;
bool isFinalized() const override;

View File

@ -774,10 +774,10 @@ UInt64 ColumnSparse::getNumberOfDefaultRows() const
return _size - offsets->size();
}
ColumnPtr ColumnSparse::compress() const
ColumnPtr ColumnSparse::compress(bool force_compression) const
{
auto values_compressed = values->compress();
auto offsets_compressed = offsets->compress();
auto values_compressed = values->compress(force_compression);
auto offsets_compressed = offsets->compress(force_compression);
size_t byte_size = values_compressed->byteSize() + offsets_compressed->byteSize();

View File

@ -147,7 +147,7 @@ public:
double getRatioOfDefaultRows(double sample_ratio) const override;
UInt64 getNumberOfDefaultRows() const override;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;

View File

@ -628,33 +628,46 @@ void ColumnString::getExtremes(Field & min, Field & max) const
get(max_idx, max);
}
ColumnPtr ColumnString::compress() const
ColumnPtr ColumnString::compress(bool force_compression) const
{
const size_t source_chars_size = chars.size();
const size_t source_offsets_elements = offsets.size();
const size_t source_offsets_size = source_offsets_elements * sizeof(Offset);
/// Don't compress small blocks.
if (source_chars_size < 4096) /// A wild guess.
if (source_chars_size < min_size_to_compress)
{
return ColumnCompressed::wrap(this->getPtr());
}
auto chars_compressed = ColumnCompressed::compressBuffer(chars.data(), source_chars_size, false);
auto chars_compressed = ColumnCompressed::compressBuffer(chars.data(), source_chars_size, force_compression);
/// Return original column if not compressible.
if (!chars_compressed)
{
return ColumnCompressed::wrap(this->getPtr());
}
auto offsets_compressed = ColumnCompressed::compressBuffer(offsets.data(), source_offsets_size, true);
auto offsets_compressed = ColumnCompressed::compressBuffer(offsets.data(), source_offsets_size, force_compression);
const bool offsets_were_compressed = !!offsets_compressed;
/// Offsets are not compressible. Use the source data.
if (!offsets_compressed)
{
offsets_compressed = std::make_shared<Memory<>>(source_offsets_size);
memcpy(offsets_compressed->data(), offsets.data(), source_offsets_size);
}
const size_t chars_compressed_size = chars_compressed->size();
const size_t offsets_compressed_size = offsets_compressed->size();
return ColumnCompressed::create(source_offsets_elements, chars_compressed_size + offsets_compressed_size,
[
my_chars_compressed = std::move(chars_compressed),
my_offsets_compressed = std::move(offsets_compressed),
source_chars_size,
source_offsets_elements
]
return ColumnCompressed::create(
source_offsets_elements,
chars_compressed_size + offsets_compressed_size,
[my_chars_compressed = std::move(chars_compressed),
my_offsets_compressed = std::move(offsets_compressed),
source_chars_size,
source_offsets_elements,
offsets_were_compressed]
{
auto res = ColumnString::create();
@ -664,8 +677,18 @@ ColumnPtr ColumnString::compress() const
ColumnCompressed::decompressBuffer(
my_chars_compressed->data(), res->getChars().data(), my_chars_compressed->size(), source_chars_size);
ColumnCompressed::decompressBuffer(
my_offsets_compressed->data(), res->getOffsets().data(), my_offsets_compressed->size(), source_offsets_elements * sizeof(Offset));
if (offsets_were_compressed)
{
ColumnCompressed::decompressBuffer(
my_offsets_compressed->data(),
res->getOffsets().data(),
my_offsets_compressed->size(),
source_offsets_elements * sizeof(Offset));
}
else
{
memcpy(res->getOffsets().data(), my_offsets_compressed->data(), my_offsets_compressed->size());
}
return res;
});

View File

@ -29,6 +29,8 @@ public:
using Char = UInt8;
using Chars = PaddedPODArray<UInt8>;
static constexpr size_t min_size_to_compress = 4096;
private:
friend class COWHelper<IColumnHelper<ColumnString>, ColumnString>;
@ -272,7 +274,7 @@ public:
ColumnPtr replicate(const Offsets & replicate_offsets) const override;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
void reserve(size_t n) override;
size_t capacity() const override;

View File

@ -796,7 +796,7 @@ void ColumnTuple::takeDynamicStructureFromSourceColumns(const Columns & source_c
}
ColumnPtr ColumnTuple::compress() const
ColumnPtr ColumnTuple::compress(bool force_compression) const
{
if (columns.empty())
{
@ -812,7 +812,7 @@ ColumnPtr ColumnTuple::compress() const
compressed.reserve(columns.size());
for (const auto & column : columns)
{
auto compressed_column = column->compress();
auto compressed_column = column->compress(force_compression);
byte_size += compressed_column->byteSize();
compressed.emplace_back(std::move(compressed_column));
}

View File

@ -125,7 +125,7 @@ public:
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;
bool isCollationSupported() const override;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
void finalize() override;
bool isFinalized() const override;

View File

@ -1426,16 +1426,16 @@ bool ColumnVariant::dynamicStructureEquals(const IColumn & rhs) const
return true;
}
ColumnPtr ColumnVariant::compress() const
ColumnPtr ColumnVariant::compress(bool force_compression) const
{
ColumnPtr local_discriminators_compressed = local_discriminators->compress();
ColumnPtr offsets_compressed = offsets->compress();
ColumnPtr local_discriminators_compressed = local_discriminators->compress(force_compression);
ColumnPtr offsets_compressed = offsets->compress(force_compression);
size_t byte_size = local_discriminators_compressed->byteSize() + offsets_compressed->byteSize();
Columns compressed;
compressed.reserve(variants.size());
for (const auto & variant : variants)
{
auto compressed_variant = variant->compress();
auto compressed_variant = variant->compress(force_compression);
byte_size += compressed_variant->byteSize();
compressed.emplace_back(std::move(compressed_variant));
}

View File

@ -254,7 +254,7 @@ public:
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
double getRatioOfDefaultRows(double sample_ratio) const override;
UInt64 getNumberOfDefaultRows() const override;
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override;

View File

@ -951,7 +951,7 @@ void ColumnVector<T>::getExtremes(Field & min, Field & max) const
}
template <typename T>
ColumnPtr ColumnVector<T>::compress() const
ColumnPtr ColumnVector<T>::compress(bool force_compression) const
{
const size_t data_size = data.size();
const size_t source_size = data_size * sizeof(T);
@ -960,7 +960,7 @@ ColumnPtr ColumnVector<T>::compress() const
if (source_size < 4096) /// A wild guess.
return ColumnCompressed::wrap(this->getPtr());
auto compressed = ColumnCompressed::compressBuffer(data.data(), source_size, false);
auto compressed = ColumnCompressed::compressBuffer(data.data(), source_size, force_compression);
if (!compressed)
return ColumnCompressed::wrap(this->getPtr());

View File

@ -287,7 +287,7 @@ public:
ColumnPtr createWithOffsets(const IColumn::Offsets & offsets, const ColumnConst & column_with_default_value, size_t total_rows, size_t shift) const override;
ColumnPtr compress() const override;
ColumnPtr compress(bool force_compression) const override;
/// Replace elements that match the filter with zeroes. If inverted replaces not matched elements.
void applyZeroMap(const IColumn::Filter & filt, bool inverted = false);

View File

@ -601,7 +601,8 @@ public:
/// Compress column in memory to some representation that allows to decompress it back.
/// Return itself if compression is not applicable for this column type.
[[nodiscard]] virtual Ptr compress() const
/// The flag `force_compression` indicates that compression should be performed even if it's not efficient (as long as the compressed data is not larger than the original).
[[nodiscard]] virtual Ptr compress([[maybe_unused]] bool force_compression) const
{
/// No compression by default.
return getPtr();

View File

@ -0,0 +1,88 @@
#include <gtest/gtest.h>
#include <Columns/ColumnString.h>
#include <Common/randomSeed.h>
#include <Common/thread_local_rng.h>
using namespace DB;
static pcg64 rng(randomSeed());
constexpr size_t bytes_per_string = sizeof(uint64_t) + 1;
/// Column should have enough bytes to be compressed
constexpr size_t column_size = ColumnString::min_size_to_compress / bytes_per_string + 42;
TEST(ColumnString, Incompressible)
{
auto col = ColumnString::create();
auto & chars = col->getChars();
auto & offsets = col->getOffsets();
chars.resize(column_size * bytes_per_string);
for (size_t i = 0; i < column_size; ++i)
{
const uint64_t value = rng();
memcpy(&chars[i * bytes_per_string], &value, sizeof(uint64_t));
chars[i * bytes_per_string + sizeof(uint64_t)] = '\0';
offsets.push_back((i + 1) * bytes_per_string);
}
auto compressed = col->compress(true);
auto decompressed = compressed->decompress();
// When the column is incompressible, we return the original column wrapped in ColumnCompressed
ASSERT_EQ(decompressed.get(), col.get());
ASSERT_EQ(compressed->size(), col->size());
ASSERT_EQ(compressed->allocatedBytes(), col->allocatedBytes());
ASSERT_EQ(decompressed->size(), col->size());
ASSERT_EQ(decompressed->allocatedBytes(), col->allocatedBytes());
}
TEST(ColumnString, CompressibleCharsAndIncompressibleOffsets)
{
auto col = ColumnString::create();
auto & chars = col->getChars();
auto & offsets = col->getOffsets();
chars.resize(column_size * bytes_per_string);
for (size_t i = 0; i < column_size; ++i)
{
static const uint64_t value = 42;
memcpy(&chars[i * bytes_per_string], &value, sizeof(uint64_t));
chars[i * bytes_per_string + sizeof(uint64_t)] = '\0';
}
offsets.push_back(chars.size());
auto compressed = col->compress(true);
auto decompressed = compressed->decompress();
// For an actually compressed column, only the compressed `chars` and `offsets` arrays are stored.
// Upon decompression, a new column is created.
ASSERT_NE(decompressed.get(), col.get());
ASSERT_EQ(compressed->size(), col->size());
ASSERT_LE(compressed->allocatedBytes(), col->allocatedBytes());
ASSERT_EQ(decompressed->size(), col->size());
ASSERT_LE(decompressed->allocatedBytes(), col->allocatedBytes());
}
TEST(ColumnString, CompressibleCharsAndCompressibleOffsets)
{
auto col = ColumnString::create();
auto & chars = col->getChars();
auto & offsets = col->getOffsets();
chars.resize(column_size * bytes_per_string);
for (size_t i = 0; i < column_size; ++i)
{
static const uint64_t value = 42;
memcpy(&chars[i * bytes_per_string], &value, sizeof(uint64_t));
chars[i * bytes_per_string + sizeof(uint64_t)] = '\0';
offsets.push_back((i + 1) * bytes_per_string);
}
auto compressed = col->compress(true);
auto decompressed = compressed->decompress();
// For an actually compressed column, only the compressed `chars` and `offsets` arrays are stored.
// Upon decompression, a new column is created.
ASSERT_NE(decompressed.get(), col.get());
ASSERT_EQ(compressed->size(), col->size());
ASSERT_LE(compressed->allocatedBytes(), col->allocatedBytes());
ASSERT_EQ(decompressed->size(), col->size());
ASSERT_LE(decompressed->allocatedBytes(), col->allocatedBytes());
}

View File

@ -208,13 +208,19 @@ typename SystemLogQueue<LogElement>::PopResult SystemLogQueue<LogElement>::pop()
if (is_shutdown)
return PopResult{.is_shutdown = true};
queue_front_index += queue.size();
const auto queue_size = queue.size();
queue_front_index += queue_size;
prev_ignored_logs = ignored_logs;
ignored_logs = 0;
result.last_log_index = queue_front_index;
result.logs.swap(queue);
if (!queue.empty())
result.logs.swap(queue);
result.create_table_force = requested_prepare_tables > prepared_tables;
/// Preallocate same amount of memory for the next batch to minimize reallocations.
if (queue_size > queue.capacity())
queue.reserve(std::max(settings.reserved_size_rows, queue_size));
}
if (prev_ignored_logs)

View File

@ -16,21 +16,25 @@ namespace ErrorCodes
struct ZooKeeperRetriesInfo
{
ZooKeeperRetriesInfo() = default;
ZooKeeperRetriesInfo(UInt64 max_retries_, UInt64 initial_backoff_ms_, UInt64 max_backoff_ms_)
ZooKeeperRetriesInfo(UInt64 max_retries_, UInt64 initial_backoff_ms_, UInt64 max_backoff_ms_, QueryStatusPtr query_status_)
: max_retries(max_retries_), initial_backoff_ms(std::min(initial_backoff_ms_, max_backoff_ms_)), max_backoff_ms(max_backoff_ms_)
, query_status(query_status_)
{
}
UInt64 max_retries = 0; /// "max_retries = 0" means only one attempt.
UInt64 initial_backoff_ms = 100;
UInt64 max_backoff_ms = 5000;
UInt64 initial_backoff_ms = 0;
UInt64 max_backoff_ms = 0;
QueryStatusPtr query_status; /// can be nullptr
};
class ZooKeeperRetriesControl
{
public:
ZooKeeperRetriesControl(std::string name_, LoggerPtr logger_, ZooKeeperRetriesInfo retries_info_, QueryStatusPtr elem)
: name(std::move(name_)), logger(logger_), retries_info(retries_info_), process_list_element(elem)
ZooKeeperRetriesControl(std::string name_, LoggerPtr logger_, ZooKeeperRetriesInfo retries_info_)
: name(std::move(name_)), logger(logger_), retries_info(retries_info_)
{
}
@ -39,7 +43,6 @@ public:
, logger(other.logger)
, retries_info(other.retries_info)
, total_failures(other.total_failures)
, process_list_element(other.process_list_element)
, current_backoff_ms(other.current_backoff_ms)
{
}
@ -222,8 +225,8 @@ private:
}
/// Check if the query was cancelled.
if (process_list_element)
process_list_element->checkTimeLimit();
if (retries_info.query_status)
retries_info.query_status->checkTimeLimit();
/// retries
logLastError("will retry due to error");
@ -231,8 +234,8 @@ private:
current_backoff_ms = std::min(current_backoff_ms * 2, retries_info.max_backoff_ms);
/// Check if the query was cancelled again after sleeping.
if (process_list_element)
process_list_element->checkTimeLimit();
if (retries_info.query_status)
retries_info.query_status->checkTimeLimit();
return true;
}
@ -288,7 +291,6 @@ private:
std::function<void()> action_after_last_failed_retry = []() {};
bool iteration_succeeded = true;
bool stop_retries = false;
QueryStatusPtr process_list_element;
UInt64 current_iteration = 0;
UInt64 current_backoff_ms = 0;

View File

@ -209,14 +209,12 @@ template <typename A, typename B> struct EqualsOp
using SymmetricOp = EqualsOp<B, A>;
static UInt8 apply(A a, B b) { return accurate::equalsOp(a, b); }
static constexpr bool compilable = true;
};
template <typename A, typename B> struct NotEqualsOp
{
using SymmetricOp = NotEqualsOp<B, A>;
static UInt8 apply(A a, B b) { return accurate::notEqualsOp(a, b); }
static constexpr bool compilable = true;
};
template <typename A, typename B> struct GreaterOp;
@ -225,14 +223,12 @@ template <typename A, typename B> struct LessOp
{
using SymmetricOp = GreaterOp<B, A>;
static UInt8 apply(A a, B b) { return accurate::lessOp(a, b); }
static constexpr bool compilable = true;
};
template <typename A, typename B> struct GreaterOp
{
using SymmetricOp = LessOp<B, A>;
static UInt8 apply(A a, B b) { return accurate::greaterOp(a, b); }
static constexpr bool compilable = true;
};
template <typename A, typename B> struct GreaterOrEqualsOp;
@ -241,14 +237,12 @@ template <typename A, typename B> struct LessOrEqualsOp
{
using SymmetricOp = GreaterOrEqualsOp<B, A>;
static UInt8 apply(A a, B b) { return accurate::lessOrEqualsOp(a, b); }
static constexpr bool compilable = true;
};
template <typename A, typename B> struct GreaterOrEqualsOp
{
using SymmetricOp = LessOrEqualsOp<B, A>;
static UInt8 apply(A a, B b) { return accurate::greaterOrEqualsOp(a, b); }
static constexpr bool compilable = true;
};
}

View File

@ -616,7 +616,7 @@ Block Block::compress() const
size_t num_columns = data.size();
Columns new_columns(num_columns);
for (size_t i = 0; i < num_columns; ++i)
new_columns[i] = data[i].column->compress();
new_columns[i] = data[i].column->compress(/*force_compression=*/false);
return cloneWithColumns(new_columns);
}

View File

@ -555,7 +555,6 @@ inline bool isNullableOrLowCardinalityNullable(const DataTypePtr & data_type)
template <typename DataType> constexpr bool IsDataTypeDecimal = false;
template <typename DataType> constexpr bool IsDataTypeNumber = false;
template <typename DataType> constexpr bool IsDataTypeNativeNumber = false;
template <typename DataType> constexpr bool IsDataTypeDateOrDateTime = false;
template <typename DataType> constexpr bool IsDataTypeDate = false;
template <typename DataType> constexpr bool IsDataTypeEnum = false;
@ -582,9 +581,6 @@ template <is_decimal T> constexpr bool IsDataTypeDecimal<DataTypeDecimal<T>> = t
template <> inline constexpr bool IsDataTypeDecimal<DataTypeDateTime64> = true;
template <typename T> constexpr bool IsDataTypeNumber<DataTypeNumber<T>> = true;
template <typename T>
requires std::is_arithmetic_v<T>
constexpr bool IsDataTypeNativeNumber<DataTypeNumber<T>> = true;
template <> inline constexpr bool IsDataTypeDate<DataTypeDate> = true;
template <> inline constexpr bool IsDataTypeDate<DataTypeDate32> = true;

View File

@ -205,39 +205,6 @@ struct ResultOfIf
ConstructedType, Error>>>;
};
/** Type casting for `modulo` function:
* UInt<x>, UInt<y> -> UInt<max(x,y)>
* Int<x>, Int<y> -> Int<max(x,y)>
* UInt<x>, Int<y> -> Int<max(x*2, y)>
* UInt64, Int<x> -> Error
* Float<x>, Float<y> -> Float64
* Float<x>, [U]Int<y> -> Float64
*/
template <typename A, typename B>
struct ResultOfModuloNativePromotion
{
static_assert(is_arithmetic_v<A> && is_arithmetic_v<B>);
static constexpr bool has_float = std::is_floating_point_v<A> || std::is_floating_point_v<B>;
static constexpr bool has_integer = is_integer<A> || is_integer<B>;
static constexpr bool has_signed = is_signed_v<A> || is_signed_v<B>;
static constexpr bool has_unsigned = !is_signed_v<A> || !is_signed_v<B>;
static constexpr size_t max_size_of_unsigned_integer = max(is_signed_v<A> ? 0 : sizeof(A), is_signed_v<B> ? 0 : sizeof(B));
static constexpr size_t max_size_of_signed_integer = max(is_signed_v<A> ? sizeof(A) : 0, is_signed_v<B> ? sizeof(B) : 0);
static constexpr size_t max_size_of_integer = max(is_integer<A> ? sizeof(A) : 0, is_integer<B> ? sizeof(B) : 0);
using ConstructedType = typename Construct<
has_signed,
false,
(has_signed ^ has_unsigned) ? max(max_size_of_unsigned_integer * 2, max_size_of_signed_integer) : max(sizeof(A), sizeof(B))>::Type;
using Type = std::conditional_t<
std::is_same_v<A, B>,
A,
std::conditional_t<has_float, Float64, std::conditional_t<sizeof(ConstructedType) <= 8, ConstructedType, Error>>>;
};
/** Before applying operator `%` and bitwise operations, operands are cast to whole numbers. */
template <typename A> struct ToInteger
{

View File

@ -408,7 +408,7 @@ void DatabaseOrdinary::restoreMetadataAfterConvertingToReplicated(StoragePtr tab
}
else
{
rmt->restoreMetadataInZooKeeper();
rmt->restoreMetadataInZooKeeper(/* zookeeper_retries_info = */ {});
LOG_INFO
(
log,

View File

@ -199,7 +199,7 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper);
}
String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo &, QueryStatusPtr)
String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo &)
{
auto zookeeper = getAndSetZooKeeper();
return enqueueQueryImpl(zookeeper, entry, database);

View File

@ -24,7 +24,7 @@ class DatabaseReplicatedDDLWorker : public DDLWorker
public:
DatabaseReplicatedDDLWorker(DatabaseReplicated * db, ContextPtr context_);
String enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo &, QueryStatusPtr) override;
String enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo &) override;
String tryEnqueueAndExecuteEntry(DDLLogEntry & entry, ContextPtr query_context);

View File

@ -159,13 +159,6 @@ public:
virtual bool isRemote() const = 0;
/// Remove object. Throws exception if object doesn't exists.
// virtual void removeObject(const StoredObject & object) = 0;
/// Remove multiple objects. Some object storages can do batch remove in a more
/// optimal way.
// virtual void removeObjects(const StoredObjects & objects) = 0;
/// Remove object on path if exists
virtual void removeObjectIfExists(const StoredObject & object) = 0;

View File

@ -6,14 +6,8 @@
#include <Common/NaNUtils.h>
#include <DataTypes/NumberTraits.h>
#include "DataTypes/Native.h"
#include "config.h"
#if USE_EMBEDDED_COMPILER
# include <Core/ValuesWithType.h>
# include <llvm/IR/IRBuilder.h>
#endif
namespace DB
{
@ -21,42 +15,8 @@ namespace DB
namespace ErrorCodes
{
extern const int ILLEGAL_DIVISION;
extern const int LOGICAL_ERROR;
}
#if USE_EMBEDDED_COMPILER
template <typename F>
static llvm::Value * compileWithNullableValues(llvm::IRBuilder<> & b, llvm::Value * left, llvm::Value * right, bool is_signed, F && compile_func)
{
auto * left_type = left->getType();
auto * right_type = right->getType();
if (!left_type->isStructTy() && !right_type->isStructTy())
{
// Both arguments are not nullable.
return compile_func(b, left, right, is_signed);
}
auto * denull_left = left_type->isStructTy() ? b.CreateExtractValue(left, {1}) : left;
auto * denull_right = right_type->isStructTy() ? b.CreateExtractValue(right, {1}) : right;
auto * denull_result = compile_func(b, denull_left, denull_right, is_signed);
auto * nullable_result_type = toNullableType(b, denull_result->getType());
llvm::Value * nullable_result = llvm::Constant::getNullValue(nullable_result_type);
nullable_result = b.CreateInsertValue(nullable_result, denull_result, {0});
auto * result_is_null = b.CreateExtractValue(nullable_result, {1});
if (left_type->isStructTy())
result_is_null = b.CreateOr(result_is_null, b.CreateExtractValue(left, {1}));
if (right_type->isStructTy())
result_is_null = b.CreateOr(result_is_null, b.CreateExtractValue(right, {1}));
return b.CreateInsertValue(nullable_result, result_is_null, {1});
}
#endif
template <typename A, typename B>
inline void throwIfDivisionLeadsToFPE(A a, B b)
{
@ -198,39 +158,14 @@ struct ModuloImpl
}
#if USE_EMBEDDED_COMPILER
static constexpr bool compilable = true; /// Ignore exceptions in LLVM IR
static llvm::Value * compile(llvm::IRBuilder<> & b, llvm::Value * left, llvm::Value * right, bool is_signed)
{
return compileWithNullableValues(
b,
left,
right,
is_signed,
[](auto & b_, auto * left_, auto * right_, auto is_signed_) { return compileImpl(b_, left_, right_, is_signed_); });
}
static llvm::Value * compileImpl(llvm::IRBuilder<> & b, llvm::Value * left, llvm::Value * right, bool is_signed)
{
if (left->getType()->isFloatingPointTy())
return b.CreateFRem(left, right);
else if (left->getType()->isIntegerTy())
return is_signed ? b.CreateSRem(left, right) : b.CreateURem(left, right);
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "ModuloImpl compilation expected native integer or floating point type");
}
#endif
static constexpr bool compilable = false; /// don't know how to throw from LLVM IR
#endif
};
template <typename A, typename B>
struct ModuloLegacyImpl : ModuloImpl<A, B>
{
using ResultType = typename NumberTraits::ResultOfModuloLegacy<A, B>::Type;
#if USE_EMBEDDED_COMPILER
static constexpr bool compilable = false; /// moduloLegacy is only used in partition key expression
#endif
};
template <typename A, typename B>
@ -259,36 +194,6 @@ struct PositiveModuloImpl : ModuloImpl<A, B>
}
return static_cast<ResultType>(res);
}
#if USE_EMBEDDED_COMPILER
static constexpr bool compilable = true; /// Ignore exceptions in LLVM IR
static llvm::Value * compile(llvm::IRBuilder<> & b, llvm::Value * left, llvm::Value * right, bool is_signed)
{
return compileWithNullableValues(
b,
left,
right,
is_signed,
[](auto & b_, auto * left_, auto * right_, auto is_signed_) { return compileImpl(b_, left_, right_, is_signed_); });
}
static llvm::Value * compileImpl(llvm::IRBuilder<> & b, llvm::Value * left, llvm::Value * right, bool is_signed)
{
auto * result = ModuloImpl<A, B>::compileImpl(b, left, right, is_signed);
if (is_signed)
{
/// If result is negative, result += abs(right).
auto * zero = llvm::Constant::getNullValue(result->getType());
auto * is_negative = b.CreateICmpSLT(result, zero);
auto * abs_right = b.CreateSelect(b.CreateICmpSLT(right, zero), b.CreateNeg(right), right);
return b.CreateSelect(is_negative, b.CreateAdd(result, abs_right), result);
}
else
return result;
}
#endif
};
}

View File

@ -810,7 +810,6 @@ class FunctionBinaryArithmetic : public IFunction
static constexpr bool is_division = IsOperation<Op>::division;
static constexpr bool is_bit_hamming_distance = IsOperation<Op>::bit_hamming_distance;
static constexpr bool is_modulo = IsOperation<Op>::modulo;
static constexpr bool is_positive_modulo = IsOperation<Op>::positive_modulo;
static constexpr bool is_int_div = IsOperation<Op>::int_div;
static constexpr bool is_int_div_or_zero = IsOperation<Op>::int_div_or_zero;
@ -2388,105 +2387,59 @@ ColumnPtr executeStringInteger(const ColumnsWithTypeAndName & arguments, const A
if (!canBeNativeType(*arguments[0]) || !canBeNativeType(*arguments[1]) || !canBeNativeType(*result_type))
return false;
auto denull_left_type = removeNullable(arguments[0]);
auto denull_right_type = removeNullable(arguments[1]);
WhichDataType data_type_lhs(denull_left_type);
WhichDataType data_type_rhs(denull_right_type);
WhichDataType data_type_lhs(arguments[0]);
WhichDataType data_type_rhs(arguments[1]);
if ((data_type_lhs.isDateOrDate32() || data_type_lhs.isDateTime()) ||
(data_type_rhs.isDateOrDate32() || data_type_rhs.isDateTime()))
return false;
return castBothTypes(
denull_left_type.get(),
denull_right_type.get(),
[&](const auto & left, const auto & right)
return castBothTypes(arguments[0].get(), arguments[1].get(), [&](const auto & left, const auto & right)
{
using LeftDataType = std::decay_t<decltype(left)>;
using RightDataType = std::decay_t<decltype(right)>;
if constexpr (!std::is_same_v<DataTypeFixedString, LeftDataType> &&
!std::is_same_v<DataTypeFixedString, RightDataType> &&
!std::is_same_v<DataTypeString, LeftDataType> &&
!std::is_same_v<DataTypeString, RightDataType>)
{
using LeftDataType = std::decay_t<decltype(left)>;
using RightDataType = std::decay_t<decltype(right)>;
if constexpr (
!std::is_same_v<DataTypeFixedString, LeftDataType> && !std::is_same_v<DataTypeFixedString, RightDataType>
&& !std::is_same_v<DataTypeString, LeftDataType> && !std::is_same_v<DataTypeString, RightDataType>)
{
using ResultDataType = typename BinaryOperationTraits<Op, LeftDataType, RightDataType>::ResultDataType;
using OpSpec = Op<typename LeftDataType::FieldType, typename RightDataType::FieldType>;
if constexpr (
!std::is_same_v<ResultDataType, InvalidType> && !IsDataTypeDecimal<ResultDataType>
&& !IsDataTypeDecimal<LeftDataType> && !IsDataTypeDecimal<RightDataType> && OpSpec::compilable)
{
if constexpr (is_modulo || is_positive_modulo)
{
using LeftType = typename LeftDataType::FieldType;
using RightType = typename RightDataType::FieldType;
using PromotedType = typename NumberTraits::ResultOfModuloNativePromotion<LeftType, RightType>::Type;
if constexpr (std::is_arithmetic_v<PromotedType>)
{
return true;
}
}
else
return true;
}
}
return false;
});
using ResultDataType = typename BinaryOperationTraits<Op, LeftDataType, RightDataType>::ResultDataType;
using OpSpec = Op<typename LeftDataType::FieldType, typename RightDataType::FieldType>;
if constexpr (!std::is_same_v<ResultDataType, InvalidType> && !IsDataTypeDecimal<ResultDataType> && OpSpec::compilable)
return true;
}
return false;
});
}
llvm::Value * compileImpl(llvm::IRBuilderBase & builder, const ValuesWithType & arguments, const DataTypePtr & result_type) const override
{
assert(2 == arguments.size());
auto denull_left_type = removeNullable(arguments[0].type);
auto denull_right_type = removeNullable(arguments[1].type);
llvm::Value * result = nullptr;
castBothTypes(
denull_left_type.get(),
denull_right_type.get(),
[&](const auto & left, const auto & right)
castBothTypes(arguments[0].type.get(), arguments[1].type.get(), [&](const auto & left, const auto & right)
{
using LeftDataType = std::decay_t<decltype(left)>;
using RightDataType = std::decay_t<decltype(right)>;
if constexpr (!std::is_same_v<DataTypeFixedString, LeftDataType> &&
!std::is_same_v<DataTypeFixedString, RightDataType> &&
!std::is_same_v<DataTypeString, LeftDataType> &&
!std::is_same_v<DataTypeString, RightDataType>)
{
using LeftDataType = std::decay_t<decltype(left)>;
using RightDataType = std::decay_t<decltype(right)>;
if constexpr (
!std::is_same_v<DataTypeFixedString, LeftDataType> && !std::is_same_v<DataTypeFixedString, RightDataType>
&& !std::is_same_v<DataTypeString, LeftDataType> && !std::is_same_v<DataTypeString, RightDataType>)
using ResultDataType = typename BinaryOperationTraits<Op, LeftDataType, RightDataType>::ResultDataType;
using OpSpec = Op<typename LeftDataType::FieldType, typename RightDataType::FieldType>;
if constexpr (!std::is_same_v<ResultDataType, InvalidType> && !IsDataTypeDecimal<ResultDataType> && OpSpec::compilable)
{
using ResultDataType = typename BinaryOperationTraits<Op, LeftDataType, RightDataType>::ResultDataType;
using OpSpec = Op<typename LeftDataType::FieldType, typename RightDataType::FieldType>;
if constexpr (
!std::is_same_v<ResultDataType, InvalidType> && !IsDataTypeDecimal<ResultDataType>
&& !IsDataTypeDecimal<LeftDataType> && !IsDataTypeDecimal<RightDataType> && OpSpec::compilable)
{
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
if constexpr (is_modulo || is_positive_modulo)
{
using LeftType = typename LeftDataType::FieldType;
using RightType = typename RightDataType::FieldType;
using PromotedType = typename NumberTraits::ResultOfModuloNativePromotion<LeftType, RightType>::Type;
if constexpr (std::is_arithmetic_v<PromotedType>)
{
DataTypePtr promoted_type = std::make_shared<DataTypeNumber<PromotedType>>();
if (result_type->isNullable())
promoted_type = std::make_shared<DataTypeNullable>(promoted_type);
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
auto * lval = nativeCast(b, arguments[0], result_type);
auto * rval = nativeCast(b, arguments[1], result_type);
result = OpSpec::compile(b, lval, rval, std::is_signed_v<typename ResultDataType::FieldType>);
auto * lval = nativeCast(b, arguments[0], promoted_type);
auto * rval = nativeCast(b, arguments[1], promoted_type);
result = nativeCast(
b, promoted_type, OpSpec::compile(b, lval, rval, std::is_signed_v<PromotedType>), result_type);
return true;
}
}
else
{
auto * lval = nativeCast(b, arguments[0], result_type);
auto * rval = nativeCast(b, arguments[1], result_type);
result = OpSpec::compile(b, lval, rval, std::is_signed_v<typename ResultDataType::FieldType>);
return true;
}
}
return true;
}
return false;
});
}
return false;
});
return result;
}

View File

@ -489,7 +489,9 @@ public:
{
using DataType = std::decay_t<decltype(type)>;
if constexpr (std::is_same_v<DataTypeFixedString, DataType> || std::is_same_v<DataTypeString, DataType>)
{
return false;
}
else
{
using T0 = typename DataType::FieldType;
@ -511,7 +513,9 @@ public:
{
using DataType = std::decay_t<decltype(type)>;
if constexpr (std::is_same_v<DataTypeFixedString, DataType> || std::is_same_v<DataTypeString, DataType>)
{
return false;
}
else
{
using T0 = typename DataType::FieldType;
@ -519,16 +523,8 @@ public:
if constexpr (!std::is_same_v<T1, InvalidType> && !IsDataTypeDecimal<DataType> && Op<T0>::compilable)
{
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
if constexpr (std::is_same_v<Op<T0>, AbsImpl<T0>> || std::is_same_v<Op<T0>, BitCountImpl<T0>>)
{
/// We don't need to cast the argument to the result type if it's abs/bitcount function.
result = Op<T0>::compile(b, arguments[0].value, is_signed_v<T0>);
}
else
{
auto * v = nativeCast(b, arguments[0], result_type);
result = Op<T0>::compile(b, v, is_signed_v<T1>);
}
auto * v = nativeCast(b, arguments[0], result_type);
result = Op<T0>::compile(b, v, is_signed_v<T1>);
return true;
}

View File

@ -1,21 +1,17 @@
#pragma once
// Include this first, because `#define _asan_poison_address` from
// llvm/Support/Compiler.h conflicts with its forward declaration in
// sanitizer/asan_interface.h
#include <memory>
#include <limits>
#include <type_traits>
#include <Common/memcmpSmall.h>
#include <Common/assert_cast.h>
#include <Common/TargetSpecific.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnsNumber.h>
#include <Core/AccurateComparison.h>
#include <Core/DecimalComparison.h>
#include <Columns/ColumnArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
@ -28,23 +24,22 @@
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/getLeastSupertype.h>
#include <Functions/FunctionHelpers.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/castColumn.h>
#include <Functions/IFunctionAdaptors.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IsOperation.h>
#include <Core/AccurateComparison.h>
#include <Core/DecimalComparison.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadHelpers.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/convertFieldToType.h>
#include <Common/TargetSpecific.h>
#include <Common/assert_cast.h>
#include <Common/memcmpSmall.h>
#include "DataTypes/NumberTraits.h"
#if USE_EMBEDDED_COMPILER
# include <DataTypes/Native.h>
# include <Functions/castTypeToEither.h>
# include <llvm/IR/IRBuilder.h>
#endif
#include <limits>
#include <type_traits>
namespace DB
{
@ -635,61 +630,6 @@ struct GenericComparisonImpl
}
};
#if USE_EMBEDDED_COMPILER
template <template <typename, typename> typename Op> struct CompileOp;
template <> struct CompileOp<EqualsOp>
{
static llvm::Value * compile(llvm::IRBuilder<> & b, llvm::Value * x, llvm::Value * y, bool /*is_signed*/)
{
return x->getType()->isIntegerTy() ? b.CreateICmpEQ(x, y) : b.CreateFCmpOEQ(x, y); /// qNaNs always compare false
}
};
template <> struct CompileOp<NotEqualsOp>
{
static llvm::Value * compile(llvm::IRBuilder<> & b, llvm::Value * x, llvm::Value * y, bool /*is_signed*/)
{
return x->getType()->isIntegerTy() ? b.CreateICmpNE(x, y) : b.CreateFCmpUNE(x, y);
}
};
template <> struct CompileOp<LessOp>
{
static llvm::Value * compile(llvm::IRBuilder<> & b, llvm::Value * x, llvm::Value * y, bool is_signed)
{
return x->getType()->isIntegerTy() ? (is_signed ? b.CreateICmpSLT(x, y) : b.CreateICmpULT(x, y)) : b.CreateFCmpOLT(x, y);
}
};
template <> struct CompileOp<GreaterOp>
{
static llvm::Value * compile(llvm::IRBuilder<> & b, llvm::Value * x, llvm::Value * y, bool is_signed)
{
return x->getType()->isIntegerTy() ? (is_signed ? b.CreateICmpSGT(x, y) : b.CreateICmpUGT(x, y)) : b.CreateFCmpOGT(x, y);
}
};
template <> struct CompileOp<LessOrEqualsOp>
{
static llvm::Value * compile(llvm::IRBuilder<> & b, llvm::Value * x, llvm::Value * y, bool is_signed)
{
return x->getType()->isIntegerTy() ? (is_signed ? b.CreateICmpSLE(x, y) : b.CreateICmpULE(x, y)) : b.CreateFCmpOLE(x, y);
}
};
template <> struct CompileOp<GreaterOrEqualsOp>
{
static llvm::Value * compile(llvm::IRBuilder<> & b, llvm::Value * x, llvm::Value * y, bool is_signed)
{
return x->getType()->isIntegerTy() ? (is_signed ? b.CreateICmpSGE(x, y) : b.CreateICmpUGE(x, y)) : b.CreateFCmpOGE(x, y);
}
};
#endif
struct NameEquals { static constexpr auto name = "equals"; };
struct NameNotEquals { static constexpr auto name = "notEquals"; };
struct NameLess { static constexpr auto name = "less"; };
@ -1416,108 +1356,6 @@ public:
return executeGeneric(col_with_type_and_name_left, col_with_type_and_name_right);
}
#if USE_EMBEDDED_COMPILER
template <typename F>
static bool castType(const IDataType * type, F && f)
{
return castTypeToEither<
DataTypeUInt8,
DataTypeUInt16,
DataTypeUInt32,
DataTypeUInt64,
DataTypeInt8,
DataTypeInt16,
DataTypeInt32,
DataTypeInt64,
DataTypeFloat32,
DataTypeFloat64>(type, std::forward<F>(f));
}
template <typename F>
static bool castBothTypes(const IDataType * left, const IDataType * right, F && f)
{
return castType(left, [&](const auto & left_)
{
return castType(right, [&](const auto & right_)
{
return f(left_, right_);
});
});
}
bool isCompilableImpl(const DataTypes & arguments, const DataTypePtr & result_type) const override
{
if (2 != arguments.size())
return false;
if (!canBeNativeType(*arguments[0]) || !canBeNativeType(*arguments[1]) || !canBeNativeType(*result_type))
return false;
WhichDataType data_type_lhs(arguments[0]);
WhichDataType data_type_rhs(arguments[1]);
/// TODO support date/date32
if ((data_type_lhs.isDateOrDate32() || data_type_lhs.isDateTime()) ||
(data_type_rhs.isDateOrDate32() || data_type_rhs.isDateTime()))
return false;
return castBothTypes(arguments[0].get(), arguments[1].get(), [&](const auto & left, const auto & right)
{
using LeftDataType = std::decay_t<decltype(left)>;
using RightDataType = std::decay_t<decltype(right)>;
using LeftType = typename LeftDataType::FieldType;
using RightType = typename RightDataType::FieldType;
using PromotedType = typename NumberTraits::ResultOfIf<LeftType, RightType>::Type;
if constexpr (
!std::is_same_v<DataTypeFixedString, LeftDataType> && !std::is_same_v<DataTypeFixedString, RightDataType>
&& !std::is_same_v<DataTypeString, LeftDataType> && !std::is_same_v<DataTypeString, RightDataType>
&& (std::is_integral_v<PromotedType> || std::is_floating_point_v<PromotedType>))
{
using OpSpec = Op<typename LeftDataType::FieldType, typename RightDataType::FieldType>;
return OpSpec::compilable;
}
return false;
});
return false;
}
llvm::Value * compileImpl(llvm::IRBuilderBase & builder, const ValuesWithType & arguments, const DataTypePtr &) const override
{
assert(2 == arguments.size());
llvm::Value * result = nullptr;
castBothTypes(arguments[0].type.get(), arguments[1].type.get(), [&](const auto & left, const auto & right)
{
using LeftDataType = std::decay_t<decltype(left)>;
using RightDataType = std::decay_t<decltype(right)>;
using LeftType = typename LeftDataType::FieldType;
using RightType = typename RightDataType::FieldType;
using PromotedType = typename NumberTraits::ResultOfIf<LeftType, RightType>::Type;
if constexpr (
!std::is_same_v<DataTypeFixedString, LeftDataType> && !std::is_same_v<DataTypeFixedString, RightDataType>
&& !std::is_same_v<DataTypeString, LeftDataType> && !std::is_same_v<DataTypeString, RightDataType>
&& (std::is_integral_v<PromotedType> || std::is_floating_point_v<PromotedType>))
{
using OpSpec = Op<typename LeftDataType::FieldType, typename RightDataType::FieldType>;
if constexpr (OpSpec::compilable)
{
auto promoted_type = std::make_shared<DataTypeNumber<PromotedType>>();
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
auto * left_value = nativeCast(b, arguments[0], promoted_type);
auto * right_value = nativeCast(b, arguments[1], promoted_type);
result = b.CreateSelect(
CompileOp<Op>::compile(b, left_value, right_value, std::is_signed_v<PromotedType>), b.getInt8(1), b.getInt8(0));
return true;
}
}
return false;
});
return result;
}
#endif
};
}

View File

@ -73,10 +73,6 @@
#include <Common/assert_cast.h>
#include <Common/quoteString.h>
#if USE_EMBEDDED_COMPILER
# include "DataTypes/Native.h"
#endif
namespace DB
{
@ -121,43 +117,6 @@ namespace ErrorCodes
namespace detail
{
#if USE_EMBEDDED_COMPILER
bool castType(const IDataType * type, auto && f)
{
using Types = TypeList<
DataTypeUInt8,
DataTypeUInt16,
DataTypeUInt32,
DataTypeUInt64,
DataTypeUInt128,
DataTypeUInt256,
DataTypeInt8,
DataTypeInt16,
DataTypeInt32,
DataTypeInt64,
DataTypeInt128,
DataTypeInt256,
DataTypeFloat32,
DataTypeFloat64,
DataTypeDecimal32,
DataTypeDecimal64,
DataTypeDecimal128,
DataTypeDecimal256,
DataTypeDate,
DataTypeDateTime,
DataTypeFixedString,
DataTypeString,
DataTypeInterval>;
return castTypeToEither(Types{}, type, std::forward<decltype(f)>(f));
}
template <typename F>
bool castBothTypes(const IDataType * left, const IDataType * right, F && f)
{
return castType(left, [&](const auto & left_) { return castType(right, [&](const auto & right_) { return f(left_, right_); }); });
}
#endif
/** Type conversion functions.
* toType - conversion in "natural way";
*/
@ -2249,52 +2208,6 @@ public:
}
}
#if USE_EMBEDDED_COMPILER
bool isCompilableImpl(const DataTypes & types, const DataTypePtr & result_type) const override
{
if (types.size() != 1)
return false;
if (!canBeNativeType(types[0]) || !canBeNativeType(result_type))
return false;
return castBothTypes(types[0].get(), result_type.get(), [](const auto & left, const auto & right)
{
using LeftDataType = std::decay_t<decltype(left)>;
using RightDataType = std::decay_t<decltype(right)>;
if constexpr (IsDataTypeNativeNumber<LeftDataType> && IsDataTypeNativeNumber<RightDataType>)
return true;
return false;
});
}
llvm::Value *
compileImpl(llvm::IRBuilderBase & builder, const ValuesWithType & arguments, const DataTypePtr & result_type) const override
{
llvm::Value * result = nullptr;
castBothTypes(
arguments[0].type.get(),
result_type.get(),
[&](const auto & left, const auto & right)
{
using LeftDataType = std::decay_t<decltype(left)>;
using RightDataType = std::decay_t<decltype(right)>;
if constexpr (IsDataTypeNativeNumber<LeftDataType> && IsDataTypeNativeNumber<RightDataType>)
{
result = nativeCast(builder, arguments[0], result_type);
return true;
}
return false;
});
return result;
}
#endif
bool hasInformationAboutMonotonicity() const override
{
return Monotonic::has();
@ -3419,60 +3332,6 @@ public:
return monotonicity_for_range(type, left, right);
}
#if USE_EMBEDDED_COMPILER
bool isCompilable() const override
{
if (getName() != "CAST" || argument_types.size() != 2)
return false;
const auto & from_type = argument_types[0];
const auto & to_type = return_type;
auto denull_from_type = removeNullable(from_type);
auto denull_to_type = removeNullable(to_type);
if (!canBeNativeType(denull_from_type) || !canBeNativeType(denull_to_type))
return false;
return castBothTypes(denull_from_type.get(), denull_to_type.get(), [](const auto & left, const auto & right)
{
using LeftDataType = std::decay_t<decltype(left)>;
using RightDataType = std::decay_t<decltype(right)>;
if constexpr (IsDataTypeNativeNumber<LeftDataType> && IsDataTypeNativeNumber<RightDataType>)
return true;
return false;
});
}
llvm::Value * compile(llvm::IRBuilderBase & builder, const ValuesWithType & arguments) const override
{
llvm::Value * result = nullptr;
const auto & from_type = arguments[0].type;
const auto & to_type = return_type;
auto denull_from_type = removeNullable(from_type);
auto denull_to_type = removeNullable(to_type);
castBothTypes(
denull_from_type.get(),
denull_to_type.get(),
[&](const auto & left, const auto & right)
{
using LeftDataType = std::decay_t<decltype(left)>;
using RightDataType = std::decay_t<decltype(right)>;
if constexpr (IsDataTypeNativeNumber<LeftDataType> && IsDataTypeNativeNumber<RightDataType>)
{
result = nativeCast(builder, arguments[0], return_type);
return true;
}
return false;
});
return result;
}
#endif
private:
const char * cast_name;
MonotonicityForRange monotonicity_for_range;

View File

@ -39,7 +39,6 @@ namespace ErrorCodes
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION;
extern const int ILLEGAL_COLUMN;
extern const int NOT_IMPLEMENTED;
}
namespace
@ -512,44 +511,6 @@ ColumnPtr basicExecuteImpl(ColumnRawPtrs arguments, size_t input_rows_count)
}
namespace FunctionsLogicalDetail
{
#if USE_EMBEDDED_COMPILER
/// Cast LLVM value with type to ternary
llvm::Value * nativeTernaryCast(llvm::IRBuilderBase & b, const DataTypePtr & from_type, llvm::Value * value)
{
auto * result_type = llvm::Type::getInt8Ty(b.getContext());
if (from_type->isNullable())
{
auto * ternary_null = llvm::ConstantInt::get(result_type, 1);
auto * inner = nativeTernaryCast(b, removeNullable(from_type), b.CreateExtractValue(value, {0}));
auto * is_null = b.CreateExtractValue(value, {1});
return b.CreateSelect(is_null, ternary_null, inner);
}
auto * zero = llvm::Constant::getNullValue(value->getType());
auto * ternary_true = llvm::ConstantInt::get(result_type, 2);
auto * ternary_false = llvm::ConstantInt::get(result_type, 0);
if (value->getType()->isIntegerTy())
return b.CreateSelect(b.CreateICmpNE(value, zero), ternary_true, ternary_false);
else if (value->getType()->isFloatingPointTy())
return b.CreateSelect(b.CreateFCmpONE(value, zero), ternary_true, ternary_false);
else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot cast non-number {} to ternary", from_type->getName());
}
/// Cast LLVM value with type to ternary
llvm::Value * nativeTernaryCast(llvm::IRBuilderBase & b, const ValueWithType & value_with_type)
{
return nativeTernaryCast(b, value_with_type.type, value_with_type.value);
}
#endif
}
template <typename Impl, typename Name>
DataTypePtr FunctionAnyArityLogical<Impl, Name>::getReturnTypeImpl(const DataTypes & arguments) const
{

View File

@ -6,6 +6,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <Functions/IFunction.h>
#include <IO/WriteHelpers.h>
#include <type_traits>
#include <Interpreters/Context_fwd.h>
@ -78,15 +79,6 @@ namespace Ternary
}
}
#if USE_EMBEDDED_COMPILER
/// Cast LLVM value with type to Ternary
llvm::Value * nativeTernaryCast(llvm::IRBuilderBase & b, const DataTypePtr & from_type, llvm::Value * value);
/// Cast LLVM value with type to Ternary
llvm::Value * nativeTernaryCast(llvm::IRBuilderBase & b, const ValueWithType & value_with_type);
#endif
struct AndImpl
{
@ -106,18 +98,6 @@ struct AndImpl
/// Will use three-valued logic for NULLs (see above) or default implementation (any operation with NULL returns NULL).
static constexpr bool specialImplementationForNulls() { return true; }
#if USE_EMBEDDED_COMPILER
static llvm::Value * apply(llvm::IRBuilder<> & builder, llvm::Value * a, llvm::Value * b)
{
return builder.CreateAnd(a, b);
}
static llvm::Value * ternaryApply(llvm::IRBuilder<> & builder, llvm::Value * a, llvm::Value * b)
{
return builder.CreateSelect(builder.CreateICmpUGT(a, b), b, a);
}
#endif
};
struct OrImpl
@ -130,19 +110,6 @@ struct OrImpl
static constexpr ResultType apply(UInt8 a, UInt8 b) { return a | b; }
static constexpr ResultType ternaryApply(UInt8 a, UInt8 b) { return std::max(a, b); }
static constexpr bool specialImplementationForNulls() { return true; }
#if USE_EMBEDDED_COMPILER
static llvm::Value * apply(llvm::IRBuilder<> & builder, llvm::Value * a, llvm::Value * b)
{
return builder.CreateOr(a, b);
}
static llvm::Value * ternaryApply(llvm::IRBuilder<> & builder, llvm::Value * a, llvm::Value * b)
{
return builder.CreateSelect(builder.CreateICmpUGT(a, b), a, b);
}
#endif
};
struct XorImpl
@ -161,12 +128,6 @@ struct XorImpl
{
return builder.CreateXor(a, b);
}
static llvm::Value * ternaryApply(llvm::IRBuilder<> & builder, llvm::Value * a, llvm::Value * b)
{
llvm::Value * xor_result = builder.CreateXor(a, b);
return builder.CreateSelect(xor_result, builder.getInt8(Ternary::True), builder.getInt8(Ternary::False));
}
#endif
};
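For reference, the `ternaryApply` helpers removed above implement ClickHouse's three-valued logic over plain `UInt8` values: with the encoding used by the removed `nativeTernaryCast` (`False = 0`, `Null = 1`, `True = 2`), ternary AND is the minimum of its operands and ternary OR the maximum — the LLVM versions expressed the same thing via `CreateSelect(CreateICmpUGT(a, b), ...)`. A minimal standalone sketch of that semantics, independent of LLVM; the enum and function names are illustrative, not ClickHouse identifiers:
```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

/// Ternary encoding as in the removed nativeTernaryCast: False = 0, Null = 1, True = 2.
enum Ternary : uint8_t { False = 0, Null = 1, True = 2 };

/// Ternary AND: the removed AndImpl::ternaryApply selected the smaller operand.
constexpr uint8_t ternaryAnd(uint8_t a, uint8_t b) { return std::min(a, b); }

/// Ternary OR: the removed OrImpl::ternaryApply selected the larger operand.
constexpr uint8_t ternaryOr(uint8_t a, uint8_t b) { return std::max(a, b); }

int main()
{
    assert(ternaryAnd(Null, False) == False);  /// NULL AND FALSE -> FALSE
    assert(ternaryAnd(Null, True) == Null);    /// NULL AND TRUE  -> NULL
    assert(ternaryOr(Null, True) == True);     /// NULL OR TRUE   -> TRUE
    assert(ternaryOr(Null, False) == Null);    /// NULL OR FALSE  -> NULL
}
```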
@ -223,51 +184,47 @@ public:
ColumnPtr getConstantResultForNonConstArguments(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type) const override;
#if USE_EMBEDDED_COMPILER
bool isCompilableImpl(const DataTypes & arguments, const DataTypePtr &) const override
{
for (const auto & arg : arguments)
{
if (!canBeNativeType(arg))
return false;
}
return true;
}
bool isCompilableImpl(const DataTypes &, const DataTypePtr &) const override { return useDefaultImplementationForNulls(); }
llvm::Value * compileImpl(llvm::IRBuilderBase & builder, const ValuesWithType & values, const DataTypePtr & result_type) const override
llvm::Value * compileImpl(llvm::IRBuilderBase & builder, const ValuesWithType & values, const DataTypePtr &) const override
{
assert(!values.empty());
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
if (useDefaultImplementationForNulls() || !result_type->isNullable())
if constexpr (!Impl::isSaturable())
{
llvm::Value * result = nativeBoolCast(b, values[0]);
auto * result = nativeBoolCast(b, values[0]);
for (size_t i = 1; i < values.size(); ++i)
{
llvm::Value * casted_value = nativeBoolCast(b, values[i]);
result = Impl::apply(b, result, casted_value);
}
result = Impl::apply(b, result, nativeBoolCast(b, values[i]));
return b.CreateSelect(result, b.getInt8(1), b.getInt8(0));
}
else
constexpr bool break_on_true = Impl::isSaturatedValue(true);
auto * next = b.GetInsertBlock();
auto * stop = llvm::BasicBlock::Create(next->getContext(), "", next->getParent());
b.SetInsertPoint(stop);
auto * phi = b.CreatePHI(b.getInt8Ty(), static_cast<unsigned>(values.size()));
for (size_t i = 0; i < values.size(); ++i)
{
/// First we need to cast all values to ternary logic
llvm::Value * ternary_result = nativeTernaryCast(b, values[0]);
for (size_t i = 1; i < values.size(); ++i)
b.SetInsertPoint(next);
auto * value = values[i].value;
auto * truth = nativeBoolCast(b, values[i]);
if (!values[i].type->equals(DataTypeUInt8{}))
value = b.CreateSelect(truth, b.getInt8(1), b.getInt8(0));
phi->addIncoming(value, b.GetInsertBlock());
if (i + 1 < values.size())
{
llvm::Value * casted_value = nativeTernaryCast(b, values[i]);
ternary_result = Impl::ternaryApply(b, ternary_result, casted_value);
next = llvm::BasicBlock::Create(next->getContext(), "", next->getParent());
b.CreateCondBr(truth, break_on_true ? stop : next, break_on_true ? next : stop);
}
/// Then transform ternary logic to struct which represents nullable result
llvm::Value * is_null = b.CreateICmpEQ(ternary_result, b.getInt8(Ternary::Null));
llvm::Value * is_true = b.CreateICmpEQ(ternary_result, b.getInt8(Ternary::True));
auto * nullable_result_type = toNativeType(b, result_type);
auto * nullable_result = llvm::Constant::getNullValue(nullable_result_type);
auto * nullable_result_with_value
= b.CreateInsertValue(nullable_result, b.CreateSelect(is_true, b.getInt8(1), b.getInt8(0)), {0});
return b.CreateInsertValue(nullable_result_with_value, is_null, {1});
}
b.CreateBr(stop);
b.SetInsertPoint(stop);
return phi;
}
#endif
};
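The replacement `compileImpl` above emits explicit short-circuit control flow for saturable logical functions: each argument gets its own basic block, a conditional branch jumps to a final block as soon as the result is saturated (the first false for AND, the first true for OR), and a PHI node in the final block picks the value of whichever block decided the outcome. Below is a minimal standalone sketch of the same control-flow shape for a two-argument AND using plain LLVM; the function name `buildShortCircuitAnd2` and the module setup are illustrative, and every argument is normalized to 0/1, whereas the code above skips that step for `UInt8` inputs:
```cpp
#include <llvm/IR/BasicBlock.h>
#include <llvm/IR/Constants.h>
#include <llvm/IR/Function.h>
#include <llvm/IR/IRBuilder.h>
#include <llvm/IR/LLVMContext.h>
#include <llvm/IR/Module.h>
#include <llvm/IR/Verifier.h>

/// Emits i8 @and2(i8 %a, i8 %b) that evaluates %b only when %a is non-zero,
/// mirroring the short-circuit pattern of the compileImpl above (AND stops at the first false).
llvm::Function * buildShortCircuitAnd2(llvm::Module & module)
{
    auto & ctx = module.getContext();
    llvm::Type * i8 = llvm::Type::getInt8Ty(ctx);
    auto * fn_type = llvm::FunctionType::get(i8, {i8, i8}, /*isVarArg=*/false);
    auto * fn = llvm::Function::Create(fn_type, llvm::Function::ExternalLinkage, "and2", module);

    auto * entry = llvm::BasicBlock::Create(ctx, "entry", fn);
    auto * second = llvm::BasicBlock::Create(ctx, "second", fn);
    auto * stop = llvm::BasicBlock::Create(ctx, "stop", fn);

    llvm::IRBuilder<> b(entry);
    auto * zero = llvm::ConstantInt::get(i8, 0);
    auto * one = llvm::ConstantInt::get(i8, 1);

    /// First argument: if it is false, jump straight to the result block (short circuit).
    auto * a_bool = b.CreateICmpNE(fn->getArg(0), zero);
    auto * a_norm = b.CreateSelect(a_bool, one, zero);
    b.CreateCondBr(a_bool, second, stop);

    /// Second (last) argument: its value becomes the result when the first one was true.
    b.SetInsertPoint(second);
    auto * b_bool = b.CreateICmpNE(fn->getArg(1), zero);
    auto * b_norm = b.CreateSelect(b_bool, one, zero);
    b.CreateBr(stop);

    /// The PHI collects the value from whichever block decided the outcome, as above.
    b.SetInsertPoint(stop);
    auto * phi = b.CreatePHI(i8, 2);
    phi->addIncoming(a_norm, entry);
    phi->addIncoming(b_norm, second);
    b.CreateRet(phi);

    llvm::verifyFunction(*fn);
    return fn;
}
```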

View File

@ -6,11 +6,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
template <typename A>
struct AbsImpl
{
@ -32,65 +27,25 @@ struct AbsImpl
}
#if USE_EMBEDDED_COMPILER
static constexpr bool compilable = true;
static llvm::Value * compile(llvm::IRBuilder<> & b, llvm::Value * arg, bool sign)
{
const auto & type = arg->getType();
if (type->isIntegerTy())
{
if (sign)
{
auto & context = b.getContext();
auto * signed_type = arg->getType();
auto * unsigned_type = llvm::IntegerType::get(context, signed_type->getIntegerBitWidth());
auto * is_negative = b.CreateICmpSLT(arg, llvm::ConstantInt::get(signed_type, 0));
auto * neg_value = b.CreateNeg(arg);
auto * abs_value = b.CreateSelect(is_negative, neg_value, arg);
return b.CreateZExt(abs_value, unsigned_type);
}
else
{
return arg;
}
}
else if (type->isDoubleTy() || type->isFloatTy())
{
auto * func_fabs = llvm::Intrinsic::getDeclaration(b.GetInsertBlock()->getModule(), llvm::Intrinsic::fabs, {type});
return b.CreateCall(func_fabs, {arg});
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "AbsImpl compilation expected native integer or floating point type");
}
static constexpr bool compilable = false; /// special type handling, some other time
#endif
};
struct NameAbs
{
static constexpr auto name = "abs";
};
struct NameAbs { static constexpr auto name = "abs"; };
using FunctionAbs = FunctionUnaryArithmetic<AbsImpl, NameAbs, false>;
template <>
struct FunctionUnaryArithmeticMonotonicity<NameAbs>
template <> struct FunctionUnaryArithmeticMonotonicity<NameAbs>
{
static bool has() { return true; }
static IFunction::Monotonicity get(const Field & left, const Field & right)
{
Float64 left_float
= left.isNull() ? -std::numeric_limits<Float64>::infinity() : applyVisitor(FieldVisitorConvertToNumber<Float64>(), left);
Float64 right_float
= right.isNull() ? std::numeric_limits<Float64>::infinity() : applyVisitor(FieldVisitorConvertToNumber<Float64>(), right);
Float64 left_float = left.isNull() ? -std::numeric_limits<Float64>::infinity() : applyVisitor(FieldVisitorConvertToNumber<Float64>(), left);
Float64 right_float = right.isNull() ? std::numeric_limits<Float64>::infinity() : applyVisitor(FieldVisitorConvertToNumber<Float64>(), right);
if ((left_float < 0 && right_float > 0) || (left_float > 0 && right_float < 0))
return {};
return {
.is_monotonic = true,
.is_positive = std::min(left_float, right_float) >= 0,
.is_strict = true,
};
return { .is_monotonic = true, .is_positive = std::min(left_float, right_float) >= 0, .is_strict = true, };
}
};

View File

@ -4,11 +4,6 @@
#include <Core/ColumnNumbers.h>
#include <Columns/ColumnNullable.h>
#if USE_EMBEDDED_COMPILER
# include <DataTypes/Native.h>
# include <llvm/IR/IRBuilder.h>
#endif
namespace DB
{
@ -63,22 +58,6 @@ public:
return nullable_col->getNestedColumnPtr();
return col;
}
#if USE_EMBEDDED_COMPILER
bool isCompilableImpl(const DataTypes & arguments, const DataTypePtr &) const override { return canBeNativeType(arguments[0]); }
llvm::Value *
compileImpl(llvm::IRBuilderBase & builder, const ValuesWithType & arguments, const DataTypePtr & /*result_type*/) const override
{
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
if (arguments[0].type->isNullable())
return b.CreateExtractValue(arguments[0].value, {0});
else
return arguments[0].value;
}
#endif
};
}

View File

@ -7,10 +7,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
template <typename A>
struct BitCountImpl
{
@ -42,26 +38,7 @@ struct BitCountImpl
}
#if USE_EMBEDDED_COMPILER
static constexpr bool compilable = true;
static llvm::Value * compile(llvm::IRBuilder<> & b, llvm::Value * arg, bool)
{
const auto & type = arg->getType();
llvm::Value * int_value = nullptr;
if (type->isIntegerTy())
int_value = arg;
else if (type->isFloatTy())
int_value = b.CreateBitCast(arg, llvm::Type::getInt32Ty(b.getContext()));
else if (type->isDoubleTy())
int_value = b.CreateBitCast(arg, llvm::Type::getInt64Ty(b.getContext()));
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "BitCountImpl compilation expected native integer or floating-point type");
auto * func_ctpop = llvm::Intrinsic::getDeclaration(b.GetInsertBlock()->getModule(), llvm::Intrinsic::ctpop, {int_value->getType()});
llvm::Value * ctpop_value = b.CreateCall(func_ctpop, {int_value});
return b.CreateZExtOrTrunc(ctpop_value, llvm::Type::getInt8Ty(b.getContext()));
}
static constexpr bool compilable = false;
#endif
};

View File

@ -2,10 +2,6 @@
#include <Functions/IFunction.h>
#include <Interpreters/Context_fwd.h>
#if USE_EMBEDDED_COMPILER
# include <DataTypes/Native.h>
# include <llvm/IR/IRBuilder.h>
#endif
namespace DB
{
@ -15,11 +11,6 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
struct IdentityName
{
static constexpr auto name = "identity";
};
template<typename Name>
class FunctionIdentityBase : public IFunction
{
@ -41,21 +32,12 @@ public:
{
return arguments.front().column;
}
#if USE_EMBEDDED_COMPILER
bool isCompilableImpl(const DataTypes & /*types*/, const DataTypePtr & result_type) const override
{
return Name::name == IdentityName::name && canBeNativeType(result_type);
}
llvm::Value *
compileImpl(llvm::IRBuilderBase & /*builder*/, const ValuesWithType & arguments, const DataTypePtr & /*result_type*/) const override
{
return arguments[0].value;
}
#endif
};
struct IdentityName
{
static constexpr auto name = "identity";
};
struct ScalarSubqueryResultName
{

View File

@ -11,11 +11,6 @@
#include <Interpreters/Context.h>
#include <Common/assert_cast.h>
#if USE_EMBEDDED_COMPILER
# include <DataTypes/Native.h>
# include <llvm/IR/IRBuilder.h>
#endif
namespace DB
{
namespace Setting
@ -115,23 +110,6 @@ public:
return DataTypeUInt8().createColumnConst(elem.column->size(), 1u);
}
#if USE_EMBEDDED_COMPILER
bool isCompilableImpl(const DataTypes & arguments, const DataTypePtr &) const override { return canBeNativeType(arguments[0]); }
llvm::Value *
compileImpl(llvm::IRBuilderBase & builder, const ValuesWithType & arguments, const DataTypePtr & /*result_type*/) const override
{
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
if (arguments[0].type->isNullable())
{
auto * is_null = b.CreateExtractValue(arguments[0].value, {1});
return b.CreateNot(is_null);
}
else
return b.getInt8(1);
}
#endif
private:
MULTITARGET_FUNCTION_AVX2_SSE42(
MULTITARGET_FUNCTION_HEADER(static void NO_INLINE), vectorImpl, MULTITARGET_FUNCTION_BODY((const PaddedPODArray<UInt8> & null_map, PaddedPODArray<UInt8> & res) /// NOLINT

View File

@ -10,11 +10,6 @@
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#if USE_EMBEDDED_COMPILER
# include <DataTypes/Native.h>
# include <llvm/IR/IRBuilder.h>
#endif
namespace DB
{
@ -112,21 +107,6 @@ public:
return DataTypeUInt8().createColumnConst(elem.column->size(), 0u);
}
#if USE_EMBEDDED_COMPILER
bool isCompilableImpl(const DataTypes & arguments, const DataTypePtr &) const override { return canBeNativeType(arguments[0]); }
llvm::Value *
compileImpl(llvm::IRBuilderBase & builder, const ValuesWithType & arguments, const DataTypePtr & /*result_type*/) const override
{
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
if (arguments[0].type->isNullable())
return b.CreateExtractValue(arguments[0].value, {1});
else
return b.getInt8(0);
}
#endif
private:
bool use_analyzer;
};

View File

@ -5,12 +5,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
template <typename A>
struct SignImpl
{
@ -28,44 +22,7 @@ struct SignImpl
}
#if USE_EMBEDDED_COMPILER
static constexpr bool compilable = true;
static llvm::Value * compile(llvm::IRBuilder<> & b, llvm::Value * arg, bool sign)
{
auto * result_type = b.getInt8Ty();
auto * res_zero = llvm::ConstantInt::getSigned(result_type, 0);
auto * res_one = llvm::ConstantInt::getSigned(result_type, 1);
auto * res_minus_one = llvm::ConstantInt::getSigned(result_type, -1);
const auto & type = arg->getType();
if (type->isIntegerTy())
{
auto * zero = llvm::ConstantInt::get(type, 0, sign);
auto * is_zero = b.CreateICmpEQ(arg, zero);
if (sign)
{
auto * is_negative = b.CreateICmpSLT(arg, res_zero);
auto * select_zero = b.CreateSelect(is_zero, res_zero, res_one);
return b.CreateSelect(is_negative, res_minus_one, select_zero);
}
else
return b.CreateSelect(is_zero, res_zero, res_one);
}
else if (type->isDoubleTy() || type->isFloatTy())
{
auto * zero = llvm::ConstantFP::get(type, 0.0);
auto * is_zero = b.CreateFCmpOEQ(arg, zero);
auto * is_negative = b.CreateFCmpOLT(arg, zero);
auto * select_zero = b.CreateSelect(is_zero, res_zero, res_one);
return b.CreateSelect(is_negative, res_minus_one, select_zero);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "SignImpl compilation expected native integer or floating point type");
}
static constexpr bool compilable = false;
#endif
};

View File

@ -1468,8 +1468,7 @@ ActionsDAG ActionsDAG::makeConvertingActions(
MatchColumnsMode mode,
bool ignore_constant_values,
bool add_cast_columns,
NameToNameMap * new_names,
NameSet * columns_contain_compiled_function)
NameToNameMap * new_names)
{
size_t num_input_columns = source.size();
size_t num_result_columns = result.size();
@ -1542,15 +1541,6 @@ ActionsDAG ActionsDAG::makeConvertingActions(
"Cannot convert column `{}` because it is constant but values of constants are different in source and result",
res_elem.name);
}
else if (columns_contain_compiled_function && columns_contain_compiled_function->contains(res_elem.name))
{
/// It may happen when JIT compilation is enabled that source column is constant and destination column is not constant.
/// e.g. expression "and(equals(materialize(null::Nullable(UInt64)), null::Nullable(UInt64)), equals(null::Nullable(UInt64), null::Nullable(UInt64)))"
/// compiled expression is "and(equals(input: Nullable(UInt64), null), null). Partial evaluation of the compiled expression isn't able to infer that the result column is constant.
/// It causes inconsistency between pipeline header(source column is not constant) and output header of ExpressionStep(destination column is constant).
/// So we need to convert non-constant column to constant column under this condition.
dst_node = &actions_dag.addColumn(res_elem);
}
else
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,

View File

@ -312,8 +312,8 @@ public:
MatchColumnsMode mode,
bool ignore_constant_values = false,
bool add_cast_columns = false,
NameToNameMap * new_names = nullptr,
NameSet * columns_contain_compiled_function = nullptr);
NameToNameMap * new_names = nullptr);
/// Create expression which add const column and then materialize it.
static ActionsDAG makeAddingColumnActions(ColumnWithTypeAndName column);

View File

@ -469,7 +469,7 @@ void QueryCache::Writer::finalizeWrite()
Columns compressed_columns;
for (const auto & column : columns)
{
auto compressed_column = column->compress();
auto compressed_column = column->compress(/*force_compression=*/false);
compressed_columns.push_back(compressed_column);
}
Chunk compressed_chunk(compressed_columns, chunk.getNumRows());

View File

@ -1054,12 +1054,12 @@ void DDLWorker::createStatusDirs(const std::string & node_path, const ZooKeeperP
}
String DDLWorker::enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo & retries_info, QueryStatusPtr process_list_element)
String DDLWorker::enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo & retries_info)
{
String node_path;
if (retries_info.max_retries > 0)
{
ZooKeeperRetriesControl retries_ctl{"DDLWorker::enqueueQuery", log, retries_info, process_list_element};
ZooKeeperRetriesControl retries_ctl{"DDLWorker::enqueueQuery", log, retries_info};
retries_ctl.retryLoop([&]{
node_path = enqueueQueryAttempt(entry);
});

View File

@ -68,7 +68,7 @@ public:
virtual ~DDLWorker();
/// Pushes query into DDL queue, returns path to created node
virtual String enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo & retries_info, QueryStatusPtr process_list_element);
virtual String enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo & retries_info);
/// Host ID (name:port) for logging purposes
/// Note that in each task hosts are identified individually by name:port from initiator server cluster config

View File

@ -133,8 +133,7 @@ ExecutionStatus DistributedQueryStatusSource::getExecutionStatus(const fs::path
String status_data;
bool finished_exists = false;
auto retries_ctl = ZooKeeperRetriesControl(
"executeDDLQueryOnCluster", getLogger("DDLQueryStatusSource"), getRetriesInfo(), context->getProcessListElement());
auto retries_ctl = ZooKeeperRetriesControl("executeDDLQueryOnCluster", getLogger("DDLQueryStatusSource"), getRetriesInfo());
retries_ctl.retryLoop([&]() { finished_exists = context->getZooKeeper()->tryGet(status_path, status_data); });
if (finished_exists)
status.tryDeserializeText(status_data);
@ -142,13 +141,14 @@ ExecutionStatus DistributedQueryStatusSource::getExecutionStatus(const fs::path
return status;
}
ZooKeeperRetriesInfo DistributedQueryStatusSource::getRetriesInfo()
ZooKeeperRetriesInfo DistributedQueryStatusSource::getRetriesInfo() const
{
const auto & config_ref = Context::getGlobalContextInstance()->getConfigRef();
return ZooKeeperRetriesInfo(
config_ref.getInt("distributed_ddl_keeper_max_retries", 5),
config_ref.getInt("distributed_ddl_keeper_initial_backoff_ms", 100),
config_ref.getInt("distributed_ddl_keeper_max_backoff_ms", 5000));
config_ref.getInt("distributed_ddl_keeper_max_backoff_ms", 5000),
context->getProcessListElement());
}
std::pair<String, UInt16> DistributedQueryStatusSource::parseHostAndPort(const String & host_id)
@ -194,8 +194,7 @@ Chunk DistributedQueryStatusSource::generate()
Strings tmp_active_hosts;
{
auto retries_ctl = ZooKeeperRetriesControl(
"executeDistributedQueryOnCluster", getLogger(getName()), getRetriesInfo(), context->getProcessListElement());
auto retries_ctl = ZooKeeperRetriesControl("executeDistributedQueryOnCluster", getLogger(getName()), getRetriesInfo());
retries_ctl.retryLoop(
[&]()
{

View File

@ -38,7 +38,7 @@ protected:
Strings getNewAndUpdate(const Strings & current_finished_hosts);
ExecutionStatus getExecutionStatus(const fs::path & status_path);
static ZooKeeperRetriesInfo getRetriesInfo();
ZooKeeperRetriesInfo getRetriesInfo() const;
static std::pair<String, UInt16> parseHostAndPort(const String & host_id);
String node_path;

View File

@ -1,5 +1,4 @@
#include <Interpreters/Set.h>
#include <Common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/ExpressionActions.h>
@ -60,15 +59,7 @@ ExpressionActions::ExpressionActions(ActionsDAG actions_dag_, const ExpressionAc
#if USE_EMBEDDED_COMPILER
if (settings.can_compile_expressions && settings.compile_expressions == CompileExpressions::yes)
{
LOG_TEST(
getLogger("ExpressionActions"),
"Actions before compilation: {} with {} lazy_executed_nodes",
actions_dag.dumpDAG(),
lazy_executed_nodes.size());
actions_dag.compileExpressions(settings.min_count_to_compile_expression, lazy_executed_nodes);
LOG_TEST(getLogger("ExpressionActions"), "Actions after compilation: {}", actions_dag.dumpDAG());
}
#endif
linearizeActions(lazy_executed_nodes);

View File

@ -1188,6 +1188,22 @@ namespace
source_ast->children.push_back(source_ast->elements);
dict.set(dict.source, source_ast);
}
ASTs * getEngineArgsFromCreateQuery(ASTCreateQuery & create_query)
{
ASTStorage * storage_def = create_query.storage;
if (!storage_def)
return nullptr;
if (!storage_def->engine)
return nullptr;
const ASTFunction & engine_def = *storage_def->engine;
if (!engine_def.arguments)
return nullptr;
return &engine_def.arguments->children;
}
}
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
@ -1884,7 +1900,11 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
mode);
/// If schema was inferred during storage creation, add columns description to the create query.
addColumnsDescriptionToCreateQueryIfNecessary(query_ptr->as<ASTCreateQuery &>(), res);
auto & create_query = query_ptr->as<ASTCreateQuery &>();
addColumnsDescriptionToCreateQueryIfNecessary(create_query, res);
/// Add any inferred engine args if needed. For example, data format for engines File/S3/URL/etc
if (auto * engine_args = getEngineArgsFromCreateQuery(create_query))
res->addInferredEngineArgsToCreateQuery(*engine_args, getContext());
}
validateVirtualColumns(*res);

View File

@ -98,6 +98,9 @@ namespace DB
{
namespace Setting
{
extern const SettingsUInt64 keeper_max_retries;
extern const SettingsUInt64 keeper_retry_initial_backoff_ms;
extern const SettingsUInt64 keeper_retry_max_backoff_ms;
extern const SettingsSeconds lock_acquire_timeout;
extern const SettingsSeconds receive_timeout;
extern const SettingsMaxThreads max_threads;
@ -878,7 +881,13 @@ void InterpreterSystemQuery::restoreReplica()
if (table_replicated_ptr == nullptr)
throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), table_id.getNameForLogs());
table_replicated_ptr->restoreMetadataInZooKeeper();
const auto & settings = getContext()->getSettingsRef();
table_replicated_ptr->restoreMetadataInZooKeeper(
ZooKeeperRetriesInfo{settings[Setting::keeper_max_retries],
settings[Setting::keeper_retry_initial_backoff_ms],
settings[Setting::keeper_retry_max_backoff_ms],
getContext()->getProcessListElementSafe()});
}
StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica, ContextMutablePtr system_context)

View File

@ -3,7 +3,6 @@
#if USE_EMBEDDED_COMPILER
#include <sys/mman.h>
#include <cmath>
#include <boost/noncopyable.hpp>
@ -371,9 +370,6 @@ CHJIT::CHJIT()
symbol_resolver->registerSymbol("memset", reinterpret_cast<void *>(&memset));
symbol_resolver->registerSymbol("memcpy", reinterpret_cast<void *>(&memcpy));
symbol_resolver->registerSymbol("memcmp", reinterpret_cast<void *>(&memcmp));
double (*fmod_ptr)(double, double) = &fmod;
symbol_resolver->registerSymbol("fmod", reinterpret_cast<void *>(fmod_ptr));
}
CHJIT::~CHJIT() = default;

View File

@ -46,18 +46,14 @@ ValueWithType CompileDAG::compile(llvm::IRBuilderBase & builder, const ValuesWit
{
ValuesWithType temporary_values;
temporary_values.reserve(node.arguments.size());
for (auto argument_index : node.arguments)
{
assert(compiled_values[argument_index].value != nullptr);
temporary_values.emplace_back(compiled_values[argument_index]);
}
ValueWithType compiled_value{node.function->compile(builder, temporary_values), node.function->getResultType()};
if (!node.result_type->equals(*node.function->getResultType()))
compiled_values[compiled_values_index] = {nativeCast(b, compiled_value, node.result_type), node.result_type};
else
compiled_values[compiled_values_index] = std::move(compiled_value);
compiled_values[compiled_values_index] = {node.function->compile(builder, temporary_values), node.result_type};
break;
}
case CompileType::INPUT:

View File

@ -189,7 +189,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context,
entry.setSettingsIfRequired(context);
entry.tracing_context = OpenTelemetry::CurrentContext();
entry.initial_query_id = context->getClientInfo().initial_query_id;
String node_path = ddl_worker.enqueueQuery(entry, params.retries_info, context->getProcessListElement());
String node_path = ddl_worker.enqueueQuery(entry, params.retries_info);
return getDDLOnClusterStatus(node_path, ddl_worker.getReplicasDir(), entry, context);
}

View File

@ -32,34 +32,6 @@ static ITransformingStep::Traits getTraits(const ActionsDAG & actions)
};
}
static bool containsCompiledFunction(const ActionsDAG::Node * node)
{
if (node->type == ActionsDAG::ActionType::FUNCTION && node->is_function_compiled)
return true;
const auto & children = node->children;
if (children.empty())
return false;
bool result = false;
for (const auto & child : children)
result |= containsCompiledFunction(child);
return result;
}
static NameSet getColumnsContainCompiledFunction(const ActionsDAG & actions_dag)
{
NameSet result;
for (const auto * node : actions_dag.getOutputs())
{
if (containsCompiledFunction(node))
{
result.insert(node->result_name);
}
}
return result;
}
ExpressionStep::ExpressionStep(const Header & input_header_, ActionsDAG actions_dag_)
: ITransformingStep(
input_header_,
@ -80,15 +52,10 @@ void ExpressionStep::transformPipeline(QueryPipelineBuilder & pipeline, const Bu
if (!blocksHaveEqualStructure(pipeline.getHeader(), *output_header))
{
auto columns_contain_compiled_function = getColumnsContainCompiledFunction(expression->getActionsDAG());
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
pipeline.getHeader().getColumnsWithTypeAndName(),
output_header->getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name,
false,
false,
nullptr,
&columns_contain_compiled_function);
pipeline.getHeader().getColumnsWithTypeAndName(),
output_header->getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto convert_actions = std::make_shared<ExpressionActions>(std::move(convert_actions_dag), settings.getActionsSettings());
pipeline.addSimpleTransform([&](const Block & header)

View File

@ -287,6 +287,10 @@ public:
/// Returns hints for serialization of columns according to statistics accumulated by storage.
virtual SerializationInfoByName getSerializationHints() const { return {}; }
/// Add engine args that were inferred during storage creation to the create query to avoid the same
/// inference on server restart. For example, data format inference in File/URL/S3/etc. engines.
virtual void addInferredEngineArgsToCreateQuery(ASTs & /*args*/, const ContextPtr & /*context*/) const {}
private:
StorageID storage_id;

View File

@ -81,8 +81,10 @@ protected:
/// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size.
ValueSizeMap avg_value_size_hints;
/// Stores states for IDataType::deserializeBinaryBulk
/// Stores states for IDataType::deserializeBinaryBulk for regular columns.
DeserializeBinaryBulkStateMap deserialize_binary_bulk_state_map;
/// The same as above, but for subcolumns.
DeserializeBinaryBulkStateMap deserialize_binary_bulk_state_map_for_subcolumns;
/// Actual column names and types of columns in part,
/// which may differ from table metadata.

View File

@ -148,7 +148,9 @@ void MergeTreeReaderCompact::readData(
ColumnPtr & column,
size_t rows_to_read,
const InputStreamGetter & getter,
ISerialization::SubstreamsCache & cache)
ISerialization::SubstreamsCache & cache,
std::unordered_map<String, ColumnPtr> & columns_cache_for_subcolumns,
const ColumnNameLevel & name_level_for_offsets)
{
try
{
@ -171,17 +173,33 @@ void MergeTreeReaderCompact::readData(
const auto & type_in_storage = name_and_type.getTypeInStorage();
const auto & name_in_storage = name_and_type.getNameInStorage();
auto serialization = getSerializationInPart({name_in_storage, type_in_storage});
ColumnPtr temp_column = type_in_storage->createColumn(*serialization);
serialization->deserializeBinaryBulkWithMultipleStreams(temp_column, rows_to_read, deserialize_settings, deserialize_binary_bulk_state_map[name], nullptr);
auto subcolumn = type_in_storage->getSubcolumn(name_and_type.getSubcolumnName(), temp_column);
/// TODO: Avoid extra copying.
if (column->empty())
column = subcolumn;
auto cache_for_subcolumns_it = columns_cache_for_subcolumns.find(name_in_storage);
if (!name_level_for_offsets.has_value() && cache_for_subcolumns_it != columns_cache_for_subcolumns.end())
{
auto subcolumn = type_in_storage->getSubcolumn(name_and_type.getSubcolumnName(), cache_for_subcolumns_it->second);
/// TODO: Avoid extra copying.
if (column->empty())
column = IColumn::mutate(subcolumn);
else
column->assumeMutable()->insertRangeFrom(*subcolumn, 0, subcolumn->size());
}
else
column->assumeMutable()->insertRangeFrom(*subcolumn, 0, subcolumn->size());
{
auto serialization = getSerializationInPart({name_in_storage, type_in_storage});
ColumnPtr temp_column = type_in_storage->createColumn(*serialization);
serialization->deserializeBinaryBulkWithMultipleStreams(temp_column, rows_to_read, deserialize_settings, deserialize_binary_bulk_state_map_for_subcolumns[name_in_storage], nullptr);
auto subcolumn = type_in_storage->getSubcolumn(name_and_type.getSubcolumnName(), temp_column);
/// TODO: Avoid extra copying.
if (column->empty())
column = subcolumn;
else
column->assumeMutable()->insertRangeFrom(*subcolumn, 0, subcolumn->size());
if (!name_level_for_offsets.has_value())
columns_cache_for_subcolumns[name_in_storage] = temp_column;
}
}
else
{
@ -227,15 +245,23 @@ void MergeTreeReaderCompact::readPrefix(
serialization_for_prefix->deserializeBinaryBulkStatePrefix(deserialize_settings, state_for_prefix, nullptr);
}
SerializationPtr serialization;
if (name_and_type.isSubcolumn())
serialization = getSerializationInPart({name_and_type.getNameInStorage(), name_and_type.getTypeInStorage()});
else
serialization = getSerializationInPart(name_and_type);
deserialize_settings.getter = buffer_getter;
deserialize_settings.object_and_dynamic_read_statistics = true;
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name_and_type.name], nullptr);
if (name_and_type.isSubcolumn())
{
/// For subcolumns of the same column we need to deserialize prefix only once.
if (deserialize_binary_bulk_state_map_for_subcolumns.contains(name_and_type.getNameInStorage()))
return;
auto serialization = getSerializationInPart({name_and_type.getNameInStorage(), name_and_type.getTypeInStorage()});
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map_for_subcolumns[name_and_type.getNameInStorage()], nullptr);
}
else
{
auto serialization = getSerializationInPart(name_and_type);
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name_and_type.getNameInStorage()], nullptr);
}
}
catch (Exception & e)
{

View File

@ -45,7 +45,9 @@ protected:
ColumnPtr & column,
size_t rows_to_read,
const InputStreamGetter & getter,
ISerialization::SubstreamsCache & cache);
ISerialization::SubstreamsCache & cache,
std::unordered_map<String, ColumnPtr> & columns_cache_for_subcolumns,
const ColumnNameLevel & name_level_for_offsets);
void readPrefix(
const NameAndTypePair & name_and_type,

View File

@ -25,10 +25,18 @@ try
while (read_rows < max_rows_to_read)
{
size_t rows_to_read = data_part_info_for_read->getIndexGranularity().getMarkRows(from_mark);
deserialize_binary_bulk_state_map.clear();
deserialize_binary_bulk_state_map_for_subcolumns.clear();
/// Use cache to avoid reading the column with the same name twice.
/// It may happen if there are empty Nested arrays in the part.
ISerialization::SubstreamsCache cache;
/// If we need to read multiple subcolumns from a single column in storage,
/// we will read this column only once and then reuse it to extract all subcolumns.
/// We cannot use SubstreamsCache for it, because we may also read the full column itself
/// and it might not be empty inside res_columns (and SubstreamsCache contains whole columns).
/// TODO: refactor the code so that we first read all full columns and then extract all subcolumns from them.
std::unordered_map<String, ColumnPtr> columns_cache_for_subcolumns;
for (size_t pos = 0; pos < num_columns; ++pos)
{
@ -56,7 +64,7 @@ try
};
readPrefix(columns_to_read[pos], buffer_getter, buffer_getter_for_prefix, columns_for_offsets[pos]);
readData(columns_to_read[pos], column, rows_to_read, buffer_getter, cache);
readData(columns_to_read[pos], column, rows_to_read, buffer_getter, cache, columns_cache_for_subcolumns, columns_for_offsets[pos]);
}
++from_mark;
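The comment block above describes the purpose of `columns_cache_for_subcolumns`: when several requested columns are subcolumns of the same column in storage, that storage column is deserialized once, cached under its storage name, and every further subcolumn is extracted from the cached full column. A simplified sketch of the lookup pattern; the types and helpers here (`Column`, `readFullColumn`, `extractSubcolumn`) are placeholders, not the real ClickHouse interfaces:
```cpp
#include <memory>
#include <string>
#include <unordered_map>

struct Column {};                         /// placeholder for a fully deserialized column
using ColumnPtr = std::shared_ptr<const Column>;

/// Placeholders for the real deserialization / subcolumn extraction machinery.
ColumnPtr readFullColumn(const std::string & name_in_storage);
ColumnPtr extractSubcolumn(const ColumnPtr & full_column, const std::string & subcolumn_name);

/// Read a subcolumn, deserializing its storage column at most once per granule.
ColumnPtr readSubcolumnCached(
    std::unordered_map<std::string, ColumnPtr> & columns_cache_for_subcolumns,
    const std::string & name_in_storage,
    const std::string & subcolumn_name)
{
    auto it = columns_cache_for_subcolumns.find(name_in_storage);
    if (it == columns_cache_for_subcolumns.end())
    {
        /// First subcolumn of this storage column: deserialize the full column and cache it.
        it = columns_cache_for_subcolumns.emplace(name_in_storage, readFullColumn(name_in_storage)).first;
    }
    /// Every further subcolumn (e.g. t.a and t.b of a Tuple column t) reuses the cached full column.
    return extractSubcolumn(it->second, subcolumn_name);
}
```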

View File

@ -166,23 +166,24 @@ void ReplicatedMergeTreeAttachThread::runImpl()
/// Just in case it was not removed earlier due to connection loss
zookeeper->tryRemove(replica_path + "/flags/force_restore_data");
storage.checkTableStructure(replica_path, metadata_snapshot);
/// Here `zookeeper_retries_info = {}` because the attach thread has its own retries (see ReplicatedMergeTreeAttachThread::run()).
storage.checkTableStructure(replica_path, metadata_snapshot, /* metadata_version = */ nullptr, /* strict_check = */ true, /* zookeeper_retries_info = */ {});
storage.checkParts(skip_sanity_checks);
/// Temporary directories contain uninitialized results of Merges or Fetches (after forced restart),
/// don't allow to reinitialize them, delete each of them immediately.
storage.clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"});
storage.createNewZooKeeperNodes();
storage.syncPinnedPartUUIDs();
storage.createNewZooKeeperNodes(/* zookeeper_retries_info = */ {});
storage.syncPinnedPartUUIDs(/* zookeeper_retries_info = */ {});
std::lock_guard lock(storage.table_shared_id_mutex);
storage.createTableSharedID();
storage.createTableSharedID(/* zookeeper_retries_info = */ {});
};
void ReplicatedMergeTreeAttachThread::finalizeInitialization() TSA_NO_THREAD_SAFETY_ANALYSIS
{
storage.startupImpl(/* from_attach_thread */ true);
storage.startupImpl(/* from_attach_thread */ true, /* zookeeper_retries_info = */ {});
storage.initialization_done = true;
LOG_INFO(log, "Table is initialized");
}

View File

@ -201,8 +201,8 @@ size_t ReplicatedMergeTreeSinkImpl<async_insert>::checkQuorumPrecondition(const
log,
{settings[Setting::insert_keeper_max_retries],
settings[Setting::insert_keeper_retry_initial_backoff_ms],
settings[Setting::insert_keeper_retry_max_backoff_ms]},
context->getProcessListElement());
settings[Setting::insert_keeper_retry_max_backoff_ms],
context->getProcessListElement()});
quorum_retries_ctl.retryLoop(
[&]()
{
@ -725,8 +725,8 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
log,
{settings[Setting::insert_keeper_max_retries],
settings[Setting::insert_keeper_retry_initial_backoff_ms],
settings[Setting::insert_keeper_retry_max_backoff_ms]},
context->getProcessListElement());
settings[Setting::insert_keeper_retry_max_backoff_ms],
context->getProcessListElement()});
auto resolve_duplicate_stage = [&] () -> CommitRetryContext::Stages
{

View File

@ -13,6 +13,7 @@
#include <Core/Settings.h>
#include <Common/Macros.h>
#include <Common/OptimizedRegularExpression.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Common/typeid_cast.h>
#include <Common/logger_useful.h>
@ -37,6 +38,9 @@ namespace Setting
extern const SettingsBool allow_suspicious_ttl_expressions;
extern const SettingsBool create_table_empty_primary_key_by_default;
extern const SettingsUInt64 database_replicated_allow_replicated_engine_arguments;
extern const SettingsUInt64 keeper_max_retries;
extern const SettingsUInt64 keeper_retry_initial_backoff_ms;
extern const SettingsUInt64 keeper_retry_max_backoff_ms;
}
namespace MergeTreeSetting
@ -831,6 +835,12 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (auto txn = args.getLocalContext()->getZooKeeperMetadataTransaction())
need_check_table_structure = txn->isInitialQuery();
ZooKeeperRetriesInfo create_query_zk_retries_info{
local_settings[Setting::keeper_max_retries],
local_settings[Setting::keeper_retry_initial_backoff_ms],
local_settings[Setting::keeper_retry_max_backoff_ms],
args.getLocalContext()->getProcessListElementSafe()};
return std::make_shared<StorageReplicatedMergeTree>(
zookeeper_info,
args.mode,
@ -841,8 +851,10 @@ static StoragePtr create(const StorageFactory::Arguments & args)
date_column_name,
merging_params,
std::move(storage_settings),
need_check_table_structure);
need_check_table_structure,
create_query_zk_retries_info);
}
return std::make_shared<StorageMergeTree>(
args.table_id,
args.relative_data_path,

View File

@ -283,7 +283,7 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
}
void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
ASTs & args, const String & structure_, const String & format_, ContextPtr context)
ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure)
{
if (auto collection = tryGetNamedCollectionWithOverrides(args, context))
{
@ -295,7 +295,7 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
auto format_equal_func = makeASTFunction("equals", std::move(format_equal_func_args));
args.push_back(format_equal_func);
}
if (collection->getOrDefault<String>("structure", "auto") == "auto")
if (with_structure && collection->getOrDefault<String>("structure", "auto") == "auto")
{
ASTs structure_equal_func_args = {std::make_shared<ASTIdentifier>("structure"), std::make_shared<ASTLiteral>(structure_)};
auto structure_equal_func = makeASTFunction("equals", std::move(structure_equal_func_args));
@ -319,9 +319,12 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
if (args.size() == 3)
{
args.push_back(format_literal);
/// Add compression = "auto" before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
if (with_structure)
{
/// Add compression = "auto" before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
}
/// (connection_string, container_name, blobpath, structure) or
/// (connection_string, container_name, blobpath, format)
@ -334,12 +337,15 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
{
if (fourth_arg == "auto")
args[3] = format_literal;
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
if (with_structure)
{
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
}
/// (..., structure) -> (..., format, compression, structure)
else
else if (with_structure)
{
auto structure_arg = args.back();
args[3] = format_literal;
@ -362,15 +368,19 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
{
if (fourth_arg == "auto")
args[3] = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// (..., account_name, account_key) -> (..., account_name, account_key, format, compression, structure)
else
{
args.push_back(format_literal);
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
if (with_structure)
{
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
}
}
/// (connection_string, container_name, blobpath, format, compression, structure) or
@ -386,7 +396,7 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
{
if (fourth_arg == "auto")
args[3] = format_literal;
if (checkAndGetLiteralArgument<String>(args[5], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[5], "structure") == "auto")
args[5] = structure_literal;
}
/// (..., account_name, account_key, format) -> (..., account_name, account_key, format, compression, structure)
@ -394,12 +404,15 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
{
if (sixth_arg == "auto")
args[5] = format_literal;
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
if (with_structure)
{
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
}
/// (..., account_name, account_key, structure) -> (..., account_name, account_key, format, compression, structure)
else
else if (with_structure)
{
auto structure_arg = args.back();
args[5] = format_literal;
@ -417,14 +430,15 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
/// (..., format, compression) -> (..., format, compression, structure)
if (checkAndGetLiteralArgument<String>(args[5], "format") == "auto")
args[5] = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// (storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure)
else if (args.size() == 8)
{
if (checkAndGetLiteralArgument<String>(args[5], "format") == "auto")
args[5] = format_literal;
if (checkAndGetLiteralArgument<String>(args[7], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[7], "structure") == "auto")
args[7] = structure_literal;
}
}

View File

@ -76,7 +76,8 @@ public:
ASTs & args,
const String & structure_,
const String & format_,
ContextPtr context) override;
ContextPtr context,
bool with_structure) override;
protected:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;

View File

@ -107,7 +107,8 @@ std::pair<size_t, size_t> parseDecimal(const String & type_name)
return {precision, scale};
}
bool operator==(const Poco::JSON::Object & first, const Poco::JSON::Object & second)
template <typename T>
bool equals(const T & first, const T & second)
{
std::stringstream first_string_stream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
std::stringstream second_string_stream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
@ -124,9 +125,23 @@ bool operator==(const Poco::JSON::Object & first, const Poco::JSON::Object & sec
return first_string_stream.str() == second_string_stream.str();
}
bool operator!=(const Poco::JSON::Object & first, const Poco::JSON::Object & second)
bool operator==(const Poco::JSON::Object & first, const Poco::JSON::Object & second)
{
return !(first == second);
return equals(first, second);
}
bool operator==(const Poco::JSON::Array & first, const Poco::JSON::Array & second)
{
return equals(first, second);
}
bool schemasAreIdentical(const Poco::JSON::Object & first, const Poco::JSON::Object & second)
{
static String fields_key = "fields";
if (!first.isArray(fields_key) || !second.isArray(fields_key))
return false;
return *(first.getArray(fields_key)) == *(second.getArray(fields_key));
}
}
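The hunk above narrows schema equality from whole-object comparison to comparing only the `fields` arrays, reusing the stringify-based `equals` helper. A standalone sketch of the same idea with Poco::JSON; the function name `fieldsAreIdentical` and the string-based inputs are illustrative, not the code in the diff:
```cpp
#include <sstream>
#include <string>
#include <Poco/Dynamic/Var.h>
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>

/// Compare two schema JSON documents by their "fields" arrays only,
/// serializing each array and comparing the resulting strings.
bool fieldsAreIdentical(const std::string & first_json, const std::string & second_json)
{
    Poco::JSON::Parser first_parser;
    Poco::JSON::Parser second_parser;
    auto first = first_parser.parse(first_json).extract<Poco::JSON::Object::Ptr>();
    auto second = second_parser.parse(second_json).extract<Poco::JSON::Object::Ptr>();

    if (!first->isArray("fields") || !second->isArray("fields"))
        return false;

    std::stringstream first_stream;
    std::stringstream second_stream;
    first->getArray("fields")->stringify(first_stream);
    second->getArray("fields")->stringify(second_stream);
    return first_stream.str() == second_stream.str();
}
```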
@ -481,7 +496,7 @@ void IcebergSchemaProcessor::addIcebergTableSchema(Poco::JSON::Object::Ptr schem
if (iceberg_table_schemas_by_ids.contains(schema_id))
{
chassert(clickhouse_table_schemas_by_ids.contains(schema_id));
chassert(*iceberg_table_schemas_by_ids.at(schema_id) == *schema_ptr);
chassert(schemasAreIdentical(*iceberg_table_schemas_by_ids.at(schema_id), *schema_ptr));
}
else
{

View File

@ -174,7 +174,8 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
ASTs & args,
const String & structure_,
const String & format_,
ContextPtr context)
ContextPtr context,
bool with_structure)
{
if (auto collection = tryGetNamedCollectionWithOverrides(args, context))
{
@ -186,7 +187,7 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
auto format_equal_func = makeASTFunction("equals", std::move(format_equal_func_args));
args.push_back(format_equal_func);
}
if (collection->getOrDefault<String>("structure", "auto") == "auto")
if (with_structure && collection->getOrDefault<String>("structure", "auto") == "auto")
{
ASTs structure_equal_func_args = {std::make_shared<ASTIdentifier>("structure"), std::make_shared<ASTLiteral>(structure_)};
auto structure_equal_func = makeASTFunction("equals", std::move(structure_equal_func_args));
@ -209,23 +210,26 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
if (count == 1)
{
/// Add format=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
args.push_back(format_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// hdfs(url, format)
else if (count == 2)
{
if (checkAndGetLiteralArgument<String>(args[1], "format") == "auto")
args.back() = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// hdfs(url, format, structure)
/// hdfs(url, format, structure, compression_method)
/// hdfs(url, format, compression_method)
else if (count >= 3)
{
if (checkAndGetLiteralArgument<String>(args[1], "format") == "auto")
args[1] = format_literal;
if (checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
args[2] = structure_literal;
}
}
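
For reference, a toy sketch (plain strings instead of ASTs, helper name hypothetical) of how the new `with_structure` flag gates the rewriting for the `hdfs(url[, format[, structure]])` shapes handled above: the inferred format is always filled in, while the structure literal is only added or replaced when the caller asks for it.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Toy stand-in for the AST rewriting above: args holds the literal arguments of
// hdfs(url[, format[, structure[, compression]]]); 'auto' placeholders are
// replaced, and the structure literal is touched only when with_structure is true.
void addFormatAndMaybeStructure(std::vector<std::string> & args,
                                const std::string & format,
                                const std::string & structure,
                                bool with_structure)
{
    if (args.size() == 1)              // hdfs(url)
    {
        args.push_back(format);
        if (with_structure)
            args.push_back(structure);
    }
    else if (args.size() == 2)         // hdfs(url, format)
    {
        if (args[1] == "auto")
            args[1] = format;
        if (with_structure)
            args.push_back(structure);
    }
    else if (args.size() >= 3)         // hdfs(url, format, structure[, compression])
    {
        if (args[1] == "auto")
            args[1] = format;
        if (with_structure && args[2] == "auto")
            args[2] = structure;
    }
}

int main()
{
    std::vector<std::string> engine_args = {"hdfs://host/path/*.csv"};
    addFormatAndMaybeStructure(engine_args, "CSV", "a Int32, b String", /*with_structure=*/false);
    assert(engine_args.size() == 2);   // only the format was appended
    return 0;
}
```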

View File

@ -62,7 +62,8 @@ public:
ASTs & args,
const String & structure_,
const String & format_,
ContextPtr context) override;
ContextPtr context,
bool with_structure) override;
private:
void fromNamedCollection(const NamedCollection &, ContextPtr context) override;

View File

@ -59,7 +59,7 @@ public:
ObjectStoragePtr createObjectStorage(ContextPtr, bool) override { return std::make_shared<LocalObjectStorage>("/"); }
void addStructureAndFormatToArgsIfNeeded(ASTs &, const String &, const String &, ContextPtr) override { }
void addStructureAndFormatToArgsIfNeeded(ASTs &, const String &, const String &, ContextPtr, bool) override { }
private:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;

View File

@ -395,7 +395,7 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
}
void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
ASTs & args, const String & structure_, const String & format_, ContextPtr context)
ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure)
{
if (auto collection = tryGetNamedCollectionWithOverrides(args, context))
{
@ -407,7 +407,7 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
auto format_equal_func = makeASTFunction("equals", std::move(format_equal_func_args));
args.push_back(format_equal_func);
}
if (collection->getOrDefault<String>("structure", "auto") == "auto")
if (with_structure && collection->getOrDefault<String>("structure", "auto") == "auto")
{
ASTs structure_equal_func_args = {std::make_shared<ASTIdentifier>("structure"), std::make_shared<ASTLiteral>(structure_)};
auto structure_equal_func = makeASTFunction("equals", std::move(structure_equal_func_args));
@ -429,8 +429,9 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
if (count == 1)
{
/// Add format=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
args.push_back(format_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// s3(s3_url, format) or
/// s3(s3_url, NOSIGN)
@ -444,11 +445,13 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
else if (checkAndGetLiteralArgument<String>(args[1], "format") == "auto")
args[1] = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// s3(source, format, structure) or
/// s3(source, access_key_id, secret_access_key) or
/// s3(source, NOSIGN, format)
/// s3(source, NOSIGN, format) or
/// s3(source, format, compression_method)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN, format name or neither.
else if (count == 3)
{
@ -457,26 +460,29 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
{
if (checkAndGetLiteralArgument<String>(args[2], "format") == "auto")
args[2] = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
else if (second_arg == "auto" || FormatFactory::instance().exists(second_arg))
{
if (second_arg == "auto")
args[1] = format_literal;
if (checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
args[2] = structure_literal;
}
else
{
/// Add format and structure arguments.
args.push_back(format_literal);
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
}
/// s3(source, format, structure, compression_method) or
/// s3(source, access_key_id, secret_access_key, format) or
/// s3(source, access_key_id, secret_access_key, session_token) or
/// s3(source, NOSIGN, format, structure)
/// s3(source, NOSIGN, format, structure) or
/// s3(source, NOSIGN, format, compression_method)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN, format name or neither.
else if (count == 4)
{
@ -485,14 +491,14 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
{
if (checkAndGetLiteralArgument<String>(args[2], "format") == "auto")
args[2] = format_literal;
if (checkAndGetLiteralArgument<String>(args[3], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[3], "structure") == "auto")
args[3] = structure_literal;
}
else if (second_arg == "auto" || FormatFactory::instance().exists(second_arg))
{
if (second_arg == "auto")
args[1] = format_literal;
if (checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
args[2] = structure_literal;
}
else
@ -502,18 +508,21 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
{
if (checkAndGetLiteralArgument<String>(args[3], "format") == "auto")
args[3] = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
else
{
args.push_back(format_literal);
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
}
}
/// s3(source, access_key_id, secret_access_key, format, structure) or
/// s3(source, access_key_id, secret_access_key, session_token, format) or
/// s3(source, NOSIGN, format, structure, compression_method)
/// s3(source, NOSIGN, format, structure, compression_method) or
/// s3(source, access_key_id, secret_access_key, format, compression)
/// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN keyword name or not.
else if (count == 5)
{
@ -522,7 +531,7 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
{
if (checkAndGetLiteralArgument<String>(args[2], "format") == "auto")
args[2] = format_literal;
if (checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
args[3] = structure_literal;
}
else
@ -532,19 +541,21 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
{
if (checkAndGetLiteralArgument<String>(args[3], "format") == "auto")
args[3] = format_literal;
if (checkAndGetLiteralArgument<String>(args[4], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[4], "structure") == "auto")
args[4] = structure_literal;
}
else
{
if (checkAndGetLiteralArgument<String>(args[4], "format") == "auto")
args[4] = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
}
}
/// s3(source, access_key_id, secret_access_key, format, structure, compression) or
/// s3(source, access_key_id, secret_access_key, session_token, format, structure)
/// s3(source, access_key_id, secret_access_key, session_token, format, structure) or
/// s3(source, access_key_id, secret_access_key, session_token, format, compression_method)
else if (count == 6)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/session_token");
@ -552,14 +563,14 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
{
if (checkAndGetLiteralArgument<String>(args[3], "format") == "auto")
args[3] = format_literal;
if (checkAndGetLiteralArgument<String>(args[4], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[4], "structure") == "auto")
args[4] = structure_literal;
}
else
{
if (checkAndGetLiteralArgument<String>(args[4], "format") == "auto")
args[4] = format_literal;
if (checkAndGetLiteralArgument<String>(args[5], "format") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[5], "format") == "auto")
args[5] = structure_literal;
}
}
@ -568,7 +579,7 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
{
if (checkAndGetLiteralArgument<String>(args[4], "format") == "auto")
args[4] = format_literal;
if (checkAndGetLiteralArgument<String>(args[5], "format") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[5], "format") == "auto")
args[5] = structure_literal;
}
}
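
The repeated `second_arg == "auto" || FormatFactory::instance().exists(second_arg)` checks implement the disambiguation the comments describe: the second `s3()` argument decides whether the call is the NOSIGN form, the format-first form, or the credentials form. A standalone restatement of that rule follows; the enum and function names are made up for illustration, and the real code consults `FormatFactory` rather than a fixed set of format names.

```cpp
#include <cassert>
#include <set>
#include <string>

// The three shapes the second s3() argument can take, per the comments above.
enum class SecondArgKind { NoSign, Format, AccessKeyId };

// Toy classifier: the real code checks the NOSIGN keyword and asks
// FormatFactory::instance().exists() instead of using a fixed format set.
SecondArgKind classifySecondArg(const std::string & second_arg,
                                const std::set<std::string> & known_formats)
{
    if (second_arg == "NOSIGN")
        return SecondArgKind::NoSign;
    if (second_arg == "auto" || known_formats.count(second_arg))
        return SecondArgKind::Format;
    return SecondArgKind::AccessKeyId;
}

int main()
{
    const std::set<std::string> formats = {"CSV", "Parquet", "JSONEachRow"};
    assert(classifySecondArg("NOSIGN", formats) == SecondArgKind::NoSign);
    assert(classifySecondArg("Parquet", formats) == SecondArgKind::Format);
    assert(classifySecondArg("some_access_key_id", formats) == SecondArgKind::AccessKeyId);
    return 0;
}
```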

View File

@ -91,7 +91,8 @@ public:
ASTs & args,
const String & structure,
const String & format,
ContextPtr context) override;
ContextPtr context,
bool with_structure) override;
private:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;

View File

@ -508,6 +508,11 @@ std::pair<ColumnsDescription, std::string> StorageObjectStorage::resolveSchemaAn
return std::pair(columns, format);
}
void StorageObjectStorage::addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const
{
configuration->addStructureAndFormatToArgsIfNeeded(args, "", configuration->format, context, /*with_structure=*/false);
}
SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context, const std::string & storage_type_name)
{
if (storage_type_name == "s3")

View File

@ -131,6 +131,8 @@ public:
std::string & sample_path,
const ContextPtr & context);
void addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const override;
bool hasExternalDynamicMetadata() const override;
void updateExternalDynamicMetadata(ContextPtr) override;
@ -193,7 +195,7 @@ public:
/// Add/replace structure and format arguments in the AST arguments if they have 'auto' values.
virtual void addStructureAndFormatToArgsIfNeeded(
ASTs & args, const String & structure_, const String & format_, ContextPtr context) = 0;
ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure) = 0;
bool withPartitionWildcard() const;
bool withGlobs() const { return isPathWithGlobs() || isNamespaceWithGlobs(); }

View File

@ -107,7 +107,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
ASTPtr cluster_name_arg = args.front();
args.erase(args.begin());
configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context);
configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true);
args.insert(args.begin(), cluster_name_arg);
}
