Merge branch 'master' into keeper-more-reduce

Antonio Andelic 2024-02-08 10:54:26 +01:00
commit 33f12c9f35
45 changed files with 452 additions and 2373 deletions

View File

@@ -47,7 +47,7 @@ RUN apt-get update -y \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* /var/cache/debconf /tmp/*
RUN pip3 install numpy scipy pandas Jinja2 pyarrow
RUN pip3 install numpy==1.26.3 scipy==1.12.0 pandas==1.5.3 Jinja2==3.1.3 pyarrow==15.0.0
RUN mkdir -p /tmp/clickhouse-odbc-tmp \
&& wget -nv -O - ${odbc_driver_url} | tar --strip-components=1 -xz -C /tmp/clickhouse-odbc-tmp \

View File

@@ -112,7 +112,7 @@ Note that:
For the query to run successfully, the following conditions must be met:
- Both tables must have the same structure.
- Both tables must have the same order by key and the same primary key.
- Both tables must have the same partition key, the same order by key and the same primary key.
- Both tables must have the same indices and projections.
- Both tables must have the same storage policy.

View File

@@ -826,10 +826,12 @@ try
0, // We don't need any threads once all the parts are deleted
server_settings.max_parts_cleaning_thread_pool_size);
auto max_database_replicated_create_table_thread_pool_size = server_settings.max_database_replicated_create_table_thread_pool_size ?
server_settings.max_database_replicated_create_table_thread_pool_size : getNumberOfPhysicalCPUCores();
getDatabaseReplicatedCreateTablesThreadPool().initialize(
server_settings.max_database_replicated_create_table_thread_pool_size,
max_database_replicated_create_table_thread_pool_size,
0, // We don't need any threads once all the tables are created
server_settings.max_database_replicated_create_table_thread_pool_size);
max_database_replicated_create_table_thread_pool_size);
/// Initialize global local cache for remote filesystem.
if (config().has("local_cache_for_remote_fs"))
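The change above gives the setting's zero value a meaning: fall back to one thread per physical core. A minimal self-contained sketch of that fallback, where resolvePoolSize is an illustrative name and std::thread::hardware_concurrency() stands in for ClickHouse's getNumberOfPhysicalCPUCores():

    #include <cstdint>
    #include <thread>

    // A configured pool size of 0 means "use one thread per core".
    static uint64_t resolvePoolSize(uint64_t configured)
    {
        return configured != 0 ? configured : std::thread::hardware_concurrency();
    }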

View File

@@ -27,14 +27,12 @@ ConnectionEstablisher::ConnectionEstablisher(
const Settings & settings_,
LoggerPtr log_,
const QualifiedTableName * table_to_check_)
: pool(std::move(pool_)), timeouts(timeouts_), settings(settings_), log(log_), table_to_check(table_to_check_), is_finished(false)
: pool(std::move(pool_)), timeouts(timeouts_), settings(settings_), log(log_), table_to_check(table_to_check_)
{
}
void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std::string & fail_message)
{
is_finished = false;
SCOPE_EXIT(is_finished = true);
try
{
ProfileEvents::increment(ProfileEvents::DistributedConnectionTries);

View File

@@ -30,8 +30,6 @@ public:
/// Set async callback that will be called when reading from socket blocks.
void setAsyncCallback(AsyncCallback async_callback_) { async_callback = std::move(async_callback_); }
bool isFinished() const { return is_finished; }
private:
ConnectionPoolPtr pool;
const ConnectionTimeouts * timeouts;
@@ -39,7 +37,6 @@ private:
LoggerPtr log;
const QualifiedTableName * table_to_check;
bool is_finished;
AsyncCallback async_callback = {};
};

View File

@@ -114,7 +114,7 @@ namespace DB
M(Bool, validate_tcp_client_information, false, "Validate client_information in the query packet over the native TCP protocol.", 0) \
M(Bool, storage_metadata_write_full_object_key, false, "Write disk metadata files with VERSION_FULL_OBJECT_KEY format", 0) \
M(UInt64, max_materialized_views_count_for_table, 0, "A limit on the number of materialized views attached to a table.", 0) \
M(UInt64, max_database_replicated_create_table_thread_pool_size, 0, "The number of threads to create tables during replica recovery in DatabaseReplicated. A value less than two means tables will be created sequentially.", 0) \
M(UInt32, max_database_replicated_create_table_thread_pool_size, 1, "The number of threads to create tables during replica recovery in DatabaseReplicated. Zero means the number of threads equals the number of cores.", 0) \
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in StorageSystemServerSettings.cpp

View File

@@ -559,7 +559,7 @@ class IColumn;
M(UInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.", 0) \
\
M(DefaultTableEngine, default_temporary_table_engine, DefaultTableEngine::Memory, "Default table engine used when ENGINE is not set in CREATE TEMPORARY statement.",0) \
M(DefaultTableEngine, default_table_engine, DefaultTableEngine::MergeTree, "Default table engine used when ENGINE is not set in CREATE statement.",0) \
M(DefaultTableEngine, default_table_engine, DefaultTableEngine::None, "Default table engine used when ENGINE is not set in CREATE statement.",0) \
M(Bool, show_table_uuid_in_table_create_query_if_not_nil, false, "For tables in databases with Engine=Atomic show UUID of the table in its CREATE query.", 0) \
M(Bool, database_atomic_wait_for_drop_and_detach_synchronously, false, "When executing DROP or DETACH TABLE in Atomic database, wait for table data to be finally dropped or detached.", 0) \
M(Bool, enable_scalar_subquery_optimization, true, "If it is set to true, prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once.", 0) \
@@ -855,7 +855,7 @@ class IColumn;
M(UInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \
M(UInt64, extract_kvp_max_pairs_per_row, 1000, "Max number of pairs that can be produced by the extractKeyValuePairs function. Used to safeguard against consuming too much memory.", 0) \
M(UInt64, extract_key_value_pairs_max_pairs_per_row, 1000, "Max number of pairs that can be produced by the `extractKeyValuePairs` function. Used as a safeguard against consuming too much memory.", 0) ALIAS(extract_kvp_max_pairs_per_row) \
M(Timezone, session_timezone, "", "This setting can be removed in the future due to potential caveats. It is experimental and is not suitable for production usage. The default timezone for current session or query. The server default timezone if empty.", 0) \
M(Bool, allow_create_index_without_type, false, "Allow CREATE INDEX query without TYPE. Query will be ignored. Made for SQL compatibility tests.", 0) \
M(Bool, create_index_ignore_unique, false, "Ignore UNIQUE keyword in CREATE UNIQUE INDEX. Made for SQL compatibility tests.", 0) \

View File

@@ -109,7 +109,6 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"function_visible_width_behavior", 0, 1, "We changed the default behavior of `visibleWidth` to be more precise"},
{"max_estimated_execution_time", 0, 0, "Separate max_execution_time and max_estimated_execution_time"},
{"iceberg_engine_ignore_schema_evolution", false, false, "Allow to ignore schema evolution in Iceberg table engine"},
{"default_table_engine", "None", "MergeTree", "Set default table engine to MergeTree for better usability"},
{"optimize_injective_functions_in_group_by", false, true, "Replace injective functions by it's arguments in GROUP BY section in analyzer"},
{"update_insert_deduplication_token_in_dependent_materialized_views", false, false, "Allow to update insert deduplication token with table identifier during insert in dependent materialized views"},
{"azure_max_unexpected_write_error_retries", 4, 4, "The maximum number of retries in case of unexpected errors during Azure blob storage write"}}},

View File

@@ -1094,12 +1094,12 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
tables_dependencies.checkNoCyclicDependencies();
auto allow_concurrent_table_creation = getContext()->getServerSettings().max_database_replicated_create_table_thread_pool_size > 1;
auto tables_to_create_by_level = tables_dependencies.getTablesSortedByDependencyWithLevels();
auto tables_to_create_by_level = tables_dependencies.getTablesSplitByDependencyLevel();
auto create_tables_runner = threadPoolCallbackRunner<void>(getDatabaseReplicatedCreateTablesThreadPool().get(), "CreateTables");
std::vector<std::future<void>> create_table_futures;
for (const auto & [_, tables_to_create] : tables_to_create_by_level)
for (const auto & tables_to_create : tables_to_create_by_level)
{
for (const auto & table_id : tables_to_create)
{

View File

@@ -699,14 +699,19 @@ std::vector<StorageID> TablesDependencyGraph::getTablesSortedByDependency() cons
}
std::map<size_t, std::vector<StorageID>> TablesDependencyGraph::getTablesSortedByDependencyWithLevels() const
std::vector<std::vector<StorageID>> TablesDependencyGraph::getTablesSplitByDependencyLevel() const
{
std::map<size_t, std::vector<StorageID>> tables_by_level;
for (const auto * node : getNodesSortedByLevel())
std::vector<std::vector<StorageID>> tables_split_by_level;
auto sorted_nodes = getNodesSortedByLevel();
if (sorted_nodes.empty())
return tables_split_by_level;
tables_split_by_level.resize(sorted_nodes.back()->level + 1);
for (const auto * node : sorted_nodes)
{
tables_by_level[node->level].emplace_back(node->storage_id);
tables_split_by_level[node->level].emplace_back(node->storage_id);
}
return tables_by_level;
return tables_split_by_level;
}
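A self-contained sketch of the new level-splitting contract and of how the recovery code in DatabaseReplicated (previous section) consumes it. Node, splitByLevel, and createByLevel are illustrative stand-ins for the real graph and callback-runner types; the one guarantee relied on, nodes arriving sorted by ascending level, matches what getNodesSortedByLevel() provides:

    #include <functional>
    #include <future>
    #include <string>
    #include <vector>

    struct Node
    {
        size_t level;            // distance from a table with no dependencies
        std::string storage_id;  // stand-in for StorageID
    };

    // Group nodes into one bucket per dependency level. Because `sorted` is
    // ordered by ascending level, the outer vector can be sized up front
    // from the last node's level.
    std::vector<std::vector<std::string>> splitByLevel(const std::vector<Node> & sorted)
    {
        std::vector<std::vector<std::string>> buckets;
        if (sorted.empty())
            return buckets;
        buckets.resize(sorted.back().level + 1);
        for (const auto & node : sorted)
            buckets[node.level].push_back(node.storage_id);
        return buckets;
    }

    // Consume the buckets level by level: tables within one level do not
    // depend on each other, so they can be created in parallel, while the
    // levels themselves are processed sequentially.
    void createByLevel(const std::vector<std::vector<std::string>> & buckets,
                       const std::function<void(const std::string &)> & create_table)
    {
        for (const auto & level : buckets)
        {
            std::vector<std::future<void>> futures;
            futures.reserve(level.size());
            for (const auto & table : level)
                futures.push_back(std::async(std::launch::async, create_table, table));
            for (auto & future : futures)
                future.get();
        }
    }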

View File

@@ -107,11 +107,11 @@ public:
/// tables which depend on the tables which depend on the tables without dependencies, and so on.
std::vector<StorageID> getTablesSortedByDependency() const;
/// Returns a map of lists of tables by the number of dependencies they have:
/// tables without dependencies first with level 0, then
/// tables which depend on the tables without dependencies with level 1, then
/// tables which depend on the tables which depend on the tables without dependencies with level 2, and so on.
std::map<size_t, std::vector<StorageID>> getTablesSortedByDependencyWithLevels() const;
/// Returns a list of lists of tables by the number of dependencies they have:
/// tables without dependencies are in the first list, then
/// tables which depend on the tables without dependencies are in the second list, then
/// tables which depend on the tables which depend on the tables without dependencies are in the third list, and so on.
std::vector<std::vector<StorageID>> getTablesSplitByDependencyLevel() const;
/// Outputs information about this graph as a bunch of logging messages.
void log() const;

View File

@@ -43,11 +43,11 @@ class ExtractKeyValuePairs : public IFunction
builder.withQuotingCharacter(parsed_arguments.quoting_character.value());
}
bool is_number_of_pairs_unlimited = context->getSettingsRef().extract_kvp_max_pairs_per_row == 0;
bool is_number_of_pairs_unlimited = context->getSettingsRef().extract_key_value_pairs_max_pairs_per_row == 0;
if (!is_number_of_pairs_unlimited)
{
builder.withMaxNumberOfPairs(context->getSettingsRef().extract_kvp_max_pairs_per_row);
builder.withMaxNumberOfPairs(context->getSettingsRef().extract_key_value_pairs_max_pairs_per_row);
}
return builder.build();
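The guard above treats a configured limit of 0 as "unlimited". A minimal sketch of that convention, with effectiveMaxPairs as a hypothetical helper name:

    #include <cstdint>
    #include <optional>

    // A configured limit of 0 means "unlimited": a cap is handed to the
    // builder only when one was actually set.
    std::optional<uint64_t> effectiveMaxPairs(uint64_t configured)
    {
        if (configured == 0)
            return std::nullopt;  // unlimited
        return configured;
    }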

View File

@@ -1,17 +1,13 @@
#pragma once
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Core/Range.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/FieldToDataType.h>
#include <Functions/FunctionFactory.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/applyFunction.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTOrderByElement.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/IAST.h>
@@ -37,8 +33,6 @@ public:
ASTIdentifier * identifier = nullptr;
DataTypePtr arg_data_type = {};
Range range = Range::createWholeUniverse();
void reject() { monotonicity.is_monotonic = false; }
bool isRejected() const { return !monotonicity.is_monotonic; }
@@ -103,30 +97,13 @@ public:
if (data.isRejected())
return;
/// The monotonicity check only works for functions with at most two arguments, one of which must be a constant.
if (!ast_function.arguments)
/// TODO: monotonicity for functions of several arguments
if (!ast_function.arguments || ast_function.arguments->children.size() != 1)
{
data.reject();
return;
}
auto arguments_size = ast_function.arguments->children.size();
if (arguments_size == 0 || arguments_size > 2)
{
data.reject();
return;
}
else if (arguments_size == 2)
{
/// If the function has two arguments, then one of them must be a constant.
if (!ast_function.arguments->children[0]->as<ASTLiteral>() && !ast_function.arguments->children[1]->as<ASTLiteral>())
{
data.reject();
return;
}
}
if (!data.canOptimize(ast_function))
{
data.reject();
@@ -147,33 +124,14 @@ public:
return;
}
auto function_arguments = getFunctionArguments(ast_function, data);
auto function_base = function->build(function_arguments);
ColumnsWithTypeAndName args;
args.emplace_back(data.arg_data_type, "tmp");
auto function_base = function->build(args);
if (function_base && function_base->hasInformationAboutMonotonicity())
{
bool is_positive = data.monotonicity.is_positive;
data.monotonicity = function_base->getMonotonicityForRange(*data.arg_data_type, data.range.left, data.range.right);
auto & key_range = data.range;
/// If we apply function to open interval, we can get empty intervals in result.
/// E.g. for ('2020-01-03', '2020-01-20') after applying 'toYYYYMM' we will get ('202001', '202001').
/// To avoid this we make range left and right included.
/// Any function that treats NULL specially is not monotonic.
/// Thus we can safely use isNull() as an -Inf/+Inf indicator here.
if (!key_range.left.isNull())
{
key_range.left = applyFunction(function_base, data.arg_data_type, key_range.left);
key_range.left_included = true;
}
if (!key_range.right.isNull())
{
key_range.right = applyFunction(function_base, data.arg_data_type, key_range.right);
key_range.right_included = true;
}
data.monotonicity = function_base->getMonotonicityForRange(*data.arg_data_type, Field(), Field());
if (!is_positive)
data.monotonicity.is_positive = !data.monotonicity.is_positive;
@@ -185,53 +143,13 @@ public:
static bool needChildVisit(const ASTPtr & parent, const ASTPtr &)
{
/// Multi-argument functions where all arguments but one are constant can be monotonic.
/// Currently we check monotonicity only for single-argument functions.
/// Although multi-argument functions where all arguments but one are constant can also be monotonic.
if (const auto * func = typeid_cast<const ASTFunction *>(parent.get()))
return func->arguments->children.size() <= 2;
return func->arguments->children.size() < 2;
return true;
}
static ColumnWithTypeAndName extractLiteralColumnAndTypeFromAstLiteral(const ASTLiteral * literal)
{
ColumnWithTypeAndName result;
result.type = applyVisitor(FieldToDataType(), literal->value);
result.column = result.type->createColumnConst(0, literal->value);
return result;
}
static ColumnsWithTypeAndName getFunctionArguments(const ASTFunction & ast_function, const Data & data)
{
ColumnsWithTypeAndName args;
auto arguments_size = ast_function.arguments->children.size();
chassert(arguments_size == 1 || arguments_size == 2);
if (arguments_size == 2)
{
if (ast_function.arguments->children[0]->as<ASTLiteral>())
{
const auto * literal = ast_function.arguments->children[0]->as<ASTLiteral>();
args.push_back(extractLiteralColumnAndTypeFromAstLiteral(literal));
args.emplace_back(data.arg_data_type, "tmp");
}
else
{
const auto * literal = ast_function.arguments->children[1]->as<ASTLiteral>();
args.emplace_back(data.arg_data_type, "tmp");
args.push_back(extractLiteralColumnAndTypeFromAstLiteral(literal));
}
}
else
{
args.emplace_back(data.arg_data_type, "tmp");
}
return args;
}
};
using MonotonicityCheckVisitor = ConstInDepthNodeVisitor<MonotonicityCheckMatcher, false>;
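The `if (!is_positive)` toggle above is the core of the positivity bookkeeping: composing single-argument monotonic functions stays monotonic, and each decreasing link flips the direction of the whole chain. A self-contained sketch of that rule, where Monotonicity and composeLink are simplified stand-ins for IFunctionBase's monotonicity descriptor, not the real API:

    // Compose one more link onto a chain of monotonic functions.
    struct Monotonicity
    {
        bool is_monotonic = true;
        bool is_positive = true;  // increasing (true) or decreasing (false)
    };

    Monotonicity composeLink(Monotonicity chain, Monotonicity next)
    {
        if (!chain.is_monotonic || !next.is_monotonic)
            return {false, true};
        // A decreasing function reverses the direction accumulated so far.
        return {true, next.is_positive ? chain.is_positive : !chain.is_positive};
    }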

View File

@@ -1,43 +0,0 @@
#include <Interpreters/applyFunction.h>
#include <Core/Range.h>
#include <Functions/IFunction.h>
namespace DB
{
static Field applyFunctionForField(const FunctionBasePtr & func, const DataTypePtr & arg_type, const Field & arg_value)
{
ColumnsWithTypeAndName columns{
{arg_type->createColumnConst(1, arg_value), arg_type, "x"},
};
auto col = func->execute(columns, func->getResultType(), 1);
return (*col)[0];
}
FieldRef applyFunction(const FunctionBasePtr & func, const DataTypePtr & current_type, const FieldRef & field)
{
/// Fallback for fields without block reference.
if (field.isExplicit())
return applyFunctionForField(func, current_type, field);
String result_name = "_" + func->getName() + "_" + toString(field.column_idx);
const auto & columns = field.columns;
size_t result_idx = columns->size();
for (size_t i = 0; i < result_idx; ++i)
if ((*columns)[i].name == result_name)
result_idx = i;
if (result_idx == columns->size())
{
ColumnsWithTypeAndName args{(*columns)[field.column_idx]};
field.columns->emplace_back(ColumnWithTypeAndName{nullptr, func->getResultType(), result_name});
(*columns)[result_idx].column = func->execute(args, (*columns)[result_idx].type, columns->front().column->size());
}
return {field.columns, field.row_idx, result_idx};
}
}
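The deleted applyFunction above (re-added inside KeyCondition.cpp further down) memoizes the result of applying a function over a shared block: it probes the block for a result column named after the function and the source column index, and computes it only on a miss. A standalone sketch of that lookup-or-compute pattern, with NamedColumn and a plain int function standing in for ClickHouse's column and FunctionBasePtr types:

    #include <functional>
    #include <string>
    #include <vector>

    struct NamedColumn
    {
        std::string name;
        std::vector<int> values;  // int stands in for a real column
    };

    // The result is cached in the shared column list under a name derived
    // from the function and the input column index, so repeated
    // applications over one block are computed once.
    size_t applyMemoized(std::vector<NamedColumn> & columns, size_t input_idx,
                         const std::string & func_name, const std::function<int(int)> & func)
    {
        const std::string result_name = "_" + func_name + "_" + std::to_string(input_idx);
        for (size_t i = 0; i < columns.size(); ++i)
            if (columns[i].name == result_name)
                return i;  // already computed for this block

        NamedColumn result{result_name, {}};
        result.values.reserve(columns[input_idx].values.size());
        for (int value : columns[input_idx].values)
            result.values.push_back(func(value));
        columns.push_back(std::move(result));
        return columns.size() - 1;
    }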

View File

@@ -1,16 +0,0 @@
#pragma once
#include <memory>
namespace DB
{
struct FieldRef;
class IFunctionBase;
class IDataType;
using DataTypePtr = std::shared_ptr<const IDataType>;
using FunctionBasePtr = std::shared_ptr<const IFunctionBase>;
FieldRef applyFunction(const FunctionBasePtr & func, const DataTypePtr & current_type, const FieldRef & field);
}

View File

@@ -3,11 +3,6 @@
namespace DB
{
String queryToStringNullable(const ASTPtr & query)
{
return query ? queryToString(query) : "";
}
String queryToString(const ASTPtr & query)
{
return queryToString(*query);

View File

@@ -6,5 +6,4 @@ namespace DB
{
String queryToString(const ASTPtr & query);
String queryToString(const IAST & query);
String queryToStringNullable(const ASTPtr & query);
}

View File

@@ -84,32 +84,38 @@ namespace
{
/// Check if current user has privileges to SELECT columns from table
void checkAccessRights(const TableNode & table_node, const Names & column_names, const ContextPtr & query_context)
/// Throws an exception if access to any column from `column_names` is not granted
/// If `column_names` is empty, checks access to any column and returns the names of accessible columns
NameSet checkAccessRights(const TableNode & table_node, Names & column_names, const ContextPtr & query_context)
{
/// StorageDummy is created on preliminary stage, ignore access check for it.
if (typeid_cast<const StorageDummy *>(table_node.getStorage().get()))
return;
return {};
const auto & storage_id = table_node.getStorageID();
const auto & storage_snapshot = table_node.getStorageSnapshot();
if (column_names.empty())
{
NameSet accessible_columns;
/** For trivial queries like "SELECT count() FROM table" or "SELECT 1 FROM table", access is granted if at least
* one table column is accessible.
*/
auto access = query_context->getAccess();
for (const auto & column : storage_snapshot->metadata->getColumns())
{
if (access->isGranted(AccessType::SELECT, storage_id.database_name, storage_id.table_name, column.name))
return;
accessible_columns.insert(column.name);
}
throw Exception(ErrorCodes::ACCESS_DENIED,
"{}: Not enough privileges. To execute this query, it's necessary to have the grant SELECT for at least one column on {}",
query_context->getUserName(),
storage_id.getFullTableName());
if (accessible_columns.empty())
{
throw Exception(ErrorCodes::ACCESS_DENIED,
"{}: Not enough privileges. To execute this query, it's necessary to have the grant SELECT for at least one column on {}",
query_context->getUserName(),
storage_id.getFullTableName());
}
return accessible_columns;
}
// In case of cross-replication we don't know what database is used for the table.
@@ -117,6 +123,8 @@ void checkAccessRights(const TableNode & table_node, const Names & column_names,
// Each shard will use the default database (in the case of cross-replication shards may have different defaults).
if (storage_id.hasDatabase())
query_context->checkAccess(AccessType::SELECT, storage_id, column_names);
return {};
}
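A self-contained sketch of the pattern this function now implements, and of how the accessible set is consumed by chooseSmallestColumnToReadFromStorage further down. accessibleColumns, smallestColumn, and is_granted are illustrative stand-ins, not the real access-control API:

    #include <functional>
    #include <limits>
    #include <map>
    #include <set>
    #include <stdexcept>
    #include <string>
    #include <vector>

    // With no concrete columns requested, collect every column the user may
    // SELECT and fail only if none is accessible.
    std::set<std::string> accessibleColumns(const std::vector<std::string> & all_columns,
                                            const std::function<bool(const std::string &)> & is_granted)
    {
        std::set<std::string> accessible;
        for (const auto & column : all_columns)
            if (is_granted(column))
                accessible.insert(column);
        if (accessible.empty())
            throw std::runtime_error("Not enough privileges: no column is selectable");
        return accessible;
    }

    // Among the accessible candidates, pick the one with the smallest
    // compressed size; columns without a known size lose ties.
    std::string smallestColumn(const std::set<std::string> & candidates,
                               const std::map<std::string, size_t> & compressed_sizes)
    {
        std::string best;
        size_t best_size = std::numeric_limits<size_t>::max();
        for (const auto & name : candidates)
        {
            auto it = compressed_sizes.find(name);
            const size_t size = it == compressed_sizes.end() ? std::numeric_limits<size_t>::max() : it->second;
            if (best.empty() || size < best_size)
            {
                best = name;
                best_size = size;
            }
        }
        return best;
    }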
bool shouldIgnoreQuotaAndLimits(const TableNode & table_node)
@@ -133,7 +141,7 @@ bool shouldIgnoreQuotaAndLimits(const TableNode & table_node)
return false;
}
NameAndTypePair chooseSmallestColumnToReadFromStorage(const StoragePtr & storage, const StorageSnapshotPtr & storage_snapshot)
NameAndTypePair chooseSmallestColumnToReadFromStorage(const StoragePtr & storage, const StorageSnapshotPtr & storage_snapshot, const NameSet & column_names_allowed_to_select)
{
/** We need to read at least one column to find the number of rows.
* We will find a column with minimum <compressed_size, type_size, uncompressed_size>.
@@ -167,6 +175,18 @@ NameAndTypePair chooseSmallestColumnToReadFromStorage(const StoragePtr & storage
auto column_sizes = storage->getColumnSizes();
auto column_names_and_types = storage_snapshot->getColumns(GetColumnsOptions(GetColumnsOptions::AllPhysical).withSubcolumns());
if (!column_names_allowed_to_select.empty())
{
auto it = column_names_and_types.begin();
while (it != column_names_and_types.end())
{
if (!column_names_allowed_to_select.contains(it->name))
it = column_names_and_types.erase(it);
else
++it;
}
}
if (!column_sizes.empty())
{
for (auto & column_name_and_type : column_names_and_types)
@@ -330,12 +350,13 @@ void prepareBuildQueryPlanForTableExpression(const QueryTreeNodePtr & table_expr
/** The current user must have the SELECT privilege.
* We do not check access rights for table functions because they have been already checked in ITableFunction::execute().
*/
NameSet columns_names_allowed_to_select;
if (table_node)
{
auto column_names_with_aliases = columns_names;
const auto & alias_columns_names = table_expression_data.getAliasColumnsNames();
column_names_with_aliases.insert(column_names_with_aliases.end(), alias_columns_names.begin(), alias_columns_names.end());
checkAccessRights(*table_node, column_names_with_aliases, query_context);
columns_names_allowed_to_select = checkAccessRights(*table_node, column_names_with_aliases, query_context);
}
if (columns_names.empty())
@@ -346,8 +367,7 @@
{
const auto & storage = table_node ? table_node->getStorage() : table_function_node->getStorage();
const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot();
additional_column_to_read = chooseSmallestColumnToReadFromStorage(storage, storage_snapshot);
additional_column_to_read = chooseSmallestColumnToReadFromStorage(storage, storage_snapshot, columns_names_allowed_to_select);
}
else if (query_node || union_node)
{

View File

@@ -81,7 +81,6 @@ void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const Par
auto minmax_column_types = data.getMinMaxColumnsTypes(partition_key);
size_t minmax_idx_size = minmax_column_types.size();
hyperrectangle.clear();
hyperrectangle.reserve(minmax_idx_size);
for (size_t i = 0; i < minmax_idx_size; ++i)
{
@@ -105,39 +104,6 @@ void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const Par
initialized = true;
}
Block IMergeTreeDataPart::MinMaxIndex::getBlock(const MergeTreeData & data) const
{
if (!initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to get block from uninitialized MinMax index.");
Block block;
const auto metadata_snapshot = data.getInMemoryMetadataPtr();
const auto & partition_key = metadata_snapshot->getPartitionKey();
const auto minmax_column_names = data.getMinMaxColumnsNames(partition_key);
const auto minmax_column_types = data.getMinMaxColumnsTypes(partition_key);
const auto minmax_idx_size = minmax_column_types.size();
for (size_t i = 0; i < minmax_idx_size; ++i)
{
const auto & data_type = minmax_column_types[i];
const auto & column_name = minmax_column_names[i];
const auto column = data_type->createColumn();
const auto min_val = hyperrectangle.at(i).left;
const auto max_val = hyperrectangle.at(i).right;
column->insert(min_val);
column->insert(max_val);
block.insert(ColumnWithTypeAndName(column->getPtr(), data_type, column_name));
}
return block;
}
IMergeTreeDataPart::MinMaxIndex::WrittenFiles IMergeTreeDataPart::MinMaxIndex::store(
const MergeTreeData & data, IDataPartStorage & part_storage, Checksums & out_checksums) const
{
@@ -219,7 +185,8 @@ void IMergeTreeDataPart::MinMaxIndex::merge(const MinMaxIndex & other)
if (!initialized)
{
*this = other;
hyperrectangle = other.hyperrectangle;
initialized = true;
}
else
{
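The else branch is truncated in the hunk above; for context, a self-contained sketch of the full merge semantics, where int ranges stand in for the Field-based hyperrectangle and MinMaxSketch is illustrative. An uninitialized index adopts the other side's ranges (now copying only the hyperrectangle rather than the whole object), while an initialized one widens each per-column range:

    #include <algorithm>
    #include <cstddef>
    #include <utility>
    #include <vector>

    using Range = std::pair<int, int>;  // [min, max]

    // Assumes both sides describe the same columns in the same order.
    struct MinMaxSketch
    {
        bool initialized = false;
        std::vector<Range> hyperrectangle;

        void merge(const MinMaxSketch & other)
        {
            if (!initialized)
            {
                hyperrectangle = other.hyperrectangle;
                initialized = true;
            }
            else
            {
                for (size_t i = 0; i < hyperrectangle.size(); ++i)
                {
                    hyperrectangle[i].first = std::min(hyperrectangle[i].first, other.hyperrectangle[i].first);
                    hyperrectangle[i].second = std::max(hyperrectangle[i].second, other.hyperrectangle[i].second);
                }
            }
        }
    };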

View File

@@ -342,7 +342,6 @@ public:
}
void load(const MergeTreeData & data, const PartMetadataManagerPtr & manager);
Block getBlock(const MergeTreeData & data) const;
using WrittenFiles = std::vector<std::unique_ptr<WriteBufferFromFileBase>>;

View File

@@ -1,37 +1,36 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnSet.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/MergeTree/BoolMask.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/FieldToDataType.h>
#include <DataTypes/Utils.h>
#include <DataTypes/getLeastSupertype.h>
#include <Functions/CastOverloadResolver.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Functions/indexHint.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/Set.h>
#include <DataTypes/Utils.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/applyFunction.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/misc.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/queryToString.h>
#include <Storages/MergeTree/BoolMask.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/MergeTree/MergeTreeIndexUtils.h>
#include <Functions/FunctionFactory.h>
#include <Functions/indexHint.h>
#include <Functions/CastOverloadResolver.h>
#include <Functions/IFunction.h>
#include <Common/FieldVisitorToString.h>
#include <Common/MortonUtils.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnSet.h>
#include <Columns/ColumnConst.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/Set.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Storages/MergeTree/MergeTreeIndexUtils.h>
#include <algorithm>
#include <cassert>
@@ -837,6 +836,21 @@ bool KeyCondition::getConstant(const ASTPtr & expr, Block & block_with_constants
return node.tryGetConstant(out_value, out_type);
}
static Field applyFunctionForField(
const FunctionBasePtr & func,
const DataTypePtr & arg_type,
const Field & arg_value)
{
ColumnsWithTypeAndName columns
{
{ arg_type->createColumnConst(1, arg_value), arg_type, "x" },
};
auto col = func->execute(columns, func->getResultType(), 1);
return (*col)[0];
}
/// The case when arguments may have types different than in the primary key.
static std::pair<Field, DataTypePtr> applyFunctionForFieldOfUnknownType(
const FunctionBasePtr & func,
@@ -876,6 +890,33 @@ static std::pair<Field, DataTypePtr> applyBinaryFunctionForFieldOfUnknownType(
return {std::move(result), std::move(return_type)};
}
static FieldRef applyFunction(const FunctionBasePtr & func, const DataTypePtr & current_type, const FieldRef & field)
{
/// Fallback for fields without block reference.
if (field.isExplicit())
return applyFunctionForField(func, current_type, field);
String result_name = "_" + func->getName() + "_" + toString(field.column_idx);
const auto & columns = field.columns;
size_t result_idx = columns->size();
for (size_t i = 0; i < result_idx; ++i)
{
if ((*columns)[i].name == result_name)
result_idx = i;
}
if (result_idx == columns->size())
{
ColumnsWithTypeAndName args{(*columns)[field.column_idx]};
field.columns->emplace_back(ColumnWithTypeAndName {nullptr, func->getResultType(), result_name});
(*columns)[result_idx].column = func->execute(args, (*columns)[result_idx].type, columns->front().column->size());
}
return {field.columns, field.row_idx, result_idx};
}
/** When table's key has expression with these functions from a column,
* and when a column in a query is compared with a constant, such as:
* CREATE TABLE (x String) ORDER BY toDate(x)
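A sketch of why monotonicity matters for the key condition being built here: if the key is f(x) with f monotonically increasing, a predicate `x <= c` on the column can be tested against a part's key range by mapping the constant once, since x <= c is equivalent to f(x) <= f(c); for a decreasing f the comparison direction flips. keyRangeMayMatch is an illustrative helper, not the real KeyCondition API:

    #include <functional>

    // Can a part whose key f(x) spans [key_min, key_max] contain rows
    // satisfying `x <= c`? Map the constant through f and compare.
    bool keyRangeMayMatch(const std::function<int(int)> & f, bool f_increasing,
                          int key_min, int key_max, int constant /* c in `x <= c` */)
    {
        const int mapped = f(constant);
        return f_increasing ? key_min <= mapped : key_max >= mapped;
    }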

View File

@@ -8,6 +8,21 @@
#include <Backups/BackupEntryWrappedWith.h>
#include <Backups/IBackup.h>
#include <Backups/RestorerFromBackup.h>
#include <Common/Config/ConfigHelper.h>
#include <Common/CurrentMetrics.h>
#include <Common/Increment.h>
#include <Common/ProfileEventsScope.h>
#include <Common/SimpleIncrement.h>
#include <Common/Stopwatch.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ThreadFuzzer.h>
#include <Common/escapeForFileName.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/noexcept_scope.h>
#include <Common/quoteString.h>
#include <Common/scope_guard_safe.h>
#include <Common/typeid_cast.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Compression/CompressedReadBuffer.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeEnum.h>
@@ -27,20 +42,19 @@
#include <IO/WriteHelpers.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/Context.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTHelpers.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTHelpers.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTNameTypePair.h>
#include <Parsers/ASTPartition.h>
@@ -48,37 +62,25 @@
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTAlterQuery.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/QueryPlan/QueryIdHolder.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Storages/AlterCommands.h>
#include <Storages/BlockNumberColumn.h>
#include <Storages/Freeze.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/MergeTree/MergeTreeDataPartBuilder.h>
#include <Storages/MergeTree/MergeTreeDataPartCloner.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/Statistics/Estimator.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MutationCommands.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Common/Config/ConfigHelper.h>
#include <Common/CurrentMetrics.h>
#include <Common/Increment.h>
#include <Common/ProfileEventsScope.h>
#include <Common/Stopwatch.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ThreadFuzzer.h>
#include <Common/escapeForFileName.h>
#include <Common/noexcept_scope.h>
#include <Common/quoteString.h>
#include <Common/scope_guard_safe.h>
#include <Common/typeid_cast.h>
#include <boost/range/algorithm_ext/erase.hpp>
#include <boost/algorithm/string/join.hpp>
@@ -190,50 +192,6 @@ namespace ErrorCodes
extern const int LIMIT_EXCEEDED;
}
static size_t getPartitionAstFieldsCount(const ASTPartition & partition_ast, ASTPtr partition_value_ast)
{
if (partition_ast.fields_count.has_value())
return *partition_ast.fields_count;
if (partition_value_ast->as<ASTLiteral>())
return 1;
const auto * tuple_ast = partition_value_ast->as<ASTFunction>();
if (!tuple_ast)
{
throw Exception(
ErrorCodes::INVALID_PARTITION_VALUE, "Expected literal or tuple for partition key, got {}", partition_value_ast->getID());
}
if (tuple_ast->name != "tuple")
{
if (!isFunctionCast(tuple_ast))
throw Exception(ErrorCodes::INVALID_PARTITION_VALUE, "Expected tuple for complex partition key, got {}", tuple_ast->name);
if (tuple_ast->arguments->as<ASTExpressionList>()->children.empty())
throw Exception(ErrorCodes::INVALID_PARTITION_VALUE, "Expected tuple for complex partition key, got {}", tuple_ast->name);
auto first_arg = tuple_ast->arguments->as<ASTExpressionList>()->children.at(0);
if (const auto * inner_tuple = first_arg->as<ASTFunction>(); inner_tuple && inner_tuple->name == "tuple")
{
const auto * arguments_ast = tuple_ast->arguments->as<ASTExpressionList>();
return arguments_ast ? arguments_ast->children.size() : 0;
}
else if (const auto * inner_literal_tuple = first_arg->as<ASTLiteral>(); inner_literal_tuple)
{
return inner_literal_tuple->value.getType() == Field::Types::Tuple ? inner_literal_tuple->value.safeGet<Tuple>().size() : 1;
}
throw Exception(ErrorCodes::INVALID_PARTITION_VALUE, "Expected tuple for complex partition key, got {}", tuple_ast->name);
}
else
{
const auto * arguments_ast = tuple_ast->arguments->as<ASTExpressionList>();
return arguments_ast ? arguments_ast->children.size() : 0;
}
}
static void checkSuspiciousIndices(const ASTFunction * index_function)
{
std::unordered_set<UInt64> unique_index_expression_hashes;
@@ -4902,7 +4860,7 @@ void MergeTreeData::removePartContributionToColumnAndSecondaryIndexSizes(const D
}
void MergeTreeData::checkAlterPartitionIsPossible(
const PartitionCommands & commands, const StorageMetadataPtr & /*metadata_snapshot*/, const Settings & settings, ContextPtr) const
const PartitionCommands & commands, const StorageMetadataPtr & /*metadata_snapshot*/, const Settings & settings, ContextPtr local_context) const
{
for (const auto & command : commands)
{
@@ -4930,15 +4888,7 @@ void MergeTreeData::checkAlterPartitionIsPossible(
throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH PARTITION ALL currently");
}
else
{
// The below `getPartitionIDFromQuery` call will not work for attach / replace because it assumes the partition expressions
// are the same and deliberately uses this storage. Later on, `MergeTreeData::replaceFrom` is called, and it makes the right
// call to `getPartitionIDFromQuery` using source storage.
// Note: `PartitionCommand::REPLACE_PARTITION` is used both for `REPLACE PARTITION` and `ATTACH PARTITION FROM` queries.
// But not for `ATTACH PARTITION` queries.
if (command.type != PartitionCommand::REPLACE_PARTITION)
getPartitionIDFromQuery(command.partition, getContext());
}
getPartitionIDFromQuery(command.partition, local_context);
}
}
}
@@ -5675,8 +5625,69 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc
MergeTreePartInfo::validatePartitionID(partition_ast.id->clone(), format_version);
return partition_ast.id->as<ASTLiteral>()->value.safeGet<String>();
}
size_t partition_ast_fields_count = 0;
ASTPtr partition_value_ast = partition_ast.value->clone();
auto partition_ast_fields_count = getPartitionAstFieldsCount(partition_ast, partition_value_ast);
if (!partition_ast.fields_count.has_value())
{
if (partition_value_ast->as<ASTLiteral>())
{
partition_ast_fields_count = 1;
}
else if (const auto * tuple_ast = partition_value_ast->as<ASTFunction>())
{
if (tuple_ast->name != "tuple")
{
if (isFunctionCast(tuple_ast))
{
if (tuple_ast->arguments->as<ASTExpressionList>()->children.empty())
{
throw Exception(
ErrorCodes::INVALID_PARTITION_VALUE, "Expected tuple for complex partition key, got {}", tuple_ast->name);
}
auto first_arg = tuple_ast->arguments->as<ASTExpressionList>()->children.at(0);
if (const auto * inner_tuple = first_arg->as<ASTFunction>(); inner_tuple && inner_tuple->name == "tuple")
{
const auto * arguments_ast = tuple_ast->arguments->as<ASTExpressionList>();
if (arguments_ast)
partition_ast_fields_count = arguments_ast->children.size();
else
partition_ast_fields_count = 0;
}
else if (const auto * inner_literal_tuple = first_arg->as<ASTLiteral>(); inner_literal_tuple)
{
if (inner_literal_tuple->value.getType() == Field::Types::Tuple)
partition_ast_fields_count = inner_literal_tuple->value.safeGet<Tuple>().size();
else
partition_ast_fields_count = 1;
}
else
{
throw Exception(
ErrorCodes::INVALID_PARTITION_VALUE, "Expected tuple for complex partition key, got {}", tuple_ast->name);
}
}
else
throw Exception(ErrorCodes::INVALID_PARTITION_VALUE, "Expected tuple for complex partition key, got {}", tuple_ast->name);
}
else
{
const auto * arguments_ast = tuple_ast->arguments->as<ASTExpressionList>();
if (arguments_ast)
partition_ast_fields_count = arguments_ast->children.size();
else
partition_ast_fields_count = 0;
}
}
else
{
throw Exception(
ErrorCodes::INVALID_PARTITION_VALUE, "Expected literal or tuple for partition key, got {}", partition_value_ast->getID());
}
}
else
{
partition_ast_fields_count = *partition_ast.fields_count;
}
if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
@@ -7012,35 +7023,23 @@ MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(IStorage & sour
if (my_snapshot->getColumns().getAllPhysical().sizeOfDifference(src_snapshot->getColumns().getAllPhysical()))
throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");
if (queryToStringNullable(my_snapshot->getSortingKeyAST()) != queryToStringNullable(src_snapshot->getSortingKeyAST()))
auto query_to_string = [] (const ASTPtr & ast)
{
return ast ? queryToString(ast) : "";
};
if (query_to_string(my_snapshot->getSortingKeyAST()) != query_to_string(src_snapshot->getSortingKeyAST()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different ordering");
if (query_to_string(my_snapshot->getPartitionKeyAST()) != query_to_string(src_snapshot->getPartitionKeyAST()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different partition key");
if (format_version != src_data->format_version)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different format_version");
if (queryToStringNullable(my_snapshot->getPrimaryKeyAST()) != queryToStringNullable(src_snapshot->getPrimaryKeyAST()))
if (query_to_string(my_snapshot->getPrimaryKeyAST()) != query_to_string(src_snapshot->getPrimaryKeyAST()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different primary key");
const auto is_a_subset_of = [](const auto & lhs, const auto & rhs)
{
if (lhs.size() > rhs.size())
return false;
const auto rhs_set = NameSet(rhs.begin(), rhs.end());
for (const auto & lhs_element : lhs)
if (!rhs_set.contains(lhs_element))
return false;
return true;
};
if (!is_a_subset_of(my_snapshot->getColumnsRequiredForPartitionKey(), src_snapshot->getColumnsRequiredForPartitionKey()))
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Destination table partition expression columns must be a subset of source table partition expression columns");
}
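A standalone version of the is_a_subset_of lambda being removed above: lhs is a subset of rhs iff every element of lhs is found in a set built from rhs, with the size comparison as a cheap early exit. isSubsetOf is an illustrative name:

    #include <string>
    #include <unordered_set>
    #include <vector>

    bool isSubsetOf(const std::vector<std::string> & lhs, const std::vector<std::string> & rhs)
    {
        if (lhs.size() > rhs.size())
            return false;
        const std::unordered_set<std::string> rhs_set(rhs.begin(), rhs.end());
        for (const auto & element : lhs)
            if (!rhs_set.contains(element))
                return false;
        return true;
    }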
const auto check_definitions = [](const auto & my_descriptions, const auto & src_descriptions)
{
if (my_descriptions.size() != src_descriptions.size())
@@ -7081,56 +7080,130 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
const ReadSettings & read_settings,
const WriteSettings & write_settings)
{
return MergeTreeDataPartCloner::clone(
this, src_part, metadata_snapshot, dst_part_info, tmp_part_prefix, require_part_metadata, params, read_settings, write_settings);
}
chassert(!isStaticStorage());
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAndLoadPartOnSameDiskWithDifferentPartitionKey(
const MergeTreeData::DataPartPtr & src_part,
const MergeTreePartition & new_partition,
const String & partition_id,
const IMergeTreeDataPart::MinMaxIndex & min_max_index,
const String & tmp_part_prefix,
const StorageMetadataPtr & my_metadata_snapshot,
const IDataPartStorage::ClonePartParams & clone_params,
ContextPtr local_context,
Int64 min_block,
Int64 max_block
)
{
MergeTreePartInfo dst_part_info(partition_id, min_block, max_block, src_part->info.level);
/// Check that the storage policy contains the disk where the src_part is located.
bool does_storage_policy_allow_same_disk = false;
for (const DiskPtr & disk : getStoragePolicy()->getDisks())
{
if (disk->getName() == src_part->getDataPartStorage().getDiskName())
{
does_storage_policy_allow_same_disk = true;
break;
}
}
if (!does_storage_policy_allow_same_disk)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Could not clone and load part {} because disk does not belong to storage policy",
quoteString(src_part->getDataPartStorage().getFullPath()));
return MergeTreeDataPartCloner::cloneWithDistinctPartitionExpression(
this,
src_part,
my_metadata_snapshot,
dst_part_info,
tmp_part_prefix,
local_context->getReadSettings(),
local_context->getWriteSettings(),
new_partition,
min_max_index,
false,
clone_params);
}
String dst_part_name = src_part->getNewName(dst_part_info);
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
auto temporary_directory_lock = getTemporaryPartDirectoryHolder(tmp_dst_part_name);
std::pair<MergeTreePartition, IMergeTreeDataPart::MinMaxIndex> MergeTreeData::createPartitionAndMinMaxIndexFromSourcePart(
const MergeTreeData::DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr local_context)
{
const auto & src_data = src_part->storage;
/// Why is it needed if we only hardlink files?
auto reservation = src_part->getDataPartStorage().reserve(src_part->getBytesOnDisk());
auto src_part_storage = src_part->getDataPartStoragePtr();
auto metadata_manager = std::make_shared<PartMetadataManagerOrdinary>(src_part.get());
IMergeTreeDataPart::MinMaxIndex min_max_index;
scope_guard src_flushed_tmp_dir_lock;
MergeTreeData::MutableDataPartPtr src_flushed_tmp_part;
min_max_index.load(src_data, metadata_manager);
/// If source part is in memory, flush it to disk and clone it already in on-disk format
/// Protect tmp dir from removing by cleanup thread with src_flushed_tmp_dir_lock
/// Construct src_flushed_tmp_part in order to delete part with its directory at destructor
if (auto src_part_in_memory = asInMemoryPart(src_part))
{
auto flushed_part_path = *src_part_in_memory->getRelativePathForPrefix(tmp_part_prefix);
MergeTreePartition new_partition;
auto tmp_src_part_file_name = fs::path(tmp_dst_part_name).filename();
src_flushed_tmp_dir_lock = src_part->storage.getTemporaryPartDirectoryHolder(tmp_src_part_file_name);
new_partition.create(metadata_snapshot, min_max_index.getBlock(src_data), 0u, local_context);
auto flushed_part_storage = src_part_in_memory->flushToDisk(flushed_part_path, metadata_snapshot);
return {new_partition, min_max_index};
src_flushed_tmp_part = MergeTreeDataPartBuilder(*this, src_part->name, flushed_part_storage)
.withPartInfo(src_part->info)
.withPartFormatFromDisk()
.build();
src_flushed_tmp_part->is_temp = true;
src_part_storage = flushed_part_storage;
}
String with_copy;
if (params.copy_instead_of_hardlink)
with_copy = " (copying data)";
auto dst_part_storage = src_part_storage->freeze(
relative_data_path,
tmp_dst_part_name,
read_settings,
write_settings,
/* save_metadata_callback= */ {},
params);
if (params.metadata_version_to_write.has_value())
{
chassert(!params.keep_metadata_version);
auto out_metadata = dst_part_storage->writeFile(IMergeTreeDataPart::METADATA_VERSION_FILE_NAME, 4096, getContext()->getWriteSettings());
writeText(metadata_snapshot->getMetadataVersion(), *out_metadata);
out_metadata->finalize();
if (getSettings()->fsync_after_insert)
out_metadata->sync();
}
LOG_DEBUG(log, "Clone{} part {} to {}{}",
src_flushed_tmp_part ? " flushed" : "",
src_part_storage->getFullPath(),
std::string(fs::path(dst_part_storage->getFullRootPath()) / tmp_dst_part_name),
with_copy);
auto dst_data_part = MergeTreeDataPartBuilder(*this, dst_part_name, dst_part_storage)
.withPartFormatFromDisk()
.build();
if (!params.copy_instead_of_hardlink && params.hardlinked_files)
{
params.hardlinked_files->source_part_name = src_part->name;
params.hardlinked_files->source_table_shared_id = src_part->storage.getTableSharedID();
for (auto it = src_part->getDataPartStorage().iterate(); it->isValid(); it->next())
{
if (!params.files_to_copy_instead_of_hardlinks.contains(it->name())
&& it->name() != IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME_DEPRECATED
&& it->name() != IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME)
{
params.hardlinked_files->hardlinks_from_source_part.insert(it->name());
}
}
auto projections = src_part->getProjectionParts();
for (const auto & [name, projection_part] : projections)
{
const auto & projection_storage = projection_part->getDataPartStorage();
for (auto it = projection_storage.iterate(); it->isValid(); it->next())
{
auto file_name_with_projection_prefix = fs::path(projection_storage.getPartDirectory()) / it->name();
if (!params.files_to_copy_instead_of_hardlinks.contains(file_name_with_projection_prefix)
&& it->name() != IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME_DEPRECATED
&& it->name() != IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME)
{
params.hardlinked_files->hardlinks_from_source_part.insert(file_name_with_projection_prefix);
}
}
}
}
/// We should write version metadata on part creation to distinguish it from parts that were created without a transaction.
TransactionID tid = params.txn ? params.txn->tid : Tx::PrehistoricTID;
dst_data_part->version.setCreationTID(tid, nullptr);
dst_data_part->storeVersionMetadata();
dst_data_part->is_temp = true;
dst_data_part->loadColumnsChecksumsIndexes(require_part_metadata, true);
dst_data_part->modification_time = dst_part_storage->getLastModified().epochTime();
return std::make_pair(dst_data_part, std::move(temporary_directory_lock));
}
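The hardlink bookkeeping in the restored clone path above follows one rule: a file from the source part is recorded as hardlinked unless it must be physically copied or is one of the per-part marker files that are never shared between parts. A minimal sketch of that filter; shouldRecordAsHardlink and its parameter names are illustrative:

    #include <set>
    #include <string>

    bool shouldRecordAsHardlink(const std::string & file_name,
                                const std::set<std::string> & files_to_copy_instead_of_hardlinks,
                                const std::string & delete_on_destroy_marker,
                                const std::string & txn_version_metadata)
    {
        return files_to_copy_instead_of_hardlinks.count(file_name) == 0
            && file_name != delete_on_destroy_marker
            && file_name != txn_version_metadata;
    }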
String MergeTreeData::getFullPathOnDisk(const DiskPtr & disk) const

View File

@@ -232,7 +232,6 @@ public:
}
};
using DataParts = std::set<DataPartPtr, LessDataPart>;
using MutableDataParts = std::set<MutableDataPartPtr, LessDataPart>;
using DataPartsVector = std::vector<DataPartPtr>;
@@ -855,23 +854,6 @@ public:
const ReadSettings & read_settings,
const WriteSettings & write_settings);
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> cloneAndLoadPartOnSameDiskWithDifferentPartitionKey(
const MergeTreeData::DataPartPtr & src_part,
const MergeTreePartition & new_partition,
const String & partition_id,
const IMergeTreeDataPart::MinMaxIndex & min_max_index,
const String & tmp_part_prefix,
const StorageMetadataPtr & my_metadata_snapshot,
const IDataPartStorage::ClonePartParams & clone_params,
ContextPtr local_context,
Int64 min_block,
Int64 max_block);
static std::pair<MergeTreePartition, IMergeTreeDataPart::MinMaxIndex> createPartitionAndMinMaxIndexFromSourcePart(
const MergeTreeData::DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr local_context);
virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0;
/// Returns true if table can create new parts with adaptive granularity

View File

@@ -1,319 +0,0 @@
#include <Interpreters/MergeTreeTransaction.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataPartBuilder.h>
#include <Storages/MergeTree/MergeTreeDataPartCloner.h>
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace DistinctPartitionExpression
{
std::unique_ptr<WriteBufferFromFileBase> updatePartitionFile(
const MergeTreeData & merge_tree_data,
const MergeTreePartition & partition,
const MergeTreeData::MutableDataPartPtr & dst_part,
IDataPartStorage & storage)
{
storage.removeFile("partition.dat");
// Leverage the already implemented MergeTreePartition::store to create & store partition.dat.
// Checksum is re-calculated later.
return partition.store(merge_tree_data, storage, dst_part->checksums);
}
IMergeTreeDataPart::MinMaxIndex::WrittenFiles updateMinMaxFiles(
const MergeTreeData & merge_tree_data,
const MergeTreeData::MutableDataPartPtr & dst_part,
IDataPartStorage & storage,
const StorageMetadataPtr & metadata_snapshot)
{
for (const auto & column_name : MergeTreeData::getMinMaxColumnsNames(metadata_snapshot->partition_key))
{
auto file = "minmax_" + escapeForFileName(column_name) + ".idx";
storage.removeFile(file);
}
return dst_part->minmax_idx->store(merge_tree_data, storage, dst_part->checksums);
}
void finalizeNewFiles(const std::vector<std::unique_ptr<WriteBufferFromFileBase>> & files, bool sync_new_files)
{
for (const auto & file : files)
{
file->finalize();
if (sync_new_files)
file->sync();
}
}
void updateNewPartFiles(
const MergeTreeData & merge_tree_data,
const MergeTreeData::MutableDataPartPtr & dst_part,
const MergeTreePartition & new_partition,
const IMergeTreeDataPart::MinMaxIndex & new_min_max_index,
const StorageMetadataPtr & src_metadata_snapshot,
bool sync_new_files)
{
auto & storage = dst_part->getDataPartStorage();
*dst_part->minmax_idx = new_min_max_index;
auto partition_file = updatePartitionFile(merge_tree_data, new_partition, dst_part, storage);
auto min_max_files = updateMinMaxFiles(merge_tree_data, dst_part, storage, src_metadata_snapshot);
IMergeTreeDataPart::MinMaxIndex::WrittenFiles written_files;
if (partition_file)
written_files.emplace_back(std::move(partition_file));
written_files.insert(written_files.end(), std::make_move_iterator(min_max_files.begin()), std::make_move_iterator(min_max_files.end()));
finalizeNewFiles(written_files, sync_new_files);
// MergeTreeDataPartCloner::finalizePart calls IMergeTreeDataPart::loadColumnsChecksumsIndexes, which will re-create
// the checksum file if it doesn't exist. Relying on that is cumbersome, but this refactoring is simply a code extraction
// with small improvements. It can be further improved in the future.
storage.removeFile("checksums.txt");
}
}
namespace
{
bool doesStoragePolicyAllowSameDisk(MergeTreeData * merge_tree_data, const MergeTreeData::DataPartPtr & src_part)
{
for (const DiskPtr & disk : merge_tree_data->getStoragePolicy()->getDisks())
if (disk->getName() == src_part->getDataPartStorage().getDiskName())
return true;
return false;
}
DataPartStoragePtr flushPartStorageToDiskIfInMemory(
MergeTreeData * merge_tree_data,
const MergeTreeData::DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
const String & tmp_part_prefix,
const String & tmp_dst_part_name,
scope_guard & src_flushed_tmp_dir_lock,
MergeTreeData::MutableDataPartPtr src_flushed_tmp_part)
{
if (auto src_part_in_memory = asInMemoryPart(src_part))
{
auto flushed_part_path = src_part_in_memory->getRelativePathForPrefix(tmp_part_prefix);
auto tmp_src_part_file_name = fs::path(tmp_dst_part_name).filename();
src_flushed_tmp_dir_lock = src_part->storage.getTemporaryPartDirectoryHolder(tmp_src_part_file_name);
auto flushed_part_storage = src_part_in_memory->flushToDisk(*flushed_part_path, metadata_snapshot);
src_flushed_tmp_part = MergeTreeDataPartBuilder(*merge_tree_data, src_part->name, flushed_part_storage)
.withPartInfo(src_part->info)
.withPartFormatFromDisk()
.build();
src_flushed_tmp_part->is_temp = true;
return flushed_part_storage;
}
return src_part->getDataPartStoragePtr();
}
std::shared_ptr<IDataPartStorage> hardlinkAllFiles(
MergeTreeData * merge_tree_data,
const DB::ReadSettings & read_settings,
const DB::WriteSettings & write_settings,
const DataPartStoragePtr & storage,
const String & path,
const DB::IDataPartStorage::ClonePartParams & params)
{
return storage->freeze(
merge_tree_data->getRelativeDataPath(),
path,
read_settings,
write_settings,
/*save_metadata_callback=*/{},
params);
}
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> cloneSourcePart(
MergeTreeData * merge_tree_data,
const MergeTreeData::DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreePartInfo & dst_part_info,
const String & tmp_part_prefix,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const DB::IDataPartStorage::ClonePartParams & params)
{
const auto dst_part_name = src_part->getNewName(dst_part_info);
const auto tmp_dst_part_name = tmp_part_prefix + dst_part_name;
auto temporary_directory_lock = merge_tree_data->getTemporaryPartDirectoryHolder(tmp_dst_part_name);
src_part->getDataPartStorage().reserve(src_part->getBytesOnDisk());
scope_guard src_flushed_tmp_dir_lock;
MergeTreeData::MutableDataPartPtr src_flushed_tmp_part;
auto src_part_storage = flushPartStorageToDiskIfInMemory(
merge_tree_data, src_part, metadata_snapshot, tmp_part_prefix, tmp_dst_part_name, src_flushed_tmp_dir_lock, src_flushed_tmp_part);
auto dst_part_storage = hardlinkAllFiles(merge_tree_data, read_settings, write_settings, src_part_storage, tmp_dst_part_name, params);
if (params.metadata_version_to_write.has_value())
{
chassert(!params.keep_metadata_version);
auto out_metadata = dst_part_storage->writeFile(
IMergeTreeDataPart::METADATA_VERSION_FILE_NAME, 4096, merge_tree_data->getContext()->getWriteSettings());
writeText(metadata_snapshot->getMetadataVersion(), *out_metadata);
out_metadata->finalize();
if (merge_tree_data->getSettings()->fsync_after_insert)
out_metadata->sync();
}
LOG_DEBUG(
&Poco::Logger::get("MergeTreeDataPartCloner"),
"Clone {} part {} to {}{}",
src_flushed_tmp_part ? "flushed" : "",
src_part_storage->getFullPath(),
std::string(fs::path(dst_part_storage->getFullRootPath()) / tmp_dst_part_name),
false);
auto part = MergeTreeDataPartBuilder(*merge_tree_data, dst_part_name, dst_part_storage).withPartFormatFromDisk().build();
return std::make_pair(part, std::move(temporary_directory_lock));
}
void handleHardLinkedParameterFiles(const MergeTreeData::DataPartPtr & src_part, const DB::IDataPartStorage::ClonePartParams & params)
{
const auto & hardlinked_files = params.hardlinked_files;
hardlinked_files->source_part_name = src_part->name;
hardlinked_files->source_table_shared_id = src_part->storage.getTableSharedID();
for (auto it = src_part->getDataPartStorage().iterate(); it->isValid(); it->next())
{
if (!params.files_to_copy_instead_of_hardlinks.contains(it->name())
&& it->name() != IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME_DEPRECATED
&& it->name() != IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME)
{
hardlinked_files->hardlinks_from_source_part.insert(it->name());
}
}
}
void handleProjections(const MergeTreeData::DataPartPtr & src_part, const DB::IDataPartStorage::ClonePartParams & params)
{
auto projections = src_part->getProjectionParts();
for (const auto & [name, projection_part] : projections)
{
const auto & projection_storage = projection_part->getDataPartStorage();
for (auto it = projection_storage.iterate(); it->isValid(); it->next())
{
auto file_name_with_projection_prefix = fs::path(projection_storage.getPartDirectory()) / it->name();
if (!params.files_to_copy_instead_of_hardlinks.contains(file_name_with_projection_prefix)
&& it->name() != IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME_DEPRECATED
&& it->name() != IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME)
{
params.hardlinked_files->hardlinks_from_source_part.insert(file_name_with_projection_prefix);
}
}
}
}
MergeTreeData::MutableDataPartPtr finalizePart(
const MergeTreeData::MutableDataPartPtr & dst_part, const DB::IDataPartStorage::ClonePartParams & params, bool require_part_metadata)
{
/// We should write version metadata on part creation to distinguish it from parts that were created without a transaction.
TransactionID tid = params.txn ? params.txn->tid : Tx::PrehistoricTID;
dst_part->version.setCreationTID(tid, nullptr);
dst_part->storeVersionMetadata();
dst_part->is_temp = true;
dst_part->loadColumnsChecksumsIndexes(require_part_metadata, true);
dst_part->modification_time = dst_part->getDataPartStorage().getLastModified().epochTime();
return dst_part;
}
std::pair<MergeTreeDataPartCloner::MutableDataPartPtr, scope_guard> cloneAndHandleHardlinksAndProjections(
MergeTreeData * merge_tree_data,
const DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreePartInfo & dst_part_info,
const String & tmp_part_prefix,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const IDataPartStorage::ClonePartParams & params)
{
chassert(!merge_tree_data->isStaticStorage());
if (!doesStoragePolicyAllowSameDisk(merge_tree_data, src_part))
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Could not clone and load part {} because disk does not belong to storage policy",
quoteString(src_part->getDataPartStorage().getFullPath()));
auto [destination_part, temporary_directory_lock] = cloneSourcePart(
merge_tree_data, src_part, metadata_snapshot, dst_part_info, tmp_part_prefix, read_settings, write_settings, params);
if (!params.copy_instead_of_hardlink && params.hardlinked_files)
{
handleHardLinkedParameterFiles(src_part, params);
handleProjections(src_part, params);
}
return std::make_pair(destination_part, std::move(temporary_directory_lock));
}
}
std::pair<MergeTreeDataPartCloner::MutableDataPartPtr, scope_guard> MergeTreeDataPartCloner::clone(
MergeTreeData * merge_tree_data,
const DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreePartInfo & dst_part_info,
const String & tmp_part_prefix,
bool require_part_metadata,
const IDataPartStorage::ClonePartParams & params,
const ReadSettings & read_settings,
const WriteSettings & write_settings)
{
auto [destination_part, temporary_directory_lock] = cloneAndHandleHardlinksAndProjections(
merge_tree_data, src_part, metadata_snapshot, dst_part_info, tmp_part_prefix, read_settings, write_settings, params);
return std::make_pair(finalizePart(destination_part, params, require_part_metadata), std::move(temporary_directory_lock));
}
std::pair<MergeTreeDataPartCloner::MutableDataPartPtr, scope_guard> MergeTreeDataPartCloner::cloneWithDistinctPartitionExpression(
MergeTreeData * merge_tree_data,
const DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreePartInfo & dst_part_info,
const String & tmp_part_prefix,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const MergeTreePartition & new_partition,
const IMergeTreeDataPart::MinMaxIndex & new_min_max_index,
bool sync_new_files,
const IDataPartStorage::ClonePartParams & params)
{
auto [destination_part, temporary_directory_lock] = cloneAndHandleHardlinksAndProjections(
merge_tree_data, src_part, metadata_snapshot, dst_part_info, tmp_part_prefix, read_settings, write_settings, params);
DistinctPartitionExpression::updateNewPartFiles(
*merge_tree_data, destination_part, new_partition, new_min_max_index, src_part->storage.getInMemoryMetadataPtr(), sync_new_files);
return std::make_pair(finalizePart(destination_part, params, false), std::move(temporary_directory_lock));
}
}
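For orientation, a hypothetical call site for clone() above, assuming the caller already holds merge_tree_data, src_part, metadata_snapshot, dst_part_info and a context, as replacePartitionFrom below does; this is a sketch, not code from this commit:
IDataPartStorage::ClonePartParams clone_params{.txn = context->getCurrentTransaction()};
auto [dst_part, tmp_dir_lock] = MergeTreeDataPartCloner::clone(
    &merge_tree_data,
    src_part,
    metadata_snapshot,
    dst_part_info,
    "tmp_replace_from_",              /// temporary directory prefix
    /*require_part_metadata=*/ false,
    clone_params,
    context->getReadSettings(),
    context->getWriteSettings());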

View File

@ -1,43 +0,0 @@
#pragma once
namespace DB
{
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
struct MergeTreePartition;
class IMergeTreeDataPart;
class MergeTreeDataPartCloner
{
public:
using DataPart = IMergeTreeDataPart;
using MutableDataPartPtr = std::shared_ptr<DataPart>;
using DataPartPtr = std::shared_ptr<const DataPart>;
static std::pair<MutableDataPartPtr, scope_guard> clone(
MergeTreeData * merge_tree_data,
const DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreePartInfo & dst_part_info,
const String & tmp_part_prefix,
bool require_part_metadata,
const IDataPartStorage::ClonePartParams & params,
const ReadSettings & read_settings,
const WriteSettings & write_settings);
static std::pair<MutableDataPartPtr, scope_guard> cloneWithDistinctPartitionExpression(
MergeTreeData * merge_tree_data,
const DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreePartInfo & dst_part_info,
const String & tmp_part_prefix,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const MergeTreePartition & new_partition,
const IMergeTreeDataPart::MinMaxIndex & new_min_max_index,
bool sync_new_files,
const IDataPartStorage::ClonePartParams & params);
};
}

View File

@ -467,45 +467,6 @@ void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Bl
}
}
void MergeTreePartition::createAndValidateMinMaxPartitionIds(
const StorageMetadataPtr & metadata_snapshot, Block block_with_min_max_partition_ids, ContextPtr context)
{
if (!metadata_snapshot->hasPartitionKey())
return;
auto partition_key_names_and_types = executePartitionByExpression(metadata_snapshot, block_with_min_max_partition_ids, context);
value.resize(partition_key_names_and_types.size());
/// Executing partition_by expression adds new columns to passed block according to partition functions.
/// The block is passed by reference and is used afterwards. `moduloLegacy` needs to be substituted back
/// with just `modulo`, because it was a temporary substitution.
static constexpr std::string_view modulo_legacy_function_name = "moduloLegacy";
size_t i = 0;
for (const auto & element : partition_key_names_and_types)
{
auto & partition_column = block_with_min_max_partition_ids.getByName(element.name);
if (element.name.starts_with(modulo_legacy_function_name))
partition_column.name.replace(0, modulo_legacy_function_name.size(), "modulo");
Field extracted_min_partition_id_field;
Field extracted_max_partition_id_field;
partition_column.column->get(0, extracted_min_partition_id_field);
partition_column.column->get(1, extracted_max_partition_id_field);
if (extracted_min_partition_id_field != extracted_max_partition_id_field)
{
throw Exception(
ErrorCodes::INVALID_PARTITION_VALUE,
"Can not create the partition. A partition can not contain values that have different partition ids");
}
partition_column.column->get(0u, value[i++]);
}
}
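The check above boils down to: the partition expression must map the part's minimum and maximum key values to the same partition id. A self-contained illustration, using a plain function as a stand-in for the expression intDiv(A, 6):
#include <stdexcept>
long partitionIdFor(long a) { return a / 6; } /// stand-in for intDiv(A, 6)
void validateMinMaxPartitionIds(long min_value, long max_value)
{
    if (partitionIdFor(min_value) != partitionIdFor(max_value))
        throw std::invalid_argument("A partition can not contain values that have different partition ids");
}
/// validateMinMaxPartitionIds(1, 5) passes: both map to partition 0.
/// validateMinMaxPartitionIds(5, 7) throws: partitions 0 and 1.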
NamesAndTypesList MergeTreePartition::executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context)
{
auto adjusted_partition_key = adjustPartitionKey(metadata_snapshot, context);

View File

@ -1,12 +1,11 @@
#pragma once
#include <Core/Field.h>
#include <base/types.h>
#include <Disks/IDisk.h>
#include <IO/WriteBuffer.h>
#include <Storages/KeyDescription.h>
#include <Storages/MergeTree/IPartMetadataManager.h>
#include <Storages/MergeTree/PartMetadataManagerOrdinary.h>
#include <base/types.h>
#include <Core/Field.h>
namespace DB
{
@ -52,11 +51,6 @@ public:
void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context);
/// Copy of MergeTreePartition::create, but also validates that the min and max partition keys are equal. If they differ,
/// it means the partition can't be created because the data doesn't belong to the same partition.
void createAndValidateMinMaxPartitionIds(
const StorageMetadataPtr & metadata_snapshot, Block block_with_min_max_partition_ids, ContextPtr context);
static void appendFiles(const MergeTreeData & storage, Strings & files);
/// Adjust partition key and execute its expression on block. Return sample block according to used expression.

View File

@ -1,91 +0,0 @@
#include <Interpreters/MonotonicityCheckVisitor.h>
#include <Interpreters/getTableExpressions.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreePartitionCompatibilityVerifier.h>
#include <Storages/MergeTree/MergeTreePartitionGlobalMinMaxIdxCalculator.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
bool isDestinationPartitionExpressionMonotonicallyIncreasing(
const std::vector<Range> & hyperrectangle, const MergeTreeData & destination_storage)
{
auto destination_table_metadata = destination_storage.getInMemoryMetadataPtr();
auto key_description = destination_table_metadata->getPartitionKey();
auto definition_ast = key_description.definition_ast->clone();
auto table_identifier = std::make_shared<ASTIdentifier>(destination_storage.getStorageID().getTableName());
auto table_with_columns
= TableWithColumnNamesAndTypes{DatabaseAndTableWithAlias(table_identifier), destination_table_metadata->getColumns().getOrdinary()};
auto expression_list = extractKeyExpressionList(definition_ast);
MonotonicityCheckVisitor::Data data{{table_with_columns}, destination_storage.getContext(), /*group_by_function_hashes*/ {}};
for (auto i = 0u; i < expression_list->children.size(); i++)
{
data.range = hyperrectangle[i];
MonotonicityCheckVisitor(data).visit(expression_list->children[i]);
if (!data.monotonicity.is_monotonic || !data.monotonicity.is_positive)
return false;
}
return true;
}
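In other words, monotonicity is what makes the endpoint check sound: if every element of the destination expression is monotonically increasing (and positive) over the source hyperrectangle, then each row's expression value lies between the values at the range endpoints, so the min/max partition-id equality check in MergeTreePartition::createAndValidateMinMaxPartitionIds above covers every row of the part.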
bool isExpressionDirectSubsetOf(const ASTPtr source, const ASTPtr destination)
{
auto source_expression_list = extractKeyExpressionList(source);
auto destination_expression_list = extractKeyExpressionList(destination);
std::unordered_set<std::string> source_columns;
for (auto i = 0u; i < source_expression_list->children.size(); ++i)
source_columns.insert(source_expression_list->children[i]->getColumnName());
for (auto i = 0u; i < destination_expression_list->children.size(); ++i)
if (!source_columns.contains(destination_expression_list->children[i]->getColumnName()))
return false;
return true;
}
}
void MergeTreePartitionCompatibilityVerifier::verify(
const MergeTreeData & source_storage, const MergeTreeData & destination_storage, const DataPartsVector & source_parts)
{
const auto source_metadata = source_storage.getInMemoryMetadataPtr();
const auto destination_metadata = destination_storage.getInMemoryMetadataPtr();
const auto source_partition_key_ast = source_metadata->getPartitionKeyAST();
const auto destination_partition_key_ast = destination_metadata->getPartitionKeyAST();
// If destination partition expression columns are a subset of source partition expression columns,
// there is no need to check for monotonicity.
if (isExpressionDirectSubsetOf(source_partition_key_ast, destination_partition_key_ast))
return;
const auto src_global_min_max_indexes = MergeTreePartitionGlobalMinMaxIdxCalculator::calculate(source_parts, destination_storage);
assert(!src_global_min_max_indexes.hyperrectangle.empty());
if (!isDestinationPartitionExpressionMonotonicallyIncreasing(src_global_min_max_indexes.hyperrectangle, destination_storage))
throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Destination table partition expression is not monotonically increasing");
MergeTreePartition().createAndValidateMinMaxPartitionIds(
destination_storage.getInMemoryMetadataPtr(),
src_global_min_max_indexes.getBlock(destination_storage),
destination_storage.getContext());
}
}
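The subset rule in isExpressionDirectSubsetOf has a compact standalone form: collect the source expression's column names into a set and require every destination column to appear in it. An illustrative, self-contained sketch:
#include <string>
#include <unordered_set>
#include <vector>
bool isDirectSubset(const std::vector<std::string> & source_columns, const std::vector<std::string> & destination_columns)
{
    std::unordered_set<std::string> source(source_columns.begin(), source_columns.end());
    for (const auto & column : destination_columns)
        if (!source.contains(column))
            return false;
    return true;
}
/// isDirectSubset({"a", "b", "c"}, {"a", "b"}) == true: attaching from
/// PARTITION BY (a, b, c) into PARTITION BY (a, b) needs no monotonicity check,
/// while the reverse direction does.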

View File

@ -1,30 +0,0 @@
#pragma once
#include <Core/Field.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
namespace DB
{
/*
* Verifies that source and destination partitions are compatible.
* To be compatible, one of the following criteria must be met:
* 1. Destination partition expression columns are a subset of source partition columns; or
* 2. Destination partition expression is monotonic on the source global min_max idx range AND the computed partition id for
* the source global min_max idx range is the same.
*
* If not, an exception is thrown.
* */
class MergeTreePartitionCompatibilityVerifier
{
public:
using DataPart = IMergeTreeDataPart;
using DataPartPtr = std::shared_ptr<const DataPart>;
using DataPartsVector = std::vector<DataPartPtr>;
static void
verify(const MergeTreeData & source_storage, const MergeTreeData & destination_storage, const DataPartsVector & source_parts);
};
}

View File

@ -1,25 +0,0 @@
#include <Storages/MergeTree/MergeTreePartitionGlobalMinMaxIdxCalculator.h>
namespace DB
{
IMergeTreeDataPart::MinMaxIndex
MergeTreePartitionGlobalMinMaxIdxCalculator::calculate(const DataPartsVector & parts, const MergeTreeData & storage)
{
IMergeTreeDataPart::MinMaxIndex global_min_max_indexes;
for (const auto & part : parts)
{
auto metadata_manager = std::make_shared<PartMetadataManagerOrdinary>(part.get());
auto local_min_max_index = MergeTreeData::DataPart::MinMaxIndex();
local_min_max_index.load(storage, metadata_manager);
global_min_max_indexes.merge(local_min_max_index);
}
return global_min_max_indexes;
}
}
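The calculator above is a fold of per-part indexes into one global index. A reduced, self-contained version of the same merge, with a scalar index standing in for the real per-column hyperrectangle:
#include <algorithm>
#include <vector>
struct MinMaxIdx
{
    long min;
    long max;
    void merge(const MinMaxIdx & other)
    {
        min = std::min(min, other.min);
        max = std::max(max, other.max);
    }
};
MinMaxIdx calculateGlobal(const std::vector<MinMaxIdx> & per_part)
{
    /// Assumes at least one part, as the assert in verify() above does.
    MinMaxIdx global = per_part.front();
    for (size_t i = 1; i < per_part.size(); ++i)
        global.merge(per_part[i]);
    return global;
}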

View File

@ -1,24 +0,0 @@
#pragma once
#include <utility>
#include <Core/Field.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB
{
/*
* Calculates global min/max indexes for a given set of parts on a given storage.
* */
class MergeTreePartitionGlobalMinMaxIdxCalculator
{
using DataPart = IMergeTreeDataPart;
using DataPartPtr = std::shared_ptr<const DataPart>;
using DataPartsVector = std::vector<DataPartPtr>;
public:
static IMergeTreeDataPart::MinMaxIndex calculate(const DataPartsVector & parts, const MergeTreeData & storage);
};
}

View File

@ -5,9 +5,9 @@
#include <optional>
#include <ranges>
#include <base/sort.h>
#include <Backups/BackupEntriesCollector.h>
#include <Databases/IDatabase.h>
#include <IO/copyData.h>
#include "Common/Exception.h"
#include <Common/MemoryTracker.h>
#include <Common/escapeForFileName.h>
@ -20,30 +20,27 @@
#include <Interpreters/TransactionLog.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <IO/copyData.h>
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTPartition.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/queryToString.h>
#include <Parsers/formatAST.h>
#include <Planner/Utils.h>
#include <Storages/buildQueryTreeForShard.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/AlterCommands.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MergePlainMergeTreeTask.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreePartitionCompatibilityVerifier.h>
#include <Storages/MergeTree/MergeTreeSink.h>
#include <Storages/MergeTree/PartMetadataManagerOrdinary.h>
#include <Storages/MergeTree/PartitionPruner.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/PartitionCommands.h>
#include <base/sort.h>
#include <Storages/MergeTree/MergeTreeSink.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergePlainMergeTreeTask.h>
#include <Storages/MergeTree/PartitionPruner.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
@ -2064,74 +2061,42 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
ProfileEventsScope profile_events_scope;
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, my_metadata_snapshot);
String partition_id = src_data.getPartitionIDFromQuery(partition, local_context);
String partition_id = getPartitionIDFromQuery(partition, local_context);
DataPartsVector src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id);
bool attach_empty_partition = !replace && src_parts.empty();
if (attach_empty_partition)
return;
MutableDataPartsVector dst_parts;
std::vector<scope_guard> dst_parts_locks;
static const String TMP_PREFIX = "tmp_replace_from_";
const auto my_partition_expression = my_metadata_snapshot->getPartitionKeyAST();
const auto src_partition_expression = source_metadata_snapshot->getPartitionKeyAST();
const auto is_partition_exp_different = queryToStringNullable(my_partition_expression) != queryToStringNullable(src_partition_expression);
if (is_partition_exp_different && !src_parts.empty())
MergeTreePartitionCompatibilityVerifier::verify(src_data, /* destination_storage */ *this, src_parts);
for (DataPartPtr & src_part : src_parts)
for (const DataPartPtr & src_part : src_parts)
{
if (!canReplacePartition(src_part))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot replace partition '{}' because part '{}' has inconsistent granularity with table",
partition_id, src_part->name);
IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()};
/// This will generate a unique name in the scope of the current server process.
auto index = insert_increment.get();
Int64 temp_index = insert_increment.get();
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
if (is_partition_exp_different)
{
auto [new_partition, new_min_max_index] = createPartitionAndMinMaxIndexFromSourcePart(
src_part, my_metadata_snapshot, local_context);
auto [dst_part, part_lock] = cloneAndLoadPartOnSameDiskWithDifferentPartitionKey(
src_part,
new_partition,
new_partition.getID(*this),
new_min_max_index,
TMP_PREFIX,
my_metadata_snapshot,
clone_params,
local_context,
index,
index);
dst_parts.emplace_back(std::move(dst_part));
dst_parts_locks.emplace_back(std::move(part_lock));
}
else
{
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(
src_part,
TMP_PREFIX,
dst_part_info,
my_metadata_snapshot,
clone_params,
local_context->getReadSettings(),
local_context->getWriteSettings());
dst_parts.emplace_back(std::move(dst_part));
dst_parts_locks.emplace_back(std::move(part_lock));
}
IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()};
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(
src_part,
TMP_PREFIX,
dst_part_info,
my_metadata_snapshot,
clone_params,
local_context->getReadSettings(),
local_context->getWriteSettings());
dst_parts.emplace_back(std::move(dst_part));
dst_parts_locks.emplace_back(std::move(part_lock));
}
/// ATTACH empty part set
if (!replace && dst_parts.empty())
return;
MergeTreePartInfo drop_range;
if (replace)
{

View File

@ -25,18 +25,20 @@
#include <base/sort.h>
#include <Storages/buildQueryTreeForShard.h>
#include <Storages/AlterCommands.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/Freeze.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/MergeTree/extractZkPathFromCreateQuery.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/LeaderElection.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergeFromLogEntryTask.h>
#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreePartitionCompatibilityVerifier.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergeTreeReaderCompact.h>
#include <Storages/MergeTree/MutateFromLogEntryTask.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
@ -48,11 +50,9 @@
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/ZeroCopyLock.h>
#include <Storages/MergeTree/extractZkPathFromCreateQuery.h>
#include <Storages/PartitionCommands.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/buildQueryTreeForShard.h>
#include <Databases/DatabaseOnDisk.h>
#include <Databases/DatabaseReplicated.h>
@ -2704,48 +2704,16 @@ bool StorageReplicatedMergeTree::executeReplaceRange(LogEntry & entry)
.copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || ((our_zero_copy_enabled || source_zero_copy_enabled) && part_desc->src_table_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
};
const auto my_partition_expression = metadata_snapshot->getPartitionKeyAST();
const auto src_partition_expression = source_table->getInMemoryMetadataPtr()->getPartitionKeyAST();
const auto is_partition_exp_different = queryToStringNullable(my_partition_expression) != queryToStringNullable(src_partition_expression);
if (is_partition_exp_different)
{
auto [new_partition, new_min_max_index] = createPartitionAndMinMaxIndexFromSourcePart(
part_desc->src_table_part, metadata_snapshot, getContext());
auto partition_id = new_partition.getID(*this);
auto [res_part, temporary_part_lock] = cloneAndLoadPartOnSameDiskWithDifferentPartitionKey(
part_desc->src_table_part,
new_partition,
partition_id,
new_min_max_index,
TMP_PREFIX + "clone_",
metadata_snapshot,
clone_params,
getContext(),
part_desc->new_part_info.min_block,
part_desc->new_part_info.max_block);
part_desc->res_part = std::move(res_part);
part_desc->temporary_part_lock = std::move(temporary_part_lock);
}
else
{
auto [res_part, temporary_part_lock] = cloneAndLoadDataPartOnSameDisk(
part_desc->src_table_part,
TMP_PREFIX + "clone_",
part_desc->new_part_info,
metadata_snapshot,
clone_params,
getContext()->getReadSettings(),
getContext()->getWriteSettings());
part_desc->res_part = std::move(res_part);
part_desc->temporary_part_lock = std::move(temporary_part_lock);
}
auto [res_part, temporary_part_lock] = cloneAndLoadDataPartOnSameDisk(
part_desc->src_table_part,
TMP_PREFIX + "clone_",
part_desc->new_part_info,
metadata_snapshot,
clone_params,
getContext()->getReadSettings(),
getContext()->getWriteSettings());
part_desc->res_part = std::move(res_part);
part_desc->temporary_part_lock = std::move(temporary_part_lock);
}
else if (!part_desc->replica.empty())
{
@ -7883,22 +7851,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
ProfileEventsScope profile_events_scope;
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, metadata_snapshot);
String partition_id = src_data.getPartitionIDFromQuery(partition, query_context);
String partition_id = getPartitionIDFromQuery(partition, query_context);
/// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet.
DataPartsVector src_all_parts = src_data.getVisibleDataPartsVectorInPartition(query_context, partition_id);
bool attach_empty_partition = !replace && src_all_parts.empty();
if (attach_empty_partition)
return;
const auto my_partition_expression = metadata_snapshot->getPartitionKeyAST();
const auto src_partition_expression = source_metadata_snapshot->getPartitionKeyAST();
const auto is_partition_exp_different = queryToStringNullable(my_partition_expression) != queryToStringNullable(src_partition_expression);
if (is_partition_exp_different && !src_all_parts.empty())
MergeTreePartitionCompatibilityVerifier::verify(src_data, /* destination_storage */ *this, src_all_parts);
LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size());
static const String TMP_PREFIX = "tmp_replace_from_";
@ -7953,18 +7910,6 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
"Cannot replace partition '{}' because part '{}"
"' has inconsistent granularity with table", partition_id, src_part->name);
IMergeTreeDataPart::MinMaxIndex min_max_index = *src_part->minmax_idx;
MergeTreePartition merge_tree_partition = src_part->partition;
if (is_partition_exp_different)
{
auto [new_partition, new_min_max_index] = createPartitionAndMinMaxIndexFromSourcePart(src_part, metadata_snapshot, query_context);
merge_tree_partition = new_partition;
min_max_index = new_min_max_index;
partition_id = merge_tree_partition.getID(*this);
}
String hash_hex = src_part->checksums.getTotalChecksumHex();
const bool is_duplicated_part = replaced_parts.contains(hash_hex);
replaced_parts.insert(hash_hex);
@ -7983,52 +7928,27 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
continue;
}
UInt64 index = lock->getNumber();
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication
|| dynamic_cast<const MergeTreeData *>(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication;
UInt64 index = lock->getNumber();
IDataPartStorage::ClonePartParams clone_params
{
.copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
};
if (is_partition_exp_different)
{
auto [dst_part, part_lock] = cloneAndLoadPartOnSameDiskWithDifferentPartitionKey(
src_part,
merge_tree_partition,
partition_id,
min_max_index,
TMP_PREFIX,
metadata_snapshot,
clone_params,
query_context,
index,
index);
dst_parts.emplace_back(dst_part);
dst_parts_locks.emplace_back(std::move(part_lock));
}
else
{
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(
src_part,
TMP_PREFIX,
dst_part_info,
metadata_snapshot,
clone_params,
query_context->getReadSettings(),
query_context->getWriteSettings());
dst_parts.emplace_back(dst_part);
dst_parts_locks.emplace_back(std::move(part_lock));
}
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(
src_part,
TMP_PREFIX,
dst_part_info,
metadata_snapshot,
clone_params,
query_context->getReadSettings(),
query_context->getWriteSettings());
src_parts.emplace_back(src_part);
dst_parts.emplace_back(dst_part);
dst_parts_locks.emplace_back(std::move(part_lock));
ephemeral_locks.emplace_back(std::move(*lock));
block_id_paths.emplace_back(block_id_path);
part_checksums.emplace_back(hash_hex);

View File

@ -12,8 +12,6 @@ test_mysql_database_engine/test.py::test_mysql_ddl_for_mysql_database
test_passing_max_partitions_to_read_remotely/test.py::test_default_database_on_cluster
test_replicating_constants/test.py::test_different_versions
test_select_access_rights/test_main.py::test_alias_columns
test_select_access_rights/test_main.py::test_select_count
test_select_access_rights/test_main.py::test_select_join
test_settings_profile/test.py::test_show_profiles
test_shard_level_const_function/test.py::test_remote
test_sql_user_defined_functions_on_cluster/test.py::test_sql_user_defined_functions_on_cluster

View File

@ -8,11 +8,10 @@ from pprint import pformat
from typing import Any, List, Literal, Optional, Tuple
import boto3 # type: ignore
from lambda_shared import (
RUNNER_TYPE_LABELS,
CHException,
ClickHouseHelper,
RUNNER_TYPE_LABELS,
get_parameter_from_ssm,
)
@ -115,6 +114,8 @@ def set_capacity(
# Are we already at the capacity limits
stop = stop or asg["MaxSize"] <= asg["DesiredCapacity"]
# Let's calculate a new desired capacity
# (capacity_deficit + scale_up - 1) // scale_up : will increase min by 1
# if there is any capacity_deficit
desired_capacity = (
asg["DesiredCapacity"] + (capacity_deficit + scale_up - 1) // scale_up
)
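For example, with capacity_deficit = 5 and scale_up = 2 the term evaluates to (5 + 2 - 1) // 2 = 3, i.e. integer ceiling division: the desired capacity grows by ceil(capacity_deficit / scale_up), and by at least 1 whenever any deficit exists.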

View File

@ -4,7 +4,7 @@ import unittest
from dataclasses import dataclass
from typing import Any, List
from app import set_capacity, Queue
from app import Queue, set_capacity
@dataclass
@ -68,10 +68,16 @@ class TestSetCapacity(unittest.TestCase):
test_cases = (
# Do not change capacity
TestCase("noqueue", 1, 13, 20, [Queue("in_progress", 155, "noqueue")], -1),
TestCase(
"w/reserve-1", 1, 13, 20, [Queue("queued", 15, "w/reserve-1")], 14
),
TestCase("reserve", 1, 13, 20, [Queue("queued", 13, "reserve")], -1),
# Increase capacity
TestCase(
"increase-always",
1,
13,
20,
[Queue("queued", 14, "increase-always")],
14,
),
TestCase("increase-1", 1, 13, 20, [Queue("queued", 23, "increase-1")], 17),
TestCase(
"style-checker", 1, 13, 20, [Queue("queued", 33, "style-checker")], 20

View File

@ -1,17 +0,0 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>replica1</host>
<port>9000</port>
</replica>
<replica>
<host>replica2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</clickhouse>

View File

@ -1,214 +0,0 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
replica1 = cluster.add_instance(
"replica1", with_zookeeper=True, main_configs=["configs/remote_servers.xml"]
)
replica2 = cluster.add_instance(
"replica2", with_zookeeper=True, main_configs=["configs/remote_servers.xml"]
)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
except Exception as ex:
print(ex)
finally:
cluster.shutdown()
def cleanup(nodes):
for node in nodes:
node.query("DROP TABLE IF EXISTS source SYNC")
node.query("DROP TABLE IF EXISTS destination SYNC")
def create_table(node, table_name, replicated):
replica = node.name
engine = (
f"ReplicatedMergeTree('/clickhouse/tables/1/{table_name}', '{replica}')"
if replicated
else "MergeTree()"
)
partition_expression = (
"toYYYYMMDD(timestamp)" if table_name == "source" else "toYYYYMM(timestamp)"
)
node.query_with_retry(
"""
CREATE TABLE {table_name}(timestamp DateTime)
ENGINE = {engine}
ORDER BY tuple() PARTITION BY {partition_expression}
SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, max_cleanup_delay_period=1;
""".format(
table_name=table_name,
engine=engine,
partition_expression=partition_expression,
)
)
def test_both_replicated(start_cluster):
for node in [replica1, replica2]:
create_table(node, "source", True)
create_table(node, "destination", True)
replica1.query("INSERT INTO source VALUES ('2010-03-02 02:01:01')")
replica1.query("SYSTEM SYNC REPLICA source")
replica1.query("SYSTEM SYNC REPLICA destination")
replica1.query(
f"ALTER TABLE destination ATTACH PARTITION ID '20100302' FROM source"
)
assert_eq_with_retry(
replica1, f"SELECT * FROM destination", "2010-03-02 02:01:01\n"
)
assert_eq_with_retry(
replica1,
f"SELECT * FROM destination",
replica2.query(f"SELECT * FROM destination"),
)
cleanup([replica1, replica2])
def test_only_destination_replicated(start_cluster):
create_table(replica1, "source", False)
create_table(replica1, "destination", True)
create_table(replica2, "destination", True)
replica1.query("INSERT INTO source VALUES ('2010-03-02 02:01:01')")
replica1.query("SYSTEM SYNC REPLICA destination")
replica1.query(
f"ALTER TABLE destination ATTACH PARTITION ID '20100302' FROM source"
)
assert_eq_with_retry(
replica1, f"SELECT * FROM destination", "2010-03-02 02:01:01\n"
)
assert_eq_with_retry(
replica1,
f"SELECT * FROM destination",
replica2.query(f"SELECT * FROM destination"),
)
cleanup([replica1, replica2])
def test_both_replicated_partitioned_to_unpartitioned(start_cluster):
def create_tables(nodes):
for node in nodes:
source_engine = (
f"ReplicatedMergeTree('/clickhouse/tables/1/source', '{node.name}')"
)
node.query(
"""
CREATE TABLE source(timestamp DateTime)
ENGINE = {engine}
ORDER BY tuple() PARTITION BY toYYYYMMDD(timestamp)
SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, max_cleanup_delay_period=1;
""".format(
engine=source_engine,
)
)
destination_engine = f"ReplicatedMergeTree('/clickhouse/tables/1/destination', '{node.name}')"
node.query(
"""
CREATE TABLE destination(timestamp DateTime)
ENGINE = {engine}
ORDER BY tuple() PARTITION BY tuple()
SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, max_cleanup_delay_period=1;
""".format(
engine=destination_engine,
)
)
create_tables([replica1, replica2])
replica1.query("INSERT INTO source VALUES ('2010-03-02 02:01:01')")
replica1.query("INSERT INTO source VALUES ('2010-03-03 02:01:01')")
replica1.query("SYSTEM SYNC REPLICA source")
replica1.query("SYSTEM SYNC REPLICA destination")
replica1.query(
f"ALTER TABLE destination ATTACH PARTITION ID '20100302' FROM source"
)
replica1.query(
f"ALTER TABLE destination ATTACH PARTITION ID '20100303' FROM source"
)
assert_eq_with_retry(
replica1,
f"SELECT * FROM destination ORDER BY timestamp",
"2010-03-02 02:01:01\n2010-03-03 02:01:01\n",
)
assert_eq_with_retry(
replica1,
f"SELECT * FROM destination ORDER BY timestamp",
replica2.query(f"SELECT * FROM destination ORDER BY timestamp"),
)
cleanup([replica1, replica2])
def test_both_replicated_different_exp_same_id(start_cluster):
def create_tables(nodes):
for node in nodes:
source_engine = (
f"ReplicatedMergeTree('/clickhouse/tables/1/source', '{node.name}')"
)
node.query(
"""
CREATE TABLE source(a UInt16,b UInt16,c UInt16,extra UInt64,Path String,Time DateTime,Value Float64,Timestamp Int64,sign Int8)
ENGINE = {engine}
ORDER BY tuple() PARTITION BY a % 3
SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, max_cleanup_delay_period=1;
""".format(
engine=source_engine,
)
)
destination_engine = f"ReplicatedMergeTree('/clickhouse/tables/1/destination', '{node.name}')"
node.query(
"""
CREATE TABLE destination(a UInt16,b UInt16,c UInt16,extra UInt64,Path String,Time DateTime,Value Float64,Timestamp Int64,sign Int8)
ENGINE = {engine}
ORDER BY tuple() PARTITION BY a
SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, max_cleanup_delay_period=1;
""".format(
engine=destination_engine,
)
)
create_tables([replica1, replica2])
replica1.query(
"INSERT INTO source (a, b, c, extra, sign) VALUES (1, 5, 9, 1000, 1)"
)
replica1.query(
"INSERT INTO source (a, b, c, extra, sign) VALUES (2, 6, 10, 1000, 1)"
)
replica1.query("SYSTEM SYNC REPLICA source")
replica1.query("SYSTEM SYNC REPLICA destination")
replica1.query(f"ALTER TABLE destination ATTACH PARTITION 1 FROM source")
replica1.query(f"ALTER TABLE destination ATTACH PARTITION 2 FROM source")
assert_eq_with_retry(
replica1,
f"SELECT * FROM destination ORDER BY a",
"1\t5\t9\t1000\t\t1970-01-01 00:00:00\t0\t0\t1\n2\t6\t10\t1000\t\t1970-01-01 00:00:00\t0\t0\t1\n",
)
assert_eq_with_retry(
replica1,
f"SELECT * FROM destination ORDER BY a",
replica2.query(f"SELECT * FROM destination ORDER BY a"),
)
cleanup([replica1, replica2])

View File

@ -1,6 +1,7 @@
import pytest
import re
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance("instance")
@ -185,25 +186,39 @@ def test_select_join():
)
select_query = "SELECT * FROM table1 JOIN table2 USING(d)"
assert (
"it's necessary to have the grant SELECT(d, x, y) ON default.table2"
in instance.query_and_get_error(select_query, user="A")
)
def match_error(err, columns, table):
"""Check if the error message contains the expected table and columns"""
match = re.search(
r"it's necessary to have the grant SELECT\((.*)\) ON default\.(\w+)", err
)
if not match:
return False
if match.group(2) != table:
return False
assert set(match.group(1).split(", ")) == set(
columns.split(", ")
), f"expected {columns} in {err}"
return True
response = instance.query_and_get_error(select_query, user="A")
table1_match = match_error(response, "d, a, b", "table1")
table2_match = match_error(response, "d, x, y", "table2")
assert table1_match or table2_match, response
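(The relaxed assertion accepts whichever joined table the access check reports first, since that order is not guaranteed across query runs.)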
instance.query("GRANT SELECT(d, x, y) ON default.table2 TO A")
assert (
"it's necessary to have the grant SELECT(d, a, b) ON default.table1"
in instance.query_and_get_error(select_query, user="A")
)
response = instance.query_and_get_error(select_query, user="A")
assert match_error(response, "d, a, b", "table1")
response = instance.query_and_get_error(select_query, user="A")
instance.query("GRANT SELECT(d, a, b) ON default.table1 TO A")
assert instance.query(select_query, user="A") == ""
instance.query("REVOKE SELECT ON default.table2 FROM A")
assert (
"it's necessary to have the grant SELECT(d, x, y) ON default.table2"
in instance.query_and_get_error(select_query, user="A")
)
response = instance.query_and_get_error(select_query, user="A")
assert match_error(response, "d, x, y", "table2")
def test_select_union():

View File

@ -1,5 +1,3 @@
SET default_table_engine = 'None';
CREATE TABLE table_02184 (x UInt8); --{serverError 119}
SET default_table_engine = 'Log';
CREATE TABLE table_02184 (x UInt8);

View File

@ -1,467 +0,0 @@
-- { echoOn }
-- Should be allowed since destination partition expr is monotonically increasing and compatible
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMMDD(timestamp);
CREATE TABLE destination (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01'), ('2010-03-02 02:01:03');
ALTER TABLE destination ATTACH PARTITION ID '20100302' FROM source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
201003
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '20100302' FROM source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
201003
-- Should be allowed since destination partition expr is monotonically increasing and compatible. Note that even though
-- the destination partition expression is more granular, the data would still fall in the same partition. Thus, it is valid
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
CREATE TABLE destination (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMMDD(timestamp);
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01'), ('2010-03-02 02:01:03');
ALTER TABLE destination ATTACH PARTITION ID '201003' FROM source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
20100302
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '201003' FROM source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
20100302
-- Should be allowed since destination partition expr is monotonically increasing and compatible for those specific values
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime, A Int64) engine=MergeTree ORDER BY timestamp PARTITION BY intDiv(A, 6);
CREATE TABLE destination (timestamp DateTime, A Int64) engine=MergeTree ORDER BY timestamp PARTITION BY A;
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01', 1), ('2010-03-02 02:01:03', 1);
ALTER TABLE destination ATTACH PARTITION ID '0' FROM source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01 1
2010-03-02 02:01:03 1
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01 1
2010-03-02 02:01:03 1
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
1
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION 0 FROM source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01 1
2010-03-02 02:01:03 1
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01 1
2010-03-02 02:01:03 1
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
1
-- Should be allowed because dst partition exp is monotonically increasing and data is not split
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (productName String, category String) engine=MergeTree ORDER BY tuple() PARTITION BY cityHash64(category);
CREATE TABLE destination (productName String, category String) engine=MergeTree ORDER BY tuple() PARTITION BY toString(category);
INSERT INTO TABLE source VALUES ('spaghetti', 'food'), ('mop', 'general');
INSERT INTO TABLE source VALUES ('rice', 'food');
ALTER TABLE destination ATTACH PARTITION ID '17908065610379824077' from source;
SELECT * FROM source ORDER BY productName;
mop general
rice food
spaghetti food
SELECT * FROM destination ORDER BY productName;
rice food
spaghetti food
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
59532f3c39a412a413f0f014c7750a9d
59532f3c39a412a413f0f014c7750a9d
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '17908065610379824077' from source;
SELECT * FROM source ORDER BY productName;
mop general
rice food
spaghetti food
SELECT * FROM destination ORDER BY productName;
rice food
spaghetti food
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
59532f3c39a412a413f0f014c7750a9d
59532f3c39a412a413f0f014c7750a9d
-- Should be allowed, extra test case to validate https://github.com/ClickHouse/ClickHouse/pull/39507#issuecomment-1747574133
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp Int64) engine=MergeTree ORDER BY (timestamp) PARTITION BY intDiv(timestamp, 86400000);
CREATE TABLE destination (timestamp Int64) engine=MergeTree ORDER BY (timestamp) PARTITION BY toYear(toDateTime(intDiv(timestamp, 1000)));
INSERT INTO TABLE source VALUES (1267495261123);
ALTER TABLE destination ATTACH PARTITION ID '14670' FROM source;
SELECT * FROM source ORDER BY timestamp;
1267495261123
SELECT * FROM destination ORDER BY timestamp;
1267495261123
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
2010
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '14670' from source;
SELECT * FROM source ORDER BY timestamp;
1267495261123
SELECT * FROM destination ORDER BY timestamp;
1267495261123
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
2010
-- Should be allowed, extra test case to validate https://github.com/ClickHouse/ClickHouse/pull/39507#issuecomment-1747511726
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime('UTC'), key Int64, f Float64) engine=MergeTree ORDER BY (key, timestamp) PARTITION BY toYear(timestamp);
CREATE TABLE destination (timestamp DateTime('UTC'), key Int64, f Float64) engine=MergeTree ORDER BY (key, timestamp) PARTITION BY (intDiv(toUInt32(timestamp),86400));
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01',1,1),('2010-03-02 02:01:01',1,1),('2011-02-02 02:01:03',1,1);
ALTER TABLE destination ATTACH PARTITION ID '2010' FROM source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01 1 1
2010-03-02 02:01:01 1 1
2011-02-02 02:01:03 1 1
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01 1 1
2010-03-02 02:01:01 1 1
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
14670
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '2010' from source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01 1 1
2010-03-02 02:01:01 1 1
2011-02-02 02:01:03 1 1
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01 1 1
2010-03-02 02:01:01 1 1
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
14670
-- Should be allowed, partitioned table to unpartitioned. Since the destination is unpartitioned, parts would ultimately
-- fall into the same partition.
-- Destination partition by expression is omitted, which causes StorageMetadata::getPartitionKeyAST() to be nullptr.
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
CREATE TABLE destination (timestamp DateTime) engine=MergeTree ORDER BY tuple();
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01'), ('2010-03-02 02:01:03');
ALTER TABLE destination ATTACH PARTITION ID '201003' FROM source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
all
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '201003' from source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
all
-- Same as above, but destination partition by expression is explicitly defined. Test case required to validate that
-- partition by tuple() is accepted.
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
CREATE TABLE destination (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY tuple();
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01'), ('2010-03-02 02:01:03');
ALTER TABLE destination ATTACH PARTITION ID '201003' FROM source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
all
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '201003' from source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
all
-- Should be allowed because the destination partition expression columns are a subset of the source partition expression columns
-- Columns in this case refer to the expression elements, not to the actual table columns
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (a Int, b Int, c Int) engine=MergeTree ORDER BY tuple() PARTITION BY (a, b, c);
CREATE TABLE destination (a Int, b Int, c Int) engine=MergeTree ORDER BY tuple() PARTITION BY (a, b);
INSERT INTO TABLE source VALUES (1, 2, 3), (1, 2, 4);
ALTER TABLE destination ATTACH PARTITION ID '1-2-3' FROM source;
SELECT * FROM source ORDER BY (a, b, c);
1 2 3
1 2 4
SELECT * FROM destination ORDER BY (a, b, c);
1 2 3
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
1-2
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION (1, 2, 3) from source;
SELECT * FROM source ORDER BY (a, b, c);
1 2 3
1 2 4
SELECT * FROM destination ORDER BY (a, b, c);
1 2 3
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
1-2
-- Should be allowed because the destination partition expression columns are a subset of the source partition expression columns
-- Columns in this case refer to the expression elements, not to the actual table columns
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (a Int, b Int, c Int) engine=MergeTree ORDER BY tuple() PARTITION BY (a, b, c);
CREATE TABLE destination (a Int, b Int, c Int) engine=MergeTree ORDER BY tuple() PARTITION BY a;
INSERT INTO TABLE source VALUES (1, 2, 3), (1, 2, 4);
ALTER TABLE destination ATTACH PARTITION ID '1-2-3' FROM source;
SELECT * FROM source ORDER BY (a, b, c);
1 2 3
1 2 4
SELECT * FROM destination ORDER BY (a, b, c);
1 2 3
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
1
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION (1, 2, 3) from source;
SELECT * FROM source ORDER BY (a, b, c);
1 2 3
1 2 4
SELECT * FROM destination ORDER BY (a, b, c);
1 2 3
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
1
-- Should be allowed. Special test case, tricky to explain. First column of source partition expression is
-- timestamp, while first column of destination partition expression is `A`. One of the previous implementations
-- would not match the columns, which could lead to `timestamp` min max being used to calculate monotonicity of `A`.
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (`timestamp` DateTime, `A` Int64) ENGINE = MergeTree PARTITION BY tuple(toYYYYMM(timestamp), intDiv(A, 6)) ORDER BY timestamp;
CREATE TABLE destination (`timestamp` DateTime, `A` Int64) ENGINE = MergeTree PARTITION BY A ORDER BY timestamp;
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01', 5);
ALTER TABLE destination ATTACH PARTITION ID '201003-0' FROM source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01 5
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01 5
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
5
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION (201003, 0) from source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01 5
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01 5
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
5
-- Should be allowed. Destination partition expression contains multiple expressions, but all of them are monotonically
-- increasing in the source partition min max indexes.
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (A Int, B Int) ENGINE = MergeTree PARTITION BY tuple(A, B) ORDER BY tuple();
CREATE TABLE destination (A Int, B Int) ENGINE = MergeTree PARTITION BY tuple(intDiv(A, 2), intDiv(B, 2)) ORDER BY tuple();
INSERT INTO TABLE source VALUES (6, 12);
ALTER TABLE destination ATTACH PARTITION ID '6-12' FROM source;
SELECT * FROM source ORDER BY A;
6 12
SELECT * FROM destination ORDER BY A;
6 12
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
3-6
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION (6, 12) from source;
SELECT * FROM source ORDER BY A;
6 12
SELECT * FROM destination ORDER BY A;
6 12
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
3-6
-- Should be allowed. The same scenario as above, but partition expressions inverted.
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (A Int, B Int) ENGINE = MergeTree PARTITION BY tuple(intDiv(A, 2), intDiv(B, 2)) ORDER BY tuple();
CREATE TABLE destination (A Int, B Int) ENGINE = MergeTree PARTITION BY tuple(A, B) ORDER BY tuple();
INSERT INTO TABLE source VALUES (6, 12);
ALTER TABLE destination ATTACH PARTITION ID '3-6' FROM source;
SELECT * FROM source ORDER BY A;
6 12
SELECT * FROM destination ORDER BY A;
6 12
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
6-12
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION (3, 6) from source;
SELECT * FROM source ORDER BY A;
6 12
SELECT * FROM destination ORDER BY A;
6 12
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
6-12
-- Should be allowed, it is a local operation, no different than regular attach. Replicated to replicated.
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE
source(timestamp DateTime)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/source_replicated_to_replicated_distinct_expression', '1')
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY tuple();
CREATE TABLE
destination(timestamp DateTime)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/destination_replicated_to_replicated_distinct_expression', '1')
PARTITION BY toYYYYMM(timestamp)
ORDER BY tuple();
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01'), ('2010-03-02 02:01:03');
ALTER TABLE destination ATTACH PARTITION ID '20100302' FROM source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
201003
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '20100302' from source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
201003
-- Should be allowed, it is a local operation, no different than regular attach. Non replicated to replicated
DROP TABLE IF EXISTS source SYNC;
DROP TABLE IF EXISTS destination SYNC;
CREATE TABLE source(timestamp DateTime) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(timestamp) ORDER BY tuple();
CREATE TABLE
destination(timestamp DateTime)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/destination_non_replicated_to_replicated_distinct_expression', '1')
PARTITION BY toYYYYMM(timestamp)
ORDER BY tuple();
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01'), ('2010-03-02 02:01:03');
ALTER TABLE destination ATTACH PARTITION ID '20100302' FROM source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
201003
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '20100302' from source;
SELECT * FROM source ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT * FROM destination ORDER BY timestamp;
2010-03-02 02:01:01
2010-03-02 02:01:03
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
201003
-- Should not be allowed because data would be split into two different partitions
DROP TABLE IF EXISTS source SYNC;
DROP TABLE IF EXISTS destination SYNC;
CREATE TABLE source (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
CREATE TABLE destination (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMMDD(timestamp);
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01'), ('2010-03-03 02:01:03');
ALTER TABLE destination ATTACH PARTITION ID '201003' FROM source; -- { serverError 248 }
ALTER TABLE destination ATTACH PARTITION '201003' from source; -- { serverError 248 }
-- Should not be allowed because data would be split into two different partitions
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime, A Int64) engine=MergeTree ORDER BY timestamp PARTITION BY intDiv(A, 6);
CREATE TABLE destination (timestamp DateTime, A Int64) engine=MergeTree ORDER BY timestamp PARTITION BY A;
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01', 1), ('2010-03-02 02:01:03', 2);
ALTER TABLE destination ATTACH PARTITION ID '0' FROM source; -- { serverError 248 }
ALTER TABLE destination ATTACH PARTITION 0 FROM source; -- { serverError 248 }
-- Should not be allowed because dst partition exp takes more than two arguments, so it's not considered monotonically increasing
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (productName String, category String) engine=MergeTree ORDER BY tuple() PARTITION BY toString(category);
CREATE TABLE destination (productName String, category String) engine=MergeTree ORDER BY tuple() PARTITION BY substring(category, 1, 2);
INSERT INTO TABLE source VALUES ('spaghetti', 'food'), ('mop', 'general');
INSERT INTO TABLE source VALUES ('rice', 'food');
ALTER TABLE destination ATTACH PARTITION ID '4590ba78048910b74a47d5bfb308abed' from source; -- { serverError 36 }
ALTER TABLE destination ATTACH PARTITION 'food' from source; -- { serverError 36 }
-- Should not be allowed because dst partition exp depends on a different set of columns
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (productName String, category String) engine=MergeTree ORDER BY tuple() PARTITION BY toString(category);
CREATE TABLE destination (productName String, category String) engine=MergeTree ORDER BY tuple() PARTITION BY toString(productName);
INSERT INTO TABLE source VALUES ('spaghetti', 'food'), ('mop', 'general');
INSERT INTO TABLE source VALUES ('rice', 'food');
ALTER TABLE destination ATTACH PARTITION ID '4590ba78048910b74a47d5bfb308abed' from source; -- { serverError 36 }
ALTER TABLE destination ATTACH PARTITION 'food' from source; -- { serverError 36 }
-- Should not be allowed because dst partition exp is not monotonically increasing
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (productName String) engine=MergeTree ORDER BY tuple() PARTITION BY left(productName, 2);
CREATE TABLE destination (productName String) engine=MergeTree ORDER BY tuple() PARTITION BY cityHash64(productName);
INSERT INTO TABLE source VALUES ('bread'), ('mop');
INSERT INTO TABLE source VALUES ('broccoli');
ALTER TABLE destination ATTACH PARTITION ID '4589453b7ee96ce9de1265bd57674496' from source; -- { serverError 36 }
ALTER TABLE destination ATTACH PARTITION 'br' from source; -- { serverError 36 }
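-- Illustrative aside, not part of the original test: 'bread' and 'broccoli' share the source
-- partition 'br', yet a hash is not monotonic in its input, so the rows cannot be proven to
-- stay together in the destination.
SELECT cityHash64('bread') = cityHash64('broccoli'); -- 0 (assuming no hash collision)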
-- Empty/non-existent partition, same partition expression. Nothing should happen
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
CREATE TABLE destination (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
ALTER TABLE destination ATTACH PARTITION ID '1' FROM source;
ALTER TABLE destination ATTACH PARTITION 1 FROM source;
SELECT * FROM destination;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
-- Empty/non-existent partition, different partition expression. Nothing should happen
-- https://github.com/ClickHouse/ClickHouse/pull/39507#discussion_r1399839045
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMMDD(timestamp);
CREATE TABLE destination (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
ALTER TABLE destination ATTACH PARTITION ID '1' FROM source;
ALTER TABLE destination ATTACH PARTITION 1 FROM source;
SELECT * FROM destination;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
-- Replace instead of attach. Empty/non-existent partition, same partition expression. Nothing should happen
-- https://github.com/ClickHouse/ClickHouse/pull/39507#discussion_r1399839045
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
CREATE TABLE destination (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
ALTER TABLE destination REPLACE PARTITION '1' FROM source;
SELECT * FROM destination;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
-- Replace instead of attach. Empty/non-existent partition to non-empty partition, same partition id.
-- https://github.com/ClickHouse/ClickHouse/pull/39507#discussion_r1399839045
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (A Int) engine=MergeTree ORDER BY tuple() PARTITION BY A;
CREATE TABLE destination (A Int) engine=MergeTree ORDER BY tuple() PARTITION BY A;
INSERT INTO TABLE destination VALUES (1);
ALTER TABLE destination REPLACE PARTITION '1' FROM source;
SELECT * FROM destination;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;

View File

@ -1,485 +0,0 @@
-- { echoOn }
-- Should be allowed since destination partition expr is monotonically increasing and compatible
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMMDD(timestamp);
CREATE TABLE destination (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01'), ('2010-03-02 02:01:03');
ALTER TABLE destination ATTACH PARTITION ID '20100302' FROM source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '20100302' FROM source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
-- Should be allowed since destination partition expr is monotonically increasing and compatible. Note that even though
-- the destination partition expression is more granular, the data would still fall in the same partition. Thus, it is valid
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
CREATE TABLE destination (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMMDD(timestamp);
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01'), ('2010-03-02 02:01:03');
ALTER TABLE destination ATTACH PARTITION ID '201003' FROM source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '201003' FROM source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
-- Should be allowed since destination partition expr is monotonically increasing and compatible for those specific values
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime, A Int64) engine=MergeTree ORDER BY timestamp PARTITION BY intDiv(A, 6);
CREATE TABLE destination (timestamp DateTime, A Int64) engine=MergeTree ORDER BY timestamp PARTITION BY A;
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01', 1), ('2010-03-02 02:01:03', 1);
ALTER TABLE destination ATTACH PARTITION ID '0' FROM source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION 0 FROM source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
-- Should be allowed because the destination partition expression is monotonically increasing and the data is not split
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (productName String, category String) engine=MergeTree ORDER BY tuple() PARTITION BY cityHash64(category);
CREATE TABLE destination (productName String, category String) engine=MergeTree ORDER BY tuple() PARTITION BY toString(category);
INSERT INTO TABLE source VALUES ('spaghetti', 'food'), ('mop', 'general');
INSERT INTO TABLE source VALUES ('rice', 'food');
ALTER TABLE destination ATTACH PARTITION ID '17908065610379824077' from source;
SELECT * FROM source ORDER BY productName;
SELECT * FROM destination ORDER BY productName;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '17908065610379824077' from source;
SELECT * FROM source ORDER BY productName;
SELECT * FROM destination ORDER BY productName;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
-- Should be allowed, extra test case to validate https://github.com/ClickHouse/ClickHouse/pull/39507#issuecomment-1747574133
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp Int64) engine=MergeTree ORDER BY (timestamp) PARTITION BY intDiv(timestamp, 86400000);
CREATE TABLE destination (timestamp Int64) engine=MergeTree ORDER BY (timestamp) PARTITION BY toYear(toDateTime(intDiv(timestamp, 1000)));
INSERT INTO TABLE source VALUES (1267495261123);
ALTER TABLE destination ATTACH PARTITION ID '14670' FROM source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '14670' from source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
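-- Illustrative aside, not part of the original test: the millisecond timestamp falls on day
-- number 14670 under the source expression and in year 2010 under the destination expression.
SELECT intDiv(1267495261123, 86400000), toYear(toDateTime(intDiv(1267495261123, 1000))); -- 14670, 2010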
-- Should be allowed, extra test case to validate https://github.com/ClickHouse/ClickHouse/pull/39507#issuecomment-1747511726
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime('UTC'), key Int64, f Float64) engine=MergeTree ORDER BY (key, timestamp) PARTITION BY toYear(timestamp);
CREATE TABLE destination (timestamp DateTime('UTC'), key Int64, f Float64) engine=MergeTree ORDER BY (key, timestamp) PARTITION BY (intDiv(toUInt32(timestamp),86400));
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01',1,1),('2010-03-02 02:01:01',1,1),('2011-02-02 02:01:03',1,1);
ALTER TABLE destination ATTACH PARTITION ID '2010' FROM source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '2010' from source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
-- Should be allowed, partitioned table to unpartitioned. Since the destination is unpartitioned, parts would ultimately
-- fall into the same partition.
-- Destination partition by expression is omitted, which causes StorageMetadata::getPartitionKeyAST() to be nullptr.
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
CREATE TABLE destination (timestamp DateTime) engine=MergeTree ORDER BY tuple();
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01'), ('2010-03-02 02:01:03');
ALTER TABLE destination ATTACH PARTITION ID '201003' FROM source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '201003' from source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
-- Same as above, but destination partition by expression is explicitly defined. Test case required to validate that
-- partition by tuple() is accepted.
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
CREATE TABLE destination (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY tuple();
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01'), ('2010-03-02 02:01:03');
ALTER TABLE destination ATTACH PARTITION ID '201003' FROM source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '201003' from source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
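-- Illustrative aside, not part of the original test: with an empty partition key every part
-- belongs to a single partition whose id is 'all' (an assumption based on MergeTree part
-- naming), which is why nothing can ever be split here.
SELECT DISTINCT partition_id FROM system.parts WHERE table = 'destination' AND database = currentDatabase() AND active = 1; -- 'all'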
-- Should be allowed because the destination partition expression columns are a subset of the source partition expression columns
-- Columns in this case refer to the expression elements, not to the actual table columns
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (a Int, b Int, c Int) engine=MergeTree ORDER BY tuple() PARTITION BY (a, b, c);
CREATE TABLE destination (a Int, b Int, c Int) engine=MergeTree ORDER BY tuple() PARTITION BY (a, b);
INSERT INTO TABLE source VALUES (1, 2, 3), (1, 2, 4);
ALTER TABLE destination ATTACH PARTITION ID '1-2-3' FROM source;
SELECT * FROM source ORDER BY (a, b, c);
SELECT * FROM destination ORDER BY (a, b, c);
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION (1, 2, 3) from source;
SELECT * FROM source ORDER BY (a, b, c);
SELECT * FROM destination ORDER BY (a, b, c);
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
-- Should be allowed because the destination partition expression columns are a subset of the source partition expression columns
-- Columns in this case refer to the expression elements, not to the actual table columns
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (a Int, b Int, c Int) engine=MergeTree ORDER BY tuple() PARTITION BY (a, b, c);
CREATE TABLE destination (a Int, b Int, c Int) engine=MergeTree ORDER BY tuple() PARTITION BY a;
INSERT INTO TABLE source VALUES (1, 2, 3), (1, 2, 4);
ALTER TABLE destination ATTACH PARTITION ID '1-2-3' FROM source;
SELECT * FROM source ORDER BY (a, b, c);
SELECT * FROM destination ORDER BY (a, b, c);
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION (1, 2, 3) from source;
SELECT * FROM source ORDER BY (a, b, c);
SELECT * FROM destination ORDER BY (a, b, c);
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
-- Should be allowed. Subtle test case: the first column of the source partition expression is
-- `timestamp`, while the first column of the destination partition expression is `A`. An earlier
-- implementation would match the columns by position, so the `timestamp` min/max index could be
-- used to evaluate the monotonicity of `A`.
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (`timestamp` DateTime, `A` Int64) ENGINE = MergeTree PARTITION BY tuple(toYYYYMM(timestamp), intDiv(A, 6)) ORDER BY timestamp;
CREATE TABLE destination (`timestamp` DateTime, `A` Int64) ENGINE = MergeTree PARTITION BY A ORDER BY timestamp;
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01', 5);
ALTER TABLE destination ATTACH PARTITION ID '201003-0' FROM source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION (201003, 0) from source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
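-- Illustrative aside, not part of the original test: the source partition id '201003-0' is
-- just the pair (toYYYYMM(timestamp), intDiv(A, 6)) evaluated for the inserted row.
SELECT toYYYYMM(toDateTime('2010-03-02 02:01:01')), intDiv(5, 6); -- 201003, 0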
-- Should be allowed. The destination partition expression contains multiple sub-expressions, but all of them are
-- monotonically increasing over the source partition's min/max index ranges.
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (A Int, B Int) ENGINE = MergeTree PARTITION BY tuple(A, B) ORDER BY tuple();
CREATE TABLE destination (A Int, B Int) ENGINE = MergeTree PARTITION BY tuple(intDiv(A, 2), intDiv(B, 2)) ORDER BY tuple();
INSERT INTO TABLE source VALUES (6, 12);
ALTER TABLE destination ATTACH PARTITION ID '6-12' FROM source;
SELECT * FROM source ORDER BY A;
SELECT * FROM destination ORDER BY A;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION (6, 12) from source;
SELECT * FROM source ORDER BY A;
SELECT * FROM destination ORDER BY A;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
-- Should be allowed. The same scenario as above, but partition expressions inverted.
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (A Int, B Int) ENGINE = MergeTree PARTITION BY tuple(intDiv(A, 2), intDiv(B, 2)) ORDER BY tuple();
CREATE TABLE destination (A Int, B Int) ENGINE = MergeTree PARTITION BY tuple(A, B) ORDER BY tuple();
INSERT INTO TABLE source VALUES (6, 12);
ALTER TABLE destination ATTACH PARTITION ID '3-6' FROM source;
SELECT * FROM source ORDER BY A;
SELECT * FROM destination ORDER BY A;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION (3, 6) from source;
SELECT * FROM source ORDER BY A;
SELECT * FROM destination ORDER BY A;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
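-- Illustrative aside, not part of the original test: the two scenarios above are inverses;
-- (6, 12) maps to (3, 6) under intDiv(_, 2), and the single-row partition cannot be split.
SELECT intDiv(6, 2), intDiv(12, 2); -- 3, 6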
-- Should be allowed, it is a local operation, no different from a regular attach. Replicated to replicated.
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE
source(timestamp DateTime)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/source_replicated_to_replicated_distinct_expression', '1')
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY tuple();
CREATE TABLE
destination(timestamp DateTime)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/destination_replicated_to_replicated_distinct_expression', '1')
PARTITION BY toYYYYMM(timestamp)
ORDER BY tuple();
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01'), ('2010-03-02 02:01:03');
ALTER TABLE destination ATTACH PARTITION ID '20100302' FROM source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '20100302' from source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
-- Should be allowed, it is a local operation, no different from a regular attach. Non-replicated to replicated
DROP TABLE IF EXISTS source SYNC;
DROP TABLE IF EXISTS destination SYNC;
CREATE TABLE source(timestamp DateTime) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(timestamp) ORDER BY tuple();
CREATE TABLE
destination(timestamp DateTime)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/destination_non_replicated_to_replicated_distinct_expression', '1')
PARTITION BY toYYYYMM(timestamp)
ORDER BY tuple();
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01'), ('2010-03-02 02:01:03');
ALTER TABLE destination ATTACH PARTITION ID '20100302' FROM source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
TRUNCATE TABLE destination;
ALTER TABLE destination ATTACH PARTITION '20100302' from source;
SELECT * FROM source ORDER BY timestamp;
SELECT * FROM destination ORDER BY timestamp;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
-- Should not be allowed because data would be split into two different partitions
DROP TABLE IF EXISTS source SYNC;
DROP TABLE IF EXISTS destination SYNC;
CREATE TABLE source (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
CREATE TABLE destination (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMMDD(timestamp);
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01'), ('2010-03-03 02:01:03');
ALTER TABLE destination ATTACH PARTITION ID '201003' FROM source; -- { serverError 248 }
ALTER TABLE destination ATTACH PARTITION '201003' from source; -- { serverError 248 }
-- Should not be allowed because data would be split into two different partitions
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime, A Int64) engine=MergeTree ORDER BY timestamp PARTITION BY intDiv(A, 6);
CREATE TABLE destination (timestamp DateTime, A Int64) engine=MergeTree ORDER BY timestamp PARTITION BY A;
INSERT INTO TABLE source VALUES ('2010-03-02 02:01:01', 1), ('2010-03-02 02:01:03', 2);
ALTER TABLE destination ATTACH PARTITION ID '0' FROM source; -- { serverError 248 }
ALTER TABLE destination ATTACH PARTITION 0 FROM source; -- { serverError 248 }
-- Should not be allowed because the destination partition expression takes more than two arguments, so it is not considered monotonically increasing
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (productName String, category String) engine=MergeTree ORDER BY tuple() PARTITION BY toString(category);
CREATE TABLE destination (productName String, category String) engine=MergeTree ORDER BY tuple() PARTITION BY substring(category, 1, 2);
INSERT INTO TABLE source VALUES ('spaghetti', 'food'), ('mop', 'general');
INSERT INTO TABLE source VALUES ('rice', 'food');
ALTER TABLE destination ATTACH PARTITION ID '4590ba78048910b74a47d5bfb308abed' from source; -- { serverError 36 }
ALTER TABLE destination ATTACH PARTITION 'food' from source; -- { serverError 36 }
-- Should not be allowed because dst partition exp depends on a different set of columns
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (productName String, category String) engine=MergeTree ORDER BY tuple() PARTITION BY toString(category);
CREATE TABLE destination (productName String, category String) engine=MergeTree ORDER BY tuple() PARTITION BY toString(productName);
INSERT INTO TABLE source VALUES ('spaghetti', 'food'), ('mop', 'general');
INSERT INTO TABLE source VALUES ('rice', 'food');
ALTER TABLE destination ATTACH PARTITION ID '4590ba78048910b74a47d5bfb308abed' from source; -- { serverError 36 }
ALTER TABLE destination ATTACH PARTITION 'food' from source; -- { serverError 36 }
-- Should not be allowed because dst partition exp is not monotonically increasing
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (productName String) engine=MergeTree ORDER BY tuple() PARTITION BY left(productName, 2);
CREATE TABLE destination (productName String) engine=MergeTree ORDER BY tuple() PARTITION BY cityHash64(productName);
INSERT INTO TABLE source VALUES ('bread'), ('mop');
INSERT INTO TABLE source VALUES ('broccoli');
ALTER TABLE destination ATTACH PARTITION ID '4589453b7ee96ce9de1265bd57674496' from source; -- { serverError 36 }
ALTER TABLE destination ATTACH PARTITION 'br' from source; -- { serverError 36 }
-- Empty/non-existent partition, same partition expression. Nothing should happen
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
CREATE TABLE destination (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
ALTER TABLE destination ATTACH PARTITION ID '1' FROM source;
ALTER TABLE destination ATTACH PARTITION 1 FROM source;
SELECT * FROM destination;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
-- Empty/non-existent partition, different partition expression. Nothing should happen
-- https://github.com/ClickHouse/ClickHouse/pull/39507#discussion_r1399839045
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMMDD(timestamp);
CREATE TABLE destination (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
ALTER TABLE destination ATTACH PARTITION ID '1' FROM source;
ALTER TABLE destination ATTACH PARTITION 1 FROM source;
SELECT * FROM destination;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
-- Replace instead of attach. Empty/non-existent partition, same partition expression. Nothing should happen
-- https://github.com/ClickHouse/ClickHouse/pull/39507#discussion_r1399839045
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
CREATE TABLE destination (timestamp DateTime) engine=MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(timestamp);
ALTER TABLE destination REPLACE PARTITION '1' FROM source;
SELECT * FROM destination;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;
-- Replace instead of attach. Empty/non-existent partition to non-empty partition, same partition id.
-- https://github.com/ClickHouse/ClickHouse/pull/39507#discussion_r1399839045
DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS destination;
CREATE TABLE source (A Int) engine=MergeTree ORDER BY tuple() PARTITION BY A;
CREATE TABLE destination (A Int) engine=MergeTree ORDER BY tuple() PARTITION BY A;
INSERT INTO TABLE destination VALUES (1);
ALTER TABLE destination REPLACE PARTITION '1' FROM source;
SELECT * FROM destination;
SELECT partition_id FROM system.parts where table='destination' AND database = currentDatabase() AND active = 1;

View File

@ -293,7 +293,7 @@ SELECT
{'age':'31','last_key':'last_value','name':'neymar','nationality':'brazil','team':'psg'}
-- { echoOn }
SET extract_kvp_max_pairs_per_row = 2;
SET extract_key_value_pairs_max_pairs_per_row = 2;
-- Should be allowed because it no longer exceeds the max number of pairs
-- expected output: {'key1':'value1','key2':'value2'}
WITH
@ -307,7 +307,7 @@ WITH
SELECT
x;
{'key1':'value1','key2':'value2'}
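-- Illustrative aside, not part of the original test: per the comments in this file, exceeding
-- the cap is an error, so under the renamed setting something like the following is expected
-- to fail (a sketch; the exact error code is not asserted here).
SET extract_key_value_pairs_max_pairs_per_row = 1;
SELECT extractKeyValuePairs('key1:value1,key2:value2'); -- expected to fail: 2 pairs > limit of 1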
SET extract_kvp_max_pairs_per_row = 0;
SET extract_key_value_pairs_max_pairs_per_row = 0;
-- Should be allowed because max pairs per row is set to 0 (unlimited)
-- expected output: {'key1':'value1','key2':'value2'}
WITH

View File

@ -415,7 +415,7 @@ SELECT
x; -- {serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH}
-- Should fail because it exceeds the max number of pairs
SET extract_kvp_max_pairs_per_row = 1;
SET extract_key_value_pairs_max_pairs_per_row = 1;
WITH
extractKeyValuePairs('key1:value1,key2:value2') AS s_map,
CAST(
@ -429,7 +429,7 @@ SELECT
-- { echoOn }
SET extract_kvp_max_pairs_per_row = 2;
SET extract_key_value_pairs_max_pairs_per_row = 2;
-- Should be allowed because it no longer exceeds the max number of pairs
-- expected output: {'key1':'value1','key2':'value2'}
WITH
@ -443,7 +443,7 @@ WITH
SELECT
x;
SET extract_kvp_max_pairs_per_row = 0;
SET extract_key_value_pairs_max_pairs_per_row = 0;
-- Should be allowed because max pairs per row is set to 0 (unlimited)
-- expected output: {'key1':'value1','key2':'value2'}
WITH