mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-18 04:12:19 +00:00
Merge branch 'master' into gcddelta-codec
This commit is contained in:
commit
877a55e98a
@ -3,7 +3,7 @@
|
||||
<default>
|
||||
<allow_introspection_functions>1</allow_introspection_functions>
|
||||
<log_queries>1</log_queries>
|
||||
<metrics_perf_events_enabled>1</metrics_perf_events_enabled>
|
||||
<metrics_perf_events_enabled>0</metrics_perf_events_enabled>
|
||||
<!--
|
||||
If a test takes too long by mistake, the entire test task can
|
||||
time out and the author won't get a proper message. Put some cap
|
||||
|
@ -369,6 +369,7 @@ for query_index in queries_to_run:
|
||||
"max_execution_time": args.prewarm_max_query_seconds,
|
||||
"query_profiler_real_time_period_ns": 10000000,
|
||||
"query_profiler_cpu_time_period_ns": 10000000,
|
||||
"metrics_perf_events_enabled": 1,
|
||||
"memory_profiler_step": "4Mi",
|
||||
},
|
||||
)
|
||||
@ -503,6 +504,7 @@ for query_index in queries_to_run:
|
||||
settings={
|
||||
"query_profiler_real_time_period_ns": 10000000,
|
||||
"query_profiler_cpu_time_period_ns": 10000000,
|
||||
"metrics_perf_events_enabled": 1,
|
||||
},
|
||||
)
|
||||
print(
|
||||
|
@ -84,6 +84,7 @@ The BACKUP and RESTORE statements take a list of DATABASE and TABLE names, a des
|
||||
- `password` for the file on disk
|
||||
- `base_backup`: the destination of the previous backup of this source. For example, `Disk('backups', '1.zip')`
|
||||
- `structure_only`: if enabled, allows to only backup or restore the CREATE statements without the data of tables
|
||||
- `storage_policy`: storage policy for the tables being restored. See [Using Multiple Block Devices for Data Storage](../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-multiple-volumes). This setting is only applicable to the `RESTORE` command. The specified storage policy applies only to tables with an engine from the `MergeTree` family.
|
||||
- `s3_storage_class`: the storage class used for S3 backup. For example, `STANDARD`
|
||||
|
||||
### Usage examples
|
||||
|
@ -298,7 +298,7 @@ Default value: `THROW`.
|
||||
- [JOIN clause](../../sql-reference/statements/select/join.md#select-join)
|
||||
- [Join table engine](../../engines/table-engines/special/join.md)
|
||||
|
||||
## max_partitions_per_insert_block {#max-partitions-per-insert-block}
|
||||
## max_partitions_per_insert_block {#settings-max_partitions_per_insert_block}
|
||||
|
||||
Limits the maximum number of partitions in a single inserted block.
|
||||
|
||||
@ -309,9 +309,18 @@ Default value: 100.
|
||||
|
||||
**Details**
|
||||
|
||||
When inserting data, ClickHouse calculates the number of partitions in the inserted block. If the number of partitions is more than `max_partitions_per_insert_block`, ClickHouse throws an exception with the following text:
|
||||
When inserting data, ClickHouse calculates the number of partitions in the inserted block. If the number of partitions is more than `max_partitions_per_insert_block`, ClickHouse either logs a warning or throws an exception based on `throw_on_max_partitions_per_insert_block`. Exceptions have the following text:
|
||||
|
||||
> “Too many partitions for single INSERT block (more than” + toString(max_parts) + “). The limit is controlled by ‘max_partitions_per_insert_block’ setting. A large number of partitions is a common misconception. It will lead to severe negative performance impact, including slow server startup, slow INSERT queries and slow SELECT queries. Recommended total number of partitions for a table is under 1000..10000. Please note, that partitioning is not intended to speed up SELECT queries (ORDER BY key is sufficient to make range queries fast). Partitions are intended for data manipulation (DROP PARTITION, etc).”
|
||||
> “Too many partitions for a single INSERT block (`partitions_count` partitions, limit is ” + toString(max_partitions) + “). The limit is controlled by the ‘max_partitions_per_insert_block’ setting. A large number of partitions is a common misconception. It will lead to severe negative performance impact, including slow server startup, slow INSERT queries and slow SELECT queries. Recommended total number of partitions for a table is under 1000..10000. Please note, that partitioning is not intended to speed up SELECT queries (ORDER BY key is sufficient to make range queries fast). Partitions are intended for data manipulation (DROP PARTITION, etc).”
|
||||
|
||||
## throw_on_max_partitions_per_insert_block {#settings-throw_on_max_partition_per_insert_block}
|
||||
|
||||
Allows you to control behaviour when `max_partitions_per_insert_block` is reached.
|
||||
|
||||
- `true` - When an insert block reaches `max_partitions_per_insert_block`, an exception is raised.
|
||||
- `false` - Logs a warning when `max_partitions_per_insert_block` is reached.
|
||||
|
||||
Default value: `true`
|
||||
|
||||
## max_temporary_data_on_disk_size_for_user {#settings_max_temporary_data_on_disk_size_for_user}
|
||||
|
||||
|
@ -311,9 +311,18 @@ FORMAT Null;
|
||||
|
||||
**Подробности**
|
||||
|
||||
При вставке данных, ClickHouse вычисляет количество партиций во вставленном блоке. Если число партиций больше, чем `max_partitions_per_insert_block`, ClickHouse генерирует исключение со следующим текстом:
|
||||
При вставке данных ClickHouse проверяет количество партиций во вставляемом блоке. Если количество разделов превышает число `max_partitions_per_insert_block`, ClickHouse либо логирует предупреждение, либо выбрасывает исключение в зависимости от значения `throw_on_max_partitions_per_insert_block`. Исключения имеют следующий текст:
|
||||
|
||||
> «Too many partitions for single INSERT block (more than» + toString(max_parts) + «). The limit is controlled by ‘max_partitions_per_insert_block’ setting. Large number of partitions is a common misconception. It will lead to severe negative performance impact, including slow server startup, slow INSERT queries and slow SELECT queries. Recommended total number of partitions for a table is under 1000..10000. Please note, that partitioning is not intended to speed up SELECT queries (ORDER BY key is sufficient to make range queries fast). Partitions are intended for data manipulation (DROP PARTITION, etc).»
|
||||
> “Too many partitions for a single INSERT block (`partitions_count` partitions, limit is ” + toString(max_partitions) + “). The limit is controlled by the ‘max_partitions_per_insert_block’ setting. A large number of partitions is a common misconception. It will lead to severe negative performance impact, including slow server startup, slow INSERT queries and slow SELECT queries. Recommended total number of partitions for a table is under 1000..10000. Please note, that partitioning is not intended to speed up SELECT queries (ORDER BY key is sufficient to make range queries fast). Partitions are intended for data manipulation (DROP PARTITION, etc).”
|
||||
|
||||
## throw_on_max_partitions_per_insert_block {#settings-throw_on_max_partition_per_insert_block}
|
||||
|
||||
Позволяет контролировать поведение при достижении `max_partitions_per_insert_block`
|
||||
|
||||
- `true` - Когда вставляемый блок достигает `max_partitions_per_insert_block`, возникает исключение.
|
||||
- `false` - Записывает предупреждение при достижении `max_partitions_per_insert_block`.
|
||||
|
||||
Значение по умолчанию: `true`
|
||||
|
||||
## max_sessions_for_user {#max-sessions-per-user}
|
||||
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
#include <Common/FieldVisitorConvertToNumber.h>
|
||||
#include <Backups/SettingsFieldOptionalUUID.h>
|
||||
#include <Backups/SettingsFieldOptionalString.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -164,6 +165,7 @@ namespace
|
||||
M(Bool, allow_s3_native_copy) \
|
||||
M(Bool, internal) \
|
||||
M(String, host_id) \
|
||||
M(OptionalString, storage_policy) \
|
||||
M(OptionalUUID, restore_uuid)
|
||||
|
||||
|
||||
|
@ -117,6 +117,9 @@ struct RestoreSettings
|
||||
/// The current host's ID in the format 'escaped_host_name:port'.
|
||||
String host_id;
|
||||
|
||||
/// Alternative storage policy that may be specified in the SETTINGS clause of RESTORE queries
|
||||
std::optional<String> storage_policy;
|
||||
|
||||
/// Internal, should not be specified by user.
|
||||
/// Cluster's hosts' IDs in the format 'escaped_host_name:port' for all shards and replicas in a cluster specified in BACKUP ON CLUSTER.
|
||||
std::vector<Strings> cluster_host_ids;
|
||||
|
@ -322,6 +322,7 @@ void RestorerFromBackup::findTableInBackup(const QualifiedTableName & table_name
|
||||
read_buffer.reset();
|
||||
ParserCreateQuery create_parser;
|
||||
ASTPtr create_table_query = parseQuery(create_parser, create_query_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
|
||||
applyCustomStoragePolicy(create_table_query);
|
||||
renameDatabaseAndTableNameInCreateQuery(create_table_query, renaming_map, context->getGlobalContext());
|
||||
|
||||
QualifiedTableName table_name = renaming_map.getNewTableName(table_name_in_backup);
|
||||
@ -625,6 +626,24 @@ void RestorerFromBackup::checkDatabase(const String & database_name)
|
||||
}
|
||||
}
|
||||
|
||||
void RestorerFromBackup::applyCustomStoragePolicy(ASTPtr query_ptr)
|
||||
{
|
||||
constexpr auto setting_name = "storage_policy";
|
||||
if (query_ptr && restore_settings.storage_policy.has_value())
|
||||
{
|
||||
ASTStorage * storage = query_ptr->as<ASTCreateQuery &>().storage;
|
||||
if (storage && storage->settings)
|
||||
{
|
||||
if (restore_settings.storage_policy.value().empty())
|
||||
/// it has been set to "" deliberately, so the source storage policy is erased
|
||||
storage->settings->changes.removeSetting(setting_name);
|
||||
else
|
||||
/// it has been set to a custom value, so it either overwrites the existing value or is added as a new one
|
||||
storage->settings->changes.setSetting(setting_name, restore_settings.storage_policy.value());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void RestorerFromBackup::removeUnresolvedDependencies()
|
||||
{
|
||||
auto need_exclude_dependency = [this](const StorageID & table_id)
|
||||
|
@ -95,6 +95,8 @@ private:
|
||||
void createDatabase(const String & database_name) const;
|
||||
void checkDatabase(const String & database_name);
|
||||
|
||||
void applyCustomStoragePolicy(ASTPtr query_ptr);
|
||||
|
||||
void removeUnresolvedDependencies();
|
||||
void createTables();
|
||||
void createTable(const QualifiedTableName & table_name);
|
||||
|
29
src/Backups/SettingsFieldOptionalString.cpp
Normal file
29
src/Backups/SettingsFieldOptionalString.cpp
Normal file
@ -0,0 +1,29 @@
|
||||
#include <Backups/SettingsFieldOptionalString.h>
|
||||
#include <Common/ErrorCodes.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_PARSE_BACKUP_SETTINGS;
|
||||
}
|
||||
|
||||
SettingFieldOptionalString::SettingFieldOptionalString(const Field & field)
|
||||
{
|
||||
if (field.getType() == Field::Types::Null)
|
||||
{
|
||||
value = std::nullopt;
|
||||
return;
|
||||
}
|
||||
|
||||
if (field.getType() == Field::Types::String)
|
||||
{
|
||||
value = field.get<const String &>();
|
||||
return;
|
||||
}
|
||||
|
||||
throw Exception(ErrorCodes::CANNOT_PARSE_BACKUP_SETTINGS, "Cannot get string from {}", field);
|
||||
}
|
||||
|
||||
}
|
20
src/Backups/SettingsFieldOptionalString.h
Normal file
20
src/Backups/SettingsFieldOptionalString.h
Normal file
@ -0,0 +1,20 @@
|
||||
#pragma once
|
||||
|
||||
#include <optional>
|
||||
#include <Core/SettingsFields.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct SettingFieldOptionalString
|
||||
{
|
||||
std::optional<String> value;
|
||||
|
||||
explicit SettingFieldOptionalString(const std::optional<String> & value_) : value(value_) {}
|
||||
|
||||
explicit SettingFieldOptionalString(const Field & field);
|
||||
|
||||
explicit operator Field() const { return Field(value ? toString(*value) : ""); }
|
||||
};
|
||||
|
||||
}
|
@ -43,7 +43,7 @@ void setThreadName(const char * name)
|
||||
#else
|
||||
if (0 != prctl(PR_SET_NAME, name, 0, 0, 0))
|
||||
#endif
|
||||
if (errno != ENOSYS) /// It's ok if the syscall is unsupported in some environments.
|
||||
if (errno != ENOSYS && errno != EPERM) /// It's ok if the syscall is unsupported or not allowed in some environments.
|
||||
DB::throwFromErrno("Cannot set thread name with prctl(PR_SET_NAME, ...)", DB::ErrorCodes::PTHREAD_ERROR);
|
||||
|
||||
memcpy(thread_name, name, std::min<size_t>(1 + strlen(name), THREAD_NAME_SIZE - 1));
|
||||
@ -63,7 +63,7 @@ const char * getThreadName()
|
||||
// throw DB::Exception(DB::ErrorCodes::PTHREAD_ERROR, "Cannot get thread name with pthread_get_name_np()");
|
||||
#else
|
||||
if (0 != prctl(PR_GET_NAME, thread_name, 0, 0, 0))
|
||||
if (errno != ENOSYS) /// It's ok if the syscall is unsupported in some environments.
|
||||
if (errno != ENOSYS && errno != EPERM) /// It's ok if the syscall is unsupported or not allowed in some environments.
|
||||
DB::throwFromErrno("Cannot get thread name with prctl(PR_GET_NAME)", DB::ErrorCodes::PTHREAD_ERROR);
|
||||
#endif
|
||||
|
||||
|
@ -500,6 +500,7 @@ class IColumn;
|
||||
M(Bool, formatdatetime_parsedatetime_m_is_month_name, true, "Formatter '%M' in functions 'formatDateTime()' and 'parseDateTime()' produces the month name instead of minutes.", 0) \
|
||||
\
|
||||
M(UInt64, max_partitions_per_insert_block, 100, "Limit maximum number of partitions in single INSERTed block. Zero means unlimited. Throw exception if the block contains too many partitions. This setting is a safety threshold, because using large number of partitions is a common misconception.", 0) \
|
||||
M(Bool, throw_on_max_partitions_per_insert_block, true, "Used with max_partitions_per_insert_block. If true (default), an exception will be thrown when max_partitions_per_insert_block is reached. If false, details of the insert query reaching this limit with the number of partitions will be logged. This can be useful if you're trying to understand the impact on users when changing max_partitions_per_insert_block.", 0) \
|
||||
M(Int64, max_partitions_to_read, -1, "Limit the max number of partitions that can be accessed in one query. <= 0 means unlimited.", 0) \
|
||||
M(Bool, check_query_single_value_result, true, "Return check query result as single 1/0 value", 0) \
|
||||
M(Bool, allow_drop_detached, false, "Allow ALTER TABLE ... DROP DETACHED PART[ITION] ... queries", 0) \
|
||||
@ -961,7 +962,7 @@ class IColumn;
|
||||
M(ParquetVersion, output_format_parquet_version, "2.latest", "Parquet format version for output format. Supported versions: 1.0, 2.4, 2.6 and 2.latest (default)", 0) \
|
||||
M(ParquetCompression, output_format_parquet_compression_method, "lz4", "Compression method for Parquet output format. Supported codecs: snappy, lz4, brotli, zstd, gzip, none (uncompressed)", 0) \
|
||||
M(Bool, output_format_parquet_compliant_nested_types, true, "In parquet file schema, use name 'element' instead of 'item' for list elements. This is a historical artifact of Arrow library implementation. Generally increases compatibility, except perhaps with some old versions of Arrow.", 0) \
|
||||
M(Bool, output_format_parquet_use_custom_encoder, true, "Use experimental faster Parquet encoder implementation.", 0) \
|
||||
M(Bool, output_format_parquet_use_custom_encoder, false, "Use a faster Parquet encoder implementation.", 0) \
|
||||
M(Bool, output_format_parquet_parallel_encoding, true, "Do Parquet encoding in multiple threads. Requires output_format_parquet_use_custom_encoder.", 0) \
|
||||
M(UInt64, output_format_parquet_data_page_size, 1024 * 1024, "Target page size in bytes, before compression.", 0) \
|
||||
M(UInt64, output_format_parquet_batch_size, 1024, "Check page size every this many rows. Consider decreasing if you have columns with average values size above a few KBs.", 0) \
|
||||
|
@ -1,5 +1,4 @@
|
||||
#include <Disks/TemporaryFileOnDisk.h>
|
||||
#include <Poco/TemporaryFile.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
@ -41,17 +40,9 @@ TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, const String & p
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::ExternalProcessingFilesTotal);
|
||||
|
||||
/// Do not use default temporaty root path `/tmp/tmpXXXXXX`.
|
||||
/// The `dummy_prefix` is used to know what to replace with the real prefix.
|
||||
String dummy_prefix = "a/";
|
||||
relative_path = Poco::TemporaryFile::tempName(dummy_prefix);
|
||||
dummy_prefix += "tmp";
|
||||
/// a/tmpXXXXX -> <prefix>XXXXX
|
||||
assert(relative_path.starts_with(dummy_prefix));
|
||||
relative_path.replace(0, dummy_prefix.length(), prefix);
|
||||
|
||||
if (relative_path.empty())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file name is empty");
|
||||
/// A disk can be remote and shared between multiple replicas.
|
||||
/// That's why we must not use Poco::TemporaryFile::tempName() here (Poco::TemporaryFile::tempName() can return the same names for different processes on different nodes).
|
||||
relative_path = prefix + toString(UUIDHelpers::generateV4());
|
||||
}
|
||||
|
||||
String TemporaryFileOnDisk::getAbsolutePath() const
|
||||
|
@ -382,12 +382,10 @@ void ThreadStatus::finalizePerformanceCounters()
|
||||
updatePerformanceCounters();
|
||||
|
||||
// We want to close perf file descriptors if the perf events were enabled for
|
||||
// one query. What this code does in practice is less clear -- e.g., if I run
|
||||
// 'select 1 settings metrics_perf_events_enabled = 1', I still get
|
||||
// query_context->getSettingsRef().metrics_perf_events_enabled == 0 *shrug*.
|
||||
// one query.
|
||||
bool close_perf_descriptors = true;
|
||||
if (auto query_context_ptr = query_context.lock())
|
||||
close_perf_descriptors = !query_context_ptr->getSettingsRef().metrics_perf_events_enabled;
|
||||
if (auto global_context_ptr = global_context.lock())
|
||||
close_perf_descriptors = !global_context_ptr->getSettingsRef().metrics_perf_events_enabled;
|
||||
|
||||
try
|
||||
{
|
||||
@ -410,7 +408,7 @@ void ThreadStatus::finalizePerformanceCounters()
|
||||
if (settings.log_queries && settings.log_query_threads)
|
||||
{
|
||||
const auto now = std::chrono::system_clock::now();
|
||||
Int64 query_duration_ms = std::chrono::duration_cast<std::chrono::microseconds>(now - query_start_time.point).count();
|
||||
Int64 query_duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - query_start_time.point).count();
|
||||
if (query_duration_ms >= settings.log_queries_min_query_duration_ms.totalMilliseconds())
|
||||
{
|
||||
if (auto thread_log = global_context_ptr->getQueryThreadLog())
|
||||
|
@ -306,7 +306,9 @@ void fillMissingColumns(
|
||||
return;
|
||||
|
||||
size_t level = ISerialization::getArrayLevel(subpath);
|
||||
assert(level < num_dimensions);
|
||||
/// It can happen if element of Array is Map.
|
||||
if (level >= num_dimensions)
|
||||
return;
|
||||
|
||||
auto stream_name = ISerialization::getFileNameForStream(*requested_column, subpath);
|
||||
auto it = offsets_columns.find(stream_name);
|
||||
|
@ -419,7 +419,7 @@ catch (...)
|
||||
throw;
|
||||
}
|
||||
|
||||
void DistributedAsyncInsertDirectoryQueue::processFile(const std::string & file_path)
|
||||
void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path)
|
||||
{
|
||||
OpenTelemetry::TracingContextHolderPtr thread_trace_context;
|
||||
|
||||
@ -459,7 +459,7 @@ void DistributedAsyncInsertDirectoryQueue::processFile(const std::string & file_
|
||||
if (isDistributedSendBroken(e.code(), e.isRemoteException()))
|
||||
{
|
||||
markAsBroken(file_path);
|
||||
current_file.clear();
|
||||
file_path.clear();
|
||||
}
|
||||
throw;
|
||||
}
|
||||
@ -473,8 +473,8 @@ void DistributedAsyncInsertDirectoryQueue::processFile(const std::string & file_
|
||||
|
||||
auto dir_sync_guard = getDirectorySyncGuard(relative_path);
|
||||
markAsSend(file_path);
|
||||
current_file.clear();
|
||||
LOG_TRACE(log, "Finished processing `{}` (took {} ms)", file_path, watch.elapsedMilliseconds());
|
||||
file_path.clear();
|
||||
}
|
||||
|
||||
struct DistributedAsyncInsertDirectoryQueue::BatchHeader
|
||||
|
@ -100,7 +100,7 @@ private:
|
||||
void addFile(const std::string & file_path);
|
||||
void initializeFilesFromDisk();
|
||||
void processFiles();
|
||||
void processFile(const std::string & file_path);
|
||||
void processFile(std::string & file_path);
|
||||
void processFilesWithBatching();
|
||||
|
||||
void markAsBroken(const std::string & file_path);
|
||||
|
@ -89,7 +89,7 @@ public:
|
||||
|
||||
virtual MergeTreeReaderPtr getReader(
|
||||
const NamesAndTypesList & columns_,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
const MarkRanges & mark_ranges,
|
||||
UncompressedCache * uncompressed_cache,
|
||||
MarkCache * mark_cache,
|
||||
|
@ -24,7 +24,7 @@ namespace ErrorCodes
|
||||
IMergeTreeReader::IMergeTreeReader(
|
||||
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
|
||||
const NamesAndTypesList & columns_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
UncompressedCache * uncompressed_cache_,
|
||||
MarkCache * mark_cache_,
|
||||
const MarkRanges & all_mark_ranges_,
|
||||
@ -35,7 +35,7 @@ IMergeTreeReader::IMergeTreeReader(
|
||||
, uncompressed_cache(uncompressed_cache_)
|
||||
, mark_cache(mark_cache_)
|
||||
, settings(settings_)
|
||||
, metadata_snapshot(metadata_snapshot_)
|
||||
, storage_snapshot(storage_snapshot_)
|
||||
, all_mark_ranges(all_mark_ranges_)
|
||||
, alter_conversions(data_part_info_for_read->getAlterConversions())
|
||||
/// For wide parts convert plain arrays of Nested to subcolumns
|
||||
@ -71,7 +71,7 @@ void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_e
|
||||
res_columns, num_rows,
|
||||
Nested::convertToSubcolumns(requested_columns),
|
||||
Nested::convertToSubcolumns(available_columns),
|
||||
partially_read_columns, metadata_snapshot);
|
||||
partially_read_columns, storage_snapshot->metadata);
|
||||
|
||||
should_evaluate_missing_defaults = std::any_of(
|
||||
res_columns.begin(), res_columns.end(), [](const auto & column) { return column == nullptr; });
|
||||
@ -110,7 +110,10 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
|
||||
}
|
||||
|
||||
auto dag = DB::evaluateMissingDefaults(
|
||||
additional_columns, requested_columns, metadata_snapshot->getColumns(), data_part_info_for_read->getContext());
|
||||
additional_columns, requested_columns,
|
||||
storage_snapshot->metadata->getColumns(),
|
||||
data_part_info_for_read->getContext());
|
||||
|
||||
if (dag)
|
||||
{
|
||||
dag->addMaterializingOutputActions();
|
||||
@ -216,7 +219,7 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
|
||||
}
|
||||
}
|
||||
|
||||
IMergeTreeReader::ColumnPositionLevel IMergeTreeReader::findColumnForOffsets(const NameAndTypePair & required_column) const
|
||||
IMergeTreeReader::ColumnNameLevel IMergeTreeReader::findColumnForOffsets(const NameAndTypePair & required_column) const
|
||||
{
|
||||
auto get_offsets_streams = [](const auto & serialization, const auto & name_in_storage)
|
||||
{
|
||||
@ -238,11 +241,11 @@ IMergeTreeReader::ColumnPositionLevel IMergeTreeReader::findColumnForOffsets(con
|
||||
auto required_offsets_streams = get_offsets_streams(getSerializationInPart(required_column), required_name_in_storage);
|
||||
|
||||
size_t max_matched_streams = 0;
|
||||
ColumnPositionLevel position_level;
|
||||
ColumnNameLevel name_level;
|
||||
|
||||
/// Find column that has maximal number of matching
|
||||
/// offsets columns with required_column.
|
||||
for (const auto & part_column : data_part_info_for_read->getColumns())
|
||||
for (const auto & part_column : Nested::convertToSubcolumns(data_part_info_for_read->getColumns()))
|
||||
{
|
||||
auto name_in_storage = Nested::extractTableName(part_column.name);
|
||||
if (name_in_storage != required_name_in_storage)
|
||||
@ -261,14 +264,14 @@ IMergeTreeReader::ColumnPositionLevel IMergeTreeReader::findColumnForOffsets(con
|
||||
it = current_it;
|
||||
}
|
||||
|
||||
if (i && (!position_level || i > max_matched_streams))
|
||||
if (i && (!name_level || i > max_matched_streams))
|
||||
{
|
||||
max_matched_streams = i;
|
||||
position_level.emplace(*data_part_info_for_read->getColumnPosition(part_column.name), it->second);
|
||||
name_level.emplace(part_column.name, it->second);
|
||||
}
|
||||
}
|
||||
|
||||
return position_level;
|
||||
return name_level;
|
||||
}
|
||||
|
||||
void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) const
|
||||
|
@ -24,7 +24,7 @@ public:
|
||||
IMergeTreeReader(
|
||||
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
|
||||
const NamesAndTypesList & columns_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
UncompressedCache * uncompressed_cache_,
|
||||
MarkCache * mark_cache_,
|
||||
const MarkRanges & all_mark_ranges_,
|
||||
@ -92,22 +92,23 @@ protected:
|
||||
|
||||
MergeTreeReaderSettings settings;
|
||||
|
||||
StorageMetadataPtr metadata_snapshot;
|
||||
StorageSnapshotPtr storage_snapshot;
|
||||
MarkRanges all_mark_ranges;
|
||||
|
||||
/// Position and level (of nesting).
|
||||
using ColumnPositionLevel = std::optional<std::pair<size_t, size_t>>;
|
||||
using ColumnNameLevel = std::optional<std::pair<String, size_t>>;
|
||||
|
||||
/// In case of part of the nested column does not exists, offsets should be
|
||||
/// read, but only the offsets for the current column, that is why it
|
||||
/// returns pair of size_t, not just one.
|
||||
ColumnPositionLevel findColumnForOffsets(const NameAndTypePair & column) const;
|
||||
ColumnNameLevel findColumnForOffsets(const NameAndTypePair & column) const;
|
||||
|
||||
NameSet partially_read_columns;
|
||||
|
||||
private:
|
||||
/// Alter conversions, which must be applied on fly if required
|
||||
AlterConversionsPtr alter_conversions;
|
||||
|
||||
private:
|
||||
/// Columns that are requested to read.
|
||||
NamesAndTypesList requested_columns;
|
||||
|
||||
|
@ -191,7 +191,6 @@ ChunkAndProgress IMergeTreeSelectAlgorithm::read()
|
||||
}
|
||||
|
||||
void IMergeTreeSelectAlgorithm::initializeMergeTreeReadersForCurrentTask(
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const IMergeTreeReader::ValueSizeMap & value_size_map,
|
||||
const ReadBufferFromFileBase::ProfileCallback & profile_callback)
|
||||
{
|
||||
@ -206,7 +205,7 @@ void IMergeTreeSelectAlgorithm::initializeMergeTreeReadersForCurrentTask(
|
||||
else
|
||||
{
|
||||
reader = task->data_part->getReader(
|
||||
task->task_columns.columns, metadata_snapshot, task->mark_ranges,
|
||||
task->task_columns.columns, storage_snapshot, task->mark_ranges,
|
||||
owned_uncompressed_cache.get(), owned_mark_cache.get(),
|
||||
task->alter_conversions, reader_settings, value_size_map, profile_callback);
|
||||
}
|
||||
@ -222,8 +221,8 @@ void IMergeTreeSelectAlgorithm::initializeMergeTreeReadersForCurrentTask(
|
||||
{
|
||||
initializeMergeTreePreReadersForPart(
|
||||
task->data_part, task->alter_conversions,
|
||||
task->task_columns, metadata_snapshot,
|
||||
task->mark_ranges, value_size_map, profile_callback);
|
||||
task->task_columns, task->mark_ranges,
|
||||
value_size_map, profile_callback);
|
||||
}
|
||||
}
|
||||
|
||||
@ -231,18 +230,17 @@ void IMergeTreeSelectAlgorithm::initializeMergeTreeReadersForPart(
|
||||
const MergeTreeData::DataPartPtr & data_part,
|
||||
const AlterConversionsPtr & alter_conversions,
|
||||
const MergeTreeReadTaskColumns & task_columns,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const MarkRanges & mark_ranges,
|
||||
const IMergeTreeReader::ValueSizeMap & value_size_map,
|
||||
const ReadBufferFromFileBase::ProfileCallback & profile_callback)
|
||||
{
|
||||
reader = data_part->getReader(
|
||||
task_columns.columns, metadata_snapshot, mark_ranges,
|
||||
task_columns.columns, storage_snapshot, mark_ranges,
|
||||
owned_uncompressed_cache.get(), owned_mark_cache.get(),
|
||||
alter_conversions, reader_settings, value_size_map, profile_callback);
|
||||
|
||||
initializeMergeTreePreReadersForPart(
|
||||
data_part, alter_conversions, task_columns, metadata_snapshot,
|
||||
data_part, alter_conversions, task_columns,
|
||||
mark_ranges, value_size_map, profile_callback);
|
||||
}
|
||||
|
||||
@ -250,7 +248,6 @@ void IMergeTreeSelectAlgorithm::initializeMergeTreePreReadersForPart(
|
||||
const MergeTreeData::DataPartPtr & data_part,
|
||||
const AlterConversionsPtr & alter_conversions,
|
||||
const MergeTreeReadTaskColumns & task_columns,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const MarkRanges & mark_ranges,
|
||||
const IMergeTreeReader::ValueSizeMap & value_size_map,
|
||||
const ReadBufferFromFileBase::ProfileCallback & profile_callback)
|
||||
@ -262,7 +259,7 @@ void IMergeTreeSelectAlgorithm::initializeMergeTreePreReadersForPart(
|
||||
{
|
||||
pre_reader_for_step.push_back(
|
||||
data_part->getReader(
|
||||
{LightweightDeleteDescription::FILTER_COLUMN}, metadata_snapshot,
|
||||
{LightweightDeleteDescription::FILTER_COLUMN}, storage_snapshot,
|
||||
mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(),
|
||||
alter_conversions, reader_settings, value_size_map, profile_callback));
|
||||
}
|
||||
@ -271,7 +268,7 @@ void IMergeTreeSelectAlgorithm::initializeMergeTreePreReadersForPart(
|
||||
{
|
||||
pre_reader_for_step.push_back(
|
||||
data_part->getReader(
|
||||
pre_columns_per_step, metadata_snapshot, mark_ranges,
|
||||
pre_columns_per_step, storage_snapshot, mark_ranges,
|
||||
owned_uncompressed_cache.get(), owned_mark_cache.get(),
|
||||
alter_conversions, reader_settings, value_size_map, profile_callback));
|
||||
}
|
||||
|
@ -120,7 +120,6 @@ protected:
|
||||
|
||||
/// Sets up data readers for each step of prewhere and where
|
||||
void initializeMergeTreeReadersForCurrentTask(
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const IMergeTreeReader::ValueSizeMap & value_size_map,
|
||||
const ReadBufferFromFileBase::ProfileCallback & profile_callback);
|
||||
|
||||
@ -128,7 +127,6 @@ protected:
|
||||
const MergeTreeData::DataPartPtr & data_part,
|
||||
const AlterConversionsPtr & alter_conversions,
|
||||
const MergeTreeReadTaskColumns & task_columns,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const MarkRanges & mark_ranges,
|
||||
const IMergeTreeReader::ValueSizeMap & value_size_map,
|
||||
const ReadBufferFromFileBase::ProfileCallback & profile_callback);
|
||||
@ -207,7 +205,6 @@ private:
|
||||
const MergeTreeData::DataPartPtr & data_part,
|
||||
const AlterConversionsPtr & alter_conversions,
|
||||
const MergeTreeReadTaskColumns & task_columns,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const MarkRanges & mark_ranges,
|
||||
const IMergeTreeReader::ValueSizeMap & value_size_map,
|
||||
const ReadBufferFromFileBase::ProfileCallback & profile_callback);
|
||||
|
@ -30,7 +30,7 @@ MergeTreeDataPartCompact::MergeTreeDataPartCompact(
|
||||
|
||||
IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
|
||||
const NamesAndTypesList & columns_to_read,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
const MarkRanges & mark_ranges,
|
||||
UncompressedCache * uncompressed_cache,
|
||||
MarkCache * mark_cache,
|
||||
@ -43,7 +43,7 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
|
||||
auto * load_marks_threadpool = reader_settings.read_settings.load_marks_asynchronously ? &read_info->getContext()->getLoadMarksThreadpool() : nullptr;
|
||||
|
||||
return std::make_unique<MergeTreeReaderCompact>(
|
||||
read_info, columns_to_read, metadata_snapshot, uncompressed_cache,
|
||||
read_info, columns_to_read, storage_snapshot, uncompressed_cache,
|
||||
mark_cache, mark_ranges, reader_settings, load_marks_threadpool,
|
||||
avg_value_size_hints, profile_callback);
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ public:
|
||||
|
||||
MergeTreeReaderPtr getReader(
|
||||
const NamesAndTypesList & columns,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
const MarkRanges & mark_ranges,
|
||||
UncompressedCache * uncompressed_cache,
|
||||
MarkCache * mark_cache,
|
||||
|
@ -32,7 +32,7 @@ MergeTreeDataPartInMemory::MergeTreeDataPartInMemory(
|
||||
|
||||
IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartInMemory::getReader(
|
||||
const NamesAndTypesList & columns_to_read,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
const MarkRanges & mark_ranges,
|
||||
UncompressedCache * /* uncompressed_cache */,
|
||||
MarkCache * /* mark_cache */,
|
||||
@ -45,7 +45,7 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartInMemory::getReader(
|
||||
auto ptr = std::static_pointer_cast<const MergeTreeDataPartInMemory>(shared_from_this());
|
||||
|
||||
return std::make_unique<MergeTreeReaderInMemory>(
|
||||
read_info, ptr, columns_to_read, metadata_snapshot, mark_ranges, reader_settings);
|
||||
read_info, ptr, columns_to_read, storage_snapshot, mark_ranges, reader_settings);
|
||||
}
|
||||
|
||||
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartInMemory::getWriter(
|
||||
|
@ -19,7 +19,7 @@ public:
|
||||
|
||||
MergeTreeReaderPtr getReader(
|
||||
const NamesAndTypesList & columns,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
const MarkRanges & mark_ranges,
|
||||
UncompressedCache * uncompressed_cache,
|
||||
MarkCache * mark_cache,
|
||||
|
@ -29,7 +29,7 @@ MergeTreeDataPartWide::MergeTreeDataPartWide(
|
||||
|
||||
IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartWide::getReader(
|
||||
const NamesAndTypesList & columns_to_read,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
const MarkRanges & mark_ranges,
|
||||
UncompressedCache * uncompressed_cache,
|
||||
MarkCache * mark_cache,
|
||||
@ -41,7 +41,7 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartWide::getReader(
|
||||
auto read_info = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this(), alter_conversions);
|
||||
return std::make_unique<MergeTreeReaderWide>(
|
||||
read_info, columns_to_read,
|
||||
metadata_snapshot, uncompressed_cache,
|
||||
storage_snapshot, uncompressed_cache,
|
||||
mark_cache, mark_ranges, reader_settings,
|
||||
avg_value_size_hints, profile_callback);
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ public:
|
||||
|
||||
MergeTreeReaderPtr getReader(
|
||||
const NamesAndTypesList & columns,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
const MarkRanges & mark_ranges,
|
||||
UncompressedCache * uncompressed_cache,
|
||||
MarkCache * mark_cache,
|
||||
|
@ -39,6 +39,7 @@ namespace ProfileEvents
|
||||
extern const Event MergeTreeDataProjectionWriterRows;
|
||||
extern const Event MergeTreeDataProjectionWriterUncompressedBytes;
|
||||
extern const Event MergeTreeDataProjectionWriterCompressedBytes;
|
||||
extern const Event RejectedInserts;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
@ -58,7 +59,8 @@ void buildScatterSelector(
|
||||
const ColumnRawPtrs & columns,
|
||||
PODArray<size_t> & partition_num_to_first_row,
|
||||
IColumn::Selector & selector,
|
||||
size_t max_parts)
|
||||
size_t max_parts,
|
||||
ContextPtr context)
|
||||
{
|
||||
/// Use generic hashed variant since partitioning is unlikely to be a bottleneck.
|
||||
using Data = HashMap<UInt128, size_t, UInt128TrivialHash>;
|
||||
@ -66,6 +68,8 @@ void buildScatterSelector(
|
||||
|
||||
size_t num_rows = columns[0]->size();
|
||||
size_t partitions_count = 0;
|
||||
size_t throw_on_limit = context->getSettingsRef().throw_on_max_partitions_per_insert_block;
|
||||
|
||||
for (size_t i = 0; i < num_rows; ++i)
|
||||
{
|
||||
Data::key_type key = hash128(i, columns.size(), columns);
|
||||
@ -75,7 +79,9 @@ void buildScatterSelector(
|
||||
|
||||
if (inserted)
|
||||
{
|
||||
if (max_parts && partitions_count >= max_parts)
|
||||
if (max_parts && partitions_count >= max_parts && throw_on_limit)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RejectedInserts);
|
||||
throw Exception(ErrorCodes::TOO_MANY_PARTS,
|
||||
"Too many partitions for single INSERT block (more than {}). "
|
||||
"The limit is controlled by 'max_partitions_per_insert_block' setting. "
|
||||
@ -85,6 +91,7 @@ void buildScatterSelector(
|
||||
"for a table is under 1000..10000. Please note, that partitioning is not intended "
|
||||
"to speed up SELECT queries (ORDER BY key is sufficient to make range queries fast). "
|
||||
"Partitions are intended for data manipulation (DROP PARTITION, etc).", max_parts);
|
||||
}
|
||||
|
||||
partition_num_to_first_row.push_back(i);
|
||||
it->getMapped() = partitions_count;
|
||||
@ -102,6 +109,18 @@ void buildScatterSelector(
|
||||
if (partitions_count > 1)
|
||||
selector[i] = it->getMapped();
|
||||
}
|
||||
// Checking partitions per insert block again here outside the loop above
|
||||
// so we can log the total number of partitions that would have parts created
|
||||
if (max_parts && partitions_count >= max_parts && !throw_on_limit)
|
||||
{
|
||||
const auto & client_info = context->getClientInfo();
|
||||
Poco::Logger * log = &Poco::Logger::get("MergeTreeDataWriter");
|
||||
|
||||
LOG_WARNING(log, "INSERT query from initial_user {} (query ID: {}) inserted a block "
|
||||
"that created parts in {} partitions. This is being logged "
|
||||
"rather than throwing an exception as throw_on_max_partitions_per_insert_block=false.",
|
||||
client_info.initial_user, client_info.initial_query_id, partitions_count);
|
||||
}
|
||||
}
|
||||
|
||||
/// Computes ttls and updates ttl infos
|
||||
@ -240,7 +259,7 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
|
||||
|
||||
PODArray<size_t> partition_num_to_first_row;
|
||||
IColumn::Selector selector;
|
||||
buildScatterSelector(partition_columns, partition_num_to_first_row, selector, max_parts);
|
||||
buildScatterSelector(partition_columns, partition_num_to_first_row, selector, max_parts, context);
|
||||
|
||||
auto async_insert_info_with_partition = scatterAsyncInsertInfoBySelector(async_insert_info, selector, partition_num_to_first_row.size());
|
||||
|
||||
|
@ -97,7 +97,7 @@ std::future<MergeTreeReaderPtr> MergeTreePrefetchedReadPool::createPrefetchedRea
|
||||
Priority priority) const
|
||||
{
|
||||
auto reader = data_part.getReader(
|
||||
columns, storage_snapshot->metadata, required_ranges,
|
||||
columns, storage_snapshot, required_ranges,
|
||||
uncompressed_cache, mark_cache, alter_conversions, reader_settings,
|
||||
IMergeTreeReader::ValueSizeMap{}, profile_callback);
|
||||
|
||||
|
@ -17,7 +17,7 @@ namespace ErrorCodes
|
||||
MergeTreeReaderCompact::MergeTreeReaderCompact(
|
||||
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
|
||||
NamesAndTypesList columns_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
UncompressedCache * uncompressed_cache_,
|
||||
MarkCache * mark_cache_,
|
||||
MarkRanges mark_ranges_,
|
||||
@ -29,7 +29,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
|
||||
: IMergeTreeReader(
|
||||
data_part_info_for_read_,
|
||||
columns_,
|
||||
metadata_snapshot_,
|
||||
storage_snapshot_,
|
||||
uncompressed_cache_,
|
||||
mark_cache_,
|
||||
mark_ranges_,
|
||||
@ -130,7 +130,7 @@ void MergeTreeReaderCompact::fillColumnPositions()
|
||||
size_t columns_num = columns_to_read.size();
|
||||
|
||||
column_positions.resize(columns_num);
|
||||
read_only_offsets.resize(columns_num);
|
||||
columns_for_offsets.resize(columns_num);
|
||||
|
||||
for (size_t i = 0; i < columns_num; ++i)
|
||||
{
|
||||
@ -149,20 +149,48 @@ void MergeTreeReaderCompact::fillColumnPositions()
|
||||
position.reset();
|
||||
}
|
||||
|
||||
/// If array of Nested column is missing in part,
|
||||
/// we have to read its offsets if they exist.
|
||||
if (!position && is_array)
|
||||
{
|
||||
/// If array of Nested column is missing in part,
|
||||
/// we have to read its offsets if they exist.
|
||||
auto position_level = findColumnForOffsets(column_to_read);
|
||||
if (position_level.has_value())
|
||||
NameAndTypePair column_to_read_with_subcolumns = column_to_read;
|
||||
auto [name_in_storage, subcolumn_name] = Nested::splitName(column_to_read.name);
|
||||
|
||||
/// If it is a part of Nested, we need to get the column from
|
||||
/// storage metatadata which is converted to Nested type with subcolumns.
|
||||
/// It is required for proper counting of shared streams.
|
||||
if (!subcolumn_name.empty())
|
||||
{
|
||||
column_positions[i].emplace(position_level->first);
|
||||
read_only_offsets[i].emplace(position_level->second);
|
||||
/// If column is renamed get the new name from storage metadata.
|
||||
if (alter_conversions->columnHasNewName(name_in_storage))
|
||||
name_in_storage = alter_conversions->getColumnNewName(name_in_storage);
|
||||
|
||||
if (!storage_columns_with_collected_nested)
|
||||
{
|
||||
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
|
||||
auto storage_columns_list = Nested::collect(storage_snapshot->getColumns(options));
|
||||
storage_columns_with_collected_nested = ColumnsDescription(std::move(storage_columns_list));
|
||||
}
|
||||
|
||||
column_to_read_with_subcolumns = storage_columns_with_collected_nested
|
||||
->getColumnOrSubcolumn(
|
||||
GetColumnsOptions::All,
|
||||
Nested::concatenateName(name_in_storage, subcolumn_name));
|
||||
}
|
||||
|
||||
auto name_level_for_offsets = findColumnForOffsets(column_to_read_with_subcolumns);
|
||||
|
||||
if (name_level_for_offsets.has_value())
|
||||
{
|
||||
column_positions[i] = data_part_info_for_read->getColumnPosition(name_level_for_offsets->first);
|
||||
columns_for_offsets[i] = name_level_for_offsets;
|
||||
partially_read_columns.insert(column_to_read.name);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
column_positions[i] = std::move(position);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -203,7 +231,7 @@ size_t MergeTreeReaderCompact::readRows(
|
||||
auto & column = res_columns[pos];
|
||||
size_t column_size_before_reading = column->size();
|
||||
|
||||
readData(columns_to_read[pos], column, from_mark, current_task_last_mark, *column_positions[pos], rows_to_read, read_only_offsets[pos]);
|
||||
readData(columns_to_read[pos], column, from_mark, current_task_last_mark, *column_positions[pos], rows_to_read, columns_for_offsets[pos]);
|
||||
|
||||
size_t read_rows_in_column = column->size() - column_size_before_reading;
|
||||
if (read_rows_in_column != rows_to_read)
|
||||
@ -239,23 +267,37 @@ size_t MergeTreeReaderCompact::readRows(
|
||||
void MergeTreeReaderCompact::readData(
|
||||
const NameAndTypePair & name_and_type, ColumnPtr & column,
|
||||
size_t from_mark, size_t current_task_last_mark, size_t column_position, size_t rows_to_read,
|
||||
std::optional<size_t> only_offsets_level)
|
||||
ColumnNameLevel name_level_for_offsets)
|
||||
{
|
||||
const auto & [name, type] = name_and_type;
|
||||
std::optional<NameAndTypePair> column_for_offsets;
|
||||
|
||||
if (name_level_for_offsets.has_value())
|
||||
{
|
||||
const auto & part_columns = data_part_info_for_read->getColumnsDescription();
|
||||
column_for_offsets = part_columns.getPhysical(name_level_for_offsets->first);
|
||||
}
|
||||
|
||||
adjustUpperBound(current_task_last_mark); /// Must go before seek.
|
||||
|
||||
if (!isContinuousReading(from_mark, column_position))
|
||||
seekToMark(from_mark, column_position);
|
||||
|
||||
/// If we read only offsets we have to read prefix anyway
|
||||
/// to preserve correctness of serialization.
|
||||
auto buffer_getter_for_prefix = [&](const auto &) -> ReadBuffer *
|
||||
{
|
||||
return data_buffer;
|
||||
};
|
||||
|
||||
auto buffer_getter = [&](const ISerialization::SubstreamPath & substream_path) -> ReadBuffer *
|
||||
{
|
||||
/// Offset stream from another column could be read, in case of current
|
||||
/// column does not exists (see findColumnForOffsets() in
|
||||
/// MergeTreeReaderCompact::fillColumnPositions())
|
||||
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
|
||||
if (only_offsets_level.has_value())
|
||||
if (name_level_for_offsets.has_value())
|
||||
{
|
||||
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
|
||||
if (!is_offsets)
|
||||
return nullptr;
|
||||
|
||||
@ -275,7 +317,7 @@ void MergeTreeReaderCompact::readData(
|
||||
///
|
||||
/// Here only_offsets_level is the level of the alternative stream,
|
||||
/// and substream_path.size() is the level of the current stream.
|
||||
if (only_offsets_level.value() < ISerialization::getArrayLevel(substream_path))
|
||||
if (name_level_for_offsets->second < ISerialization::getArrayLevel(substream_path))
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
@ -283,22 +325,32 @@ void MergeTreeReaderCompact::readData(
|
||||
};
|
||||
|
||||
ISerialization::DeserializeBinaryBulkStatePtr state;
|
||||
ISerialization::DeserializeBinaryBulkStatePtr state_for_prefix;
|
||||
|
||||
ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
|
||||
deserialize_settings.getter = buffer_getter;
|
||||
deserialize_settings.avg_value_size_hint = avg_value_size_hints[name];
|
||||
|
||||
if (name_and_type.isSubcolumn())
|
||||
{
|
||||
const auto & type_in_storage = name_and_type.getTypeInStorage();
|
||||
const auto & name_in_storage = name_and_type.getNameInStorage();
|
||||
NameAndTypePair name_type_in_storage{name_and_type.getNameInStorage(), name_and_type.getTypeInStorage()};
|
||||
|
||||
auto serialization = getSerializationInPart({name_in_storage, type_in_storage});
|
||||
ColumnPtr temp_column = type_in_storage->createColumn(*serialization);
|
||||
/// In case of reading onlys offset use the correct serialization for reading of the prefix
|
||||
auto serialization = getSerializationInPart(name_type_in_storage);
|
||||
ColumnPtr temp_column = name_type_in_storage.type->createColumn(*serialization);
|
||||
|
||||
if (column_for_offsets)
|
||||
{
|
||||
auto serialization_for_prefix = getSerializationInPart(*column_for_offsets);
|
||||
|
||||
deserialize_settings.getter = buffer_getter_for_prefix;
|
||||
serialization_for_prefix->deserializeBinaryBulkStatePrefix(deserialize_settings, state_for_prefix);
|
||||
}
|
||||
|
||||
deserialize_settings.getter = buffer_getter;
|
||||
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, state);
|
||||
serialization->deserializeBinaryBulkWithMultipleStreams(temp_column, rows_to_read, deserialize_settings, state, nullptr);
|
||||
|
||||
auto subcolumn = type_in_storage->getSubcolumn(name_and_type.getSubcolumnName(), temp_column);
|
||||
auto subcolumn = name_type_in_storage.type->getSubcolumn(name_and_type.getSubcolumnName(), temp_column);
|
||||
|
||||
/// TODO: Avoid extra copying.
|
||||
if (column->empty())
|
||||
@ -308,13 +360,24 @@ void MergeTreeReaderCompact::readData(
|
||||
}
|
||||
else
|
||||
{
|
||||
/// In case of reading only offsets use the correct serialization for reading the prefix
|
||||
auto serialization = getSerializationInPart(name_and_type);
|
||||
|
||||
if (column_for_offsets)
|
||||
{
|
||||
auto serialization_for_prefix = getSerializationInPart(*column_for_offsets);
|
||||
|
||||
deserialize_settings.getter = buffer_getter_for_prefix;
|
||||
serialization_for_prefix->deserializeBinaryBulkStatePrefix(deserialize_settings, state_for_prefix);
|
||||
}
|
||||
|
||||
deserialize_settings.getter = buffer_getter;
|
||||
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, state);
|
||||
serialization->deserializeBinaryBulkWithMultipleStreams(column, rows_to_read, deserialize_settings, state, nullptr);
|
||||
}
|
||||
|
||||
/// The buffer is left in inconsistent state after reading single offsets
|
||||
if (only_offsets_level.has_value())
|
||||
if (name_level_for_offsets.has_value())
|
||||
last_read_granule.reset();
|
||||
else
|
||||
last_read_granule.emplace(from_mark, column_position);
|
||||
|
@ -21,7 +21,7 @@ public:
|
||||
MergeTreeReaderCompact(
|
||||
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
|
||||
NamesAndTypesList columns_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
UncompressedCache * uncompressed_cache_,
|
||||
MarkCache * mark_cache_,
|
||||
MarkRanges mark_ranges_,
|
||||
@ -52,12 +52,19 @@ private:
|
||||
|
||||
MergeTreeMarksLoader marks_loader;
|
||||
|
||||
/// Storage columns with collected separate arrays of Nested to columns of Nested type.
|
||||
/// They maybe be needed for finding offsets of missed Nested columns in parts.
|
||||
/// They are rarely used and are heavy to initialized, so we create them
|
||||
/// only on demand and cache in this field.
|
||||
std::optional<ColumnsDescription> storage_columns_with_collected_nested;
|
||||
|
||||
/// Positions of columns in part structure.
|
||||
using ColumnPositions = std::vector<std::optional<size_t>>;
|
||||
ColumnPositions column_positions;
|
||||
|
||||
/// Should we read full column or only it's offsets.
|
||||
/// Element of the vector is the level of the alternative stream.
|
||||
std::vector<std::optional<size_t>> read_only_offsets;
|
||||
std::vector<ColumnNameLevel> columns_for_offsets;
|
||||
|
||||
/// For asynchronous reading from remote fs. Same meaning as in MergeTreeReaderStream.
|
||||
std::optional<size_t> last_right_offset;
|
||||
@ -68,8 +75,8 @@ private:
|
||||
void seekToMark(size_t row_index, size_t column_index);
|
||||
|
||||
void readData(const NameAndTypePair & name_and_type, ColumnPtr & column, size_t from_mark,
|
||||
size_t current_task_last_mark, size_t column_position, size_t rows_to_read,
|
||||
std::optional<size_t> only_offsets_level);
|
||||
size_t current_task_last_mark, size_t column_position,
|
||||
size_t rows_to_read, ColumnNameLevel name_level_for_offsets);
|
||||
|
||||
/// Returns maximal value of granule size in compressed file from @mark_ranges.
|
||||
/// This value is used as size of read buffer.
|
||||
@ -84,7 +91,6 @@ private:
|
||||
|
||||
ReadBufferFromFileBase::ProfileCallback profile_callback;
|
||||
clockid_t clock_type;
|
||||
|
||||
bool initialized = false;
|
||||
};
|
||||
|
||||
|
@ -19,13 +19,13 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory(
|
||||
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
|
||||
DataPartInMemoryPtr data_part_,
|
||||
NamesAndTypesList columns_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
MarkRanges mark_ranges_,
|
||||
MergeTreeReaderSettings settings_)
|
||||
: IMergeTreeReader(
|
||||
data_part_info_for_read_,
|
||||
columns_,
|
||||
metadata_snapshot_,
|
||||
storage_snapshot_,
|
||||
nullptr,
|
||||
nullptr,
|
||||
mark_ranges_,
|
||||
@ -42,7 +42,7 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory(
|
||||
{
|
||||
if (auto offsets_position = findColumnForOffsets(column_to_read))
|
||||
{
|
||||
positions_for_offsets[column_to_read.name] = offsets_position->first;
|
||||
positions_for_offsets[column_to_read.name] = *data_part_info_for_read->getColumnPosition(offsets_position->first);
|
||||
partially_read_columns.insert(column_to_read.name);
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,7 @@ public:
|
||||
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
|
||||
DataPartInMemoryPtr data_part_,
|
||||
NamesAndTypesList columns_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
MarkRanges mark_ranges_,
|
||||
MergeTreeReaderSettings settings_);
|
||||
|
||||
|
@ -24,7 +24,7 @@ namespace
|
||||
MergeTreeReaderWide::MergeTreeReaderWide(
|
||||
MergeTreeDataPartInfoForReaderPtr data_part_info_,
|
||||
NamesAndTypesList columns_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
UncompressedCache * uncompressed_cache_,
|
||||
MarkCache * mark_cache_,
|
||||
MarkRanges mark_ranges_,
|
||||
@ -35,7 +35,7 @@ MergeTreeReaderWide::MergeTreeReaderWide(
|
||||
: IMergeTreeReader(
|
||||
data_part_info_,
|
||||
columns_,
|
||||
metadata_snapshot_,
|
||||
storage_snapshot_,
|
||||
uncompressed_cache_,
|
||||
mark_cache_,
|
||||
mark_ranges_,
|
||||
|
@ -17,7 +17,7 @@ public:
|
||||
MergeTreeReaderWide(
|
||||
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
|
||||
NamesAndTypesList columns_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
UncompressedCache * uncompressed_cache_,
|
||||
MarkCache * mark_cache_,
|
||||
MarkRanges mark_ranges_,
|
||||
|
@ -65,7 +65,7 @@ void MergeTreeSelectAlgorithm::initializeReaders()
|
||||
|
||||
initializeMergeTreeReadersForPart(
|
||||
data_part, alter_conversions, task_columns,
|
||||
storage_snapshot->getMetadataForQuery(), all_mark_ranges, {}, {});
|
||||
all_mark_ranges, /*value_size_map=*/ {}, /*profile_callback=*/ {});
|
||||
}
|
||||
|
||||
|
||||
|
@ -151,7 +151,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
|
||||
mark_ranges.emplace(MarkRanges{MarkRange(0, data_part->getMarksCount())});
|
||||
|
||||
reader = data_part->getReader(
|
||||
columns_for_reader, storage_snapshot->metadata,
|
||||
columns_for_reader, storage_snapshot,
|
||||
*mark_ranges, /* uncompressed_cache = */ nullptr,
|
||||
mark_cache.get(), alter_conversions, reader_settings, {}, {});
|
||||
}
|
||||
|
@ -45,8 +45,6 @@ void MergeTreeThreadSelectAlgorithm::finalizeNewTask()
|
||||
|
||||
/// Allows pool to reduce number of threads in case of too slow reads.
|
||||
auto profile_callback = [this](ReadBufferFromFileBase::ProfileInfo info_) { pool->profileFeedback(info_); };
|
||||
const auto & metadata_snapshot = storage_snapshot->metadata;
|
||||
|
||||
IMergeTreeReader::ValueSizeMap value_size_map;
|
||||
|
||||
if (reader && part_name != last_read_part_name)
|
||||
@ -57,7 +55,7 @@ void MergeTreeThreadSelectAlgorithm::finalizeNewTask()
|
||||
/// task->reader.valid() means there is a prefetched reader in this test, use it.
|
||||
const bool init_new_readers = !reader || task->reader.valid() || part_name != last_read_part_name;
|
||||
if (init_new_readers)
|
||||
initializeMergeTreeReadersForCurrentTask(metadata_snapshot, value_size_map, profile_callback);
|
||||
initializeMergeTreeReadersForCurrentTask(value_size_map, profile_callback);
|
||||
|
||||
last_read_part_name = part_name;
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ from commit_status_helper import (
|
||||
get_commit,
|
||||
get_commit_filtered_statuses,
|
||||
post_commit_status,
|
||||
update_mergeable_check,
|
||||
)
|
||||
from get_robot_token import get_best_robot_token
|
||||
from pr_info import PRInfo
|
||||
@ -18,6 +19,8 @@ def main():
|
||||
|
||||
pr_info = PRInfo(need_orgs=True)
|
||||
gh = Github(get_best_robot_token(), per_page=100)
|
||||
# Update the Mergeable Check at the final step
|
||||
update_mergeable_check(gh, pr_info, CI_STATUS_NAME)
|
||||
commit = get_commit(gh, pr_info.sha)
|
||||
|
||||
statuses = [
|
||||
@ -27,7 +30,8 @@ def main():
|
||||
]
|
||||
if not statuses:
|
||||
return
|
||||
status = statuses[0]
|
||||
# Take the latest status
|
||||
status = statuses[-1]
|
||||
if status.state == "pending":
|
||||
post_commit_status(
|
||||
commit,
|
||||
|
@ -0,0 +1,33 @@
|
||||
<clickhouse>
|
||||
<storage_configuration>
|
||||
<disks>
|
||||
<one>
|
||||
<type>local</type>
|
||||
<path>/var/lib/disks/one/</path>
|
||||
</one>
|
||||
<two>
|
||||
<type>local</type>
|
||||
<path>/var/lib/disks/two/</path>
|
||||
</two>
|
||||
</disks>
|
||||
<policies>
|
||||
<policy1>
|
||||
<volumes>
|
||||
<single>
|
||||
<disk>one</disk>
|
||||
</single>
|
||||
</volumes>
|
||||
</policy1>
|
||||
<policy2>
|
||||
<volumes>
|
||||
<single>
|
||||
<disk>two</disk>
|
||||
</single>
|
||||
</volumes>
|
||||
</policy2>
|
||||
</policies>
|
||||
</storage_configuration>
|
||||
<backups>
|
||||
<allowed_path>/backups</allowed_path>
|
||||
</backups>
|
||||
</clickhouse>
|
76
tests/integration/test_backup_restore_storage_policy/test.py
Normal file
76
tests/integration/test_backup_restore_storage_policy/test.py
Normal file
@ -0,0 +1,76 @@
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
|
||||
backup_id_counter = 0
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
instance = cluster.add_instance(
|
||||
"instance",
|
||||
main_configs=["configs/storage_config.xml"],
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def start_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def cleanup_after_test():
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
instance.query("DROP DATABASE IF EXISTS test")
|
||||
|
||||
|
||||
def new_backup_name():
|
||||
global backup_id_counter
|
||||
backup_id_counter += 1
|
||||
return f"File('/backups/{backup_id_counter}/')"
|
||||
|
||||
|
||||
def create_table_backup(backup_name, storage_policy=None):
|
||||
instance.query("CREATE DATABASE test")
|
||||
create_query = "CREATE TABLE test.table(x UInt32) ENGINE=MergeTree ORDER BY x"
|
||||
if storage_policy is not None:
|
||||
create_query += f" SETTINGS storage_policy = '{storage_policy}'"
|
||||
instance.query(create_query)
|
||||
instance.query(f"INSERT INTO test.table SELECT number FROM numbers(10)")
|
||||
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
|
||||
instance.query("DROP TABLE test.table SYNC")
|
||||
|
||||
|
||||
def restore_table(backup_name, storage_policy=None):
|
||||
restore_query = f"RESTORE TABLE test.table FROM {backup_name}"
|
||||
if storage_policy is not None:
|
||||
restore_query += f" SETTINGS storage_policy = '{storage_policy}'"
|
||||
instance.query(restore_query)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"origin_policy, restore_policy, expected_policy",
|
||||
[
|
||||
(None, "", "default"),
|
||||
(None, None, "default"),
|
||||
(None, "policy1", "policy1"),
|
||||
("policy1", "policy1", "policy1"),
|
||||
("policy1", "policy2", "policy2"),
|
||||
("policy1", "", "default"),
|
||||
("policy1", None, "policy1"),
|
||||
],
|
||||
)
|
||||
def test_storage_policies(origin_policy, restore_policy, expected_policy):
|
||||
backup_name = new_backup_name()
|
||||
create_table_backup(backup_name, origin_policy)
|
||||
restore_table(backup_name, restore_policy)
|
||||
|
||||
assert (
|
||||
instance.query("SELECT storage_policy FROM system.tables WHERE name='table'")
|
||||
== f"{expected_policy}\n"
|
||||
)
|
@ -44,6 +44,18 @@ def fill_table():
|
||||
check_data(499500, 1000)
|
||||
|
||||
|
||||
# kazoo.delete may throw NotEmptyError on concurrent modifications of the path
|
||||
def zk_rmr_with_retries(zk, path):
|
||||
for i in range(1, 10):
|
||||
try:
|
||||
zk.delete(path, recursive=True)
|
||||
return
|
||||
except Exception as ex:
|
||||
print(ex)
|
||||
time.sleep(0.5)
|
||||
assert False
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def start_cluster():
|
||||
try:
|
||||
@ -84,7 +96,7 @@ def test_restore_replica_sequential(start_cluster):
|
||||
fill_table()
|
||||
|
||||
print("Deleting root ZK path metadata")
|
||||
zk.delete("/clickhouse/tables/test", recursive=True)
|
||||
zk_rmr_with_retries(zk, "/clickhouse/tables/test")
|
||||
assert zk.exists("/clickhouse/tables/test") is None
|
||||
|
||||
node_1.query("SYSTEM RESTART REPLICA test")
|
||||
@ -119,7 +131,7 @@ def test_restore_replica_parallel(start_cluster):
|
||||
fill_table()
|
||||
|
||||
print("Deleting root ZK path metadata")
|
||||
zk.delete("/clickhouse/tables/test", recursive=True)
|
||||
zk_rmr_with_retries(zk, "/clickhouse/tables/test")
|
||||
assert zk.exists("/clickhouse/tables/test") is None
|
||||
|
||||
node_1.query("SYSTEM RESTART REPLICA test")
|
||||
@ -147,12 +159,12 @@ def test_restore_replica_alive_replicas(start_cluster):
|
||||
fill_table()
|
||||
|
||||
print("Deleting replica2 path, trying to restore replica1")
|
||||
zk.delete("/clickhouse/tables/test/replicas/replica2", recursive=True)
|
||||
zk_rmr_with_retries(zk, "/clickhouse/tables/test/replicas/replica2")
|
||||
assert zk.exists("/clickhouse/tables/test/replicas/replica2") is None
|
||||
node_1.query_and_get_error("SYSTEM RESTORE REPLICA test")
|
||||
|
||||
print("Deleting replica1 path, trying to restore replica1")
|
||||
zk.delete("/clickhouse/tables/test/replicas/replica1", recursive=True)
|
||||
zk_rmr_with_retries(zk, "/clickhouse/tables/test/replicas/replica1")
|
||||
assert zk.exists("/clickhouse/tables/test/replicas/replica1") is None
|
||||
|
||||
node_1.query("SYSTEM RESTART REPLICA test")
|
||||
|
@ -3,6 +3,8 @@ create table data_01593 (key Int) engine=MergeTree() order by key partition by k
|
||||
|
||||
insert into data_01593 select * from numbers_mt(10);
|
||||
insert into data_01593 select * from numbers_mt(10) settings max_partitions_per_insert_block=1; -- { serverError TOO_MANY_PARTS }
|
||||
-- throw_on_max_partitions_per_insert_block=false means we'll just log that the limit was reached rather than throw
|
||||
insert into data_01593 select * from numbers_mt(10) settings max_partitions_per_insert_block=1, throw_on_max_partitions_per_insert_block=false;
|
||||
-- settings for INSERT is prefered
|
||||
insert into data_01593 settings max_partitions_per_insert_block=100 select * from numbers_mt(10) settings max_partitions_per_insert_block=1;
|
||||
|
||||
|
@ -3,8 +3,8 @@
|
||||
{"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const Block &, size_t)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
|
||||
1
|
||||
===2===
|
||||
{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(const std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
|
||||
{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(const std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
|
||||
{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
|
||||
{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
|
||||
3
|
||||
2
|
||||
===3===
|
||||
|
@ -0,0 +1,30 @@
|
||||
[] []
|
||||
['0'] ['']
|
||||
['0','1'] ['','']
|
||||
['0','1','2'] ['','','']
|
||||
['0','1','2','3'] ['','','','']
|
||||
['0','1','2','3','4'] ['','','','','']
|
||||
['0','1','2','3','4','5'] ['','','','','','']
|
||||
['0','1','2','3','4','5','6'] ['','','','','','','']
|
||||
['0','1','2','3','4','5','6','7'] ['','','','','','','','']
|
||||
['0','1','2','3','4','5','6','7','8'] ['','','','','','','','','']
|
||||
[] []
|
||||
[[]] [[]]
|
||||
[[],['0']] [[],[]]
|
||||
[[],['0'],['0','1']] [[],[],[]]
|
||||
[[],['0'],['0','1'],['0','1','2']] [[],[],[],[]]
|
||||
[[],['0'],['0','1'],['0','1','2'],[]] [[],[],[],[],[]]
|
||||
[[],['0'],['0','1'],['0','1','2'],[],['0']] [[],[],[],[],[],[]]
|
||||
[[],['0'],['0','1'],['0','1','2'],[],['0'],['0','1']] [[],[],[],[],[],[],[]]
|
||||
[[],['0'],['0','1'],['0','1','2'],[],['0'],['0','1'],['0','1','2']] [[],[],[],[],[],[],[],[]]
|
||||
[[],['0'],['0','1'],['0','1','2'],[],['0'],['0','1'],['0','1','2'],[]] [[],[],[],[],[],[],[],[],[]]
|
||||
[] []
|
||||
[{}] [{}]
|
||||
[{},{'k0':0}] [{},{}]
|
||||
[{},{'k0':0},{'k0':0,'k1':1}] [{},{},{}]
|
||||
[{},{'k0':0},{'k0':0,'k1':1},{'k0':0,'k1':1,'k2':2}] [{},{},{},{}]
|
||||
[{},{'k0':0},{'k0':0,'k1':1},{'k0':0,'k1':1,'k2':2},{}] [{},{},{},{},{}]
|
||||
[{},{'k0':0},{'k0':0,'k1':1},{'k0':0,'k1':1,'k2':2},{},{'k0':0}] [{},{},{},{},{},{}]
|
||||
[{},{'k0':0},{'k0':0,'k1':1},{'k0':0,'k1':1,'k2':2},{},{'k0':0},{'k0':0,'k1':1}] [{},{},{},{},{},{},{}]
|
||||
[{},{'k0':0},{'k0':0,'k1':1},{'k0':0,'k1':1,'k2':2},{},{'k0':0},{'k0':0,'k1':1},{'k0':0,'k1':1,'k2':2}] [{},{},{},{},{},{},{},{}]
|
||||
[{},{'k0':0},{'k0':0,'k1':1},{'k0':0,'k1':1,'k2':2},{},{'k0':0},{'k0':0,'k1':1},{'k0':0,'k1':1,'k2':2},{}] [{},{},{},{},{},{},{},{},{}]
|
@ -0,0 +1,49 @@
|
||||
DROP TABLE IF EXISTS cool_table;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS cool_table
|
||||
(
|
||||
id UInt64,
|
||||
n Nested(n UInt64, lc1 LowCardinality(String))
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY id;
|
||||
|
||||
INSERT INTO cool_table SELECT number, range(number), range(number) FROM numbers(10);
|
||||
|
||||
ALTER TABLE cool_table ADD COLUMN IF NOT EXISTS `n.lc2` Array(LowCardinality(String));
|
||||
|
||||
SELECT n.lc1, n.lc2 FROM cool_table ORDER BY id;
|
||||
|
||||
DROP TABLE IF EXISTS cool_table;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS cool_table
|
||||
(
|
||||
id UInt64,
|
||||
n Nested(n UInt64, lc1 Array(LowCardinality(String)))
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY id;
|
||||
|
||||
INSERT INTO cool_table SELECT number, range(number), arrayMap(x -> range(x % 4), range(number)) FROM numbers(10);
|
||||
|
||||
ALTER TABLE cool_table ADD COLUMN IF NOT EXISTS `n.lc2` Array(Array(LowCardinality(String)));
|
||||
|
||||
SELECT n.lc1, n.lc2 FROM cool_table ORDER BY id;
|
||||
|
||||
DROP TABLE IF EXISTS cool_table;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS cool_table
|
||||
(
|
||||
id UInt64,
|
||||
n Nested(n UInt64, lc1 Map(LowCardinality(String), UInt64))
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY id;
|
||||
|
||||
INSERT INTO cool_table SELECT number, range(number), arrayMap(x -> (arrayMap(y -> 'k' || toString(y), range(x % 4)), range(x % 4))::Map(LowCardinality(String), UInt64), range(number)) FROM numbers(10);
|
||||
|
||||
ALTER TABLE cool_table ADD COLUMN IF NOT EXISTS `n.lc2` Array(Map(LowCardinality(String), UInt64));
|
||||
|
||||
SELECT n.lc1, n.lc2 FROM cool_table ORDER BY id;
|
||||
|
||||
DROP TABLE IF EXISTS cool_table;
|
Loading…
Reference in New Issue
Block a user